Source code for liesl.files.labrecorder.cli_wrapper

"""
LabRecorderCLI
--------------
"""

from subprocess import PIPE, Popen, run
from pathlib import Path
from liesl.files.labrecorder.manager import find_file, validate
from liesl.files.run import Run
import logging
import time
from typing import List
import pkg_resources
import sys


def find_lrcmd_os(platform: str) -> Path:
    "return the default path to labrecorder for this platform"
    root = Path(
        pkg_resources.resource_filename("liesl", "files/labrecorder/lib")
    )
    if platform.startswith("win"):
        path_to_cmd = root / "LabRecorderCLI.exe"
    elif platform.startswith("linux"):
        path_to_cmd = root / "LabRecorderCLI"
    elif platform.startswith("darwin"):
        path_to_cmd = root / "LabRecorderCLI_mac"
    else:
        raise NotImplementedError()

    return path_to_cmd


def find_lrcmd(path_to_cmd: str = None) -> Path:
    "Find and validate the path to LabRecorder"
    if path_to_cmd is None:
        path_to_cmd = find_lrcmd_os(sys.platform)
    path_to_cmd = Path(str(path_to_cmd)).expanduser().absolute()

    if not path_to_cmd.exists():
        raise FileNotFoundError("Path to command does not exist")

    if sys.platform.startswith(("linux", "darwin")):
        if path_to_cmd.name.startswith("LabRecorderCLI"):
            return path_to_cmd
    elif sys.platform.startswith("win"):
        if path_to_cmd.name == "LabRecorderCLI.exe":
            return path_to_cmd

    # if nothing worked, we end here
    raise FileNotFoundError("No valid path to LabRecorder")


[docs]class LabRecorderCLI: """Process based interface for LabRecorderCLI args ---- path_to_cmd:str defaults to None and select the LabRecorder installed in liesl/files/labrecorder/lib. Otherwise, use the path to the LabRecorderCLI of your desire. Use :meth:`~.start_recording` and :meth:`~.stop_recording` to record a set of streams identified by keyword arguments. Example:: filename = '~/Desktop/untitled.xdf' streamargs = [{"type":"EEG"},{"type":"Marker"}] streamargs = [{"type":"EEG", "name":"Liesl-Mock-EEG"}, {"type":"Marker"}] lr = LabRecorderCLI() lr.start_recording(filename, streamargs) time.sleep(5) lr.stop_recording() """ def __init__(self, path_to_cmd: str = None) -> None: self.streamargs = None self.cmd = find_lrcmd(path_to_cmd)
[docs] def bind(self, streamargs: List[dict,] = [None]) -> None: """bind a set of required streams to start recording Recording will throw a ConnectionError if these streams are not present at time of binding or at the time of starting a recording """ self.streamargs = ( validate(streamargs) if streamargs is not None else None )
[docs] def start_recording( self, filename: str = "~/recordings/recording.xdf", streamargs: List[dict,] = None, ): "start recording the streams identified by streamargs to filename" if streamargs is not None: self.bind(streamargs) if self.streamargs is None: raise ValueError("No streams were specified") filename = Run(filename) filename.parent.mkdir(exist_ok=True, parents=True) # start encoding the command streams = [] for idx, uid in enumerate(self.streamargs): stream = '"' prt = f"source_id='{uid}'" stream += prt stream += '"' streams.append(stream) cmd = " ".join((str(self.cmd), str(filename), *streams)) if sys.platform.startswith("win"): self.process = Popen( cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, bufsize=1, ) else: # linux, mac self.process = Popen( cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, bufsize=1, shell=True, ) peek = self.process.stdout.peek() # print(peek.decode()) if b"matched no stream" in peek: # pragma no cover # would be already catched by self.validate self.stop_recording() print("\a") # makes a platfrom independent beep raise ConnectionError(peek.decode().strip()) self.t0 = time.time() print("Start recording to {0}".format(filename)) return filename
[docs] def stop_recording(self) -> None: "stop recording" if hasattr(self, "process"): o, e = self.process.communicate(b"\n") if self.process.poll() != 0: # pragma no cover raise ConnectionError(o + e) del self.process self.dur = time.time() - self.t0 print("Stopped recording after {0}s".format(self.dur))