Welcome to LieSL’s documentation!¶
LieSL is a set of convenient tools to manage LSL streams and record or load xdf files wrapping pyxdf. and LabRecorder.
Installation¶
Stable¶
pip install liesl
Development¶
git clone https://github.com/pyreiz/pyliesl.git
cd pyliesl
pip install -e .
Basic Usage¶
Liesl offers a python API and a set of command line tools
Two common use case for the API are recording a set of streams with Session
or subscribing
a RingBuffer
to a specific LSL Outlet detected by open_streaminfo()
.
Three common use cases for the CLI are mocking a Outlet for development and testing using
liesl mock
, printing information about all currently visible LSL outlets using
liesl list
, or fast peeking into the content of an xdf file with
liesl xdf <filename> --at-most 10
.
Documentation¶
liesl API¶
The main API can be accessed directly from liesl using e.g.
import liesl
stream = liesl.print_available_streams()
Stream Handling¶
Detect and print information about available streams.
Find and open streams¶
-
get_streaminfos_matching
(**kwargs)[source]¶ - Parameters
**kwargs – keyword arguments to identify the desired stream
- Returns
sinfos – a list of StreamInfo matching the kwargs
- Return type
List[StreamInfo]
Example:
import liesl sinfos = liesl.get_streaminfos_matching(name="Liesl-Mock-EEG")
-
get_streams_matching
(**kwargs)[source]¶ get all streaminlets matching kwargs
- Parameters
**kwargs – keyword arguments to identify the desired stream
- Returns
streams – a list of StreamInlets matching the kwargs
- Return type
List[StreamInlet]
Example:
import liesl streams = liesl.get_streams_matching(name="Liesl-Mock-EEG")
-
open_stream
(**kwargs)[source]¶ get only a single streaminlets which matches kwargs
raises a ConnectionError if more than one match is found
- Parameters
**kwargs – keyword arguments to identify the desired stream
- Returns
stream – a single StreamInlet matching the kwargs.
- Return type
StreamInlet
Example:
import liesl stream = liesl.open_stream(name="Liesl-Mock-EEG")
-
open_streaminfo
(**kwargs)[source]¶ get only a single StreamInfo which matches kwargs
raises a ConnectionError if more than one match is found
- Parameters
**kwargs – keyword arguments to identify the desired stream
- Returns
sinfo – a single StreamInfo matching the kwargs.
- Return type
StreamInfo
Example:
import liesl sinfo = liesl.open_streaminfo(name="Liesl-Mock-EEG")
Convert streams¶
-
get_channel_map
(inlet)[source]¶ convert inlet information into a ChannelMapping
- Parameters
inlet (pylsl.StreamInlet) – the inlet to convert into a dictionary
- Returns
output – a dictionary mapping channel names to indices
- Return type
Dict[str, int]
Example:
import liesl stream = liesl.open_stream() chanmap = liesl.get_channel_map(stream)
-
inlet_to_dict
(inlet)[source]¶ convert inlet information into a dictionary
- Parameters
inlet (pylsl.StreamInlet) – the inlet to convert into a dictionary
- Returns
output – a dictionary of key information parsed from the xml
- Return type
dict
Example:
import liesl stream = liesl.open_stream() d = liesl.inlet_to_dict(stream)
Buffers¶
Ringbuffer¶
-
class
RingBuffer
(streaminfo, duration_in_ms=1000, verbose=False)[source]¶ A ringbuffer subscribed to an LSL outlet
The ringbuffer automatically updating itself as a thread.
- Parameters
streaminfo (StreamInfo) – identifies the StreamOutlet and will be connected once the buffer started
duration_in_ms (float) – the length of the ringbuffer in ms. is automatically converted into samples based on the nominal sampling rate of the LSL Outlet. All data older than duration_in_ms (normalized by expected samples) will be discarded
verbose (bool) – how verbose the ringbuffer should be. defaults to False
Example:
sinfo = get_streaminfos_matching(name="Liesl-Mock-EEG")[0] # the mock EEG has a sampling rate of 100 and 8 channels rb = RingBuffer(streaminfo=sinfo, duration_in_ms=1000) rb.await_running() time.sleep(1) # wait a second to collect sufficient data chunk, tstamps = rb.get() assert chunk.shape == [100, 8] # for 1s of data and 8 channels
-
get
()[source]¶ get the current data with timestamps
- Return type
Tuple
[ndarray
,ndarray
]- Returns
chunk (ndarray) – the data (usually in samples x channels)
tstamps (ndarray) – the timestamps for each sample
-
get_data
()[source]¶ get only the current data without timestamps
- Returns
chunk – the data (usually in samples x channels)
- Return type
ndarray
-
property
is_full
¶ whether the ringbuffer is full
- Return type
bool
-
property
is_running
¶ whether the ringbuffer is receiving new data or not
-
property
max_shape
¶ the maximal size of the ringbuffer
- Return type
Tuple
[int
,int
]
-
property
shape
¶ the current size of the data currently in the ringbuffer
- Return type
Tuple
[int
,int
]
BlockBuffer¶
-
class
SimpleBlockBuffer
(max_samples=50, channel_count=64, max_queued=None)[source]¶ Convert incoming data into blocks.
Put new samples or chunks using
handle_sample()
orhandle_chunk()
. The SimpleBlockBuffer takes the data, and only publishes full blocks. These blocks are available by callingget()
- Parameters
max_samples (int) – how many samples (rowlen) each block should have
channel_count (int) – how many channels (columnlen) each block should have
max_queued (Union[None, int]) – Whether to queue as many blocks as possible (None) or drop if more than max_queued
Example:
blockbuffer = SimpleBlockBuffer(50, 1) chunk = np.arange(0, 200) blockbuffer.handle_chunk(chunk) block = blockbuffer.get() assert block[0] == 0.0 # the first block starts with the first sample block = blockbuffer.get() assert block[0] == 50.0 # the second block starts with the 50th sample assert block.shape == [50, 1] # as defined during initalization
-
get
()[source]¶ return none or a block if there is one waiting in the queue
- Return type
Optional
[ndarray
]
Responses¶
-
class
MockResponse
(chunk, tstamps, onset_in_ms, fs=1000, pre_in_ms=30, post_in_ms=75, ep_window_in_ms=(15.0, 50.0))[source]¶ mocks a response for testing and development
-
class
Response
(chunk, tstamps, onset_in_ms, fs=1000, pre_in_ms=30, post_in_ms=75, ep_window_in_ms=(15.0, 50.0))[source]¶ A reponse to a trigger
- Parameters
chunk (np.ndarray) – a data chunk as received from pylsl.StreamInlet.pull_chunk() or
liesl.buffers.ringbuffer.RingBuffer.get()
should be 2-dimensional (samples x channels)tstamps (np.ndarray) – the timestamps of this data chunk as e.g. received from pylsl.StreamInlet.pull_chunk() or
liesl.buffers.ringbuffer.RingBuffer.get()
can be 1-dimensional (timepoints) or 2 dimensional (timepoints x 1)onset_in_ms (float) – the timestamp of the trigger as e.g. received from pylsl.StreamInlet.pull_sample()
fs (int) – sampling rate in Hz, defaults to 1000
pre_in_ms (float) – how many milliseconds to use before the trigger
post_in_ms (float) – how many milliseconds to use after the trigger
ep_window_in_ms (Tuple[float, float]) – the expected timeframe when the evoked potential starts and stops. defaults to [15,50]
-
as_json
(channel_idx=0)[source]¶ encodes the response as json
- Parameters
channel_idx (int) – which channel to use for calculation of MEP parameters
- Returns
msg – a json-encoded dictionary to be e.g. sent wwith an LSL MarkerOutlet
- Return type
str
-
get_latency
(channel_idx=0)[source]¶ the latency of the MEP in a specific channel
Based on the time of TMS given during initialization, and the hard-coded pre_in_ms, post_in_ms and ep_window_in_ms calculates the latency
- Parameters
channel_idx (int) – which channel to use for calculation of latency
- Returns
vpp – the latency in ms relative to the TMS pulse of the negative and the positive peak
- Return type
List[float]
-
get_trace
(channel_idx=0, baseline_correction=True)[source]¶ Cuts a chunk of data
Based on the given onset this function cuts out a trace for one or more (if an slice is given) channel. It does a baseline correction by default.
- Parameters
channel_idx (int) – which channel to use for calculation of latency.
- Returns
trace – Contains the trace of the data cut from pre to post around the onset. Type is a ndarray containing (pre+post) samples and shape (samples, 1)
- Return type
np.ndarray
-
get_vpp
(channel_idx=0)[source]¶ the peak-to-peak amplitude of the MEP in a specific channel
Based on the time of TMS given during initialization, and the hard-coded pre_in_ms, post_in_ms and ep_window_in_ms calculates the Vpp
- Parameters
channel_idx (int) – which channel to use for calculation of Vpp
- Returns
vpp – the peak-to-peak amplitude in native units of the data chunk
- Return type
np.ndarray
-
get_xaxis
(stepsize=5)[source]¶ get xaxis objects for plotting
- Parameters
stepsize (float) – the size of the stepos between xticks
- Return type
Tuple
[ndarray
,List
[str
],Tuple
[int
,int
]]- Returns
xticks (ndarray) – an array of xticks
xticklabels (List[str]) – a list fo xticklabels
xlim (Tuple[int, int]) – the limits of the xaxis
Recording¶
Runfile¶
Session¶
-
Recorder
¶
-
class
Session
(prefix='VvNn', mainfolder='~/labrecordings', recorder=None, streamargs=[None])[source]¶ Manages recordings for a whole session
Example:
streamargs = [{'name':"localite_marker", "hostname": localhostname}, {'name':"reiz_marker_sa", "hostname": localhostname}, {'name':"BrainVision RDA Markers", "hostname": localhostname}, {'name':"BrainVision RDA", "hostname": localhostname}] session = Session(prefix="VvNn", streamargs=streamargs) with session("task"): run_task() # run your task, and while it runs, the streams are recorded # to ~/labrecording/VvNn/task_R001.xdf with session("task"): run_task() # run your task, and while it runs, the streams are recorded # to ~/labrecording/VvNn/task_R002.xdf with session("othertask"): run_othertask() # run your task, and while it runs, the streams are recorded # to ~/labrecording/VvNn/othertask_R001.xdf
Loading¶
XDFStreams¶
-
class
XDFStream
(unparsed_stream)[source]¶ Interface to an XDFstream loaded from an
xdf
-file-
property
channel_count
¶ get the channel count as int
- Return type
int
-
property
channel_format
¶ get the streams data format as str
- Return type
str
-
property
channel_labels
¶ get the channel labels as a list of strings
- Return type
Optional
[List
[str
]]
-
property
channel_types
¶ get the channel types as a list of strings
- Return type
Optional
[List
[str
]]
-
property
channel_units
¶ get the channel units as a list of strings
- Return type
Optional
[List
[str
]]
-
property
created_at
¶ get the time stamp from when the channel was created as float
-
property
hostname
¶ get the hostname of the machine where the streams was created as str
-
property
name
¶ get the streams name as str
- Return type
str
-
property
nominal_srate
¶ get the nominal sampling rate as float
- Return type
float
-
property
time_series
¶ get the time_series of the stream, i.e its data as ndarray
- Return type
ndarray
-
property
time_stamps
¶ get the time_stamps for each sample as ndarray
- Return type
ndarray
-
property
type
¶ get the streams type as str
- Return type
str
-
property
liesl CLI¶
liesl also offers a command line interface. This interface can be accessed after installation of the package from the terminal, e.g. create a mock LSL outlet producing fake EEG with
liesl mock --type EEG
liesl¶
usage: liesl [-h] {config,list,show,mock,xdf} ...
positional arguments:
{config,list,show,mock,xdf}
config initialize the lsl_api.cfg for all users with system,
globally for this user with global or locally in this
folder with local
list list available LSL streams
show Visualize a specific LSL streams
mock mock a LSL stream
xdf inspect an XDF file
optional arguments:
-h, --help show this help message and exit
liesl config¶
usage: liesl config [-h] [--default] [--sessionid SESSIONID]
[--knownpeers KNOWNPEERS]
{system,user,local}
positional arguments:
{system,user,local} system: /etc/lsl_api/lsl_api.cfg or
C:\etc\lsl_api\lsl_api.cfg on Windows. user:
~/lsl_api/lsl_api.cfg or
C:\Users\username\lsl_api\lsl_api.cfg on Windows.
local: lsl_api.cfg in the current working directory
optional arguments:
-h, --help show this help message and exit
--default initializes a configuration from default
--sessionid SESSIONID
sets the sessionid for this level
--knownpeers KNOWNPEERS
set knownpeers for this level
liesl list¶
usage: liesl list [-h] [--field FIELD]
optional arguments:
-h, --help show this help message and exit
--field FIELD which field to print. For example: liesl list --field
'["name", "source_id"]'
liesl show¶
usage: liesl show [-h] [--name NAME] [--type TYPE] [--channel CHANNEL]
[--backend {mpl,ascii}] [--frate FRATE]
optional arguments:
-h, --help show this help message and exit
--name NAME name of the stream
--type TYPE type of the stream
--channel CHANNEL which channel to visualize
--backend {mpl,ascii}
what backend to use
--frate FRATE at which frequency the plot will be updated
liesl mock¶
usage: liesl mock [-h] [--type TYPE]
optional arguments:
-h, --help show this help message and exit
--type TYPE type of the stream
liesl xdf¶
usage: liesl xdf [-h] [-a AT_MOST] [-t TIMEOUT] filename
positional arguments:
filename filename
optional arguments:
-h, --help show this help message and exit
-a AT_MOST, --at-most AT_MOST
return lastest once that many streams were found,
regardloss of how long it takes. Useful if file is
very large, and can prevent parsing the whole file.
defaults to sys.maxsize because integers are required,
but unbound in python. Set it to 0' to load the file
completely
-t TIMEOUT, --timeout TIMEOUT
return latest after this many seconds, regardless of
how many streams were found. Useful if the file is
very large, and you are sure you started recording all
streams at the beginnging, as this prevents parsing
the whole file. defaults to 1 second. Set it to 'inf'
to load the file completely