# Source code for pybpod_soundcard_module.module_api

import array
import math
import time

import numpy as np
from enum import Enum, IntEnum
from aenum import auto
import os
import collections
import usb.core
import usb.util


class SampleRate(IntEnum):
    """
    Enumeration for the sample rate of the sounds in the Sound Card.

    Being an ``IntEnum``, members compare equal to their integer value
    (e.g. ``SampleRate._96000HZ == 96000``).
    """

    #: 96KHz sample rate
    _96000HZ = 96000
    #: 192KHz sample rate
    _192000HZ = 192000
class DataType(IntEnum):
    """
    Type of the data to be sent to the Sound Card.
    """

    #: Integer 32 bits
    INT32 = 0  # no trailing comma: the value is the plain integer, not a tuple
    #: Single precision float
    FLOAT32 = 1
class SoundCardErrorCode(Enum):
    """
    Error codes used when validating and exchanging data with the Sound Card.

    Values are plain integers. Members that previously relied on ``auto()``
    are numbered sequentially after the preceding explicit anchor value.
    NOTE(review): the sequential numbering is assumed from the layout of the
    anchors (-1000, -1020, -1030) -- confirm against the firmware/C# API.
    """

    OK = 0
    BAD_USER_INPUT = -1

    HARP_SOUND_CARD_NOT_DETECTED = -1000
    NOT_ABLE_TO_SEND_METADATA = -999
    NOT_ABLE_TO_READ_METADATA_COMMAND_REPLY = -998
    METADATA_COMMAND_REPLY_NOT_CORRECT = -997
    NOT_ABLE_TO_SEND_DATA = -996
    NOT_ABLE_TO_READ_DATA_COMMAND_REPLY = -995
    DATA_COMMAND_REPLY_NOT_CORRECT = -994
    NOT_ABLE_TO_SEND_READ_METADATA = -993
    NOT_ABLE_TO_READ_READ_METADATA_COMMAND_REPLY = -992
    READ_METADATA_COMMAND_REPLY_NOT_CORRECT = -991

    BAD_SOUND_INDEX = -1020
    BAD_SOUND_LENGTH = -1019
    BAD_SAMPLE_RATE = -1018
    BAD_DATA_TYPE = -1017
    DATA_TYPE_DO_NOT_MATCH = -1016
    BAD_DATA_INDEX = -1015

    PRODUCING_SOUND = -1030
    STARTED_PRODUCING_SOUND = -1029

    NOT_ABLE_TO_OPEN_FILE = -1040


class SoundMetadata(object):
    """
    Header describing one sound: destination index, length, sample rate and data type.
    """

    def __init__(self, sound_index, sound_length, sample_rate, data_type):
        """
        :param sound_index: Sound index in the soundcard (2 -> 31 since 0 and 1 are reserved)
        :param sound_length: Sound length in number of samples
        :param sample_rate: Sample rate
        :param data_type: 0 for Int32 and 1 for Float32 (not available right now)
        """
        self._sound_index = sound_index
        self._sound_length = sound_length
        self._sample_rate = sample_rate
        self._data_type = data_type

    def check_data(self):
        """
        Validate the stored fields.

        Comparisons use equality, so both enum members and their plain integer
        values (e.g. ``96000`` or ``0``) are accepted.

        :return: ``SoundCardErrorCode.OK`` when every field is valid, otherwise the code of the first failing check.
        """
        # NOTE(review): 32 is accepted here although the constructor documents 2 -> 31; confirm the intended bound.
        if self._sound_index < 2 or self._sound_index > 32:
            return SoundCardErrorCode.BAD_SOUND_INDEX

        if self._sound_length < 16:
            return SoundCardErrorCode.BAD_SOUND_LENGTH

        if self._sample_rate not in (SampleRate._96000HZ, SampleRate._192000HZ):
            return SoundCardErrorCode.BAD_SAMPLE_RATE

        if self._data_type not in (DataType.INT32, DataType.FLOAT32):
            return SoundCardErrorCode.BAD_DATA_TYPE

        # Indexes 0 and 1 were rejected above, so only the "index > 1 requires INT32" rule can still apply.
        if self._data_type != DataType.INT32:
            return SoundCardErrorCode.DATA_TYPE_DO_NOT_MATCH

        return SoundCardErrorCode.OK

    def as_array(self):
        """
        :return: NumPy ``int32`` array ``[sound_index, sound_length, sample_rate, data_type]``.
        """
        return np.array([self._sound_index, self._sound_length, self._sample_rate, self._data_type],
                        dtype=np.int32)
