liesl API
The main API can be accessed directly from liesl using e.g.
import liesl
stream = liesl.print_available_streams()
Stream Handling
Detect and print information about available streams.
Find and open streams
-
open_stream
(**kwargs)[source] get only a single streaminlets which matches kwargs
raises a ConnectionError if more than one match is found
- Parameters
**kwargs – keyword arguments to identify the desired stream
- Returns
stream – a single StreamInlet matching the kwargs.
- Return type
StreamInlet
Example:
import liesl stream = liesl.open_stream(name="Liesl-Mock-EEG")
-
open_streaminfo
(**kwargs)[source] get only a single StreamInfo which matches kwargs
raises a ConnectionError if more than one match is found
- Parameters
**kwargs – keyword arguments to identify the desired stream
- Returns
sinfo – a single StreamInfo matching the kwargs.
- Return type
StreamInfo
Example:
import liesl sinfo = liesl.open_streaminfo(name="Liesl-Mock-EEG")
-
print_available_streams
()[source] prints all available streams in their xml form
Example:
import liesl liesl.print_available_streams()
-
get_streams_matching
(**kwargs)[source] get all streaminlets matching kwargs
- Parameters
**kwargs – keyword arguments to identify the desired stream
- Returns
streams – a list of StreamInlets matching the kwargs
- Return type
List[StreamInlet]
Example:
import liesl streams = liesl.get_streams_matching(name="Liesl-Mock-EEG")
Convert streams
-
inlet_to_dict
(inlet)[source] convert inlet information into a dictionary
- Parameters
inlet (pylsl.StreamInlet) – the inlet to convert into a dictionary
- Returns
output – a dictionary of key information parsed from the xml
- Return type
dict
Example:
import liesl stream = liesl.open_stream() d = liesl.inlet_to_dict(stream)
-
get_channel_map
(inlet)[source] convert inlet information into a ChannelMapping
- Parameters
inlet (pylsl.StreamInlet) – the inlet to convert into a dictionary
- Returns
output – a dictionary mapping channel names to indices
- Return type
Dict[str, int]
Example:
import liesl stream = liesl.open_stream() chanmap = liesl.get_channel_map(stream)
localhost
-
localhostname
= 'build-16524033-project-533603-pyliesl' the name of the local machine
-
localhost
= '127.0.0.1' the localhost
-
localip
= '172.17.0.2' the local ip address
Buffers
Ringbuffer
-
class
RingBuffer
(streaminfo, duration_in_ms=1000, verbose=False)[source] A ringbuffer subscribed to an LSL outlet
The ringbuffer automatically updating itself as a thread.
- Parameters
streaminfo (StreamInfo) – identifies the StreamOutlet and will be connected once the buffer started
duration_in_ms (float) – the length of the ringbuffer in ms. is automatically converted into samples based on the nominal sampling rate of the LSL Outlet. All data older than duration_in_ms (normalized by expected samples) will be discarded
verbose (bool) – how verbose the ringbuffer should be. defaults to False
Example:
sinfo = get_streaminfos_matching(name="Liesl-Mock-EEG")[0] # the mock EEG has a sampling rate of 100 and 8 channels rb = RingBuffer(streaminfo=sinfo, duration_in_ms=1000) rb.await_running() time.sleep(1) # wait a second to collect sufficient data chunk, tstamps = rb.get() assert chunk.shape == [100, 8] # for 1s of data and 8 channels
-
get
()[source] get the current data with timestamps
- Return type
Tuple
[ndarray
,ndarray
]- Returns
chunk (ndarray) – the data (usually in samples x channels)
tstamps (ndarray) – the timestamps for each sample
-
get_data
()[source] get only the current data without timestamps
- Returns
chunk – the data (usually in samples x channels)
- Return type
ndarray
-
property
is_full
whether the ringbuffer is full
- Return type
bool
-
property
is_running
whether the ringbuffer is receiving new data or not
-
property
max_shape
the maximal size of the ringbuffer
- Return type
Tuple
[int
,int
]
-
property
shape
the current size of the data currently in the ringbuffer
- Return type
Tuple
[int
,int
]
BlockBuffer
-
class
SimpleBlockBuffer
(max_samples=50, channel_count=64, max_queued=None)[source] Convert incoming data into blocks.
Put new samples or chunks using
handle_sample()
orhandle_chunk()
. The SimpleBlockBuffer takes the data, and only publishes full blocks. These blocks are available by callingget()
- Parameters
max_samples (int) – how many samples (rowlen) each block should have
channel_count (int) – how many channels (columnlen) each block should have
max_queued (Union[None, int]) – Whether to queue as many blocks as possible (None) or drop if more than max_queued
Example:
blockbuffer = SimpleBlockBuffer(50, 1) chunk = np.arange(0, 200) blockbuffer.handle_chunk(chunk) block = blockbuffer.get() assert block[0] == 0.0 # the first block starts with the first sample block = blockbuffer.get() assert block[0] == 50.0 # the second block starts with the 50th sample assert block.shape == [50, 1] # as defined during initalization
-
get
()[source] return none or a block if there is one waiting in the queue
- Return type
Optional
[ndarray
]
Responses
-
class
Response
(chunk, tstamps, onset_in_ms, fs=1000, pre_in_ms=30, post_in_ms=75, ep_window_in_ms=(15.0, 50.0))[source] A reponse to a trigger
- Parameters
chunk (np.ndarray) – a data chunk as received from pylsl.StreamInlet.pull_chunk() or
liesl.buffers.ringbuffer.RingBuffer.get()
should be 2-dimensional (samples x channels)tstamps (np.ndarray) – the timestamps of this data chunk as e.g. received from pylsl.StreamInlet.pull_chunk() or
liesl.buffers.ringbuffer.RingBuffer.get()
can be 1-dimensional (timepoints) or 2 dimensional (timepoints x 1)onset_in_ms (float) – the timestamp of the trigger as e.g. received from pylsl.StreamInlet.pull_sample()
fs (int) – sampling rate in Hz, defaults to 1000
pre_in_ms (float) – how many milliseconds to use before the trigger
post_in_ms (float) – how many milliseconds to use after the trigger
ep_window_in_ms (Tuple[float, float]) – the expected timeframe when the evoked potential starts and stops. defaults to [15,50]
-
as_json
(channel_idx=0)[source] encodes the response as json
- Parameters
channel_idx (int) – which channel to use for calculation of MEP parameters
- Returns
msg – a json-encoded dictionary to be e.g. sent wwith an LSL MarkerOutlet
- Return type
str
-
get_latency
(channel_idx=0)[source] the latency of the MEP in a specific channel
Based on the time of TMS given during initialization, and the hard-coded pre_in_ms, post_in_ms and ep_window_in_ms calculates the latency
- Parameters
channel_idx (int) – which channel to use for calculation of latency
- Returns
vpp – the latency in ms relative to the TMS pulse of the negative and the positive peak
- Return type
List[float]
-
get_trace
(channel_idx=0, baseline_correction=True)[source] Cuts a chunk of data
Based on the given onset this function cuts out a trace for one or more (if an slice is given) channel. It does a baseline correction by default.
- Parameters
channel_idx (int) – which channel to use for calculation of latency.
- Returns
trace – Contains the trace of the data cut from pre to post around the onset. Type is a ndarray containing (pre+post) samples and shape (samples, 1)
- Return type
np.ndarray
-
get_vpp
(channel_idx=0)[source] the peak-to-peak amplitude of the MEP in a specific channel
Based on the time of TMS given during initialization, and the hard-coded pre_in_ms, post_in_ms and ep_window_in_ms calculates the Vpp
- Parameters
channel_idx (int) – which channel to use for calculation of Vpp
- Returns
vpp – the peak-to-peak amplitude in native units of the data chunk
- Return type
np.ndarray
-
get_xaxis
(stepsize=5)[source] get xaxis objects for plotting
- Parameters
stepsize (float) – the size of the stepos between xticks
- Return type
Tuple
[ndarray
,List
[str
],Tuple
[int
,int
]]- Returns
xticks (ndarray) – an array of xticks
xticklabels (List[str]) – a list fo xticklabels
xlim (Tuple[int, int]) – the limits of the xaxis
Recording
Runfile
Session
-
class
Session
(prefix='VvNn', mainfolder='~/labrecordings', recorder=None, streamargs=[None])[source] Manages recordings for a whole session
Example:
streamargs = [{'name':"localite_marker", "hostname": localhostname}, {'name':"reiz_marker_sa", "hostname": localhostname}, {'name':"BrainVision RDA Markers", "hostname": localhostname}, {'name':"BrainVision RDA", "hostname": localhostname}] session = Session(prefix="VvNn", streamargs=streamargs) with session("task"): run_task() # run your task, and while it runs, the streams are recorded # to ~/labrecording/VvNn/task_R001.xdf with session("task"): run_task() # run your task, and while it runs, the streams are recorded # to ~/labrecording/VvNn/task_R002.xdf with session("othertask"): run_othertask() # run your task, and while it runs, the streams are recorded # to ~/labrecording/VvNn/othertask_R001.xdf
-
Recorder
Loading
XDFStreams
-
class
XDFStream
(unparsed_stream)[source] Interface to an XDFstream loaded from an
xdf
-file-
property
channel_count
get the channel count as int
- Return type
int
-
property
channel_format
get the streams data format as str
- Return type
str
-
property
channel_labels
get the channel labels as a list of strings
- Return type
Optional
[List
[str
]]
-
property
channel_types
get the channel types as a list of strings
- Return type
Optional
[List
[str
]]
-
property
channel_units
get the channel units as a list of strings
- Return type
Optional
[List
[str
]]
-
property
created_at
get the time stamp from when the channel was created as float
-
property
hostname
get the hostname of the machine where the streams was created as str
-
property
name
get the streams name as str
- Return type
str
-
property
nominal_srate
get the nominal sampling rate as float
- Return type
float
-
property
time_series
get the time_series of the stream, i.e its data as ndarray
- Return type
ndarray
-
property
time_stamps
get the time_stamps for each sample as ndarray
- Return type
ndarray
-
property
type
get the streams type as str
- Return type
str
-
property