Source code for pybpod_soundcard_module.module_api

import array
import math
import time

import numpy as np
from enum import Enum, IntEnum
from aenum import auto
import os
import collections
import usb.core
import usb.util
from usb.backend import libusb1 as libusb

class SampleRate(IntEnum):
    """
    Sample rates that can be selected for a sound stored in the Sound Card.

    Each member's integer value is the rate in Hz.
    """

    #: 96 kHz sample rate
    _96000HZ = 96000

    #: 192 kHz sample rate
    _192000HZ = 192000
class DataType(IntEnum):
    """
    Kind of sample data that is sent to the Sound Card.
    """

    #: 32-bit integer samples
    INT32 = 0

    #: Single precision floating point samples
    FLOAT32 = 1
class SoundCardErrorCode(Enum):
    """
    Result codes used when validating data for, or talking to, the Sound Card.

    Every member carries a plain integer value.  Members that follow an
    explicit anchor (-1000, -1020, -1030) count up by one from it, which is
    the C-style reading of the ``auto()`` placeholders this enumeration used
    to be written with.
    NOTE(review): confirm the derived numbers against the firmware's error
    table if the numeric values are ever sent to or compared with the device.
    """
    OK = 0
    BAD_USER_INPUT = -1

    HARP_SOUND_CARD_NOT_DETECTED = -1000
    NOT_ABLE_TO_SEND_METADATA = -999
    NOT_ABLE_TO_READ_METADATA_COMMAND_REPLY = -998
    METADATA_COMMAND_REPLY_NOT_CORRECT = -997
    NOT_ABLE_TO_SEND_DATA = -996
    NOT_ABLE_TO_READ_DATA_COMMAND_REPLY = -995
    DATA_COMMAND_REPLY_NOT_CORRECT = -994
    NOT_ABLE_TO_SEND_READ_METADATA = -993
    NOT_ABLE_TO_READ_READ_METADATA_COMMAND_REPLY = -992
    READ_METADATA_COMMAND_REPLY_NOT_CORRECT = -991

    BAD_SOUND_INDEX = -1020
    BAD_SOUND_LENGTH = -1019
    BAD_SAMPLE_RATE = -1018
    BAD_DATA_TYPE = -1017
    DATA_TYPE_DO_NOT_MATCH = -1016
    BAD_DATA_INDEX = -1015

    PRODUCING_SOUND = -1030
    STARTED_PRODUCING_SOUND = -1029

    NOT_ABLE_TO_OPEN_FILE = -1040


class SoundMetadata(object):
    """
    Description of one sound (index, length, sample rate and data type) as it
    is placed in the metadata command sent to the Sound Card.
    """

    #: Lowest and highest sound index a user sound may use (0 and 1 are reserved)
    MIN_SOUND_INDEX = 2
    MAX_SOUND_INDEX = 31

    #: Smallest accepted sound length, in samples
    MIN_SOUND_LENGTH = 16

    def __init__(self, sound_index, sound_length, sample_rate, data_type):
        """
        :param self:
        :param sound_index: Sound index in the soundcard (2 -> 31 since 0 and 1 are reserved)
        :param sound_length: Sound length in number of samples
        :param sample_rate: Sample rate
        :param data_type: 0 for Int32 and 1 for Float32 (not available right now)
        """
        self._sound_index = sound_index
        self._sound_length = sound_length
        self._sample_rate = sample_rate
        self._data_type = data_type

    def check_data(self):
        """
        Validate the stored values.

        Enum members are compared by value, so both the ``SampleRate`` /
        ``DataType`` members and their plain integer equivalents are accepted.

        :return: ``SoundCardErrorCode.OK`` when everything is valid, otherwise
                 the code of the first check that failed.
        """
        if not self.MIN_SOUND_INDEX <= self._sound_index <= self.MAX_SOUND_INDEX:
            return SoundCardErrorCode.BAD_SOUND_INDEX

        if self._sound_length < self.MIN_SOUND_LENGTH:
            return SoundCardErrorCode.BAD_SOUND_LENGTH

        if self._sample_rate not in (SampleRate._96000HZ, SampleRate._192000HZ):
            return SoundCardErrorCode.BAD_SAMPLE_RATE

        if self._data_type not in (DataType.INT32, DataType.FLOAT32):
            return SoundCardErrorCode.BAD_DATA_TYPE

        # Indexes 0 and 1 never get this far (rejected above), so the only
        # index/type rule left to enforce is that user sounds are Int32.
        if self._data_type != DataType.INT32:
            return SoundCardErrorCode.DATA_TYPE_DO_NOT_MATCH

        return SoundCardErrorCode.OK

    def as_array(self):
        """
        :return: NumPy ``int32`` array holding, in this order, the sound index,
                 sound length, sample rate and data type.
        """
        return np.array(
            [self._sound_index, self._sound_length, self._sample_rate, self._data_type],
            dtype=np.int32,
        )
