From a8a4069e21602d48bf57d7ca25f9c941c4d71e8f Mon Sep 17 00:00:00 2001 From: Peter Mitrano Date: Mon, 18 Jan 2021 11:47:23 -0500 Subject: [PATCH] change return types when planning & executing hopefully this is easier to use than mysterious tuples of tuples also this returns planning info from plan_to_joint_config which previously was not done --- arm_robots/src/arm_robots/base_robot.py | 1 + arm_robots/src/arm_robots/robot.py | 82 ++++++++++++++++++++----- 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/arm_robots/src/arm_robots/base_robot.py b/arm_robots/src/arm_robots/base_robot.py index 34ef220..4afa79e 100644 --- a/arm_robots/src/arm_robots/base_robot.py +++ b/arm_robots/src/arm_robots/base_robot.py @@ -90,6 +90,7 @@ def close_right_gripper(self): self.right_gripper_command_pub.publish(self.get_close_gripper_msg()) def get_close_gripper_msg(self): + # FIXME: this is a bad abstraction, not all grippers work like this raise NotImplementedError() def get_open_gripper_msg(self): diff --git a/arm_robots/src/arm_robots/robot.py b/arm_robots/src/arm_robots/robot.py index 40be228..e0a0dde 100644 --- a/arm_robots/src/arm_robots/robot.py +++ b/arm_robots/src/arm_robots/robot.py @@ -1,4 +1,5 @@ #! /usr/bin/env python +from dataclasses import dataclass from typing import List, Union, Tuple, Callable, Optional, Dict import numpy as np @@ -14,6 +15,7 @@ from control_msgs.msg import FollowJointTrajectoryAction, FollowJointTrajectoryFeedback, FollowJointTrajectoryResult, \ FollowJointTrajectoryGoal from geometry_msgs.msg import Point, Pose, Quaternion +from moveit_msgs.msg import RobotTrajectory, MoveItErrorCodes from rosgraph.names import ns_join from rospy import logfatal from rospy.logger_level_service_caller import LoggerLevelServiceCaller @@ -25,6 +27,42 @@ "You have to call store_tool_orientations or store_current_tool_orientations first") +@dataclass +class ExecutionResult: + trajectory: Optional[JointTrajectory] + execution_result: Optional[FollowJointTrajectoryResult] + action_client_state: int + success: bool + + +class PlanningResult: + + def __init__(self, + move_group_plan_tuple: Optional[Tuple[bool, RobotTrajectory, float, MoveItErrorCodes]] = None, + success: Optional[bool] = None, + plan: Optional[RobotTrajectory] = None, + planning_time: Optional[float] = None, + planning_error_code: Optional[int] = None, + ): + if move_group_plan_tuple is not None: + self.success = move_group_plan_tuple[0] + self.plan = move_group_plan_tuple[1] + self.planning_time = move_group_plan_tuple[2] + self.planning_error_code = move_group_plan_tuple[3] + else: + self.success = success + self.plan = plan + self.planning_time = planning_time + self.planning_error_code = planning_error_code + + +class PlanningAndExecutionResult: + def __init__(self, planning_result: PlanningResult, execution_result: ExecutionResult): + self.planning_result = planning_result + self.execution_result = execution_result + self.success = planning_result.success or execution_result.success + + def set_move_group_log_level(event): # Up the logging level for MoveGroupInterface because it's annoying log_level = LoggerLevelServiceCaller() @@ -110,8 +148,9 @@ def plan_to_position(self, move_group = self.get_move_group_commander(group_name) move_group.set_end_effector_link(ee_link_name) move_group.set_position_target(list(target_position)) - plan = move_group.plan()[1] - return self.follow_arms_joint_trajectory(plan.joint_trajectory) + planning_result = PlanningResult(move_group.plan()) + execution_result = self.follow_arms_joint_trajectory(planning_result.plan.joint_trajectory) + return PlanningAndExecutionResult(planning_result, execution_result) def plan_to_position_cartesian(self, group_name: str, @@ -134,7 +173,9 @@ def plan_to_position_cartesian(self, plan, fraction = move_group.compute_cartesian_path(waypoints=waypoints, eef_step=step_size, jump_threshold=0.0) if fraction != 1.0: raise RuntimeError(f"Cartesian path is only {fraction * 100}% complete") - return self.follow_arms_joint_trajectory(plan.joint_trajectory) + planning_result = PlanningResult(success=(fraction == 1.0), plan=plan) + execution_result = self.follow_arms_joint_trajectory(plan.joint_trajectory) + return PlanningAndExecutionResult(planning_result, execution_result) def plan_to_pose(self, group_name, ee_link_name, target_pose, frame_id: str = 'robot_root'): self.check_inputs(group_name, ee_link_name) @@ -146,9 +187,9 @@ def plan_to_pose(self, group_name, ee_link_name, target_pose, frame_id: str = 'r move_group.set_goal_position_tolerance(0.002) move_group.set_goal_orientation_tolerance(0.02) - plan = move_group.plan()[1] - result = self.follow_arms_joint_trajectory(plan.joint_trajectory) - return result + planning_result = PlanningResult(move_group.plan()) + execution_result = self.follow_arms_joint_trajectory(planning_result.plan.joint_trajectory) + return PlanningAndExecutionResult(planning_result, execution_result) def get_link_pose(self, group_name: str, link_name: str): self.check_inputs(group_name, link_name) @@ -160,8 +201,9 @@ def get_link_pose(self, group_name: str, link_name: str): def plan_to_joint_config(self, group_name: str, joint_config): move_group = self.get_move_group_commander(group_name) move_group.set_joint_value_target(list(joint_config)) - plan = move_group.plan()[1] - return self.follow_arms_joint_trajectory(plan.joint_trajectory) + planning_result = PlanningResult(move_group.plan()) + execution_result = self.follow_arms_joint_trajectory(planning_result.plan.joint_trajectory) + return PlanningAndExecutionResult(planning_result, execution_result) def make_follow_joint_trajectory_goal(self, trajectory) -> FollowJointTrajectoryGoal: return make_follow_joint_trajectory_goal(trajectory) @@ -174,7 +216,10 @@ def follow_joint_trajectory(self, rospy.logdebug(f"ignoring empty trajectory") result = FollowJointTrajectoryResult() result.error_code = FollowJointTrajectoryResult.SUCCESSFUL - return trajectory, result, client.get_state() + return ExecutionResult(trajectory=trajectory, + execution_result=result, + action_client_state=client.get_state(), + success=True) rospy.logdebug(f"sending trajectory goal with f{len(trajectory.points)} points") result: Optional[FollowJointTrajectoryResult] = None @@ -191,11 +236,14 @@ def _feedback_cb(feedback: FollowJointTrajectoryFeedback): if self.block: client.wait_for_result() result = client.get_result() - return trajectory, result, client.get_state() + return ExecutionResult(trajectory=trajectory, + execution_result=result, + action_client_state=client.get_state(), + success=(result.error_code == FollowJointTrajectoryResult.SUCCESSFUL)) def follow_joint_config(self, joint_names: List[str], joint_positions, client: SimpleActionClient): trajectory = JointTrajectory(joint_names=joint_names) - point = JointTrajectoryPoint(time_from_start=rospy.Duration(1.0), positions=joint_positions) + point = JointTrajectoryPoint(time_from_start=rospy.Duration(secs=1), positions=joint_positions) trajectory.points.append(point) return self.follow_joint_trajectory(trajectory, client) @@ -246,9 +294,12 @@ def follow_jacobian_to_position(self, for k in tool_names: if k not in self.stored_tool_orientations: rospy.logerr(f"tool {k} has no stored orientation. aborting.") - return + return ExecutionResult(trajectory=None, + execution_result=None, + action_client_state=self.arms_client.get_state(), + success=False) preferred_tool_orientations.append(self.stored_tool_orientations[k]) - robot_trajectory_msg: moveit_commander.RobotTrajectory = self.jacobian_follower.plan( + robot_trajectory_msg: RobotTrajectory = self.jacobian_follower.plan( group_name=group_name, tool_names=tool_names, preferred_tool_orientations=preferred_tool_orientations, @@ -256,7 +307,10 @@ def follow_jacobian_to_position(self, max_velocity_scaling_factor=vel_scaling, max_acceleration_scaling_factor=0.1, ) - return self.follow_arms_joint_trajectory(robot_trajectory_msg.joint_trajectory, stop_condition=stop_condition) + planning_result = PlanningResult(success=True, plan=robot_trajectory_msg) + execution_result = self.follow_arms_joint_trajectory(robot_trajectory_msg.joint_trajectory, + stop_condition=stop_condition) + return PlanningAndExecutionResult(planning_result, execution_result) def get_both_arm_joints(self): return self.get_left_arm_joints() + self.get_right_arm_joints()