liesl API

The main API can be accessed directly from liesl using e.g.

import liesl
liesl.print_available_streams()

Stream Handling

Detect and print information about available streams.

Find and open streams

open_stream(**kwargs)

get the single StreamInlet which matches kwargs

raises a ConnectionError if more than one match is found

Parameters

**kwargs – keyword arguments to identify the desired stream

Returns

stream – a single StreamInlet matching the kwargs.

Return type

StreamInlet

Example:

import liesl
stream = liesl.open_stream(name="Liesl-Mock-EEG")

open_streaminfo(**kwargs)

get only a single StreamInfo which matches kwargs

raises a ConnectionError if more than one match is found

Parameters

**kwargs – keyword arguments to identify the desired stream

Returns

sinfo – a single StreamInfo matching the kwargs.

Return type

StreamInfo

Example:

import liesl
sinfo = liesl.open_streaminfo(name="Liesl-Mock-EEG")

print_available_streams()

prints all available streams in their xml form

Example:

import liesl
liesl.print_available_streams()

get_streams_matching(**kwargs)

get all StreamInlets matching kwargs

Parameters

**kwargs – keyword arguments to identify the desired stream

Returns

streams – a list of StreamInlets matching the kwargs

Return type

List[StreamInlet]

Example:

import liesl
streams = liesl.get_streams_matching(name="Liesl-Mock-EEG")

get_streaminfos_matching(**kwargs)

get all StreamInfos matching kwargs

Parameters

**kwargs – keyword arguments to identify the desired stream

Returns

sinfos – a list of StreamInfo matching the kwargs

Return type

List[StreamInfo]

Example:

import liesl
sinfos = liesl.get_streaminfos_matching(name="Liesl-Mock-EEG")
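The keyword matching used by the functions above can be pictured with plain dictionaries. The following is only a sketch of the idea and not how liesl implements it: match_all and match_one are hypothetical helpers, and the stream descriptions are made-up dictionaries rather than real StreamInfo objects. It mirrors the documented rule that the single-match functions raise a ConnectionError when more than one stream matches. How zero matches are handled is not documented here, so treating that case the same way is an assumption.

```python
from typing import Dict, List

# made-up stream descriptions standing in for StreamInfo objects
AVAILABLE = [
    {"name": "Liesl-Mock-EEG", "type": "EEG", "hostname": "lab-pc"},
    {"name": "Liesl-Mock-Marker", "type": "Marker", "hostname": "lab-pc"},
]


def match_all(streams: List[Dict[str, str]], **kwargs: str) -> List[Dict[str, str]]:
    """Return every stream whose fields equal all given keyword arguments."""
    return [s for s in streams if all(s.get(k) == v for k, v in kwargs.items())]


def match_one(streams: List[Dict[str, str]], **kwargs: str) -> Dict[str, str]:
    """Return the only matching stream, or raise ConnectionError."""
    found = match_all(streams, **kwargs)
    if len(found) != 1:
        # more than one match is ambiguous; zero matches is treated the
        # same way here, which is an assumption of this sketch
        raise ConnectionError(f"expected exactly one match, found {len(found)}")
    return found[0]


eeg = match_one(AVAILABLE, name="Liesl-Mock-EEG")
on_host = match_all(AVAILABLE, hostname="lab-pc")
```

Adding more keyword arguments, for example a hostname next to the name, narrows the match and helps to avoid the ambiguous case.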

Convert streams

inlet_to_dict(inlet)

convert inlet information into a dictionary

Parameters

inlet (pylsl.StreamInlet) – the inlet to convert into a dictionary

Returns

output – a dictionary of key information parsed from the xml

Return type

dict

Example:

import liesl
stream = liesl.open_stream()
d = liesl.inlet_to_dict(stream)

get_channel_map(inlet)

convert inlet information into a ChannelMapping

Parameters

inlet (pylsl.StreamInlet) – the inlet to convert into a channel map

Returns

output – a dictionary mapping channel names to indices

Return type

Dict[str, int]

Example:

import liesl
stream = liesl.open_stream()
chanmap = liesl.get_channel_map(stream)
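A Dict[str, int] of this kind is typically used to address a channel by name instead of by position. The sketch below builds such a mapping from a hypothetical list of channel labels. It does not read anything from an inlet, and the labels and sample values are invented for illustration.

```python
from typing import Dict, List

# hypothetical channel labels, in the order they appear in each sample
labels: List[str] = ["Fz", "Cz", "Pz", "EDC_L"]

# map each channel name to its column index
chanmap: Dict[str, int] = {name: idx for idx, name in enumerate(labels)}

# use the map to pick one channel out of a sample (one value per channel)
sample = [0.1, 0.2, 0.3, 0.4]
cz_value = sample[chanmap["Cz"]]
```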

localhost

localhostname = 'build-16524033-project-533603-pyliesl'

the name of the local machine

localhost = '127.0.0.1'

the localhost

localip = '172.17.0.2'

the local ip address

Buffers

Ringbuffer

class RingBuffer(streaminfo, duration_in_ms=1000, verbose=False)

A ringbuffer subscribed to an LSL outlet

The ringbuffer updates itself automatically in a background thread.

Parameters
  • streaminfo (StreamInfo) – identifies the StreamOutlet, which will be connected once the buffer is started

  • duration_in_ms (float) – the length of the ringbuffer in ms. It is automatically converted into samples based on the nominal sampling rate of the LSL outlet. All data older than duration_in_ms (normalized by expected samples) will be discarded

  • verbose (bool) – how verbose the ringbuffer should be. Defaults to False
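The conversion of duration_in_ms into a number of samples, and the discarding of older data, can be sketched without any LSL connection. The code below is a simplified illustration under stated assumptions, not the RingBuffer implementation: duration_to_samples and TinyRingBuffer are hypothetical names, the rounding rule is a guess, and samples are pushed by hand instead of being received in a thread.

```python
from collections import deque
from typing import Deque, List


def duration_to_samples(duration_in_ms: float, nominal_srate: float) -> int:
    """Convert a buffer length in ms into a number of samples."""
    return int(round(duration_in_ms / 1000.0 * nominal_srate))


class TinyRingBuffer:
    """Keep only the most recent max_samples samples (hypothetical helper)."""

    def __init__(self, duration_in_ms: float, nominal_srate: float) -> None:
        self.max_samples = duration_to_samples(duration_in_ms, nominal_srate)
        # a deque with maxlen silently drops the oldest entry when full
        self._data: Deque[List[float]] = deque(maxlen=self.max_samples)

    def put(self, sample: List[float]) -> None:
        self._data.append(sample)

    @property
    def is_full(self) -> bool:
        return len(self._data) == self.max_samples

    def get_data(self) -> List[List[float]]:
        return list(self._data)


rb = TinyRingBuffer(duration_in_ms=1000, nominal_srate=100)
for i in range(150):  # push 1.5 s worth of single-channel samples
    rb.put([float(i)])
data = rb.get_data()
```

With a nominal rate of 100 Hz, a 1000 ms buffer holds 100 samples, so the first 50 of the 150 pushed samples are gone by the time the data is read.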

Example:

import time

from liesl import get_streaminfos_matching
from liesl.buffers.ringbuffer import RingBuffer

sinfo = get_streaminfos_matching(name="Liesl-Mock-EEG")[0]
# the mock EEG has a sampling rate of 100 and 8 channels
rb = RingBuffer(streaminfo=sinfo, duration_in_ms=1000)
rb.await_running()
time.sleep(1)  # wait a second to collect sufficient data
chunk, tstamps = rb.get()
# shape is a tuple, so compare against a tuple rather than a list
assert chunk.shape == (100, 8)  # for 1s of data and 8 channels

await_running()

block until the buffer has subscribed to the LSL outlet

get()

get the current data with timestamps

Return type

Tuple[ndarray, ndarray]

Returns

  • chunk (ndarray) – the data (usually in samples x channels)

  • tstamps (ndarray) – the timestamps for each sample

get_data()

get only the current data without timestamps

Returns

chunk – the data (usually in samples x channels)

Return type

ndarray

property is_full

whether the ringbuffer is full

Return type

bool

property is_running

whether the ringbuffer is receiving new data or not

property max_shape

the maximal size of the ringbuffer

Return type

Tuple[int, int]

reset()

clear the internal buffer and start collecting fresh

property shape

the shape of the data currently in the ringbuffer

Return type

Tuple[int, int]

stop()

stop the subscription to the outlet

BlockBuffer

class SimpleBlockBuffer(max_samples=50, channel_count=64, max_queued=None)

Convert incoming data into blocks.

Put new samples or chunks using handle_sample() or handle_chunk(). The SimpleBlockBuffer takes the data and only publishes full blocks. These blocks are available by calling get().

Parameters
  • max_samples (int) – how many samples (rowlen) each block should have

  • channel_count (int) – how many channels (columnlen) each block should have

  • max_queued (Union[None, int]) – Whether to queue as many blocks as possible (None) or drop if more than max_queued
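The idea of publishing only full blocks can be sketched in a few lines of plain Python. This is an illustration only and not the SimpleBlockBuffer code: TinyBlockBuffer is a hypothetical single-channel helper, and dropping the oldest block once max_queued is exceeded is an assumption, since the text above does not say which block is dropped.

```python
from collections import deque
from typing import Deque, List, Optional


class TinyBlockBuffer:
    """Collect samples and publish only complete blocks (hypothetical helper)."""

    def __init__(self, max_samples: int, max_queued: Optional[int] = None) -> None:
        self.max_samples = max_samples
        self._partial: List[float] = []
        # with maxlen set, the oldest queued block is dropped when full;
        # which block gets dropped is an assumption of this sketch
        self._queue: Deque[List[float]] = deque(maxlen=max_queued)

    def handle_sample(self, sample: float) -> None:
        self._partial.append(sample)
        if len(self._partial) == self.max_samples:
            self._queue.append(self._partial)
            self._partial = []

    def handle_chunk(self, chunk: List[float]) -> None:
        for sample in chunk:
            self.handle_sample(sample)

    def get(self) -> Optional[List[float]]:
        return self._queue.popleft() if self._queue else None


buf = TinyBlockBuffer(max_samples=50)
buf.handle_chunk([float(i) for i in range(120)])  # 2 full blocks + 20 leftover
first, second, third = buf.get(), buf.get(), buf.get()
```

The 20 leftover samples stay in the partial block until 30 more arrive, which is why the third call to get() finds nothing in the queue.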

Example:

import numpy as np
from liesl.buffers.blockbuffer import SimpleBlockBuffer  # import path assumed

blockbuffer = SimpleBlockBuffer(50, 1)
chunk = np.arange(0, 200)
blockbuffer.handle_chunk(chunk)
block = blockbuffer.get()
assert block[0] == 0.0  # the first block starts with the first sample
block = blockbuffer.get()
assert block[0] == 50.0  # the second block starts with sample 50 (zero-based)
# shape is a tuple, so compare against a tuple rather than a list
assert block.shape == (50, 1)  # as defined during initialization

get()

return none or a block if there is one waiting in the queue

Return type

Optional[ndarray]

get_last()

return the last valid block

handle_chunk(chunk)

iteratively append all samples of a chunk to the block

Parameters

chunk (np.ndarray) – a new chunk of data to be processed columnwise

handle_sample(sample)

try to append new samples to the block

reset()

clear all buffers and the queue

Responses

class Response(chunk, tstamps, onset_in_ms, fs=1000, pre_in_ms=30, post_in_ms=75, ep_window_in_ms=(15.0, 50.0))

A response to a trigger

Parameters
  • chunk (np.ndarray) – a data chunk as received from pylsl.StreamInlet.pull_chunk() or liesl.buffers.ringbuffer.RingBuffer.get(). It should be 2-dimensional (samples x channels)

  • tstamps (np.ndarray) – the timestamps of this data chunk as e.g. received from pylsl.StreamInlet.pull_chunk() or liesl.buffers.ringbuffer.RingBuffer.get(). They can be 1-dimensional (timepoints) or 2-dimensional (timepoints x 1)

  • onset_in_ms (float) – the timestamp of the trigger as e.g. received from pylsl.StreamInlet.pull_sample()

  • fs (int) – sampling rate in Hz, defaults to 1000

  • pre_in_ms (float) – how many milliseconds to use before the trigger

  • post_in_ms (float) – how many milliseconds to use after the trigger

  • ep_window_in_ms (Tuple[float, float]) – the expected timeframe when the evoked potential starts and stops. Defaults to (15.0, 50.0)

as_json(channel_idx=0)

encodes the response as json

Parameters

channel_idx (int) – which channel to use for calculation of MEP parameters

Returns

msg – a json-encoded dictionary to be e.g. sent with an LSL MarkerOutlet

Return type

str

get_latency(channel_idx=0)

the latency of the MEP in a specific channel

Based on the time of TMS given during initialization, and the hard-coded pre_in_ms, post_in_ms and ep_window_in_ms calculates the latency

Parameters

channel_idx (int) – which channel to use for calculation of latency

Returns

latency – the latencies in ms, relative to the TMS pulse, of the negative and the positive peak

Return type

List[float]

get_trace(channel_idx=0, baseline_correction=True)

Cuts a chunk of data

Based on the given onset, this function cuts out a trace for one or more channels (if a slice is given). It does a baseline correction by default.

Parameters
  • channel_idx (int) – which channel to cut the trace from

  • baseline_correction (bool) – whether to apply the baseline correction, defaults to True

Returns

trace – contains the trace of the data cut from pre to post around the onset. It is an ndarray containing (pre+post) samples with shape (samples, 1)

Return type

np.ndarray
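Cutting a trace around an onset and baseline-correcting it can be sketched with plain lists. The function cut_trace below is hypothetical and works on a single channel with the onset given as a sample index. Subtracting the mean of the pre-onset samples is one common form of baseline correction and is assumed here; the text above does not state which method get_trace uses.

```python
from typing import List


def cut_trace(data: List[float], onset_idx: int, fs: int,
              pre_in_ms: float, post_in_ms: float,
              baseline_correction: bool = True) -> List[float]:
    """Cut pre+post samples around onset_idx from a single channel."""
    pre = int(pre_in_ms * fs / 1000)
    post = int(post_in_ms * fs / 1000)
    trace = data[onset_idx - pre: onset_idx + post]
    if baseline_correction and pre > 0:
        # subtract the mean of the pre-onset samples (one common choice)
        baseline = sum(trace[:pre]) / pre
        trace = [v - baseline for v in trace]
    return trace


# synthetic channel: constant offset of 2.0 with a single deflection
signal = [2.0] * 200
signal[120] = 7.0
trace = cut_trace(signal, onset_idx=100, fs=1000, pre_in_ms=30, post_in_ms=75)
```

At 1000 Hz, 30 ms before and 75 ms after the onset give 105 samples, and the constant offset is removed by the correction.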

get_vpp(channel_idx=0)

the peak-to-peak amplitude of the MEP in a specific channel

Based on the time of TMS given during initialization, and the hard-coded pre_in_ms, post_in_ms and ep_window_in_ms calculates the Vpp

Parameters

channel_idx (int) – which channel to use for calculation of Vpp

Returns

vpp – the peak-to-peak amplitude in native units of the data chunk

Return type

np.ndarray
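Both the peak-to-peak amplitude and the peak latencies are derived from the expected evoked-potential window. The sketch below shows one way this could be computed on an already cut single-channel trace. The function peak_to_peak is hypothetical, the trace is synthetic, and taking the minimum and maximum inside the window is an assumption about the method rather than a description of the library code.

```python
from typing import List, Tuple


def peak_to_peak(trace: List[float], fs: int, pre_in_ms: float,
                 ep_window_in_ms: Tuple[float, float]) -> Tuple[float, List[float]]:
    """Return (vpp, [latency of minimum, latency of maximum]) in ms after onset."""
    pre = int(pre_in_ms * fs / 1000)
    start = pre + int(ep_window_in_ms[0] * fs / 1000)
    stop = pre + int(ep_window_in_ms[1] * fs / 1000)
    window = trace[start:stop]
    low, high = min(window), max(window)

    def to_ms(window_idx: int) -> float:
        # sample position inside the window -> ms after the onset
        return (start + window_idx - pre) * 1000 / fs

    return high - low, [to_ms(window.index(low)), to_ms(window.index(high))]


# synthetic trace: 30 ms before and 75 ms after the onset at 1000 Hz
trace = [0.0] * 105
trace[30 + 20] = -40.0  # negative peak 20 ms after the onset
trace[30 + 25] = 60.0   # positive peak 25 ms after the onset
vpp, latencies = peak_to_peak(trace, fs=1000, pre_in_ms=30,
                              ep_window_in_ms=(15.0, 50.0))
```

Restricting the search to the window keeps a stimulation artifact shortly after the onset from being picked up as a peak.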

get_xaxis(stepsize=5)

get xaxis objects for plotting

Parameters

stepsize (float) – the size of the steps between xticks

Return type

Tuple[ndarray, List[str], Tuple[int, int]]

Returns

  • xticks (ndarray) – an array of xticks

  • xticklabels (List[str]) – a list of xticklabels

  • xlim (Tuple[int, int]) – the limits of the xaxis

class MockResponse(chunk, tstamps, onset_in_ms, fs=1000, pre_in_ms=30, post_in_ms=75, ep_window_in_ms=(15.0, 50.0))

mocks a response for testing and development

Recording

Runfile

class Run

a runfile automatically increasing its runcount during instantiation

Parameters

fname (str) – the filename, requires the suffix .xdf

Example:

run = Run("test")
print(run.name) # prints test_R001.xdf
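The auto-incrementing run count can be illustrated with a small helper that looks for the first unused file name. This is a sketch under assumptions: next_run_name is a hypothetical function, and deciding the count by checking which files already exist is a guess about how Run works. Only the naming pattern test_R001.xdf is taken from the example above.

```python
import tempfile
from pathlib import Path


def next_run_name(folder: Path, stem: str) -> Path:
    """Return the first '<stem>_R<NNN>.xdf' in folder that does not exist yet."""
    count = 1
    while (folder / f"{stem}_R{count:03d}.xdf").exists():
        count += 1
    return folder / f"{stem}_R{count:03d}.xdf"


with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    first = next_run_name(folder, "test")
    first.touch()  # pretend the first run was recorded
    second = next_run_name(folder, "test")
    names = (first.name, second.name)
```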

Session

class Session(prefix='VvNn', mainfolder='~/labrecordings', recorder=None, streamargs=[None])

Manages recordings for a whole session

Example:

streamargs = [{'name':"localite_marker", "hostname": localhostname},
              {'name':"reiz_marker_sa", "hostname": localhostname},
              {'name':"BrainVision RDA Markers", "hostname": localhostname},
              {'name':"BrainVision RDA", "hostname": localhostname}]

session = Session(prefix="VvNn",
                  streamargs=streamargs)


with session("task"):
    run_task()
    # run your task, and while it runs, the streams are recorded
    # to ~/labrecordings/VvNn/task_R001.xdf


with session("task"):
    run_task()
    # run your task, and while it runs, the streams are recorded
    # to ~/labrecordings/VvNn/task_R002.xdf

with session("othertask"):
    run_othertask()
    # run your task, and while it runs, the streams are recorded
    # to ~/labrecordings/VvNn/othertask_R001.xdf

start_recording(task='recording')

start recording all streams

Parameters

task (str) – the name of the file. will be instantiated as a Run and therefore auto-increment

stop_recording()

stop the current recording
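The with-statement usage shown for Session pairs each start of a recording with a stop. How such a pairing could be built is sketched below with contextlib. TinySession is a hypothetical stand-in that only logs what it would do and records nothing. Whether Session is implemented this way is an assumption.

```python
from contextlib import contextmanager
from typing import Iterator, List


class TinySession:
    """Hypothetical stand-in showing the start/stop pairing only."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def start_recording(self, task: str = "recording") -> None:
        self.log.append(f"start {task}")

    def stop_recording(self) -> None:
        self.log.append("stop")

    @contextmanager
    def __call__(self, task: str = "recording") -> Iterator["TinySession"]:
        self.start_recording(task)
        try:
            yield self
        finally:
            # always stop, even if the task raised an exception
            self.stop_recording()


session = TinySession()
with session("task"):
    session.log.append("running task")
```

The try/finally block is the relevant design choice: a task that fails halfway still ends with a stopped recording.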

Recorder

alias of liesl.files.labrecorder.cli_wrapper.LabRecorderCLI

Loading

XDFStreams

XDFFile(filename, verbose=False)

load an xdf-file and return a dictionary of its streams

Parameters

filename (Union[Path, str]) – the name of the xdffile

Returns

streams – a collection of all XDFStream objects in the file. These can be indexed by their respective name

Return type

Dict[str, XDFStream]

class XDFStream(unparsed_stream)

Interface to an XDFstream loaded from an xdf-file

property channel_count

get the channel count as int

Return type

int

property channel_format

get the stream's data format as str

Return type

str

property channel_labels

get the channel labels as a list of strings

Return type

Optional[List[str]]

property channel_types

get the channel types as a list of strings

Return type

Optional[List[str]]

property channel_units

get the channel units as a list of strings

Return type

Optional[List[str]]

property created_at

get the time stamp from when the channel was created as float

property hostname

get the hostname of the machine where the stream was created as str

property name

get the stream's name as str

Return type

str

property nominal_srate

get the nominal sampling rate as float

Return type

float

property time_series

get the time_series of the stream, i.e its data as ndarray

Return type

ndarray

property time_stamps

get the time_stamps for each sample as ndarray

Return type

ndarray

property type

get the stream's type as str

Return type

str

Peeking

peek(filename, at_most=1, max_duration=10)

peek into an xdf-file

Find the first at_most streaminfos of the xdf-file, but do not search for longer than max_duration seconds, whichever comes first.

Return type

List[Dict]
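The two stopping conditions of peek, a maximum number of results and a maximum search time, can be sketched as a generic bounded search. The helper take_first below is hypothetical and iterates over made-up dictionaries instead of parsing an xdf-file. It only illustrates the rule that the search ends as soon as either limit is reached.

```python
import time
from typing import Dict, Iterable, List


def take_first(items: Iterable[Dict], at_most: int = 1,
               max_duration: float = 10.0) -> List[Dict]:
    """Collect items until at_most are found or max_duration seconds passed."""
    found: List[Dict] = []
    deadline = time.monotonic() + max_duration
    for item in items:
        if len(found) >= at_most or time.monotonic() > deadline:
            break
        found.append(item)
    return found


# made-up stream headers standing in for what an xdf-file might contain
headers = [{"name": "EEG"}, {"name": "Marker"}, {"name": "Gaze"}]
first_two = take_first(iter(headers), at_most=2, max_duration=10)
# a deadline that has already passed ends the search before anything is kept
none_in_time = take_first(iter(headers), at_most=2, max_duration=-1)
```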