Source code for pybpodapi.session

# !/usr/bin/python3
# -*- coding: utf-8 -*-

import logging, uuid
from confapp import conf
from datetime import datetime as datetime_now

import pybpodapi
#from pybpodapi.state_machine import StateMachine
from                  import Trial
from       import EventOccurrence
from       import StateOccurrence
from    import SoftcodeOccurrence
from           import SessionInfo

from sca.formats import csv

import io
import sys
import pickle
import base64
from import StdoutBuffer
from import StderrBuffer

logger = logging.getLogger(__name__)

class StreamsWrapper(object):

    def __init__(self, streams):
        self.streams = streams

    def write(self, data):
        for stream in self.streams:

    def flush(self):
        for stream in self.streams:

    def close(self):
        for stream in self.streams:

[docs]class Session(object): """ Stores information about bpod run, including the list of trials. :ivar list(Trial) trials: a list of trials :ivar int firmware_version: firmware version of Bpod when experiment was run :ivar int bpod_version: version of Bpod hardware when experiment was run :ivar datetime start_timestamp: it stores session start timestamp """ MSGTYPE_DEBUG = 'debug' MSGTYPE_ENDTRIAL = 'END-TRIAL' MSGTYPE_ERROR = 'error' MSGTYPE_INFO = 'INFO' MSGTYPE_SOFTCODE = 'SOFTCODE' MSGTYPE_STDERR = 'stderr' MSGTYPE_STDOUT = 'stdout' MSGTYPE_TRIAL = 'TRIAL' MSGTYPE_WARNING = 'warning' MSGTYPE_STATE = 'STATE' MSGTYPE_TRANSITION = 'TRANSITION' INFO_SESSION_NAME = 'SESSION-NAME' INFO_SESSION_STARTED = 'SESSION-STARTED' INFO_SESSION_ENDED = 'SESSION-ENDED' INFO_SERIAL_PORT = 'SERIAL-PORT' INFO_NET_PORT = 'NET-PORT' INFO_BPODAPI_VERSION = 'BPOD-API-VERSION' INFO_PROTOCOL_NAME = 'PROTOCOL-NAME' INFO_CREATOR_NAME = 'CREATOR-NAME' INFO_PROJECT_NAME = 'PROJECT-NAME' INFO_EXPERIMENT_NAME = 'EXPERIMENT-NAME' INFO_BOARD_NAME = 'BOARD-NAME' INFO_SETUP_NAME = 'SETUP-NAME' INFO_SUBJECT_NAME = 'SUBJECT-NAME' INFO_BPODGUI_VERSION = 'BPOD-GUI-VERSION' INFO_TRIAL_BPODTIME = 'TRIAL-BPOD-TIME' def __init__(self, path=None): self.ostdout = sys.stdout self.ostderr = sys.stderr # the variable will contain a list of streams where the session output # should be written. streams = [] self.history = [] # type: list[Trial] self.trials = [] # type: list[Trial] self.firmware_version = None # type: int self.bpod_version = None # type: int self.start_timestamp = # type: datetime self.csvwriter = None self._path = path # stream data to a file. if path: streams += [open(path, 'w')] # stream data to the stdout. if conf.PYBPOD_API_STREAM2STDOUT: sys.stdout = StdoutBuffer(self) sys.stderr = StderrBuffer(self) streams += [self.ostdout] self.csvstream = StreamsWrapper(streams) self.csvwriter = csv.writer( self.csvstream, columns_headers=['TYPE', 'PC-TIME', 'BPOD-INITIAL-TIME', 'BPOD-FINAL-TIME', 'MSG', '+INFO'], software='PyBpod API v'+str(pybpodapi.__version__), def_url='', def_text='This file contains data recorded during a session from the PyBpod system' ) def __del__(self): self.csvstream.close() sys.stdout = self.ostdout sys.stderr = self.ostderr def __add__(self, msg): """ Add new trial to this session and associate a state machine to it :param pybpodapi.model.state_machine sma: state machine associated with this trial """ if isinstance(msg, Trial): self.trials.append(msg) elif self.current_trial is not None: self.current_trial += msg self.history.append(msg) if self.csvwriter: self.csvwriter.writerow( msg.tolist() ) self.csvwriter.flush() return self def add_trial_events(self): current_trial = self.current_trial # type: Trial sma = current_trial.sma visitedStates = [0] * current_trial.sma.total_states_added # determine unique states while preserving visited order uniqueStates = [] nUniqueStates = 0 uniqueStateIndexes = [0] * len(current_trial.states) for i in range(len(current_trial.states)): if current_trial.states[i] in uniqueStates: uniqueStateIndexes[i] = uniqueStates.index(current_trial.states[i]) else: uniqueStateIndexes[i] = nUniqueStates nUniqueStates += 1 uniqueStates.append(current_trial.states[i]) visitedStates[current_trial.states[i]] = 1 # Create a 2-d matrix for each state in a list uniqueStateDataMatrices = [[] for i in range(len(current_trial.states))] # Append one matrix for each unique state for i in range(len(current_trial.states)): if len(current_trial.state_timestamps) > 1: uniqueStateDataMatrices[uniqueStateIndexes[i]] += [ (current_trial.state_timestamps[i], current_trial.state_timestamps[i + 1])] for i in range(nUniqueStates): thisStateName = sma.state_names[uniqueStates[i]] for state_dur in uniqueStateDataMatrices[i]: self += StateOccurrence(thisStateName, state_dur[0], state_dur[1] ) logger.debug("State names: %s", sma.state_names) logger.debug("nPossibleStates: %s", sma.total_states_added) for i in range(sma.total_states_added): thisStateName = sma.state_names[i] if not visitedStates[i]: self += StateOccurrence(thisStateName, float('NaN'), float('NaN') ) logger.debug("Trial states: %s", [str(state) for state in current_trial.states_occurrences]) # save events occurrences on trial #current_trial.events_occurrences = sma.raw_data.events_occurrences # type: list logger.debug("Trial events: %s", [str(event) for event in current_trial.events_occurrences]) logger.debug("Trial info: %s", str(current_trial)) @property def current_trial(self): """ Get current trial :rtype: Trial """ return self.trials[-1] if len(self.trials)>0 else None @current_trial.setter def current_trial(self, value): """ Get current trial :rtype: Trial """ self.trials[-1] = value