Source code for pybpod_rotaryencoder_module.module_api

from pybpodapi.com.arcom import ArCOM, ArduinoTypes


[docs]class RotaryEncoderModule(object): COM_HANDSHAKE = 'C' COM_TOGGLEEVTTRANSM = ord('V') COM_TOGGLEOUTPUTSTREAM = ord('O') COM_TOGGLESTREAM = ord('S') COM_STARTLOGGING = ord('L') COM_STOPLOGGING = ord('F') COM_GETLOGDATA = ord('R') COM_GETCURRENTPOS = ord('Q') COM_SETZEROPOS = ord('Z') COM_SETPOS = ord('P') COM_ENABLETHRESHOLDS = ord(';') COM_SETPREFIX = ord('I') COM_SETTHRESHOLDS = ord('T') COM_SETWRAPPOINT = ord('W') def __init__(self, serialport=None): """ Constructor of the RotaryEncoderModule object A serial connection to the Rotary Encoder board is opened at the construction of the object. :ivar str serialport: PC serial port where the module is connected """ if serialport: self.open(serialport)
[docs] def open(self, serialport): """ Opens a serial connection to the Rotary Encoder board. :ivar str serialport: PC serial port where the module is connected """ self.arcom = ArCOM().open(serialport, 115200) self.arcom.write_char(self.COM_HANDSHAKE) if self.arcom.read_uint8() != 217: raise Exception('Could not connect =( ')
[docs] def close(self): """ Closes the serial connection to the Rotary Encoder board. """ self.arcom.close()
def __pos_2_degrees(self, pos): return round(((float(pos)/512.0)*180.0)*10.0)/10.0 def __degrees_2_pos(self, degrees): return int(round((float(degrees)/180.0)*512.0, 0))
[docs] def enable_evt_transmission(self): """ Enables the transmission of threshold crossing events to the Bpod state machine. """ self.arcom.write_array([self.COM_TOGGLEEVTTRANSM, 1]) return self.arcom.read_uint8() == 1
[docs] def disable_evt_transmission(self): """ Disables the transmission of events. """ self.arcom.write_array([self.COM_TOGGLEEVTTRANSM, 0]) return self.arcom.read_uint8() == 1
[docs] def enable_module_outputstream(self): """ Enables the streaming of current position data directly to another Bpod module (e.g. DDS, AnalogOutput). """ self.arcom.write_array([self.COM_TOGGLEOUTPUTSTREAM, 1]) return self.arcom.read_uint8() == 1
[docs] def disable_module_outputstream(self): """ Disables the streaming of current position data directly to another Bpod module. """ self.arcom.write_array([self.COM_TOGGLEOUTPUTSTREAM, 0]) return self.arcom.read_uint8() == 1
[docs] def enable_stream(self): """ Enables the streaming of the position and the time measurements to the USB port. """ self.arcom.write_array([self.COM_TOGGLESTREAM, 1])
[docs] def disable_stream(self): """ Disables the streaming of the position and the time measurements to the USB port. """ self.arcom.write_array([self.COM_TOGGLESTREAM, 0])
[docs] def read_stream(self): """ Reads the data being streamed through the USB port. """ res = [] available = self.arcom.bytes_available() if available > 1: msg = self.arcom.read_bytes_array(available) while len(msg) >= 7: if msg[0] == b'P': position = int.from_bytes(b''.join(msg[1:3]), byteorder='little', signed=True) evt_time = float(int.from_bytes(b''.join(msg[3:7]), byteorder='little', signed=False))/1000.0 position_degrees = self.__pos_2_degrees(position) res.append(['P', evt_time, position_degrees]) elif msg[0] == b'E': origin = msg[1] event = msg[2] evt_time = float(int.from_bytes(b''.join(msg[3:7]), byteorder='little', signed=False))/1000.0 res.append(['E', evt_time, origin, event]) msg = msg[7:] return res
[docs] def current_position(self): """ Retrieves the current position. """ self.arcom.write_array([self.COM_GETCURRENTPOS]) data_in_bytes = b''.join(self.arcom.read_bytes_array(2)) ticks = int.from_bytes(data_in_bytes, byteorder='little', signed=True) return self.__pos_2_degrees(ticks)
[docs] def set_zero_position(self): """ Sets current rotary encoder position to zero. """ self.arcom.write_array([self.COM_SETZEROPOS])
[docs] def set_position(self, degrees): """ Sets the current position in degrees. :ivar int degrees: current position in degrees. """ ticks = self.__degrees_2_pos(degrees) data = ArduinoTypes.get_uint8_array([self.COM_SETPOS]) data += ticks.to_bytes(2, byteorder='little', signed=True) self.arcom.write_array(data) return self.arcom.read_uint8() == 1
[docs] def enable_thresholds(self, thresholds): """ Enables the thresholds. :ivar list(boolean) thresholds: list of 6 booleans indicating which thresholds are active to trigger events. """ if len(thresholds) != 8: raise Exception('Thresholds array has to be of length 8') string = ''.join(map(lambda x: str(int(x)), thresholds)) bits = int(string, 2) self.arcom.write_array([self.COM_ENABLETHRESHOLDS, bits])
[docs] def enable_logging(self): """ Enables the logging to the SD Card. """ self.arcom.write_array([self.COM_STARTLOGGING])
[docs] def disable_logging(self): """ Disables the logging to the SD Card. """ self.arcom.write_array([self.COM_STOPLOGGING])
[docs] def get_logged_data(self): """ Retrieves the logged data in the SD Card. """ self.arcom.write_array([self.COM_GETLOGDATA]) msg = self.arcom.read_bytes_array(4) n_logs = int.from_bytes(b''.join(msg), byteorder='little', signed=False) data = [] for i in range(0, n_logs): msg = self.arcom.read_bytes_array(8) data_in_bytes = b''.join(msg) position = int.from_bytes(data_in_bytes[:4], byteorder='little', signed=True) evt_time = float(int.from_bytes(data_in_bytes[4:], byteorder='little', signed=False))/1000.0 position_degrees = self.__pos_2_degrees(position) data.append((evt_time, position_degrees)) return data
[docs] def set_prefix(self, prefix): """ Sets 1-character prefix for module output stream. :ivar char prefix: One character to be used as prefix. """ self.arcom.write_array([self.COM_SETPREFIX, prefix]) return self.arcom.read_uint8() == 1
[docs] def set_thresholds(self, thresholds): """ Sets the thresholds values to trigger the events. :ivar list(int) thresholds: List, in maximum, of 6 thresholds to trigger events. """ data = ArduinoTypes.get_uint8_array([self.COM_SETTHRESHOLDS, len(thresholds)]) data += ArduinoTypes.get_uint16_array([self.__degrees_2_pos(thresh) for thresh in thresholds]) self.arcom.write_array(data) return self.arcom.read_uint8() == 1
[docs] def set_wrappoint(self, wrap_point): """ Sets wrap point (number of tics in a half-rotation) :ivar int wrap_point: number of tics in a half-rotation. """ ticks = self.__degrees_2_pos(wrap_point) self.arcom.write_array([self.COM_SETWRAPPOINT] + ArduinoTypes.get_uint16_array([ticks])) return self.arcom.read_uint8() == 1
if __name__ == '__main__': import time m = RotaryEncoderModule('/dev/ttyACM0') m.enable_logging() time.sleep(5) m.disable_logging() print(m.get_logged_data()) """ m.enable_stream() count = 0 while count<100 or True: data = m.read_stream() if len(data)==0: continue print(data) count += 1 m.disable_stream() print('set', m.set_position(179)) m.set_zero_position() m.enable_thresholds([True, False, True, True, False, False, True, True]) print(m.current_position()) print(m.get_logged_data()) """ m.close()