Source code for liesl.files.xdf.inspect_xdf

"""
Peeking
-------
"""

from pyxdf import load_xdf
from collections import Counter
from typing import List, Dict
from liesl.show.textplot import zoom_plot as plot
import textwrap
from math import inf


def shorten(text: str, width: int, placeholder="..."):
    if len(text) <= width:
        return text
    else:
        return textwrap.wrap(text, width - len(placeholder))[0] + placeholder


[docs]def peek(filename: str, at_most=1, max_duration=10) -> List[Dict]: """peek into an xdf-file Find the first `at_most` N streaminfos of the xdf-file but do not search for longer that `max duration` seconds, whatever comes first. """ from pyxdf.pyxdf import _read_varlen_int, parse_chunks, open_xdf from pyxdf.pyxdf import _parse_streamheader import struct import xml.etree.ElementTree as ET from math import inf from itertools import islice import time def _read_chunks(f, max_duration=10): t0 = time.time() while True: chunk = dict() try: chunk["nbytes"] = _read_varlen_int(f) except EOFError: return chunk["tag"] = struct.unpack("<H", f.read(2))[0] if chunk["tag"] == 2: chunk["stream_id"] = struct.unpack("<I", f.read(4))[0] xml = ET.fromstring(f.read(chunk["nbytes"] - 6).decode()) chunk = {**chunk, **_parse_streamheader(xml)} yield chunk else: f.seek(chunk["nbytes"] - 2, 1) # skip remaining chunk contents if time.time() - t0 > max_duration: return chunks = [] with open_xdf(filename) as f: for chunk in islice( _read_chunks(f, max_duration=max_duration), at_most ): chunks.append(chunk) return parse_chunks(chunks)
def load_concise(filename: str, at_most=1, timeout: float = inf): print(f"\r\nLoading {filename:3} concisely\n") line = "{0:<25s}{1:^20s}{2:4s}{3:^5s}{4:>26s}" print(line.format("Name", "Type", "Ch", "Fs", "Source")) print("-" * 80) sinfos = peek(filename, at_most=at_most, max_duration=timeout) for sinfo in sinfos: name = sinfo["name"] typ = sinfo["type"] cc = sinfo["channel_count"] fs = sinfo["nominal_srate"] name = shorten(name, 25) typ = shorten(typ, 20) try: sid = sinfo["source_id"] sid = sid if sid else "Unknown" sid = shorten(sid, 25) except IndexError: sid = '""' print(line.format(name, typ, str(cc), str(fs), sid)) print("\n") def main(filename): print("main is deprecated, use load_fully instead") return load_fully(filename) def load_fully(filename): print(f"\r\nLoading {filename:3} fully\n") streams, info = load_xdf(filename) hdr = "XDF Fileversion " + info["info"]["version"][0] print(f"{hdr:>80}\n") line = "{0:<25s}{1:^20s}{2:4s}{3:^5s}{4:>26s}" print(line.format("Name", "Type", "Ch", "Fs", "Source")) print("-" * 80) for s in streams: name = s["info"]["name"][0] typ = s["info"]["type"][0] cc = s["info"]["channel_count"][0] fs = float(s["info"]["nominal_srate"][0]) fs = str(int(fs)) if int(fs) == fs else str(fs) name = shorten(name, 25) typ = shorten(typ, 20) try: sid = s["info"]["source_id"][0] # can be None in edge cases sid = sid if sid else "Unknown" sid = shorten(sid, 25) except IndexError: sid = '""' print(line.format(name, typ, cc, fs, sid)) print("\n") line = "{0:<30s}{1:>50s}" for s in streams: typ = s["info"]["type"][0] name = s["info"]["name"][0] if "marker" in typ.lower(): events = Counter([s[0] for s in s["time_series"]]) print(line.format(name, "Events")) print("-" * 80) for key, val in events.items(): if key == "": wrapped_key = '""' else: wrapped_key = textwrap.shorten( key, width=70, placeholder="..." ) alignment = 80 - len(wrapped_key) print( "{0}{1:>{align}}".format(wrapped_key, val, align=alignment) ) print() else: print(line.format(name, "Exemplary data")) print("-" * 80) sz = s["time_series"].shape if sz[1]: x = s["time_stamps"] y = s["time_series"][:, 0] plot(y, x) else: print("No data found, array has shape({0}, {1})".format(*sz)) print() print(f"Overview finished for {filename:3}\n") if __name__ == "__main__": pass