From 7ec42487f62eb07d8542f873d14eea8148ae6b7c Mon Sep 17 00:00:00 2001 From: "Dr.-Ing. Amilcar do Carmo Lucas" Date: Thu, 19 Sep 2024 01:38:15 +0200 Subject: [PATCH] pylint fixes to mavftp --- mavftp.py | 51 +++++++++++++++++++++++---------------------------- mavftp_op.py | 28 ++++++++++++++++++++++++++-- 2 files changed, 49 insertions(+), 30 deletions(-) diff --git a/mavftp.py b/mavftp.py index 7bf3bf040..5ca64dc87 100644 --- a/mavftp.py +++ b/mavftp.py @@ -31,6 +31,9 @@ from pymavlink import mavutil + +# pylint: disable=too-many-lines + from pymavlink.mavftp_op import ( FTP_OP, OP_Ack, @@ -53,9 +56,9 @@ OP_CalcFileCRC32, ) - -# error codes +# pylint: disable=invalid-name class FtpError(IntEnum): + """error codes""" Success = 0 Fail = 1 FailErrno = 2 @@ -84,21 +87,11 @@ class FtpError(IntEnum): @dataclass class DirectoryEntry: + """Directory entry, either a file or a directory and the size""" name: str is_dir: bool size_b: int - def items(self): - """Yield each attribute and its value for the FTP_OP instance. For debugging purposes.""" - yield "seq", self.seq - yield "session", self.session - yield "opcode", self.opcode - yield "size", self.size - yield "req_opcode", self.req_opcode - yield "burst_complete", self.burst_complete - yield "offset", self.offset - yield "payload", self.payload - class WriteQueue: # pylint: disable=too-few-public-methods """ @@ -238,7 +231,7 @@ class MAVFTP: # pylint: disable=too-many-instance-attributes Handles file operations such as reading, writing, listing directories, and managing sessions. """ - def __init__(self, master, target_system, target_component, + def __init__(self, master, target_system, target_component, # pylint: disable=too-many-statements settings=MAVFTPSettings( [('debug', int, 0), ('pkt_loss_tx', int, 0), @@ -298,6 +291,8 @@ def __init__(self, master, target_system, target_component, self.master = master self.target_system = target_system self.target_component = target_component + self.get_result = None + self.done = False # Reset the flight controller FTP state-machine self.__send(FTP_OP(self.seq, self.session, OP_ResetSessions, 0, 0, 0, 0, None)) @@ -415,7 +410,7 @@ def __handle_list_reply(self, op, _m) -> MAVFTPReturn: self.dir_offset += 1 try: dir_entry = str(d, "ascii") - except Exception as error: + except Exception as error: # pylint: disable=broad-exception-caught logging.debug(error) continue if dir_entry[0] == "D": @@ -435,7 +430,7 @@ def __handle_list_reply(self, op, _m) -> MAVFTPReturn: def read_sector(self, path: str, offset: int, size: int) -> Optional[bytes]: - logging.info(f"reading sector {path}, offset={offset}, size={size}") + logging.info("reading sector %s, offset=%u, size=%u", path, offset, size) return self.read(path, size, offset) def read(self, path: str, size: int, offset: int = 0) -> Optional[bytes]: @@ -446,7 +441,7 @@ def read(self, path: str, size: int, offset: int = 0) -> Optional[bytes]: self.filename = path self.done = False - logging.info(f"Getting {path} starting at {self.requested_offset}, reading {self.requested_size} bytes") + logging.info("Getting %s starting at %u reading %u bytes", path, self.requested_offset, self.requested_size) self.op_start = time.time() self.read_total = 0 @@ -477,10 +472,10 @@ def read(self, path: str, size: int, offset: int = 0) -> Optional[bytes]: logging.error(e) self.__idle_task() time.sleep(0.0001) - logging.info(f"loop closed, gaps:{self.read_gaps}, done: {self.done}") + logging.info("loop closed, gaps:%u, done: %u", self.read_gaps, self.done) if len(self.read_gaps) == 0: return self.get_result - logging.error(f"closed read with {self.read_gaps} gaaps") + logging.error("closed read with %u gaps", self.read_gaps) return None def cmd_get(self, args, callback=None, progress_callback=None) -> MAVFTPReturn: @@ -569,10 +564,9 @@ def __check_read_finished(self) -> bool: self.fh.seek(0) print(self.fh.read().decode('utf-8')) else: - logging.info( - f"Wrote {self.read_total}/{self.requested_size} bytes to {self.filename} in {dt:.2f}s {rate:.1f}kByte/s" - ) - logging.info(f"terminating with {self.read_total} out of {self.requested_size} (ofs={ofs})") + logging.info("Wrote %u/%u bytes to %s in %.2fs %.1fkByte/s", + self.read_total, self.requested_size, self.filename, dt, rate) + logging.info("terminating with %u out of %u (ofs=%u)", self.read_total, self.requested_size, ofs) self.done = True assert self.fh is not None @@ -581,8 +575,8 @@ def __check_read_finished(self) -> bool: self.get_result = result[self.requested_offset : self.requested_offset + self.requested_size] assert self.get_result is not None if len(self.get_result) < self.requested_size: - logging.warning(f"expected {self.requested_size}, got {len(self.get_result)}") - logging.info(f"read {len(self.get_result)} bytes") + logging.warning("expected %u, got %u", self.requested_size, len(self.get_result)) + logging.info("read %u bytes", len(self.get_result)) self.fh.flush() self.fh.close() self.__terminate_session() @@ -1155,7 +1149,7 @@ def process_ftp_reply(self, operation_name, timeout=5) -> MAVFTPReturn: break return ret - def __decode_ftp_ack_and_nack(self, op: FTP_OP, operation_name: str='') -> MAVFTPReturn: + def __decode_ftp_ack_and_nack(self, op: FTP_OP, operation_name: str='') -> MAVFTPReturn: # pylint: disable=too-many-branches '''decode FTP Acknowledge reply''' system_error = 0 invalid_error_code = 0 @@ -1196,8 +1190,9 @@ def __decode_ftp_ack_and_nack(self, op: FTP_OP, operation_name: str='') -> MAVFT error_code = FtpError.NoErrorCodeInNack elif error_code == FtpError.FailErrno: error_code = FtpError.NoFilesystemErrorInPayload - elif error_code not in [FtpError.Fail, FtpError.InvalidDataSize, FtpError.InvalidSession, FtpError.NoSessionsAvailable, - FtpError.EndOfFile, FtpError.UnknownCommand, FtpError.FileExists, FtpError.FileProtected, + elif error_code not in [FtpError.Fail, FtpError.InvalidDataSize, FtpError.InvalidSession, + FtpError.NoSessionsAvailable, FtpError.EndOfFile, + FtpError.UnknownCommand, FtpError.FileExists, FtpError.FileProtected, FtpError.FileNotFound]: invalid_error_code = error_code error_code = FtpError.InvalidErrorCode diff --git a/mavftp_op.py b/mavftp_op.py index e4c9590e5..682b23810 100644 --- a/mavftp_op.py +++ b/mavftp_op.py @@ -1,8 +1,18 @@ +#!/usr/bin/env python3 + +''' +MAVLink File Transfer Protocol support - https://mavlink.io/en/services/ftp.html + +SPDX-FileCopyrightText: 2011-2024 Andrew Tridgell, 2024 Amilcar Lucas + +SPDX-License-Identifier: GPL-3.0-or-later +''' import struct from typing import Optional +# pylint: disable=invalid-name OP_None = 0 OP_TerminateSession = 1 OP_ResetSessions = 2 @@ -22,12 +32,14 @@ OP_Ack = 128 OP_Nack = 129 +# pylint: enable=invalid-name # pylint: disable=too-many-arguments # pylint: disable=too-many-instance-attributes -class FTP_OP: +class FTP_OP: # pylint: disable=invalid-name + """FTP operation""" def __init__( self, seq: int, @@ -70,7 +82,19 @@ def __str__(self) -> str: plen = 0 if self.payload is not None: plen = len(self.payload) - ret = f"OP seq:{self.seq} sess:{self.session} opcode:{self.opcode} req_opcode:{self.req_opcode} size:{self.size} bc:{self.burst_complete} ofs:{self.offset} plen={plen}" + ret = f"OP seq:{self.seq} sess:{self.session} opcode:{self.opcode} req_opcode:{self.req_opcode} size:{self.size}" \ + f" bc:{self.burst_complete} ofs:{self.offset} plen={plen}" if self.payload is not None and plen > 0: ret += f" [{self.payload[0]}]" return ret + + def items(self): + """Yield each attribute and its value for the FTP_OP instance. For debugging purposes.""" + yield "seq", self.seq + yield "session", self.session + yield "opcode", self.opcode + yield "size", self.size + yield "req_opcode", self.req_opcode + yield "burst_complete", self.burst_complete + yield "offset", self.offset + yield "payload", self.payload