class SoundCardModule(object):
    """
    Provides access to the Harp Sound Card. It allows to send and read the sounds in the Sound Card, through a normal
    USB connection.
    """

    # USB identifiers used to locate the Sound Card
    _VENDOR_ID = 0x04d8
    _PRODUCT_ID = 0xee6a

    # size in bytes of one block of sound data and of the user metadata area in the commands
    _BLOCK_SIZE = 32768
    _USER_METADATA_SIZE = 2048

    def __init__(self, device=None):
        """
        If a libUSB's device is given, it will try to open it. If none is given it will try to connect to the first
        Sound Card that is connected to the computer.

        :param device: (Optional) libUSB device to use. If nothing is passed, it will try to connect automatically.
        """
        self._dev = None
        self._devices = list(usb.core.find(idVendor=self._VENDOR_ID, idProduct=self._PRODUCT_ID, find_all=True))
        self._cfg = None
        self._port = None
        self._connected = False

        self.open(device)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def open(self, device=None):
        """
        Opens the connection to the Sound Card. If no device is given, it will try to connect to the first Sound Card
        that is connected to the computer.

        :param device: (Optional) Already initialized libUSB's device to use.
        """
        # a previous connection state must not survive a failed (re)open
        self._connected = False

        if device is None:
            self._dev = usb.core.find(idVendor=self._VENDOR_ID, idProduct=self._PRODUCT_ID)
        else:
            self._dev = device

        if self._dev is None:
            print(
                "Unable to connect to the Sound Card through the USB port. You will be unable to send and receive sounds.")
            return

        # set the active configuration. With no arguments, the first configuration will be the active one
        # note: some devices reset when setting an already selected configuration so we should check for it before
        self._cfg = self._dev.get_active_configuration()
        if self._cfg is None or self._cfg.bConfigurationValue != 1:
            self._dev.set_configuration(1)
        self._connected = True

    @property
    def devices(self):
        """
        :return: freshly enumerated list of USB devices matching the Sound Card vendor/product ids.
        """
        self._devices = list(usb.core.find(idVendor=self._VENDOR_ID, idProduct=self._PRODUCT_ID, find_all=True))
        return self._devices

    @property
    def connected(self):
        """
        :return: True when the last call to :meth:`open` succeeded and :meth:`close` was not called since.
        """
        return self._connected

    def close(self):
        """
        Closes the connection with the Sound Card. It will close USB connection (to read and save sounds)
        """
        if self._dev:
            usb.util.dispose_resources(self._dev)
        self._connected = False

    def reset(self):
        """
        Resets the device, waits 700ms and tries to connect again so that the current instance of the SoundCard object
        can still be used.

        .. note:: Necessary at the moment after sending a sound.

        :raises Exception: when no device is available.
        :raises AssertionError: when the reset command was not fully written.
        """
        if not self._dev:
            raise Exception("Sound card might not be connected. Please connect it before any operation.")

        # Reset command length: 'c' 'm' 'd' '0x88' + 'f'
        reset_cmd = [ord('c'), ord('m'), ord('d'), 0x88, ord('f')]

        written = self._dev.write(1, reset_cmd, 100)
        self._ensure(written == len(reset_cmd), "reset command was not fully written to the device")

        time.sleep(700.0 / 1000.0)
        self.open()

    def read_sounds(self, output_folder=None, sound_index=None, clean_dst_folder=True):
        """
        Reads sounds from the sound card.

        .. note:: by default, it will clear the destination folder of all data. It will also write by default to a
                  "from_soundcard" folder in the working directory if none is given.

        :param output_folder: Destination folder's path.
        :param sound_index: If a sound_index is given, it will get only that sound, if nothing is passed it will gather all sounds from all indexes.
        :param clean_dst_folder: Flag that defines if the method should clean the destination folder or not
        :raises Exception: when no device is available.
        """
        if not self._dev:
            raise Exception("Sound card might not be connected. Please connect it before any operation.")

        # admit that if the output_folder is None, write inside a 'from_soundcard' folder in the current directory
        if not output_folder:
            output_folder = os.path.join(os.getcwd(), 'from_soundcard')
        # create folder if it doesn't exist
        os.makedirs(output_folder, exist_ok=True)

        if clean_dst_folder:
            for entry in os.listdir(output_folder):
                file_path = os.path.join(output_folder, entry)
                try:
                    if os.path.isfile(file_path):
                        os.unlink(file_path)
                except OSError:
                    # probably a permissions error while deleting, ignore and try the next one
                    print("Error occurred when deleting file '{file_path}'. Ignoring error and continuing.".format(
                        file_path=file_path))

        if sound_index is None:
            for i in range(2, 32):
                self._from_soundcard(output_folder, i)
        else:
            self._from_soundcard(output_folder, sound_index)

        print("All files read!")

    def send_sound(self, wave_int, sound_index, sample_rate, data_type,
                   sound_filename=None, metadata_filename=None, description_filename=None):
        """
        This method will send the sound to the Harp Sound Card as a byte array (int8)

        :param wave_int: NumPy array as int32 that represents the sound data
        :param sound_index: The destination index in the Sound Card (>=2 and <= 32)
        :param sample_rate: The SampleRate enum value for either 96KHz or 192KHz
        :param data_type: The DataType enum value for either Int32 or Float32 (not implemented yet in the hardware)
        :param sound_filename: The name of the sound filename to be saved with the sound in the board (str)
        :param metadata_filename: The name of the metadata filename to be saved with the sound in the board (str)
        :param description_filename: The name of the description filename to be saved with the sound in the board (str)
        """
        self._to_soundcard(wave_int, sound_index, sample_rate, data_type,
                           sound_filename, metadata_filename, description_filename)

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _ensure(condition, message):
        """Raise ``AssertionError`` when *condition* is false (unlike ``assert``, not stripped by ``python -O``)."""
        if not condition:
            raise AssertionError(message)

    def _send_command(self, command, reply):
        """
        Write *command* (NumPy byte array) to endpoint 0x01 and read the answer into *reply* from endpoint 0x81.

        :return: True when both transfers completed, False when a USB error was reported.
        :raises AssertionError: when fewer bytes than expected were written.
        """
        try:
            written = self._dev.write(0x01, command.tobytes(), 100)
        except usb.core.USBError:
            # TODO: we probably should try again
            print("something went wrong while writing to the device")
            return False

        self._ensure(written == len(command), "command was not fully written to the device")

        try:
            self._dev.read(0x81, reply, 100)
        except usb.core.USBError:
            # TODO: we probably should try again
            print("something went wrong while reading from the device")
            return False

        return True

    @staticmethod
    def _check_reply(reply, expected_random):
        """
        Check a command reply ('c' 'm' 'd' <code> + random + error): the random value must be echoed back and the
        error field must be zero.

        :raises AssertionError: when either check fails.
        """
        rand_val_received = int.from_bytes(reply[4:8], byteorder='little', signed=True)
        error_received = int.from_bytes(reply[8:12], byteorder='little', signed=False)

        SoundCardModule._ensure(rand_val_received == expected_random,
                                "reply does not echo the random value that was sent")
        SoundCardModule._ensure(error_received == 0, "device reported error {0}".format(error_received))

    @staticmethod
    def _store(buffer, offset, payload):
        """Copy the bytes object *payload* into the uint8 array *buffer* starting at *offset*."""
        if payload:
            buffer[offset: offset + len(payload)] = np.frombuffer(payload, dtype=np.uint8)

    def _build_user_metadata(self, sound_filename, metadata_filename, description_filename):
        """
        Build the 2048-byte user metadata area sent along with the sound.

        Layout:
            [0:169]     sound_filename
            [170:339]   metadata_filename
            [340:511]   description_filename
            [512:1535]  metadata_filename content
            [1536:2047] description_filename content

        Text is stored as UTF-8, which is how ``_from_soundcard`` decodes the names. Names and contents are
        truncated when too long.
        """
        user_metadata = np.zeros(self._USER_METADATA_SIZE, dtype=np.uint8)

        def encoded_name(path):
            return os.path.basename(path).encode('utf-8')[:169]

        def encoded_content(path, limit):
            with open(path, 'r', encoding='utf8') as f:
                return f.read().encode('utf-8')[:limit]

        if sound_filename:
            self._store(user_metadata, 0, encoded_name(sound_filename))

        if metadata_filename:
            self._store(user_metadata, 170, encoded_name(metadata_filename))
            # get file contents, truncate data if required
            try:
                self._store(user_metadata, 512, encoded_content(metadata_filename, 1023))
            except OSError:
                # TODO: should be a stronger error
                print("Error opening metadata file.")

        if description_filename:
            self._store(user_metadata, 340, encoded_name(description_filename))
            # get file contents, truncate data if required
            try:
                self._store(user_metadata, 1536, encoded_content(description_filename, 511))
            except OSError as e:
                print(e)
                # TODO: should be a stronger error
                print("Error opening description file.")

        return user_metadata

    # ------------------------------------------------------------- read sounds

    def _from_soundcard(self, output_folder=None, sound_index=None):
        """
        Reads the information stored for one sound index and writes it to *output_folder*.

        :param output_folder: Destination folder's path.
        :param sound_index: Index of the sound to read (2 to 31).
        :raises Exception: when no device is available, the index is out of range or the metadata could not be read.
        """
        if not self._dev:
            raise Exception("Sound card might not be connected. Please connect it before any operation.")

        if sound_index is None or sound_index < 2 or sound_index > 31:
            raise Exception("sound_index must have a value between 2 and 31")

        metadata = self.__get_metadata_from_device(sound_index)

        if metadata is None:
            raise Exception('SoundCardModule: Error while getting metadata from device')

        # define prefix: 'i' + zero-padded two digit index + '_' (e.g. 'i09_', 'i10_')
        prefix = 'i{0:02d}_'.format(sound_index)

        sound_filename = metadata.sound_filename.decode('utf-8')
        metadata_filename = metadata.metadata_filename.decode('utf-8') if metadata.metadata_filename else None
        description_filename = metadata.description_filename.decode(
            'utf-8') if metadata.description_filename else None

        if prefix not in sound_filename:
            sound_filename = prefix + sound_filename
        if metadata_filename and prefix not in metadata_filename:
            metadata_filename = prefix + metadata_filename
        if description_filename and prefix not in description_filename:
            description_filename = prefix + description_filename

        if metadata.has_sound:
            with open(os.path.join(output_folder, sound_filename), 'w', encoding='utf8') as f:
                # TODO: read the sound so we can write it here
                f.write('TODO')

        if metadata.has_metadata:
            with open(os.path.join(output_folder, metadata_filename), 'wb') as f:
                # clean the zeros at the end
                f.write(metadata.metadata_array.tobytes().strip(b'\0'))

        if metadata.has_description:
            with open(os.path.join(output_folder, description_filename), 'wb') as f:
                f.write(metadata.description.tobytes().strip(b'\0'))

        # create summary info file
        if metadata.has_sound:
            summary_path = os.path.join(output_folder, sound_filename + '.metadata.txt')
            with open(summary_path, 'w', encoding='utf8') as f:
                f.write('SOUND_INDEX = ' + str(sound_index))
                # number of additional index positions spanned by the sound beyond its own
                used_pos = math.ceil(metadata.sound_length / (33554432.0 * 2.0 / 32.0)) - 1
                if used_pos > 0:
                    f.write(", ")
                    f.write(", ".join(str(sound_index + idx + 1) for idx in range(used_pos)))
                f.write("\n")
                f.write("TOTAL_SAMPLES = " + str(metadata.sound_length) + "\n")
                f.write(
                    "TOTAL_LENGTH_MS = " + str(int(metadata.sound_length / 2 / metadata.sample_rate * 1000)) + "\n")
                f.write("SAMPLE_RATE = " + str(metadata.sample_rate) + "\n")
                if metadata.data_type == 0:
                    f.write("DATA_TYPE = Int32\n")
                else:
                    f.write("DATA_TYPE = Float32\n")
                f.write("SOUND_FILENAME = " + sound_filename + "\n")
                if metadata.has_metadata:
                    f.write("USER_METADATA_FILENAME = " + metadata_filename + "\n")
                if metadata.has_description:
                    f.write("USER_DESCRIPTION_FILENAME = " + description_filename + "\n")

    # -------------------------------------------------------------- send sound

    def _to_soundcard(self, wave_int, sound_index, sample_rate, data_type,
                      sound_filename=None, metadata_filename=None, description_filename=None):
        """
        This method will send the sound to the Harp Sound Card as a byte array (int8)

        The sound is sent as one metadata command (header, first data block and user metadata) followed by one
        data command per remaining block. Each reply is checked before the next command is sent. When the input
        is rejected by ``SoundMetadata.check_data`` or a USB error occurs, a message is printed and the method
        returns without raising.

        :param wave_int: NumPy array as int32 that represents the sound data
        :param sound_index: The destination index in the Sound Card (>=2 and <= 32)
        :param sample_rate: The SampleRate enum value for either 96KHz or 192KHz
        :param data_type: The DataType enum value for either Int32 or Float32 (not implemented yet in the hardware)
        :param sound_filename: The name of the sound filename to be saved with the sound in the board (str)
        :param metadata_filename: The name of the metadata filename to be saved with the sound in the board (str)
        :param description_filename: The name of the description filename to be saved with the sound in the board (str)
        :raises EnvironmentError: when no device is available.
        :raises AssertionError: when a command is not fully written or a reply is not the expected one.
        """
        # confirm that the dev exists and is ready
        if not self._dev:
            raise EnvironmentError(
                'Sound card not initialized. Please call the initialize method before any operation.')

        int32_size = np.dtype(np.int32).itemsize
        block_size = self._BLOCK_SIZE

        # work with a byte view of the wave_int (which is int32); unsigned so that command codes >= 0x80 fit
        wave_bytes = wave_int.view(np.uint8)

        # get number of commands to send (ceiling division of the payload size by the block size)
        sound_file_size_in_samples = len(wave_bytes) // 4
        total_bytes = sound_file_size_in_samples * 4
        commands_to_send = -(-total_bytes // block_size)

        # create metadata info and validate it before building any command
        metadata = SoundMetadata(sound_index, sound_file_size_in_samples, sample_rate, data_type)
        if metadata.check_data() is not SoundCardErrorCode.OK:
            print("Input data incorrect, please correct it before proceeding.")
            return

        # Metadata command length: 'c' 'm' 'd' '0x80' + random + metadata + 32768 + 2048 + 'f'
        metadata_cmd_header_size = 4 + int32_size + (4 * int32_size)
        metadata_cmd = np.zeros(metadata_cmd_header_size + block_size + self._USER_METADATA_SIZE + 1,
                                dtype=np.uint8)
        metadata_cmd[0:4] = (ord('c'), ord('m'), ord('d'), 0x80)
        metadata_cmd[-1] = ord('f')

        # random value, echoed back by the device in its reply
        rand_val = np.random.randint(-32768, 32768, size=1, dtype=np.int32)
        metadata_cmd[4: 4 + int32_size] = rand_val.view(np.uint8)

        metadata_cmd[8: 8 + (4 * int32_size)] = metadata.as_array().view(np.uint8)

        # add first block of data to the metadata_cmd (the sound may be shorter than one block)
        first_block = wave_bytes[0: block_size]
        metadata_cmd[metadata_cmd_header_size: metadata_cmd_header_size + len(first_block)] = first_block

        # add user metadata (2048 bytes) to metadata_cmd
        user_metadata_index = metadata_cmd_header_size + block_size
        metadata_cmd[user_metadata_index: user_metadata_index + self._USER_METADATA_SIZE] = \
            self._build_user_metadata(sound_filename, metadata_filename, description_filename)

        # Metadata command reply: 'c' 'm' 'd' '0x80' + random + error
        metadata_cmd_reply = array.array('b', [0] * (4 + int32_size + int32_size))

        # send metadata_cmd and get its reply
        if not self._send_command(metadata_cmd, metadata_cmd_reply):
            return
        self._check_reply(metadata_cmd_reply, rand_val[0])

        # prepare command to send and to receive
        # Data command length: 'c' 'm' 'd' '0x81' + random + dataIndex + 32768 + 'f'
        data_cmd_data_index = 4 + int32_size + int32_size
        data_cmd = np.zeros(data_cmd_data_index + block_size + 1, dtype=np.uint8)
        data_cmd[0:4] = (ord('c'), ord('m'), ord('d'), 0x81)
        data_cmd[-1] = ord('f')

        # Data command reply: 'c' 'm' 'd' '0x81' + random + error
        data_cmd_reply = array.array('b', [0] * (4 + int32_size + int32_size))

        # loop to send the rest of the commands, checking the reply for each command sent
        for i in range(1, commands_to_send):
            rand_val = np.random.randint(-32768, 32768, size=1, dtype=np.int32)
            data_cmd[4: 4 + int32_size] = rand_val.view(np.uint8)

            # write dataIndex to the data_cmd
            data_cmd[8: 8 + int32_size] = np.array([i], dtype=np.int32).view(np.uint8)

            # write data from wave_int to cmd
            wave_idx = i * block_size
            data_block = wave_bytes[wave_idx: wave_idx + block_size]
            data_cmd[data_cmd_data_index: data_cmd_data_index + len(data_block)] = data_block

            if not self._send_command(data_cmd, data_cmd_reply):
                return
            self._check_reply(data_cmd_reply, rand_val[0])

    # ----------------------------------------------------------- read metadata

    def __get_metadata_from_device(self, sound_index):
        """
        Ask the device for the information stored at *sound_index*.

        Reply layout, as read by this method: 'c' 'm' 'd' '0x84' (4 bytes), random, error, bit mask, sound length,
        sample rate and data type (one int32 each, 28 bytes in total), followed by the 2048-byte user metadata area
        (three 170-byte filename slots, 1024 bytes of metadata content at offset 512 and 512 bytes of description
        content at offset 1536).

        :param sound_index: Index of the sound to query.
        :return: a ``Metadata`` named tuple, or None when a USB error occurred.
        :raises AssertionError: when the command is not fully written or the reply is not the expected one.
        """
        int32_size = np.dtype(np.int32).itemsize

        # Read metadata command length: 'c' 'm' 'd' '0x84' + random + soundIndex + 'f'
        read_metadata_cmd = np.zeros(4 + int32_size + int32_size + 1, dtype=np.uint8)
        read_metadata_cmd[0:4] = (ord('c'), ord('m'), ord('d'), 0x84)
        read_metadata_cmd[-1] = ord('f')

        rand_val = np.random.randint(-32768, 32768, size=1, dtype=np.int32)
        read_metadata_cmd[4: 4 + int32_size] = rand_val.view(np.uint8)
        read_metadata_cmd[8: 8 + int32_size] = np.array([sound_index], dtype=np.int32).view(np.uint8)

        # prepare to send command and receive the reply
        header_size = 4 + 6 * int32_size
        read_reply_cmd = array.array('b', [0] * (header_size + self._USER_METADATA_SIZE))

        if not self._send_command(read_metadata_cmd, read_reply_cmd):
            return None
        self._check_reply(read_reply_cmd, rand_val[0])

        def read_int32(offset):
            return int.from_bytes(read_reply_cmd[offset: offset + int32_size], byteorder='little', signed=True)

        def read_name(slot):
            start = header_size + slot * 170
            return read_reply_cmd[start: start + 170].tobytes().strip(b'\0')

        # the bit mask is a single int32: the next int32 (offset 16) is the sound length
        bit_mask = read_int32(12)
        has_sound = bit_mask & (1 << sound_index) == (1 << sound_index)

        sound_length = read_int32(16)
        sample_rate = read_int32(20)
        data_type = read_int32(24)

        sound_filename = read_name(0)

        # a filename slot is considered in use when its first byte is not zero
        has_metadata = bool(read_reply_cmd[header_size + 170])
        if has_metadata:
            metadata_array = read_reply_cmd[header_size + 512: header_size + 512 + 1024]
            metadata_filename = read_name(1)
        else:
            metadata_array = array.array('b', [0] * 1024)
            metadata_filename = ''

        has_description = bool(read_reply_cmd[header_size + 170 + 170])
        if has_description:
            description = read_reply_cmd[header_size + 512 + 1024: header_size + 512 + 1024 + 512]
            description_filename = read_name(2)
        else:
            description = array.array('b', [0] * 512)
            description_filename = ''

        Metadata = collections.namedtuple('Metadata',
                                          ['metadata_array', 'description', 'bit_mask', 'sound_length', 'data_type',
                                           'sample_rate', 'sound_filename', 'metadata_filename',
                                           'description_filename', 'has_sound', 'has_metadata', 'has_description'])

        return Metadata(metadata_array=metadata_array,
                        description=description,
                        bit_mask=bit_mask,
                        sound_length=sound_length,
                        data_type=data_type,
                        sample_rate=sample_rate,
                        sound_filename=sound_filename,
                        metadata_filename=metadata_filename,
                        description_filename=description_filename,
                        has_sound=has_sound,
                        has_metadata=has_metadata,
                        has_description=has_description)