class SoundCardModule(object):
    """
    Provides access to the Harp Sound Card.

    It allows to send and read the sounds in the Sound Card, through a normal USB connection.
    """

    #: USB vendor / product identifiers used to look for the Sound Card
    _VENDOR_ID = 0x04d8
    _PRODUCT_ID = 0xee6a

    #: Endpoint commands are written to, and endpoint replies are read from
    _ENDPOINT_OUT = 0x01
    _ENDPOINT_IN = 0x81

    #: Timeout (ms) used for every write
    _WRITE_TIMEOUT_MS = 100

    #: Number of sound bytes carried by one command
    _BLOCK_SIZE = 32768

    #: Size in bytes of the user metadata area
    _USER_METADATA_SIZE = 2048

    #: Size in bytes of one filename slot inside the user metadata area
    _FILENAME_SLOT = 170

    #: Where the user metadata area starts in a "read metadata" reply:
    #: 'c' 'm' 'd' <opcode> followed by six int32 values
    _REPLY_USER_OFFSET = 4 + 6 * 4

    #: Record returned by the "read metadata" command parser
    _Metadata = collections.namedtuple('Metadata',
                                       ['metadata_array', 'description', 'bit_mask', 'sound_length',
                                        'data_type', 'sample_rate', 'sound_filename',
                                        'metadata_filename', 'description_filename', 'has_sound',
                                        'has_metadata', 'has_description'])

    def __init__(self, device=None):
        """
        If a libUSB's device is given, it will try to open it. If none is given it will try to connect to the
        first Sound Card that is connected to the computer.

        :param device: (Optional) libUSB device to use. If nothing is passed, it will try to connect automatically.
        """
        self._backend = libusb.get_backend()

        # start from an empty list so that a failing lookup leaves the instance in a usable state
        self._devices = []
        try:
            self._devices = list(usb.core.find(backend=self._backend,
                                               idVendor=self._VENDOR_ID,
                                               idProduct=self._PRODUCT_ID,
                                               find_all=True))
        except OSError:
            # best effort: behave as if no Sound Card was found
            pass

        self._dev = self._devices[0] if self._devices else None
        self._cfg = None
        self._port = None
        self._connected = False

        self.open(self._dev if device is None else device)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def open(self, device=None):
        """
        Opens the connection to the Sound Card. If no device is given, it will try to connect to the first
        Sound Card that is connected to the computer.

        :param device: (Optional) Already initialized libUSB's device to use.
        """
        if device is None:
            self._backend = libusb.get_backend()
            try:
                self._dev = usb.core.find(backend=self._backend,
                                          idVendor=self._VENDOR_ID,
                                          idProduct=self._PRODUCT_ID)
            except OSError:
                self._dev = None
        else:
            self._dev = device

        if self._dev is None:
            print(
                "Unable to connect to the Sound Card through the USB port. You will be unable to send and receive sounds.")
        else:
            # set the active configuration. With no arguments, the first configuration will be the active one
            # note: some devices reset when setting an already selected configuration so we should check for it before
            self._cfg = self._dev.get_active_configuration()
            if self._cfg is None or self._cfg.bConfigurationValue != 1:
                self._dev.set_configuration(1)

        self._connected = bool(self._dev)

    @property
    def devices(self):
        """List of Sound Card devices found when the instance was created."""
        return self._devices

    @property
    def connected(self):
        """``True`` while a device is opened, ``False`` otherwise (including after :meth:`close`)."""
        return self._connected

    def close(self):
        """
        Closes the connection with the Sound Card. It will close USB connection (to read and save sounds)
        """
        if self._dev:
            usb.util.dispose_resources(self._dev)
        self._connected = False

    def reset(self):
        """
        Resets the device, waits 700ms and tries to connect again so that the current instance of the
        SoundCard object can still be used.

        .. note:: Necessary at the moment after sending a sound.

        :raises Exception: if no device is available.
        :raises AssertionError: if the device did not accept the whole reset command.
        """
        if not self._dev:
            raise Exception("Sound card might not be connected. Please connect it before any operation.")

        # Reset command length: 'c' 'm' 'd' '0x88' + 'f'
        reset_cmd = [ord('c'), ord('m'), ord('d'), 0x88, ord('f')]

        wrt = self._dev.write(self._ENDPOINT_OUT, reset_cmd, self._WRITE_TIMEOUT_MS)
        self._require(wrt == len(reset_cmd), "reset command was not completely written to the device")

        time.sleep(700.0 / 1000.0)

        self.open()

    def read_sounds(self, output_folder=None, sound_index=None, clean_dst_folder=True):
        """
        Reads sounds from the sound card.

        .. note:: by default, it will clear the destination folder of all data. It will also write by default to a
                  "from_soundcard" folder in the working directory if none is given.

        :param output_folder: Destination folder's path.
        :param sound_index: If a sound_index is given, it will get only that sound, if nothing is passed it will
                            gather all sounds from all indexes.
        :param clean_dst_folder: Flag that defines if the method should clean the destination folder or not
        :raises Exception: if no device is available.
        """
        if not self._dev:
            raise Exception("Sound card might not be connected. Please connect it before any operation.")

        # admit that if the output_folder is None, write inside a 'from_soundcard' folder in the current directory
        if not output_folder:
            output_folder = os.path.join(os.getcwd(), 'from_soundcard')

        # create folder if it doesn't exists
        os.makedirs(output_folder, exist_ok=True)

        if clean_dst_folder:
            for file in os.listdir(output_folder):
                file_path = os.path.join(output_folder, file)
                try:
                    if os.path.isfile(file_path):
                        os.unlink(file_path)
                except OSError:
                    # probably a permissions error while deleting, ignore and try the next one
                    print("Error occurred when deleting file '{file_path}'. Ignoring error and continuing.".format(
                        file_path=file_path))
                    continue

        if sound_index is None:
            for i in range(2, 32):
                self._from_soundcard(output_folder, i)
        else:
            self._from_soundcard(output_folder, sound_index)

        print("All files read!")

    def send_sound(self, wave_int, sound_index, sample_rate, data_type,
                   sound_filename=None, metadata_filename=None, description_filename=None):
        """
        This method will send the sound to the Harp Sound Card as a byte array (int8)

        :param wave_int: NumPy array as int32 that represents the sound data
        :param sound_index: The destination index in the Sound Card (>=2 and <= 31)
        :param sample_rate: The SampleRate enum value for either 96KHz or 192KHz
        :param data_type: The DataType enum value for either Int32 or Float32 (not implemented yet in the hardware)
        :param sound_filename: The name of the sound filename to be saved with the sound in the board (str)
        :param metadata_filename: The name of the metadata filename to be saved with the sound in the board (str)
        :param description_filename: The name of the description filename to be saved with the sound in the board (str)
        """
        self._to_soundcard(wave_int, sound_index, sample_rate, data_type,
                           sound_filename, metadata_filename, description_filename)

    # ------------------------------------------------------------------
    # small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _require(condition, message):
        """
        Raise ``AssertionError(message)`` when ``condition`` is false.

        Used in place of ``assert`` statements so that the checks are still executed when Python runs
        with ``-O``, while the exception type seen by callers stays the same.
        """
        if not condition:
            raise AssertionError(message)

    @staticmethod
    def _index_prefix(sound_index):
        """Return the ``iNN_`` filename prefix (index zero-padded to two digits) for ``sound_index``."""
        return 'i{:02d}_'.format(sound_index)

    @staticmethod
    def _new_command(opcode, payload_size):
        """
        Create a zeroed command buffer: 'c' 'm' 'd' <opcode> + ``payload_size`` bytes + 'f'.

        The buffer is unsigned so that opcodes above 0x7F can be stored without overflowing.
        """
        command = np.zeros(4 + payload_size + 1, dtype=np.uint8)
        command[0:3] = [ord('c'), ord('m'), ord('d')]
        command[3] = opcode
        command[-1] = ord('f')
        return command

    @staticmethod
    def _encode_truncated(text, max_bytes):
        """
        Encode ``text`` as UTF-8 using at most ``max_bytes`` bytes.

        When the text has to be cut, a multi-byte character that would be split is dropped entirely so that
        the stored bytes can always be decoded again.
        """
        encoded = text.encode('utf-8')
        if len(encoded) > max_bytes:
            encoded = encoded[:max_bytes].decode('utf-8', errors='ignore').encode('utf-8')
        return encoded

    @staticmethod
    def _put_bytes(buffer, offset, data):
        """Copy the bytes object ``data`` into the uint8 array ``buffer`` starting at ``offset``."""
        buffer[offset: offset + len(data)] = np.frombuffer(data, dtype=np.uint8)

    def _exchange(self, command, reply, read_timeout):
        """
        Write ``command`` to the device and read the answer into ``reply``.

        :param command: NumPy uint8 array with the full command
        :param reply: ``array.array`` that receives the reply
        :param read_timeout: Timeout (ms) used for the read
        :return: ``True`` when both transfers were done, ``False`` when a USB error happened (a message is printed)
        :raises AssertionError: if the command was not completely written
        """
        try:
            written = self._dev.write(self._ENDPOINT_OUT, command.tobytes(), self._WRITE_TIMEOUT_MS)
        except usb.core.USBError:
            # TODO: we probably should try again
            print("something went wrong while writing to the device")
            return False

        self._require(written == len(command), "command was not completely written to the device")

        try:
            self._dev.read(self._ENDPOINT_IN, reply, read_timeout)
        except usb.core.USBError:
            # TODO: we probably should try again
            print("something went wrong while reading from the device")
            return False

        return True

    @classmethod
    def _check_reply(cls, reply, expected_rand):
        """
        Check the common reply header: 'c' 'm' 'd' <opcode> + random + error.

        :raises AssertionError: if the random value was not echoed back or the error field is not zero
        """
        rand_val_received = int.from_bytes(bytes(reply[4:8]), byteorder='little', signed=True)
        error_received = int.from_bytes(bytes(reply[8:12]), byteorder='little', signed=False)

        cls._require(rand_val_received == expected_rand, "reply does not match the command that was sent")
        cls._require(error_received == 0, "device replied with error {0}".format(error_received))

    @classmethod
    def _parse_metadata_reply(cls, reply, sound_index):
        """
        Decode the body of a "read metadata" reply.

        Layout read here: bitmask, sound length, sample rate and data type as consecutive little-endian int32
        values starting at byte 12, followed (from byte 28) by the 2048 bytes user metadata area.

        :param reply: reply buffer (``array.array``, ``bytes`` or ``bytearray``)
        :param sound_index: index that was requested, used to test the bitmask
        :return: ``Metadata`` named tuple
        """
        def read_int32(offset):
            return int.from_bytes(bytes(reply[offset: offset + 4]), byteorder='little', signed=True)

        user = cls._REPLY_USER_OFFSET
        slot = cls._FILENAME_SLOT

        def read_name(slot_index):
            start = user + slot_index * slot
            return bytes(reply[start: start + slot]).strip(b'\0')

        # the bitmask is a single int32; the sound length follows it directly
        bit_mask = read_int32(12)
        has_sound = bit_mask & (1 << sound_index) == (1 << sound_index)

        metadata_array = array.array('b', [0] * 1024)
        metadata_filename = ''
        has_metadata = bool(reply[user + slot])
        if has_metadata:
            metadata_array = array.array('b', bytes(reply[user + 512: user + 512 + 1024]))
            metadata_filename = read_name(1)

        description = array.array('b', [0] * 512)
        description_filename = ''
        has_description = bool(reply[user + 2 * slot])
        if has_description:
            description = array.array('b', bytes(reply[user + 512 + 1024: user + 512 + 1024 + 512]))
            description_filename = read_name(2)

        return cls._Metadata(metadata_array=metadata_array,
                             description=description,
                             bit_mask=bit_mask,
                             sound_length=read_int32(16),
                             data_type=read_int32(24),
                             sample_rate=read_int32(20),
                             sound_filename=read_name(0),
                             metadata_filename=metadata_filename,
                             description_filename=description_filename,
                             has_sound=has_sound,
                             has_metadata=has_metadata,
                             has_description=has_description)

    # ------------------------------------------------------------------
    # device transfers
    # ------------------------------------------------------------------

    def _from_soundcard(self, output_folder=None, sound_index=None):
        """
        Reads sounds from the sound card.

        :param output_folder: Destination folder's path.
        :param sound_index: Index of the sound to read (between 2 and 31).
        :raises Exception: if no device is available, the index is out of range or the metadata could not be read.
        """
        if not self._dev:
            raise Exception("Sound card might not be connected. Please connect it before any operation.")

        if sound_index is None or sound_index < 2 or sound_index > 31:
            raise Exception("sound_index must have a value between 2 and 31")

        metadata = self.__get_metadata_from_device(sound_index)

        if metadata is None:
            raise Exception('SoundCardModule: Error while getting metadata from device')

        # define prefix
        prefix = self._index_prefix(sound_index)

        sound_filename = metadata.sound_filename.decode('utf-8')
        metadata_filename = metadata.metadata_filename.decode('utf-8') if metadata.metadata_filename else None
        description_filename = metadata.description_filename.decode(
            'utf-8') if metadata.description_filename else None

        if prefix not in sound_filename:
            sound_filename = prefix + sound_filename
        if metadata_filename and prefix not in metadata_filename:
            metadata_filename = prefix + metadata_filename
        if description_filename and prefix not in description_filename:
            description_filename = prefix + description_filename

        if metadata.has_sound:
            with open(os.path.join(output_folder, sound_filename), 'w', encoding='utf8') as f:
                # TODO: read the sound so we can write it here
                f.write('TODO')

        if metadata.has_metadata:
            with open(os.path.join(output_folder, metadata_filename), 'wb') as f:
                # clean the zeros at the end
                f.write(metadata.metadata_array.tobytes().strip(b'\0'))

        if metadata.has_description:
            with open(os.path.join(output_folder, description_filename), 'wb') as f:
                f.write(metadata.description.tobytes().strip(b'\0'))

        # create summary info file
        if metadata.has_sound:
            used_pos = math.ceil(metadata.sound_length / (33554432.0 * 2.0 / 32.0)) - 1
            indexes = [str(sound_index)]
            indexes.extend(str(sound_index + idx + 1) for idx in range(used_pos))

            summary = [
                "SOUND_INDEX = " + ", ".join(indexes) + "\n",
                "TOTAL_SAMPLES = " + str(metadata.sound_length) + "\n",
                "TOTAL_LENGTH_MS = " + str(int(metadata.sound_length / 2 / metadata.sample_rate * 1000)) + "\n",
                "SAMPLE_RATE = " + str(metadata.sample_rate) + "\n",
                "DATA_TYPE = Int32\n" if metadata.data_type == 0 else "DATA_TYPE = Float32\n",
                "SOUND_FILENAME = " + sound_filename + "\n",
            ]
            if metadata.has_metadata:
                summary.append("USER_METADATA_FILENAME = " + metadata_filename + "\n")
            if metadata.has_description:
                summary.append("USER_DESCRIPTION_FILENAME = " + description_filename + "\n")

            with open(os.path.join(output_folder, sound_filename + '.metadata.txt'), 'w') as f:
                f.writelines(summary)

    def _to_soundcard(self, wave_int, sound_index, sample_rate, data_type,
                      sound_filename=None, metadata_filename=None, description_filename=None):
        """
        This method will send the sound to the Harp Sound Card as a byte array (int8)

        The first command carries the sound description, the first block of samples and the user metadata
        area; every following command carries one more block of samples. Each command is acknowledged by the
        device before the next one is sent.

        :param wave_int: NumPy array as int32 that represents the sound data
        :param sound_index: The destination index in the Sound Card (>=2 and <= 31)
        :param sample_rate: The SampleRate enum value for either 96KHz or 192KHz
        :param data_type: The DataType enum value for either Int32 or Float32 (not implemented yet in the hardware)
        :param sound_filename: The name of the sound filename to be saved with the sound in the board (str)
        :param metadata_filename: The name of the metadata filename to be saved with the sound in the board (str)
        :param description_filename: The name of the description filename to be saved with the sound in the board (str)
        :raises EnvironmentError: if no device is available.
        :raises AssertionError: if a command is not completely written or the device's reply is not the expected one.
        """
        # confirm that the dev exists and is ready
        if not self._dev:
            raise EnvironmentError(
                'Sound card not initialized. Please call the initialize method before any operation.')

        int32_size = np.dtype(np.int32).itemsize
        block_size = self._BLOCK_SIZE

        # work with a byte view of the wave_int (which is int32)
        wave_bytes = wave_int.view(np.uint8)

        # get number of commands to send (one per started block)
        sound_file_size_in_samples = len(wave_bytes) // 4
        total_bytes = sound_file_size_in_samples * 4
        commands_to_send = (total_bytes + block_size - 1) // block_size

        # validate the description of the sound before building anything
        metadata = SoundMetadata(sound_index, sound_file_size_in_samples, sample_rate, data_type)
        if metadata.check_data() is not SoundCardErrorCode.OK:
            print("Input data incorrect, please correct it before proceeding.")
            return

        # Metadata command length: 'c' 'm' 'd' '0x80' + random + metadata + 32768 + 2048 + 'f'
        metadata_cmd_header_size = 4 + int32_size + (4 * int32_size)
        metadata_cmd = self._new_command(
            0x80, int32_size + (4 * int32_size) + block_size + self._USER_METADATA_SIZE)

        rand_val = np.random.randint(-32768, 32768, size=1, dtype=np.int32)
        metadata_cmd[4: 4 + int32_size] = rand_val.view(np.uint8)
        metadata_cmd[8: 8 + (4 * int32_size)] = metadata.as_array().view(np.uint8)

        # add first block of data to the metadata_cmd (it may be shorter than a full block)
        first_block = wave_bytes[0: block_size]
        metadata_cmd[metadata_cmd_header_size: metadata_cmd_header_size + len(first_block)] = first_block

        # prepare user_metadata
        # [0:169] sound_filename
        # [170:339] metadata_filename
        # [340:511] description_filename
        # [512:1535] metadata_filename content
        # [1536:2047] description_filename content
        user_metadata = np.zeros(self._USER_METADATA_SIZE, dtype=np.uint8)
        user_metadata_index = metadata_cmd_header_size + block_size

        if sound_filename:
            self._put_bytes(user_metadata, 0,
                            self._encode_truncated(os.path.basename(sound_filename), 169))

        if metadata_filename:
            self._put_bytes(user_metadata, 170,
                            self._encode_truncated(os.path.basename(metadata_filename), 169))

            # get file contents, truncate data if required
            try:
                with open(metadata_filename, 'r', encoding='utf8') as f:
                    text = f.read()
                self._put_bytes(user_metadata, 512, self._encode_truncated(text, 1023))
            except OSError:
                # TODO: should be a stronger error
                print("Error opening metadata file.")

        if description_filename:
            self._put_bytes(user_metadata, 340,
                            self._encode_truncated(os.path.basename(description_filename), 169))

            # get file contents, truncate data if required
            try:
                with open(description_filename, 'r', encoding='utf8') as f:
                    text = f.read()
                self._put_bytes(user_metadata, 1536, self._encode_truncated(text, 511))
            except OSError as e:
                print(e)
                # TODO: should be a stronger error
                print("Error opening description file.")

        # add user metadata (2048 bytes) to metadata_cmd
        metadata_cmd[user_metadata_index: user_metadata_index + self._USER_METADATA_SIZE] = user_metadata

        # Metadata command reply: 'c' 'm' 'd' '0x80' + random + error
        metadata_cmd_reply = array.array('b', [0] * (4 + int32_size + int32_size))

        # send metadata_cmd and get it's reply
        if not self._exchange(metadata_cmd, metadata_cmd_reply, 1000):
            return
        self._check_reply(metadata_cmd_reply, int(rand_val[0]))

        # prepare command to send and to receive
        # Data command length: 'c' 'm' 'd' '0x81' + random + dataIndex + 32768 + 'f'
        data_cmd_data_index = 4 + int32_size + int32_size
        data_cmd = self._new_command(0x81, int32_size + int32_size + block_size)

        # Data command reply: 'c' 'm' 'd' '0x81' + random + error
        data_cmd_reply = array.array('b', [0] * (4 + int32_size + int32_size))

        # loop to send the rest of the commands, checking the reply of each one
        for i in range(1, commands_to_send):
            # it has to be as an np.array of int32 so that we can get a byte view of it
            rand_val = np.random.randint(-32768, 32768, size=1, dtype=np.int32)
            data_cmd[4: 4 + int32_size] = rand_val.view(np.uint8)

            # write dataIndex to the data_cmd
            data_cmd[8: 8 + int32_size] = np.array([i], dtype=np.int32).view(np.uint8)

            # write data from wave_int to cmd
            wave_idx = i * block_size
            data_block = wave_bytes[wave_idx: wave_idx + block_size]
            block_end = data_cmd_data_index + len(data_block)
            data_cmd[data_cmd_data_index: block_end] = data_block
            # zero the rest so a short final block does not carry bytes from the previous block
            data_cmd[block_end: data_cmd_data_index + block_size] = 0

            # send data to device
            if not self._exchange(data_cmd, data_cmd_reply, 400):
                return
            self._check_reply(data_cmd_reply, int(rand_val[0]))

    def __get_metadata_from_device(self, sound_index):
        """
        Ask the device for the metadata stored at ``sound_index``.

        :param sound_index: index of the sound to query
        :return: ``Metadata`` named tuple, or ``None`` when a USB error happened (a message is printed)
        :raises AssertionError: if the command is not completely written or the reply is not the expected one
        """
        int32_size = np.dtype(np.int32).itemsize

        # Read metadata command length: 'c' 'm' 'd' '0x84' + random + soundIndex + 'f'
        read_metadata_cmd = self._new_command(0x84, int32_size + int32_size)

        rand_val = np.random.randint(-32768, 32768, size=1, dtype=np.int32)
        read_metadata_cmd[4: 4 + int32_size] = rand_val.view(np.uint8)
        read_metadata_cmd[8: 8 + int32_size] = np.array([sound_index], dtype=np.int32).view(np.uint8)

        # prepare to send command and receive the reply
        read_reply_cmd = array.array('b', [0] * (4 + 6 * int32_size + self._USER_METADATA_SIZE))

        if not self._exchange(read_metadata_cmd, read_reply_cmd, 100):
            return None
        self._check_reply(read_reply_cmd, int(rand_val[0]))

        return self._parse_metadata_reply(read_reply_cmd, sound_index)