Source code for robotblockset.mujoco.scene_pymujoco_sim

"""Scene utilities for synchronous Python MuJoCo simulations.

This module provides the `mujoco_scene` helper used by RobotBlockSet synchronous
MuJoCo backends to load models, manage the viewer, render auxiliary camera
windows, and control simulation stepping and resets.

Copyright (c) 2025 Jozef Stefan Institute

Authors: Leon Zlajpah.
"""

import numpy as np
from time import perf_counter, sleep, time
from typing import List, Optional, Union

try:
    import mujoco
    import mujoco.viewer
    import glfw
except Exception as e:
    raise e from RuntimeError("MuJoCo not installed. \nYou can install MuJoCo through pip:\n   pip install mujoco")

from robotblockset.tools import rbs_object


[docs] class mujoco_scene(rbs_object): """ MuJoCo scene manager with viewer support for synchronous backends. Attributes ---------- spec : Optional[mujoco.MjSpec] MuJoCo specification object when the scene was loaded from XML. model : mujoco.MjModel Compiled MuJoCo model used by the scene. data : mujoco.MjData Runtime MuJoCo data associated with `model`. pause : bool Flag indicating whether simulation stepping is paused. viewer : Optional[mujoco.viewer.Handle] Passive MuJoCo viewer used for the main scene window. show_camera : list[Union[str, int]] Camera names or IDs rendered in additional windows. cam_windows : list[dict[str, object]] Metadata for additional GLFW camera windows. """
[docs] def __init__(self, model_xml_file: Optional[str] = None, model: Optional[mujoco.MjModel] = None, show_viewer: bool = True, show_camera: Optional[List[Union[str, int]]] = None, verbose: int = 0) -> None: """Create a synchronous MuJoCo scene manager. Parameters ---------- model_xml_file : str, optional Path to the MuJoCo XML model file to load. model : mujoco.MjModel, optional Existing MuJoCo model to use instead of loading from XML. show_viewer : bool, optional If `True`, open the main passive MuJoCo viewer. show_camera : Optional[List[Union[str, int]]], optional Camera names or IDs to render in auxiliary windows. verbose : int, optional Verbosity level used for status messages. Returns ------- None This constructor initializes the synchronous MuJoCo scene object in place. """ super().__init__() self._verbose = verbose self.Name = "MuJoCo Scene" if model is not None: self.spec = None self.model = model elif model_xml_file is not None: self.spec = mujoco.MjSpec.from_file(model_xml_file) self.model = self.spec.compile() else: raise ValueError("Either model_xml_file or model must be provided.") self.data = mujoco.MjData(self.model) mujoco.mj_forward(self.model, self.data) self.pause = False self._initial_time = time() self._samples = 0 self.Message("Model loaded successfully.", 2) self._connected = 1 self._last_step_time = perf_counter() - self.model.opt.timestep self.viewer = None self._viewer_active = show_viewer self.cam_windows = [] self.show_camera = show_camera if show_camera is not None else [] self.start_simulation() self.Message("Viewer started.", 2)
def _synchro_simulation(self) -> None: """Synchronize simulation stepping to the configured model timestep. Returns ------- None This method delays the caller so the next step matches the target timestep. """ next_time = self._last_step_time + self.model.opt.timestep self._remaining = next_time - time() if self._remaining > 0: sleep(self._remaining) self._last_step_time = time()
[docs] def start_simulation(self) -> None: """ Start the viewer and auxiliary camera windows. Returns ------- None This method creates the passive viewer and any requested camera windows. """ if self._viewer_active: self.viewer = mujoco.viewer.launch_passive(self.model, self.data) default_w, default_h = 400, 300 # sensible 4:3 default if self.show_camera: # Initialize additional windows for each camera for cam in self.show_camera: cam_id = cam if isinstance(cam, int) else self.model.camera(cam).id if 0 <= cam_id < self.model.ncam: # Create a GLFW window for the camera win = glfw.create_window(default_w, default_h, f"Camera {cam_id}", None, None) if not win: continue glfw.make_context_current(win) # A separate MjrContext per window (tied to that GL context) ctx = mujoco.MjrContext(self.model, mujoco.mjtFontScale.mjFONTSCALE_100) # A fixed camera pointing to this defined camera cam = mujoco.MjvCamera() cam.type = mujoco.mjtCamera.mjCAMERA_FIXED cam.fixedcamid = cam_id self.cam_windows.append({"id": cam_id, "win": win, "ctx": ctx, "cam": cam}) if self.cam_windows: self.scene = mujoco.MjvScene(self.model, maxgeom=20_000) self.opt = mujoco.MjvOption() self.opt.frame = mujoco.mjtFrame.mjFRAME_NONE # Example: hide labels/contacts if you like self.opt.flags[mujoco.mjtVisFlag.mjVIS_CONTACTPOINT.value] = 0 self.opt.flags[mujoco.mjtVisFlag.mjVIS_CONTACTFORCE.value] = 1
[docs] def stop_simulation(self) -> None: """ Stop rendering by closing the passive viewer. Returns ------- None This method closes the main viewer window. """ if self.viewer is not None: self.viewer.close() self.viewer = None
[docs] def restart_simulation(self) -> None: """ Restart rendering and recompile the model when possible. Returns ------- None This method refreshes the scene after stopping the viewer. """ self.stop_simulation() if self.spec is not None: self.model, self.data = self.spec.recompile(self.model, self.data) self.start_simulation()
[docs] def update_cam_windows(self) -> None: """Render all active auxiliary camera windows. Returns ------- None This method updates and prunes additional GLFW camera windows. """ if not self.cam_windows: return alive = [] for cw in self.cam_windows: win = cw["win"] if glfw.window_should_close(win): glfw.destroy_window(win) self.cam_windows.remove(cw) continue else: alive.append(cw) # Get framebuffer viewport glfw.make_context_current(win) fb_w, fb_h = glfw.get_framebuffer_size(win) viewport = mujoco.MjrRect(0, 0, fb_w, fb_h) # Update scene and render mujoco.mjv_updateScene(self.model, self.data, self.opt, None, cw["cam"], mujoco.mjtCatBit.mjCAT_ALL.value, self.scene) mujoco.mjr_render(viewport, self.scene, cw["ctx"]) # Swap OpenGL buffers (blocking vsync) glfw.swap_buffers(win) # Process window events (keyboard, mouse, etc.) glfw.poll_events() self.cam_windows = alive
[docs] def mj_step(self) -> None: """ Advances the simulation by one time step. Returns ------- None This method steps the MuJoCo simulation and refreshes viewers. """ if not self.pause: mujoco.mj_step(self.model, self.data) if self._viewer_active and self.viewer.is_running(): self.viewer.sync() self.update_cam_windows()
[docs] def mj_forward(self) -> None: """ Advances the simulation by using forward dynamics. Returns ------- None This method recomputes forward dynamics and refreshes viewers. """ if not self.pause: mujoco.mj_forward(self.model, self.data) if self._viewer_active and self.viewer.is_running(): self.viewer.sync() self.update_cam_windows()
[docs] def mj_pause(self) -> None: """Pause simulation stepping. Returns ------- None This method sets the internal pause flag. """ self.pause = True
[docs] def mj_run(self) -> None: """Resume simulation stepping. Returns ------- None This method clears the internal pause flag. """ self.pause = False
[docs] def mj_wait(self, wait: float = 0) -> None: """ Advance the MuJoCo simulation for a specified duration. This method performs repeated simulation steps until the internal simulation time (`self.data.time`) has advanced by the given amount. Unlike a passive delay (e.g., ``time.sleep``), this function actively progresses the MuJoCo physics simulation. Parameters ---------- wait : float, optional Amount of simulated time (in seconds) to advance. A value of ``0`` (default) performs no additional steps. Notes ----- - Simulation progresses by repeatedly calling ``self.mj_step``. - The function returns only after the MuJoCo model time exceeds the initial time plus ``wait``. - This does *not* block real time; the speed of advancement depends on computation speed and simulation complexity. Returns ------- None This method blocks until the requested amount of simulated time has elapsed. """ _t0 = self.data.time while self.data.time < _t0 + wait: self.mj_step()
[docs] def mj_reset(self, keyframe: Optional[int] = None) -> None: """ Resets the simulation state to its initial state, or to a specified keyframe. Parameters ---------- keyframe : int, optional The index of the keyframe to reset to. If None, the simulation is reset to the initial state. Returns ------- None """ if keyframe is None: mujoco.mj_resetData(self.model, self.data) else: mujoco.mj_resetDataKeyframe(self.model, self.data, keyframe) mujoco.mj_forward(self.model, self.data)
[docs] def mj_capture_camera(self, camera: Union[int, str] = -1, scene_option: Optional[mujoco.MjvOption] = None, **kwargs: object) -> np.ndarray: """ Captures an image from a specified camera in the simulation. Parameters ---------- camera : Union[int, str], optional The ID or name of the camera to capture. Default is -1, which captures the default camera. scene_option : mujoco.MjvOption, optional Rendering options to customize the scene appearance. Default is None. **kwargs : object Additional keyword arguments passed to `mujoco.Renderer`. Returns ------- np.ndarray The rendered image as a NumPy array (RGB format). """ with mujoco.Renderer(self.model, **kwargs) as renderer: renderer.update_scene(self.data, camera=camera, scene_option=scene_option) frame = renderer.render() return frame