Source code for robotblockset.ros2.robots_ros2

"""ROS 2 robot interface implementations.

This module defines ROS 2-backed robot wrappers used by RobotBlockSet robot
interfaces. It provides communication plumbing, controller integration, state
feedback handling, and motion-execution helpers for multiple robot platforms.

Copyright (c) 2024- Jozef Stefan Institute

Authors: Leon Zlajpah, Mihael Simonic.
"""

from __future__ import annotations

# pyright: reportMissingImports=false

from typing import Optional, Any, Dict, List
import numpy as np
from time import sleep, time
from copy import deepcopy
from threading import Thread

try:
    import rclpy
    from rclpy.node import Node
    from rclpy.executors import MultiThreadedExecutor
    from rclpy.qos import qos_profile_sensor_data
except Exception as e:
    raise e from RuntimeError("ROS2 rclpy not installed.\nYou can install rclpy with commands:\n   sudo apt update\nsudo apt install ros-<ros-distro>-rclpy")

try:
    from sensor_msgs.msg import JointState
    from geometry_msgs.msg import WrenchStamped
except Exception as e:
    raise e from RuntimeError("Problems with importing ROS2 messages. Check if all are installed.")

from robotblockset.robots import robot, MotionResultCodes
from robotblockset.tools import rbs_type, isvector, vector
from robotblockset.ros2.controllers_ros2 import controller_manager_helper
from robotblockset.ros2.controllers_ros2 import JointTrajectoryControllerInterface, RosControllerInterface
from robotblockset.rbs_typing import JointConfigurationType, JointVelocityType, JointTorqueType, Pose3DType, Velocity3DType, WrenchType, JointPathType, TimesType


