Source code for liesl.buffers.ringbuffer

"""
Ringbuffer
----------
"""
from liesl.streams.convert import inlet_to_dict
from pylsl import StreamInlet, StreamInfo
import numpy as np
from numpy import ndarray
from typing import Union, Tuple
import threading
import time


class SimpleRingBuffer:
    def __init__(self, rowlen: int, columnlen: int, verbose=False) -> None:
        """a simple ringbuffer

        args
        ----
        rowlen: int
            the number of samples
        columnlen:int
            the number of channels
        verbose:bool {False}
            how verbose the buffer should be
            
        Example
        -------
            rb = SimpleRingBuffer(1000, 2)

        """
        self.max_row = int(rowlen)
        self.max_column = int(columnlen)
        self.verbose = verbose
        self.reset()

    def reset(self):
        "empty the internal buffer"
        self.buffer = np.empty((0, self.max_column))

    def put(self, chunk: Union[list, np.ndarray], transpose=False):
        """append a chunk of data and delete old samples
        
        should be faster than the implementation using np.roll according to 
        https://gist.github.com/cchwala/dea03fb55d9a50660bd52e00f5691db5
        
        """
        chunk = np.atleast_2d(chunk)
        if self.verbose:
            if chunk.shape[0] > self.max_row:
                print("Ringbuffer Overflow")

        if transpose:
            chunk = chunk.T

        buffer = np.atleast_2d(np.concatenate((self.buffer, chunk), axis=0))
        if buffer.shape[0] > self.max_row:
            self.buffer = buffer[-self.max_row :, :]
        else:
            self.buffer = buffer

    def get(self) -> np.ndarray:
        "return a copy of the internal buffer"
        return self.buffer.copy()

    @property
    def is_full(self) -> bool:
        "whether the internal buffer is full or not"
        return self.buffer.shape[0] == self.max_row

    @property
    def shape(self):
        "the current size of the internal buffer"
        return self.buffer.shape

    @property
    def max_shape(self):
        "the maximal size of the internal buffer"
        return (self.max_row, self.max_column)


# -------------------------------------------------------------------------------


[docs]class RingBuffer(threading.Thread): """A ringbuffer subscribed to an LSL outlet The ringbuffer automatically updating itself as a thread. args ---- streaminfo: StreamInfo identifies the StreamOutlet and will be connected once the buffer started duration_in_ms: float the length of the ringbuffer in ms. is automatically converted into samples based on the nominal sampling rate of the LSL Outlet. All data older than duration_in_ms (normalized by expected samples) will be discarded verbose:bool how verbose the ringbuffer should be. defaults to False Example:: sinfo = get_streaminfos_matching(name="Liesl-Mock-EEG")[0] # the mock EEG has a sampling rate of 100 and 8 channels rb = RingBuffer(streaminfo=sinfo, duration_in_ms=1000) rb.await_running() time.sleep(1) # wait a second to collect sufficient data chunk, tstamps = rb.get() assert chunk.shape == [100, 8] # for 1s of data and 8 channels """ def __init__( self, streaminfo: StreamInfo, duration_in_ms: float = 1000, verbose: bool = False, ) -> None: threading.Thread.__init__(self) self.streaminfo = streaminfo fs = streaminfo.nominal_srate() if fs == 0: # pragma no cover self.fs = 1000 print( "Irregular sampling rate. Assuming fs=1000 and " + "convert duration_in_ms into duration_in_samples" ) else: self.fs = fs max_row = int(duration_in_ms * (self.fs / 1000)) max_column = int(streaminfo.channel_count()) self.buffer = SimpleRingBuffer( rowlen=max_row, columnlen=max_column, verbose=verbose ) self.tstamps = SimpleRingBuffer(rowlen=max_row, columnlen=1, verbose=verbose) self.bufferlock = threading.Lock() self._is_running = threading.Event()
[docs] def reset(self): "clear the internal buffer and start collecting fresh" self.bufferlock.acquire() self.buffer.reset() self.bufferlock.release()
[docs] def get_data(self) -> ndarray: """get only the current data without timestamps returns ------- chunk: ndarray the data (usually in samples x channels) """ with self.bufferlock: buffer = self.buffer.get() return buffer
[docs] def get(self) -> Tuple[ndarray, ndarray]: """get the current data with timestamps returns ------- chunk: ndarray the data (usually in samples x channels) tstamps: ndarray the timestamps for each sample """ with self.bufferlock: buffer = self.buffer.get() tstamps = self.tstamps.get() tstamps += self.offset return buffer, tstamps
@property def shape(self) -> Tuple[int, int]: "the current size of the data currently in the ringbuffer" return self.buffer.shape @property def max_shape(self) -> Tuple[int, int]: "the maximal size of the ringbuffer" return self.buffer.max_shape @property def is_full(self) -> bool: "whether the ringbuffer is full" return self.buffer.shape[0] == self.buffer.max_shape[0]
[docs] def stop(self): "stop the subscription to the outlet" self.is_running = False self.join()
@property def is_running(self): "whether the ringbuffer is receiving new data or not" return self._is_running.is_set() @is_running.setter def is_running(self, state: bool): if state: self._is_running.set() else: self._is_running.clear()
[docs] def await_running(self): "block until the buffer has subscribed to the LSL outlet" print("[", end="") try: self.start() except RuntimeError: # pragma no cover pass while not self.is_running: time.sleep(0.1) print(".", end="") print("]")
def run(self): """""" stream = StreamInlet( self.streaminfo ) # create the inlet locally so it can be properly garbage collected self.info = inlet_to_dict(stream) self.offset = stream.time_correction() self.is_running = True while self.is_running: chunk, tstamp = stream.pull_chunk() if chunk: with self.bufferlock: # to prevent writing while reading self.buffer.put(chunk) self.tstamps.put(tstamp, transpose=True) else: time.sleep(0.001) # can prevent hiccups when run in a repl