From 843d0fc54016fc0340687bd592324993ce24c4de Mon Sep 17 00:00:00 2001 From: Eric Date: Sat, 22 Jul 2023 11:10:10 -0700 Subject: [PATCH] updating takeoff to support both px4 and ardupilot --- change_mode.py | 21 +---- takeoff.py | 86 ++++++++----------- utilities/connect_to_sysid.py | 22 +++++ .../get_autopilot_info.py | 0 utilities/wait_for_position_aiding.py | 56 ++++++++++++ 5 files changed, 116 insertions(+), 69 deletions(-) create mode 100644 utilities/connect_to_sysid.py rename get_autopilot_info.py => utilities/get_autopilot_info.py (100%) create mode 100644 utilities/wait_for_position_aiding.py diff --git a/change_mode.py b/change_mode.py index 22916f8..2b3f7ce 100644 --- a/change_mode.py +++ b/change_mode.py @@ -1,26 +1,7 @@ import sys from pymavlink import mavutil +from utilities.connect_to_sysid import connect_to_sysid import argparse -import time - -def connect_to_sysid(connection_str : str, sysid : int, timeout: float = 3) -> any: - """connect_to_sysid connects to a mavlink stream with a specific sysid - - Args: - connection_str (str, optional): _description_. Defaults to 3)->mavutil.mavlink_connection(. - - Returns: - _type_: _description_ - """ - time_start = time.time() - while time.time() - time_start < timeout: - the_connection = mavutil.mavlink_connection(connection_str) - the_connection.wait_heartbeat() - print( - f"Heartbeat from system system {the_connection.target_system} component {the_connection.target_component}") - if the_connection.target_system == sysid: - print(f"Now connected to SYSID {sysid}") - return the_connection def change_mode(master, mode): diff --git a/takeoff.py b/takeoff.py index 1e74f4d..cdeedaa 100644 --- a/takeoff.py +++ b/takeoff.py @@ -1,80 +1,68 @@ # takeoff.py import argparse from pymavlink import mavutil -import time - -def get_enum_value_by_name(enum_dict, name): - for key, enum_entry in enum_dict.items(): - if enum_entry.name == name: - return key - raise ValueError("No enum entry with name: " + name) - -def wait_until_position_aiding(mav_connection): - flags = ['EKF_PRED_POS_HORIZ_REL', 'EKF_PRED_POS_HORIZ_REL'] - time_start = time.time() - while not ekf_pos_aiding(mav_connection, flags) and time.time() - time_start < 120: - print(time.time() - time_start) - continue - return - -def ekf_pos_aiding(mav_connection, flags): - msg = mav_connection.recv_match(type='EKF_STATUS_REPORT', blocking=True, timeout=3) - if not msg: - return 0 - print(f"from sysid {msg.get_srcSystem()} {msg}") - ekf_flags = msg.flags - - for flag in flags: - flag_val = get_enum_value_by_name(mavutil.mavlink.enums['EKF_STATUS_FLAGS'], flag) - if ekf_flags & flag_val: - continue - else: - return 0 - - return 1 - - - -def takeoff(mav_connection, takeoff_altitude): - mav_connection.wait_heartbeat() - wait_until_position_aiding(mav_connection) +from utilities.connect_to_sysid import connect_to_sysid +from utilities.wait_for_position_aiding import wait_until_position_aiding +from utilities.get_autopilot_info import get_autopilot_info + + +def takeoff(mav_connection, takeoff_altitude: float, tgt_sys_id: int = 1, tgt_comp_id=1): + print("Heartbeat from system (system %u component %u)" % (mav_connection.target_system, mav_connection.target_component)) - mode_id = mav_connection.mode_mapping()["TAKEOFF"] + wait_until_position_aiding(mav_connection) + + autopilot_info = get_autopilot_info(mav_connection, tgt_sys_id) + + if autopilot_info["autopilot"] == "ardupilotmega": + print("Connected to ArduPilot autopilot") + mode_id = mav_connection.mode_mapping()["GUIDED"] + takeoff_params = [0, 0, 0, 0, 0, 0, takeoff_altitude] + + elif autopilot_info["autopilot"] == "px4": + print("Connected to PX4 autopilot") + print(mav_connection.mode_mapping()) + mode_id = mav_connection.mode_mapping()["TAKEOFF"][1] + print(mode_id) + msg = mav_connection.recv_match(type='GLOBAL_POSITION_INT', blocking=True) + starting_alt = msg.alt / 1000 + takeoff_params = [0, 0, 0, 0, float("NAN"), float("NAN"), starting_alt + takeoff_altitude] + else: + raise ValueError("Autopilot not supported") - mav_connection.mav.command_long_send(1, 1, mavutil.mavlink.MAV_CMD_DO_SET_MODE, + + # Change mode to guided (Ardupilot) or takeoff (PX4) + mav_connection.mav.command_long_send(tgt_sys_id, tgt_comp_id, mavutil.mavlink.MAV_CMD_DO_SET_MODE, 0, mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, mode_id, 0, 0, 0, 0, 0) ack_msg = mav_connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3) print(f"Change Mode ACK: {ack_msg}") - mav_connection.mav.command_long_send(1, 1, + # Arm the UAS + mav_connection.mav.command_long_send(tgt_sys_id, tgt_comp_id, mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, 0, 1, 0, 0, 0, 0, 0, 0) arm_msg = mav_connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3) print(f"Arm ACK: {arm_msg}") - mav_connection.mav.command_long_send(1, 1, - mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, 0, -1.0, 0, 0, 0, float("NAN"), float("NAN"), 600) + # Command Takeoff + mav_connection.mav.command_long_send(tgt_sys_id, tgt_comp_id, + mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, 0, takeoff_params[0], takeoff_params[1], takeoff_params[2], takeoff_params[3], takeoff_params[4], takeoff_params[5], takeoff_params[6]) takeoff_msg = mav_connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3) print(f"Takeoff ACK: {takeoff_msg}") - mav_connection.mav.command_long_send(1, 1, - mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, 0, 1, 0, 0, 0, 0, 0, 0) - - arm_msg = mav_connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3) - print(f"Arm ACK: {arm_msg}") return takeoff_msg.result def main(): parser = argparse.ArgumentParser(description="A simple script to command a UAV to takeoff.") parser.add_argument("--altitude", type=int, help="Altitude for the UAV to reach upon takeoff.", default=10) + parser.add_argument("--sysid", type=int, help="System ID of the UAV to command.", default=1) - args = parser.parse_args() - mav_connection = mavutil.mavlink_connection('udpin:localhost:14551') + args = parser.parse_args() + mav_connection = connect_to_sysid('udpin:localhost:14551', args.sysid) takeoff(mav_connection, args.altitude) if __name__ == "__main__": diff --git a/utilities/connect_to_sysid.py b/utilities/connect_to_sysid.py new file mode 100644 index 0000000..bb8fe55 --- /dev/null +++ b/utilities/connect_to_sysid.py @@ -0,0 +1,22 @@ + +from pymavlink import mavutil +import time + +def connect_to_sysid(connection_str : str, sysid : int, timeout: float = 3) -> any: + """connect_to_sysid connects to a mavlink stream with a specific sysid + + Args: + connection_str (str, optional): _description_. Defaults to 3)->mavutil.mavlink_connection(. + + Returns: + _type_: _description_ + """ + time_start = time.time() + while time.time() - time_start < timeout: + the_connection = mavutil.mavlink_connection(connection_str) + the_connection.wait_heartbeat() + print( + f"Heartbeat from system system {the_connection.target_system} component {the_connection.target_component}") + if the_connection.target_system == sysid: + print(f"Now connected to SYSID {sysid}") + return the_connection \ No newline at end of file diff --git a/get_autopilot_info.py b/utilities/get_autopilot_info.py similarity index 100% rename from get_autopilot_info.py rename to utilities/get_autopilot_info.py diff --git a/utilities/wait_for_position_aiding.py b/utilities/wait_for_position_aiding.py new file mode 100644 index 0000000..d795809 --- /dev/null +++ b/utilities/wait_for_position_aiding.py @@ -0,0 +1,56 @@ +import time +import sys +import os +from pymavlink import mavutil + +# add this folder to the path +utilities_path = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(utilities_path) +from get_autopilot_info import get_autopilot_info + + +def get_enum_value_by_name(enum_dict, name): + for key, enum_entry in enum_dict.items(): + if enum_entry.name == name: + return key + raise ValueError("No enum entry with name: " + name) + + +def wait_until_position_aiding(mav_connection): + autopilot_info = get_autopilot_info(mav_connection) + if autopilot_info["autopilot"] == "ardupilotmega": + estimator_status_msg = "EKF_STATUS_REPORT" + elif autopilot_info["autopilot"] == "px4": + estimator_status_msg = "ESTIMATOR_STATUS" + else: + raise ValueError("Autopilot not supported") + + flags = ["EKF_PRED_POS_HORIZ_REL", "EKF_PRED_POS_HORIZ_REL"] + time_start = time.time() + while ( + not ekf_pos_aiding(mav_connection, flags, estimator_status_msg) + and time.time() - time_start < 120 + ): + print(time.time() - time_start) + continue + return + + +def ekf_pos_aiding(mav_connection, flags, estimator_status_msg="ESTIMATOR_STATUS"): + # msg = mav_connection.recv_match(type='EKF_STATUS_REPORT', blocking=True, timeout=3) + msg = mav_connection.recv_match(type=estimator_status_msg, blocking=True, timeout=3) + if not msg: + return 0 + print(f"from sysid {msg.get_srcSystem()} {msg}") + ekf_flags = msg.flags + + for flag in flags: + flag_val = get_enum_value_by_name( + mavutil.mavlink.enums["EKF_STATUS_FLAGS"], flag + ) + if ekf_flags & flag_val: + continue + else: + return 0 + + return 1