diff --git a/core/services/ardupilot_manager/ArduPilotManager.py b/core/services/ardupilot_manager/ArduPilotManager.py deleted file mode 100644 index 957b808134..0000000000 --- a/core/services/ardupilot_manager/ArduPilotManager.py +++ /dev/null @@ -1,619 +0,0 @@ -import asyncio -import os -import pathlib -import subprocess -import time -from copy import deepcopy -from typing import Any, List, Optional, Set - -import psutil -from commonwealth.mavlink_comm.VehicleManager import VehicleManager -from commonwealth.utils.Singleton import Singleton -from elftools.elf.elffile import ELFFile -from loguru import logger - -from exceptions import ( - ArdupilotProcessKillFail, - EndpointAlreadyExists, - NoDefaultFirmwareAvailable, - NoPreferredBoardSet, -) -from firmware.FirmwareManagement import FirmwareManager -from flight_controller_detector.Detector import Detector as BoardDetector -from flight_controller_detector.linux.linux_boards import LinuxFlightController -from mavlink_proxy.Endpoint import Endpoint -from mavlink_proxy.Manager import Manager as MavlinkManager -from settings import Settings -from typedefs import ( - EndpointType, - Firmware, - FlightController, - FlightControllerFlags, - Parameters, - Platform, - PlatformType, - Serial, - SITLFrame, - Vehicle, -) - - -class ArduPilotManager(metaclass=Singleton): - # pylint: disable=too-many-instance-attributes - def __init__(self) -> None: - self.settings = Settings() - self.settings.create_app_folders() - self._current_board: Optional[FlightController] = None - - # Load settings and do the initial configuration - if self.settings.load(): - logger.info(f"Loaded settings from {self.settings.settings_file}.") - logger.debug(self.settings.content) - else: - self.settings.create_settings_file() - - async def setup(self) -> None: - # This is the logical continuation of __init__(), extracted due to its async nature - self.configuration = deepcopy(self.settings.content) - self.mavlink_manager = MavlinkManager(self.load_preferred_router()) - if not self.load_preferred_router(): - await self.set_preferred_router(self.mavlink_manager.available_interfaces()[0].name()) - logger.info(f"Setting {self.mavlink_manager.available_interfaces()[0].name()} as preferred router.") - self.mavlink_manager.set_logdir(self.settings.log_path) - - self._load_endpoints() - self.ardupilot_subprocess: Optional[Any] = None - self.firmware_manager = FirmwareManager( - self.settings.firmware_folder, self.settings.defaults_folder, self.settings.user_firmware_folder - ) - self.vehicle_manager = VehicleManager() - - self.should_be_running = False - self.remove_old_logs() - self.current_sitl_frame = self.load_sitl_frame() - - def remove_old_logs(self) -> None: - def need_to_remove_file(file: pathlib.Path) -> bool: - now = time.time() - week_old = now - 7 * 24 * 60 * 60 - return file.stat().st_size == 0 and file.stat().st_mtime < week_old - - try: - firmware_log_files = list((self.settings.firmware_folder / "logs").iterdir()) - router_log_files = list(self.settings.log_path.iterdir()) - - # Get all files with zero bytes and more than 7 days older - files = [file for file in firmware_log_files + router_log_files if need_to_remove_file(file)] - for file in files: - logger.debug(f"Removing invalid log file: {file}") - os.remove(file) - except Exception as error: - logger.warning(f"Failed to remove logs: {error}") - - async def auto_restart_ardupilot(self) -> None: - """Auto-restart Ardupilot when it's not running but was supposed to.""" - while True: - process_not_running = ( - self.ardupilot_subprocess is not None and self.ardupilot_subprocess.poll() is not None - ) or len(self.running_ardupilot_processes()) == 0 - needs_restart = self.should_be_running and ( - self.current_board is None - or (self.current_board.type in [PlatformType.SITL, PlatformType.Linux] and process_not_running) - ) - if needs_restart: - logger.debug("Restarting ardupilot...") - try: - await self.kill_ardupilot() - except Exception as error: - logger.warning(f"Could not kill Ardupilot: {error}") - try: - await self.start_ardupilot() - except Exception as error: - logger.warning(f"Could not start Ardupilot: {error}") - await asyncio.sleep(5.0) - - async def start_mavlink_manager_watchdog(self) -> None: - await self.mavlink_manager.auto_restart_router() - - @property - def current_board(self) -> Optional[FlightController]: - return self._current_board - - @property - def current_sitl_frame(self) -> SITLFrame: - return self._current_sitl_frame - - @current_sitl_frame.setter - def current_sitl_frame(self, frame: SITLFrame) -> None: - self._current_sitl_frame = frame - logger.info(f"Setting {frame.value} as frame for SITL.") - - @staticmethod - def firmware_has_debug_symbols(firmware_path: pathlib.Path) -> bool: - with open(firmware_path, "rb") as f: - elffile = ELFFile(f) - - for section in elffile.iter_sections(): - if section.name.startswith(".debug_line"): - # 100k is Empirical data. non-debug binaries seem to have around 700 entries here, - # while debug ones have 28 million entries - if section.header.sh_size > 100000: - return True - return False - return False - - def update_serials(self, serials: List[Serial]) -> None: - self.configuration["serials"] = [vars(serial) for serial in serials] - self.settings.save(self.configuration) - - def get_serials(self) -> List[Serial]: - # The mapping of serial ports works as in the following table: - # - # | ArduSub | Navigator | - # | -C = Serial1 | Serial1 => /dev/ttyS0 | - # | -B = Serial3 | Serial3 => /dev/ttyAMA1 | - # | -E = Serial4 | Serial4 => /dev/ttyAMA2 | - # | -F = Serial5 | Serial5 => /dev/ttyAMA3 | - # - # The first column comes from https://ardupilot.org/dev/docs/sitl-serial-mapping.html - - if "serials" not in self.configuration: - assert isinstance(self._current_board, LinuxFlightController) - return self._current_board.get_serials() - serials = [] - for entry in self.configuration["serials"]: - try: - serials.append(Serial(port=entry["port"], endpoint=entry["endpoint"])) - except Exception as e: - logger.error(f"Entry is invalid! {entry['port']}:{entry['endpoint']}") - logger.error(e) - return serials - - def get_default_params_cmdline(self, platform: Platform) -> str: - # check if file exists and return it's path as --defaults parameter - default_params_path = self.firmware_manager.default_user_params_path(platform) - if default_params_path.is_file(): - return f"--defaults {default_params_path}" - return "" - - async def start_linux_board(self, board: LinuxFlightController) -> None: - self._current_board = board - if not self.firmware_manager.is_firmware_installed(self._current_board): - if board.platform == Platform.Navigator: - self.firmware_manager.install_firmware_from_file( - pathlib.Path("/root/blueos-files/ardupilot-manager/default/ardupilot_navigator"), - board, - ) - else: - raise NoDefaultFirmwareAvailable( - f"No firmware installed for '{board.platform}' and no default firmware available. Please install the firmware manually." - ) - - firmware_path = self.firmware_manager.firmware_path(self._current_board.platform) - self.firmware_manager.validate_firmware(firmware_path, self._current_board.platform) - - # ArduPilot process will connect as a client on the UDP server created by the mavlink router - master_endpoint = Endpoint( - name="Master", - owner=self.settings.app_name, - connection_type=EndpointType.UDPServer, - place="127.0.0.1", - argument=8852, - protected=True, - ) - - # Run ardupilot inside while loop to avoid exiting after reboot command - ## Can be changed back to a simple command after https://github.com/ArduPilot/ardupilot/issues/17572 - ## gets fixed. - # pylint: disable=consider-using-with - # - # The mapping of serial ports works as in the following table: - # - # | ArduSub | Navigator | - # | -C = Serial1 | Serial1 => /dev/ttyS0 | - # | -B = Serial3 | Serial3 => /dev/ttyAMA1 | - # | -E = Serial4 | Serial4 => /dev/ttyAMA2 | - # | -F = Serial5 | Serial5 => /dev/ttyAMA3 | - # - # The first column comes from https://ardupilot.org/dev/docs/sitl-serial-mapping.html - - command_line = ( - f"{firmware_path}" - f" -A udp:{master_endpoint.place}:{master_endpoint.argument}" - f" --log-directory {self.settings.firmware_folder}/logs/" - f" --storage-directory {self.settings.firmware_folder}/storage/" - f" {self._current_board.get_serial_cmdlines()}" - f" {self.get_default_params_cmdline(board.platform)}" - ) - - if self.firmware_has_debug_symbols(firmware_path): - logger.info("Debug symbols found, launching with gdb server...") - command_line = f"gdbserver 0.0.0.0:5555 {command_line}" - - logger.info(f"Using command line: '{command_line}'") - self.ardupilot_subprocess = subprocess.Popen( - command_line, - shell=True, - encoding="utf-8", - errors="ignore", - cwd=self.settings.firmware_folder, - ) - - await self.start_mavlink_manager(master_endpoint) - - async def start_serial(self, board: FlightController) -> None: - if not board.path: - raise ValueError(f"Could not find device path for board {board.name}.") - self._current_board = board - baudrate = 115200 - is_px4 = "px4" in board.name.lower() - if is_px4: - baudrate = 57600 - await self.start_mavlink_manager( - Endpoint( - name="Master", - owner=self.settings.app_name, - connection_type=EndpointType.Serial, - place=board.path, - argument=baudrate, - protected=True, - ) - ) - if is_px4: - # PX4 needs at least one initial heartbeat to start sending data - await self.vehicle_manager.burst_heartbeat() - - def set_sitl_frame(self, frame: SITLFrame) -> None: - self.current_sitl_frame = frame - self.configuration["sitl_frame"] = frame - self.settings.save(self.configuration) - - def load_sitl_frame(self) -> SITLFrame: - if self.settings.sitl_frame != SITLFrame.UNDEFINED: - return self.settings.sitl_frame - frame = SITLFrame.VECTORED - logger.warning(f"SITL frame is undefined. Setting {frame} as current frame.") - self.set_sitl_frame(frame) - return frame - - async def set_preferred_router(self, router: str) -> None: - self.settings.preferred_router = router - self.configuration["preferred_router"] = router - self.settings.save(self.configuration) - await self.mavlink_manager.set_preferred_router(router) - - def load_preferred_router(self) -> Optional[str]: - try: - return self.configuration["preferred_router"] # type: ignore - except KeyError: - return None - - def get_available_routers(self) -> List[str]: - return [router.name() for router in self.mavlink_manager.available_interfaces()] - - async def start_sitl(self) -> None: - self._current_board = BoardDetector.detect_sitl() - if not self.firmware_manager.is_firmware_installed(self._current_board): - self.firmware_manager.install_firmware_from_params(Vehicle.Sub, self._current_board) - frame = self.settings.sitl_frame - if frame == SITLFrame.UNDEFINED: - frame = SITLFrame.VECTORED - logger.warning(f"SITL frame is undefined. Setting {frame} as current frame.") - self.current_sitl_frame = frame - - firmware_path = self.firmware_manager.firmware_path(self._current_board.platform) - self.firmware_manager.validate_firmware(firmware_path, self._current_board.platform) - - # ArduPilot SITL binary will bind TCP port 5760 (server) and the mavlink router will connect to it as a client - master_endpoint = Endpoint( - name="Master", - owner=self.settings.app_name, - connection_type=EndpointType.TCPClient, - place="127.0.0.1", - argument=5760, - protected=True, - ) - # pylint: disable=consider-using-with - self.ardupilot_subprocess = subprocess.Popen( - [ - firmware_path, - "--model", - self.current_sitl_frame.value, - "--base-port", - str(master_endpoint.argument), - "--home", - "-27.563,-48.459,0.0,270.0", - ], - shell=False, - encoding="utf-8", - errors="ignore", - cwd=self.settings.firmware_folder, - ) - - await self.start_mavlink_manager(master_endpoint) - - async def start_mavlink_manager(self, device: Endpoint) -> None: - default_endpoints = [ - Endpoint( - name="GCS Server Link", - owner=self.settings.app_name, - connection_type=EndpointType.UDPServer, - place="0.0.0.0", - argument=14550, - persistent=True, - enabled=False, - ), - Endpoint( - name="GCS Client Link", - owner=self.settings.app_name, - connection_type=EndpointType.UDPClient, - place="192.168.2.1", - argument=14550, - persistent=True, - enabled=True, - ), - Endpoint( - name="MAVLink2RestServer", - owner=self.settings.app_name, - connection_type=EndpointType.UDPServer, - place="127.0.0.1", - argument=14001, - persistent=True, - protected=True, - ), - Endpoint( - name="MAVLink2Rest", - owner=self.settings.app_name, - connection_type=EndpointType.UDPClient, - place="127.0.0.1", - argument=14000, - persistent=True, - protected=True, - overwrite_settings=True, - ), - Endpoint( - name="Internal Link", - owner=self.settings.app_name, - connection_type=EndpointType.TCPServer, - place="127.0.0.1", - argument=5777, - persistent=True, - protected=True, - overwrite_settings=True, - ), - Endpoint( - name="Ping360 Heading", - owner=self.settings.app_name, - connection_type=EndpointType.UDPServer, - place="0.0.0.0", - argument=14660, - persistent=True, - protected=True, - ), - ] - for endpoint in default_endpoints: - try: - self.mavlink_manager.add_endpoint(endpoint) - except EndpointAlreadyExists: - pass - except Exception as error: - logger.warning(str(error)) - await self.mavlink_manager.start(device) - - @staticmethod - async def available_boards(include_bootloaders: bool = False) -> List[FlightController]: - all_boards = await BoardDetector.detect(True) - if include_bootloaders: - return all_boards - return [board for board in all_boards if FlightControllerFlags.is_bootloader not in board.flags] - - async def change_board(self, board: FlightController) -> None: - logger.info(f"Trying to run with '{board.name}'.") - boards = await self.available_boards() - if not any(board.name == detectedboard.name for detectedboard in boards): - logger.error(f"Attempted to change active board to {board} which is not detected.") - logger.info(f"detected boards are: {boards}") - raise ValueError(f"Cannot use '{board.name}'. Board not detected.") - self.set_preferred_board(board) - await self.kill_ardupilot() - await self.start_ardupilot() - - def set_preferred_board(self, board: FlightController) -> None: - logger.info(f"Setting {board.name} as preferred flight-controller.") - self.configuration["preferred_board"] = board.dict(exclude={"path"}) - self.settings.save(self.configuration) - - def get_preferred_board(self) -> FlightController: - preferred_board = self.configuration.get("preferred_board") - if not preferred_board: - raise NoPreferredBoardSet("Preferred board not set yet.") - return FlightController(**preferred_board) - - def get_board_to_be_used(self, boards: List[FlightController]) -> FlightController: - """Check if preferred board exists and is connected. If so, use it, otherwise, choose by priority.""" - try: - preferred_board = self.get_preferred_board() - logger.debug(f"Preferred flight-controller is {preferred_board.name}.") - for board in boards: - # Compare connected boards with saved board, excluding path (which can change between sessions) - if preferred_board.dict(exclude={"path"}).items() <= board.dict().items(): - return board - logger.debug(f"Flight-controller {preferred_board.name} not connected.") - except NoPreferredBoardSet as error: - logger.warning(error) - - # SITL should only be used if explicitly set by user, in which case it's a preferred board and the - # previous return logic will get it. We do this to prevent the user thinking that it's physical board - # is correctly running when in fact it was SITL automatically starting. - real_boards = [board for board in boards if board.type != PlatformType.SITL] - if not real_boards: - raise RuntimeError("Only available board is SITL, and it wasn't explicitly chosen.") - real_boards.sort(key=lambda board: board.platform) - return real_boards[0] - - def running_ardupilot_processes(self) -> List[psutil.Process]: - """Return list of all Ardupilot process running on system.""" - - def is_ardupilot_process(process: psutil.Process) -> bool: - """Checks if given process is using a Ardupilot's firmware file, for any known platform.""" - for platform in Platform: - firmware_path = self.firmware_manager.firmware_path(platform) - if str(firmware_path) in " ".join(process.cmdline()): - return True - return False - - return list(filter(is_ardupilot_process, psutil.process_iter())) - - async def terminate_ardupilot_subprocess(self) -> None: - """Terminate Ardupilot subprocess.""" - if self.ardupilot_subprocess: - self.ardupilot_subprocess.terminate() - for _ in range(10): - if self.ardupilot_subprocess.poll() is not None: - logger.info("Ardupilot subprocess terminated.") - return - logger.debug("Waiting for process to die...") - await asyncio.sleep(0.5) - raise ArdupilotProcessKillFail("Could not terminate Ardupilot subprocess.") - logger.warning("Ardupilot subprocess already not running.") - - async def prune_ardupilot_processes(self) -> None: - """Kill all system processes using Ardupilot's firmware file.""" - for process in self.running_ardupilot_processes(): - try: - logger.debug(f"Killing Ardupilot process {process.name()}::{process.pid}.") - process.kill() - except Exception as error: - logger.debug(f"Could not kill Ardupilot: {error}") - - try: - process.wait(3) - continue - except Exception as error: - logger.debug(f"Ardupilot appears to be running.. going to call pkill: {error}") - - try: - subprocess.run(["pkill", "-9", process.pid], check=True) - except Exception as error: - raise ArdupilotProcessKillFail(f"Failed to kill {process.name()}::{process.pid}.") from error - - async def kill_ardupilot(self) -> None: - self.should_be_running = False - if not self.current_board or self.current_board.platform != Platform.SITL: - try: - logger.info("Disarming vehicle.") - await self.vehicle_manager.disarm_vehicle() - logger.info("Vehicle disarmed.") - except Exception as error: - logger.warning(f"Could not disarm vehicle: {error}. Proceeding with kill.") - - # TODO: Add shutdown command on HAL_SITL and HAL_LINUX, changing terminate/prune - # logic with a simple "self.vehicle_manager.shutdown_vehicle()" - logger.info("Terminating Ardupilot subprocess.") - await self.terminate_ardupilot_subprocess() - logger.info("Ardupilot subprocess terminated.") - logger.info("Pruning Ardupilot's system processes.") - await self.prune_ardupilot_processes() - logger.info("Ardupilot's system processes pruned.") - - logger.info("Stopping Mavlink manager.") - await self.mavlink_manager.stop() - logger.info("Mavlink manager stopped.") - - async def start_ardupilot(self) -> None: - await self.setup() - try: - available_boards = await self.available_boards() - if not available_boards: - raise RuntimeError("No boards available.") - if len(available_boards) > 1: - logger.warning(f"More than a single board detected: {available_boards}") - - flight_controller = self.get_board_to_be_used(available_boards) - logger.info(f"Using {flight_controller.name} flight-controller.") - - if flight_controller.platform.type == PlatformType.Linux: - assert isinstance(flight_controller, LinuxFlightController) - await self.start_linux_board(flight_controller) - elif flight_controller.platform.type == PlatformType.Serial: - await self.start_serial(flight_controller) - elif flight_controller.platform == Platform.SITL: - await self.start_sitl() - else: - raise RuntimeError(f"Invalid board type: {flight_controller}") - finally: - self.should_be_running = True - - async def restart_ardupilot(self) -> None: - if self.current_board is None or self.current_board.type in [PlatformType.SITL, PlatformType.Linux]: - await self.kill_ardupilot() - await self.start_ardupilot() - return - await self.vehicle_manager.reboot_vehicle() - - def _get_configuration_endpoints(self) -> Set[Endpoint]: - return {Endpoint(**endpoint) for endpoint in self.configuration.get("endpoints") or []} - - def _save_endpoints_to_configuration(self, endpoints: Set[Endpoint]) -> None: - self.configuration["endpoints"] = list(map(Endpoint.as_dict, endpoints)) - - def _load_endpoints(self) -> None: - """Load endpoints from the configuration file to the mavlink manager.""" - for endpoint in self._get_configuration_endpoints(): - try: - self.mavlink_manager.add_endpoint(endpoint) - except Exception as error: - logger.error(f"Could not load endpoint {endpoint}: {error}") - - def _save_current_endpoints(self) -> None: - try: - persistent_endpoints = set(filter(lambda endpoint: endpoint.persistent, self.get_endpoints())) - self._save_endpoints_to_configuration(persistent_endpoints) - self.settings.save(self.configuration) - except Exception as error: - logger.error(f"Could not save endpoints. {error}") - - def get_endpoints(self) -> Set[Endpoint]: - """Get all endpoints from the mavlink manager.""" - return self.mavlink_manager.endpoints() - - async def add_new_endpoints(self, new_endpoints: Set[Endpoint]) -> None: - """Add multiple endpoints to the mavlink manager and save them on the configuration file.""" - logger.info(f"Adding endpoints {[e.name for e in new_endpoints]} and updating settings file.") - self.mavlink_manager.add_endpoints(new_endpoints) - self._save_current_endpoints() - await self.mavlink_manager.restart() - - async def remove_endpoints(self, endpoints_to_remove: Set[Endpoint]) -> None: - """Remove multiple endpoints from the mavlink manager and save them on the configuration file.""" - logger.info(f"Removing endpoints {[e.name for e in endpoints_to_remove]} and updating settings file.") - self.mavlink_manager.remove_endpoints(endpoints_to_remove) - self._save_current_endpoints() - await self.mavlink_manager.restart() - - async def update_endpoints(self, endpoints_to_update: Set[Endpoint]) -> None: - """Update multiple endpoints from the mavlink manager and save them on the configuration file.""" - logger.info(f"Modifying endpoints {[e.name for e in endpoints_to_update]} and updating settings file.") - self.mavlink_manager.update_endpoints(endpoints_to_update) - self._save_current_endpoints() - await self.mavlink_manager.restart() - - def get_available_firmwares(self, vehicle: Vehicle, platform: Platform) -> List[Firmware]: - return self.firmware_manager.get_available_firmwares(vehicle, platform) - - def install_firmware_from_file( - self, firmware_path: pathlib.Path, board: FlightController, default_parameters: Optional[Parameters] = None - ) -> None: - self.firmware_manager.install_firmware_from_file(firmware_path, board, default_parameters) - - def install_firmware_from_url( - self, - url: str, - board: FlightController, - make_default: bool = False, - default_parameters: Optional[Parameters] = None, - ) -> None: - self.firmware_manager.install_firmware_from_url(url, board, make_default, default_parameters) - - def restore_default_firmware(self, board: FlightController) -> None: - self.firmware_manager.restore_default_firmware(board) diff --git a/core/services/ardupilot_manager/autopilot/firmware.py b/core/services/ardupilot_manager/autopilot/firmware.py new file mode 100644 index 0000000000..0e67ff099a --- /dev/null +++ b/core/services/ardupilot_manager/autopilot/firmware.py @@ -0,0 +1,18 @@ +import abc + +class AutopilotFirmware(abc.ABC): + @abc.abstractmethod + def is_installed(self) -> None: + raise NotImplementedError + + @abc.abstractmethod + def install(self) -> None: + raise NotImplementedError + + @abc.abstractmethod + def uninstall(self) -> None: + raise NotImplementedError + + @abc.abstractmethod + def update(self) -> None: + raise NotImplementedError diff --git a/core/services/ardupilot_manager/autopilot/firmwares/px4/bootloader.py b/core/services/ardupilot_manager/autopilot/firmwares/px4/bootloader.py new file mode 100644 index 0000000000..5660fffb68 --- /dev/null +++ b/core/services/ardupilot_manager/autopilot/firmwares/px4/bootloader.py @@ -0,0 +1,216 @@ + +class PX4Board: + boards_id_to_firmware_name_mapping = { + 9: "px4_fmu-v2_default", + 255: "px4_fmu-v3_default", + 11: "px4_fmu-v4_default", + 13: "px4_fmu-v4pro_default", + 20: "uvify_core_default", + 50: "px4_fmu-v5_default", + 51: "px4_fmu-v5x_default", + 52: "px4_fmu-v6_default", + 53: "px4_fmu-v6x_default", + 54: "px4_fmu-v6u_default", + 56: "px4_fmu-v6c_default", + 57: "ark_fmu-v6x_default", + 35: "px4_fmu-v6xrt_default", + 55: "sky-drones_smartap-airlink_default", + 88: "airmind_mindpx-v2_default", + 12: "bitcraze_crazyflie_default", + 14: "bitcraze_crazyflie21_default", + 42: "omnibus_f4sd_default", + 33: "mro_x21_default", + 65: "intel_aerofc-v1_default", + 123: "holybro_kakutef7_default", + 41775: "modalai_fc-v1_default", + 41776: "modalai_fc-v2_default", + 78: "holybro_pix32v5_default", + 79: "holybro_can-gps-v1_default", + 28: "nxp_fmuk66-v3_default", + 30: "nxp_fmuk66-e_default", + 31: "nxp_fmurt1062-v1_default", + 85: "freefly_can-rtk-gps_default", + 120: "cubepilot_cubeyellow_default", + 136: "mro_x21-777_default", + 139: "holybro_durandal-v1_default", + 140: "cubepilot_cubeorange_default", + 1063: "cubepilot_cubeorangeplus_default", + 141: "mro_ctrl-zero-f7_default", + 142: "mro_ctrl-zero-f7-oem_default", + 212: "thepeach_k1_default", + 213: "thepeach_r1_default", + 1009: "cuav_nora_default", + 1010: "cuav_x7pro_default", + 1017: "mro_pixracerpro_default", + 1022: "mro_ctrl-zero-classic_default", + 1023: "mro_ctrl-zero-h7_default", + 1024: "mro_ctrl-zero-h7-oem_default", + 1048: "holybro_kakuteh7_default", + 1053: "holybro_kakuteh7v2_default", + 1054: "holybro_kakuteh7mini_default", + 1110: "jfb_jfb110_default", + } + + def __init__(self, id: int, name: str) -> None: + self.id = id + self.name = name + + @classmethod + def from_id(cls, id: int) -> "PX4Board": + board_name = cls.boards_id_to_firmware_name_mapping.get(id, None) + if board_name is None: + raise ValueError(f"Board with id {id} not found") + + return cls(id, board_name) + + +import serial +import time +from enum import Enum +from dataclasses import dataclass + + +class PX4BootLoaderCommands(int, Enum): + # Protocol Bytes + PROTO_INSYNC = 0x12 # 'in sync' byte sent before status + PROTO_BAD_SILICON_REV = 0x14 # Device is using silicon not suitable + PROTO_EOC = 0x20 # End of command + + # Reply bytes + PROTO_OK = 0x10 # 'ok' response + PROTO_FAILED = 0x11 # 'fail' response + PROTO_INVALID = 0x13 # 'invalid' response for bad commands + + # Command bytes + PROTO_GET_SYNC = 0x21 # NOP for re-establishing sync + PROTO_GET_DEVICE = 0x22 # get device ID bytes + PROTO_CHIP_ERASE = 0x23 # erase program area and reset program address + PROTO_LOAD_ADDRESS = 0x24 # set next programming address + PROTO_PROG_MULTI = 0x27 # write bytes at program address and increment + PROTO_GET_CRC = 0x29 # compute & return a CRC + PROTO_BOOT = 0x30 # boot the application + + # Command bytes - Rev 2 boootloader only + PROTO_CHIP_VERIFY = 0x24 # begin verify mode + PROTO_READ_MULTI = 0x28 # read bytes at programm address and increment + + INFO_BL_REV = 1 # bootloader protocol revision + BL_REV_MIN = 2 # Minimum supported bootlader protocol + BL_REV_MAX = 5 # Maximum supported bootloader protocol + INFO_BOARD_ID = 2 # board type + INFO_BOARD_REV = 3 # board revision + INFO_FLASH_SIZE = 4 # max firmware size in bytes + + PROG_MULTI_MAX = 64, # write size for PROTO_PROG_MULTI, must be multiple of 4 + READ_MULTI_MAX = 0x28 # read size for PROTO_READ_MULTI, must be multiple of 4. Sik Radio max size is 0x28 + + +@dataclass +class PX4BootLoaderBoardInfo: + board_id: int + boot_loader_version: int + flash_size: int + + +class PX4BootLoader: + # Specific boards that need special handling when in certain versions of boot loader + _board_id_px4_fmu_v2 = 9 + _board_id_px4_fmu_v3 = 255 + _board_flash_size_small = 1032192 + _boot_loader_version_v2_correct_flash = 5 + + def __init__(self, port: serial.Serial): + self.port = port + + def get_board_info(self) -> PX4BootLoaderBoardInfo: + # Try getting in sync + self._sync() + + # Get bootloader version + bootloader_version = self._proto_get_device(PX4BootLoaderCommands.INFO_BL_REV) + + if ( + bootloader_version < PX4BootLoaderCommands.BL_REV_MIN or + bootloader_version > PX4BootLoaderCommands.BL_REV_MAX + ): + raise Exception("Unsupported bootloader version") + + # Get board ID + board_id = self._proto_get_device(PX4BootLoaderCommands.INFO_BOARD_ID) + + # Get flash size + flash_size = self._proto_get_device(PX4BootLoaderCommands.INFO_FLASH_SIZE) + + # From QGC source code + # Older V2 boards have large flash space but silicon error which prevents it from being used. Bootloader v5 and above + # will correctly account/report for this. Older bootloaders will not. Newer V2 board which support larger flash space are + # reported as V3 board id. + if ( + board_id == self._board_id_px4_fmu_v2 + and bootloader_version >= self._boot_loader_version_v2_correct_flash + and flash_size > self._board_flash_size_small + ): + board_id = self._board_id_px4_fmu_v3 + + return PX4BootLoaderBoardInfo(board_id, bootloader_version, flash_size) + + def _safe_read(self, expected_bytes: int, timeout: int) -> bytearray: + started_reading_at = time.time() + while self.port.in_waiting < expected_bytes: + if time.time() - started_reading_at > timeout: + raise serial.SerialTimeoutException("Timeout while waiting bootloader response") + time.sleep(0.1) + + return bytearray(self.port.read(expected_bytes)) + + def _read_command_response(self, timeout: int) -> bytearray: + # By default PX4 responses are 2 bytes long + res = self._safe_read(2, timeout) + + if res[0] != PX4BootLoaderCommands.PROTO_INSYNC: + raise serial.SerialException("Unable to sync with bootloader") + if res[0] == PX4BootLoaderCommands.PROTO_INSYNC and res[1] == PX4BootLoaderCommands.PROTO_BAD_SILICON_REV: + raise serial.SerialException("Device is using silicon not suitable") + + if res[1] == PX4BootLoaderCommands.PROTO_FAILED: + raise serial.SerialException("Failed to execute command") + if res[1] == PX4BootLoaderCommands.PROTO_INVALID: + raise serial.SerialException("Invalid command") + + return res + + def _safe_serial_write(self, data: bytearray) -> None: + bytes_written = self.port.write(data) + if bytes_written != len(data): + raise serial.SerialException("Invalid number of bytes written to serial port") + + def _send_serial_command(self, command: PX4BootLoaderCommands, timeout: int = 1) -> None: + buffer = bytearray([command, PX4BootLoaderCommands.PROTO_EOC]) + + self._safe_serial_write(buffer) + self.port.flush() + + self._read_command_response(timeout) + + def _sync(self) -> None: + # Clear the buffer prior to syncing + self.port.read_all() + + # Getting in sync some times requires multiple attempts + for _ in range(10): + try: + self._send_serial_command(PX4BootLoaderCommands.PROTO_GET_SYNC) + return + except Exception: + time.sleep(0.1) + + raise Exception("Failed to sync with bootloader") + + def _proto_get_device(self, command: PX4BootLoaderCommands) -> int: + buffer = bytearray([PX4BootLoaderCommands.PROTO_GET_DEVICE, command, PX4BootLoaderCommands.PROTO_EOC]) + + self._safe_serial_write(buffer) + val = self._safe_read(4, 1) + self._read_command_response(1) + + return int.from_bytes(val, byteorder="little") diff --git a/core/services/ardupilot_manager/firmware/__init__.py b/core/services/ardupilot_manager/autopilot/firmwares/px4/px4_firm.py similarity index 100% rename from core/services/ardupilot_manager/firmware/__init__.py rename to core/services/ardupilot_manager/autopilot/firmwares/px4/px4_firm.py diff --git a/core/services/ardupilot_manager/autopilot/platform.py b/core/services/ardupilot_manager/autopilot/platform.py new file mode 100644 index 0000000000..c11e41723f --- /dev/null +++ b/core/services/ardupilot_manager/autopilot/platform.py @@ -0,0 +1,14 @@ +import abc + +class AutopilotPlatform(abc.ABC): + @abc.abstractmethod + def start(self) -> None: + raise NotImplementedError + + @abc.abstractmethod + def stop(self) -> None: + raise NotImplementedError + + @abc.abstractmethod + def restart(self) -> None: + raise NotImplementedError diff --git a/core/services/ardupilot_manager/autopilot/platforms/linux.py b/core/services/ardupilot_manager/autopilot/platforms/linux.py new file mode 100644 index 0000000000..69280454cd --- /dev/null +++ b/core/services/ardupilot_manager/autopilot/platforms/linux.py @@ -0,0 +1,13 @@ +from loguru import logger + +from autopilot.platform import AutopilotPlatform + +class LinuxPlatform(AutopilotPlatform): + def start(self) -> None: + print("Starting AutopiPlatformLinux") + + def stop(self) -> None: + print("Stopping AutopiPlatformLinux") + + def restart(self) -> None: + print("Restarting AutopiPlatformLinux") diff --git a/core/services/ardupilot_manager/flight_controller_detector/__init__.py b/core/services/ardupilot_manager/autopilot/platforms/serial.py similarity index 100% rename from core/services/ardupilot_manager/flight_controller_detector/__init__.py rename to core/services/ardupilot_manager/autopilot/platforms/serial.py diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/__init__.py b/core/services/ardupilot_manager/autopilot/platforms/sitl.py similarity index 100% rename from core/services/ardupilot_manager/flight_controller_detector/linux/__init__.py rename to core/services/ardupilot_manager/autopilot/platforms/sitl.py diff --git a/core/services/ardupilot_manager/autopilot_manager.py b/core/services/ardupilot_manager/autopilot_manager.py new file mode 100644 index 0000000000..23d9f759f9 --- /dev/null +++ b/core/services/ardupilot_manager/autopilot_manager.py @@ -0,0 +1,49 @@ +from copy import deepcopy +from typing import Optional + +from loguru import logger + +from autopilot.platform import AutopilotPlatform +from autopilot.firmware import AutopilotFirmware +from controller.controller import FlightController +from settings import Settings +from mavlink_proxy.Endpoint import Endpoint +from mavlink_proxy.Manager import Manager as MavlinkManager + + +class AutopilotManager: + def __init__(self) -> None: + self.controller: Optional[FlightController] = None + self.platform: Optional[AutopilotPlatform] = None + self.firmware: Optional[AutopilotFirmware] = None + + self.settings = Settings() + self.settings.create_app_folders() + + # Load settings and do the initial configuration + if self.settings.load(): + logger.info(f"Loaded settings from {self.settings.settings_file}.") + logger.debug(self.settings.content) + else: + self.settings.create_settings_file() + + async def setup(self) -> None: + # This is the logical continuation of __init__(), extracted due to its async nature + self.configuration = deepcopy(self.settings.content) + self.mavlink_manager = MavlinkManager(self.load_preferred_router()) + if not self.load_preferred_router(): + await self.set_preferred_router(self.mavlink_manager.available_interfaces()[0].name()) + logger.info(f"Setting {self.mavlink_manager.available_interfaces()[0].name()} as preferred router.") + self.mavlink_manager.set_logdir(self.settings.log_path) + + self._load_endpoints() + self.ardupilot_subprocess: Optional[Any] = None + + self.firmware_manager = FirmwareManager( + self.settings.firmware_folder, self.settings.defaults_folder, self.settings.user_firmware_folder + ) + self.vehicle_manager = VehicleManager() + + self.should_be_running = False + self.remove_old_logs() + self.current_sitl_frame = self.load_sitl_frame() diff --git a/core/services/ardupilot_manager/controller/controller.py b/core/services/ardupilot_manager/controller/controller.py new file mode 100644 index 0000000000..8393eed4ad --- /dev/null +++ b/core/services/ardupilot_manager/controller/controller.py @@ -0,0 +1,26 @@ +from enum import Enum +from typing import List, Optional + +from pydantic import BaseModel + +from controller.platform import Platform, PlatformType + + +class FlightControllerFlags(str, Enum): + """Flags for the Flight-controller class.""" + + is_bootloader = "is_bootloader" + + +class FlightController(BaseModel): + """Flight-controller board.""" + + name: str + manufacturer: Optional[str] + platform: Platform + path: Optional[str] + flags: List[FlightControllerFlags] = [] + + @property + def type(self) -> PlatformType: + return self.platform.type diff --git a/core/services/ardupilot_manager/controller/detector/__init__.py b/core/services/ardupilot_manager/controller/detector/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/core/services/ardupilot_manager/flight_controller_detector/board_identification.py b/core/services/ardupilot_manager/controller/detector/board_identification.py similarity index 97% rename from core/services/ardupilot_manager/flight_controller_detector/board_identification.py rename to core/services/ardupilot_manager/controller/detector/board_identification.py index d536497b0e..c8528479b7 100644 --- a/core/services/ardupilot_manager/flight_controller_detector/board_identification.py +++ b/core/services/ardupilot_manager/controller/detector/board_identification.py @@ -3,7 +3,7 @@ from pydantic import BaseModel -from typedefs import Platform +from controller.platform import Platform class SerialAttr(str, Enum): diff --git a/core/services/ardupilot_manager/flight_controller_detector/Detector.py b/core/services/ardupilot_manager/controller/detector/detector.py similarity index 94% rename from core/services/ardupilot_manager/flight_controller_detector/Detector.py rename to core/services/ardupilot_manager/controller/detector/detector.py index a1753a14b8..64c0048a9d 100644 --- a/core/services/ardupilot_manager/flight_controller_detector/Detector.py +++ b/core/services/ardupilot_manager/controller/detector/detector.py @@ -4,9 +4,10 @@ from commonwealth.utils.general import is_running_as_root from serial.tools.list_ports_linux import SysFS, comports -from flight_controller_detector.board_identification import identifiers -from flight_controller_detector.linux.detector import LinuxFlightControllerDetector -from typedefs import FlightController, FlightControllerFlags, Platform +from controller.detector.board_identification import identifiers +from controller.detector.linux.detector import LinuxFlightControllerDetector + +from controller.controller import FlightController, FlightControllerFlags, Platform class Detector: diff --git a/core/services/ardupilot_manager/controller/detector/linux/__init__.py b/core/services/ardupilot_manager/controller/detector/linux/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/argonot.py b/core/services/ardupilot_manager/controller/detector/linux/argonot.py similarity index 63% rename from core/services/ardupilot_manager/flight_controller_detector/linux/argonot.py rename to core/services/ardupilot_manager/controller/detector/linux/argonot.py index e486d39429..1c7ae870d3 100644 --- a/core/services/ardupilot_manager/flight_controller_detector/linux/argonot.py +++ b/core/services/ardupilot_manager/controller/detector/linux/argonot.py @@ -1,5 +1,5 @@ -from flight_controller_detector.linux.navigator import NavigatorPi4 -from typedefs import Platform +from controller.detector.linux.navigator import NavigatorPi4 +from controller.platform import Platform class Argonot(NavigatorPi4): diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/detector.py b/core/services/ardupilot_manager/controller/detector/linux/detector.py similarity index 74% rename from core/services/ardupilot_manager/flight_controller_detector/linux/detector.py rename to core/services/ardupilot_manager/controller/detector/linux/detector.py index 3b1dd5c258..e239c853f3 100644 --- a/core/services/ardupilot_manager/flight_controller_detector/linux/detector.py +++ b/core/services/ardupilot_manager/controller/detector/linux/detector.py @@ -1,11 +1,8 @@ -# pylint: disable=unused-import -from typing import List, Optional, Type +from typing import Optional from loguru import logger -from flight_controller_detector.linux.argonot import Argonot -from flight_controller_detector.linux.linux_boards import LinuxFlightController -from flight_controller_detector.linux.navigator import NavigatorPi4, NavigatorPi5 +from controller.detector.linux.linux_boards import LinuxFlightController class LinuxFlightControllerDetector: diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/linux_boards.py b/core/services/ardupilot_manager/controller/detector/linux/linux_boards.py similarity index 91% rename from core/services/ardupilot_manager/flight_controller_detector/linux/linux_boards.py rename to core/services/ardupilot_manager/controller/detector/linux/linux_boards.py index 9907597123..5ff33b0603 100644 --- a/core/services/ardupilot_manager/flight_controller_detector/linux/linux_boards.py +++ b/core/services/ardupilot_manager/controller/detector/linux/linux_boards.py @@ -2,7 +2,8 @@ from smbus2 import SMBus -from typedefs import FlightController, PlatformType, Serial +from controller.serial import Serial +from controller.controller import FlightController, PlatformType class LinuxFlightController(FlightController): diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/navigator.py b/core/services/ardupilot_manager/controller/detector/linux/navigator.py similarity index 94% rename from core/services/ardupilot_manager/flight_controller_detector/linux/navigator.py rename to core/services/ardupilot_manager/controller/detector/linux/navigator.py index 0ebf9da4e4..35533996f7 100644 --- a/core/services/ardupilot_manager/flight_controller_detector/linux/navigator.py +++ b/core/services/ardupilot_manager/controller/detector/linux/navigator.py @@ -2,8 +2,9 @@ from commonwealth.utils.commands import load_file -from flight_controller_detector.linux.linux_boards import LinuxFlightController -from typedefs import Platform, Serial +from controller.detector.linux.linux_boards import LinuxFlightController +from controller.platform import Platform +from controller.serial import Serial class Navigator(LinuxFlightController): diff --git a/core/services/ardupilot_manager/controller/platform.py b/core/services/ardupilot_manager/controller/platform.py new file mode 100644 index 0000000000..cab7cfd315 --- /dev/null +++ b/core/services/ardupilot_manager/controller/platform.py @@ -0,0 +1,100 @@ +from enum import Enum, StrEnum, auto +from platform import machine + + +class SITLFrame(str, Enum): + """Valid SITL frame types""" + + QUADPLANE = "quadplane" + XPLANE = "xplane" + FIREFLY = "firefly" + PLUS_CONFIG = "+" + QUAD = "quad" + COPTER = "copter" + X_CONFIG = "x" + BFXREV = "bfxrev" + BFX = "bfx" + DJIX = "djix" + CWX = "cwx" + HEXA = "hexa" + HEXA_CWX = "hexa-cwx" + HEXA_DJI = "hexa-dji" + OCTA = "octa" + OCTA_CWX = "octa-cwx" + OCTA_DJI = "octa-dji" + OCTA_QUAD_CWX = "octa-quad-cwx" + DODECA_HEXA = "dodeca-hexa" + TRI = "tri" + Y_SIX = "y6" + HELI = "heli" + HELI_DUAL = "heli-dual" + HELI_COMPOUND = "heli-compound" + SINGLECOPTER = "singlecopter" + COAXCOPTER = "coaxcopter" + ROVER = "rover" + BALANCEBOT = "balancebot" + SAILBOAT = "sailboat" + MOTORBOAT = "motorboat" + CRRCSIM = "crrcsim" + JSBSIM = "jsbsim" + FLIGHTAXIS = "flightaxis" + GAZEBO = "gazebo" + LAST_LETTER = "last_letter" + TRACKER = "tracker" + BALLOON = "balloon" + PLANE = "plane" + CALIBRATION = "calibration" + VECTORED = "vectored" + VECTORED_6DOF = "vectored_6dof" + SILENTWINGS = "silentwings" + MORSE = "morse" + AIRSIM = "airsim" + SCRIMMAGE = "scrimmage" + WEBOTS = "webots" + JSON = " JSON" + UNDEFINED = " undefined" + + +def get_sitl_platform_name(machine_arch: str) -> str: + """Get SITL platform name based on machine architecture.""" + + if "arm" not in machine_arch.lower() and "aarch" not in machine_arch.lower(): + return "SITL_x86_64_linux_gnu" + return "SITL_arm_linux_gnueabihf" + + +class PlatformType(StrEnum): + Serial = auto() + Linux = auto() + SITL = auto() + Unknown = auto() + + +class Platform(str, Enum): + """Valid Ardupilot platform types. + The Enum values are 1:1 representations of the platforms available on the ArduPilot manifest.""" + + Pixhawk1 = "Pixhawk1" + Pixhawk4 = "Pixhawk4" + Pixhawk6X = "Pixhawk6X" + Pixhawk6C = "Pixhawk6C" + CubeOrange = "CubeOrange" + GenericSerial = "GenericSerial" + Navigator = "navigator" + Argonot = "argonot" + SITL = get_sitl_platform_name(machine()) + + @property + def type(self) -> PlatformType: + platform_types = { + Platform.Pixhawk1: PlatformType.Serial, + Platform.Pixhawk4: PlatformType.Serial, + Platform.Pixhawk6X: PlatformType.Serial, + Platform.Pixhawk6C: PlatformType.Serial, + Platform.CubeOrange: PlatformType.Serial, + Platform.GenericSerial: PlatformType.Serial, + Platform.Navigator: PlatformType.Linux, + Platform.Argonot: PlatformType.Linux, + Platform.SITL: PlatformType.SITL, + } + return platform_types.get(self, PlatformType.Unknown) diff --git a/core/services/ardupilot_manager/controller/serial.py b/core/services/ardupilot_manager/controller/serial.py new file mode 100644 index 0000000000..6089711d3d --- /dev/null +++ b/core/services/ardupilot_manager/controller/serial.py @@ -0,0 +1,43 @@ +import ipaddress +import re +from pathlib import Path +from typing import Any + +from pydantic import BaseModel, validator + + +class Serial(BaseModel): + """Simplified representation of linux serial port configurations, + gets transformed into command line arguments such as + --serial1 /dev/ttyS0 + --serial3 /dev/ttyAMA1 + --serial4 /dev/ttyAMA2 + --serial5 /dev/ttyAMA3 + """ + + port: str + endpoint: str + + @validator("port") + @classmethod + def valid_letter(cls: Any, value: str) -> str: + if value in "BCDEFGH" and len(value) == 1: + return value + raise ValueError(f"Invalid serial port: {value}. These must be between B and H. A is reserved.") + + @validator("endpoint") + @classmethod + def valid_endpoint(cls: Any, value: str) -> str: + if Path(value).exists(): + return value + if re.compile(r"tcp:\d*:wait$").match(value): + return value + matches = re.compile(r"(tcpclient|udp|tcpin|udpin):(?P(\d*\.){3}\d+):(?P\d*)$").match(value) + if matches: + ipaddress.ip_address(matches.group("ip")) + if 0 <= int(matches.group("port")) <= 65535: + return value + raise ValueError(f"Invalid endpoint configuration: {value}") + + def __hash__(self) -> int: # make hashable BaseModel subclass + return hash(self.port + self.endpoint) diff --git a/core/services/ardupilot_manager/firmware/FirmwareDownload.py b/core/services/ardupilot_manager/firmware/FirmwareDownload.py deleted file mode 100644 index 1c6591b1f5..0000000000 --- a/core/services/ardupilot_manager/firmware/FirmwareDownload.py +++ /dev/null @@ -1,232 +0,0 @@ -import gzip -import json -import os -import pathlib -import random -import ssl -import string -import tempfile -from typing import Any, Dict, List, Optional -from urllib.parse import urlparse -from urllib.request import urlopen, urlretrieve - -from commonwealth.utils.decorators import temporary_cache -from loguru import logger -from packaging.version import Version - -from exceptions import ( - FirmwareDownloadFail, - InvalidManifest, - ManifestUnavailable, - NoCandidate, - NoVersionAvailable, -) -from typedefs import FirmwareFormat, Platform, PlatformType, Vehicle - -# TODO: This should be not necessary -# Disable SSL verification -if not os.environ.get("PYTHONHTTPSVERIFY", "") and getattr(ssl, "_create_unverified_context", None): - ssl._create_default_https_context = ssl._create_unverified_context - - -class FirmwareDownloader: - _manifest_remote = "https://firmware.ardupilot.org/manifest.json.gz" - _supported_firmware_formats = { - PlatformType.SITL: FirmwareFormat.ELF, - PlatformType.Serial: FirmwareFormat.APJ, - PlatformType.Linux: FirmwareFormat.ELF, - } - - def __init__(self) -> None: - self._manifest: Dict[str, Any] = {} - - @staticmethod - def _generate_random_filename(length: int = 16) -> pathlib.Path: - """Generate a random name for a temporary file. - - Args: - length (int, optional): Size of the name. Defaults to 16. - - Returns: - pathlib.Path: Path of the temporary file. - """ - - filename = "".join(random.sample(string.ascii_lowercase, length)) - folder = pathlib.Path(tempfile.gettempdir()).absolute() - return pathlib.Path.joinpath(folder, filename) - - @staticmethod - def _download(url: str) -> pathlib.Path: - """Download a specific file for a temporary location. - - Args: - url (str): Url to download the file. - - Returns: - pathlib.Path: File of the temporary file. - """ - - # We append the url filename to the generated random name to avoid collisions and preserve extension - name = pathlib.Path(urlparse(url).path).name - filename = pathlib.Path(f"{FirmwareDownloader._generate_random_filename()}-{name}") - try: - logger.debug(f"Downloading: {url}") - urlretrieve(url, filename) - except Exception as error: - raise FirmwareDownloadFail("Could not download firmware file.") from error - return filename - - def _manifest_is_valid(self) -> bool: - """Check if internal content is valid and update it if not. - - Returns: - bool: True if valid, False if was unable to validate. - """ - return "format-version" in self._manifest or self.download_manifest() - - def download_manifest(self) -> bool: - """Download ArduPilot manifest file - - Returns: - bool: True if file was downloaded and validated, False if not. - """ - with urlopen(FirmwareDownloader._manifest_remote) as http_response: - manifest_gzip = http_response.read() - manifest = gzip.decompress(manifest_gzip) - self._manifest = json.loads(manifest) - - if "format-version" not in self._manifest: - raise InvalidManifest("Invalid Manifest file. Does not contain 'format-version' key.") - - if self._manifest["format-version"] != "1.0.0": - logger.warning("Firmware description file format changed, compatibility may be broken.") - - return True - - def _find_version_item(self, **args: str) -> List[Dict[str, Any]]: - """Find version objects in the manifest that match the specific case of **args - - The arguments should follow the same name described in the dictionary inside the manifest - for firmware item. Valid arguments can be: - mav_type, vehicletype, mav_firmware_version_minor, format, mav_firmware_version_type, - platform, latest and others. `-` should be replaced by `_` to use valid python arguments. - E.g: `self._find_version_item(vehicletype="Sub", platform="Pixhawk1", mav_firmware_version_type="4.0.1")` - - Returns: - List[Dict[str, Any]]: A list of firmware items that match the arguments. - """ - if not self._manifest and not self.download_manifest(): - raise ManifestUnavailable("Manifest file is not available. Cannot use it to find firmware candidates.") - - found_version_item = [] - - # Make sure that the item matches all args value - for item in self._manifest["firmware"]: - for key, value in args.items(): - real_key = key.replace("_", "-") - if real_key not in item or item[real_key] != value: - break - else: - found_version_item.append(item) - - return found_version_item - - @temporary_cache(timeout_seconds=3600) - def get_available_versions(self, vehicle: Vehicle, platform: Platform) -> List[str]: - """Get available firmware versions for the specific plataform and vehicle - - Args: - vehicle (Vehicle): Desired vehicle. - platform (Platform): Desired platform. - - Returns: - List[str]: List of available versions that match the specific desired configuration. - """ - available_versions: List[str] = [] - - if not self._manifest_is_valid(): - raise InvalidManifest("Manifest file is invalid. Cannot use it to find available versions.") - - items = self._find_version_item(vehicletype=vehicle.value, platform=platform.value) - - for item in items: - if item["format"] == FirmwareDownloader._supported_firmware_formats[platform.type]: - available_versions.append(item["mav-firmware-version-type"]) - - return available_versions - - @temporary_cache(timeout_seconds=3600) - def get_download_url(self, vehicle: Vehicle, platform: Platform, version: str = "") -> str: - """Find a specific firmware URL from manifest that matches the arguments. - - Args: - vehicle (Vehicle): Desired vehicle. - platform (Platform): Desired platform. - version (str, optional): Desired version, if None provided the latest stable will be used. - Defaults to None. - - Returns: - str: URL of valid firmware. - """ - versions = self.get_available_versions(vehicle, platform) - logger.debug(f"Got following versions for {vehicle} running {platform}: {versions}") - - if not versions: - raise NoVersionAvailable(f"Could not find available firmware versions for {platform}/{vehicle}.") - - if version and version not in versions: - raise NoVersionAvailable(f"Version {version} was not found for {platform}/{vehicle}.") - - firmware_format = FirmwareDownloader._supported_firmware_formats[platform.type] - - # Autodetect the latest supported version. - # For .apj firmwares (e.g. Pixhawk), we use the latest STABLE version while for the others (e.g. SITL and - # Navigator) we use latest BETA. Specially on this development phase of the blueos-docker/navigator, using - # the BETA release allow us to track and fix introduced bugs faster. - if not version: - if firmware_format == FirmwareFormat.APJ: - supported_versions = [version for version in versions if "STABLE" in version] - newest_version: Optional[str] = None - for supported_version in supported_versions: - semver_version = supported_version.split("-")[1] - if not newest_version or Version(newest_version) < Version(semver_version): - newest_version = semver_version - if not newest_version: - raise NoVersionAvailable(f"No firmware versions found for {platform}/{vehicle}.") - version = f"STABLE-{newest_version}" - else: - version = "BETA" - - items = self._find_version_item( - vehicletype=vehicle.value, - platform=platform.value, - mav_firmware_version_type=version, - format=firmware_format, - ) - - if len(items) == 0: - raise NoCandidate( - f"Found no candidate for configuration: {vehicle=}, {platform=}, {version=}, {firmware_format=}" - ) - - if len(items) != 1: - logger.warning(f"Found a number of candidates different of one ({len(items)}): {items}.") - - item = items[0] - logger.debug(f"Downloading following firmware: {item}") - return str(item["url"]) - - def download(self, vehicle: Vehicle, platform: Platform, version: str = "") -> pathlib.Path: - """Download a specific firmware that matches the arguments. - - Args: - vehicle (Vehicle): Desired vehicle. - platform (Platform): Desired platform. - version (str, optional): Desired version, if None provided the latest stable will be used. - Defaults to None. - - Returns: - pathlib.Path: Temporary path for the firmware file. - """ - url = self.get_download_url(vehicle, platform, version) - return FirmwareDownloader._download(url) diff --git a/core/services/ardupilot_manager/firmware/FirmwareInstall.py b/core/services/ardupilot_manager/firmware/FirmwareInstall.py deleted file mode 100644 index b14e7b4738..0000000000 --- a/core/services/ardupilot_manager/firmware/FirmwareInstall.py +++ /dev/null @@ -1,166 +0,0 @@ -import json -import os -import pathlib -import platform as system_platform -import shutil -import stat -from typing import Optional, Union - -from ardupilot_fw_decoder import BoardSubType, BoardType, Decoder -from elftools.elf.elffile import ELFFile - -from exceptions import FirmwareInstallFail, InvalidFirmwareFile, UnsupportedPlatform -from firmware.FirmwareDownload import FirmwareDownloader -from firmware.FirmwareUpload import FirmwareUploader -from typedefs import FirmwareFormat, FlightController, Platform, PlatformType - - -def get_board_id(platform: Platform) -> int: - ardupilot_board_ids = { - Platform.Pixhawk1: 9, - Platform.Pixhawk4: 50, - Platform.Pixhawk6X: 53, - Platform.Pixhawk6C: 56, - Platform.CubeOrange: 140, - } - return ardupilot_board_ids.get(platform, -1) - - -def is_valid_elf_type(elf_arch: str) -> bool: - arch_mapping = {"i386": "x86", "x86_64": "x64", "armv7l": "ARM", "aarch64": "AArch64"} - system_arch = system_platform.machine() - system_arch = arch_mapping.get(system_arch, system_arch) - - if system_arch == elf_arch: - return True - if system_arch == "AArch64" and elf_arch == "ARM": - return True - return False - - -def get_correspondent_decoder_platform(current_platform: Platform) -> Union[BoardType, BoardSubType]: - correspondent_decoder_platform = { - Platform.SITL: BoardType.SITL, - Platform.Navigator: BoardSubType.LINUX_NAVIGATOR, - Platform.Argonot: BoardSubType.LINUX_NAVIGATOR, - } - return correspondent_decoder_platform.get(current_platform, BoardType.EMPTY) - - -class FirmwareInstaller: - """Abstracts the install procedures for different supported boards. - - For proper usage one needs to set the platform before using other methods. - - Args: - firmware_folder (pathlib.Path): Path for firmware folder. - """ - - def __init__(self) -> None: - pass - - @staticmethod - def _validate_apj(firmware_path: pathlib.Path, platform: Platform) -> None: - try: - with open(firmware_path, "r", encoding="utf-8") as firmware_file: - firmware_data = firmware_file.read() - firm_board_id = int(json.loads(firmware_data).get("board_id", -1)) - expected_board_id = get_board_id(platform) - if expected_board_id == -1: - raise UnsupportedPlatform("Firmware validation is not implemented for this board yet.") - if firm_board_id == -1: - raise InvalidFirmwareFile("Could not find board_id specification in the firmware file.") - if firm_board_id != expected_board_id: - raise InvalidFirmwareFile(f"Expected board_id {expected_board_id}, found {firm_board_id}.") - return - except Exception as error: - raise InvalidFirmwareFile("Could not load firmware file for validation.") from error - - @staticmethod - def _validate_elf(firmware_path: pathlib.Path, platform: Platform) -> None: - # Check if firmware's architecture matches system's architecture - with open(firmware_path, "rb") as file: - try: - elf_file = ELFFile(file) - firm_arch = elf_file.get_machine_arch() - except Exception as error: - raise InvalidFirmwareFile("Given file is not a valid ELF.") from error - if not is_valid_elf_type(firm_arch): - raise InvalidFirmwareFile( - f"Firmware's architecture ({firm_arch}) does not match system's ({system_platform.machine()})." - ) - - # Check if firmware's platform matches system platform - try: - firm_decoder = Decoder() - firm_decoder.process(firmware_path) - firm_board = BoardType(firm_decoder.fwversion.board_type) - firm_sub_board = BoardSubType(firm_decoder.fwversion.board_subtype) - current_decoder_platform = get_correspondent_decoder_platform(platform) - if current_decoder_platform not in [firm_board, firm_sub_board]: - raise InvalidFirmwareFile( - ( - f"Firmware's platform ({current_decoder_platform}) does not match system's ({platform})," - f"for board ({firm_board}) or sub board ({firm_sub_board})." - ) - ) - except Exception as error: - raise InvalidFirmwareFile("Given firmware is not a supported version.") from error - - @staticmethod - def validate_firmware(firmware_path: pathlib.Path, platform: Platform) -> None: - """Check if given firmware is valid for given platform.""" - firmware_format = FirmwareDownloader._supported_firmware_formats[platform.type] - - if firmware_format == FirmwareFormat.APJ: - FirmwareInstaller._validate_apj(firmware_path, platform) - return - - if firmware_format == FirmwareFormat.ELF: - FirmwareInstaller._validate_elf(firmware_path, platform) - return - - raise UnsupportedPlatform("Firmware validation is not implemented for this platform.") - - @staticmethod - def add_run_permission(firmware_path: pathlib.Path) -> None: - """Add running permission for firmware file.""" - # Make the binary executable - ## S_IX: Execution permission for - ## OTH: Others - ## USR: User - ## GRP: Group - ## For more information: https://www.gnu.org/software/libc/manual/html_node/Permission-Bits.html - os.chmod(firmware_path, firmware_path.stat().st_mode | stat.S_IXOTH | stat.S_IXUSR | stat.S_IXGRP) - - def install_firmware( - self, - new_firmware_path: pathlib.Path, - board: FlightController, - firmware_dest_path: Optional[pathlib.Path] = None, - ) -> None: - """Install given firmware.""" - if not new_firmware_path.is_file(): - raise InvalidFirmwareFile("Given path is not a valid file.") - - firmware_format = FirmwareDownloader._supported_firmware_formats[board.platform.type] - if firmware_format == FirmwareFormat.ELF: - self.add_run_permission(new_firmware_path) - - self.validate_firmware(new_firmware_path, board.platform) - - if board.type == PlatformType.Serial: - firmware_uploader = FirmwareUploader() - if not board.path: - raise ValueError("Board path not available.") - firmware_uploader.set_autopilot_port(pathlib.Path(board.path)) - firmware_uploader.upload(new_firmware_path) - return - if firmware_format == FirmwareFormat.ELF: - # Using copy() instead of move() since the last can't handle cross-device properly (e.g. docker binds) - if not firmware_dest_path: - raise FirmwareInstallFail("Firmware file destination not provided.") - shutil.copy(new_firmware_path, firmware_dest_path) - return - - raise UnsupportedPlatform("Firmware install is not implemented for this platform.") diff --git a/core/services/ardupilot_manager/firmware/FirmwareManagement.py b/core/services/ardupilot_manager/firmware/FirmwareManagement.py deleted file mode 100644 index 1d063a1a07..0000000000 --- a/core/services/ardupilot_manager/firmware/FirmwareManagement.py +++ /dev/null @@ -1,173 +0,0 @@ -import pathlib -import shutil -import subprocess -import tempfile -from pathlib import Path -from typing import List, Optional - -from loguru import logger - -from exceptions import ( - FirmwareInstallFail, - NoDefaultFirmwareAvailable, - NoVersionAvailable, - UnsupportedPlatform, -) -from firmware.FirmwareDownload import FirmwareDownloader -from firmware.FirmwareInstall import FirmwareInstaller -from typedefs import ( - Firmware, - FirmwareFormat, - FlightController, - Parameters, - Platform, - PlatformType, - Vehicle, -) - - -class FirmwareManager: - def __init__( - self, firmware_folder: pathlib.Path, defaults_folder: pathlib.Path, user_defaults_folder: pathlib.Path - ) -> None: - self.firmware_folder = firmware_folder - self.defaults_folder = defaults_folder - self.user_defaults_folder = user_defaults_folder - self.firmware_download = FirmwareDownloader() - self.firmware_installer = FirmwareInstaller() - - @staticmethod - def firmware_name(platform: Platform) -> str: - """Get consistent firmware name for given platform.""" - return f"ardupilot_{platform.value.lower()}" - - def firmware_path(self, platform: Platform) -> pathlib.Path: - """Get firmware's path for given platform. This is the path where we expect to find - a valid Ardupilot binary for Linux boards.""" - return pathlib.Path.joinpath(self.firmware_folder, self.firmware_name(platform)) - - def default_user_firmware_path(self, platform: Platform) -> pathlib.Path: - """Get path of user-defined default firmware for given platform.""" - return pathlib.Path.joinpath(self.user_defaults_folder, self.firmware_name(platform) + "_default") - - def default_user_params_path(self, platform: Platform) -> pathlib.Path: - """Get path of user-defined default firmware for given platform.""" - return pathlib.Path.joinpath(self.user_defaults_folder, self.firmware_name(platform) + "params.params") - - def default_firmware_path(self, platform: Platform) -> pathlib.Path: - """Get path of default firmware for given platform.""" - if self.default_user_firmware_path(platform).is_file(): - return self.default_user_firmware_path(platform) - return pathlib.Path.joinpath(self.defaults_folder, self.firmware_name(platform)) - - def is_default_firmware_available(self, platform: Platform) -> bool: - return pathlib.Path.is_file(self.default_firmware_path(platform)) or pathlib.Path.is_file( - self.default_user_firmware_path(platform) - ) - - def is_firmware_installed(self, board: FlightController) -> bool: - """Check if firmware for given platform is installed.""" - if board.type == PlatformType.Serial: - # Assumes for now that a serial board always has a firmware installed, which is true most of the time - # TODO: Validate if properly. The uploader tool seems capable of doing this. - return True - - firmware_format = FirmwareDownloader._supported_firmware_formats[board.platform.type] - if firmware_format == FirmwareFormat.ELF: - return pathlib.Path.is_file(self.firmware_path(board.platform)) - - raise UnsupportedPlatform("Install check is not implemented for this platform.") - - def get_available_firmwares(self, vehicle: Vehicle, platform: Platform) -> List[Firmware]: - firmwares = [] - versions = self.firmware_download.get_available_versions(vehicle, platform) - if not versions: - raise NoVersionAvailable(f"Failed to find any version for vehicle {vehicle}.") - for version in versions: - try: - url = self.firmware_download.get_download_url(vehicle, platform, version) - firmware = Firmware(name=version, url=url) - firmwares.append(firmware) - except Exception as error: - logger.debug(f"Error fetching URL for version {version} on vehicle {vehicle}: {error}") - if not firmwares: - raise NoVersionAvailable(f"Failed do get any valid URL for vehicle {vehicle}.") - return firmwares - - def install_firmware_from_file( - self, new_firmware_path: pathlib.Path, board: FlightController, default_parameters: Optional[Parameters] = None - ) -> None: - if default_parameters is not None: - if board.platform.type == PlatformType.Serial: - self.embed_params_into_apj(new_firmware_path, default_parameters) - else: - self.save_params_to_default_linux_path(board.platform, default_parameters) - try: - if board.type == PlatformType.Serial: - self.firmware_installer.install_firmware(new_firmware_path, board) - else: - self.firmware_installer.install_firmware(new_firmware_path, board, self.firmware_path(board.platform)) - logger.info(f"Succefully installed firmware for {board.name}.") - except Exception as error: - raise FirmwareInstallFail("Could not install firmware.") from error - - def embed_params_into_apj(self, firmware_path: Path, default_parameters: Parameters) -> None: - # create a temporary file - with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp: - # write each parameter to the file - for param, value in default_parameters.params.items(): - temp.write(f"{param}={value}\n") - # make sure all data is written to the file - temp.flush() - # construct the command as a list of strings - command = ["apj_tool.py", str(firmware_path), "--set-file", temp.name] - # use subprocess.run to execute the command - result = subprocess.run(command, capture_output=True, text=True, check=False) - # check for errors - if result.returncode != 0: - logger.error(f"Error setting parameters: {result.stderr}") - - # clean up the temporary file - Path(temp.name).unlink() - - def save_params_to_default_linux_path(self, platform: Platform, default_parameters: Optional[Parameters]) -> None: - # write the params to the default params file as csv format - if default_parameters: - with open(self.default_user_params_path(platform), "w", encoding="utf-8") as f: - for key in default_parameters.params.keys(): - f.write(f"{key},{default_parameters.params[key]}\n") - else: - # if no params are provided, delete the file if it exists - if self.default_user_params_path(platform).is_file(): - self.default_user_params_path(platform).unlink() - - def install_firmware_from_url( - self, - url: str, - board: FlightController, - makeDefault: bool = False, - default_parameters: Optional[Parameters] = None, - ) -> None: - temporary_file = self.firmware_download._download(url.strip()) - if default_parameters is not None: - if board.platform.type == PlatformType.Serial: - self.embed_params_into_apj(temporary_file, default_parameters) - else: - self.save_params_to_default_linux_path(board.platform, default_parameters) - if makeDefault: - shutil.copy(temporary_file, self.default_user_firmware_path(board.platform)) - self.install_firmware_from_file(temporary_file, board, default_parameters) - - def install_firmware_from_params(self, vehicle: Vehicle, board: FlightController, version: str = "") -> None: - url = self.firmware_download.get_download_url(vehicle, board.platform, version) - self.install_firmware_from_url(url, board) - - def restore_default_firmware(self, board: FlightController) -> None: - if not self.is_default_firmware_available(board.platform): - raise NoDefaultFirmwareAvailable(f"Default firmware not available for '{board.name}'.") - - self.install_firmware_from_file(self.default_firmware_path(board.platform), board) - - @staticmethod - def validate_firmware(firmware_path: pathlib.Path, platform: Platform) -> None: - FirmwareInstaller.validate_firmware(firmware_path, platform) diff --git a/core/services/ardupilot_manager/firmware/FirmwareUpload.py b/core/services/ardupilot_manager/firmware/FirmwareUpload.py deleted file mode 100644 index 46a41a8b93..0000000000 --- a/core/services/ardupilot_manager/firmware/FirmwareUpload.py +++ /dev/null @@ -1,82 +0,0 @@ -import pathlib -import shutil -import subprocess -import threading -import time - -from loguru import logger - -from exceptions import FirmwareUploadFail, InvalidUploadTool, UploadToolNotFound - - -class FirmwareUploader: - def __init__(self) -> None: - self._autopilot_port: pathlib.Path = pathlib.Path("/dev/autopilot") - self._baudrate_bootloader: int = 115200 - self._baudrate_flightstack: int = 57600 - - binary_path = shutil.which(self.binary_name()) - if binary_path is None: - raise UploadToolNotFound("Uploader binary not found on system's PATH.") - self._binary = pathlib.Path(binary_path) - - self.validate_binary() - - @staticmethod - def binary_name() -> str: - return "ardupilot_fw_uploader.py" - - def binary(self) -> pathlib.Path: - return self._binary - - def validate_binary(self) -> None: - try: - subprocess.check_output([self.binary(), "--help"]) - except subprocess.CalledProcessError as error: - raise InvalidUploadTool(f"Binary returned {error.returncode} on '--help' call: {error.output}") from error - - def set_autopilot_port(self, port: pathlib.Path) -> None: - self._autopilot_port = port - - def set_baudrate_bootloader(self, baudrate: int) -> None: - self._baudrate_bootloader = baudrate - - def set_baudrate_flightstack(self, baudrate: int) -> None: - self._baudrate_flightstack = baudrate - - def upload(self, firmware_path: pathlib.Path) -> None: - logger.info("Starting upload of firmware to board.") - # pylint: disable=consider-using-with - process = subprocess.Popen( - f"{self.binary()} {firmware_path}" - f" --port {self._autopilot_port}" - f" --baud-bootloader {self._baudrate_bootloader}" - f" --baud-flightstack {self._baudrate_flightstack}", - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - encoding="utf-8", - ) - - # Using a timer of 180 seconds to prevent being stuck on upload. Upload usually takes 20-40 seconds. - timer = threading.Timer(180, process.kill) - try: - timer.start() - if process.stdout is not None: - for line in iter(process.stdout.readline, b""): - logger.debug(line) - if process.poll() is not None: - break - while process.poll() is None: - logger.debug("Waiting for upload tool to finish its job.") - time.sleep(0.5) - if process.returncode != 0: - raise FirmwareUploadFail(f"Upload process returned non-zero code {process.returncode}.") - logger.info("Successfully uploaded firmware to board.") - except Exception as error: - process.kill() - raise FirmwareUploadFail("Unable to upload firmware to board.") from error - finally: - timer.cancel() - # Give some time for the board to reboot (preventing fail reconnecting to it) - time.sleep(10) diff --git a/core/services/ardupilot_manager/firmware/test_FirmwareDownload.py b/core/services/ardupilot_manager/firmware/test_FirmwareDownload.py deleted file mode 100644 index 1246b7a417..0000000000 --- a/core/services/ardupilot_manager/firmware/test_FirmwareDownload.py +++ /dev/null @@ -1,60 +0,0 @@ -import os -import platform - -import pytest - -from firmware.FirmwareDownload import FirmwareDownloader -from typedefs import Platform, Vehicle - - -def test_static() -> None: - downloaded_file = FirmwareDownloader._download(FirmwareDownloader._manifest_remote) - assert downloaded_file, "Failed to download file." - assert downloaded_file.exists(), "Download file does not exist." - - smaller_valid_size_bytes = 180 * 1024 - assert downloaded_file.stat().st_size > smaller_valid_size_bytes, "Download file size is not big enough." - - -def test_firmware_download() -> None: - firmware_download = FirmwareDownloader() - assert firmware_download.download_manifest(), "Failed to download/validate manifest file." - - versions = firmware_download._find_version_item( - vehicletype="Sub", format="apj", mav_firmware_version_type="STABLE-4.0.1", platform=Platform.Pixhawk1 - ) - assert len(versions) == 1, "Failed to find a single firmware." - - versions = firmware_download._find_version_item( - vehicletype="Sub", mav_firmware_version_type="STABLE-4.0.1", platform=Platform.Pixhawk1 - ) - # There are two versions, one for the firmware and one with the bootloader - assert len(versions) == 2, "Failed to find multiple versions." - - available_versions = firmware_download.get_available_versions(Vehicle.Sub, Platform.Pixhawk1) - assert len(available_versions) == len(set(available_versions)), "Available versions are not unique." - - test_available_versions = ["STABLE-4.0.1", "STABLE-4.0.0", "OFFICIAL", "DEV", "BETA"] - assert len(set(available_versions)) >= len( - set(test_available_versions) - ), "Available versions are missing know versions." - - assert firmware_download.download( - Vehicle.Sub, Platform.Pixhawk1, "STABLE-4.0.1" - ), "Failed to download a valid firmware file." - - assert firmware_download.download(Vehicle.Sub, Platform.Pixhawk1), "Failed to download latest valid firmware file." - - assert firmware_download.download(Vehicle.Sub, Platform.Pixhawk4), "Failed to download latest valid firmware file." - - assert firmware_download.download(Vehicle.Sub, Platform.SITL), "Failed to download SITL." - - # skipt these tests for MacOS - if platform.system() == "Darwin": - pytest.skip("Skipping test for MacOS") - # It'll fail if running in an arch different of ARM - if "x86" in os.uname().machine: - assert firmware_download.download(Vehicle.Sub, Platform.Navigator), "Failed to download navigator binary." - else: - with pytest.raises(Exception): - firmware_download.download(Vehicle.Sub, Platform.Navigator) diff --git a/core/services/ardupilot_manager/firmware/test_FirmwareInstall.py b/core/services/ardupilot_manager/firmware/test_FirmwareInstall.py deleted file mode 100644 index 559e5aa30c..0000000000 --- a/core/services/ardupilot_manager/firmware/test_FirmwareInstall.py +++ /dev/null @@ -1,39 +0,0 @@ -import pathlib -import platform - -import pytest - -from exceptions import InvalidFirmwareFile -from firmware.FirmwareDownload import FirmwareDownloader -from firmware.FirmwareInstall import FirmwareInstaller -from typedefs import FlightController, Platform, Vehicle - - -def test_firmware_validation() -> None: - downloader = FirmwareDownloader() - installer = FirmwareInstaller() - - # Pixhawk1 and Pixhawk4 APJ firmwares should always work - temporary_file = downloader.download(Vehicle.Sub, Platform.Pixhawk1) - installer.validate_firmware(temporary_file, Platform.Pixhawk1) - - temporary_file = downloader.download(Vehicle.Sub, Platform.Pixhawk4) - installer.validate_firmware(temporary_file, Platform.Pixhawk4) - - # New SITL firmwares should always work, except for MacOS - # there are no SITL builds for MacOS - if platform.system() != "Darwin": - temporary_file = downloader.download(Vehicle.Sub, Platform.SITL, version="DEV") - installer.validate_firmware(temporary_file, Platform.SITL) - - # Raise when validating Navigator firmwares (as test platform is x86) - temporary_file = downloader.download(Vehicle.Sub, Platform.Navigator) - with pytest.raises(InvalidFirmwareFile): - installer.validate_firmware(temporary_file, Platform.Navigator) - - # Install SITL firmware - if platform.system() != "Darwin": - # there are no SITL builds for MacOS - temporary_file = downloader.download(Vehicle.Sub, Platform.SITL, version="DEV") - board = FlightController(name="SITL", manufacturer="ArduPilot Team", platform=Platform.SITL) - installer.install_firmware(temporary_file, board, pathlib.Path(f"{temporary_file}_dest")) diff --git a/core/services/ardupilot_manager/main.py b/core/services/ardupilot_manager/main.py old mode 100755 new mode 100644 index 2789fbb4a3..cbcdbd44a8 --- a/core/services/ardupilot_manager/main.py +++ b/core/services/ardupilot_manager/main.py @@ -1,324 +1,6 @@ -#! /usr/bin/env python3 -import argparse -import asyncio -import logging -import os -import shutil -from pathlib import Path -from typing import Any, Dict, List, Optional, Set -from commonwealth.mavlink_comm.exceptions import ( - FetchUpdatedMessageFail, - MavlinkMessageReceiveFail, - MavlinkMessageSendFail, -) -from commonwealth.mavlink_comm.typedefs import FirmwareInfo, MavlinkVehicleType -from commonwealth.utils.apis import ( - GenericErrorHandlingRoute, - PrettyJSONResponse, - StackedHTTPException, -) -from commonwealth.utils.decorators import single_threaded -from commonwealth.utils.general import is_running_as_root -from commonwealth.utils.logs import InterceptHandler, init_logger -from fastapi import Body, FastAPI, File, HTTPException, UploadFile, status -from fastapi.responses import HTMLResponse, PlainTextResponse -from fastapi_versioning import VersionedFastAPI, version -from loguru import logger -from uvicorn import Config, Server +def main() -> None: + pass -from ArduPilotManager import ArduPilotManager -from exceptions import InvalidFirmwareFile -from flight_controller_detector.Detector import Detector as BoardDetector -from mavlink_proxy.Endpoint import Endpoint -from settings import SERVICE_NAME -from typedefs import Firmware, FlightController, Parameters, Serial, SITLFrame, Vehicle - -FRONTEND_FOLDER = Path.joinpath(Path(__file__).parent.absolute(), "frontend") - -parser = argparse.ArgumentParser(description="ArduPilot Manager service for Blue Robotics BlueOS") -parser.add_argument("-s", "--sitl", help="run SITL instead of connecting any board", action="store_true") - -args = parser.parse_args() - -logging.basicConfig(handlers=[InterceptHandler()], level=0) -init_logger(SERVICE_NAME) - -app = FastAPI( - title="ArduPilot Manager API", - description="ArduPilot Manager is responsible for managing ArduPilot devices connected to BlueOS.", - default_response_class=PrettyJSONResponse, - debug=True, -) -app.router.route_class = GenericErrorHandlingRoute -logger.info("Starting ArduPilot Manager.") -autopilot = ArduPilotManager() -if not is_running_as_root(): - raise RuntimeError("ArduPilot manager needs to run with root privilege.") - - -async def target_board(board_name: Optional[str]) -> FlightController: - """Returns the board that should be used to perform operations on. - - Most of the API routes that have operations related to board management will give the option to perform those - operations on the running board or on some of the connected boards. This function abstract this logic that is - common on all the routes. - - If the `board_name` argument is None, it will check if there's a running board, and return it if so. If one - provides the `board_name`, this function will check if there's a connected board with that name and return it if so. - """ - if board_name is not None: - try: - return next(board for board in await autopilot.available_boards(True) if board.name == board_name) - except StopIteration as error: - raise ValueError("Chosen board not available.") from error - if autopilot.current_board is None: - raise RuntimeError("No board running and no target board set.") - return autopilot.current_board - - -def raise_lock(*raise_args: str, **kwargs: int) -> None: - """Raise a 423 HTTP Error status - - Raises: - HTTPException: Exception that the operation is already in progress. - """ - raise HTTPException(status_code=status.HTTP_423_LOCKED, detail="Operation already in progress.") - - -@app.get("/endpoints", response_model=List[Dict[str, Any]]) -@version(1, 0) -def get_available_endpoints() -> Any: - return list(map(Endpoint.as_dict, autopilot.get_endpoints())) - - -@app.post("/endpoints", status_code=status.HTTP_201_CREATED) -@version(1, 0) -async def create_endpoints(endpoints: Set[Endpoint] = Body(...)) -> Any: - await autopilot.add_new_endpoints(endpoints) - - -@app.delete("/endpoints", status_code=status.HTTP_200_OK) -@version(1, 0) -async def remove_endpoints(endpoints: Set[Endpoint] = Body(...)) -> Any: - await autopilot.remove_endpoints(endpoints) - - -@app.put("/endpoints", status_code=status.HTTP_200_OK) -@version(1, 0) -async def update_endpoints(endpoints: Set[Endpoint] = Body(...)) -> Any: - await autopilot.update_endpoints(endpoints) - - -@app.put("/serials", status_code=status.HTTP_200_OK) -@version(1, 0) -def update_serials(serials: List[Serial] = Body(...)) -> Any: - autopilot.update_serials(serials) - - -@app.get("/serials", response_model=List[Serial]) -@version(1, 0) -def get_serials() -> Any: - return autopilot.get_serials() - - -@app.get("/firmware_info", response_model=FirmwareInfo, summary="Get version and type of current firmware.") -@version(1, 0) -async def get_firmware_info() -> Any: - if not autopilot.current_board: - message = "No board running, firmware information is unavailable" - logger.warning(message) - return PlainTextResponse(message, status_code=503) - try: - return await autopilot.vehicle_manager.get_firmware_info() - except ValueError: - return PlainTextResponse("Failed to get autopilot version", status_code=500) - except MavlinkMessageSendFail: - return PlainTextResponse("Timed out requesting Firmware Info message", status_code=500) - - -@app.get("/vehicle_type", response_model=MavlinkVehicleType, summary="Get mavlink vehicle type.") -@version(1, 0) -async def get_vehicle_type() -> Any: - if not autopilot.current_board: - message = "No board running, vehicle type is unavailable" - logger.warning(message) - return PlainTextResponse(message, status_code=503) - try: - return await autopilot.vehicle_manager.get_vehicle_type() - except FetchUpdatedMessageFail as error: - return PlainTextResponse(f"Timed out fetching message: {error}", status_code=500) - except MavlinkMessageReceiveFail as error: - return PlainTextResponse(f"Failed to get vehicle type: {error}", status_code=500) - - -@app.post("/sitl_frame", summary="Set SITL Frame type.") -@version(1, 0) -async def set_sitl_frame(frame: SITLFrame) -> Any: - return autopilot.set_sitl_frame(frame) - - -@app.get("/firmware_vehicle_type", response_model=str, summary="Get firmware vehicle type.") -@version(1, 0) -async def get_firmware_vehicle_type() -> Any: - if not autopilot.current_board: - raise RuntimeError("Cannot fetch vehicle type info as there's no board running.") - return await autopilot.vehicle_manager.get_firmware_vehicle_type() - - -@app.get( - "/available_firmwares", - response_model=List[Firmware], - summary="Retrieve dictionary of available firmwares versions with their respective URL.", -) -@version(1, 0) -async def get_available_firmwares(vehicle: Vehicle, board_name: Optional[str] = None) -> Any: - return autopilot.get_available_firmwares(vehicle, (await target_board(board_name)).platform) - - -@app.post("/install_firmware_from_url", summary="Install firmware for given URL.") -@version(1, 0) -@single_threaded(callback=raise_lock) -async def install_firmware_from_url( - url: str, - board_name: Optional[str] = None, - make_default: bool = False, - parameters: Optional[Parameters] = None, -) -> Any: - try: - await autopilot.kill_ardupilot() - autopilot.install_firmware_from_url(url, await target_board(board_name), make_default, parameters) - finally: - await autopilot.start_ardupilot() - - -@app.post("/install_firmware_from_file", summary="Install firmware from user file.") -@version(1, 0) -@single_threaded(callback=raise_lock) -async def install_firmware_from_file( - binary: UploadFile = File(...), - board_name: Optional[str] = None, - parameters: Optional[Parameters] = None, -) -> Any: - try: - custom_firmware = Path.joinpath(autopilot.settings.firmware_folder, "custom_firmware") - with open(custom_firmware, "wb") as buffer: - shutil.copyfileobj(binary.file, buffer) - logger.debug("Going to kill ardupilot") - await autopilot.kill_ardupilot() - logger.debug("Installing firmware from file") - autopilot.install_firmware_from_file(custom_firmware, await target_board(board_name), parameters) - os.remove(custom_firmware) - except InvalidFirmwareFile as error: - raise StackedHTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, error=error) from error - finally: - binary.file.close() - logger.debug("Starting ardupilot again") - await autopilot.start_ardupilot() - - -@app.get("/board", response_model=FlightController, summary="Check what is the current running board.") -@version(1, 0) -def get_board() -> Any: - return autopilot.current_board - - -@app.post("/board", summary="Set board to be used.") -@version(1, 0) -async def set_board(board: FlightController, sitl_frame: SITLFrame = SITLFrame.VECTORED) -> Any: - autopilot.current_sitl_frame = sitl_frame - await autopilot.change_board(board) - - -@app.post("/restart", summary="Restart the autopilot with current set options.") -@version(1, 0) -async def restart() -> Any: - logger.debug("Restarting ardupilot...") - await autopilot.restart_ardupilot() - logger.debug("Ardupilot successfully restarted.") - - -@app.post("/start", summary="Start the autopilot.") -@version(1, 0) -async def start() -> Any: - logger.debug("Starting ardupilot...") - await autopilot.start_ardupilot() - logger.debug("Ardupilot successfully started.") - - -@app.post("/preferred_router", summary="Set the preferred MAVLink router.") -@version(1, 0) -async def set_preferred_router(router: str) -> Any: - logger.debug("Setting preferred Router") - await autopilot.set_preferred_router(router) - logger.debug(f"Preferred Router successfully set to {router}") - - -@app.get("/preferred_router", summary="Retrieve preferred router") -@version(1, 0) -def preferred_router() -> Any: - return autopilot.load_preferred_router() - - -@app.get("/available_routers", summary="Retrieve preferred router") -@version(1, 0) -def available_routers() -> Any: - return autopilot.get_available_routers() - - -@app.post("/stop", summary="Stop the autopilot.") -@version(1, 0) -async def stop() -> Any: - logger.debug("Stopping ardupilot...") - await autopilot.kill_ardupilot() - logger.debug("Ardupilot successfully stopped.") - - -@app.post("/restore_default_firmware", summary="Restore default firmware.") -@version(1, 0) -async def restore_default_firmware(board_name: Optional[str] = None) -> Any: - try: - await autopilot.kill_ardupilot() - autopilot.restore_default_firmware(await target_board(board_name)) - finally: - await autopilot.start_ardupilot() - - -@app.get("/available_boards", response_model=List[FlightController], summary="Retrieve list of connected boards.") -@version(1, 0) -async def available_boards() -> Any: - return await autopilot.available_boards(True) - - -app = VersionedFastAPI(app, version="1.0.0", prefix_format="/v{major}.{minor}", enable_latest=True) - - -@app.get("/") -async def root() -> HTMLResponse: - html_content = """ - - - ArduPilot Manager - - - """ - return HTMLResponse(content=html_content, status_code=200) - - -if __name__ == "__main__": - loop = asyncio.new_event_loop() - - # # Running uvicorn with log disabled so loguru can handle it - config = Config(app=app, loop=loop, host="0.0.0.0", port=8000, log_config=None) - server = Server(config) - - if args.sitl: - autopilot.set_preferred_board(BoardDetector.detect_sitl()) - try: - loop.run_until_complete(autopilot.start_ardupilot()) - except Exception as start_error: - logger.exception(start_error) - loop.create_task(autopilot.auto_restart_ardupilot()) - loop.create_task(autopilot.start_mavlink_manager_watchdog()) - loop.run_until_complete(server.serve()) - loop.run_until_complete(autopilot.kill_ardupilot()) +if __name__ == '__main__': + main() diff --git a/core/services/ardupilot_manager/settings.py b/core/services/ardupilot_manager/settings.py index 098dcb189f..0111729068 100644 --- a/core/services/ardupilot_manager/settings.py +++ b/core/services/ardupilot_manager/settings.py @@ -6,7 +6,7 @@ import appdirs from loguru import logger -from typedefs import SITLFrame +from controller.platform import SITLFrame SERVICE_NAME = "ardupilot-manager" diff --git a/core/services/ardupilot_manager/typedefs.py b/core/services/ardupilot_manager/typedefs.py index 27239ef2f0..8261b8eee2 100644 --- a/core/services/ardupilot_manager/typedefs.py +++ b/core/services/ardupilot_manager/typedefs.py @@ -1,171 +1,13 @@ -import ipaddress -import re -from enum import Enum, auto -from pathlib import Path -from platform import machine -from typing import Any, Dict, List, Optional +from enum import Enum +from typing import Dict -from pydantic import BaseModel, validator - - -class SITLFrame(str, Enum): - """Valid SITL frame types""" - - QUADPLANE = "quadplane" - XPLANE = "xplane" - FIREFLY = "firefly" - PLUS_CONFIG = "+" - QUAD = "quad" - COPTER = "copter" - X_CONFIG = "x" - BFXREV = "bfxrev" - BFX = "bfx" - DJIX = "djix" - CWX = "cwx" - HEXA = "hexa" - HEXA_CWX = "hexa-cwx" - HEXA_DJI = "hexa-dji" - OCTA = "octa" - OCTA_CWX = "octa-cwx" - OCTA_DJI = "octa-dji" - OCTA_QUAD_CWX = "octa-quad-cwx" - DODECA_HEXA = "dodeca-hexa" - TRI = "tri" - Y_SIX = "y6" - HELI = "heli" - HELI_DUAL = "heli-dual" - HELI_COMPOUND = "heli-compound" - SINGLECOPTER = "singlecopter" - COAXCOPTER = "coaxcopter" - ROVER = "rover" - BALANCEBOT = "balancebot" - SAILBOAT = "sailboat" - MOTORBOAT = "motorboat" - CRRCSIM = "crrcsim" - JSBSIM = "jsbsim" - FLIGHTAXIS = "flightaxis" - GAZEBO = "gazebo" - LAST_LETTER = "last_letter" - TRACKER = "tracker" - BALLOON = "balloon" - PLANE = "plane" - CALIBRATION = "calibration" - VECTORED = "vectored" - VECTORED_6DOF = "vectored_6dof" - SILENTWINGS = "silentwings" - MORSE = "morse" - AIRSIM = "airsim" - SCRIMMAGE = "scrimmage" - WEBOTS = "webots" - JSON = " JSON" - UNDEFINED = " undefined" - - -def get_sitl_platform_name(machine_arch: str) -> str: - """Get SITL platform name based on machine architecture.""" - - if "arm" not in machine_arch.lower() and "aarch" not in machine_arch.lower(): - return "SITL_x86_64_linux_gnu" - return "SITL_arm_linux_gnueabihf" - - -class Firmware(BaseModel): - """Simplified representation of a firmware, as available on Ardupilot's manifest.""" - - name: str - url: str - - -class Vehicle(str, Enum): - """Valid Ardupilot vehicle types. - The Enum values are 1:1 representations of the vehicles available on the ArduPilot manifest.""" - - Sub = "Sub" - Rover = "Rover" - Plane = "Plane" - Copter = "Copter" - - -# TODO: This class can be deprecated once we move to Python 3.11, which introduces the equivalent StrEnum -class LowerStringEnum(str, Enum): - def __str__(self) -> str: - return self.name.lower() - - -class PlatformType(LowerStringEnum): - Serial = auto() - Linux = auto() - SITL = auto() - Unknown = auto() - - -class Platform(str, Enum): - """Valid Ardupilot platform types. - The Enum values are 1:1 representations of the platforms available on the ArduPilot manifest.""" - - Pixhawk1 = "Pixhawk1" - Pixhawk4 = "Pixhawk4" - Pixhawk6X = "Pixhawk6X" - Pixhawk6C = "Pixhawk6C" - CubeOrange = "CubeOrange" - GenericSerial = "GenericSerial" - Navigator = "navigator" - Argonot = "argonot" - SITL = get_sitl_platform_name(machine()) - - @property - def type(self) -> PlatformType: - platform_types = { - Platform.Pixhawk1: PlatformType.Serial, - Platform.Pixhawk4: PlatformType.Serial, - Platform.Pixhawk6X: PlatformType.Serial, - Platform.Pixhawk6C: PlatformType.Serial, - Platform.CubeOrange: PlatformType.Serial, - Platform.GenericSerial: PlatformType.Serial, - Platform.Navigator: PlatformType.Linux, - Platform.Argonot: PlatformType.Linux, - Platform.SITL: PlatformType.SITL, - } - return platform_types.get(self, PlatformType.Unknown) - - -class FlightControllerFlags(str, Enum): - """Flags for the Flight-controller class.""" - - is_bootloader = "is_bootloader" +from pydantic import BaseModel class Parameters(BaseModel): params: Dict[str, float] -class FlightController(BaseModel): - """Flight-controller board.""" - - name: str - manufacturer: Optional[str] - platform: Platform - path: Optional[str] - flags: List[FlightControllerFlags] = [] - - @property - def type(self) -> PlatformType: - return self.platform.type - - -class AvailableBoards(BaseModel): - regular: List[FlightController] - bootloaders: List[FlightController] - - -class FirmwareFormat(str, Enum): - """Valid firmware formats. - The Enum values are 1:1 representations of the formats available on the ArduPilot manifest.""" - - APJ = "apj" - ELF = "ELF" - - class EndpointType(str, Enum): """Supported Mavlink endpoint types.""" @@ -174,40 +16,3 @@ class EndpointType(str, Enum): TCPServer = "tcpin" TCPClient = "tcpout" Serial = "serial" - - -class Serial(BaseModel): - """Simplified representation of linux serial port configurations, - gets transformed into command line arguments such as - --serial1 /dev/ttyS0 - --serial3 /dev/ttyAMA1 - --serial4 /dev/ttyAMA2 - --serial5 /dev/ttyAMA3 - """ - - port: str - endpoint: str - - @validator("port") - @classmethod - def valid_letter(cls: Any, value: str) -> str: - if value in "BCDEFGH" and len(value) == 1: - return value - raise ValueError(f"Invalid serial port: {value}. These must be between B and H. A is reserved.") - - @validator("endpoint") - @classmethod - def valid_endpoint(cls: Any, value: str) -> str: - if Path(value).exists(): - return value - if re.compile(r"tcp:\d*:wait$").match(value): - return value - matches = re.compile(r"(tcpclient|udp|tcpin|udpin):(?P(\d*\.){3}\d+):(?P\d*)$").match(value) - if matches: - ipaddress.ip_address(matches.group("ip")) - if 0 <= int(matches.group("port")) <= 65535: - return value - raise ValueError(f"Invalid endpoint configuration: {value}") - - def __hash__(self) -> int: # make hashable BaseModel subclass - return hash(self.port + self.endpoint)