Source code for liesl.buffers.response

# -*- coding: utf-8 -*-
"""
Responses
---------
"""
from typing import Tuple, List
import numpy as np
from numpy import ndarray


[docs]class Response: """A reponse to a trigger args ---- chunk:np.ndarray a data chunk as received from pylsl.StreamInlet.pull_chunk() or :meth:`liesl.buffers.ringbuffer.RingBuffer.get` should be 2-dimensional (samples x channels) tstamps:np.ndarray the timestamps of this data chunk as e.g. received from pylsl.StreamInlet.pull_chunk() or :meth:`liesl.buffers.ringbuffer.RingBuffer.get` can be 1-dimensional (timepoints) or 2 dimensional (timepoints x 1) onset_in_ms:float the timestamp of the trigger as e.g. received from pylsl.StreamInlet.pull_sample() fs:int sampling rate in Hz, defaults to 1000 pre_in_ms:float how many milliseconds to use before the trigger post_in_ms:float how many milliseconds to use after the trigger ep_window_in_ms:Tuple[float, float] the expected timeframe when the evoked potential starts and stops. defaults to [15,50] """ def __init__( self, chunk: np.ndarray, tstamps: np.ndarray, onset_in_ms: float, fs: int = 1000, pre_in_ms: float = 30, post_in_ms: float = 75, ep_window_in_ms: Tuple[float, float] = (15.0, 50.0), ): if tstamps.ndim == 1: tstamps = np.atleast_2d(tstamps).T if chunk.ndim == 1: raise ValueError("Chunk must be 2D of form (samples x channels)") self.chunk = chunk self.tstamps = tstamps self.onset_in_ms = onset_in_ms self.fs = fs self.pre_in_ms = pre_in_ms self.post_in_ms = post_in_ms self.ep_window_in_ms = ep_window_in_ms @property def onset(self) -> int: onset = abs((self.onset_in_ms - self.tstamps)[:, 0]).argmin() return onset @property def pre(self) -> int: pre = int(self.pre_in_ms * 1000 / self.fs) return self.onset - pre @property def post(self) -> int: post = int(self.post_in_ms * 1000 / self.fs) return self.onset + post @property def ep_window(self) -> List[int]: ep_window = [self.onset + int(m * 1000 / self.fs) for m in self.ep_window_in_ms] return ep_window
[docs] def get_trace( self, channel_idx: int = 0, baseline_correction: bool = True ) -> ndarray: """Cuts a chunk of data Based on the given onset this function cuts out a trace for one or more (if an slice is given) channel. It does a baseline correction by default. args ---- channel_idx:int which channel to use for calculation of latency. returns ------- trace: np.ndarray Contains the trace of the data cut from pre to post around the onset. Type is a ndarray containing (pre+post) samples and shape (samples, 1) """ response = self.chunk[self.pre : self.post, channel_idx].copy() if baseline_correction: bl = self.chunk[self.pre : self.onset + 1, channel_idx] response -= bl.mean() return response
[docs] def get_latency(self, channel_idx: int = 0) -> List[float]: """the latency of the MEP in a specific channel Based on the time of TMS given during initialization, and the hard-coded pre_in_ms, post_in_ms and ep_window_in_ms calculates the latency args ---- channel_idx:int which channel to use for calculation of latency returns ------- vpp: List[float] the latency in ms relative to the TMS pulse of the negative and the positive peak """ bl = self.chunk[self.pre : self.onset, channel_idx].mean(0) data = self.chunk[self.ep_window[0] : self.ep_window[1] + 1, channel_idx] - bl peakpos = [data.argmin(), data.argmax()] peakpos = [p + self.ep_window[0] for p in peakpos] peakpos_in_ms = [float(p - self.onset) * 1000 / self.fs for p in peakpos] return peakpos_in_ms
[docs] def get_vpp(self, channel_idx: int = 0) -> ndarray: """the peak-to-peak amplitude of the MEP in a specific channel Based on the time of TMS given during initialization, and the hard-coded pre_in_ms, post_in_ms and ep_window_in_ms calculates the Vpp args ---- channel_idx:int which channel to use for calculation of Vpp returns ------- vpp:np.ndarray the peak-to-peak amplitude in native units of the data chunk """ bl = self.chunk[self.pre : self.onset, channel_idx].mean(0) data = self.chunk[self.ep_window[0] : self.ep_window[1] + 1, channel_idx] - bl peakpos = [data.argmin(), data.argmax()] self.peakpos = [p + self.ep_window[0] for p in peakpos] self.peakpos_in_ms = [ p * 1000 / self.fs + self.ep_window_in_ms[0] + self.pre_in_ms for p in peakpos ] self.peakval = [data.min(), data.max()] return data.max() - data.min()
[docs] def get_xaxis( self, stepsize: float = 5 ) -> Tuple[ndarray, List[str], Tuple[int, int]]: """get xaxis objects for plotting args ---- stepsize: float the size of the stepos between xticks returns -------- xticks:ndarray an array of xticks xticklabels:List[str] a list fo xticklabels xlim: Tuple[int, int] the limits of the xaxis """ if stepsize <= 0: raise ValueError("Stepsize must be larger than 0") xticks = np.arange(0, self.post - self.pre, stepsize * 1000 / self.fs) xlim = (0, self.post - self.pre) xticklabels = [ "{0:.0f}".format(x) for x in np.arange( -self.pre_in_ms * 1000 / self.fs, (self.post_in_ms + stepsize) * 1000 / self.fs, stepsize, ) ] return xticks, xticklabels, xlim
[docs] def as_json(self, channel_idx: int = 0) -> str: """encodes the response as json args ---- channel_idx:int which channel to use for calculation of MEP parameters returns ------- msg:str a json-encoded dictionary to be e.g. sent wwith an LSL MarkerOutlet """ bl = self.chunk[self.pre : self.onset + 1, channel_idx].mean(0) data = self.chunk[self.ep_window[0] : self.ep_window[1] + 1, channel_idx] - bl mi, ma = [data.min(), data.max()] max_latency = self.get_latency(channel_idx)[0] msg = ( '{"mepmaxtime": ' + f"{max_latency:.2f}, " + '"mepamplitude": ' + f"{ma-mi:.2f}, " + '"mepmin": ' + f"{mi:.2f}, " + '"mepmax": ' + f"{ma:.2f}" + "}" ) return msg
[docs]class MockResponse(Response): """mocks a response for testing and development""" def __new__(cls): tstamps = np.linspace(0, 1, 1001) chunk = np.arange(0, 1000, dtype=float) chunk = np.repeat(np.atleast_2d(chunk).T, 8, 1) onset_in_ms = tstamps[500] return Response( chunk=chunk, tstamps=tstamps, onset_in_ms=onset_in_ms, fs=1000, pre_in_ms=50, post_in_ms=50, ep_window_in_ms=[15, 50], )