"""Robot interfaces for CoppeliaSim via the ZMQ Remote API.
This module provides RobotBlockSet robot backends for CoppeliaSim. It includes
the generic `robot_coppelia` interface for state acquisition, joint-space
commanding, object pose access, and simulator lifecycle handling, together with
concrete robot wrappers such as `lwr`, `ur10`, `panda`, and `iiwa` that define
robot-specific joint naming and base-frame conventions.
Copyright (c) 2025 Jozef Stefan Institute
Authors: Leon Zlajpah.
"""
import numpy as np
from typing import Union, Optional, Any
from time import sleep
import ctypes
from robotblockset.tools import isvector, vector
from robotblockset.transformations import map_pose, r2q, world2frame, rp2t, rot_z
from robotblockset.robot_spec import panda_spec, lwr_spec, iiwa_spec, ur10_spec # , ur5_spec, ur5e_spec, ur10e_spec
from robotblockset.robots import robot, MotionResultCodes
from robotblockset.rbs_typing import ArrayLike, HomogeneousMatrixType, JointConfigurationType, JointTorqueType, JointVelocityType, Pose3DType, RotationMatrixType, Vector3DType
try:
from coppeliasim_zmqremoteapi_client import RemoteAPIClient
except Exception as e:
raise e from RuntimeError("Python interface for CoppeliaSim not installed. \nYou can install it through pip:\n pip install coppeliasim-zmqremoteapi-client")
c_float_p = ctypes.POINTER(ctypes.c_float)
[docs]
class robot_coppelia(robot):
"""
Interface class for controlling a robot in CoppeliaSim via RemoteAPI.
This class extends the base `robot` class and provides simulation-specific
setup and naming conventions for joints, end effectors, and sensors. It uses
the CoppeliaSim RemoteAPI to establish communication with the simulation.
Attributes
----------
Name : str
Name of the RobotBlockSet robot instance.
BaseName : str
Base name of the robot in the CoppeliaSim scene hierarchy.
JointNames : list[str] or None
Explicit list of joint object names, if configured.
JointNamesAlias : str or None
Alias used when joints are accessed by indexed object names.
FlangeName : str
Name of the end-effector flange object in the scene.
TCPName : str
Name of the tool center point object in the scene.
SensorForceName : str
Name of the force-torque sensor object in the scene.
client : RemoteAPIClient
Remote API client connected to CoppeliaSim.
sim : Any
Simulation API object returned by the remote client.
"""
[docs]
def __init__(self, robot_name: str, host: str = "localhost", port: int = 23000, **kwargs: Any) -> None:
"""Create a CoppeliaSim-backed robot interface.
Parameters
----------
robot_name : str
Base name of the robot used in the CoppeliaSim scene.
host : str, optional
Host running the CoppeliaSim ZMQ Remote API server.
port : int, optional
Port used by the CoppeliaSim ZMQ Remote API server.
**kwargs : Any
Additional keyword arguments used to configure joint, flange, TCP,
and sensor object names.
Returns
-------
None
This constructor initializes the CoppeliaSim robot interface in place.
"""
robot.__init__(self, **kwargs)
self.Name = robot_name + "_Coppelia"
self.BaseName = robot_name
kwargs.setdefault("JointNames", None)
kwargs.setdefault("JointNamesTemplate", None)
kwargs.setdefault("JointNamesAlias", None)
if kwargs["JointNames"] is not None:
self.JointNames = kwargs["JointNames"]
else:
self.JointNames = []
for i in range(self.nj):
if kwargs["JointNamesTemplate"] is not None:
self.JointNames.append(kwargs["JointNamesTemplate"] % (i + 1))
elif kwargs["JointNamesAlias"] is not None:
self.JointNames = None
self.JointNamesAlias = kwargs["JointNamesAlias"]
else:
self.JointNames.append(f"{self.BaseName}_joint{i + 1}")
kwargs.setdefault("FlangeName", None)
if kwargs["FlangeName"] is not None:
self.FlangeName = kwargs["FlangeName"]
else:
self.FlangeName = "EE"
kwargs.setdefault("TCPName", None)
if kwargs["TCPName"] is not None:
self.TCPName = kwargs["TCPName"]
else:
self.TCPName = "TCP"
kwargs.setdefault("SensorForceName", None)
if kwargs["SensorForceName"] is not None:
self.SensorForceName = kwargs["SensorForceName"]
else:
self.SensorForceName = "FT_sensor"
self._control_strategy = "JointPosition"
self.client = RemoteAPIClient(host=host, port=port)
self.sim = self.client.require("sim")
if not self.isReady():
self.sim.startSimulation()
sleep(0.5)
self.tsamp = 0.05
_ts = round(self.sim.getSimulationTimeStep(), 3)
if self.tsamp < _ts:
self.WarningMessage(f"tsamp={self.tsamp} is smaller than simulation time step ({_ts}) and will be set to t={_ts}")
self.tsamp = _ts
self.Message("Robot connected to Coppelia", 1)
self.Init()
def __del__(self) -> None:
"""Release the CoppeliaSim robot interface.
Returns
-------
None
This destructor closes the simulation interface.
"""
self.Close()
[docs]
def Close(self) -> None:
"""
Close or clean up the simulation interface.
Currently does not stop the simulation explicitly.
Returns
-------
None
This method is reserved for simulation cleanup.
"""
# self.sim.stopSimulation()
pass
[docs]
def Init(self) -> None:
"""
Initialize the robot from the CoppeliaSim scene.
This method performs the following:
- Retrieves the robot base handle.
- Computes the robot's base transform matrix.
- Retrieves joint handles based on configured joint names or aliases.
- Retrieves and computes the TCP (Tool Center Point) transform relative to the end effector.
- Retrieves the force/torque sensor handle if defined.
- Initializes simulation-related internal state, joint positions, and handles.
Raises
------
ValueError
If the robot base is not found in the CoppeliaSim scene.
Returns
-------
None
This method initializes simulator handles and cached robot state.
"""
h = self.sim.getObject(f"/{self.BaseName}", {"noError": True})
if h == -1:
raise ValueError(f"Robot {self.BaseName} not found in the scene. ")
else:
self._BaseHandle = h
self.TBase = map_pose(x=self.sim.getObjectPose(self._BaseHandle | self.sim.handleflag_wxyzquat), out="T") @ np.linalg.inv(self.BaseOffset)
self._JointHandles = [None] * self.nj
self._actual.q = [None] * self.nj
self._actual.qdot = [None] * self.nj
for i in range(self.nj):
if self.JointNames is None and self.JointNamesAlias is not None:
self._JointHandles[i] = self.sim.getObject(f"/{self.BaseName}/{self.JointNamesAlias}", {"index": i})
else:
self._JointHandles[i] = self.sim.getObject(f"/{self.BaseName}/{self.JointNames[i]}")
if self.FlangeName is not None and self.TCPName is not None:
self._TCPHandle = self.sim.getObject(f"/{self.BaseName}/{self.TCPName}", {"noError": True})
self._EEHandle = self.sim.getObject(f"/{self.BaseName}/{self.FlangeName}", {"noError": True})
if self._TCPHandle > -1 and self._EEHandle > -1:
_TTCP = map_pose(x=self.sim.getObjectPose(self._TCPHandle | self.sim.handleflag_wxyzquat), out="T")
_TEE = map_pose(x=self.sim.getObjectPose(self._EEHandle | self.sim.handleflag_wxyzquat), out="T")
self.TCP = np.linalg.inv(_TEE) @ _TTCP
else:
self._TCPHandle = None
self._EEHandle = None
self.TCP = np.eye(4)
else:
self._TCPHandle = None
self._EEHandle = None
self.TCP = np.eye(4)
if self.SensorForceName is not None:
self._SensorForceHandle = self.sim.getObject(f"/{self.BaseName}/{self.SensorForceName}", {"noError": True})
if self._SensorForceHandle == -1:
self._SensorForceHandle = None
self.InitObject()
self.GetState()
self.ResetCurrentTarget()
self.ResetTime()
self.Message("Initialized", 2)
[docs]
def GetState(self) -> None:
"""
Update the robot's internal state from the CoppeliaSim simulation.
This method queries joint positions, velocities, and torques, along with the
TCP pose and velocity (if available), and force/torque sensor data (if available).
The update occurs only if the elapsed simulation time exceeds the sampling period.
Updates the following internal state attributes:
- self._actual.q : Joint positions
- self._actual.qdot : Joint velocities
- self._actual.trq : Joint torques
- self._actual.x : TCP pose (in base frame)
- self._actual.v : TCP velocity (in base frame)
- self._actual.FT : Force/torque sensor readings
- self._actual.trqExt : External joint torques (zeroed by default)
Notes
-----
- Uses self.tsamp as the minimum time interval between updates.
- Falls back to forward kinematics if TCP is not defined.
- Sets force/torque to zeros if sensor is unavailable or inactive.
Returns
-------
None
This method refreshes the internal robot state caches.
"""
if (self.simtime() - self._last_update) > self.tsamp:
for i in range(self.nj):
self._actual.q[i] = self.joint_k * self.sim.getJointPosition(self._JointHandles[i])
self._actual.qdot[i] = self.joint_k * self.sim.getJointVelocity(self._JointHandles[i])
self._actual.trq[i] = self.joint_k * self.sim.getJointForce(self._JointHandles[i])
if self._TCPHandle is not None:
_x = self.sim.getObjectPose(self._TCPHandle | self.sim.handleflag_wxyzquat, -1)
self._actual.x = self.WorldToBase(_x)
_pd, _w = self.sim.getObjectVelocity(self._TCPHandle, -1)
_v = np.concatenate([_pd, _w])
self._actual.v = self.WorldToBase(_v, typ="Twist")
else:
x, J = self.Kinmodel(self._actual.q)
self._actual.x = x
self._actual.v = J @ self._actual.qdot
if self._SensorForceHandle is not None:
_res, _F, _T = self.sim.readForceSensor(self._SensorForceHandle)
if _res > 0:
_FT = np.concatenate((_F, _T))
if self.FTSensorFrame is None:
_frame = self.TCP
else:
_frame = self.FTSensorFrame
_FT2TCP = np.linalg.pinv(_frame) @ self.TCP
_FT = world2frame(_FT, _FT2TCP, typ="Wrench")
self._actual.FT = _FT
else:
self._actual.FT = np.zeros(6)
self._actual.trqExt = np.zeros(self.nj)
self._tt = self.simtime()
self._last_update = self.simtime()
[docs]
def isReady(self) -> bool:
"""
Check whether the simulation is running.
Returns
-------
bool
True if the simulation is not in a stopped state, False otherwise.
"""
return self.sim.getSimulationState() != self.sim.simulation_stopped
[docs]
def isActive(self) -> bool:
"""
Check if the robot target is active.
Returns
-------
bool
Indicating if the robot target is active.
"""
return True
[docs]
def Restart(self, qpos: Optional[JointConfigurationType] = None) -> None:
"""
Restart the simulation.
Stops and restarts the CoppeliaSim simulation. This method waits briefly
after each step to allow the simulator to cleanly stop and start.
Parameters
----------
qpos : JointConfigurationType, optional
Placeholder for future support of setting initial joint positions after restart.
Currently unused.
Returns
-------
None
This method restarts the CoppeliaSim simulation.
"""
self.sim.stopSimulation()
t0 = self.simtime()
while self.sim.getSimulationState() != self.sim.simulation_stopped:
if (self.simtime() - t0) > 10.0:
self.WarningMessage("Simulation did not stop within 10 seconds", 2)
return
sleep(0.1)
self.sim.startSimulation()
sleep(1.0)
[docs]
def GoTo_q(self, q: JointConfigurationType, qdot: Optional[JointVelocityType] = None, trq: Optional[JointTorqueType] = None, wait: Optional[float] = None, **kwargs: Any) -> int:
"""Update joint positions and wait
This method sets the commanded joint positions (`q`), velocities (`qdot`), and torques (`trq`),
then sends them to the robot and waits for the specified time (`wait`).
Parameters
----------
q : JointConfigurationType
Desired joint positions (nj,).
qdot : JointVelocityType, optional
Desired joint velocities (nj,).
trq : JointTorqueType, optional
Desired joint torques (nj,).
wait : float, optional
Time to wait (in seconds) after commanding the robot to move.
Returns
-------
int
Status of the move (0 for success, non-zero for error).
"""
if qdot is None:
qdot = np.zeros(self.nj)
else:
qdot = vector(qdot, dim=self.nj)
if trq is None:
trq = np.zeros(self.nj)
else:
trq = vector(trq, dim=self.nj)
if wait is None:
wait = self.tsamp
self._synchro_control(wait)
for i in range(self.nj):
self.sim.setJointTargetPosition(self._JointHandles[i], self.joint_k * q[i])
self._command.q = q
self._command.qdot = qdot
self._command.trq = trq
if np.floor(self._command.mode) == 1:
x, J = self.Kinmodel(q)
self._command.x = x
self._command.v = J @ qdot
self.Update()
return MotionResultCodes.MOTION_SUCCESS.value
[docs]
def SetStrategy(self, strategy: str) -> None:
"""
Set the robot's control strategy.
Parameters
----------
strategy : str
Name of the control strategy to be used (e.g., 'JointPosition').
Returns
-------
None
This placeholder currently does not change controller behavior.
Notes
-----
This method is currently a placeholder and does not implement any behavior.
"""
pass
[docs]
def GetObjectPose(self, ide: Union[str, int], out: str = "x") -> Optional[Union[Pose3DType, HomogeneousMatrixType, Vector3DType, RotationMatrixType]]:
"""
Retrieve the pose of an object from the simulation.
Parameters
----------
ide : Union[str, int]
Object name or handle.
out : str, optional
Output format.
Returns
-------
Optional[Union[Pose3DType, HomogeneousMatrixType, Vector3DType, RotationMatrixType]]
Pose in the requested format if found, else None.
"""
if isinstance(ide, str):
handle = self.sim.getObject(ide, {"noError": True})
if handle == -1:
self.Message(f"No object with name '{ide}' exists", 2)
return None
else:
handle = ide
return map_pose(x=self.sim.getObjectPose(handle | self.sim.handleflag_wxyzquat, -1), out=out)
[docs]
def SetObjectPose(self, ide: Union[str, int], x: ArrayLike) -> None:
"""
Set the pose of an object in the simulation.
Parameters
----------
ide : Union[str, int]
Object name or body ID.
x : ArrayLike
Pose to be set. Supported formats:
- 4x4 transformation matrix
- 3x3 rotation matrix
- 7D vector [pos(3), quat(4)]
- 3D position vector
- 4D quaternion
Returns
-------
None
This method updates the pose of the selected simulation object.
Raises
------
ValueError
If the shape of `x` is not one of the supported formats.
"""
if self.isReady():
if isinstance(ide, str):
handle = self.sim.getObject(ide, {"noError": True})
if handle == -1:
self.Message(f"No object with name '{ide}' exists", 2)
return None
else:
handle = ide
x = self.spatial(x)
xx = self.GetObjectPose(handle)
if x.shape == (4, 4):
xx = map_pose(T=x)
elif x.shape == (3, 3):
xx[3:] = r2q(x)
elif isvector(x, dim=7):
xx = x
elif isvector(x, dim=3):
xx[:3] = x
elif isvector(x, dim=4):
xx[3:] = x
else:
raise ValueError(f"Parameter shape {x.shape} not supported")
self.sim.setObjectPose(handle | self.sim.handleflag_wxyzquat, xx.tolist())
[docs]
def UpdateRobotBaseFromModel(self) -> HomogeneousMatrixType:
"""Update the cached robot base transform from the CoppeliaSim model.
Returns
-------
HomogeneousMatrixType
Current robot base transform in homogeneous matrix form.
"""
if self._BaseHandle is not None:
self.TBase = map_pose(x=self.sim.getObjectPose(self._BaseHandle | self.sim.handleflag_wxyzquat), out="T") @ np.linalg.inv(self.BaseOffset)
return self.TBase
[docs]
class lwr(robot_coppelia, lwr_spec):
"""CoppeliaSim wrapper for the KUKA LWR manipulator."""
[docs]
def __init__(self, robot_name: str = "LBR4p", **kwargs: Any) -> None:
"""Create a CoppeliaSim KUKA LWR robot wrapper.
Parameters
----------
robot_name : str, optional
Robot object name in the CoppeliaSim scene.
**kwargs : Any
Additional keyword arguments passed to `robot_coppelia.__init__`.
Returns
-------
None
This constructor initializes the LWR robot wrapper in place.
"""
lwr_spec.__init__(self)
self.joint_k = [1.0, -1.0, 1.0, 1.0, 1.0, -1.0, 1.0]
self.BaseOffset = rp2t(rot_z(np.pi), [0.0, 0.0, 0.059]) # Robot base offset between Coppelia and correct model
self.joint_k = 1.0 # Joint position factors between Coppelia and correct model
robot_coppelia.__init__(self, robot_name, **kwargs)
def __del__(self) -> None:
robot_coppelia.__del__(self)
self.Message("Robot deleted", 2)
[docs]
class ur10(robot_coppelia, ur10_spec):
"""CoppeliaSim wrapper for the Universal Robots UR10 manipulator."""
[docs]
def __init__(self, robot_name: str = "UR10", **kwargs: Any) -> None:
"""Create a CoppeliaSim UR10 robot wrapper.
Parameters
----------
robot_name : str, optional
Robot object name in the CoppeliaSim scene.
**kwargs : Any
Additional keyword arguments passed to `robot_coppelia.__init__`.
Returns
-------
None
This constructor initializes the UR10 robot wrapper in place.
"""
ur10_spec.__init__(self)
self.BaseOffset = rp2t(rot_z(np.pi), [0.0, 0.0, 0.019]) # Robot base offset between Coppelia and correct model
self.joint_k = 1.0 # Joint position factors between Coppelia and correct model
robot_coppelia.__init__(self, robot_name, **kwargs)
def __del__(self) -> None:
robot_coppelia.__del__(self)
self.Message("Robot deleted", 2)
[docs]
class panda(robot_coppelia, panda_spec):
"""CoppeliaSim wrapper for the Franka Emika Panda manipulator."""
[docs]
def __init__(self, robot_name: str = "Franka", JointNamesAlias: str = "joint", **kwargs: Any) -> None:
"""Create a CoppeliaSim Franka Panda robot wrapper.
Parameters
----------
robot_name : str, optional
Robot object name in the CoppeliaSim scene.
JointNamesAlias : str, optional
Indexed alias used to access joint objects in the scene.
**kwargs : Any
Additional keyword arguments passed to `robot_coppelia.__init__`.
Returns
-------
None
This constructor initializes the Panda robot wrapper in place.
"""
panda_spec.__init__(self)
self.BaseOffset = rp2t(np.eye(3), [-0.04873, 0, 0.07]) # Robot base offset between Coppelia and correct model
self.joint_k = 1.0 # Joint position factors between Coppelia and correct model
robot_coppelia.__init__(self, robot_name, JointNamesAlias=JointNamesAlias, **kwargs)
def __del__(self) -> None:
robot_coppelia.__del__(self)
self.Message("Robot deleted", 2)
[docs]
class iiwa(robot_coppelia, iiwa_spec):
"""CoppeliaSim wrapper for the KUKA iiwa manipulator."""
[docs]
def __init__(self, robot_name: str = "iiwa", JointNamesAlias: str = "joint", **kwargs: Any) -> None:
"""Create a CoppeliaSim KUKA iiwa robot wrapper.
Parameters
----------
robot_name : str, optional
Robot object name in the CoppeliaSim scene.
JointNamesAlias : str, optional
Indexed alias used to access joint objects in the scene.
**kwargs : Any
Additional keyword arguments passed to `robot_coppelia.__init__`.
Returns
-------
None
This constructor initializes the iiwa robot wrapper in place.
"""
iiwa_spec.__init__(self)
self.BaseOffset = rp2t(np.eye(3), [0.00726, 0.00005, 0.07866]) # Robot base offset between Coppelia and correct model
self.joint_k = 1.0 # Joint position factors between Coppelia and correct model
robot_coppelia.__init__(self, robot_name, JointNamesAlias=JointNamesAlias, **kwargs)
def __del__(self) -> None:
robot_coppelia.__del__(self)
self.Message("Robot deleted", 2)
if __name__ == "__main__":
# import sys
# sys.path.append("robot_models")
# sys.path.append("sim")
# sys.path.append("..")
print("Start")
r = panda(SensorForceName="connection", host="178.172.42.81")
r.JMove(r.q_home + 0.2)
print("q: ", r.q)
print("x: ", r.x)