diff --git a/examples/mavftp_example.py b/examples/mavftp_example.py new file mode 100755 index 000000000..5b8fb3b7d --- /dev/null +++ b/examples/mavftp_example.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python3 + +''' +MAVLink File Transfer Protocol support example + +SPDX-FileCopyrightText: 2024 Amilcar Lucas + +SPDX-License-Identifier: GPL-3.0-or-later +''' + +from argparse import ArgumentParser + +from logging import basicConfig as logging_basicConfig +from logging import getLevelName as logging_getLevelName + +from logging import info as logging_info +from logging import warning as logging_warning +from logging import error as logging_error + +import sys +import os +import requests +import time + +from typing import Tuple + +from pymavlink import mavutil +from pymavlink import mavftp + + +if __name__ == "__main__": + + def argument_parser(): + """ + Parses command-line arguments for the script. + + This function sets up an argument parser to handle the command-line arguments for the script. + + Returns: + argparse.Namespace: An object containing the parsed arguments. + """ + parser = ArgumentParser(description='This main is for testing and development only. ' + 'Usually, the mavftp is called from another script') + parser.add_argument("--baudrate", type=int, + help="master port baud rate", default=115200) + parser.add_argument("--device", required=True, help="serial device") + parser.add_argument("--source-system", dest='SOURCE_SYSTEM', type=int, + default=250, help='MAVLink source system for this GCS') + parser.add_argument("--loglevel", default="INFO", help="log level") + parser.add_argument("--filename", default="@PARAM/param.pck?withdefaults=1", help="file to fetch") + parser.add_argument("--decode-parameters", action='store_true', help="decode as a parameter file") + parser.add_argument("--output-parameters-to-files", action='store_true', help="output parameters to files") + parser.add_argument('-s', '--sort', + choices=['none', 'missionplanner', 'mavproxy'], + default='missionplanner', + help='Sort the parameters in the file. Defaults to %(default)s.', + ) + + return parser.parse_args() + + + def wait_heartbeat(m): + '''wait for a heartbeat so we know the target system IDs''' + logging_info("Waiting for flight controller heartbeat") + m.wait_heartbeat() + logging_info("Got heartbeat from system %u, component %u", m.target_system, m.target_system) + + + def delete_local_file_if_exists(filename): + if os.path.exists(filename): + os.remove(filename) + + + def missionplanner_sort(item: str) -> Tuple[str, ...]: + """ + Sorts a parameter name according to the rules defined in the Mission Planner software. + """ + return tuple(item.split("_")) + + + def extract_params(pdata, sort_type): + pdict = {} + if pdata: + for (name, value, _ptype) in pdata: + pdict[name.decode('utf-8')] = value + + if sort_type == "missionplanner": + pdict = dict(sorted(pdict.items(), key=lambda x: missionplanner_sort(x[0]))) # sort alphabetically + elif sort_type == "mavproxy": + pdict = dict(sorted(pdict.items())) # sort in ASCIIbetical order + elif sort_type == "none": + pass + return pdict + + + def save_params(pdict, filename, sort_type): + if not pdict: + return + with open(filename, 'w', encoding='utf-8') as f: + for name, value in pdict.items(): + if sort_type == "missionplanner": + f.write(f"{name},{format(value, '.6f').rstrip('0').rstrip('.')}\n") + elif sort_type == "mavproxy": + f.write(f"{name:<16} {value:<8.6f}") + elif sort_type == "none": + f.write(f"{name:<16} {value:<8.6f}") + logging_info("Outputted %u parameters to %s", len(pdict), filename) + + + def get_params(args, mav_ftp): + def get_param_callback(fh): + '''called on ftp completion''' + if fh is None: + return + data = fh.read() + logging_info("param get done! got %u bytes", len(data)) + if args.decode_parameters or args.output_parameters_to_files: + pdata = mavftp.MAVFTP.ftp_param_decode(data) + if pdata is None: + logging_error("param decode failed") + sys.exit(1) + + pdict = extract_params(pdata.params, args.sort) + defdict = extract_params(pdata.defaults, args.sort) + + if args.decode_parameters: + for name, value in pdict.items(): + if name in defdict: + print(f"{name:<16} {value} (default {defdict[name]})") + else: + print(f"{name:<16} {value}") + + if args.output_parameters_to_files: + save_params(pdict, 'params.param', args.sort) + save_params(defdict, 'defaults.param', args.sort) + + mav_ftp.cmd_get([args.filename], callback=get_param_callback) + mav_ftp.execute_ftp_operation() + + + def get_list_dir(mav_ftp, directory): + mav_ftp.cmd_list([directory]) + mav_ftp.execute_ftp_operation() + + + def get_file(mav_ftp, remote_filename, local_filename, timeout=5): + logging_info("Will download %s to %s", remote_filename, local_filename) + mav_ftp.cmd_get([remote_filename, local_filename]) + mav_ftp.execute_ftp_operation(timeout=timeout) + + + def get_last_log(mav_ftp): + try: + with open('LASTLOG.TXT', 'r', encoding='UTF-8') as file: + remote_filenumber = int(file.readline().strip()) + except FileNotFoundError: + logging_error("File LASTLOG.TXT not found.") + return + remote_filenumber = remote_filenumber - 1 # we do not want the very last log + remote_filename = f'/APM/LOGS/{remote_filenumber:08}.BIN' + get_file(mav_ftp, remote_filename, 'LASTLOG.BIN', 500) + time.sleep(1) # wait for the file to be written to disk and for the FC to catch it's breath + + + def download_script(url, local_filename): + # Download the script from the internet to the PC + response = requests.get(url, timeout=5) + + if response.status_code == 200: + with open(local_filename, "wb") as file: + file.write(response.content) + else: + logging_error("Failed to download the file") + + + def create_directory(mav_ftp, remote_directory): + mav_ftp.cmd_mkdir([remote_directory]) + mav_ftp.execute_ftp_operation() + + + def upload_script(mav_ftp, remote_directory, local_filename): + # Upload it from the PC to the flight controller + mav_ftp.cmd_put([local_filename, remote_directory]) + mav_ftp.execute_ftp_operation() + time.sleep(0.3) # wait for the FC to catch it's breath + + + def main(): + '''for testing/example purposes only''' + args = argument_parser() + + logging_basicConfig(level=logging_getLevelName(args.loglevel), format='%(asctime)s - %(levelname)s - %(message)s') + + logging_warning("This main is for testing and development only. " + "Usually the mavftp is called from another script") + + # create a mavlink serial instance + master = mavutil.mavlink_connection(args.device, baud=args.baudrate, source_system=args.SOURCE_SYSTEM) + + # wait for the heartbeat msg to find the system ID + wait_heartbeat(master) + + mav_ftp = mavftp.MAVFTP(master, + target_system=master.target_system, + target_component=master.target_component) + + delete_local_file_if_exists("params.param") + delete_local_file_if_exists("defaults.param") + get_params(args, mav_ftp) + + get_list_dir(mav_ftp, '/APM/LOGS/') + + delete_local_file_if_exists("LASTLOG.TXT") + delete_local_file_if_exists("LASTLOG.BIN") + get_file(mav_ftp, '/APM/LOGS/LASTLOG.TXT', 'LASTLOG.TXT') + + get_last_log(mav_ftp) + + remote_directory = '/APM/Scripts/' + #create_directory(mav_ftp, remote_directory) + + url = "https://discuss.ardupilot.org/uploads/short-url/4pyrl7PcfqiMEaRItUhljuAqLSs.lua" + local_filename = "copter-magfit-helper.lua" + + if not os.path.exists(local_filename): + download_script(url, local_filename) + + upload_script(mav_ftp, remote_directory, local_filename) + + url = "https://raw.githubusercontent.com/ArduPilot/ardupilot/Copter-4.5/libraries/AP_Scripting/applets/VTOL-quicktune.lua" + local_filename = "VTOL-quicktune.lua" + + if not os.path.exists(local_filename): + download_script(url, local_filename) + + upload_script(mav_ftp, remote_directory, local_filename) + + master.close() + + + main() diff --git a/mavftp.py b/mavftp.py index 9c5d2a43d..8a822fdce 100644 --- a/mavftp.py +++ b/mavftp.py @@ -1,22 +1,37 @@ -#!/usr/bin/env python -'''mavlink file transfer support''' +#!/usr/bin/env python3 + +''' +MAVLink File Transfer Protocol support +Original from MAVProxy/MAVProxy/modules/mavproxy_ftp.py + +SPDX-FileCopyrightText: 2011-2024 Andrew Tridgell, 2024 Amilcar Lucas + +SPDX-License-Identifier: GPL-3.0-or-later +''' + +from argparse import ArgumentParser + +from logging import basicConfig as logging_basicConfig +from logging import getLevelName as logging_getLevelName + +from logging import debug as logging_debug +from logging import info as logging_info +from logging import warning as logging_warning +from logging import error as logging_error -import io -import time, os, sys import struct -import random -from pymavlink import mavutil +from time import time as time_time +from random import uniform as random_uniform +from os import path as os_path -try: - # py2 - from StringIO import StringIO as SIO -except ImportError: - # py3 - from io import BytesIO as SIO +from io import BytesIO as SIO -from MAVProxy.modules.lib import mp_module -from MAVProxy.modules.lib import mp_settings +import sys +from pymavlink import mavutil + + +# pylint: disable=invalid-name # opcodes OP_None = 0 OP_TerminateSession = 1 @@ -52,21 +67,30 @@ HDR_Len = 12 MAX_Payload = 239 +# pylint: enable=invalid-name -class FTP_OP: - def __init__(self, seq, session, opcode, size, req_opcode, burst_complete, offset, payload): - self.seq = seq - self.session = session - self.opcode = opcode - self.size = size - self.req_opcode = req_opcode - self.burst_complete = burst_complete - self.offset = offset - self.payload = payload +class FTP_OP: # pylint: disable=invalid-name, too-many-instance-attributes + """ + Represents an operation in the MAVLink File Transfer Protocol (FTP). + + This class encapsulates the details of a single FTP operation such as reading, writing, listing directories, etc., + including the necessary parameters and payload for the operation. + """ + def __init__(self, seq, session, opcode, size, # pylint: disable=too-many-arguments + req_opcode, burst_complete, offset, payload): + self.seq = seq # Sequence number for the operation. + self.session = session # Session identifier. + self.opcode = opcode # Operation code indicating the type of FTP operation. + self.size = size # Size of the operation. + self.req_opcode = req_opcode # Request operation code. + self.burst_complete = burst_complete # (bool) Flag indicating if the burst transfer is complete. + self.offset = offset # Offset for read/write operations. + self.payload = payload # (bytes) Payload for the operation. def pack(self): '''pack message''' - ret = struct.pack(" 0: - ret += " [%u]" % self.payload[0] + ret += f" [{self.payload[0]}]" return ret -class WriteQueue: - def __init__(self, ofs, size): - self.ofs = ofs - self.size = size - self.last_send = 0 - -class FTPModule(mp_module.MPModule): - def __init__(self, mpstate): - super(FTPModule, self).__init__(mpstate, "ftp", public=True) - self.add_command('ftp', self.cmd_ftp, "file transfer", - ["", - "set (FTPSETTING)", - "put (FILENAME) (FILENAME)"]) - self.ftp_settings = mp_settings.MPSettings( - [('debug', int, 0), - ('pkt_loss_tx', int, 0), - ('pkt_loss_rx', int, 0), - ('max_backlog', int, 5), - ('burst_read_size', int, 80), - ('write_size', int, 80), - ('write_qsize', int, 5), - ('retry_time', float, 0.5)]) - self.add_completion_function('(FTPSETTING)', - self.ftp_settings.completion) + +class WriteQueue: # pylint: disable=too-few-public-methods + """ + Manages a queue of write operations for the MAVFTP class. + + Keeps track of offsets and sizes for pending write operations to ensure orderly processing. + """ + def __init__(self, ofs: int, size: int): + self.ofs = ofs # Offset where the write operation starts. + self.size = size # Size of the data to be written. + self.last_send = 0 # Timestamp of the last send operation. + + +class ParamData(): + """ + A class to manage parameter values and defaults for ArduPilot configuration. + """ + def __init__(self): + self.params = [] # params as (name, value, ptype) + self.defaults = None # defaults as (name, value, ptype) + + def add_param(self, name, value, ptype): + self.params.append((name, value, ptype)) + + def add_default(self, name, value, ptype): + if self.defaults is None: + self.defaults = [] + self.defaults.append((name, value, ptype)) + + +class MAVFTP: # pylint: disable=too-many-instance-attributes + """ + Implements the client-side logic for the MAVLink File Transfer Protocol (FTP) over MAVLink connections. + + Handles file operations such as reading, writing, listing directories, and managing sessions. + """ + def __init__(self, master, target_system, target_component): + self.ftp_settings_debug = 0 + self.ftp_settings_pkt_loss_rx = 0 + self.ftp_settings_pkt_loss_tx = 0 + self.ftp_settings_burst_read_size = 80 + self.ftp_settings_max_backlog = 5 + self.ftp_settings_write_size = 80 + self.ftp_settings_write_qsize = 5 + self.ftp_settings_retry_time = 0.5 self.seq = 0 self.session = 0 self.network = 0 @@ -133,11 +172,11 @@ def __init__(self, mpstate): self.last_burst_read = None self.op_start = None self.dir_offset = 0 - self.last_op_time = time.time() + self.last_op_time = time_time() self.rtt = 0.5 self.reached_eof = False self.backlog = 0 - self.burst_size = self.ftp_settings.burst_read_size + self.burst_size = self.ftp_settings_burst_read_size self.write_list = None self.write_block_size = 0 self.write_acks = 0 @@ -147,40 +186,14 @@ def __init__(self, mpstate): self.write_recv_idx = -1 self.write_pending = 0 self.write_last_send = None - self.warned_component = False + self.open_retries = 0 - def cmd_ftp(self, args): - '''FTP operations''' - usage = "Usage: ftp " - if len(args) < 1: - print(usage) - return - if args[0] == 'list': - self.cmd_list(args[1:]) - elif args[0] == "set": - self.ftp_settings.command(args[1:]) - elif args[0] == 'get': - self.cmd_get(args[1:]) - elif args[0] == 'put': - self.cmd_put(args[1:]) - elif args[0] == 'rm': - self.cmd_rm(args[1:]) - elif args[0] == 'rmdir': - self.cmd_rmdir(args[1:]) - elif args[0] == 'rename': - self.cmd_rename(args[1:]) - elif args[0] == 'mkdir': - self.cmd_mkdir(args[1:]) - elif args[0] == 'crc': - self.cmd_crc(args[1:]) - elif args[0] == 'status': - self.cmd_status() - elif args[0] == 'cancel': - self.cmd_cancel() - else: - print(usage) + self.master = master + self.target_system = target_system + self.target_component = target_component + self.op_pending = False - def send(self, op): + def __send(self, op): '''send a request''' op.seq = self.seq payload = op.pack() @@ -190,14 +203,15 @@ def send(self, op): self.master.mav.file_transfer_protocol_send(self.network, self.target_system, self.target_component, payload) self.seq = (self.seq + 1) % 256 self.last_op = op - now = time.time() - if self.ftp_settings.debug > 1: - print("> %s dt=%.2f" % (op, now - self.last_op_time)) - self.last_op_time = time.time() + now = time_time() + if self.ftp_settings_debug > 1: + logging_info("> %s dt=%.2f", op, now - self.last_op_time) + self.last_op_time = time_time() + self.op_pending = True - def terminate_session(self): + def __terminate_session(self): '''terminate current session''' - self.send(FTP_OP(self.seq, self.session, OP_TerminateSession, 0, 0, 0, 0, None)) + self.__send(FTP_OP(self.seq, self.session, OP_TerminateSession, 0, 0, 0, 0, None)) self.fh = None self.filename = None self.write_list = None @@ -205,6 +219,9 @@ def terminate_session(self): # tell caller that the transfer failed self.callback(None) self.callback = None + if self.callback_progress is not None: + self.callback_progress(None) + self.callback_progress = None if self.put_callback is not None: # tell caller that the transfer failed self.put_callback(None) @@ -221,8 +238,8 @@ def terminate_session(self): self.reached_eof = False self.backlog = 0 self.duplicates = 0 - if self.ftp_settings.debug > 0: - print("Terminated session") + if self.ftp_settings_debug > 0: + logging_info("Terminated session") def cmd_list(self, args): '''list files''' @@ -230,18 +247,18 @@ def cmd_list(self, args): dname = args[0] else: dname = '/' - print("Listing %s" % dname) + logging_info("Listing %s", dname) enc_dname = bytearray(dname, 'ascii') self.total_size = 0 self.dir_offset = 0 op = FTP_OP(self.seq, self.session, OP_ListDirectory, len(enc_dname), 0, 0, self.dir_offset, enc_dname) - self.send(op) + self.__send(op) - def handle_list_reply(self, op, m): + def __handle_list_reply(self, op, _m): '''handle OP_ListDirectory reply''' if op.opcode == OP_Ack: dentries = sorted(op.payload.split(b'\x00')) - #print(dentries) + #logging_info(dentries) for d in dentries: if len(d) == 0: continue @@ -251,47 +268,48 @@ def handle_list_reply(self, op, m): d = str(d, 'ascii') else: d = str(d) - except Exception: + except Exception: # pylint: disable=broad-exception-caught continue if d[0] == 'D': - print(" D %s" % d[1:]) + logging_info(" D %s", d[1:]) elif d[0] == 'F': (name, size) = d[1:].split('\t') size = int(size) self.total_size += size - print(" %s\t%u" % (name, size)) + logging_info(" %s\t%u", name, size) else: - print(d) + logging_info(d) # ask for more more = self.last_op more.offset = self.dir_offset - self.send(more) + self.__send(more) elif op.opcode == OP_Nack and len(op.payload) == 1 and op.payload[0] == ERR_EndOfFile: - print("Total size %.2f kByte" % (self.total_size / 1024.0)) + logging_info("Total size %.2f kByte", self.total_size / 1024.0) self.total_size = 0 + self.op_pending = False else: - print('LIST: %s' % op) + logging_info('LIST: %s', op) def cmd_get(self, args, callback=None, callback_progress=None): '''get file''' if len(args) == 0: - print("Usage: get FILENAME ") + logging_error("cmd_get usage: [FILENAME ]") return - self.terminate_session() + self.__terminate_session() fname = args[0] if len(args) > 1: self.filename = args[1] else: - self.filename = os.path.basename(fname) - if callback is None or self.ftp_settings.debug > 1: - print("Getting %s as %s" % (fname, self.filename)) - self.op_start = time.time() + self.filename = os_path.basename(fname) + if callback is None or self.ftp_settings_debug > 1: + logging_info("Getting %s from %s", fname, self.filename) + self.op_start = time_time() self.callback = callback self.callback_progress = callback_progress self.read_retries = 0 self.duplicates = 0 self.reached_eof = False - self.burst_size = self.ftp_settings.burst_read_size + self.burst_size = self.ftp_settings_burst_read_size if self.burst_size < 1: self.burst_size = 239 elif self.burst_size > 239: @@ -299,9 +317,9 @@ def cmd_get(self, args, callback=None, callback_progress=None): enc_fname = bytearray(fname, 'ascii') self.open_retries = 0 op = FTP_OP(self.seq, self.session, OP_OpenFileRO, len(enc_fname), 0, 0, 0, enc_fname) - self.send(op) + self.__send(op) - def handle_open_RO_reply(self, op, m): + def __handle_open_ro_reply(self, op, _m): '''handle OP_OpenFileRO reply''' if op.opcode == OP_Ack: if self.filename is None: @@ -310,24 +328,25 @@ def handle_open_RO_reply(self, op, m): if self.callback is not None or self.filename == '-': self.fh = SIO() else: - self.fh = open(self.filename, 'wb') - except Exception as ex: - print("Failed to open %s: %s" % (self.filename, ex)) - self.terminate_session() + self.fh = open(self.filename, 'wb') # pylint: disable=consider-using-with + except Exception as ex: # pylint: disable=broad-except + logging_info("Failed to open %s: %s", self.filename, ex) + self.__terminate_session() return read = FTP_OP(self.seq, self.session, OP_BurstReadFile, self.burst_size, 0, 0, 0, None) - self.last_burst_read = time.time() - self.send(read) + self.last_burst_read = time_time() + self.__send(read) else: - if self.callback is None or self.ftp_settings.debug > 0: - print("ftp open failed") - self.terminate_session() + if self.callback is None or self.ftp_settings_debug > 0: + logging_info("ftp open failed") + self.__terminate_session() - def check_read_finished(self): + def __check_read_finished(self): '''check if download has completed''' + logging_debug("check_read_finished: %s %s", self.reached_eof, self.read_gaps) if self.reached_eof and len(self.read_gaps) == 0: ofs = self.fh.tell() - dt = time.time() - self.op_start + dt = time_time() - self.op_start rate = (ofs / dt) / 1024.0 if self.callback is not None: self.fh.seek(0) @@ -335,45 +354,43 @@ def check_read_finished(self): self.callback = None elif self.filename == "-": self.fh.seek(0) - if sys.version_info.major < 3: - print(self.fh.read()) - else: - print(self.fh.read().decode('utf-8')) + logging_info(self.fh.read().decode('utf-8')) else: - print("Wrote %u bytes to %s in %.2fs %.1fkByte/s" % (ofs, self.filename, dt, rate)) - self.terminate_session() + logging_info("Got %u bytes from %s in %.2fs %.1fkByte/s", ofs, self.filename, dt, rate) + self.__terminate_session() + self.op_pending = False return True return False - def write_payload(self, op): + def __write_payload(self, op): '''write payload from a read op''' self.fh.seek(op.offset) self.fh.write(op.payload) self.read_total += len(op.payload) if self.callback_progress is not None: - self.callback_progress(self.fh, self.read_total) - - def handle_burst_read(self, op, m): + self.callback_progress(self.read_total, self.read_total+1) + + def __handle_burst_read(self, op, _m): # pylint: disable=too-many-branches, too-many-statements, too-many-return-statements '''handle OP_BurstReadFile reply''' - if self.ftp_settings.pkt_loss_tx > 0: - if random.uniform(0,100) < self.ftp_settings.pkt_loss_tx: - if self.ftp_settings.debug > 0: - print("FTP: dropping TX") + if self.ftp_settings_pkt_loss_tx > 0: + if random_uniform(0, 100) < self.ftp_settings_pkt_loss_tx: + if self.ftp_settings_debug > 0: + logging_warning("FTP: dropping TX") return if self.fh is None or self.filename is None: if op.session != self.session: # old session return - print("FTP Unexpected burst read reply") - print(op) + logging_warning("FTP Unexpected burst read reply") + logging_info(op) return - self.last_burst_read = time.time() + self.last_burst_read = time_time() size = len(op.payload) if size > self.burst_size: # this server doesn't handle the burst size argument self.burst_size = MAX_Payload - if self.ftp_settings.debug > 0: - print("Setting burst size to %u" % self.burst_size) + if self.ftp_settings_debug > 0: + logging_info("Setting burst size to %u", self.burst_size) if op.opcode == OP_Ack and self.fh is not None: ofs = self.fh.tell() if op.offset < ofs: @@ -382,16 +399,16 @@ def handle_burst_read(self, op, m): if gap in self.read_gaps: self.read_gaps.remove(gap) self.read_gap_times.pop(gap) - if self.ftp_settings.debug > 0: - print("FTP: removed gap", gap, self.reached_eof, len(self.read_gaps)) + if self.ftp_settings_debug > 0: + logging_info("FTP: removed gap", gap, self.reached_eof, len(self.read_gaps)) else: - if self.ftp_settings.debug > 0: - print("FTP: dup read reply at %u of len %u ofs=%u" % (op.offset, op.size, self.fh.tell())) + if self.ftp_settings_debug > 0: + logging_info("FTP: dup read reply at %u of len %u ofs=%u", op.offset, op.size, self.fh.tell()) self.duplicates += 1 return - self.write_payload(op) + self.__write_payload(op) self.fh.seek(ofs) - if self.check_read_finished(): + if self.__check_read_finished(): return elif op.offset > ofs: # we have a gap @@ -406,52 +423,54 @@ def handle_burst_read(self, op, m): self.read_gaps.append(g) self.read_gap_times[g] = 0 gap = (gap[0] + max_read, gap[1] - max_read) - self.write_payload(op) + self.__write_payload(op) else: - self.write_payload(op) + self.__write_payload(op) if op.burst_complete: if op.size > 0 and op.size < self.burst_size: # a burst complete with non-zero size and less than burst packet size # means EOF - if not self.reached_eof and self.ftp_settings.debug > 0: - print("EOF at %u with %u gaps t=%.2f" % (self.fh.tell(), len(self.read_gaps), time.time() - self.op_start)) + if not self.reached_eof and self.ftp_settings_debug > 0: + logging_info("EOF at %u with %u gaps t=%.2f", self.fh.tell(), + len(self.read_gaps), time_time() - self.op_start) self.reached_eof = True - if self.check_read_finished(): + if self.__check_read_finished(): return - self.check_read_send() + self.__check_read_send() return more = self.last_op more.offset = op.offset + op.size - if self.ftp_settings.debug > 0: - print("FTP: burst continue at %u %u" % (more.offset, self.fh.tell())) - self.send(more) + if self.ftp_settings_debug > 0: + logging_info("FTP: burst continue at %u %u", more.offset, self.fh.tell()) + self.__send(more) elif op.opcode == OP_Nack: ecode = op.payload[0] - if self.ftp_settings.debug > 0: - print("FTP: burst nack: ", op) - if ecode == ERR_EndOfFile or ecode == 0: + if self.ftp_settings_debug > 0: + logging_info("FTP: burst nack: %s", op) + if ecode in (ERR_EndOfFile, 0): if not self.reached_eof and op.offset > self.fh.tell(): # we lost the last part of the burst - if self.ftp_settings.debug > 0: - print("burst lost EOF %u %u" % (self.fh.tell(), op.offset)) + if self.ftp_settings_debug > 0: + logging_info("burst lost EOF %u %u", self.fh.tell(), op.offset) return - if not self.reached_eof and self.ftp_settings.debug > 0: - print("EOF at %u with %u gaps t=%.2f" % (self.fh.tell(), len(self.read_gaps), time.time() - self.op_start)) + if not self.reached_eof and self.ftp_settings_debug > 0: + logging_info("EOF at %u with %u gaps t=%.2f", self.fh.tell(), + len(self.read_gaps), time_time() - self.op_start) self.reached_eof = True - if self.check_read_finished(): + if self.__check_read_finished(): return - self.check_read_send() - elif self.ftp_settings.debug > 0: - print("FTP: burst Nack (ecode:%u): %s" % (ecode, op)) + self.__check_read_send() + elif self.ftp_settings_debug > 0: + logging_info("FTP: burst Nack (ecode:%u): %s", ecode, op) else: - print("FTP: burst error: %s" % op) + logging_warning("FTP: burst error: %s", op) - def handle_reply_read(self, op, m): + def __handle_reply_read(self, op, _m): '''handle OP_ReadFile reply''' if self.fh is None or self.filename is None: - if self.ftp_settings.debug > 0: - print("FTP Unexpected read reply") - print(op) + if self.ftp_settings_debug > 0: + logging_warning("FTP Unexpected read reply") + logging_warning(op) return if self.backlog > 0: self.backlog -= 1 @@ -461,54 +480,54 @@ def handle_reply_read(self, op, m): self.read_gaps.remove(gap) self.read_gap_times.pop(gap) ofs = self.fh.tell() - self.write_payload(op) + self.__write_payload(op) self.fh.seek(ofs) - if self.ftp_settings.debug > 0: - print("FTP: removed gap", gap, self.reached_eof, len(self.read_gaps)) - if self.check_read_finished(): + if self.ftp_settings_debug > 0: + logging_info("FTP: removed gap", gap, self.reached_eof, len(self.read_gaps)) + if self.__check_read_finished(): return elif op.size < self.burst_size: - print("FTP: file size changed to %u" % op.offset+op.size) - self.terminate_session() + logging_info("FTP: file size changed to %u", op.offset+op.size) + self.__terminate_session() else: self.duplicates += 1 - if self.ftp_settings.debug > 0: - print("FTP: no gap read", gap, len(self.read_gaps)) + if self.ftp_settings_debug > 0: + logging_info("FTP: no gap read", gap, len(self.read_gaps)) elif op.opcode == OP_Nack: - print("Read failed with %u gaps" % len(self.read_gaps), str(op)) - self.terminate_session() - self.check_read_send() - + logging_info("Read failed with %u gaps", len(self.read_gaps), str(op)) + self.__terminate_session() + self.__check_read_send() + def cmd_put(self, args, fh=None, callback=None, progress_callback=None): '''put file''' if len(args) == 0: - print("Usage: put FILENAME ") + logging_error("cmd_put Usage: [FILENAME ]") return if self.write_list is not None: - print("put already in progress") + logging_error("put already in progress") return fname = args[0] self.fh = fh if self.fh is None: try: - self.fh = open(fname, 'rb') - except Exception as ex: - print("Failed to open %s: %s" % (fname, ex)) + self.fh = open(fname, 'rb') # pylint: disable=consider-using-with + except Exception as ex: # pylint: disable=broad-exception-caught + logging_error("Failed to open %s: %s", fname, ex) return if len(args) > 1: self.filename = args[1] else: - self.filename = os.path.basename(fname) + self.filename = os_path.basename(fname) if self.filename.endswith("/"): - self.filename += os.path.basename(fname) + self.filename += os_path.basename(fname) if callback is None: - print("Putting %s as %s" % (fname, self.filename)) + logging_info("Putting %s to %s", fname, self.filename) self.fh.seek(0,2) file_size = self.fh.tell() self.fh.seek(0) # setup write list - self.write_block_size = self.ftp_settings.write_size + self.write_block_size = self.ftp_settings_write_size self.write_file_size = file_size write_blockcount = file_size // self.write_block_size @@ -526,12 +545,12 @@ def cmd_put(self, args, fh=None, callback=None, progress_callback=None): self.put_callback = callback self.put_callback_progress = progress_callback self.read_retries = 0 - self.op_start = time.time() + self.op_start = time_time() enc_fname = bytearray(self.filename, 'ascii') op = FTP_OP(self.seq, self.session, OP_CreateFile, len(enc_fname), 0, 0, 0, enc_fname) - self.send(op) + self.__send(op) - def put_finished(self, flen): + def __put_finished(self, flen): '''finish a put''' if self.put_callback_progress: self.put_callback_progress(1.0) @@ -540,35 +559,40 @@ def put_finished(self, flen): self.put_callback(flen) self.put_callback = None else: - print("Sent file of length ", flen) - - def handle_create_file_reply(self, op, m): + dt = time_time() - self.op_start + rate = (flen / dt) / 1024.0 + logging_info("Put %u bytes to %s file in %.2fs %.1fkByte/s", flen, self.filename, dt, rate) + + def __handle_create_file_reply(self, op, _m): '''handle OP_CreateFile reply''' if self.fh is None: - self.terminate_session() + self.__terminate_session() + self.op_pending = False return if op.opcode == OP_Ack: - self.send_more_writes() + self.__send_more_writes() else: - print("Create failed") - self.terminate_session() + logging_error("Create failed") + self.__terminate_session() + self.op_pending = False - def send_more_writes(self): + def __send_more_writes(self): '''send some more writes''' if len(self.write_list) == 0: # all done - self.put_finished(self.write_file_size) - self.terminate_session() + self.__put_finished(self.write_file_size) + self.__terminate_session() + self.op_pending = False return - now = time.time() + now = time_time() if self.write_last_send is not None: if now - self.write_last_send > max(min(10*self.rtt, 1),0.2): # we seem to have lost a block of replies self.write_pending = max(0, self.write_pending-1) - n = min(self.ftp_settings.write_qsize-self.write_pending, len(self.write_list)) - for i in range(n): + n = min(self.ftp_settings_write_qsize-self.write_pending, len(self.write_list)) + for _i in range(n): # send in round-robin, skipping any that have been acked idx = self.write_idx while idx not in self.write_list: @@ -577,19 +601,19 @@ def send_more_writes(self): self.fh.seek(ofs) data = self.fh.read(self.write_block_size) write = FTP_OP(self.seq, self.session, OP_WriteFile, len(data), 0, 0, ofs, bytearray(data)) - self.send(write) + self.__send(write) self.write_idx = (idx + 1) % self.write_total self.write_pending += 1 self.write_last_send = now - def handle_write_reply(self, op, m): + def __handle_write_reply(self, op, _m): '''handle OP_WriteFile reply''' if self.fh is None: - self.terminate_session() + self.__terminate_session() return if op.opcode != OP_Ack: - print("Write failed") - self.terminate_session() + logging_warning("Write failed") + self.__terminate_session() return # assume the FTP server processes the blocks sequentially. This means @@ -604,190 +628,192 @@ def handle_write_reply(self, op, m): self.write_acks += 1 if self.put_callback_progress: self.put_callback_progress(self.write_acks/float(self.write_total)) - self.send_more_writes() + self.__send_more_writes() def cmd_rm(self, args): '''remove file''' if len(args) == 0: - print("Usage: rm FILENAME") + logging_error("cmd_rm Usage: [FILENAME]") return fname = args[0] - print("Removing %s" % fname) + logging_info("Removing %s", fname) enc_fname = bytearray(fname, 'ascii') op = FTP_OP(self.seq, self.session, OP_RemoveFile, len(enc_fname), 0, 0, 0, enc_fname) - self.send(op) + self.__send(op) def cmd_rmdir(self, args): '''remove directory''' if len(args) == 0: - print("Usage: rmdir FILENAME") + logging_error("cmd_rmdir Usage: [DIRECTORYNAME]") return dname = args[0] - print("Removing %s" % dname) + logging_info("Removing %s", dname) enc_dname = bytearray(dname, 'ascii') op = FTP_OP(self.seq, self.session, OP_RemoveDirectory, len(enc_dname), 0, 0, 0, enc_dname) - self.send(op) + self.__send(op) - def handle_remove_reply(self, op, m): + def __handle_remove_reply(self, op, _m): '''handle remove reply''' if op.opcode != OP_Ack: - print("Remove failed %s" % op) + logging_warning("Remove failed %s", op) + self.op_pending = False def cmd_rename(self, args): '''rename file''' if len(args) < 2: - print("Usage: rename OLDNAME NEWNAME") + logging_error("cmd_rename Usage: [OLDNAME NEWNAME]") return name1 = args[0] name2 = args[1] - print("Renaming %s to %s" % (name1, name2)) + logging_info("Renaming %s to %s", name1, name2) enc_name1 = bytearray(name1, 'ascii') enc_name2 = bytearray(name2, 'ascii') enc_both = enc_name1 + b'\x00' + enc_name2 op = FTP_OP(self.seq, self.session, OP_Rename, len(enc_both), 0, 0, 0, enc_both) - self.send(op) + self.__send(op) - def handle_rename_reply(self, op, m): + def __handle_rename_reply(self, op, _m): '''handle rename reply''' if op.opcode != OP_Ack: - print("Rename failed %s" % op) + logging_error("Rename failed %s", op) + self.op_pending = False def cmd_mkdir(self, args): '''make directory''' if len(args) < 1: - print("Usage: mkdir NAME") + logging_error("Usage: mkdir NAME") return name = args[0] - print("Creating directory %s" % name) + logging_info("Creating directory %s", name) enc_name = bytearray(name, 'ascii') op = FTP_OP(self.seq, self.session, OP_CreateDirectory, len(enc_name), 0, 0, 0, enc_name) - self.send(op) + self.__send(op) - def handle_mkdir_reply(self, op, m): + def __handle_mkdir_reply(self, op, _m): '''handle mkdir reply''' if op.opcode != OP_Ack: - print("Create directory failed %s" % op) + logging_error("Create directory failed %s", op) + self.op_pending = False def cmd_crc(self, args): '''get crc''' if len(args) < 1: - print("Usage: crc NAME") + logging_error("cmd_crc Usage: [NAME]") return name = args[0] self.filename = name - self.op_start = time.time() - print("Getting CRC for %s" % name) + self.op_start = time_time() + logging_info("Getting CRC for %s", name) enc_name = bytearray(name, 'ascii') op = FTP_OP(self.seq, self.session, OP_CalcFileCRC32, len(enc_name), 0, 0, 0, bytearray(enc_name)) - self.send(op) + self.__send(op) - def handle_crc_reply(self, op, m): + def __handle_crc_reply(self, op, _m): '''handle crc reply''' if op.opcode == OP_Ack and op.size == 4: crc, = struct.unpack(" 1: - print("< %s dt=%.2f" % (op, dt)) + if self.ftp_settings_debug > 1: + logging_info("< %s dt=%.2f", op, dt) self.last_op_time = now - if self.ftp_settings.pkt_loss_rx > 0: - if random.uniform(0,100) < self.ftp_settings.pkt_loss_rx: - if self.ftp_settings.debug > 1: - print("FTP: dropping packet RX") + if self.ftp_settings_pkt_loss_rx > 0: + if random_uniform(0, 100) < self.ftp_settings_pkt_loss_rx: + if self.ftp_settings_debug > 1: + logging_warning("FTP: dropping packet RX") return if op.req_opcode == self.last_op.opcode and op.seq == (self.last_op.seq + 1) % 256: self.rtt = max(min(self.rtt, dt), 0.01) if op.req_opcode == OP_ListDirectory: - self.handle_list_reply(op, m) + self.__handle_list_reply(op, m) elif op.req_opcode == OP_OpenFileRO: - self.handle_open_RO_reply(op, m) + self.__handle_open_ro_reply(op, m) elif op.req_opcode == OP_BurstReadFile: - self.handle_burst_read(op, m) + self.__handle_burst_read(op, m) elif op.req_opcode == OP_TerminateSession: pass elif op.req_opcode == OP_CreateFile: - self.handle_create_file_reply(op, m) + self.__handle_create_file_reply(op, m) elif op.req_opcode == OP_WriteFile: - self.handle_write_reply(op, m) + self.__handle_write_reply(op, m) elif op.req_opcode in [OP_RemoveFile, OP_RemoveDirectory]: - self.handle_remove_reply(op, m) + self.__handle_remove_reply(op, m) elif op.req_opcode == OP_Rename: - self.handle_rename_reply(op, m) + self.__handle_rename_reply(op, m) elif op.req_opcode == OP_CreateDirectory: - self.handle_mkdir_reply(op, m) + self.__handle_mkdir_reply(op, m) elif op.req_opcode == OP_ReadFile: - self.handle_reply_read(op, m) + self.__handle_reply_read(op, m) elif op.req_opcode == OP_CalcFileCRC32: - self.handle_crc_reply(op, m) + self.__handle_crc_reply(op, m) else: - print('FTP Unknown %s' % str(op)) + logging_info('FTP Unknown %s', str(op)) - def send_gap_read(self, g): + def __send_gap_read(self, g): '''send a read for a gap''' (offset, length) = g - if self.ftp_settings.debug > 0: - print("Gap read of %u at %u rem=%u blog=%u" % (length, offset, len(self.read_gaps), self.backlog)) + if self.ftp_settings_debug > 0: + logging_info("Gap read of %u at %u rem=%u blog=%u", length, offset, len(self.read_gaps), self.backlog) read = FTP_OP(self.seq, self.session, OP_ReadFile, length, 0, 0, offset, None) - self.send(read) + self.__send(read) self.read_gaps.remove(g) self.read_gaps.append(g) - self.last_gap_send = time.time() + self.last_gap_send = time_time() self.read_gap_times[g] = self.last_gap_send self.backlog += 1 - def check_read_send(self): + def __check_read_send(self): '''see if we should send another gap read''' if len(self.read_gaps) == 0: return g = self.read_gaps[0] - now = time.time() + now = time_time() dt = now - self.read_gap_times[g] if not self.reached_eof: # send gap reads once - for g in self.read_gap_times.keys(): - if self.read_gap_times[g] == 0: - self.send_gap_read(g) + for g, v in self.read_gap_times.items(): + if v == 0: + self.__send_gap_read(g) return - if self.read_gap_times[g] > 0 and dt > self.ftp_settings.retry_time: + if self.read_gap_times[g] > 0 and dt > self.ftp_settings_retry_time: if self.backlog > 0: self.backlog -= 1 self.read_gap_times[g] = 0 @@ -795,17 +821,17 @@ def check_read_send(self): if self.read_gap_times[g] != 0: # still pending return - if not self.reached_eof and self.backlog >= self.ftp_settings.max_backlog: + if not self.reached_eof and self.backlog >= self.ftp_settings_max_backlog: # don't fill queue too far until we have got past the burst return if now - self.last_gap_send < 0.05: # don't send too fast return - self.send_gap_read(g) + self.__send_gap_read(g) - def idle_task(self): + def __idle_task(self): '''check for file gaps and lost requests''' - now = time.time() + now = time_time() # see if we lost an open reply if self.op_start is not None and now - self.op_start > 1.0 and self.last_op.opcode == OP_OpenFileRO: @@ -814,15 +840,15 @@ def idle_task(self): if self.open_retries > 2: # fail the get self.op_start = None - self.terminate_session() + self.__terminate_session() return - if self.ftp_settings.debug > 0: - print("FTP: retry open") + if self.ftp_settings_debug > 0: + logging_info("FTP: retry open") send_op = self.last_op - self.send(FTP_OP(self.seq, self.session, OP_TerminateSession, 0, 0, 0, 0, None)) + self.__send(FTP_OP(self.seq, self.session, OP_TerminateSession, 0, 0, 0, 0, None)) self.session = (self.session + 1) % 256 send_op.session = self.session - self.send(send_op) + self.__send(send_op) if len(self.read_gaps) == 0 and self.last_burst_read is None and self.write_list is None: return @@ -831,20 +857,180 @@ def idle_task(self): return # see if burst read has stalled - if not self.reached_eof and self.last_burst_read is not None and now - self.last_burst_read > self.ftp_settings.retry_time: + if not self.reached_eof and self.last_burst_read is not None and \ + now - self.last_burst_read > self.ftp_settings_retry_time: dt = now - self.last_burst_read self.last_burst_read = now - if self.ftp_settings.debug > 0: - print("Retry read at %u rtt=%.2f dt=%.2f" % (self.fh.tell(), self.rtt, dt)) - self.send(FTP_OP(self.seq, self.session, OP_BurstReadFile, self.burst_size, 0, 0, self.fh.tell(), None)) + if self.ftp_settings_debug > 0: + logging_info("Retry read at %u rtt=%.2f dt=%.2f", self.fh.tell(), self.rtt, dt) + self.__send(FTP_OP(self.seq, self.session, OP_BurstReadFile, self.burst_size, 0, 0, self.fh.tell(), None)) self.read_retries += 1 # see if we can fill gaps - self.check_read_send() + self.__check_read_send() - if self.write_list is not None: - self.send_more_writes() + def execute_ftp_operation(self, timeout=5): + start_time = time_time() + timed_out = False + while self.op_pending and not timed_out: + m = self.master.recv_match(type=['FILE_TRANSFER_PROTOCOL'], timeout=0.1) + if m is not None: + self.__mavlink_packet(m) + self.__idle_task() + timed_out = time_time() - start_time > timeout + if timed_out: + logging_warning("FTP timed out") + self.__terminate_session() + + @staticmethod + def ftp_param_decode(data): # pylint: disable=too-many-locals + '''decode parameter data, returning ParamData''' + pdata = ParamData() + + magic = 0x671b + magic_defaults = 0x671c + if len(data) < 6: + return None + magic2, _num_params, total_params = struct.unpack(" 0 and data[0] == pad_byte: + data = data[1:] # skip pad bytes + + if len(data) == 0: + break + + ptype, plen = struct.unpack("> 4) & 0x0F + has_default = with_defaults and (flags & 1) != 0 + ptype &= 0x0F + + if ptype not in data_types: + logging_error("paramftp: bad type 0x%x", ptype) + return None + + (type_len, type_format) = data_types[ptype] + default_len = type_len if has_default else 0 + + name_len = ((plen >> 4) & 0x0F) + 1 + common_len = plen & 0x0F + name = last_name[0:common_len] + data[2:2+name_len] + vdata = data[2+name_len:2+name_len+type_len+default_len] + last_name = name + data = data[2+name_len+type_len+default_len:] + if with_defaults: + if has_default: + v1, v2, = struct.unpack("<" + type_format + type_format, vdata) + pdata.add_param(name, v1, ptype) + pdata.add_default(name, v2, ptype) + else: + v, = struct.unpack("<" + type_format, vdata) + pdata.add_param(name, v, ptype) + pdata.add_default(name, v, ptype) + else: + v, = struct.unpack("<" + type_format, vdata) + pdata.add_param(name, v, ptype) + count += 1 + + if count != total_params: + logging_error("paramftp: bad count %u should be %u", count, total_params) + return None + + return pdata + + +if __name__ == "__main__": + + def argument_parser(): + """ + Parses command-line arguments for the script. + + This function sets up an argument parser to handle the command-line arguments for the script. + """ + parser = ArgumentParser(description='This main is for testing and development only. ' + 'Usually, the mavftp is called from another script') + parser.add_argument("--baudrate", type=int, + help="master port baud rate", default=115200) + parser.add_argument("--device", required=True, help="serial device") + parser.add_argument("--source-system", dest='SOURCE_SYSTEM', type=int, + default=250, help='MAVLink source system for this GCS') + parser.add_argument("--loglevel", default="INFO", help="log level") + parser.add_argument("--filename", default="@PARAM/param.pck?withdefaults=1", help="file to fetch") + parser.add_argument("--decode-parameters", action='store_true', help="decode as a parameter file") + + return parser.parse_args() + + def wait_heartbeat(m): + '''wait for a heartbeat so we know the target system IDs''' + logging_info("Waiting for flight controller heartbeat") + m.wait_heartbeat(timeout=5) + logging_info("Heartbeat from system %u, component %u", m.target_system, m.target_system) + + def main(): + '''for testing/example purposes only''' + args = argument_parser() + + logging_basicConfig(level=logging_getLevelName(args.loglevel), format='%(asctime)s - %(levelname)s - %(message)s') + + logging_warning("This main is for testing and development only. " + "Usually the backend_mavftp is called from another script") + + # create a mavlink serial instance + master = mavutil.mavlink_connection(args.device, baud=args.baudrate, source_system=args.SOURCE_SYSTEM) + + # wait for the heartbeat msg to find the system ID + wait_heartbeat(master) + + mavftp = MAVFTP(master, + target_system=master.target_system, + target_component=master.target_component) + + def callback(fh): + '''called on ftp completion''' + data = fh.read() + logging_info("done! got %u bytes", len(data)) + if args.decode_parameters: + pdata = MAVFTP.ftp_param_decode(data) + if pdata is None: + logging_error("param decode failed") + sys.exit(1) + + pdict = {} + defdict = {} + for (name,value,_ptype) in pdata.params: + pdict[name] = value + if pdata.defaults: + for (name,value,_ptype) in pdata.defaults: + defdict[name] = value + for n in sorted(pdict.keys()): + if n in defdict: + logging_info("%-16s %f (default %f)", n.decode('utf-8'), pdict[n], defdict[n]) + else: + logging_info("%-16s %f", n.decode('utf-8'), pdict[n]) + sys.exit(0) + + mavftp.cmd_get([args.filename], callback=callback) + + while True: + m = master.recv_match(type=['FILE_TRANSFER_PROTOCOL'], timeout=0.1) + if m is not None: + mavftp.__mavlink_packet(m) + mavftp.__idle_task() -def init(mpstate): - '''initialise module''' - return FTPModule(mpstate) + main()