Skip to content

Commit

Permalink
examples: change mavftp class to be a python3 library with a simple API.
Browse files Browse the repository at this point in the history
Add a example that uses the library
  • Loading branch information
amilcarlucas committed Aug 8, 2024
1 parent 6827605 commit c1114ac
Show file tree
Hide file tree
Showing 2 changed files with 727 additions and 311 deletions.
246 changes: 246 additions & 0 deletions examples/mavftp_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
#!/usr/bin/env python3

'''
MAVLink File Transfer Protocol support example
SPDX-FileCopyrightText: 2024 Amilcar Lucas
SPDX-License-Identifier: GPL-3.0-or-later
'''

from argparse import ArgumentParser

from logging import basicConfig as logging_basicConfig
from logging import getLevelName as logging_getLevelName

from logging import info as logging_info
from logging import warning as logging_warning
from logging import error as logging_error

import sys
import os
import requests

from typing import Tuple


from pymavlink import mavutil
import mavftp


if __name__ == "__main__":

def argument_parser():
"""
Parses command-line arguments for the script.
This function sets up an argument parser to handle the command-line arguments for the script.
Returns:
argparse.Namespace: An object containing the parsed arguments.
"""
parser = ArgumentParser(description='This main is for testing and development only. '
'Usually, the mavftp is called from another script')
parser.add_argument("--baudrate", type=int,
help="master port baud rate", default=115200)
parser.add_argument("--device", required=True, help="serial device")
parser.add_argument("--source-system", dest='SOURCE_SYSTEM', type=int,
default=250, help='MAVLink source system for this GCS')
parser.add_argument("--loglevel", default="INFO", help="log level")
parser.add_argument("--filename", default="@PARAM/param.pck?withdefaults=1", help="file to fetch")
parser.add_argument("--decode-parameters", action='store_true', help="decode as a parameter file")
parser.add_argument("--output-parameters-to-files", action='store_true', help="output parameters to files")
parser.add_argument('-s', '--sort',
choices=['none', 'missionplanner', 'mavproxy'],
default='missionplanner',
help='Sort the parameters in the file. Defaults to %(default)s.',
)

return parser.parse_args()


def wait_heartbeat(m):
'''wait for a heartbeat so we know the target system IDs'''
logging_info("Waiting for flight controller heartbeat")
m.wait_heartbeat()
logging_info("Got heartbeat from system %u, component %u", m.target_system, m.target_system)


def missionplanner_sort(item: str) -> Tuple[str, ...]:
"""
Sorts a parameter name according to the rules defined in the Mission Planner software.
"""
return tuple(item.split("_"))


def extract_params(pdata, sort_type):
pdict = {}
if pdata:
for (name, value, _ptype) in pdata:
pdict[name.decode('utf-8')] = value

if sort_type == "missionplanner":
pdict = dict(sorted(pdict.items(), key=lambda x: missionplanner_sort(x[0]))) # sort alphabetically
elif sort_type == "mavproxy":
pdict = dict(sorted(pdict.items())) # sort in ASCIIbetical order
elif sort_type == "none":
pass
return pdict


def save_params(pdict, filename, sort_type):
if not pdict:
return
with open(filename, 'w', encoding='utf-8') as f:
for name, value in pdict.items():
if sort_type == "missionplanner":
f.write(f"{name},{format(value, '.6f').rstrip('0').rstrip('.')}\n")
elif sort_type == "mavproxy":
f.write(f"{name:<16} {value:<8.6f}")
elif sort_type == "none":
f.write(f"{name:<16} {value:<8.6f}")
logging_info("Outputted %u parameters to %s", len(pdict), filename)


def get_params(args, master, mav_ftp):
def get_param_callback(fh):
'''called on ftp completion'''
if fh is None:
return
data = fh.read()
logging_info("param get done! got %u bytes", len(data))
if args.decode_parameters or args.output_parameters_to_files:
pdata = mavftp.MAVFTP.ftp_param_decode(data)
if pdata is None:
logging_error("param decode failed")
sys.exit(1)

pdict = extract_params(pdata.params, args.sort)
defdict = extract_params(pdata.defaults, args.sort)

