Source code for robotblockset.ros2.grippers_ros2

"""ROS 2 gripper interface implementations.

This module defines ROS 2-backed gripper wrappers used by RobotBlockSet robot
interfaces.

Authors: Mihael Simonic
"""

from time import sleep
from threading import Thread
from typing import Any, Optional

try:
    import rclpy
    from rclpy.action import ActionClient
    from rclpy.executors import MultiThreadedExecutor
    from rclpy.node import Node
except Exception as e:
    raise e from RuntimeError("ROS2 rclpy not installed or sourced.")

try:
    from franka_msgs.action import Grasp, Homing, Move
    from std_srvs.srv import Trigger
    from sensor_msgs.msg import JointState
except Exception as e:
    raise e from RuntimeError("Problems with importing ROS2 messages. Check if franka_msgs and std_srvs are installed.")

from robotblockset.grippers import gripper
from robotblockset.robots import robot


[docs] class FrankaGripper(gripper): """ ROS 2 action-based interface for the Franka gripper (Panda / FR3). Uses ``franka_msgs`` action servers for homing, moving, and grasping, and a ``std_srvs/Trigger`` service for stopping. Can be used standalone (``robot=None``) or attached to an existing ``robot`` instance. Attributes ---------- Name : str Identifier for the gripper instance. GripperTagNames : str Tag used for identifying the gripper in logs or systems. Robot : robot or None The robot associated with the gripper, or ``None`` if standalone. """
[docs] def __init__(self, robot: Optional[robot] = None, namespace: str = "franka_gripper", **kwargs: Any) -> None: """ Initialize the Franka gripper ROS 2 wrapper. Parameters ---------- robot : robot, optional Robot instance to which the gripper is attached. Must be an ``robot`` subclass instance. If ``None``, a minimal internal node is created and spun in a background thread so the gripper can be used standalone. namespace : str, optional ROS 2 namespace the ``franka_gripper_node`` runs in. The node is always named ``franka_gripper``, so topics resolve to ``/{namespace}/franka_gripper/{action}``. Defaults to ``"franka_gripper"`` which matches the standard launch setup. **kwargs : Any Additional keyword arguments for future extensions or configuration. """ self.Name = "Franka:Gripper:ROS2" self.GripperTagNames = "gripper" self.Robot = robot self._namespace = namespace.strip("/") self._width_grasp = 0 self._width = 0 self._width_max = 0.08 self._speed = 0.0 self._speed_max = 0.5 self._verbose = 1 self._state = -1 self._own_node = False # True when we created the node ourselves # Resolve the ROS 2 node used for creating clients. if robot is not None and hasattr(robot, "_node"): self._node: Node = robot._node elif robot is not None and isinstance(robot, Node): self._node = robot elif robot is None: self._node = Node("franka_gripper_client") self._own_node = True self._executor = MultiThreadedExecutor() self._executor.add_node(self._node) self._spin_thread = Thread(target=self._executor.spin, daemon=True) self._spin_thread.start() else: raise TypeError("robot must be a robot_ros2 instance or None.") # Build namespaced topic prefix. # The franka_gripper_node is always named 'franka_gripper', so # topics resolve to /{namespace}/franka_gripper/{action}. _ns = f"/{self._namespace}" if self._namespace else "" self._topic_homing = f"{_ns}/franka_gripper/homing" self._topic_grasp = f"{_ns}/franka_gripper/grasp" self._topic_move = f"{_ns}/franka_gripper/move" self._topic_stop = f"{_ns}/franka_gripper/stop" self._topic_joint_states = f"{_ns}/franka_gripper/joint_states" # Action clients self._client_homing = ActionClient(self._node, Homing, self._topic_homing) self._client_grasp = ActionClient(self._node, Grasp, self._topic_grasp) self._client_move = ActionClient(self._node, Move, self._topic_move) # Stop service client self._client_stop = self._node.create_client(Trigger, self._topic_stop) # Joint-state subscriber for finger width feedback self._joint_states_sub = self._node.create_subscription( JointState, self._topic_joint_states, self._joint_state_callback, 10, ) self.Message("Created", 2)
def __del__(self) -> None: """Shut down the internal node and executor when used standalone.""" if getattr(self, "_own_node", False): try: self._executor.shutdown(timeout_sec=1.0) self._node.destroy_node() except Exception: pass # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _joint_state_callback(self, msg: JointState) -> None: """ Store the latest joint-state message received from the gripper. Parameters ---------- msg : JointState Joint-state message received from ROS 2. """ if msg.position: self._width = sum(msg.position) def _wait_future(self, future: Any, timeout_sec: float) -> bool: """Poll a future until done or timeout. Returns True if done.""" elapsed = 0.0 while not future.done() and elapsed < timeout_sec: sleep(0.05) elapsed += 0.05 return future.done() def _send_goal_sync(self, client: ActionClient, goal: Any, timeout_sec: float = 10.0) -> Optional[Any]: """ Send an action goal and block until the result is available. The robot's background executor processes the futures; this method polls them without stalling that thread. Returns the action result, or ``None`` on timeout or rejection. """ if not client.wait_for_server(timeout_sec=2.0): self._node.get_logger().warning(f"Action server not available: {client._action_name}") return None send_future = client.send_goal_async(goal) if not self._wait_future(send_future, timeout_sec): self._node.get_logger().error("Timeout waiting for goal acceptance.") return None goal_handle = send_future.result() if not goal_handle.accepted: self._node.get_logger().warning("Goal was rejected by action server.") return None result_future = goal_handle.get_result_async() if not self._wait_future(result_future, timeout_sec): self._node.get_logger().error("Timeout waiting for action result.") return None return result_future.result().result # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ @property def width(self) -> float: """ Get the last known gripper width from joint-state feedback. Returns ------- float Current gripper finger width in metres. """ return self._width
[docs] def GetState(self) -> str: """ Return the current state of the gripper as a human-readable string. Returns ------- str ``"Opened"``, ``"Closed"``, or ``"Undefined"``. """ if self._state == 0: return "Opened" elif self._state == 1: return "Closed" else: return "Undefined"
[docs] def Grasp(self, width: float, speed: float = 0.1, force: float = 5.0, eps: float = 0.005) -> bool: """ Grasp an object with the gripper. Parameters ---------- width : float Target grasp width in metres. speed : float, optional Closing speed in m/s (default 0.1). force : float, optional Grasp force in N (default 5.0). eps : float, optional Inner and outer epsilon tolerance in metres (default 0.005). Returns ------- bool ``True`` if the grasp was reported as successful, ``False`` otherwise. """ goal = Grasp.Goal() goal.width = float(width) goal.speed = float(speed) goal.force = float(force) goal.epsilon.inner = float(eps) goal.epsilon.outer = float(eps) result = self._send_goal_sync(self._client_grasp, goal) success = result.success if result is not None else False if success: self._state = 1 return success
[docs] def Move(self, width: float, speed: float = 0.1) -> bool: """ Move the gripper to a specified width. Parameters ---------- width : float Target width in metres. speed : float, optional Desired speed in m/s (default 0.1). Returns ------- bool ``True`` if the move was reported as successful, ``False`` otherwise. """ self._width = abs(min(width, self._width_max)) self._speed = abs(min(speed, self._speed_max)) self._state = -1 goal = Move.Goal() goal.width = self._width goal.speed = self._speed result = self._send_goal_sync(self._client_move, goal) return result.success if result is not None else False
[docs] def Homing(self) -> bool: """ Home the gripper and open it fully. Returns ------- bool ``True`` if homing was reported as successful, ``False`` otherwise. """ goal = Homing.Goal() result = self._send_goal_sync(self._client_homing, goal) success = result.success if result is not None else False if success: self._state = 0 return success
[docs] def Stop(self, timeout_sec: float = 1.0) -> Optional[bool]: """ Stop any ongoing gripper action. Parameters ---------- timeout_sec : float, optional Timeout in seconds for the service call (default 1.0). Returns ------- bool or None ``True`` if the stop service call succeeded, ``None`` if it was unavailable or timed out. """ if not self._client_stop.wait_for_service(timeout_sec=timeout_sec): self._node.get_logger().warning("Stop service not available.") return None request = Trigger.Request() future = self._client_stop.call_async(request) elapsed = 0.0 step = 0.05 while rclpy.ok() and not future.done() and elapsed < timeout_sec: sleep(step) elapsed += step try: response = future.result() return response.success except Exception as e: self._node.get_logger().error(f"Stop service call failed: {e}") return None
if __name__ == "__main__": import time rclpy.init() # Standalone usage — no robot needed gripper = FrankaGripper() print(f"Gripper state: {gripper.GetState()}") print(f"Gripper width: {gripper.width:.4f} m") print("Homing...") ok = gripper.Homing() print(f" Homing result: {ok} state: {gripper.GetState()}") time.sleep(0.5) print("Moving to 0.04 m...") ok = gripper.Move(0.04) print(f" Move result: {ok} width: {gripper.width:.4f} m") time.sleep(0.5) print("Grasping at 0.00 m (force=10 N)...") ok = gripper.Grasp(0.00, force=10.0) print(f" Grasp result: {ok} state: {gripper.GetState()}") time.sleep(0.5) print("Stop to release...") gripper.Stop() print(f" Final state: {gripper.GetState()}") del gripper rclpy.shutdown()