"""ROS 2 controller interfaces and helpers.
This module defines ROS 2-backed controller wrappers used by RobotBlockSet
robot interfaces. It provides controller-manager helpers, joint-trajectory
interfaces, and Cartesian impedance command interfaces through ROS 2 topics,
actions, and services.
Copyright (c) 2024- Jozef Stefan Institute
Authors: Leon Zlajpah, Mihael Simonic.
"""
from __future__ import annotations
# pyright: reportMissingImports=false
from abc import abstractmethod
from typing import Optional, Iterable, List, Any
try:
import rclpy
from rclpy.node import Node
from rclpy.task import Future
from rclpy.action import ActionClient
except Exception as e:
raise e from RuntimeError("ROS2 rclpy not installed.\nYou can install rclpy with commands:\n sudo apt update\nsudo apt install ros-<ros-distro>-rclpy")
try:
from builtin_interfaces.msg import Duration
from controller_manager_msgs.srv import ListControllers, SwitchController, LoadController, UnloadController, ConfigureController
from controller_manager_msgs.msg import ControllerState
from control_msgs.action import FollowJointTrajectory
from action_msgs.msg import GoalStatus
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
except Exception as e:
raise e from RuntimeError("Problems with importin ROS2 messages. Check if all are installed.")
from robotblockset.tools import rbs_object, matrix, vector, isvector, ismatrix, isscalar, rbs_type
from robotblockset.robots import robot, MotionResultCodes, CommandModeCodes
from robotblockset.rbs_typing import ArrayLike, JointConfigurationType, JointPathType, JointVelocityType, Pose3DType, TimesType, Velocity3DType, WrenchType, JointTorqueType
import numpy as np
from time import monotonic, sleep
try:
from compliant_controllers_msgs.msg import CartesianCommand
except ImportError:
CartesianCommand = None
[docs]
class controller_manager_helper:
"""
Helper for ROS 2 controller-manager lifecycle operations.
High-level lifecycle transitions:
load(name) -> ensure controller plugin instance exists (UNCONFIGURED)
configure(name) -> UNCONFIGURED -> INACTIVE
activate(name) -> INACTIVE -> ACTIVE (via switch_controllers)
deactivate(name) -> ACTIVE -> INACTIVE
unload(name) -> remove controller instance (must not be ACTIVE)
Legacy wrappers:
load_controller(name) -> load + configure
start_controller(name) -> activate
stop_controller(name) -> deactivate
switch_controllers(start, stop) -> activate/deactivate
Utilities:
list_controllers(), get_state(name), is_active(name)
Attributes
----------
cm_name : str
Name of the controller-manager namespace.
list_client : Any
Service client for listing controllers.
switch_client : Any
Service client for switching controllers.
load_client : Any
Service client for loading controllers.
unload_client : Any
Service client for unloading controllers.
configure_client : Any
Service client for configuring controllers.
"""
[docs]
def __init__(self, node: Node, namespace: str = "", controller_manager_name: Optional[str] = "controller_manager") -> None:
"""
Initialize the controller-manager helper.
Parameters
----------
node : Node
The ROS 2 node used to create service clients. The caller is
responsible for spinning this node.
namespace : str, optional
Namespace for the controller manager.
controller_manager_name : str, optional
Name of the ROS 2 controller manager.
"""
self._node = node
if namespace is None or namespace == "":
self._namespace = ""
else:
self._namespace = "/" + str(namespace).strip("/")
self.cm_name = f"{self._namespace}/{controller_manager_name('/')}"
# Core service clients
self.list_client = self._node.create_client(ListControllers, f"{self.cm_name}/list_controllers")
self.switch_client = self._node.create_client(SwitchController, f"{self.cm_name}/switch_controller")
self.load_client = self._node.create_client(LoadController, f"{self.cm_name}/load_controller")
self.unload_client = self._node.create_client(UnloadController, f"{self.cm_name}/unload_controller")
self.configure_client = self._node.create_client(ConfigureController, f"{self.cm_name}/configure_controller")
self._node.get_logger().info("Waiting for controller_manager lifecycle services ...")
for c in [self.list_client, self.switch_client, self.load_client, self.unload_client, self.configure_client]:
c.wait_for_service()
self._node.get_logger().info("Lifecycle helper connected.")
# ----------------------------- Query utilities -----------------------------
[docs]
def list_controllers(self) -> List[ControllerState]:
"""
Query the controller_manager for all available controllers.
Returns
-------
list of controller_manager_msgs.msg.ControllerState
A list of controller state objects. If the service call fails,
an empty list is returned and an error is logged.
"""
req = ListControllers.Request()
result = self.list_client.call(req)
if result is not None:
return result.controller
self._node.get_logger().error("Failed to list controllers")
return []
[docs]
def get_state(self, name: str) -> Optional[str]:
"""
Get the lifecycle state of a specific controller.
Parameters
----------
name : str
Name of the controller.
Returns
-------
str or None
The controller state string (e.g. ``"active"``, ``"inactive"``),
or ``None`` if the controller is not found.
"""
for c in self.list_controllers():
if c.name == name:
return c.state
return None
[docs]
def is_active(self, name: str) -> bool:
"""
Check whether a controller is in the ``"active"`` state.
Parameters
----------
name : str
Name of the controller.
Returns
-------
bool
``True`` if the controller exists and is active, otherwise ``False``.
"""
return self.get_state(name) == "active"
def _wait_for_state(self, name: str, states: Iterable[Optional[str]], timeout: float = 2.0, period: float = 0.05) -> Optional[str]:
"""
Wait briefly for controller-manager state to match one of ``states``.
ROS 2 controller-manager lifecycle service replies can be visible before
``list_controllers`` has settled, and some managers may complete the
requested transition even when the immediate response is not successful
from the client's point of view. The helper methods are intentionally
idempotent, so confirming the eventual state avoids spurious first-call
failures during strategy switches.
"""
states = set(states)
deadline = monotonic() + timeout
while True:
state = self.get_state(name)
if state in states:
return state
if monotonic() >= deadline:
return state
sleep(period)
# ----------------------------- Lifecycle ops -----------------------------
[docs]
def load(self, name: str) -> bool:
"""
Load a controller by name.
This requests that the controller_manager loads the specified controller
plugin so that it can be configured and activated later.
Parameters
----------
name : str
Name of the controller to load.
Returns
-------
bool
``True`` if the controller is already loaded or was loaded successfully,
``False`` if the load request failed.
"""
state = self.get_state(name)
if state is not None:
return True # already loaded
req = LoadController.Request()
req.name = name
result = self.load_client.call(req)
ok = bool(result and result.ok)
state = self._wait_for_state(name, states=["unconfigured", "inactive", "active"])
if state is not None:
return True
if not ok:
self._node.get_logger().error(f"Load failed: {name}")
return ok
[docs]
def activate(self, name: str) -> bool:
"""
Activate a controller.
Ensures the controller is loaded and configured, then requests the
controller_manager to start (activate) it.
Parameters
----------
name : str
Name of the controller to activate.
Returns
-------
bool
``True`` if the controller is already active or was activated successfully,
``False`` otherwise.
"""
if self.is_active(name):
return True
if not self.configure(name):
return False
req = SwitchController.Request()
req.start_controllers = [name]
req.stop_controllers = []
req.strictness = SwitchController.Request.STRICT
req.start_asap = False
req.activate_asap = True
req.timeout = Duration(sec=5)
result = self.switch_client.call(req)
ok = bool(result and result.ok)
if self._wait_for_state(name, states=["active"]) == "active":
return True
if not ok:
self._node.get_logger().error(f"Activate failed: {name}")
return False
[docs]
def deactivate(self, name: str) -> bool:
"""
Deactivate (stop) a controller.
Parameters
----------
name : str
Name of the controller to deactivate.
Returns
-------
bool
``True`` if the controller is already not active or was deactivated successfully,
``False`` otherwise.
"""
if self.get_state(name) != "active":
return True # already not active
req = SwitchController.Request()
req.start_controllers = []
req.stop_controllers = [name]
req.strictness = SwitchController.Request.STRICT
req.start_asap = False
req.activate_asap = False
req.timeout = Duration(sec=5)
result = self.switch_client.call(req)
ok = bool(result and result.ok)
if not ok:
self._node.get_logger().error(f"Deactivate failed: {name}")
return ok
[docs]
def unload(self, name: str) -> bool:
"""
Unload a controller.
The controller must not be active when unloading. If it is active,
this function first attempts to deactivate it.
Parameters
----------
name : str
Name of the controller to unload.
Returns
-------
bool
``True`` if the controller is already absent or was unloaded successfully,
``False`` if deactivation or unload failed.
"""
# Must be inactive to unload
state = self.get_state(name)
if state is None:
return True # already gone
if state == "active" and not self.deactivate(name):
return False
req = UnloadController.Request()
req.name = name
result = self.unload_client.call(req)
ok = bool(result and result.ok)
if not ok:
self._node.get_logger().error(f"Unload failed: {name}")
return ok
# Backward compatibility wrappers
[docs]
def start_controller(self, name: str) -> bool:
"""
Backwards-compatible wrapper to start (activate) a controller.
Parameters
----------
name : str
Name of the controller to start.
Returns
-------
bool
``True`` if activation succeeded or the controller is already active,
``False`` otherwise.
"""
return self.activate(name)
[docs]
def load_controller(self, name: str) -> bool:
"""
Backwards-compatible wrapper to load and configure a controller.
Parameters
----------
name : str
Name of the controller to load and configure.
Returns
-------
bool
``True`` if the controller is loaded and configured, ``False`` otherwise.
"""
return self.load(name) and self.configure(name)
[docs]
def stop_controller(self, name: str) -> bool:
"""
Backwards-compatible wrapper to stop (deactivate) a controller.
Parameters
----------
name : str
Name of the controller to stop.
Returns
-------
bool
``True`` if the controller is not active or was deactivated successfully,
``False`` otherwise.
"""
return self.deactivate(name)
[docs]
def switch_controllers(self, start_list: Iterable[str], stop_list: Iterable[str]) -> bool:
"""
Switch sets of controllers in a single operation.
Parameters
----------
start_list : Iterable[str]
Names of controllers to activate.
stop_list : Iterable[str]
Names of controllers to deactivate.
Returns
-------
bool
``True`` if the switch request succeeded, ``False`` otherwise.
"""
start_list = list(start_list)
stop_list = list(stop_list)
req = SwitchController.Request()
req.activate_controllers = start_list
req.deactivate_controllers = stop_list
req.strictness = SwitchController.Request.BEST_EFFORT
req.start_asap = False
req.activate_asap = True
req.timeout = Duration(sec=5)
result = self.switch_client.call(req)
ok = bool(result and result.ok)
if start_list or stop_list:
deadline = monotonic() + 2.0
while True:
controllers = {c.name: c.state for c in self.list_controllers()}
started = all(controllers.get(name) == "active" for name in start_list)
stopped = all(controllers.get(name) != "active" for name in stop_list)
if started and stopped:
return True
if monotonic() >= deadline:
break
sleep(0.05)
return ok
[docs]
def get_active_controller(self, controller_names: Optional[Iterable[str]] = None) -> Optional[str]:
"""
Get the name of the first active controller from a candidate list.
Parameters
----------
controller_names : Iterable[str], optional
Iterable of controller names to check. If ``None``, all known controllers
are considered.
Returns
-------
str or None
Name of the first active controller from the list, or ``None`` if none
of them are active.
"""
# Use provided list of controller names or call list_controllers() to get all controllers
if controller_names is None:
controller_names = [c.name for c in self.list_controllers()]
# Get the full list of controllers
controllers = self.list_controllers()
# Check for active controller in the provided list of names
for c in controllers:
if c.name in controller_names and c.state == "active":
return c.name # Return the name of the first active controller
# If no active controller is found in the provided list, return None
return None
def _shutdown(self) -> None:
"""
Gracefully destroy the service clients owned by this helper.
The underlying node is owned by the caller and is not destroyed here.
Returns
-------
None
"""
# Destroy service clients only; the node is owned by the caller.
for attr in [
"list_client",
"switch_client",
"load_client",
"unload_client",
"configure_client",
]:
if hasattr(self, attr):
try:
client = getattr(self, attr)
if client is not None:
client.destroy()
except Exception:
pass
# Controllers
[docs]
class RosControllerInterface(rbs_object):
"""
Generic interface for a ROS2-based controller.
This class defines a skeleton for ROS2 controller interfaces used within
Robotics Behavior Specification (RBS) systems. Derived classes must implement
the `_init_ros_interfaces` method to create all ROS-specific communication
elements such as publishers, subscribers, actions, or services.
Attributes
----------
motion_result : int | None
Last motion result code reported by the controller interface.
motion_done : bool | None
Completion flag for the active motion command.
_robot : robot
RobotBlockSet robot instance bound during activation.
_node : Node
ROS 2 node used to create communication entities.
"""
[docs]
def __init__(self) -> None:
"""
Initialize a `RosControllerInterface` instance.
Notes
-----
- This method only initializes the `rbs_object` base class.
- ROS interfaces are initialized later via `Activate()`.
"""
rbs_object.__init__(self)
self.motion_result = None
self.motion_done = None
[docs]
def Activate(self, robot: robot, node: Node) -> None:
"""
Activate the controller using the specified robot and ROS node.
This method initializes the controller with data from the robot, registers
ROS interfaces, and marks the controller as active.
Parameters
----------
robot : robot
The robot instance providing joint and state data.
node : Node
The ROS2 node used to create publishers, subscribers, or action clients.
Returns
-------
None
Notes
-----
- This method should be called before issuing any control commands.
- Derived classes typically call `_init_ros_interfaces()` to build ROS2 entities.
"""
self._robot = robot
self._node = node
self.nj = robot.nj
self._init_ros_interfaces()
self._registered = True
self._last_control_time = robot._last_control_time
self._node.get_logger().info("Activated RosControllerInterface")
# Verbosity level is proxied to the robot so changes propagate automatically after Activate()
# integers are immutable so setting self._verbose = robot._verbose won't keep them in sync
@property
def _verbose(self) -> int:
"""Return the effective verbosity level, proxied from the active robot when available."""
if hasattr(self, "_robot") and self._robot is not None:
return getattr(self._robot, "_verbose", 1)
return getattr(self, "__local_verbose", 1)
@_verbose.setter
def _verbose(self, value: int) -> None:
"""Set the effective verbosity level on the robot or locally before activation."""
if hasattr(self, "_robot") and self._robot is not None:
self._robot._verbose = value
else:
self.__local_verbose = value
@abstractmethod
def _init_ros_interfaces(self) -> None:
"""
Initialize ROS2 communication interfaces.
This method must be implemented by derived classes to set up all ROS
interfaces needed by the controller, such as:
- publishers
- subscribers
- action clients
- service clients
Returns
-------
None
Raises
------
NotImplementedError
If not implemented by the subclass.
"""
pass
[docs]
class CsfCartesianImpedanceControllerInterface(RosControllerInterface):
"""
Interface for a Cartesian impedance controller.
Attributes
----------
Kp : np.ndarray
Cartesian translational stiffness vector.
Kr : np.ndarray
Cartesian rotational stiffness vector.
R : np.ndarray
Rotation matrix used to orient the translational stiffness frame.
D : float
Damping scaling factor.
"""
[docs]
def __init__(
self,
robot: Optional[robot] = None,
ros_plugin_name: str = "cartesian_impedance_controller",
namespace: str = "",
topic: str = "cartesian_command",
Kp: np.ndarray = np.array([2000.0, 2000.0, 2000.0]),
Kr: np.ndarray = np.array([30.0, 30.0, 30.0]),
R: np.ndarray = np.eye(3),
D: float = 2.0,
command_frame: str = "",
) -> None:
"""
Initialize the Cartesian impedance controller interface.
Parameters
----------
robot : robot, optional
Reserved robot argument kept for compatibility.
ros_plugin_name : str, optional
Name of the ROS 2 controller plugin.
namespace : str, optional
Namespace used for the controller topics.
topic : str, optional
Cartesian command topic name.
Kp : np.ndarray, optional
Translational stiffness gains.
Kr : np.ndarray, optional
Rotational stiffness gains.
R : np.ndarray, optional
Rotation matrix of the translational stiffness frame.
D : float, optional
Damping scaling factor.
"""
RosControllerInterface.__init__(self)
if CartesianCommand is None:
raise ImportError("compliant_controllers_msgs package not found. CsfCartesianImpedanceControllerInterface will not be available.")
self._ros_plugin_name = ros_plugin_name
self.Name = "CsfCartesianImpedanceControllerInterface"
self._namespace = namespace
self._topic = topic
self.Kp = vector(Kp, dim=3)
self.Kr = vector(Kr, dim=3)
self.R = matrix(R, shape=(3, 3))
self.D = float(D)
self._default_Kp = vector(Kp, dim=3)
self._default_Kr = vector(Kr, dim=3)
self._default_R = matrix(R, shape=(3, 3))
self._default_D = float(D)
self.command_frame = command_frame
self._registered = False
def _normalise_compliance(
self,
Kp: Optional[ArrayLike] = None,
Kr: Optional[ArrayLike] = None,
R: Optional[ArrayLike] = None,
D: Optional[float] = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, float]:
"""Return validated Cartesian impedance parameters."""
if Kp is None:
Kp = self.Kp
elif isscalar(Kp):
Kp = np.ones(3) * Kp
else:
Kp = vector(Kp, dim=3)
if Kr is None:
Kr = self.Kr
elif isscalar(Kr):
Kr = np.ones(3) * Kr
else:
Kr = vector(Kr, dim=3)
if R is None:
R = self.R
else:
R = matrix(R, shape=(3, 3))
if D is None:
D = self.D
elif not isscalar(D):
raise ValueError("Damping 'D' is not scalar")
D = float(D)
if D < 0 or D > 2:
raise ValueError("Damping 'D' must be in range [0, 2]")
return np.asarray(Kp, dtype=float), np.asarray(Kr, dtype=float), np.asarray(R, dtype=float), D
[docs]
def GetCartesianCompliance(self) -> tuple[np.ndarray, np.ndarray, np.ndarray, float]:
"""Return the active Cartesian impedance parameters."""
return self.Kp, self.Kr, self.R, self.D
[docs]
def GetCartesianStiffness(self) -> tuple[np.ndarray, np.ndarray]:
"""Return translational and rotational Cartesian stiffness."""
return self.Kp, self.Kr
[docs]
def GetCartesianDamping(self) -> float:
"""Return the Cartesian damping scaling factor."""
return self.D
[docs]
def SetCartesianCompliance(
self,
Kp: Optional[ArrayLike] = None,
Kr: Optional[ArrayLike] = None,
R: Optional[ArrayLike] = None,
D: Optional[float] = None,
hold_pose: bool = True,
) -> int:
"""Update Cartesian impedance parameters for subsequent CSF commands."""
Kp, Kr, R, D = self._normalise_compliance(Kp=Kp, Kr=Kr, R=R, D=D)
self.Kp = Kp
self.Kr = Kr
self.R = R
self.D = D
if hold_pose and getattr(self, "_registered", False) and getattr(self, "_robot", None) is not None:
x = getattr(self._robot._command, "x", None)
if x is not None:
trq = getattr(self._robot._command, "FT", None)
if trq is None:
trq = np.zeros(6)
self.GoTo_X(x, np.zeros(6), trq, 0.0, Kp=Kp, Kr=Kr, R=R, D=D)
return MotionResultCodes.MOTION_SUCCESS.value
[docs]
def SetCartesianStiffness(self, Kp: Optional[ArrayLike] = None, Kr: Optional[ArrayLike] = None, hold_pose: bool = True) -> int:
"""Update Cartesian stiffness while preserving orientation frame and damping."""
return self.SetCartesianCompliance(Kp=Kp, Kr=Kr, R=self.R, D=self.D, hold_pose=hold_pose)
[docs]
def SetCartesianDamping(self, D: Optional[float] = None, hold_pose: bool = True) -> int:
"""Update Cartesian damping while preserving stiffness."""
return self.SetCartesianCompliance(Kp=self.Kp, Kr=self.Kr, R=self.R, D=D, hold_pose=hold_pose)
[docs]
def SetCartesianSoft(self, stiffness: ArrayLike, hold_pose: bool = True) -> int:
"""Scale Cartesian stiffness relative to this controller's configured defaults."""
if isscalar(stiffness):
fac_p = np.ones(3) * stiffness
fac_r = fac_p
elif isvector(stiffness, dim=3):
fac_p = vector(stiffness, dim=3)
fac_r = fac_p
else:
fac = vector(stiffness, dim=6)
fac_p = fac[:3]
fac_r = fac[3:]
fac_p = np.clip(fac_p, 0.0, 1.0)
fac_r = np.clip(fac_r, 0.0, 1.0)
fac = float(np.max(np.hstack((fac_p, fac_r))))
return self.SetCartesianCompliance(
Kp=self._default_Kp * fac_p,
Kr=self._default_Kr * fac_r,
R=self._default_R,
D=self._default_D * fac,
hold_pose=hold_pose,
)
[docs]
def SetCartesianStiff(self, hold_pose: bool = True) -> int:
"""Set Cartesian impedance to the configured stiff defaults."""
return self.SetCartesianSoft(1.0, hold_pose=hold_pose)
[docs]
def SetCartesianCompliant(self, hold_pose: bool = True) -> int:
"""Set Cartesian impedance to zero stiffness."""
return self.SetCartesianSoft(0.0, hold_pose=hold_pose)
def _init_ros_interfaces(self) -> None:
"""Create ROS 2 publishers used by the Cartesian impedance controller."""
self._cartesian_command_publisher = self._node.create_publisher(CartesianCommand, f"{self._namespace}/{str(self._topic).strip('/')}", 1)
def _unregister_interface(self) -> None:
"""Destroy ROS 2 entities owned by the Cartesian impedance controller."""
if hasattr(self, "_cartesian_command_publisher"):
try:
self._node.destroy_publisher(self._cartesian_command_publisher)
except Exception:
pass
del self._cartesian_command_publisher
self._registered = False
self._robot = None
[docs]
def GoTo_X(
self,
x: Pose3DType,
xdot: Velocity3DType,
trq: WrenchType,
wait: TimesType,
Kp: Optional[ArrayLike] = None,
Kr: Optional[ArrayLike] = None,
R: Optional[ArrayLike] = None,
D: Optional[float] = None,
**kwargs: Any,
) -> int:
"""
Publish a Cartesian impedance command.
Parameters
----------
x : Pose3DType
Desired task pose represented as position and quaternion.
xdot : Velocity3DType
Desired task twist.
trq : WrenchType
Desired feed-forward wrench.
wait : TimesType
Kept for API compatibility; not used by the ROS 2 publisher.
Kp : ArrayLike, optional
Translational stiffness vector.
Kr : ArrayLike, optional
Rotational stiffness vector.
R : ArrayLike, optional
Rotation matrix of the translational stiffness frame.
D : float, optional
Damping scaling factor.
Returns
-------
int
Motion result code.
Notes
-----
The method updates the stored compliance parameters with the values
used for the published command.
"""
Kp, Kr, R, D = self._normalise_compliance(Kp=Kp, Kr=Kr, R=R, D=D)
command_x = np.asarray(x, dtype=float)
command_xdot = np.asarray(xdot, dtype=float)
command_trq = np.asarray(trq, dtype=float)
# Build CartesianCommand message
cmd = CartesianCommand()
cmd.header.stamp = self._node.get_clock().now().to_msg()
cmd.header.frame_id = self.command_frame
# Pose
cmd.pose.position.x = command_x[0]
cmd.pose.position.y = command_x[1]
cmd.pose.position.z = command_x[2]
cmd.pose.orientation.x = command_x[4]
cmd.pose.orientation.y = command_x[5]
cmd.pose.orientation.z = command_x[6]
cmd.pose.orientation.w = command_x[3]
# Twist
cmd.velocity.linear.x = command_xdot[0]
cmd.velocity.linear.y = command_xdot[1]
cmd.velocity.linear.z = command_xdot[2]
cmd.velocity.angular.x = command_xdot[3]
cmd.velocity.angular.y = command_xdot[4]
cmd.velocity.angular.z = command_xdot[5]
# Wrench
cmd.wrench_ff.force.x = command_trq[0]
cmd.wrench_ff.force.y = command_trq[1]
cmd.wrench_ff.force.z = command_trq[2]
cmd.wrench_ff.torque.x = command_trq[3]
cmd.wrench_ff.torque.y = command_trq[4]
cmd.wrench_ff.torque.z = command_trq[5]
# Stiffness and damping
trM = np.diag(Kp)
rotM = np.diag(Kr)
trK = R @ trM @ R.T
rotK = rotM
trD = R @ (2 * np.sqrt(trM)) @ R.T
rotD = D * np.sqrt(rotM)
# Check if any is NaN
if np.isnan(trM).any() or np.isnan(rotM).any() or np.isnan(trK).any() or np.isnan(rotK).any() or np.isnan(trD).any() or np.isnan(rotD).any():
raise Exception("%s: GoTo_x: trM or rotM or trK or rotK or trD or rotD - NaN error" % self.Name)
cmd.stiffness_pos = trK.flatten(order="C").tolist()
cmd.stiffness_ori = rotK.flatten(order="C").tolist()
cmd.damping_pos = trD.flatten(order="C").tolist()
cmd.damping_ori = rotD.flatten(order="C").tolist()
# Keep configured values for next call
self.Kp = Kp
self.Kr = Kr
self.R = R
self.D = D
# Publish command
self._cartesian_command_publisher.publish(cmd)
return MotionResultCodes.MOTION_SUCCESS.value
[docs]
class JointPositionControllerInterface(RosControllerInterface):
"""
Interface for a joint-position controller.
Attributes
----------
_namespace : str
Namespace used for the controller topics.
_ros_plugin_name : str
Name of the ROS 2 controller plugin.
_topic : str
Topic used to publish joint commands.
_use_publisher : bool
Indicates whether a publisher interface is active.
"""
[docs]
def __init__(self, namespace: str = "", ros_plugin_name: str = "command", topic: str = "joint_position") -> None:
"""
Initialize the joint-position controller interface.
This class provides a thin wrapper around a ROS 2 controller that
accepts single-step joint commands through a `trajectory_msgs/JointTrajectory`
topic. It is used for direct joint position setpoints without action
goal management.
Parameters
----------
namespace : str, optional
Namespace prefix for the controller (for example ``"arm"``).
If empty, no namespace prefix is used.
ros_plugin_name : str, optional
Name of the controller plugin (e.g. ``"command"``).
If empty or ``None``, the controller is assumed to be directly
under the namespace (no extra name component is added).
topic : str, optional
Topic name used when publishing joint-position commands.
Notes
-----
- Effective topic names are constructed as
``<namespace>/<ros_plugin_name>/<topic>`` or ``<namespace>/<topic>``.
- The actual ROS node is provided later through
:meth:`RosControllerInterface.Activate`.
"""
RosControllerInterface.__init__(self)
self.Name = "JointPositionControllerInterface"
self._namespace = namespace
if ros_plugin_name is None or ros_plugin_name == "":
self._ros_plugin_name = ""
else:
self._ros_plugin_name = str(ros_plugin_name).strip("/")
self._topic = str(topic).strip("/")
self._use_publisher = False
def _init_ros_interfaces(self) -> None:
"""
Initialize ROS2 communication interfaces for the controller.
Depending on configuration, this will either create:
- a `JointTrajectory` publisher used for single-point commands
The chosen interface is stored in:
- ``self.command_publisher`` if using topics.
Raises
------
ValueError
If topic name is not configured.
Notes
-----
- This method assumes that `self._node` has been set by
:meth:`RosControllerInterface.Activate`.
"""
self._use_publisher = True
if self._topic is not None:
if self._ros_plugin_name == "":
topic_full_name = f"{self._namespace}/{self._topic}"
else:
topic_full_name = f"{self._namespace}/{self._ros_plugin_name}/{self._topic}"
self._node.get_logger().info(f"Publishing to topic: {topic_full_name}")
self.command_publisher = self._node.create_publisher(JointTrajectory, topic_full_name, 10)
self._use_publisher = True
else:
raise ValueError("Topic for joint position controller must be defined.")
def _unregister_interface(self) -> None:
"""
Tear down ROS2 interfaces and reset controller bindings.
Destroys any created publishers or action clients and clears references
to the associated robot and command/state buffers. After this call,
the interface is no longer considered registered.
Returns
-------
None
Notes
-----
- This is typically called as part of a shutdown sequence or when
switching to a different controller interface.
- Any exceptions during destruction are caught and ignored.
"""
if hasattr(self, "command_publisher"):
try:
self._node.destroy_publisher(self.command_publisher)
except Exception:
pass
del self.command_publisher
self.motion_done = None
self._registered = False
self._robot = None
self._use_publisher = False
[docs]
def GoTo_q(self, q: JointConfigurationType, qdot: Optional[JointVelocityType] = None, trq: Optional[JointTorqueType] = None, wait: Optional[float] = None, **kwargs: Any) -> int:
"""
Publish a joint-position command.
Parameters
----------
q : JointConfigurationType
Desired joint positions (nj,).
qdot : JointVelocityType, optional
Desired joint velocities (nj,).
trq : JointTorqueType, optional
Desired joint torques (nj,).
wait : float, optional
Kept for API compatibility; not used by the publisher interface.
Returns
-------
int
Motion result code.
Raises
------
ValueError
If control is not implemented in controller.
"""
if self._use_publisher:
q = vector(q, dim=self.nj)
if qdot is None:
qdot = np.zeros(self.nj)
else:
qdot = vector(qdot, dim=self.nj)
if wait is None:
wait = float(getattr(self._robot, "tsamp", 0.01))
trajectory_msg = JointTrajectory()
trajectory_msg.joint_names = list(self._robot.joint_names)
point_msg = JointTrajectoryPoint()
point_msg.positions = q.astype(float).tolist()
point_msg.velocities = qdot.astype(float).tolist()
point_msg.time_from_start.sec = int(wait)
point_msg.time_from_start.nanosec = int((wait - int(wait)) * 1e9)
trajectory_msg.points.append(point_msg)
self.command_publisher.publish(trajectory_msg)
self.motion_done = True
self.motion_result = MotionResultCodes.MOTION_SUCCESS.value
self._robot._command.mode = CommandModeCodes.JOINT.value
self._robot._command.q = q
self._robot._command.qdot = qdot
x = self._robot.Kinmodel(q)[0]
self._robot._command.x = x
self._robot._command.v = np.zeros(6)
return MotionResultCodes.MOTION_SUCCESS.value
else:
raise ValueError("Joint position controller is not ready.")
[docs]
class JointTrajectoryControllerInterface(RosControllerInterface):
"""
Interface for a joint-trajectory controller.
Attributes
----------
_namespace : str
Namespace used for the controller topics and actions.
_ros_plugin_name : str
Name of the ROS 2 controller plugin.
_topic : str
Topic used for trajectory publishing.
_action : str
Action used for trajectory execution.
_use_publisher : bool
Indicates whether the publisher path is active.
_use_action : bool
Indicates whether the action-client path is active.
"""
[docs]
def __init__(self, namespace: str = "", ros_plugin_name: str = "joint_trajectory_controller", topic: str = "joint_trajectory", action: str = "follow_joint_trajectory") -> None:
"""
Initialize the joint-trajectory controller interface.
This class provides a thin wrapper around a ROS 2 controller manager
plugin exposing a `control_msgs/FollowJointTrajectory` action or a
`trajectory_msgs/JointTrajectory` topic. It can be configured to
either communicate via an action client or a plain publisher.
Parameters
----------
namespace : str, optional
Namespace prefix for the controller (for example ``"arm"``).
If empty, no namespace prefix is used.
ros_plugin_name : str, optional
Name of the controller plugin (e.g. ``"joint_trajectory_controller"``).
If empty or ``None``, the controller is assumed to be directly
under the namespace (no extra name component is added).
topic : str, optional
Topic name used when publishing ``JointTrajectory`` commands.
action : str, optional
Action name used when creating a ``FollowJointTrajectory`` action
client. If not ``None``, this path is preferred over publishing.
Notes
-----
- Effective topic/action names are constructed as:
- ``<namespace>/<ros_plugin_name>/<action>`` or ``<namespace>/<action>``
- ``<namespace>/<ros_plugin_name>/<topic>`` or ``<namespace>/<topic>``
- The actual ROS node is provided later through
:meth:`RosControllerInterface.Activate`.
"""
RosControllerInterface.__init__(self)
self.Name = "JointTrajectoryControllerInterface"
self._namespace = namespace
if ros_plugin_name is None or ros_plugin_name == "":
self._ros_plugin_name = ""
else:
self._ros_plugin_name = str(ros_plugin_name).strip("/")
self._topic = str(topic).strip("/")
self._action = str(action).strip("/")
self._use_publisher = False
self._use_action = True
def _init_ros_interfaces(self) -> None:
"""
Initialize ROS2 communication interfaces for the controller.
Depending on configuration, this will either create:
- a `FollowJointTrajectory` :class:`ActionClient` (preferred), or
- a `JointTrajectory` publisher, if `action` is ``None`` and `topic`
is provided.
The chosen interface is stored in:
- ``self.command_action_client`` if using actions.
- ``self.command_publisher`` if using topics.
Raises
------
ValueError
If neither an action name nor a topic name is configured.
Notes
-----
- This method assumes that `self._node` has been set by
:meth:`RosControllerInterface.Activate`.
- If the action server is not available within a short timeout,
a warning is logged and `_use_action` remains ``False``.
"""
self._use_publisher = False
self._use_action = False
# Prefer action client if configured
if self._action is not None:
if self._ros_plugin_name == "":
topic_full_name = f"{self._namespace}/{self._action}"
else:
topic_full_name = f"{self._namespace}/{self._ros_plugin_name}/{self._action}"
self._node.get_logger().info(f"Action topic: {topic_full_name}")
self.command_action_client: ActionClient = ActionClient(self._node, FollowJointTrajectory, topic_full_name)
self._node.get_logger().info(f"Waiting for '{topic_full_name}' action server ...")
if not self.command_action_client.wait_for_server(timeout_sec=2.0):
self.WarningMessage(f"Action {topic_full_name} not available")
else:
self._use_action = True
elif self._topic is not None:
if self._ros_plugin_name == "":
topic_full_name = f"{self._namespace}/{self._topic}"
else:
topic_full_name = f"{self._namespace}/{self._ros_plugin_name}/{self._topic}"
self._node.get_logger().info(f"Publishing to topic: {topic_full_name}")
self.command_publisher = self._node.create_publisher(JointTrajectory, topic_full_name, 10)
self._use_publisher = True
else:
raise ValueError("Topic or action for joint trajectory controller must be defined.")
def _unregister_interface(self) -> None:
"""
Tear down ROS2 interfaces and reset controller bindings.
Destroys any created publishers or action clients and clears references
to the associated robot and command/state buffers. After this call,
the interface is no longer considered registered.
Returns
-------
None
Notes
-----
- This is typically called as part of a shutdown sequence or when
switching to a different controller interface.
- Any exceptions during destruction are caught and ignored.
"""
if hasattr(self, "command_publisher"):
try:
self._node.destroy_publisher(self.command_publisher)
except Exception:
pass
del self.command_publisher
if hasattr(self, "command_action_client"):
try:
self._node.remove_waitable(self.command_action_client)
self.command_action_client.destroy()
except Exception:
pass
del self.command_action_client
self.motion_done = None
self._registered = False
self._robot = None
self._use_publisher = False
self._use_action = False
[docs]
def GoTo_qtraj(self, q: JointPathType, qdot: JointPathType, qddot: JointPathType, time: TimesType) -> int:
"""
Command the robot to follow a joint trajectory.
Parameters
----------
q : JointPathType
Desired joint positions for the trajectory (n, nj), where n is the number of trajectory points.
qdot : JointPathType
Desired joint velocities for the trajectory (n, nj), where n is the number of trajectory points.
qddot : JointPathType
Desired joint accelerations for the trajectory. The current
interfaces do not use these values.
time : TimesType
Time points for the trajectory (n,).
Returns
-------
int
Motion result code.
Notes
-----
The command is routed to either the publisher-based or action-based
trajectory backend depending on the active ROS 2 interface.
"""
if self._use_publisher:
return self.GoTo_qtraj_pub(q, qdot, qddot, rbs_type(time).flatten())
elif self._use_action:
return self.GoTo_qtraj_act(q, qdot, qddot, rbs_type(time).flatten())
else:
raise ValueError("Joint trajectory controller is not ready.")
[docs]
def GoTo_qtraj_pub(self, q: JointPathType, qdot: JointPathType, qddot: JointPathType, time: TimesType) -> int:
"""
Publish a joint trajectory through the ROS 2 topic interface.
Parameters
----------
q : JointPathType
Desired joint positions for the trajectory (n, nj), where n is the number of trajectory points.
qdot : JointPathType
Desired joint velocities for the trajectory (n, nj), where n is the number of trajectory points.
qddot : JointPathType
Desired joint accelerations for the trajectory. The publisher path
does not use these values.
time : TimesType
Time points for the trajectory (n,).
Returns
-------
int
Motion result code.
Notes
-----
On success, the final trajectory point is copied into the robot command
state and the motion is marked complete immediately after publishing.
"""
if isvector(q, dim=self.nj):
q = rbs_type(q).reshape((1, self.nj))
qdot = rbs_type(qdot).reshape((1, self.nj))
if np.isscalar(time):
time = np.array([float(time)])
if ismatrix(q, shape=self.nj) and ismatrix(qdot, shape=self.nj) and isvector(time, dim=q.shape[0]):
_trajectory_msg = JointTrajectory()
_trajectory_msg.joint_names = self._robot.joint_names
for qt, qdt, t in zip(q, qdot, time):
_point_msg = JointTrajectoryPoint()
_point_msg.positions = qt.astype(float).tolist()
_point_msg.velocities = qdt.astype(float).tolist()
_point_msg.time_from_start.sec = int(t)
_point_msg.time_from_start.nanosec = int((t - int(t)) * 1e9)
_trajectory_msg.points.append(_point_msg)
self.command_publisher.publish(_trajectory_msg)
self._robot._command.mode = CommandModeCodes.JOINT_TRAJ.value
self.motion_done = True
self._robot._command.q = q[-1]
self._robot._command.qdot = np.zeros(self.nj)
x = self._robot.Kinmodel(self._robot._command.q)[0]
self._robot._command.x = x
self._robot._command.v = np.zeros(6)
return MotionResultCodes.MOTION_SUCCESS.value
else:
raise ValueError("Invalid joint trajectory data")
[docs]
def GoTo_qtraj_act(self, q: JointPathType, qdot: JointPathType, qddot: JointPathType, time: TimesType) -> int:
"""
Send a joint trajectory through the ROS 2 action interface.
Parameters
----------
q : JointPathType
Desired joint positions for the trajectory (n, nj), where n is the number of trajectory points.
qdot : JointPathType
Desired joint velocities for the trajectory (n, nj), where n is the number of trajectory points.
qddot : JointPathType
Desired joint accelerations for the trajectory. The action path
does not use these values.
time : TimesType
Time points for the trajectory (n,).
Returns
-------
int
Motion result code.
Notes
-----
The method waits for the action response and, when enabled, for the
final motion-completion callback.
"""
if isvector(q, dim=self.nj):
q = rbs_type(q).reshape((1, self.nj))
qdot = rbs_type(qdot).reshape((1, self.nj))
if np.isscalar(time):
time = np.array([float(time)])
if ismatrix(q, shape=self.nj) and ismatrix(qdot, shape=self.nj) and isvector(time, dim=q.shape[0]):
_trajectory_msg = JointTrajectory()
_trajectory_msg.joint_names = self._robot.joint_names
# Do NOT stamp header to avoid sim-time vs wall-time mismatch; let controller start immediately
for qt, qdt, t in zip(q, qdot, time):
_point_msg = JointTrajectoryPoint()
_point_msg.positions = qt.astype(float).tolist()
_point_msg.velocities = qdt.astype(float).tolist()
_point_msg.time_from_start.sec = int(t)
_point_msg.time_from_start.nanosec = int((t - int(t)) * 1e9)
_trajectory_msg.points.append(_point_msg)
goal = FollowJointTrajectory.Goal()
goal.trajectory = _trajectory_msg
self.motion_done = False
self.motion_result = None
self._node.get_logger().info(f"FollowJointTrajectory goal: points={len(_trajectory_msg.points)} duration={time[-1]:.3f}s")
try:
qtraj_send_goal_future = self.command_action_client.send_goal_async(goal, feedback_callback=self.qtraj_feedback_callback)
qtraj_send_goal_future.add_done_callback(self.qtraj_goal_response_callback)
except Exception as e:
self._node.get_logger().warning(f"FollowJointTrajectory send_goal_async exception: {e}")
return GoalStatus.STATUS_CANCELED
# Waiting for motion result (timeout expected motion time)
self._node.get_logger().debug("Waiting for FollowJointTrajectory motion result ...")
waited = 0
while self.motion_result is None: # and waited < np.max(t):
sleep(self._robot.tsamp)
waited += self._robot.tsamp
self._robot.Update()
if waited > np.max(time):
self._node.get_logger().debug("Timeout for FollowJointTrajectory motion result!")
return MotionResultCodes.NO_RESPONSE.value
# Waiting further for motion done reported (maximal 2 x expected motion time)
if self._robot._wait_for_action_server:
self._node.get_logger().debug("Waiting for FollowJointTrajectory motion done ...")
while not self.motion_done and waited < 2 * np.max(time): # and waited < np.max(t):
sleep(self._robot.tsamp)
waited += self._robot.tsamp
self._robot.Update()
return self.motion_result
else:
raise ValueError("Invalid joint trajectory data")
[docs]
def qtraj_goal_response_callback(self, future: Future) -> None:
"""
Callback executed when the action server responds to a trajectory goal request.
Parameters
----------
future : Future
Future containing the ``ClientGoalHandle`` result from the ``send_goal_async`` call.
Notes
-----
- If the goal is rejected, ``motion_result`` is set to
``INVALID_GOAL``.
- If the goal is accepted, the completion callback is registered
through ``get_result_async``.
"""
self._node.get_logger().debug("goal_response_callback invoked")
# print type of self
self._node.get_logger().debug(f"type(self) = {type(self)}")
# print if self has message method
self._node.get_logger().debug(f"hasattr(self, 'Message') = {hasattr(self, 'Message')}")
# print verbositz level of self if hasattr(self, "_verbose"):
self._node.get_logger().debug("GoTo_qtraj: Goal response received")
self._goal_handle = future.result()
if not self._goal_handle or not self._goal_handle.accepted:
self._node.get_logger().error("FollowJointTrajectory response: Goal rejected!")
self.motion_result = FollowJointTrajectory.Result.INVALID_GOAL
else:
self._node.get_logger().info("FollowJointTrajectory response: Goal accepted")
self.motion_result = FollowJointTrajectory.Result.SUCCESSFUL
self._robot._command.mode = CommandModeCodes.JOINT_TRAJ.value
self._goal_handle.get_result_async().add_done_callback(self.qtraj_get_result_callback)
[docs]
def qtraj_get_result_callback(self, future: Future) -> None:
"""
Callback executed when the trajectory has finished execution.
Parameters
----------
future : Future
Future containing the result of the action execution.
Notes
-----
- Updates ``motion_result`` with the final action result error code.
- Sets ``motion_done`` to ``True`` indicating motion completion.
- Logs success or error based on trajectory outcome.
- If motion was aborted, sets ``_abort = True`` on the robot instance (if available).
Returns
-------
None
"""
self._node.get_logger().debug("Trajecotry result callback invoked")
wrapped = future.result()
status = wrapped.status
result = wrapped.result
code = result.error_code
if code == 0:
self._node.get_logger().info(f"FollowJointTrajectory result: {result.error_string}")
else:
self._node.get_logger().error(f"FollowJointTrajectory result: {result.error_string}")
self.motion_result = code
self.motion_done = True
self._robot._command.mode = CommandModeCodes.JOINT_TRAJ.value
if status == GoalStatus.STATUS_ABORTED:
if hasattr(self, "_robot"):
setattr(self._robot, "_abort", True)
[docs]
def qtraj_feedback_callback(self, feedback_msg: FollowJointTrajectory.Feedback) -> None:
"""
Callback executed when feedback is received during trajectory execution.
Parameters
----------
feedback_msg : FollowJointTrajectory.Feedback
Feedback message containing intermediate trajectory updates.
Notes
-----
- Stores feedback data in ``last_feedback``.
- Feedback may include joint states and progress updates.
- Logging is optional and typically kept disabled for performance reasons.
Returns
-------
None
"""
# self._node.get_logger().debug("Trajectory feedback callback invoked")
q = np.array(feedback_msg.feedback.desired.positions)
qd = np.array(feedback_msg.feedback.desired.velocities)
self._robot._command.q = q
x, J = self._robot.Kinmodel(q)
self._robot._command.x = x
if len(qd) > 0:
self._robot._command.qd = qd
try:
self._robot._command.v = J @ qd
except Exception as e:
self._node.get_logger().error(f"FollowJointTrajectory feedback: {e}\nq = {q}\nqd = {qd}\nJ=\n{J}")
self.last_feedback = feedback_msg.feedback
[docs]
def abort_motion(self) -> int | None:
"""
Attempt to abort the current trajectory motion by canceling the active FollowJointTrajectory goal.
This method sends a cancel request to the ROS2 action server for the active trajectory goal.
If no goal is currently active, a warning is logged and the method exits without performing any action.
Notes
-----
- This does not immediately stop the robot; it requests the controller
to perform a controlled stop.
- After the cancel request is sent, the result is processed in
:meth:`qtraj_cancel_done_callback`.
Returns
-------
int | None
Motion result code of the cancel operation, or ``None`` if no
active goal exists.
"""
if not hasattr(self, "_goal_handle") or self._goal_handle is None:
self._node.get_logger().warn("No active FollowJointTrajectory goal to abort.")
return
self.cancel_response = None
self._node.get_logger().info("FollowJointTrajectory goal canceling ...")
cancel_future = self._goal_handle.cancel_goal_async()
cancel_future.add_done_callback(self.qtraj_cancel_done_callback)
waited = 0
while self.cancel_response is None: # and waited < np.max(t):
sleep(self._robot.tsamp)
waited += self._robot.tsamp
self._robot.Update()
# Timeout 2s
if waited > np.max(2.0):
self._node.get_logger().debug("Timeout for FollowJointTrajectory cancel motion result!")
return MotionResultCodes.NO_RESPONSE.value
return self.cancel_response
[docs]
def qtraj_cancel_done_callback(self, future: Future) -> None:
"""
Process the result of a cancel request for an active trajectory goal.
Parameters
----------
future : Future
Future object returned from ``cancel_goal_async`` containing the cancel response.
Notes
-----
- Logs whether cancellation was successful.
- Does not raise exceptions; any failure is logged.
- This callback does not wait for the robot to fully stop; it only
indicates whether the cancel request was accepted.
"""
self.motion_done = True
_result = future.result()
if _result and len(_result.goals_canceling) > 0:
self._node.get_logger().info("FollowJointTrajectory goal successfully canceled.")
self.cancel_response = MotionResultCodes.MOTION_ABORTED.value
self._robot._command.mode = CommandModeCodes.JOINT_TRAJ.value
else:
self._node.get_logger().warn("Failed to cancel FollowJointTrajectory goal.")
self.cancel_response = MotionResultCodes.MOTION_FAILURE.value
[docs]
@staticmethod
def FollowJointTrajectory_error_str(error_code: int) -> str:
"""
Convert a ``FollowJointTrajectory`` result code to text.
Parameters
----------
error_code : int
Error code from ``FollowJointTrajectory`` action result.
Returns
-------
str
Human-readable explanation of the failure cause.
"""
FOLLOWJOINTTRAJECTORY_STATUS_MAP = {
FollowJointTrajectory.Result.SUCCESSFUL: "Trajectory executed successfully.",
FollowJointTrajectory.Result.INVALID_GOAL: "The trajectory goal is invalid (e.g., unreachable target or invalid constraints).",
FollowJointTrajectory.Result.INVALID_JOINTS: "The trajectory references unknown or mismatched joint names.",
FollowJointTrajectory.Result.OLD_HEADER_TIMESTAMP: "The command timestamp is older than the robot’s current time.",
FollowJointTrajectory.Result.PATH_TOLERANCE_VIOLATED: "The robot deviated from the desired trajectory beyond allowed path tolerance.",
FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED: "The final joint or pose error exceeded the goal tolerance.",
}
return FOLLOWJOINTTRAJECTORY_STATUS_MAP.get(error_code, f"Unknown FollowJointTrajectory error code: {error_code}")
[docs]
@staticmethod
def FollowJointTrajectory_goal_status_str(status_code: int) -> str:
"""
Convert an action goal status code to text.
Parameters
----------
status_code : int
Action goal execution status.
Returns
-------
str
Human-readable status description.
"""
STATUS_MAP = {
GoalStatus.STATUS_UNKNOWN: "Goal status unknown (controller did not report a valid state).",
GoalStatus.STATUS_ACCEPTED: "Goal accepted and queued for execution.",
GoalStatus.STATUS_EXECUTING: "Goal is currently being executed.",
GoalStatus.STATUS_CANCELING: "Goal is being canceled.",
GoalStatus.STATUS_SUCCEEDED: "Goal execution finished successfully.",
GoalStatus.STATUS_CANCELED: "Goal was canceled before finishing.",
GoalStatus.STATUS_ABORTED: "Goal execution was aborted due to an error or external interruption.",
}
return STATUS_MAP.get(status_code, f"Unknown FollowJointTrajectory goal status: {status_code}")