Source code for liesl.buffers.blockbuffer

"""
BlockBuffer
-----------
"""
import numpy as np
from collections import deque
from typing import Union
from numpy import ndarray

# %%
class RawBlockBuffer:
    def __init__(self, rowlen: int, columnlen: int) -> None:
        self.buffer = np.empty((rowlen, columnlen))
        self.buffer.fill(None)
        self.head = iter(range(0, rowlen, 1))

    def append(self, sample: ndarray):
        "append a column of data but raise a StopIteration when full"
        head = next(self.head)
        self.buffer[head, :] = sample


[docs]class SimpleBlockBuffer: """Convert incoming data into blocks. Put new samples or chunks using :meth:`~.handle_sample` or :meth:`~.handle_chunk`. The SimpleBlockBuffer takes the data, and only publishes full blocks. These blocks are available by calling :meth:`~.get` args ---- max_samples:int how many samples (rowlen) each block should have channel_count:int how many channels (columnlen) each block should have max_queued: Union[None, int] Whether to queue as many blocks as possible (None) or drop if more than max_queued Example:: blockbuffer = SimpleBlockBuffer(50, 1) chunk = np.arange(0, 200) blockbuffer.handle_chunk(chunk) block = blockbuffer.get() assert block[0] == 0.0 # the first block starts with the first sample block = blockbuffer.get() assert block[0] == 50.0 # the second block starts with the 50th sample assert block.shape == [50, 1] # as defined during initalization """ def __init__(self, max_samples=50, channel_count=64, max_queued=None): self.max_samples = max_samples self.channel_count = channel_count self.max_queued = max_queued self.reset()
[docs] def reset(self): "clear all buffers and the queue" self.queue = deque(maxlen=self.max_queued) self.buffer = RawBlockBuffer( rowlen=self.max_samples, columnlen=self.channel_count ) self.last_block = None
[docs] def handle_sample(self, sample): "try to append new samples to the block" try: self.buffer.append(sample) except StopIteration: # when the RawBlockBuffer is full self.queue.append(self.buffer.buffer) self.buffer = RawBlockBuffer(self.max_samples, self.channel_count) self.buffer.append(sample)
[docs] def handle_chunk(self, chunk): """iteratively append all samples of a chunk to the block args ---- chunk:np.ndarray a new chunk of data to be processed columnwise """ for sample in chunk: self.handle_sample(sample)
[docs] def get_last(self): "return the last valid block" return self.last_block
[docs] def get(self) -> Union[None, ndarray]: "return none or a block if there is one waiting in the queue" try: block = self.queue.popleft() self.last_block = block return block.copy() except IndexError: return None