from pybpodapi.com.arcom import ArCOM, ArduinoTypes
[docs]class RotaryEncoderModule(object):
COM_HANDSHAKE = 'C'
COM_TOGGLEEVTTRANSM = ord('V')
COM_TOGGLESTREAM = ord('S')
COM_STARTLOGGING = ord('L')
COM_STOPLOGGING = ord('F')
COM_GETLOGDATA = ord('R')
COM_GETCURRENTPOS = ord('Q')
COM_SETZEROPOS = ord('Z')
COM_SETPOS = ord('P')
COM_ENABLETHRESHOLDS = ord(';')
COM_SETPREFIX = ord('I')
COM_SETTHRESHOLDS = ord('T')
COM_SETWRAPPOINT = ord('W')
def __init__(self, serialport=None):
"""
Constructer of the RotaryEncoderModule object
A a serial connection to the Rotary Encoder board is oppened at the construction of the object.
:ivar str serialport: PC serial port where the module is connect
"""
if serialport: self.open(serialport)
[docs] def open(self, serialport):
"""
Opens a serial connection to the Rotary Encoder board.
:ivar str serialport: PC serial port where the module is connect
"""
self.arcom = ArCOM().open(serialport, 115200)
self.arcom.write_char(self.COM_HANDSHAKE)
if self.arcom.read_uint8() != 217:
raise Exception('Could not connect =( ')
[docs] def close(self):
"""
Closes the serial connection to the Rotary Encoder board.
"""
self.arcom.close()
def __pos_2_degrees(self, pos): return round(((float(pos)/512.0)*180.0)*10.0)/10.0;
def __degrees_2_pos(self, degrees): return int(round( (float(degrees)/180.0)*512.0, 0) );
[docs] def enable_evt_transmission(self):
"""
Enable the transmission of the events.
"""
self.arcom.write_array([self.COM_TOGGLEEVTTRANSM, 1])
return self.arcom.read_uint8()==1
[docs] def disable_evt_transmission(self):
"""
Disable the transmission of the events.
"""
self.arcom.write_array([self.COM_TOGGLEEVTTRANSM, 0])
return self.arcom.read_uint8()==1
[docs] def enable_stream(self):
"""
Enable the streaming of the position and the time measurements to the USB port.
"""
self.arcom.write_array([self.COM_TOGGLESTREAM, 1])
[docs] def disable_stream(self):
"""
Disable the streaming of the position and the time measurements to the USB port.
"""
self.arcom.write_array([self.COM_TOGGLESTREAM, 0])
[docs] def read_stream(self):
"""
Read the data being streamed through the USB port.
"""
res = []
available = self.arcom.bytes_available()
if available>1:
msg = self.arcom.read_bytes_array(available)
while len(msg)>=7:
if msg[0]==b'P':
position = int.from_bytes( b''.join(msg[1:3]), byteorder='little', signed=True)
evt_time = float(int.from_bytes( b''.join(msg[3:7]), byteorder='little', signed=False))/1000.0
position_degrees = self.__pos_2_degrees(position)
res.append(['P',evt_time, position_degrees])
elif msg[0]==b'E':
origin = msg[1]
event = msg[2]
evt_time = float(int.from_bytes( b''.join(msg[3:7]), byteorder='little', signed=False))/1000.0
res.append(['E',evt_time, origin, event])
msg = msg[7:]
return res
[docs] def enable_logging(self):
"""
Enable the logging to the SD Card.
"""
self.arcom.write_array([self.COM_STARTLOGGING])
[docs] def disable_logging(self):
"""
Disable the logging to the SD Card.
"""
self.arcom.write_array([self.COM_STOPLOGGING])
[docs] def get_logged_data(self):
"""
Retreave the logged data in the SD Card.
"""
self.arcom.write_array([self.COM_GETLOGDATA])
msg = self.arcom.read_bytes_array(4)
n_logs = int.from_bytes( b''.join(msg), byteorder='little', signed=False)
data = []
for i in range(0, n_logs):
msg = self.arcom.read_bytes_array(8)
data_in_bytes = b''.join(msg)
position = int.from_bytes( data_in_bytes[:4], byteorder='little', signed=True)
evt_time = float(int.from_bytes( data_in_bytes[4:], byteorder='little', signed=False))/1000.0
position_degrees = self.__pos_2_degrees(position)
data.append((evt_time, position_degrees))
return data
[docs] def current_position(self):
"""
Retreave the current position.
"""
self.arcom.write_array([self.COM_GETCURRENTPOS])
data_in_bytes = b''.join(self.arcom.read_bytes_array(2))
ticks = int.from_bytes( data_in_bytes, byteorder='little', signed=True)
return self.__pos_2_degrees(ticks)
[docs] def set_zero_position(self):
"""
Set current rotary encoder position to zero.
"""
self.arcom.write_array([self.COM_SETZEROPOS])
[docs] def set_prefix(self, prefix):
"""
:ivar char prefix: One character to be used as prefix.
Set 1-character prefix for module output stream.
"""
self.arcom.write_array([self.COM_SETPREFIX, prefix])
return self.arcom.read_uint8()==1
[docs] def set_thresholds(self, thresholds):
"""
Set the thresholds values to trigger the events.
:ivar list(int) thresholds: List, in maximum, of 6 thresholds to trigger events.
"""
data = ArduinoTypes.get_uint8_array([self.COM_SETTHRESHOLDS, len(thresholds) ])
data += ArduinoTypes.get_uint16_array([ self.__degrees_2_pos(thresh) for thresh in thresholds])
self.arcom.write_array(data )
return self.arcom.read_uint8()==1
[docs] def set_position(self, degrees):
"""
Set the current position in degrees.
:ivar int degrees: current position in degrees.
"""
ticks = self.__degrees_2_pos(degrees)
data = ArduinoTypes.get_uint8_array([self.COM_SETPOS])
data += ticks.to_bytes(2, byteorder='little', signed=True)
self.arcom.write_array(data)
return self.arcom.read_uint8()==1
[docs] def set_wrappoint(self, wrap_point):
"""
Set wrap point (number of tics in a half-rotation)
:ivar int wrap_point: number of tics in a half-rotation.
"""
ticks = self.__degrees_2_pos(wrap_point)
self.arcom.write_array([self.COM_SETWRAPPOINT]+ ArduinoTypes.get_uint16_array([ticks]) )
return self.arcom.read_uint8()==1
[docs] def enable_thresholds(self, thresholds):
"""
Enable the thresholds.
:ivar list(boolean) thresholds: list of 6 booleans indicating which thresholds are active to trigger events.
"""
if len(thresholds)!=8: raise Exception('Thresholds array has to be of length 8')
string = ''.join(map(lambda x: str(int(x)), thresholds))
bits = int(string, 2)
self.arcom.write_array([self.COM_ENABLETHRESHOLDS, bits])
if __name__=='__main__':
import time
m = RotaryEncoderModule('/dev/ttyACM0')
m.enable_logging()
time.sleep(5)
m.disable_logging()
print(m.get_logged_data())
"""
m.enable_stream()
count = 0
while count<100 or True:
data = m.read_stream()
if len(data)==0: continue
print(data)
count += 1
m.disable_stream()
print('set', m.set_position(179))
m.set_zero_position()
m.enable_thresholds([True, False, True, True, False, False, True, True])
print(m.current_position())
print(m.get_logged_data())
"""
m.close()