if args.decode_parameters:
for name, value in pdict.items():
if name in defdict:
print(f"{name:<16} {value} (default {defdict[name]})")
else:
print(f"{name:<16} {value}")

if args.output_parameters_to_files:
save_params(pdict, 'params.param', args.sort)
save_params(defdict, 'defaults.param', args.sort)

#sys.exit(0)

mav_ftp.cmd_get([args.filename], callback=get_param_callback)

while mav_ftp.op_pending:
m = master.recv_match(type=['FILE_TRANSFER_PROTOCOL'], timeout=0.1)
if m is not None:
mav_ftp.mavlink_packet(m)
mav_ftp.idle_task()


def get_last_log(master, mav_ftp):
mav_ftp.cmd_list(['/log'])

while mav_ftp.op_pending:
m = master.recv_match(type=['FILE_TRANSFER_PROTOCOL'], timeout=0.1)
if m is not None:
mav_ftp.mavlink_packet(m)
mav_ftp.idle_task()


def get_list_dir(master, mav_ftp, directory):
mav_ftp.cmd_list([directory])

while mav_ftp.op_pending:
m = master.recv_match(type=['FILE_TRANSFER_PROTOCOL'], timeout=0.1)
if m is not None:
mav_ftp.mavlink_packet(m)
mav_ftp.idle_task()


def download_script(url, local_filename):
# Download the script from the internet to the PC
response = requests.get(url)

if response.status_code == 200:
with open(local_filename, "wb") as file:
file.write(response.content)
else:
logging_error("Failed to download the file")


def create_directory(master, mav_ftp, remote_directory):
mav_ftp.cmd_mkdir([remote_directory])

while mav_ftp.op_pending:
m = master.recv_match(type=['FILE_TRANSFER_PROTOCOL'], timeout=0.1)
if m is not None:
mav_ftp.mavlink_packet(m)
mav_ftp.idle_task()


def upload_script(master, mav_ftp, remote_directory, local_filename):
# Upload it from the PC to the flight controller

def put_callback(len):
logging_info('Uploaded %u bytes to %s file', len, local_filename)

mav_ftp.cmd_put([local_filename, remote_directory], callback=put_callback)

while mav_ftp.op_pending:
m = master.recv_match(type=['FILE_TRANSFER_PROTOCOL'], timeout=0.1)
if m is not None:
mav_ftp.mavlink_packet(m)
mav_ftp.idle_task()


def main():
'''for testing/example purposes only'''
args = argument_parser()

logging_basicConfig(level=logging_getLevelName(args.loglevel), format='%(asctime)s - %(levelname)s - %(message)s')

logging_warning("This main is for testing and development only. "
"Usually the mavftp is called from another script")

# create a mavlink serial instance
master = mavutil.mavlink_connection(args.device, baud=args.baudrate, source_system=args.SOURCE_SYSTEM)

# wait for the heartbeat msg to find the system ID
wait_heartbeat(master)

mav_ftp = mavftp.MAVFTP(master,
target_system=master.target_system,
target_component=master.target_component)

get_params(args, master, mav_ftp)

get_list_dir(master, mav_ftp, '/')

#get_last_log(master, mav_ftp)

remote_directory = '/APM/Scripts/'
#create_directory(master, mav_ftp, remote_directory)

url = "https://discuss.ardupilot.org/uploads/short-url/4pyrl7PcfqiMEaRItUhljuAqLSs.lua"
local_filename = "copter-magfit-helper.lua"

if not os.path.exists(local_filename):
download_script(url, local_filename)

upload_script(master, mav_ftp, remote_directory, local_filename)

url = "https://raw.githubusercontent.com/ArduPilot/ardupilot/Copter-4.5/libraries/AP_Scripting/applets/VTOL-quicktune.lua"
local_filename = "VTOL-quicktune.lua"

if not os.path.exists(local_filename):
download_script(url, local_filename)

upload_script(master, mav_ftp, remote_directory, local_filename)

master.close()


main()
Loading

0 comments on commit c1114ac

Please sign in to comment.