Source code for robotblockset.ros.robots_ros

"""ROS robot interface implementations.

This module defines ROS-backed robot wrappers used by RobotBlockSet robot
interfaces. It provides strategy-to-controller mapping, controller switching,
logger configuration, and joint- and Cartesian-command forwarding.

Copyright (c) 2024- Jozef Stefan Institute

Authors: Mihael Simonic, Leon Zlajpah.
"""

from typing import Any, Dict, List, Optional, Type

from robotblockset.robots import robot, MotionResultCodes
from robotblockset.ros.controllers_ros import joint_impedance_controller, controller_manager_proxy
from robotblockset.tools import isvector, ismatrix, check_option
from roscpp.srv import SetLoggerLevel, SetLoggerLevelRequest
from robotblockset.rbs_typing import JointConfigurationType, JointTorqueType, JointVelocityType, Pose3DType, Velocity3DType, WrenchType

import rospy


[docs] class robot_ros(robot): """ Base class for ROS-backed robot interfaces. Attributes ---------- _namespace : str ROS namespace prefix used by the robot wrapper. controller_helper : controller_manager_proxy Helper object for loading and switching ROS controllers. controller : Any Active RobotBlockSet controller wrapper. """
[docs] def __init__(self, ns: str = "", init_node: bool = True, multi_node: bool = False, control_strategy: Optional[str] = None, strategy_controller_mapping: Optional[dict] = None) -> None: """ Initialize the generic ROS robot interface. Parameters ---------- ns : str, optional ROS namespace of the robot. init_node : bool, optional If ``True``, initialize a ROS node for the wrapper. multi_node : bool, optional If ``True``, initialize the ROS node in anonymous mode. control_strategy : str, optional Initial RobotBlockSet control strategy. strategy_controller_mapping : dict[str, tuple[str, Type[Any]]] | None, optional Mapping from RobotBlockSet strategy names to pairs of ROS controller names and controller wrapper classes. Notes ----- The constructor initializes the controller-manager helper and optionally activates the requested control strategy. """ robot.__init__(self) if ns == "": self._namespace = "" self.Name = "robot_ros" else: self._namespace = "/" + ns self.Name = ns if init_node: self._init_ros_node(multi_node) else: if not rospy.core.is_initialized(): self.WarningMessage("The ROS node must be initialized externally before proceeding.") # Mapping of RBS strategies into ros_control controller names self._controller_to_strategy_mapping = {v[0]: k for k, v in strategy_controller_mapping.items()} self._strategy_to_controller_mapping = {k: v[0] for k, v in strategy_controller_mapping.items()} self._strategy_to_class_mapping = {k: v[1] for k, v in strategy_controller_mapping.items()} # Controller helper self.controller_helper = controller_manager_proxy(controller_manager_node_location=self._namespace + "/controller_manager", robot_resource_name=self.Name) if control_strategy is not None: self.SetStrategy(control_strategy) else: self.controller = None self._control_strategy = None self.Message("Initializing robot object without a controller. You will only be able to read robot state.", 1)
def _init_ros_node(self, anonymous: bool = False) -> None: """ Initialize the ROS node used by the robot wrapper. Parameters ---------- anonymous : bool, optional If ``True``, initialize the ROS node in anonymous mode. """ try: rospy.init_node("FrankaHandler_{}".format(self.Name), anonymous=anonymous) except rospy.ROSException as e: self.Message("Skipping node init because of exception: {}".format(e), 0) # @abstractmethod # def save_ros_parameters(self): # pass # @abstractmethod # def load_ros_parameters(self): # pass # @abstractmethod # def _preload_ros_messages(self): # pass def _get_controller_from_strategy(self, strategy: str) -> Optional[str]: """ Return the ROS controller mapped to a RobotBlockSet strategy. Parameters ---------- strategy : str RobotBlockSet control strategy name. Returns ------- str | None ROS controller name mapped to the strategy. """ return self._strategy_to_controller_mapping.get(strategy) def _get_strategy_from_controller(self, controller: str) -> Optional[str]: """ Return the RobotBlockSet strategy mapped to a ROS controller. Parameters ---------- controller : str ROS controller name. Returns ------- str | None RobotBlockSet strategy mapped to the controller. """ return self._controller_to_strategy_mapping.get(controller) def _cleanup_ros_topic_interface(self, interface_name: str) -> None: """ Unregister and clear a ROS topic interface attribute when present. Parameters ---------- interface_name : str Name of the attribute holding the ROS topic interface. """ if hasattr(self, interface_name): interface = getattr(self, interface_name, None) if interface: interface.unregister() setattr(self, interface_name, None) # Strategies
[docs] def GetStrategy(self) -> Optional[str]: """ Return the currently selected RobotBlockSet control strategy. Returns ------- str | None Name of the active control strategy. """ return self._control_strategy
# self.controller_helper.update_active_controller()
[docs] def AvailableStrategies(self) -> List[str]: """ List the control strategies supported by the ROS robot wrapper. Returns ------- list[str] Available RobotBlockSet control strategies. """ return list(self._strategy_to_controller_mapping.keys())
[docs] def SetStrategy(self, new_strategy: str) -> Optional[bool]: """ Switch the active ROS controller to match a RobotBlockSet strategy. Parameters ---------- new_strategy : str Requested RobotBlockSet control strategy. Returns ------- bool | None ``False`` if no switch was needed or if switching failed. Returns ``None`` when the switch succeeds. Notes ----- Existing motion is stopped before switching controllers. """ if new_strategy in self.AvailableStrategies(): # First check if strategy was changed elsewhere active_controller = self.controller_helper.update_active_controller() self._control_strategy = self._get_strategy_from_controller(active_controller) # Check if object is initialized if self.isReady(): if self._control_strategy == new_strategy: self.Message(f"Not switching because already using '{new_strategy}'", 2) return False # Stop any existing movements self.Stop() self._semaphore.release() # Check if controller is loaded ros_controller = self._get_controller_from_strategy(new_strategy) if ros_controller not in self.controller_helper.list_loaded_controllers(): self.controller_helper.load_controller(ros_controller) # Prepare switch request if hasattr(self, "controller"): stop_controller = [] if self.controller is None else [self.controller._ros_controller_name] else: self.controller_helper.stop_active_controller() stop_controller = [] start_controller = [self._get_controller_from_strategy(new_strategy)] resp = self.controller_helper.switch_controller(stop_controllers=stop_controller, start_controllers=start_controller) if resp: self._control_strategy = new_strategy self.controller = self._strategy_to_class_mapping[new_strategy](self, self._namespace) else: self.Message("Switching failed. Check ros logs!", 0) return False else: raise ValueError(f"Strategy '{new_strategy}' not supported")
[docs] def SetLoggerLevel(self, level: str = "info", logger: Optional[str] = None) -> None: """ Set the ROS logger verbosity level for the active controller. Parameters ---------- level : str, optional Target logger level. logger : str, optional Explicit logger name. If ``None``, use the active controller logger. """ if level in ["debug", "info", "warn"]: log_msg = SetLoggerLevelRequest() log_msg.logger = logger if logger is not None else self.controller._ros_logger_name log_msg.level = level self.logger_svc_proxy.call(log_msg) if check_option(level, "debug"): self.verbose = 3 elif check_option(level, "info"): self.verbose = 1 else: raise ValueError("unsupported ")
[docs] def GoTo_q(self, q: JointConfigurationType, qdot: JointVelocityType, trq: JointTorqueType, wait: float, **kwargs: Any) -> int: """ Command the robot in joint space through the active ROS controller. Parameters ---------- q : JointConfigurationType Desired joint positions. qdot : JointVelocityType Desired joint velocities. trq : JointTorqueType Desired joint torques. wait : float Synchronization time after the command is sent. Returns ------- int Motion result code. """ self.Message(f"position: {q}, vel: {qdot}, trq: {trq}", 4) self.controller.GoTo_q(q, qdot, trq) self._synchro_control(wait) self._command.q = q self._command.qdot = qdot self._command.trq = trq x, J = self.Kinmodel(q) self._command.x = x self._command.v = J @ qdot self.Update() self._last_control_time = self.simtime() return MotionResultCodes.MOTION_SUCCESS.value
[docs] def GoTo_X(self, x: Pose3DType, xdot: Velocity3DType, trq: WrenchType, wait: float, do_not_publish_msg: bool = False, **kwargs: Any) -> int: """ Command the robot in Cartesian space through the active ROS controller. Parameters ---------- x : Pose3DType Desired Cartesian pose. xdot : Velocity3DType Desired Cartesian twist. trq : WrenchType Desired end-effector wrench. wait : float Synchronization time after the command is sent. do_not_publish_msg : bool, optional Legacy flag kept for compatibility. **kwargs : Any Additional controller-specific keyword arguments. Returns ------- int Motion result code. """ if not isvector(x, dim=7): raise Exception("%s: GoTo_x: NAN x value" % self.Name) if not isvector(xdot, dim=6): raise Exception("%s: GoTo_x: NAN xdot value" % self.Name) if not isvector(trq, dim=6): raise Exception("%s: GoTo_x: NAN trq value" % self.Name) if do_not_publish_msg: raise Exception("Not supported anymore. Use GoTo_Xtraj instead.") self.Message(f"pose: {x}, vel: {xdot}, trq: {trq}", 4) self.controller.GoTo_X(x, xdot, trq, wait, **kwargs) self._command.x = x self._command.v = xdot self._command.FT = trq self.Update() self._synchro_control(wait) self._last_control_time = self.simtime() return 0