[docs] class robot_ros2(robot): """ Base class for ROS 2-backed robot interfaces. Attributes ---------- Name : str Name of the ROS 2 node and RobotBlockSet robot instance. _namespace : str Normalized ROS namespace prefix used for topics, actions, and services. controller : RosControllerInterface | None Active controller interface bound to the robot. joint_state : JointState | None Last received joint-state message. force_state : WrenchStamped | None Last received force/torque message. """
[docs] def __init__(self, name: str, namespace: str = "", joint_states_topic: Optional[str] = "joint_states", control_strategy: Optional[str] = None, strategy_to_controller_interface_mapping: Optional[Dict[str, RosControllerInterface]] = None, node: Optional[Node] = None, control_manager_name: Optional[str] = "controller_manager") -> None: """ Initialize a ROS 2 robot interface. Parameters ---------- name : str Name of the robot node. namespace : str, optional Namespace for the robot topics, actions, and services. joint_states_topic : str | None, optional Relative topic used to subscribe to joint states. If ``None``, no joint-state subscription is created. control_strategy : str | None, optional Initial RobotBlockSet control strategy. strategy_to_controller_interface_mapping : dict[str, RosControllerInterface] | None, optional Mapping from strategy names to controller interface instances. node : Node | None, optional Optional externally created ROS 2 node to use. If ``None``, a new node is created with the given name. control_manager_name : str, optional Name of the ROS 2 controller manager. Notes ----- Subclasses may create additional subscriptions, clients, and publishers before calling :meth:`_start_spinning`. """ robot.__init__(self) if node is None: self._node = Node(name) self._owns_node = True else: self._node = node self._owns_node = False if namespace is None or namespace == "": self._namespace = "" else: self._namespace = "/" + str(namespace).strip("/") # Robot name for logging self.Name = name # Mapping of RBS strategies to ros_control controller interfaces self._strategy_to_controller_interface_mapping = strategy_to_controller_interface_mapping or {} self._control_strategy = control_strategy self.controller = None # Create the executor self._executor = MultiThreadedExecutor() self._executor.add_node(self._node) # Initialize the control helper if self._strategy_to_controller_interface_mapping == {}: self._node.get_logger().info("No Strategy<->Control_interface mapping and control manager is not used") self._control_helper = None else: self._control_helper = controller_manager_helper(self._node, namespace=namespace, controller_manager_name=control_manager_name) # Spin control self._spinning = False self._spin_thread = None # Connection and state variables self._connected = False self.joint_state = None self._last_joint_state_callback_time = 0 self.force_state = None self._last_force_state_callback_time = 0 self._reorder = None self._wait_for_action_server = True # Motion command wait to get Action server response # Initialize ROS2 subscription to joint states if joint_states_topic is not None: topic_full_name = f"{self._namespace}/{joint_states_topic.strip('/')}" self._node.get_logger().info(f"Subscribing to joint states topic: {topic_full_name}") self._joint_states_sub = self._node.create_subscription(msg_type=JointState, topic=topic_full_name, callback=self._joint_state_callback, qos_profile=qos_profile_sensor_data)
# We don't start spinning here yet. Subclasses may add more subs/clients first to avoid # concurrent entity mutations during wait set building. They should call # self._start_spinning(wait_for_state=True) when ready. # Subclasses should also SetStrategy after spinning starts. def __del__(self) -> None: """ Clean up ROS 2 resources during object destruction. Notes ----- Python may call ``__del__`` during interpreter shutdown when logging and ROS resources are already partially destroyed. All shutdown actions are therefore wrapped in ``try`` blocks. """ try: # Logging may fail during interpreter shutdown try: if hasattr(self, "_node") and self._node is not None: self._node.get_logger().info(f"{self.Name} object is being deleted...") except Exception: pass # Perform full shutdown of executors, thread, and node try: self._shutdown() except Exception: pass try: if hasattr(self, "_node") and self._node is not None: self._node.get_logger().info(f"{self.Name} deletion complete.") except Exception: pass except Exception: # Never allow destructor exceptions to propagate pass # ------------------------------------------------------------------ # Spinning # ------------------------------------------------------------------ def _run(self) -> None: """ Execute the background ROS 2 spin loop. Notes ----- The loop repeatedly calls ``spin_once`` on the shared executor so that callback processing remains responsive to shutdown requests. Transient errors from entity destruction (e.g. when switching controllers) are caught per-iteration so the loop continues rather than exiting. """ while rclpy.ok() and getattr(self, "_spinning", False): self._executor.spin_once(timeout_sec=0.1) def _start_spinning(self, wait_for_state: bool = True, timeout_sec: float = 10.0) -> None: """ Start the executor thread and optionally wait for initial state feedback. Parameters ---------- wait_for_state : bool, optional Whether to wait for the first joint state message to arrive. timeout_sec : float, optional Timeout in seconds when waiting for the first joint state. Returns ------- None Notes ----- If ``wait_for_state`` is enabled and no joint state arrives within the timeout, the robot remains marked as disconnected. """ if self._spinning: return self._spinning = True self._spin_thread = Thread(target=self._run, args=(), kwargs={}, daemon=True) self._spin_thread.start() # Optionally wait for first state message to arrive if wait_for_state: waited = 0.0 step = 0.05 while self.joint_state is None and waited < timeout_sec: sleep(step) waited += step if self.joint_state is None: self._node.get_logger().warning("No joint state received within timeout; continuing without connection confirmation.") self._connected = False else: self._connected = True self._node.get_logger().info("Connected") # Apply desired strategy now that spinning is active if self._control_strategy is not None: self.SetStrategy(self._control_strategy) else: self.controller = None self._control_strategy = None self._node.get_logger().warning("Initializing robot object without a controller.") def _shutdown(self, join_timeout: float = 2.0) -> None: """ Stop spinning and release ROS 2 resources. Parameters ---------- join_timeout : float, optional Seconds to wait for the spin thread to finish. Returns ------- None Notes ----- This method stops the background thread, shuts down the controller helper, removes the node from the executor, and destroys the node. """ # Stop spin loop and join background thread try: self._spinning = False if self._spin_thread is not None and self._spin_thread.is_alive(): self._node.get_logger().info("Waiting for ROS2 spin thread to finish...") self._spin_thread.join(timeout=join_timeout) if self._spin_thread.is_alive(): self._node.get_logger().warning("ROS2 spin thread did not exit within timeout") except Exception as e: try: self._node.get_logger().error(f"Error while stopping spin thread: {e}") except Exception: pass # Shut down control helper try: if hasattr(self, "_control_helper") and self._control_helper is not None: self._node.get_logger().info("Shutting down control helper...") try: self._control_helper._shutdown() except Exception as e: self._node.get_logger().error(f"Error while shutting down control helper: {e}") except Exception: # Swallow any logging issues pass # Remove node from executor and shut it down try: if hasattr(self, "_executor") and self._executor is not None: self._node.get_logger().info("Removing node from executor...") try: self._executor.remove_node(self._node) except Exception: # It's fine if the node was already removed pass self._node.get_logger().info("Shutting down executor...") try: self._executor.shutdown() except Exception as e: self._node.get_logger().error(f"Error while shutting down executor: {e}") except Exception: # Again, avoid raising during shutdown pass # Destroy node try: if hasattr(self, "_node") and self._node is not None: self._node.get_logger().info("Destroying node...") if self._owns_node: self._node.destroy_node() except Exception as e: try: if hasattr(self, "_node") and self._node is not None: self._node.get_logger().error(f"Error while destroying node: {e}") except Exception: pass # States def _joint_state_callback(self, data: JointState) -> None: """ Store the latest joint-state message. Parameters ---------- data : JointState Joint-state message received from ROS 2. """ self.joint_state = deepcopy(data) self._last_joint_state_callback_time = self.simtime() def _force_state_callback(self, data: WrenchStamped) -> None: """ Store the latest force/torque message. Parameters ---------- data : WrenchStamped Force/torque message received from ROS 2. """ self.force_state = data self._last_force_state_callback_time = self.simtime()
[docs] def GetState(self) -> None: """ Update the RobotBlockSet state from the latest ROS 2 messages. Notes ----- Joint state, force/torque data, and controller reference state are copied into the RobotBlockSet ``_actual`` and ``_command`` containers. """ if self.joint_state is not None: if self._reorder is None: self._reorder = [self.joint_state.name.index(a) for a in self.joint_names if a in self.joint_state.name] pos = [self.joint_state.position[i] for i in self._reorder] vel = [self.joint_state.velocity[i] for i in self._reorder] trq = [self.joint_state.effort[i] for i in self._reorder] self._tt = self.simtime() self._actual.q = rbs_type(pos) self._actual.qdot = rbs_type(vel) self._actual.trq = rbs_type(trq) x, J = self.Kinmodel() self._actual.x = x self._actual.v = J @ self._actual.qdot if self.force_state: _f = self.force_state.wrench.force _t = self.force_state.wrench.torque self._actual.FT = rbs_type([_f.x, _f.y, _f.z, _t.x, _t.y, _t.z]) self._last_update = self.simtime() # Do not change !
# Strategies
[docs] def AvailableStrategies(self) -> List[str]: """ Return the available control strategies for the robot. Returns ------- List[str] A list of available control strategies. """ return list(self._strategy_to_controller_interface_mapping.keys())
[docs] def SetStrategy(self, new_strategy: str) -> None: """ Switch the active control strategy. Parameters ---------- new_strategy : str Name of the control strategy to activate. Returns ------- None Notes ----- The current controller is stopped and unregistered before the new ROS 2 controller is loaded, switched, and activated. Raises ------ ValueError If the specified strategy is not supported. """ if new_strategy not in self.AvailableStrategies(): raise ValueError(f"Strategy '{new_strategy}' not supported") # Check if any of the possible controllers is already active ci = self._strategy_to_controller_interface_mapping possible_controllers_names = [ci[s]._ros_plugin_name for s in self.AvailableStrategies()] loaded_controllers = self._control_helper.list_controllers() # Identify active controllers that claim actuation interfaces. # Note: ROS allows multiple controllers to run concurrently (e.g., state broadcasters), # and multiple controllers may claim different types of interfaces for the same joint. # However, actuation interfaces (e.g., joint_x) must be exclusively owned — only one controller # can claim a given actuation interface at a time. # Other interfaces may still be claimed by other controllers without conflict. active_controller_names = [c.name for c in loaded_controllers if c.name in possible_controllers_names and c.state == "active" and len(c.claimed_interfaces) > 0] # ... but we don't handle this yet if len(active_controller_names) > 1: raise RuntimeError("Undefined behavior: Multiple active controllers claiming actuation interfaces detected! " f"Active controllers: {active_controller_names}") # Make reverse mapping from controller name to strategy if self._control_strategy is not None and self.controller is not None: # Make sure internal state is consistent with the actual active controller if len(active_controller_names) > 0: # ToDo : poglej:self._control_strategy = active_controller_names[0] active_controller_name = active_controller_names[0] reverse_map = {iface._ros_plugin_name: strategy for strategy, iface in ci.items()} self._control_strategy = reverse_map.get(active_controller_name, "") else: self._control_strategy = "" # Check if already using the desired strategy. # Also verify the controller object matches, because two strategies may share # the same ros_plugin_name (e.g. "JointPosition" and "JointPositionTrajectory" # both use "fr3_arm_controller") — the reverse_map would collapse them to one # entry, so we must confirm the actual interface object is correct too. if self._control_strategy == new_strategy and self.controller is not None and self.controller is ci.get(new_strategy): self.Message(f"Not switching; already using '{new_strategy}'", 2) return # Stop motion before switching. For action-backed controllers, first # abort goals. try: if self._control_strategy is not None: self.Abort() self.Stop() except Exception: # Non-critical if semaphore not yet initialized. pass # Snapshot the feedback state as the new command target before the old # interface is torn down. self.ResetCurrentTarget(do_move=False) # Safely unload current controller interface if self.controller is not None and self.controller._registered: self.controller._unregister_interface() self.controller = None # Check if the target controller is loaded target_controller_name = ci.get(new_strategy)._ros_plugin_name loaded_controllers = self._control_helper.list_controllers() if target_controller_name not in [c.name for c in loaded_controllers]: if not self._control_helper.load_controller(target_controller_name): raise ValueError(f"ROS2 controller '{target_controller_name}' can not be loaded!") # Prepare switch request if target_controller_name not in active_controller_names: if not self._control_helper.switch_controllers(stop_list=active_controller_names, start_list=[target_controller_name]): self.Message("Switching failed. Check ROS2 logs!", 0) return False # Activate controller interface controller_instance = ci.get(new_strategy) self.controller = controller_instance self.controller.Activate(robot=self, node=self._node) self._control_strategy = new_strategy self.Message(f"Strategy set to '{new_strategy}'", 2)
# Status
[docs] def isConnected(self) -> bool: """ Check whether the robot has received initial state feedback. Returns ------- bool ``True`` if the robot is connected, otherwise ``False``. """ return self._connected
[docs] def Shutdown(self, join_timeout: float = 2.0) -> None: """ Shut down the robot node and its background executor. Parameters ---------- join_timeout : float, optional Seconds to wait for the background spin thread to stop. """ self._shutdown(join_timeout=join_timeout)
# Movements
[docs] def StopMotion(self) -> Optional[int]: """ Stop the current motion and reset the local target state. Returns ------- int | None Result code from the controller's abort call, if available. Notes ----- The robot state is switched to stop mode and the current RobotBlockSet target is reset even when the controller does not provide an explicit abort method. """ _res = None if hasattr(self.controller, "abort_motion"): _res = self.controller.abort_motion() self.Stop() self.ResetCurrentTarget() return _res
[docs] def EnableWaitForActionServer(self, check: bool = True) -> None: """ Enable waiting for action completion callbacks. Parameters ---------- check : bool, optional Whether to enable check. Default is True. Returns ------- None """ self._wait_for_action_server = check
[docs] def DisableWaitForActionServer(self) -> None: """ Disable waiting for action completion callbacks. """ self._wait_for_action_server = False
[docs] def WaitUntilDone(self, timeout: Optional[float] = None) -> int: """ Wait until the controller reports motion completion. Parameters ---------- timeout : float, optional Maximum time to wait in seconds. If ``None``, wait indefinitely. Returns ------- int Motion result code reported by the controller. Notes ----- This method polls ``self.controller.motion_done`` and updates the robot state while waiting. It is a blocking helper and should not be used from real-time or callback threads. """ # If no motion_done attribute, assume completed if not hasattr(self.controller, "motion_done") or self.controller.motion_done is None: return MotionResultCodes.MOTION_SUCCESS.value elif self.controller.motion_done: return self.controller.motion_result start_time = time() # Loop until motion completes or timeout expires while not self.controller.motion_done: if timeout is not None and (time() - start_time) >= timeout: return MotionResultCodes.NO_RESPONSE.value # Timed out sleep(self.tsamp) self.Update() return self.controller.motion_result # Completed
[docs] def GoTo_q(self, q: JointConfigurationType, qdot: Optional[JointVelocityType] = None, trq: Optional[JointTorqueType] = None, wait: Optional[float] = None, **kwargs: Any) -> int: """ Command the robot to move to a joint configuration. Parameters ---------- q : JointConfigurationType Desired joint positions (nj,). qdot : JointVelocityType, optional Desired joint velocities (nj,). trq : JointTorqueType, optional Desired joint torques (nj,). wait : float, optional Time step used for synchronization and command execution. If ``None``, ``self.tsamp`` is used. Returns ------- int Motion result code. Notes ----- If the active controller does not implement direct joint commands, the method falls back to a two-point joint trajectory. Raises ------ ValueError If control is not implemented in controller. """ self._synchro_control(wait) q = vector(q, dim=self.nj) if qdot is None: qdot = np.zeros(self.nj) else: qdot = vector(qdot, dim=self.nj) if trq is None: trq = np.zeros(self.nj) else: trq = vector(trq, dim=self.nj) if wait is None: wait = self.tsamp self._synchro_control(wait) if hasattr(self.controller, "GoTo_q"): res = self.controller.GoTo_q(q, qdot, trq, wait) elif hasattr(self.controller, "GoTo_qtraj"): # Two points are needed. We use last commanded q ar first point qq = np.vstack((self._actual.q, q)) qqdot = np.vstack((qdot, 0 * qdot)) time = np.hstack((0, wait)) res = self.controller.GoTo_qtraj(qq, qqdot, np.zeros(q.shape), time) else: raise NotImplementedError("GoTo_q method not implemented for the current controller") if res == 0: self._command.q = q self._command.qdot = qdot if qdot is not None else np.zeros(self.nj) x, J = self.Kinmodel(q) self._command.x = x self._command.v = J @ self._command.qdot self.Update() return res
[docs] def GoTo_qtraj(self, q: JointPathType, qdot: JointPathType, qddot: JointPathType, time: TimesType) -> int: """ Command the robot to follow a joint trajectory. Parameters ---------- q : JointPathType Desired joint positions for the trajectory (n, nj), where n is the number of trajectory points. qdot : JointPathType Desired joint velocities for the trajectory (n, nj), where n is the number of trajectory points. qddot : JointPathType Desired joint accelerations for the trajectory (n, nj). Not used in ROS2 controller. time : TimesType Time points for the trajectory (n,). Returns ------- int Controller status code (0 for success). Notes ----- On success, the final trajectory point is copied into the RobotBlockSet command state. Raises ------ ValueError If control is not implemented in controller. """ if hasattr(self.controller, "GoTo_qtraj"): _stat = self.controller.GoTo_qtraj(q, qdot, qddot, time) if _stat == 0: self._command.q = q[-1] self._command.qdot = np.zeros(self.nj) x = self.Kinmodel(self._command.q)[0] self._command.x = x self._command.v = np.zeros(6) return _stat else: raise NotImplementedError("GoTo_qtraj_pub method not implemented for the current controller")
[docs] def GoTo_X(self, x: Pose3DType, xdot: Velocity3DType, trq: WrenchType, wait: float, **kwargs: Any) -> int: """ Command the robot in Cartesian space. Parameters ---------- x : Pose3DType Desired end-effector pose. xdot : Velocity3DType Desired end-effector twist. trq : WrenchType Desired end-effector wrench. wait : float Time step used for controller synchronization. **kwargs : Any Additional controller-specific keyword arguments. Returns ------- int Motion result code. Notes ----- Successful commands update the RobotBlockSet Cartesian command state. Raises ------ ValueError If control is not implemented in controller. """ if hasattr(self.controller, "GoTo_X"): if not isvector(x, dim=7): raise Exception("%s: GoTo_x: NAN x value" % self.Name) if not isvector(xdot, dim=6): raise Exception("%s: GoTo_x: NAN xdot value" % self.Name) if not isvector(trq, dim=6): raise Exception("%s: GoTo_x: NAN trq value" % self.Name) self.Message(f"pose: {x}, vel: {xdot}, trq: {trq}", 4) res = self.controller.GoTo_X(x, xdot, trq, wait, **kwargs) if res == MotionResultCodes.MOTION_SUCCESS.value: self._command.x = x self._command.v = xdot self._command.FT = trq self.Update() #self._command.q = self._actual.q # Keep current joint state as target #self._command.qdot = np.zeros(self.nj) # Clear joint velocity command self._synchro_control(self.tsamp) return res else: raise NotImplementedError("GoTo_X method not implemented for the current controller")
[docs] def GetCartesianCompliance(self) -> Any: """Return Cartesian compliance parameters from the active controller.""" if hasattr(self.controller, "GetCartesianCompliance"): return self.controller.GetCartesianCompliance() raise NotImplementedError("Cartesian compliance not implemented for the current controller")
[docs] def SetCartesianCompliance( self, Kp: Optional[Any] = None, Kr: Optional[Any] = None, R: Optional[Any] = None, D: Optional[float] = None, hold_pose: bool = True, ) -> int: """Update Cartesian compliance through the active controller.""" if hasattr(self.controller, "SetCartesianCompliance"): return self.controller.SetCartesianCompliance(Kp=Kp, Kr=Kr, R=R, D=D, hold_pose=hold_pose) raise NotImplementedError("Cartesian compliance not implemented for the current controller")
[docs] def GetCartesianStiffness(self) -> Any: """Return Cartesian stiffness parameters from the active controller.""" if hasattr(self.controller, "GetCartesianStiffness"): return self.controller.GetCartesianStiffness() raise NotImplementedError("Cartesian stiffness not implemented for the current controller")
[docs] def SetCartesianStiffness(self, Kp: Optional[Any] = None, Kr: Optional[Any] = None, hold_pose: bool = True) -> int: """Update Cartesian stiffness through the active controller.""" if hasattr(self.controller, "SetCartesianStiffness"): return self.controller.SetCartesianStiffness(Kp=Kp, Kr=Kr, hold_pose=hold_pose) raise NotImplementedError("Cartesian stiffness not implemented for the current controller")
[docs] def GetCartesianDamping(self) -> Any: """Return Cartesian damping parameters from the active controller.""" if hasattr(self.controller, "GetCartesianDamping"): return self.controller.GetCartesianDamping() raise NotImplementedError("Cartesian damping not implemented for the current controller")
[docs] def SetCartesianDamping(self, D: Optional[float] = None, hold_pose: bool = True) -> int: """Update Cartesian damping through the active controller.""" if hasattr(self.controller, "SetCartesianDamping"): return self.controller.SetCartesianDamping(D=D, hold_pose=hold_pose) raise NotImplementedError("Cartesian damping not implemented for the current controller")
[docs] def SetCartesianSoft(self, stiffness: Any, hold_pose: bool = True) -> int: """Scale Cartesian compliance through the active controller.""" if hasattr(self.controller, "SetCartesianSoft"): return self.controller.SetCartesianSoft(stiffness, hold_pose=hold_pose) raise NotImplementedError("Cartesian softness not implemented for the current controller")
[docs] def SetCartesianStiff(self, hold_pose: bool = True) -> int: """Set Cartesian stiffness to the active controller's stiff defaults.""" if hasattr(self.controller, "SetCartesianStiff"): return self.controller.SetCartesianStiff(hold_pose=hold_pose) return self.SetCartesianSoft(1.0, hold_pose=hold_pose)
[docs] def SetCartesianCompliant(self, hold_pose: bool = True) -> int: """Set Cartesian stiffness to compliant mode on the active controller.""" if hasattr(self.controller, "SetCartesianCompliant"): return self.controller.SetCartesianCompliant(hold_pose=hold_pose) return self.SetCartesianSoft(0.0, hold_pose=hold_pose)
[docs] @staticmethod def MotionResultStr(code: int) -> str: """ Convert a motion result code to a human-readable description. Parameters ---------- code : int The `result code` value. Returns ------- str A human-readable result code string. """ # Handle success case if code == 0: return "Trajectory execution successful" if code < 0: _msg = JointTrajectoryControllerInterface.FollowJointTrajectory_error_str(code) else: _msg = robot.MotionResultStr(code) return _msg