"""Socket-based client interface to the MuJoCo server backend.
This module defines the low-level communication layer used to exchange data
with an external MuJoCo simulation server. It includes command and error
constants, lightweight data containers for MuJoCo model and runtime state, and
the `mjInterface` client class used to connect to the server, load models,
control simulation execution, and get or set kinematic, dynamic, sensor, and
visualization data.
Copyright (c) 2024 Jozef Stefan Institute
Authors: Leon Zlajpah.
"""
# pylint: disable=invalid-name
import socket
import numpy as np
import struct
from threading import Semaphore
# ----------------------------- MuJoCo API constants -------------------------------------
BUFSZ = 100000
# Communication errors
mjCOM_OK = 0 # success
# Server-to-client errors
mjCOM_BADSIZE = -1 # data has invalid size
mjCOM_BADINDEX = -2 # object has invalid index
mjCOM_BADTYPE = -3 # invalid object type
mjCOM_BADCOMMAND = -4 # unknown command
mjCOM_NOMODEL = -5 # model has not been loaded
mjCOM_CANNOTSEND = -6 # could not send data
mjCOM_CANNOTRECV = -7 # could not receive data
mjCOM_TIMEOUT = -8 # receive timeout
# Client-side errors
mjCOM_NOCONNECTION = -9 # connection not established
mjCOM_CONNECTED = -10 # already connected
# Communication commands
# client-to-server
mjCOM_INFO = 3 # get model info
mjCOM_STEP = 4 # advance simulation in paused mode
mjCOM_UPDATE = 5 # set control, advance, get sensor data
mjCOM_RESET = 6 # reset simulation
mjCOM_PAUSE = 7 # pause simulation
mjCOM_RUN = 8 # run simulation
mjCOM_EQUALITY = 9 # set state of equality constraint
mjCOM_MESSAGE = 10 # show text message
mjCOM_NAME2ID = 11 # convert object name to id
mjCOM_ID2NAME = 12 # convert object id to name
# load model
mjCOM_LOAD = 13 # load MuJoCo model
# get dynamic data
mjCOM_GETSTATE = 16 # state
mjCOM_GETCONTROL = 17 # control
mjCOM_GETAPPLIED = 18 # applied forces
mjCOM_GETONEBODY = 19 # detailed info for one body
mjCOM_GETMOCAP = 20 # mocap
mjCOM_GETDYNAMICS = 21 # output of forward dynamics
mjCOM_GETSENSOR = 22 # sensor data
mjCOM_GETBODY = 23 # body kinematics
mjCOM_GETGEOM = 24 # geom kinematics
mjCOM_GETGEOMSIZE = 25 # geom kinematics
mjCOM_GETSITE = 26 # site kinematics
mjCOM_GETTENDON = 27 # tendon kinematics
mjCOM_GETACTUATOR = 28 # actuator kinematics and force
mjCOM_GETFORCE = 29 # generalized forces
mjCOM_GETCONTACT = 30 # contact info
# set dynamic data
mjCOM_SETSTATE = 48 # state
mjCOM_SETCONTROL = 49 # control
mjCOM_SETAPPLIED = 50 # applied forces
mjCOM_SETONEBODY = 51 # detailed info for one body
mjCOM_SETMOCAP = 52 # mocap
mjCOM_SETGEOMSIZE = 53 # geom size
# get and set rgba static data
mjCOM_GETRGBA = 64 # object rgba
mjCOM_SETRGBA = 65 # object rgba
# simulation control
mjCOM_SCREENSHOT = 66 # make a screenshot request
mjCOM_GETCAMERA = 67 # camera info
mjCOM_GETGLCAMERA = 68 # active (GL) camera info
# Type of geometric shape
mjGEOM_PLANE = 0 # plane
mjGEOM_HFIELD = 1 # height field
mjGEOM_SPHERE = 2 # sphere
mjGEOM_CAPSULE = 3 # capsule
mjGEOM_ELLIPSOID = 4 # ellipsoid
mjGEOM_CYLINDER = 5 # cylinder
mjGEOM_BOX = 6 # box
mjGEOM_MESH = 7 # mesh
# Sensors
# common robotic sensors, attached to a site
mjSENS_TOUCH = 0 # scalar contact normal forces summed over sensor zone
mjSENS_ACCELEROMETER = 1 # 3D linear acceleration, in local frame
mjSENS_VELOCIMETER = 2 # 3D linear velocity, in local frame
mjSENS_GYRO = 3 # 3D angular velocity, in local frame
mjSENS_FORCE = 4 # 3D force between site's body and its parent body
mjSENS_TORQUE = 5 # 3D torque between site's body and its parent body
mjSENS_MAGNETOMETER = 6 # 3D magnetometer
mjSENS_RANGEFINDER = 7 # scalar distance to nearest geom or site along z-axis
# sensors related to scalar joint tendons, actuatorss, tendons, actuators
mjSENS_JOINTPOS = 8 # scalar joint position (hinge and slide only)
mjSENS_JOINTVEL = 9 # scalar joint velocity (hinge and slide only)
mjSENS_TENDONPOS = 10 # scalar tendon position
mjSENS_TENDONVEL = 11 # scalar tendon velocity
mjSENS_ACTUATORPOS = 12 # scalar actuator position
mjSENS_ACTUATORVEL = 13 # scalar actuator velocity
mjSENS_ACTUATORFRC = 14 # scalar actuator force
# sensors related to ball joints
mjSENS_BALLQUAT = 15 # 4D ball joint quaterion
mjSENS_BALLANGVEL = 16 # 3D ball joint angular velocity
# sensors attached to an object h spatial frame: (x)body, geom, site, camerawith spatial frame: (x)body, geom, site, camera
mjSENS_FRAMEPOS = 17 # 3D position
mjSENS_FRAMEQUAT = 18 # 4D unit quaternion orientation
mjSENS_FRAMEXAXIS = 19 # 3D unit vector: x-axis of object's frame
mjSENS_FRAMEYAXIS = 20 # 3D unit vector: y-axis of object's frame
mjSENS_FRAMEZAXIS = 21 # 3D unit vector: z-axis of object's frame
mjSENS_FRAMELINVEL = 22 # 3D linear velocity
mjSENS_FRAMEANGVEL = 23 # 3D angular velocity
mjSENS_FRAMELINACC = 24 # 3D linear acceleration
mjSENS_FRAMEANGACC = 25 # 3D angular acceleration
# sensors related to kinematic srees; attached to a body (which is the subtree root)ubtrees; attached to a body (which is the subtree root)
mjSENS_SUBTREECOM = 26 # 3D center of mass of subtree
mjSENS_SUBTREELINVEL = 27 # 3D linear velocity of subtree
mjSENS_SUBTREEANGMOM = 28 # 3D angular momentum of subtree
# user-defined sensor
mjSENS_USER = 29 # sensor data provided by mjcb_sensor callback
# Type of joint
mjJNT_FREE = 0 # "joint" defining floating body
mjJNT_BALL = 1 # ball joint
mjJNT_SLIDE = 2 # sliding/prismatic joint
mjJNT_HINGE = 3 # hinge joint
# Type of actuator transmission
mjTRN_JOINT = 0 # force on joint
mjTRN_JOINTINPARENT = 1 # force on joint, expressed in parent frame
mjTRN_SLIDERCRANK = 2 # force via slider-crank linkage
mjTRN_TENDON = 3 # force on tendon
mjTRN_SITE = 4 # force on site
# Type of equality constraint
mjEQ_CONNECT = 0 # connect two bodies at a point (ball joint)
mjEQ_WELD = 1 # fix relative position and orientation of two bodies
mjEQ_JOINT = 2 # couple the values of two scalar joints with cubic
mjEQ_TENDON = 3 # couple the lengths of two tendons with cubic
mjEQ_DISTANCE = 4 # fix the contact distance between two geoms
# ----------------------------- MuJoCo API structures ------------------------------------
[docs]
class mjInfo:
def __init__(
self,
nq, # number of generalized positions
nv, # number of generalized velocities
na, # number of actuator activations
njnt, # number of joints
nbody, # number of bodies
ngeom, # number of geoms
nsite, # number of sites
ntendon, # number of tendons
nu, # number of actuators/controls
neq, # number of equality constraints
nkey, # number of keyframes
nmocap, # number of mocap bodies
nsensor, # number of sensors
nsensordata, # number of elements in sensor data array
nmat, # number of materials
ncam, # number of cameras
timestep, # simulation timestep
apirate, # API update rate (same as hxRobotInfo.update_rate)
sensor_type=None, # sensor type (mjtSensor)
sensor_datatype=None, # type of sensorized object
sensor_objtype=None, # type of sensorized object
sensor_objid=None, # id of sensorized object
sensor_dim=None, # number of (scalar) sensor outputs
sensor_adr=None, # address in sensor data array
sensor_noise=None, # noise standard deviation
jnt_type=None, # joint type (mjtJoint)
jnt_bodyid=None, # id of body to which joint belongs
jnt_qposadr=None, # address of joint position data in qpos
jnt_dofadr=None, # address of joint velocity data in qvel
jnt_range=None, # joint range; (0,0): no limits
geom_type=None, # geom type (mjtGeom)
geom_bodyid=None, # id of body to which geom is attached
eq_type=None, # equality constraint type (mjtEq)
eq_obj1id=None, # id of constrained object
eq_obj2id=None, # id of 2nd constrained object; -1 if not applicable
actuator_trntype=None, # transmission type (mjtTrn)
actuator_trnid=None, # transmission target id
actuator_ctrlrange=None, # actuator control range; (0,0): no limits
):
self.nq = nq
self.nv = nv
self.na = na
self.njnt = njnt
self.nbody = nbody
self.ngeom = ngeom
self.nsite = nsite
self.ntendon = ntendon
self.nu = nu
self.neq = neq
self.nkey = nkey
self.nmocap = nmocap
self.nsensor = nsensor
self.nsensordata = nsensordata
self.nmat = nmat
self.ncam = ncam
self.timestep = timestep
self.apirate = apirate
self.sensor_type = sensor_type if sensor_type is not None else []
self.sensor_datatype = sensor_datatype if sensor_datatype is not None else []
self.sensor_objtype = sensor_objtype if sensor_objtype is not None else []
self.sensor_objid = sensor_objid if sensor_objid is not None else []
self.sensor_dim = sensor_dim if sensor_dim is not None else []
self.sensor_adr = sensor_adr if sensor_adr is not None else []
self.sensor_noise = sensor_noise if sensor_noise is not None else []
self.jnt_type = jnt_type if jnt_type is not None else []
self.jnt_bodyid = jnt_bodyid if jnt_bodyid is not None else []
self.jnt_qposadr = jnt_qposadr if jnt_qposadr is not None else []
self.jnt_dofadr = jnt_dofadr if jnt_dofadr is not None else []
self.jnt_range = jnt_range if jnt_range is not None else []
self.geom_type = geom_type if geom_type is not None else []
self.geom_bodyid = geom_bodyid if geom_bodyid is not None else []
self.eq_type = eq_type if eq_type is not None else []
self.eq_obj1id = eq_obj1id if eq_obj1id is not None else []
self.eq_obj2id = eq_obj2id if eq_obj2id is not None else []
self.actuator_trntype = actuator_trntype if actuator_trntype is not None else []
self.actuator_trnid = actuator_trnid if actuator_trnid is not None else []
self.actuator_ctrlrange = actuator_ctrlrange if actuator_ctrlrange is not None else []
[docs]
def serialize(self):
data = bytearray(struct.pack("<16i2f", self.nq, self.nv, self.na, self.njnt, self.nbody, self.ngeom, self.nsite, self.ntendon, self.nu, self.neq, self.nkey, self.nmocap, self.nsensor, self.nsensordata, self.nmat, self.ncam, self.timestep, self.apirate))
data.extend(struct.pack(f"{self.nsensor}i", *self.sensor_type))
data.extend(struct.pack(f"{self.nsensor}i", *self.sensor_datatype))
data.extend(struct.pack(f"{self.nsensor}i", *self.sensor_objtype))
data.extend(struct.pack(f"{self.nsensor}i", *self.sensor_objid))
data.extend(struct.pack(f"{self.nsensor}i", *self.sensor_dim))
data.extend(struct.pack(f"{self.nsensor}i", *self.sensor_adr))
data.extend(struct.pack(f"{self.nsensor}f", *self.sensor_noise))
data.extend(struct.pack(f"{self.njnt}i", *self.jnt_type))
data.extend(struct.pack(f"{self.njnt}i", *self.jnt_bodyid))
data.extend(struct.pack(f"{self.njnt}i", *self.jnt_qposadr))
data.extend(struct.pack(f"{self.njnt}i", *self.jnt_dofadr))
for row in self.jnt_range:
data.extend(struct.pack(f"{len(row)}f", *row))
data.extend(struct.pack(f"{self.ngeom}i", *self.geom_type))
data.extend(struct.pack(f"{self.ngeom}i", *self.geom_bodyid))
data.extend(struct.pack(f"{self.neq}i", *self.eq_type))
data.extend(struct.pack(f"{self.neq}i", *self.eq_obj1id))
data.extend(struct.pack(f"{self.neq}i", *self.eq_obj2id))
data.extend(struct.pack(f"{self.nu}i", *self.actuator_trntype))
for row in self.actuator_trnid:
data.extend(struct.pack(f"{len(row)}i", *row))
for row in self.actuator_ctrlrange:
data.extend(struct.pack(f"{len(row)}f", *row))
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
off = 0
(
nq,
nv,
na,
njnt,
nbody,
ngeom,
nsite,
ntendon,
nu,
neq,
nkey,
nmocap,
nsensor,
nsensordata,
nmat,
ncam,
timestep,
apirate,
) = struct.unpack("<16i2f", data[off : off + 72])
off += 72
sensor_type = list(struct.unpack(f"{nsensor}i", data[off : off + nsensor * 4]))
off += nsensor * 4
sensor_datatype = list(struct.unpack(f"{nsensor}i", data[off : off + nsensor * 4]))
off += nsensor * 4
sensor_objtype = list(struct.unpack(f"{nsensor}i", data[off : off + nsensor * 4]))
off += nsensor * 4
sensor_objid = list(struct.unpack(f"{nsensor}i", data[off : off + nsensor * 4]))
off += nsensor * 4
sensor_dim = list(struct.unpack(f"{nsensor}i", data[off : off + nsensor * 4]))
off += nsensor * 4
sensor_adr = list(struct.unpack(f"{nsensor}i", data[off : off + nsensor * 4]))
off += nsensor * 4
sensor_noise = np.array(struct.unpack(f"{nsensor}f", data[off : off + nsensor * 4]))
off += nsensor * 4
jnt_type = list(struct.unpack(f"{njnt}i", data[off : off + njnt * 4]))
off += njnt * 4
jnt_bodyid = list(struct.unpack(f"{njnt}i", data[off : off + njnt * 4]))
off += njnt * 4
jnt_qposadr = list(struct.unpack(f"{njnt}i", data[off : off + njnt * 4]))
off += njnt * 4
jnt_dofadr = list(struct.unpack(f"{njnt}i", data[off : off + njnt * 4]))
off += njnt * 4
jnt_range = []
for _ in range(njnt):
row = list(struct.unpack("2f", data[off : off + 8]))
off += 8
jnt_range.append(row)
jnt_range = np.array(jnt_range)
geom_type = list(struct.unpack(f"{ngeom}i", data[off : off + ngeom * 4]))
off += ngeom * 4
geom_bodyid = list(struct.unpack(f"{ngeom}i", data[off : off + ngeom * 4]))
off += ngeom * 4
eq_type = list(struct.unpack(f"{neq}i", data[off : off + neq * 4]))
off += neq * 4
eq_obj1id = list(struct.unpack(f"{neq}i", data[off : off + neq * 4]))
off += neq * 4
eq_obj2id = list(struct.unpack(f"{neq}i", data[off : off + neq * 4]))
off += neq * 4
actuator_trntype = list(struct.unpack(f"{nu}i", data[off : off + nu * 4]))
off += nu * 4
actuator_trnid = []
for _ in range(nu):
row = list(struct.unpack("2i", data[off : off + 8]))
off += 8
actuator_trnid.append(row)
actuator_ctrlrange = []
for _ in range(nu):
row = list(struct.unpack("2f", data[off : off + 8]))
off += 8
actuator_ctrlrange.append(row)
actuator_ctrlrange = np.array(actuator_ctrlrange)
return mjInfo(
nq,
nv,
na,
njnt,
nbody,
ngeom,
nsite,
ntendon,
nu,
neq,
nkey,
nmocap,
nsensor,
nsensordata,
nmat,
ncam,
timestep,
apirate,
sensor_type,
sensor_datatype,
sensor_objtype,
sensor_objid,
sensor_dim,
sensor_adr,
sensor_noise,
jnt_type,
jnt_bodyid,
jnt_qposadr,
jnt_dofadr,
jnt_range,
geom_type,
geom_bodyid,
eq_type,
eq_obj1id,
eq_obj2id,
actuator_trntype,
actuator_trnid,
actuator_ctrlrange,
)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjState:
def __init__(self, nq, nv, na, time, qpos=None, qvel=None, act=None):
self
self.nq = nq # number of generalized positions
self.nv = nv # number of generalized velocities
self.na = na # number of actuator activations
self.time = time # simulation time
self.qpos = qpos if qpos is not None else [] # generalized positions
self.qvel = qvel if qvel is not None else [] # generalized velocities
self.act = act if act is not None else [] # actuator activations
[docs]
def serialize(self):
data = struct.pack("iii", self.nq, self.nv, self.na)
# data += struct.pack("f", self.time) # Not send!
data += struct.pack(f"{self.nq}f", *self.qpos)
data += struct.pack(f"{self.nv}f", *self.qvel)
data += struct.pack(f"{self.na}f", *self.act)
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
# Deserialize the bytes object into a mjState instance
nq, nv, na = struct.unpack("iii", data[:12])
time = struct.unpack("f", data[12:16])[0]
off = 16
qpos = np.array(struct.unpack(f"{nq}f", data[off : off + nq * 4]))
off += nq * 4
qvel = np.array(struct.unpack(f"{nv}f", data[off : off + nv * 4]))
off += nv * 4
act = np.array(struct.unpack(f"{na}f", data[off : off + na * 4]))
return mjState(nq, nv, na, time, qpos, qvel, act)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjControl:
def __init__(self, nu=0, time=0.0, ctrl=None):
self.nu = nu # number of actuators
self.time = time # simulation time
self.ctrl = ctrl if ctrl is not None else [] # control signals
[docs]
def serialize(self):
data = bytearray(struct.pack("<i", self.nu))
# data.extend(struct.pack("f", *self.time))
data.extend(struct.pack(f"{self.nu}f", *self.ctrl))
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
nu, time = struct.unpack("<if", data[:8])
ctrl = np.array(struct.unpack(f"{nu}f", data[8:]))
return mjControl(nu, time, ctrl)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjApplied:
def __init__(self, nv=0, nbody=0, time=0, qfrc_applied=None, xfrc_applied=None):
self.nv = nv # number of generalized velocities
self.nbody = nbody # number of bodies
self.time = time # simulation time
self.qfrc_applied = qfrc_applied if qfrc_applied is not None else [] # generalized forces
self.xfrc_applied = xfrc_applied if xfrc_applied is not None else [] #
[docs]
def serialize(self):
# Serialize the applied attributes into a bytes object
data = struct.pack("ii", self.nv, self.nbody)
# data += struct.pack("f", self.time)
data += struct.pack(f"{len(self.qfrc_applied)}f", *self.qfrc_applied)
for i in range(self.nbody):
data += struct.pack(f"{6}f", *self.xfrc_applied[i])
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
# Deserialize the bytes object into an mjApplied instance
nv, nbody = struct.unpack("ii", data[:8])
time = struct.unpack("f", data[8:12])[0]
qfrc_applied = np.array(struct.unpack(f"{nv}f", data[12 : 12 + nv * 4]))
xfrc_applied = []
offset = 12 + nv * 4
for _ in range(nbody):
xfrc_applied.append(list(struct.unpack(f"{6}f", data[offset : offset + 6 * 4])))
offset += 6 * 4
xfrc_applied = np.array(xfrc_applied)
return mjApplied(nv, nbody, time, qfrc_applied, xfrc_applied)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjOneBody:
def __init__(self, bodyid=0, isfloating=0, time=0, linacc=0, angacc=0, contactforce=0, pos=None, quat=None, linvel=None, angvel=None, force=None, torque=None):
self.bodyid = bodyid # body id, provided by user
self.isfloating = isfloating # 1 if body is floating, 0 otherwise
self.time = time # simulation time
self.linacc = linacc # linear acceleration
self.angacc = angacc # angular acceleration
self.contactforce = contactforce # net force from all contacts on this body
self.pos = pos if pos is not None else [] # position
self.quat = quat if quat is not None else [] # orientation quaternion
self.linvel = linvel if linvel is not None else [] # linear velocity
self.angvel = angvel if angvel is not None else [] # angular velocity
self.force = force if force is not None else [] # Cartesian force applied to body CoM
self.torque = torque if torque is not None else [] # Cartesian torque applied to body
[docs]
def serialize(self):
# Serialize the mjOneBody attributes into a bytes object
data = struct.pack("i", self.bodyid)
# data = struct.pack("i", self.isfloating)
# data = struct.pack("f", self.time)
# data += struct.pack("3f", *self.linacc)
# data += struct.pack("3f", *self.angacc)
# data += struct.pack("3f", *self.contactforce)
data += struct.pack("3f", *self.pos)
data += struct.pack("4f", *self.quat)
data += struct.pack("3f", *self.linvel)
data += struct.pack("3f", *self.angvel)
data += struct.pack("3f", *self.force)
data += struct.pack("3f", *self.torque)
return data
[docs]
def deserialize(self, data):
if not data:
return None
# Deserialize the bytes object into an mjOneBody instance
self.isfloating, self.time = struct.unpack("if", data[:8])
self.linacc = np.array(struct.unpack("3f", data[8:20]))
self.angacc = np.array(struct.unpack("3f", data[20:32]))
self.contactforce = np.array(struct.unpack("3f", data[32:44]))
self.pos = np.array(struct.unpack("3f", data[44:56]))
self.quat = np.array(struct.unpack("4f", data[56:72]))
self.linvel = np.array(struct.unpack("3f", data[72:84]))
self.angvel = np.array(struct.unpack("3f", data[84:96]))
self.force = np.array(struct.unpack("3f", data[96:108]))
self.torque = np.array(struct.unpack("3f", data[108:120]))
return self
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjMocap:
def __init__(self, nmocap, time, pos=None, quat=None):
self.nmocap = nmocap # number of mocap bodies
self.time = time # simulation time
self.pos = pos if pos is not None else [] # positions
self.quat = quat if quat is not None else [] # quaternion orientations
[docs]
def serialize(self):
# Serialize the mjMocap attributes into a bytes object
data = struct.pack("i", self.nmocap)
# data += struct.pack("f", self.time)
for i in range(self.nmocap):
data += struct.pack("3f", *self.pos[i])
for i in range(self.nmocap):
data += struct.pack("4f", *self.quat[i])
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
# Deserialize the bytes object into an mjMocap instance
nmocap, time = struct.unpack("if", data[:8])
pos = []
quat = []
offset = 8
for i in range(nmocap):
pos.append(np.array(struct.unpack("3f", data[offset : offset + 12])))
offset += 12
for i in range(nmocap):
quat.append(np.array(struct.unpack("4f", data[offset : offset + 16])))
offset += 16
pos = np.array(pos)
quat = np.array(quat)
return mjMocap(nmocap, time, pos, quat)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjDynamics:
def __init__(self, nv, na, time, qacc=None, actdot=None):
self.nv = nv # number of generalized velocities
self.na = na # number of actuator activations
self.time = time # simulation time
self.qacc = qacc if qacc is not None else [] # generalized accelerations
self.actdot = actdot if actdot is not None else [] # time-derivatives of actuator activations
[docs]
def serialize(self):
# Serialize the mjDynamics attributes into a bytes object
data = struct.pack("ii", self.nv, self.na)
data += struct.pack("f", self.time)
data += struct.pack(f"{len(self.qacc)}f", *self.qacc)
data += struct.pack(f"{len(self.actdot)}f", *self.actdot)
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
# Deserialize the bytes object into an mjDynamics instance
nv, na = struct.unpack("ii", data[:8])
time = struct.unpack("f", data[8:12])[0]
qacc = np.array(struct.unpack(f"{nv}f", data[12 : 12 + nv * 4]))
actdot = np.array(struct.unpack(f"{na}f", data[12 + nv * 4 :]))
return mjDynamics(nv, na, time, qacc, actdot)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjSensor:
def __init__(self, nsensordata, time, sensordata=None):
self.nsensordata = nsensordata # size of sensor data array
self.time = time # simulation time
self.sensordata = sensordata if sensordata is not None else [] # sensor data array
[docs]
def serialize(self):
# Serialize the mjSensor attributes into a bytes object
data = struct.pack("if", self.nsensordata, self.time)
data += struct.pack(f"{len(self.sensordata)}f", *self.sensordata)
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
# Deserialize the bytes object into an mjSensor instance
nsensordata, time = struct.unpack("if", data[:8])
sensordata = np.array(struct.unpack(f"{nsensordata}f", data[8:]))
return mjSensor(nsensordata, time, sensordata)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjBody:
def __init__(self, nbody, time, pos=None, mat=None):
self.nbody = nbody # number of bodies
self.time = time # simulation time
self.pos = pos if pos is not None else [] # positions
self.mat = mat if mat is not None else [] # frame orientations
[docs]
def serialize(self):
# Serialize the mjBody attributes into a bytes object
data = struct.pack("if", self.nbody, self.time)
for i in range(self.nbody):
data += struct.pack("3f", *self.pos[i])
for i in range(self.nbody):
data += struct.pack("9f", *self.mat[i])
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
# Deserialize the bytes object into an mjBody instance
nbody, time = struct.unpack("if", data[:8])
pos = []
mat = []
offset = 8
for _ in range(nbody):
pos.append(list(struct.unpack("3f", data[offset : offset + 12])))
offset += 12
for _ in range(nbody):
mat.append(list(struct.unpack("9f", data[offset : offset + 36])))
offset += 36
pos = np.array(pos)
mat = np.array(mat)
return mjBody(nbody, time, pos, mat)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjGeom:
def __init__(self, ngeom, time, pos=None, mat=None):
self.ngeom = ngeom # number of geoms
self.time = time # simulation time
self.pos = pos if pos is not None else [] # positions
self.mat = mat if mat is not None else [] # frame orientations
[docs]
def serialize(self):
# Serialize the mjGeom attributes into a bytes object
data = struct.pack("if", self.ngeom, self.time)
for i in range(self.ngeom):
data += struct.pack("3f", *self.pos[i])
for i in range(self.ngeom):
data += struct.pack("9f", *self.mat[i])
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
# Deserialize the bytes object into an mjGeom instance
ngeom, time = struct.unpack("if", data[:8])
pos = []
mat = []
offset = 8
for _ in range(ngeom):
pos.append(list(struct.unpack("3f", data[offset : offset + 12])))
offset += 12
for _ in range(ngeom):
mat.append(list(struct.unpack("9f", data[offset : offset + 36])))
offset += 36
pos = np.array(pos)
mat = np.array(mat)
return mjGeom(ngeom, time, pos, mat)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjSite:
def __init__(self, nsite, time, pos=None, mat=None):
self.nsite = nsite # number of sites
self.time = time # simulation time
self.pos = pos if pos is not None else [] # positions
self.mat = mat if mat is not None else [] # frame orientations
[docs]
def serialize(self):
# Serialize the mjSite attributes into a bytes object
data = struct.pack("if", self.nsite, self.time)
for i in range(self.nsite):
data += struct.pack("3f", *self.pos[i])
for i in range(self.nsite):
data += struct.pack("9f", *self.mat[i])
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
# Deserialize the bytes object into an mjSite instance
nsite, time = struct.unpack("if", data[:8])
pos = []
mat = []
offset = 8
for _ in range(nsite):
pos.append(list(struct.unpack("3f", data[offset : offset + 12])))
offset += 12
for _ in range(nsite):
mat.append(list(struct.unpack("9f", data[offset : offset + 36])))
offset += 36
pos = np.array(pos)
mat = np.array(mat)
return mjSite(nsite, time, pos, mat)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjTendon:
def __init__(self, ntendon, time, length=None, velocity=None):
self.ntendon = ntendon # number of tendons
self.time = time # simulation time
self.length = length if length is not None else [] # tendon lengths
self.velocity = velocity if velocity is not None else [] # tendon velocities
[docs]
def serialize(self):
# Serialize the mjTendon attributes into a bytes object
data = struct.pack("if", self.ntendon, self.time)
data += struct.pack(f"{len(self.length)}f", *self.length)
data += struct.pack(f"{len(self.velocity)}f", *self.velocity)
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
# Deserialize the bytes object into an mjTendon instance
ntendon, time = struct.unpack("if", data[:8])
length = np.array(struct.unpack(f"{ntendon}f", data[8 : 8 + 4 * ntendon]))
offset = 8 + 4 * ntendon
velocity = np.array(struct.unpack(f"{ntendon}f", data[offset:]))
return mjTendon(ntendon, time, length, velocity)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjActuator:
def __init__(self, nu, time, length=None, velocity=None, force=None):
self.nu = nu # number of actuators
self.time = time # simulation time
self.length = length if length is not None else [] # actuator lengths
self.velocity = velocity if velocity is not None else [] # actuator velocities
self.force = force if force is not None else [] # actuator forces
[docs]
def serialize(self):
# Serialize the mjActuator attributes into a bytes object
data = struct.pack("if", self.nu, self.time)
data += struct.pack(f"{len(self.length)}f", *self.length)
data += struct.pack(f"{len(self.velocity)}f", *self.velocity)
data += struct.pack(f"{len(self.force)}f", *self.force)
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
# Deserialize the bytes object into an mjActuator instance
nu, time = struct.unpack("if", data[:8])
length = np.array(struct.unpack(f"{nu}f", data[8 : 8 + 4 * nu]))
offset = 8 + 4 * nu
velocity = np.array(struct.unpack(f"{nu}f", data[offset : offset + 4 * nu]))
offset += 4 * nu
force = np.array(struct.unpack(f"{nu}f", data[offset:]))
return mjActuator(nu, time, length, velocity, force)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjForce:
def __init__(self, nv, time, nonconstraint=None, constraint=None):
self.nv = nv # number of generalized velocities/forces
self.time = time # simulation time
self.nonconstraint = nonconstraint if nonconstraint is not None else [] # sum of all non-constraint forces
self.constraint = constraint if constraint is not None else [] # constraint forces (including contacts)
[docs]
def serialize(self):
# Serialize the mjForce attributes into a bytes object
data = struct.pack("if", self.nv, self.time)
data += struct.pack(f"{len(self.nonconstraint)}f", *self.nonconstraint)
data += struct.pack(f"{len(self.constraint)}f", *self.constraint)
return data
[docs]
@staticmethod
def deserialize(data):
if not data:
return None
# Deserialize the bytes object into an mjForce instance
nv, time = struct.unpack("if", data[:8])
nonconstraint = np.array(struct.unpack(f"{nv}f", data[8 : 8 + 4 * nv]))
offset = 8 + 4 * nv
constraint = np.array(struct.unpack(f"{nv}f", data[offset:]))
return mjForce(nv, time, nonconstraint, constraint)
def __str__(self):
s = ""
for field_name in self.__dict__:
s += "\n {0:s}: {1:}".format(field_name, self.__dict__[field_name])
return "{0:s}\n".format(s)
[docs]
class mjCamera:
def __init__(self, ncam, time, cam_xpos, cam_xmat, cam_mode, cam_bodyid, cam_targetbodyid, cam_pos, cam_quat, cam_poscom0, cam_pos0, cam_mat0, cam_fovy, cam_ipd):
self.ncam = ncam # number of camera bodies
self.time = time # simulation time
self.cam_xpos = cam_xpos if cam_xpos is not None else [] # Cartesian camera position (ncam x 3)
self.cam_xmat = cam_xmat if cam_xmat is not None else [] # Cartesian camera orientation (ncam x 9)
self.cam_mode = cam_mode if cam_mode is not None else [] # camera tracking mode (mjtCamLight) (ncam x 1)
self.cam_bodyid = cam_bodyid if cam_bodyid is not None else [] # id of camera's body (ncam x 1)
self.cam_targetbodyid = cam_targetbodyid # id of targeted body; -1: none (ncam x 1)
self.cam_pos = cam_pos if cam_pos is not None else [] # position rel. to body frame (ncam x 3)
self.cam_quat = cam_quat if cam_quat is not None else [] # orientation rel. to body frame (ncam x 4)
self.cam_poscom0 = cam_poscom0 if cam_poscom0 is not None else [] # global position rel. to sub-com in qpos0 (ncam x 3)
self.cam_pos0 = cam_pos0 if cam_pos0 is not None else [] # global position rel. to body in qpos0 (ncam x 3)
self.cam_mat0 = cam_mat0 if cam_mat0 is not None else [] # global orientation in qpos0 (ncam x 9)
self.cam_fovy = cam_fovy if cam_fovy is not None else [] # y-field of view (deg) (ncam x 1)
self.cam_ipd = cam_ipd if cam_ipd is not None else [] # inter-pupilary distance (ncam x 1)
[docs]
def serialize(self):
data = struct.pack("if", self.ncam, self.time)
data += struct.pack(f"{self.ncam * 3}f", *self.cam_xpos)
data += struct.pack(f"{self.ncam * 9}f", *self.cam_xmat)
data += struct.pack(f"{self.ncam}i", *self.cam_mode)
data += struct.pack(f"{self.ncam}i", *self.cam_bodyid)
data += struct.pack(f"{self.ncam}i", *self.cam_targetbodyid)
data += struct.pack(f"{self.ncam * 3}f", *self.cam_pos)
data += struct.pack(f"{self.ncam * 4}f", *self.cam_quat)
data += struct.pack(f"{self.ncam * 3}f", *self.cam_poscom0)
data += struct.pack(f"{self.ncam * 3}f", *self.cam_pos0)
data += struct.pack(f"{self.ncam * 9}f", *self.cam_mat0)
data += struct.pack(f"{self.ncam}f", *self.cam_fovy)
data += struct.pack(f"{self.ncam}f", *self.cam_ipd)
return data
[docs]
@staticmethod
def deserialize(data):
offset = 0
ncam, time = struct.unpack("if", data[offset : offset + 8])
offset += 8
cam_xpos = np.array(struct.unpack(f"{ncam * 3}f", data[offset : offset + ncam * 12]))
offset += ncam * 12
cam_xmat = np.array(struct.unpack(f"{ncam * 9}f", data[offset : offset + ncam * 36]))
offset += ncam * 36
cam_mode = np.array(struct.unpack(f"{ncam}i", data[offset : offset + ncam * 4]))
offset += ncam * 4
cam_bodyid = np.array(struct.unpack(f"{ncam}i", data[offset : offset + ncam * 4]))
offset += ncam * 4
cam_targetbodyid = np.array(struct.unpack(f"{ncam}i", data[offset : offset + ncam * 4]))
offset += ncam * 4
cam_pos = np.array(struct.unpack(f"{ncam * 3}f", data[offset : offset + ncam * 12]))
offset += ncam * 12
cam_quat = np.array(struct.unpack(f"{ncam * 4}f", data[offset : offset + ncam * 16]))
offset += ncam * 16
cam_poscom0 = np.array(struct.unpack(f"{ncam * 3}f", data[offset : offset + ncam * 12]))
offset += ncam * 12
cam_pos0 = np.array(struct.unpack(f"{ncam * 3}f", data[offset : offset + ncam * 12]))
offset += ncam * 12
cam_mat0 = np.array(struct.unpack(f"{ncam * 9}f", data[offset : offset + ncam * 36]))
offset += ncam * 36
cam_fovy = np.array(struct.unpack(f"{ncam}f", data[offset : offset + ncam * 4]))
offset += ncam * 4
cam_ipd = np.array(struct.unpack(f"{ncam}f", data[offset : offset + ncam * 4]))
return mjCamera(ncam, time, cam_xpos, cam_xmat, cam_mode, cam_bodyid, cam_targetbodyid, cam_pos, cam_quat, cam_poscom0, cam_pos0, cam_mat0, cam_fovy, cam_ipd)
[docs]
class mjGLCamera:
def __init__(self, nglcam, time, fixedcamid, type, trackbodyid, lookat, distance, azimuth, elevation, pos, forward, up, frustum_center, frustum_bottom, frustum_top, frustum_near, frustum_far):
self.nglcam = nglcam
self.time = time
self.fixedcamid = fixedcamid if fixedcamid is not None else []
self.type = type if type is not None else []
self.trackbodyid = trackbodyid if trackbodyid is not None else []
self.lookat = lookat if lookat is not None else []
self.distance = distance if distance is not None else []
self.azimuth = azimuth if azimuth is not None else []
self.elevation = elevation if elevation is not None else []
self.pos = pos if pos is not None else []
self.forward = forward if forward is not None else []
self.up = up if up is not None else []
self.frustum_center = frustum_center if frustum_center is not None else []
self.frustum_bottom = frustum_bottom if frustum_bottom is not None else []
self.frustum_top = frustum_top if frustum_top is not None else []
self.frustum_near = frustum_near if frustum_near is not None else []
self.frustum_far = frustum_far if frustum_far is not None else []
[docs]
def serialize(self):
data = struct.pack("if", self.nglcam, self.time)
data += struct.pack(f"{self.nglcam}i", *self.fixedcamid)
data += struct.pack(f"{self.nglcam}i", *self.type)
data += struct.pack(f"{self.nglcam}i", *self.trackbodyid)
data += struct.pack(f"{self.nglcam * 3}f", *self.lookat)
data += struct.pack(f"{self.nglcam}f", *self.distance)
data += struct.pack(f"{self.nglcam}f", *self.azimuth)
data += struct.pack(f"{self.nglcam}f", *self.elevation)
data += struct.pack(f"{self.nglcam * 3}f", *self.pos)
data += struct.pack(f"{self.nglcam * 3}f", *self.forward)
data += struct.pack(f"{self.nglcam * 3}f", *self.up)
data += struct.pack(f"{self.nglcam}f", *self.frustum_center)
data += struct.pack(f"{self.nglcam}f", *self.frustum_bottom)
data += struct.pack(f"{self.nglcam}f", *self.frustum_top)
data += struct.pack(f"{self.nglcam}f", *self.frustum_near)
data += struct.pack(f"{self.nglcam}f", *self.frustum_far)
return data
[docs]
@staticmethod
def deserialize(data):
offset = 0
nglcam, time = struct.unpack("if", data[offset : offset + 8])
offset += 8
fixedcamid = np.array(struct.unpack(f"{nglcam}i", data[offset : offset + nglcam * 4]))
offset += nglcam * 4
type = np.array(struct.unpack(f"{nglcam}i", data[offset : offset + nglcam * 4]))
offset += nglcam * 4
trackbodyid = np.array(struct.unpack(f"{nglcam}i", data[offset : offset + nglcam * 4]))
offset += nglcam * 4
lookat = np.array(struct.unpack(f"{nglcam * 3}f", data[offset : offset + nglcam * 12]))
offset += nglcam * 12
distance = np.array(struct.unpack(f"{nglcam}f", data[offset : offset + nglcam * 4]))
offset += nglcam * 4
azimuth = np.array(struct.unpack(f"{nglcam}f", data[offset : offset + nglcam * 4]))
offset += nglcam * 4
elevation = np.array(struct.unpack(f"{nglcam}f", data[offset : offset + nglcam * 4]))
offset += nglcam * 4
pos = np.array(struct.unpack(f"{nglcam * 3}f", data[offset : offset + nglcam * 12]))
offset += nglcam * 12
forward = np.array(struct.unpack(f"{nglcam * 3}f", data[offset : offset + nglcam * 12]))
offset += nglcam * 12
up = np.array(struct.unpack(f"{nglcam * 3}f", data[offset : offset + nglcam * 12]))
offset += nglcam * 12
frustum_center = np.array(struct.unpack(f"{nglcam}f", data[offset : offset + nglcam * 4]))
offset += nglcam * 4
frustum_bottom = np.array(struct.unpack(f"{nglcam}f", data[offset : offset + nglcam * 4]))
offset += nglcam * 4
frustum_top = np.array(struct.unpack(f"{nglcam}f", data[offset : offset + nglcam * 4]))
offset += nglcam * 4
frustum_near = np.array(struct.unpack(f"{nglcam}f", data[offset : offset + nglcam * 4]))
offset += nglcam * 4
frustum_far = np.array(struct.unpack(f"{nglcam}f", data[offset : offset + nglcam * 4]))
return mjGLCamera(nglcam, time, fixedcamid, type, trackbodyid, lookat, distance, azimuth, elevation, pos, forward, up, frustum_center, frustum_bottom, frustum_top, frustum_near, frustum_far)
mujoco_semaphore = Semaphore(1)
# ----------------------------- MuJoCo API object ----------------------------------------
[docs]
class mjInterface(object):
"""Creation of MuJoCo interface"""
def __init__(self, host="localhost", port=50000):
self._soc = None
self._host = host
self._port = port
self._connected = 0
self._lastError = 0
self.return_codes = {
0: "OK",
-1: "BADSIZE",
-2: "BADINDEX",
-3: "BADTYPE",
-4: "BADCOMMAND",
-5: "NOMODEL",
-6: "CANNOTSEND",
-7: "CANNOTRECV",
-8: "TIMEOUT",
-9: "NOCONNECTION",
-10: "CONNECTED",
}
self.geom_type = (
"PLANE",
"HFIELD",
"SPHERE",
"CAPSULE",
"ELLIPSOID",
"CYLINDER",
"BOX",
"MESH",
)
self.sensor_type = (
"TOUCH", # scalar contact normal forces summed over sensor zone
"ACCELEROMETER", # 3D linear acceleration', in local frame
"VELOCIMETER", # 3D linear velocity', in local frame
"GYRO", # 3D angular velocity', in local frame
"FORCE", # 3D force between site's body and its parent body
"TORQUE", # 3D torque between site's body and its parent body
"MAGNETOMETER", # 3D magnetometer
"RANGEFINDER", # distance geom along the positive size Z-axis
"JOINTPOS", # scalar joint position (hinge and slide only)
"JOINTVEL", # scalar joint velocity (hinge and slide only)
"TENDONPOS", # scalar tendon position
"TENDONVEL", # scalar tendon velocity
"ACTUATORPOS", # scalar actuator position
"ACTUATORVEL", # scalar actuator velocity
"ACTUATORFRC", # scalar actuator force
"BALLQUAT", # 4D ball joint quaterion
"BALLANGVEL", # 3D ball joint angular velocity
"FRAMEPOS", # 3D position
"FRAMEQUAT", # 4D unit quaternion orientation
"FRAMEXAXIS", # 3D unit vector: x-axis of object's frame
"FRAMEYAXIS", # 3D unit vector: y-axis of object's frame
"FRAMEZAXIS", # 3D unit vector: z-axis of object's frame
"FRAMELINVEL", # 3D linear velocity
"FRAMEANGVEL", # 3D angular velocity
"FRAMELINACC", # 3D linear acceleration
"FRAMEANGACC", # 3D angular acceleration
"SUBTREECOM", # 3D center of mass of subtree
"SUBTREELINVEL", # 3D linear velocity of subtree
"SUBTREEANGMOM", # 3D angular momentum of subtree
"USER",
)
self.joint_type = ("FREE", "BALL", "SLIDE", "HINGE")
self.transmission_type = (
"JOINT",
"JOINTINPARENT",
"SLIDERCRANK",
"TENDON",
"SITE",
)
self.constraint_type = (
"CONNECT", # connect two bodies at a point (ball joint)
"WELD", # fix relative position and orientation of two bodies
"JOINT", # couple the values of two scalar joints with cubic
"TENDON", # couple the lengths of two tendons with cubic
"DISTANCE",
) # fix the contact distance between two geoms
onebody = mjOneBody()
[docs]
def mjSetError(self, err):
self._lastError = err
# Socket
[docs]
def mjSend(self, cmd, data=None):
if not self._soc:
self.mjSetError(mjCOM_CANNOTSEND)
raise RuntimeError("No communication to server")
# Flush input buffer
"""
self._soc.settimeout(0.001)
try:
self._soc.recv(1024)
except socket.timeout:
pass
except Exception as e:
self.mjSetError(mjCOM_CANNOTSEND)
raise RuntimeError("Send: " + str(e))
"""
# Prepare buffer
if not mujoco_semaphore.acquire(timeout=10):
raise RuntimeError(f"No mujoco_semaphore ({mujoco_semaphore._value})")
if data:
_buffer = bytearray(struct.pack("<2i", cmd, len(data)))
_buffer.extend(data)
else:
_buffer = bytearray(struct.pack("<2i", cmd, 0))
# Try to send
try:
self._soc.sendall(_buffer)
except Exception as e:
self._soc.close()
self._soc = None
self.mjSetError(mjCOM_CANNOTSEND)
raise RuntimeError("Send: " + str(e))
else:
self.mjSetError(mjCOM_OK)
[docs]
def mjRecv(self):
if self._lastError != mjCOM_OK:
raise RuntimeError(self.return_codes[self._lastError])
# Get message header, disconnect on error or timeout
try:
header = self._soc.recv(8)
except socket.timeout:
self._soc.close()
self._lastError = mjCOM_TIMEOUT
raise RuntimeError("Timeout - could not get data")
except Exception as e:
self._soc.close()
self._lastError = mjCOM_CANNOTRECV
raise RuntimeError("Recieve: " + str(e))
cmd, size = struct.unpack("2i", header)
self.mjSetError(cmd)
# Get message data, disconnect on error
if size > 0 and size <= BUFSZ:
try:
data = self._soc.recv(size)
mujoco_semaphore.release()
return data
except Exception as e:
self._soc.close()
self.mjSetError(mjCOM_CANNOTRECV)
raise RuntimeError("Recieve: " + str(e))
elif size:
self._soc.close()
self.mjSetError(mjCOM_BADSIZE)
raise RuntimeError("Received data has invalid size")
mujoco_semaphore.release()
[docs]
def mjGetAck(self):
if self._lastError != mjCOM_OK:
raise RuntimeError(self.return_codes[self._lastError])
# try to receive acknowledgment
self._soc.settimeout(0.1)
try:
_data = self._soc.recv(8)
except socket.timeout:
self._soc.close()
self.mjSetError(mjCOM_TIMEOUT)
raise RuntimeError("Timeout - could not get acknowledge data")
# Check if data was received successfully
if not _data:
self._soc.close()
self.mjSetError(mjCOM_CANNOTRECV)
raise RuntimeError("Could not get acknowledge data")
_cmd, _size = struct.unpack("ii", _data)
# data size should be 0
if _size != 0:
self._soc.close()
self.mjSetError(mjCOM_CANNOTRECV)
raise RuntimeError("Bad acknowledgement")
else:
self.mjSetError(_cmd)
mujoco_semaphore.release()
# Get functions
[docs]
def mj_get_state(self) -> mjState:
"""Read model state
Returns
-------
-------
structure mjState
nq : int
number of data in qpos
nv : int
number of data in qvel
na : int
number of data in act
time : float)
simulation time
qpos : array [nq]
generalized positions
qvel : array [nv]
generalized velocities
act : array [na]
actuator activations
"""
self.mjSend(mjCOM_GETSTATE)
_data = self.mjRecv()
_out = mjState.deserialize(_data)
return _out
[docs]
def mj_get_control(self) -> mjControl:
"""Read control signals
Returns
-------
--------
structure mjControl
nu (int) : number of data in ctrl
time (float) : simulation time
ctrl[nu] (float): control array
"""
self.mjSend(mjCOM_GETCONTROL)
_data = self.mjRecv()
_out = mjControl.deserialize(_data)
return _out
[docs]
def mj_get_applied(self) -> mjApplied:
"""Read applied forces
Returns
-------
--------
structure mjApplied
nv (int) : number of data in forces
nbody (int) : id of body
time (float) : simulation time
qfrc[nv] (float): applied generalized forces
xfrc[nv] (float): Cartesian F/T applied to body
"""
self.mjSend(mjCOM_GETAPPLIED)
_data = self.mjRecv()
_out = mjApplied.deserialize(_data)
return _out
[docs]
def mj_get_onebody(self, bodyid: int) -> mjOneBody:
"""Read information about one body
Parameters
----------
-----------
bodyid (int) : body id, provided by user
Returns
-------
--------
structure mjOneBody
bodyid; (int) : body id, provided by user
get only:
isfloating (int) : 1 if body is floating, 0 otherwise
time (float) : simulation time
linacc[3] (float): linear acceleration
angacc[3] (float): angular acceleration
contactforce[3] (float) : net force from all contacts on this body
get for all bodies; set for floating bodies only:
pos[3] (float) : position
quat[4] (float) : orientation quaternion
linvel[3] (float): linear velocity
angvel[3] (float): angular velocity
get and set for all bodies:
force[3] (float) : Cartesian force applied to body CoM
torque[3] (float): Cartesian torque applied to body
"""
self.mjSend(mjCOM_GETONEBODY, struct.pack("i", bodyid))
_data = self.mjRecv()
self.onebody.deserialize(_data)
self.onebody.bodyid = bodyid
return self.onebody
[docs]
def mj_get_mocap(self) -> mjMocap:
"""Read mocaps
Returns
-------
--------
structure mjMocap
nmocap (int) : number of mocap bodies
time (float) : simulation time
pos[nmocap][3] (float) : positions
quat[nmocap][4] (float): quaternion orientations
"""
self.mjSend(mjCOM_GETMOCAP)
_data = self.mjRecv()
_out = mjMocap.deserialize(_data)
return _out
[docs]
def mj_get_dynamics(self) -> mjDynamics:
"""Read forward dynamics
Returns
-------
--------
structure mjDynamics
nv (int) : number of generalized velocities
na (int) : number of actuator activations
time (float) : simulation time
qacc[nv] (float) : generalized accelerations
actdot[na] (float): time-derivatives of actuator activations
"""
self.mjSend(mjCOM_GETDYNAMICS)
_data = self.mjRecv()
_out = mjDynamics.deserialize(_data)
return _out
[docs]
def mj_get_sensor(self) -> mjSensor:
"""
Read sensor data from the simulator.
Use the sensor descriptors available in ``mjInfo`` to decode the
returned values.
Returns
-------
mjSensor
Sensor data structure with:
- ``nsensordata``: number of sensor values
- ``time``: simulation time
- ``sensordata``: sensor data array
"""
self.mjSend(mjCOM_GETSENSOR)
_data = self.mjRecv()
_out = mjSensor.deserialize(_data)
return _out
[docs]
def mj_get_body(self) -> mjBody:
"""Read body positions
Returns
-------
--------
structure mjBody
nbody (int) : number of bodies
time (float) : simulation time
pos[nbody][3] (float): positions
mat[nbody][9] (float): frame orientations
"""
self.mjSend(mjCOM_GETBODY)
_data = self.mjRecv()
_out = mjBody.deserialize(_data)
return _out
[docs]
def mj_get_geom(self) -> mjGeom:
"""Read geom positions
Returns
-------
--------
structure mjGeom
ngeom (int) : number of geoms
time (float) : simulation time
pos[ngeom][3] (float): positions
mat[ngeom][9] (float): frame orientations
"""
self.mjSend(mjCOM_GETGEOM)
_data = self.mjRecv()
_out = mjGeom.deserialize(_data)
return _out
[docs]
def mj_get_site(self) -> mjSite:
"""Read site positions
Returns
-------
--------
structure mjSite
nsite (int) : number of bodies
time (float) : simulation time
pos[nsite][3] (float): positions
mat[nsite][9] (float): frame orientations
"""
self.mjSend(mjCOM_GETSITE)
_data = self.mjRecv()
_out = mjSite.deserialize(_data)
return _out
[docs]
def mj_get_tendon(self) -> mjTendon:
"""Read tendons data
Returns
-------
--------
structure mjTendon
ntendon (int) : number of tendons
time (float) : simulation time
length[ntendon] (float) : tendon lengths
velocity[ntendon] (float): tendon velocities
"""
self.mjSend(mjCOM_GETTENDON)
_data = self.mjRecv()
_out = mjTendon.deserialize(_data)
return _out
[docs]
def mj_get_actuator(self) -> mjActuator:
"""Read tendons data
Returns
-------
--------
structure mjTendon
nu (int) : number of actuators
time (float) : simulation time
length[nu] (float) : actuator lengths
velocity[nu] (float): actuator velocities
force[nu] (float) : actuator forces
"""
self.mjSend(mjCOM_GETACTUATOR, 0)
_data = self.mjRecv()
_out = mjActuator.deserialize(_data)
return _out
[docs]
def mj_get_force(self) -> mjForce:
"""Read tendons data
Returns
-------
--------
structure mjTendon
nv (int) : number of generalized velocities/forces
time (float) : simulation time
nonconstraint[nv] (float): sum of all non-constraint forces
constraint[nv] (float) : constraint forces (including contacts)
"""
self.mjSend(mjCOM_GETFORCE, 0)
_data = self.mjRecv()
_out = mjForce.deserialize(_data)
return _out
[docs]
def mj_get_camera(self) -> mjCamera:
"""Read camera data
Returns
-------
--------
structure mjCamera
"""
self.mjSend(mjCOM_GETCAMERA)
_data = self.mjRecv()
_out = mjCamera.deserialize(_data)
return _out
[docs]
def mj_get_glcamera(self) -> mjGLCamera:
"""Read GL camera data
Returns
-------
--------
structure mjGLCamera
"""
self.mjSend(mjCOM_GETGLCAMERA)
_data = self.mjRecv()
_out = mjCamera.deserialize(_data)
return _out
# Set functions
[docs]
def mj_set_state(self, state: mjState):
"""Set simulator state
Parameters
----------
-----------
state : mjState
structure mjState
Returns
-------
--------
int
API return code
"""
_buffer = state.serialize()
self.mjSend(mjCOM_SETSTATE, _buffer)
self.mjGetAck()
return self._lastError
[docs]
def mj_set_control(self, control: mjControl):
"""Set control signals
Parameters
----------
-----------
control : mjControl
structure mjControl
Returns
-------
--------
int
API return code
"""
_buffer = control.serialize()
self.mjSend(mjCOM_SETCONTROL, _buffer)
self.mjGetAck()
return self._lastError
[docs]
def mj_set_applied(self, applied: mjApplied):
"""Set applied forces
Parameters
----------
-----------
applied : mjApplied
structure mjApplied
Returns
-------
--------
int
API return code
"""
_buffer = applied.serialize()
self.mjSend(mjCOM_SETAPPLIED, _buffer)
self.mjGetAck()
return self._lastError
[docs]
def mj_set_onebody(self, onebody: mjOneBody):
"""Set one body data
Parameters
----------
-----------
onebody : mjOneBody
structure mjOneBody
Returns
-------
--------
int
API return code
"""
_buffer = onebody.serialize()
self.mjSend(mjCOM_SETONEBODY, _buffer)
self.mjGetAck()
return self._lastError
[docs]
def mj_set_mocap(self, mocap: mjMocap):
"""Set mocap positions
Parameters
----------
-----------
mocap : mjMocap
structure mjMocap
Returns
-------
--------
int
API return code
"""
_buffer = mocap.serialize()
self.mjSend(mjCOM_SETMOCAP, _buffer)
self.mjGetAck()
return self._lastError
# RGBA functions
[docs]
def mj_get_rgba(self, typ: str, id: int) -> np.array:
_buffer = struct.pack("20s i", typ.encode("utf-8"), id)
self.mjSend(mjCOM_GETRGBA, _buffer)
_data = self.mjRecv()
if _data:
rgba = np.array(struct.unpack("4f", _data))
return rgba
else:
self.mjSetError(mjCOM_BADSIZE)
print(f"No RGBA data for object {typ}:{id}")
[docs]
def mj_set_rgba(self, typ: str, id: int, rgba: np.array) -> int:
clamped_rgba = [min(1, max(0, x)) for x in rgba]
_buffer = struct.pack("20s i 4f", typ.encode("utf-8"), id, *clamped_rgba)
self.mjSend(mjCOM_SETRGBA, _buffer)
self.mjGetAck()
return self._lastError
# Command and information functions
[docs]
def mj_load(self, model_filename: str = None) -> int:
"""Show text message in simulator
Parameters
----------
-----------
message : string
message, None: clear currently shown message
Returns
-------
--------
int
API return code
"""
if model_filename:
_buffer = struct.pack(f"{len(model_filename) + 1}s", model_filename.encode("utf-8"))
self.mjSend(mjCOM_LOAD, _buffer)
self.mjGetAck()
return self._lastError
[docs]
def mj_connect(self):
"""Connect to Haptix simulator
Parameters
----------
-----------
ip : string
host IP
Returns
-------
--------
int
API return code
"""
self._soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self._soc.connect((self._host, self._port))
self._connected = 1
self.mjSetError(mjCOM_OK)
return self._lastError
except Exception as e:
self.mjSetError(mjCOM_NOCONNECTION)
raise RuntimeError("Connect: " + str(e))
[docs]
def connect(self):
"""Connect to MuJoCo simulator (alias)"""
return self.mj_connect()
[docs]
def mj_close(self):
"""Close connection to Haptix simulator
Returns
-------
--------
int
API return code
"""
if not self._soc:
self._connected = 0
return self._connected
try:
self._soc.sendall(struct.pack("ii", 0, 0))
self._soc.close()
self._connected = 0
return self._connected
except Exception as e:
raise RuntimeError("Close: " + str(e))
[docs]
def close(self):
"""Close connection to MuJoCo simulator (alias)"""
return self.mj_close()
[docs]
def mj_result(self):
"""
Return the last API result code.
Returns
-------
str
Last result code reported by the MuJoCo interface.
"""
return self._lastError
[docs]
def mj_connected(self):
"""
Return the connection status.
Returns
-------
int
``1`` if connected to the simulator, otherwise ``0``.
"""
return self._connected
[docs]
def connected(self):
"""Returns connection status (alias)"""
return self._connected
[docs]
def mj_info(self):
"""Get static properties of current model
Returns
-------
--------
myInfo: structure
nq (int) : number of generalized positions
nv (int) : number of generalized velocities
na (int) : number of actuator activations
njnt (int) : number of joints
nbody (int) : number of bodies
ngeom (int) : number of geoms
nsite (int) : number of sites
ntendon (int) : number of tendons
nu (int) : number of actuators/controls
neq (int) : number of equality constraints
nkey (int) : number of keyframes
nmocap (int) : number of mocap bodies
nsensor (int) : number of sensors
nsensordata (int) : number of elements in sensor data array
nmat (int) : number of materials
timestep (float) : simulation timestep
apirate (float) : API update rate
sensor_type[nsensor] (int) : sensor type
sensor_datatype[nsensor] (int) : type of sensorized object
sensor_objtype[nsensor] (int) : type of sensorized object
sensor_objid[nsensor] (int) : id of sensorized object
sensor_dim[nsensor] (int) : number of sensor outputs
sensor_adr[nsensor] (int) : address in sensor data array
sensor_noise[nsensor] (float) : noise standard deviation
jnt_type[njnt] (int) : joint type (mjtJoint)
jnt_bodyid[njnt] (int) : id of body to which joint belongs
jnt_qposadr[njnt] (int) : address of joint position data in qpos
jnt_dofadr[njnt] (int) : address of joint velocity data in qvel
jnt_range[njnt][2] (float) : joint range (0,0): no limits
geom_type[ngeom] (int) : geom type (mjtGeom)
geom_bodyid[ngeom] (int) : id of body to which geom is attached
eq_type[neq] (int) : equality constraint type (mjtEq)
eq_obj1id[neq] (int) : id of constrained object
eq_obj2id[neq] (int) : id of 2nd constrained object -1 if not applicable
actuator_trntype[nu] (int) : transmission type (mjtTrn)
actuator_trnid[nu][2] (int) : transmission target id
actuator_ctrlrange[nu][2](float): actuator control range (0,0): no limits
"""
self.mjSend(mjCOM_INFO, 0)
_data = self.mjRecv()
_info = mjInfo.deserialize(_data)
return _info
[docs]
def mj_step(self) -> int:
"""Advance simulation if paused, no effect if running
Returns
-------
--------
int
API return code
"""
self.mjSend(mjCOM_STEP)
self.mjGetAck()
return self._lastError
[docs]
def mj_pause(self) -> int:
"""Pause simulation
Returns
-------
--------
int
API return code
"""
self.mjSend(mjCOM_PAUSE)
self.mjGetAck()
return self._lastError
[docs]
def mj_run(self) -> int:
"""Run simulation
Returns
-------
--------
int
API return code
"""
self.mjSend(mjCOM_RUN)
self.mjGetAck()
return self._lastError
[docs]
def mj_update(self, control: mjControl) -> int:
"""
Advance the simulation and refresh sensor data.
The method sends the control command, advances the simulation, and
returns the refreshed sensor data structure received from the simulator.
Parameters
----------
control : mjControl
Control command structure to send to MuJoCo.
Returns
-------
mjSensor
Updated sensor data returned by the simulator.
"""
_buffer = control.serialize()
self.mjSend(mjCOM_UPDATE, _buffer)
_data = self.mjRecv()
_out = mjSensor.deserialize(_data)
return _out
[docs]
def mj_reset(self, keyframe: int = None) -> int:
"""Reset simulation to specified key frame
Parameters
----------
-----------
keyframe : int, optional
key frame; -1: reset to model reference configuration
Returns
-------
--------
int
API return code
"""
if keyframe is None:
self.mjSend(mjCOM_RESET)
else:
_buffer = struct.pack("i", keyframe)
self.mjSend(mjCOM_RESET, _buffer)
self.mjGetAck()
return self._lastError
[docs]
def mj_equality(self, eqid: int, state: int) -> int:
"""Modify state of specified equality constraint
Parameters
----------
-----------
eqid : int
equality id
state : int
equality constraint, 1: enable, 0: disable
Returns
-------
--------
int
API return code
"""
_buffer = struct.pack("2i", eqid, state)
self.mjSend(mjCOM_EQUALITY, _buffer)
self.mjGetAck()
return self._lastError
[docs]
def mj_message(self, message: str = None) -> int:
"""Show text message in simulator
Parameters
----------
-----------
message : str, optional
message, None: clear currently shown message
Returns
-------
--------
int
API return code
"""
if message:
_buffer = struct.pack(f"{len(message) + 1}s", message.encode("utf-8"))
self.mjSend(mjCOM_MESSAGE, _buffer)
else:
self.mjSend(mjCOM_MESSAGE)
self.mjGetAck()
return self._lastError
[docs]
def mj_name2id(self, typ: str, name: str) -> int:
"""
Return the id of an object with the specified type and name.
Valid object types are ``body``, ``geom``, ``site``, ``joint``,
``tendon``, ``sensor``, ``actuator``, and ``equality``.
Parameters
----------
typ : str
Object type.
name : str
Object name.
Returns
-------
int
Object id. Returns ``-1`` if not found and ``-2`` on error.
"""
_buffer = struct.pack("100s 100s", typ.encode("utf-8"), name.encode("utf-8"))
self.mjSend(mjCOM_NAME2ID, _buffer)
_data = self.mjRecv()
_out = struct.unpack("i", _data)[0]
return _out
[docs]
def mj_id2name(self, typ: str, id: int) -> str:
"""Returns name of object with specified type and id
valid object types: body, geom, site, joint, tendon, sensor, actuator, equality
Parameters
----------
-----------
typ : str
object type
id : int
object id
Returns
-------
--------
string
object name
"""
_buffer = struct.pack("i 100s", id, typ.encode("utf-8"))
self.mjSend(mjCOM_ID2NAME, _buffer)
_data = self.mjRecv()
idx = _data.find(b"\x00")
if idx != -1:
_out = _data[:idx].decode("utf-8")
return _out
else:
return None
[docs]
def isMuJoCo(scn):
return isinstance(scn, mjInterface)
# --------------------------------------------------------------------
if __name__ == "__main__":
# Run MuJoCo and load model
np.set_printoptions(formatter={"float": "{: 0.4f}".format})
scn = mjInterface()
res = scn.mj_connect()
print(scn.mj_result())
if scn.mj_connected():
print("Connected to the simulator.")
info = scn.mj_info()
if not info:
print("No model loaded - terminating")
else:
scn.mj_message("Test")
print("Model info:\n}", info)
state = scn.mj_get_state()
print("State:\n", state)
# state.qpos = np.array([0, -1, 0, 1, 0, 0.5, 0.7, 0, 0])
# print(" SET: ", scn.mj_set_state(state))
# state = scn.mj_get_state()
# print("State:\n", state)
control = scn.mj_get_control()
print("Control:\n ", control)
control.ctrl[:7] = [0, -1, 0, 1, 0, 0.5, 0.7]
scn.mj_set_control(control)
applied = scn.mj_get_applied()
print("applied:\n ", applied)
applied.qfrc_applied[2] = 0.1
scn.mj_set_applied(applied)
body = scn.mj_get_body()
print("body:\n ", body)
id = scn.mj_name2id("body", "Target")
print("Id of Target:", id)
if id >= 0:
onebody = scn.mj_get_onebody(id)
print("onebody:\n ", onebody)
if onebody.bodyid >= 0:
# onebody.bodyid = id
onebody.pos[1] = 0.2
onebody.pos[2] = 1.0
scn.mj_set_onebody(onebody)
onebody = scn.mj_get_onebody(0)
print("onebody:\n ", onebody)
mocap = scn.mj_get_mocap()
print("mocap:\n ", mocap)
dynamics = scn.mj_get_dynamics()
print("dynamics:\n ", dynamics)
sensor = scn.mj_get_sensor()
print("sensor:\n ", sensor)
geom = scn.mj_get_geom()
print("geom:\n ", geom)
site = scn.mj_get_site()
print("site:\n ", site)
tendon = scn.mj_get_tendon()
print("tendon:\n ", tendon)
actuator = scn.mj_get_actuator()
print("actuator:\n ", actuator)
force = scn.mj_get_force()
print("force:\n ", force)
rgba = scn.mj_get_rgba("geom", 1)
print(rgba)
rgba[0] = 1
scn.mj_set_rgba("geom", 4, rgba)
scn.mj_message()
print("Site id: ", scn.mj_name2id("site", "Panda_hand"))
print("Site name: ", scn.mj_id2name("site", 14))
scn.mj_close()