From 6d7112e58dae2da3e312b4b7c14ee2c087398776 Mon Sep 17 00:00:00 2001 From: Austin Yang Date: Sat, 24 Feb 2024 20:13:34 -0600 Subject: [PATCH] WIP: ARMCommunicator --- Communication/communicator.py | 328 +++++++++++++++++++-------------- minipc.py | 12 +- requirements.txt | 1 + thirdparty/__init__.py | 0 thirdparty/dynamixel/driver.py | 10 +- 5 files changed, 199 insertions(+), 152 deletions(-) create mode 100644 thirdparty/__init__.py diff --git a/Communication/communicator.py b/Communication/communicator.py index 51a2b9a..93f6606 100644 --- a/Communication/communicator.py +++ b/Communication/communicator.py @@ -12,6 +12,10 @@ import struct from copy import deepcopy from typing import Optional, Union +from subrocess import run, PIPE + +from thirdparty import dynamixel_sdk +from thirdparty.dynamixel.driver import DynamixelDriver logger = logging.getLogger(__name__) @@ -19,7 +23,6 @@ STJ_MAX_PACKET_SIZE = 33 STJ_MIN_PACKET_SIZE = 10 - class MiniPCCommunicationError(Exception): pass @@ -279,23 +282,22 @@ def list_uart_device_paths(): OSX prefix: "tty.usbmodem" Jetson / Linux prefix: "ttyUSB", "ttyACM" - Linux: look under "/dev/serial/by-id": "usb-STMicroelectronics_STM32_STLink_" + Linux: look under "/dev/serial/by-id" for "usb-STMicroelectronics_STM32_STLink_" Returns: - [Maybe dev_path] : a list of possible device paths + [Maybe dev_paths] : a list of possible device paths """ # list of possible prefixes - UART_PREFIX_LIST = ('tty.usbmodem', 'usb-STMicroelectronics_STM32_STLink_') + UART_PREFIX_LIST = ('usb-STMicroelectronics_STM32_STLink_',) dev_basename = '/dev/serial/by-id' try: - dev_list = os.listdir(dev_basename) + path_list = os.listdir(dev_basename) except FileNotFoundError: - dev_list = [] + path_list = [] - dev_paths = [] # ret val - for dev_name in dev_list: - if dev_name.startswith(UART_PREFIX_LIST): - dev_paths += [os.path.join(dev_basename, dev_name)] + dev_paths = [os.path.join(dev_basename, path) + for path in path_list + if path.startswith(UART_PREFIX_LIST)] return dev_paths or [None] # path -> [path] -> Maybe serial.Serial @@ -616,10 +618,50 @@ def read_out(self): class ARMCommunicator(Communicator): - def __init__(self): - pass + def __init__(self, cfg, serial_dev_path, in_use=None, ): + self.cfg = cfg + self.serial_dev_path = serial_dev_path + self.in_use = in_use + + self.arm_state = StateDict(**{ + 'float0': 0.0, + 'float1': 0.0, + 'float2': 0.0, + 'float3': 0.0, + 'float4': 0.0, + 'float5': 0.0, + }) - def start_listening(self) -> None: + @staticmethod + def list_arm_device_paths(id_serial_short): + """Guess a port for the arm with `udevadm`. + + Note: this function is for UNIX-like systems only! + + Linux: look under "/dev/serial/by-id" + + Returns: + [Maybe dev_paths] : a list of possible device paths + """ + # Exclude UARTs + id_cmd = "udevadm info -q property '%s' | awk -F= '/^ID_SERIAL_SHORT/ { print $2 }'" + UART_PREFIX_LIST = ('usb-STMicroelectronics_STM32_STLink_',) + # list of possible prefixes + dev_basename = '/dev/serial/by-id' + try: + path_list = os.listdir(dev_basename) + except FileNotFoundError: + path_list = [] + + dev_paths = [os.path.join(dev_basename, path) + for path in path_list + if not path.startswith(UART_PREFIX_LIST) + if run(['/bin/bash', '-c', cmd % dev_path], + stdout=pipe).stdout.decode() == id_serial_short] + + return dev_paths or [None] + + def start_listening(self) -> none: pass def is_valid(self) -> bool: @@ -631,28 +673,28 @@ def is_vacuum(self) -> bool: def is_alive(self) -> bool: pass - def get_port(self) -> Optional[str]: + def get_port(self) -> optional[str]: pass - def create_and_send_packet(self, cmd_id, data) -> None: + def create_and_send_packet(self, cmd_id, data) -> none: pass - def create_packet(self, cmd_id, data) -> Union[bytes, dict]: + def create_packet(self, cmd_id, data) -> union[bytes, dict]: pass - def send_packet(self, packet) -> None: + def send_packet(self, packet) -> none: pass def read_out(self) -> dict: pass -class SPMCommunicator(Communicator): +class spmcommunicator(communicator): def __init__(self): pass - def start_listening(self) -> None: + def start_listening(self) -> none: pass def is_valid(self) -> bool: @@ -664,52 +706,52 @@ def is_vacuum(self) -> bool: def is_alive(self) -> bool: pass - def get_port(self) -> Optional[str]: + def get_port(self) -> optional[str]: pass - def create_and_send_packet(self, cmd_id, data) -> None: + def create_and_send_packet(self, cmd_id, data) -> none: pass - def create_packet(self, cmd_id, data) -> Union[bytes, dict]: + def create_packet(self, cmd_id, data) -> union[bytes, dict]: pass - def send_packet(self, packet) -> None: + def send_packet(self, packet) -> none: pass def read_out(self) -> dict: pass -# XXX: Move tests out, leave simpler unit tests? i.e. only pingpong -# Latency test by Richard, flash example/minipc/LatencyTest.cc -# Modified by Austin. -# Tests Minipc <-> Type C board circuit time +# xxx: move tests out, leave simpler unit tests? i.e. only pingpong +# latency test by richard, flash example/minipc/latencytest.cc +# modified by austin. +# tests minipc <-> type c board circuit time def test_board_latency(uart, rounds=15, timeout=1, hz=200, - listening=True, verbose=True): - print('\nCommunicator beginning minipc <-> board latency test: ' - f'{rounds} rounds at {hz} Hertz') - cmd_id = uart.cfg.SELFCHECK_CMD_ID + listening=true, verbose=true): + print('\ncommunicator beginning minipc <-> board latency test: ' + f'{rounds} rounds at {hz} hertz') + cmd_id = uart.cfg.selfcheck_cmd_id def send_packets(rounds, hz): send_time = [0] * rounds - packet_status = [False] * rounds + packet_status = [false] * rounds for i in range(rounds): - logger.debug(f'Sending packet #{i} to stm32...') - data = {'mode': 'ECHO', 'debug_int': i} + logger.debug(f'sending packet #{i} to stm32...') + data = {'mode': 'echo', 'debug_int': i} send_time[i] = time.time() uart.create_and_send_packet(cmd_id, data) - packet_status[i] = True + packet_status[i] = true time.sleep(1 / hz) return (send_time, packet_status) - def receive_packets(rounds, timeout, listening, ret): # Async + def receive_packets(rounds, timeout, listening, ret): # async received = 0 receive_time = [0] * rounds - packet_status = [False] * rounds - # Receive loop + packet_status = [false] * rounds + # receive loop current_time = time.time() while time.time() - current_time < timeout and received != rounds: if not listening: @@ -724,28 +766,28 @@ def receive_packets(rounds, timeout, listening, ret): # Async # debug_int acts as the index if not receive_time[i]: receive_time[i] = time.time() - logger.debug(f'Received packet #{i} from stm32...') - except IndexError: + logger.debug(f'received packet #{i} from stm32...') + except indexerror: pass - time.sleep(0.001) # Use same frequency as listen_. + time.sleep(0.001) # use same frequency as listen_. for i, t in enumerate(receive_time): if t != 0: - packet_status[i] = True + packet_status[i] = true ret[0:1] = [receive_time, packet_status] - return ret[0:1] # If not run as Thread. + return ret[0:1] # if not run as thread. - # Start the receive thread first + # start the receive thread first rt_return = [] - receive_thread = threading.Thread(target=receive_packets, + receive_thread = threading.thread(target=receive_packets, args=(rounds, timeout, listening, rt_return)) receive_thread.start() - # Send packets second + # send packets second send_time, send_packet_status = send_packets(rounds, hz) receive_thread.join() receive_time, receive_packet_status = rt_return - # Flatten data + # flatten data not_all_received = not all(receive_packet_status) # 0 if packet not received latencies = [(tf or ti) - ti @@ -753,52 +795,52 @@ def receive_packets(rounds, timeout, listening, ret): # Async statuses = [*zip(send_packet_status, receive_packet_status)] loss = latencies.count(0.0) - average_latency = sum(latencies) / (len(latencies) - loss or 1) # Prevent 0/0 + average_latency = sum(latencies) / (len(latencies) - loss or 1) # prevent 0/0 for i in range(rounds): is_sent = statuses[i][0] is_received = statuses[i][1] - logger.debug('Status of packet %d: send: %s, receive: %s' % - (i, ('UNSENT!', 'sent')[is_sent], - ('NOT RECEIVED!', 'received')[is_received])) - logger.debug(f'Latency of packet #{i}: {latencies[i]}') + logger.debug('status of packet %d: send: %s, receive: %s' % + (i, ('unsent!', 'sent')[is_sent], + ('not received!', 'received')[is_received])) + logger.debug(f'latency of packet #{i}: {latencies[i]}') - print('Attempted to send', rounds, 'packets.', - send_packet_status.count(True), 'Packets transmitted,', + print('attempted to send', rounds, 'packets.', + send_packet_status.count(true), 'packets transmitted,', rounds - loss, 'packets received.') - print(f'Packets lost: {loss}/{loss/rounds*100}%. ' - f'Average latency: {average_latency}') + print(f'packets lost: {loss}/{loss/rounds*100}%. ' + f'average latency: {average_latency}') if not_all_received: - logger.warning('Latency test: not all packets were received.') + logger.warning('latency test: not all packets were received.') return {'average': average_latency, 'loss': (loss, loss / rounds), 'detailed': [*zip(statuses, latencies)]} -# RX/TX test by YHY modified by Richard, flash example/minipc/PingPongTest.cc +# rx/tx test by yhy modified by richard, flash example/minipc/pingpongtest.cc # this ping pong test first trys to send a packet # and then attmepts to read the response from stm32 for 10 seconds # then send a second packet # after that entering ping pong mode: # receive packet from stm32, rel_pitch += 1 then immediately send back -# each "ping pong" has a ID for differentiating during pingping-ing -# TODO: This test shows the issue that a response can only be received after the data -# in circular_buffer is at least the maximum size of a packet (STJ_MAX_PACKET_SIZE). -# So if sending some small packets, +# each "ping pong" has a id for differentiating during pingping-ing +# todo: this test shows the issue that a response can only be received after the data +# in circular_buffer is at least the maximum size of a packet (stj_max_packet_size). +# so if sending some small packets, # they will stay in the circular_buffer waiting to be parsed, # until new packets are received. -# For example, if STJ_MAX_PACKET_SIZE is 21 and GIMBAL data size is 19, +# for example, if stj_max_packet_size is 21 and gimbal data size is 19, # then only after receiving 2 packets (2 * 19 > 21) # then the first packet will be parsed. -# If a data type is 10 bytes long then sending a third packet is necessary +# if a data type is 10 bytes long then sending a third packet is necessary # before pingpong -# Modified by Austin +# modified by austin def test_board_pingpong(uart, rounds=5, timeout=1, hz=2, - listening=True, verbose=True): - print('\nCommunicator beginning minipc <-> board pingpong test: ' - f'{rounds} rounds at {hz} Hertz') + listening=true, verbose=true): + print('\ncommunicator beginning minipc <-> board pingpong test: ' + f'{rounds} rounds at {hz} hertz') def receive_packet(j, timeout): current_time = time.time() @@ -806,23 +848,23 @@ def receive_packet(j, timeout): if not listening: uart.try_read_one() if uart.packet_search(): - return True + return true else: received_data = uart.get_current_stm32_state() i = int(received_data['debug_int']) if i == j: - return True - time.sleep(0.001) # Use same frequency as listen_. + return true + time.sleep(0.001) # use same frequency as listen_. - return False + return false def send_recv_packets(rounds, timeout, hz): sent, received = 0, 0 - cmd_id = uart.cfg.SELFCHECK_CMD_ID - flusher = uart.create_packet(cmd_id, {'mode': 'FLUSH', 'debug_int': 0}) + cmd_id = uart.cfg.selfcheck_cmd_id + flusher = uart.create_packet(cmd_id, {'mode': 'flush', 'debug_int': 0}) for i in range(rounds): - print(f'Sending packet #{i} to stm32...') - data = {'mode': 'ECHO', 'debug_int': i + 1} + print(f'sending packet #{i} to stm32...') + data = {'mode': 'echo', 'debug_int': i + 1} uart.create_and_send_packet(cmd_id, data) for _ in range(5): time.sleep(1 / 200) @@ -832,48 +874,48 @@ def send_recv_packets(rounds, timeout, hz): received_data = receive_packet(i + 1, timeout) if received_data: received += 1 - print(f'Received packet #{i}') + print(f'received packet #{i}') else: - print(f'Lost packet #{i}.') + print(f'lost packet #{i}.') time.sleep(1 / hz) return (sent, received) sent, received = send_recv_packets(rounds, timeout, hz) -# rate test by Roger modified by Richard, flash example/minipc/StressTestTypeC.cc -# Modified by Austin. -# TODO: currently this test will never receive full 1000 packets +# rate test by roger modified by richard, flash example/minipc/stresstesttypec.cc +# modified by austin. +# todo: currently this test will never receive full 1000 packets # but only 998 packets because the last two packets # remain in circular buffer and not parsed because -# its size is not reaching STJ_MAX_PACKET_SIZE -# NOTE: please reflash or restart program on stm32 every time you want to run this test -# TODO: Notify the board and use COLOR packets instead? +# its size is not reaching stj_max_packet_size +# note: please reflash or restart program on stm32 every time you want to run this test +# todo: notify the board and use color packets instead? def test_board_crc(uart, rounds=15, timeout=1, hz=200, - listening=True, verbose=True): - print('\nCommunicator beginning minipc <-> board crc stress test: ' - f'{rounds} rounds at {hz} Hertz') - cmd_id = uart.cfg.SELFCHECK_CMD_ID + listening=true, verbose=true): + print('\ncommunicator beginning minipc <-> board crc stress test: ' + f'{rounds} rounds at {hz} hertz') + cmd_id = uart.cfg.selfcheck_cmd_id def send_packets(rounds, hz): - packet_status = [False] * rounds + packet_status = [false] * rounds for i in range(rounds): - logger.debug(f'Sending packet #{i} to stm32...') - data = {'mode': 'ECHO', 'debug_int': 0} + logger.debug(f'sending packet #{i} to stm32...') + data = {'mode': 'echo', 'debug_int': 0} uart.create_and_send_packet(cmd_id, data) - packet_status[i] = True + packet_status[i] = true time.sleep(1 / hz) return packet_status - def receive_packets(rounds, timeout, ret): # Async + def receive_packets(rounds, timeout, ret): # async received = 0 - packet_status = [False] * rounds - # Receive loop + packet_status = [false] * rounds + # receive loop current_time = time.time() while time.time() - current_time < timeout and received != rounds: if not listening: @@ -887,55 +929,55 @@ def receive_packets(rounds, timeout, ret): # Async # debug_int acts as the index i = int(received_data['debug_int']) if not packet_status[i]: - packet_status[i] = True - logger.debug(f'Received packet #{i} from stm32...') + packet_status[i] = true + logger.debug(f'received packet #{i} from stm32...') received += 1 - except IndexError: + except indexerror: pass - time.sleep(0.001) # Use same frequency as listen_. + time.sleep(0.001) # use same frequency as listen_. ret[0] = packet_status - return ret[0] # If not run as Thread. + return ret[0] # if not run as thread. - # Send packets first - print('This test should be run without a listening thread. ' - 'Otherwise, expect only one packet.') + # send packets first + print('this test should be run without a listening thread. ' + 'otherwise, expect only one packet.') send_packet_status = send_packets(rounds, hz) - print(f'Packet sending test complete: sent {rounds} packets.') - print('You should see the light change from blue to green on type C board.') - print('When the led turns red the stm32 is sending data.') - print('Starting packet receiving test.') - # Start the receive thread second - rt_return = [None] - receive_thread = threading.Thread(target=receive_packets, + print(f'packet sending test complete: sent {rounds} packets.') + print('you should see the light change from blue to green on type c board.') + print('when the led turns red the stm32 is sending data.') + print('starting packet receiving test.') + # start the receive thread second + rt_return = [none] + receive_thread = threading.thread(target=receive_packets, args=(rounds, timeout, rt_return)) - receive_thread.daemon = True + receive_thread.daemon = true receive_thread.start() receive_thread.join() receive_packet_status = rt_return[0] - # Flatten data + # flatten data not_all_received = not all(receive_packet_status) statuses = [*zip(send_packet_status, receive_packet_status)] - loss = receive_packet_status.count(False) + loss = receive_packet_status.count(false) - print(f'\nAttempted to send {rounds} packets: ' - f'{send_packet_status.count(True)} packets transmitted, ' + print(f'\nattempted to send {rounds} packets: ' + f'{send_packet_status.count(true)} packets transmitted, ' f'{rounds-loss} packets received.') - print(f'Packets lost: {loss}/{loss/rounds*100}%.') + print(f'packets lost: {loss}/{loss/rounds*100}%.') if not_all_received: - logger.warning('Crc test: not all packets were received.') + logger.warning('crc test: not all packets were received.') return {'loss': (loss, loss / rounds), 'detailed': statuses} def test_board_typea(uart, rounds=5, interval=1, - verbose=True): - print('Communicator beginning minipc <-> board Type A test: ' + verbose=true): + print('communicator beginning minipc <-> board type a test: ' f'{rounds} rounds at {interval} intervals') - # vanilla send test, flash typeA.cc + # vanilla send test, flash typea.cc cur_packet_cnt = uart.parsed_packet_cnt prv_parsed_packet_cnt = 0 for i in range(rounds): @@ -946,50 +988,50 @@ def test_board_typea(uart, rounds=5, interval=1, if uart.parsed_packet_cnt > cur_packet_cnt: cur_packet_cnt = uart.parsed_packet_cnt time.sleep(0.001) - print("Parsed {} packets in 1 second.".format( + print("parsed {} packets in 1 second.".format( cur_packet_cnt - prv_parsed_packet_cnt)) prv_parsed_packet_cnt = cur_packet_cnt cur_time = time.time() if __name__ == '__main__': - # Unit testing + # unit testing sys.path.append(os.path.join(os.path.dirname(__file__), '..')) - from enum import Enum + from enum import enum import config - uart = UARTCommunicator(config) + uart = uartcommunicator(config) - class Test(Enum): - """Class used to choose test for communicator.""" - LATENCY = 1 - PINGPONG = 2 - CRC = 3 - TYPE_A = 4 - testing = Test.PINGPONG + class test(enum): + """class used to choose test for communicator.""" + latency = 1 + pingpong = 2 + crc = 3 + type_a = 4 + testing = test.pingpong - # Remove first arg if called with python. + # remove first arg if called with python. if 'python' in sys.argv[0]: sys.argv.pop(0) - testing = Test.PINGPONG + testing = test.pingpong if len(sys.argv) > 1: - testing = (Test.LATENCY, Test.PINGPONG, - Test.CRC, Test.TYPE_A)[int(sys.argv[1]) - 1] - print(f'\nUsing test type: {testing}') + testing = (test.latency, test.pingpong, + test.crc, test.type_a)[int(sys.argv[1]) - 1] + print(f'\nusing test type: {testing}') else: - print(f'\nUsing default test type: {testing}') - print("Change test type: ./communicator.py [1|2|3|4]") - print("1: LATENCY, 2: PINGPONG, 3: CRC, 4: TYPE_A\n") + print(f'\nusing default test type: {testing}') + print("change test type: ./communicator.py [1|2|3|4]") + print("1: latency, 2: pingpong, 3: crc, 4: type_a\n") match testing: - case Test.LATENCY: + case test.latency: test_board_latency() - case Test.PINGPONG: + case test.pingpong: test_board_pingpong() - case Test.CRC: + case test.crc: test_board_crc() - case Test.TYPE_A: + case test.type_a: test_board_typea() case _: print("Invalid selection") diff --git a/minipc.py b/minipc.py index 8f2fbcd..679af04 100644 --- a/minipc.py +++ b/minipc.py @@ -21,7 +21,8 @@ from util.atomiclist import AtomicList from Communication import communicator from Communication.communicator import Communicator, \ - UARTCommunicator, ARMCommunicator, SPMCommunicator, \ + UARTCommunicator, USBCommunicator, \ + ARMCommunicator, SPMCommunicator, \ StateDict _m = MatchAll() @@ -58,11 +59,12 @@ c_logger := logging.getLogger('Communication.communicator')] -# UART >= 0, others < 0 +# UART > 0, USB <= 0 class DeviceType(IntEnum): - UART = 0 # portless uart + UART = 1 # portless uart BRD = 2 # board + USB = 0 # portless usb SPM = -1 # spacemouse ARM = -2 # small arm @@ -76,6 +78,7 @@ class DeviceType(IntEnum): # Muting data in an (Int)Enum is not expected behavior, so: serial_devices: Dict[DeviceType, Communicator] = { UART: UARTCommunicator(config, serial_dev_path=False, warn=False), + USB: USBCommunicator(config, serial_dev_path=False, warn=False) BRD: None, SPM: None, ARM: None, @@ -83,6 +86,7 @@ class DeviceType(IntEnum): intenum_to_name = { UART: 'UART', + USB: 'USB', BRD: 'BOARD', SPM: 'SPACEMOUSE', ARM: 'ARM', @@ -108,7 +112,7 @@ def is_uart(dev_type: DeviceType) -> bool: return dev_type >= UART -def is_other(dev_type: DeviceType) -> bool: +def is_usb(dev_type: DeviceType) -> bool: return dev_type < UART diff --git a/requirements.txt b/requirements.txt index 1d5beea..0b96de4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ pylint pyserial crc +numpy diff --git a/thirdparty/__init__.py b/thirdparty/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/thirdparty/dynamixel/driver.py b/thirdparty/dynamixel/driver.py index 29a707e..09cbb81 100644 --- a/thirdparty/dynamixel/driver.py +++ b/thirdparty/dynamixel/driver.py @@ -3,11 +3,11 @@ from typing import Protocol, Sequence import numpy as np -from dynamixel_sdk.group_sync_read import GroupSyncRead -from dynamixel_sdk.group_sync_write import GroupSyncWrite -from dynamixel_sdk.packet_handler import PacketHandler -from dynamixel_sdk.port_handler import PortHandler -from dynamixel_sdk.robotis_def import ( +from thirdparty.dynamixel_sdk.group_sync_read import GroupSyncRead +from thirdparty.dynamixel_sdk.group_sync_write import GroupSyncWrite +from thirdparty.dynamixel_sdk.packet_handler import PacketHandler +from thirdparty.dynamixel_sdk.port_handler import PortHandler +from thirdparty.dynamixel_sdk.robotis_def import ( COMM_SUCCESS, DXL_HIBYTE, DXL_HIWORD,