Source code for robotblockset.ros.joint_trajectory_interface

"""ROS joint trajectory action helpers.

This module provides helpers for ROS FollowJointTrajectory action clients used
by RobotBlockSet robot interfaces. It manages trajectory points, goal sending,
status monitoring, and result handling for joint-space motion commands.

based on: http://sdk.rethinkrobotics.com/wiki/Simple_Joint_trajectory_example

Copyright (c) 2024- Jozef Stefan Institute

Authors: Leon Zlajpah, Mihael Simonic.
"""

from typing import Any, Callable, Optional

from copy import copy

import rospy
import time
import actionlib

from control_msgs.msg import FollowJointTrajectoryAction, FollowJointTrajectoryGoal, JointTrajectoryControllerState

from actionlib_msgs.msg import GoalStatusArray, GoalStatus, GoalID
from trajectory_msgs.msg import (
    JointTrajectoryPoint,
)

from robotblockset.rbs_typing import JointAccelerationType, JointConfigurationType, JointVelocityType


[docs] class JointTrajectory(object): """ Helper for ROS ``FollowJointTrajectory`` action clients. Attributes ---------- robot : Any Robot object passed to the motion-check callback. generic_state : GoalStatus Reusable goal-status message used for status comparisons. state : int Cached action state. READY : int Readiness flag indicating whether a new trajectory can be sent. """
[docs] def __init__(self, controller_ns: str, do_motion_check: bool = False, motion_check_callback: Optional[Callable[[Any], bool]] = None) -> None: """ Initialize a ROS joint-trajectory action client. Parameters ---------- controller_ns : str Controller namespace used to resolve action and status topics. do_motion_check : bool, optional If ``True``, use the feedback callback to monitor motion execution. motion_check_callback : callable, optional Callback used to decide whether motion should be interrupted. Notes ----- The constructor subscribes to the controller status topic, creates the action client, queries the joint names from ROS parameters, and resets the internal goal state. """ self._do_motion_check = do_motion_check self._motion_check_callback = motion_check_callback self.robot = None self.generic_state = GoalStatus() self.status_listener = rospy.Subscriber("/%s/position_joint_trajectory_controller/follow_joint_trajectory/status" % controller_ns, GoalStatusArray, self.handle_status_callback) time.sleep(0.2) self.state = self.generic_state.SUCCEEDED # 3 means ready self.READY = 0 self._client = actionlib.SimpleActionClient("/%s/position_joint_trajectory_controller/follow_joint_trajectory" % controller_ns, FollowJointTrajectoryAction) self._goal = FollowJointTrajectoryGoal() self._goal_time_tolerance = rospy.Time(0.5) self._goal.goal_time_tolerance = self._goal_time_tolerance server_up = self._client.wait_for_server(timeout=rospy.Duration(0.1)) # if not server_up: # rospy.logerr("Timed out waiting for Joint Trajectory" # " Action Server to connect. Start the action server" # " before running example.") # rospy.signal_shutdown("Timed out waiting for Action Server") # sys.exit(1) self._joint_names = list(rospy.get_param("/%s/position_joint_trajectory_controller/joints" % controller_ns)) self.clear()
[docs] def handle_status_callback(self, data: GoalStatusArray) -> None: """ Update controller readiness from the latest action-status message. Parameters ---------- data : GoalStatusArray Action-status message received from the controller. Notes ----- The helper marks the controller as ready only when all reported goal statuses are in the ``SUCCEEDED`` state. """ self.READY = 1 # Get all statuses. If any of them is not SUCCEEDED, forbid sending goals until all of them are succeeded. COULD BE BUGGY. statuses = data.status_list for st in statuses: state = int(st.status) if state != self.generic_state.SUCCEEDED: self.READY = 0
# status = data.status_list[0].goal_id
[docs] def add_point(self, positions: JointConfigurationType, velocities: JointVelocityType, accelerations: Optional[JointAccelerationType] = None, delta_time: float = 0.01) -> None: """ Append a trajectory point to the active goal. Parameters ---------- positions : JointConfigurationType Joint positions of the trajectory point. velocities : JointVelocityType Joint velocities of the trajectory point. accelerations : JointAccelerationType, optional Joint accelerations of the trajectory point. delta_time : float, optional Time offset from the previous point in seconds. """ self._point_msg.positions = positions if velocities is not None: self._point_msg.velocities = velocities if accelerations is not None: 0 # self._point_msg.accelerations = accelerations self._time_sum += delta_time self._point_msg.time_from_start = rospy.Duration(self._time_sum) self._goal.trajectory.points.append(copy(self._point_msg))
[docs] def start(self, max_wait_until_controller_ready: float = 10) -> None: """ Send the currently prepared trajectory goal. Parameters ---------- max_wait_until_controller_ready : float, optional Maximum time to wait for the controller to become ready before the goal is sent. Notes ----- The trajectory header timestamp is forced to zero as a workaround for controllers that may drop the first points when they are stamped in the near future. """ start_t = time.time() while self.READY != 1: t = time.time() if (t - start_t) > max_wait_until_controller_ready: raise Exception("Joint Traj controller - cant run trajectory, controller was not ready within the timeout period {} s.".format(max_wait_until_controller_ready)) # HACK: workaround for dropping first X points out of X as they occur before the current time self._goal.trajectory.header.stamp = rospy.Duration(0) # rospy.Time.now()+rospy.Duration(0.7) if not self._do_motion_check: self._client.send_goal(self._goal) else: print("using callback") self._client.send_goal(self._goal, feedback_cb=self.callback_wrapper)
[docs] def callback_wrapper(self, gh: Any) -> None: """ Wrap the user motion-check callback for action feedback handling. Parameters ---------- gh : Any Action feedback payload forwarded by the ROS action client. Notes ----- The feedback object is not used directly. The wrapper forwards the stored robot instance to ``_motion_check_callback``. """ if self._motion_check_callback(self.robot): self.stop() else: print("ok")
[docs] def stop(self) -> None: """ Cancel the currently active trajectory goal. """ print("Stop") self._client.cancel_goal()
[docs] def wait(self, timeout: float = 10.0) -> None: """ Wait until the active trajectory goal finishes or times out. Parameters ---------- timeout : float, optional Maximum time to wait in seconds. """ self._client.wait_for_result(timeout=rospy.Duration(timeout + 0.4))
# def wait(self, timeout=10.0, refresh_dt = 0.05): # """An attempt to make a non-blocking wait but i guess it's blocking anyway. """ # # #self._client.wait_for_result(timeout=rospy.Duration(timeout+0.4)) # ct = time.time() # last_upd = time.time() # while self._client.get_state() not in [3,9]: # #print(self._client.get_state()) # time.sleep(refresh_dt)
[docs] def result(self) -> Any: """ Return the action result for the current trajectory goal. Returns ------- Any Result object returned by the ROS action client. """ return self._client.get_result()
[docs] def clear(self) -> None: """ Reset the current trajectory goal and point accumulator. Notes ----- The method creates a fresh goal, restores the joint-name list, resets the time tolerance, and clears the accumulated trajectory duration. """ self._goal = FollowJointTrajectoryGoal() self._goal.trajectory.joint_names = self._joint_names self._point_msg = JointTrajectoryPoint() self._goal.goal_time_tolerance = self._goal_time_tolerance self._time_sum = 0