Source code for robotblockset.mujoco.ompl_pymujoco

"""OMPL MuJoCo Motion Planning Utilities.

This module provides a collection of utilities that integrate the
`MuJoCo` physics engine with the `OMPL` (Open Motion Planning Library)
to perform collision-aware motion planning in joint space.

It offers helper functions for:

- Creating OMPL state spaces and converting between OMPL states and
  NumPy vectors.
- Building MuJoCo-based state-validity checkers that detect collisions
  using MuJoCo's contact data.
- Configuring a wide selection of OMPL planners, including
  sampling-based, optimal, and meta-planners (e.g., BIT*, RRT#, PRM*,
  CForest, APS).
- Performing complete motion-planning queries returning both raw and
  interpolated paths.
- Planning motions for MuJoCo-modeled robots (or multi-robot systems)
  with optional clearance inflation for more conservative planning.

The core functionality revolves around mapping OMPL states to MuJoCo
`qpos`, invoking MuJoCo forward dynamics to detect collisions, and using
standard OMPL pipelines to generate feasible paths.

Dependencies
------------
- **MuJoCo** (https://mujoco.org/)
- **OMPL** (https://ompl.kavrakilab.org/)

Typical Workflow
----------------
1. Extract robot joint limits from a MuJoCo model.
2. Build an OMPL RealVector state space.
3. Construct a MuJoCo-based validity checker using
   :func:`make_mujoco_validity_fn`.
4. Set and configure an OMPL planner.
5. Solve the motion-planning problem with :func:`plan_motion` or
   :func:`plan_robot_motion`.

Notes
-----
- MuJoCo `MjData` is not thread-safe. For safety, a private copy of
  `MjData` is used internally for validity checking.
- Some OMPL planners may not be available in all Python builds; unknown
  planner names result in informative errors.
- Collision checking can consider only robot contacts or all contacts,
  depending on user settings.

This module is intended for research, prototyping, and teaching
applications involving sampling-based planning for articulated robots in
simulation.

Copyright (c) 2025 Jozef Stefan Institute

Authors: Leon Zlajpah.
"""

from __future__ import annotations

# pyright: reportMissingImports=false

import numpy as np
from copy import copy
from typing import Callable, Optional, Tuple, Iterable, Sequence, Set, TYPE_CHECKING

try:
    import mujoco
except Exception as e:
    raise e from RuntimeError("MuJoCo not installed. \nYou can install MuJoCo through pip:\n   pip install mujoco")

try:
    from ompl import base as ob, geometric as og
except Exception as e:
    raise e from RuntimeError("Open Motion Planning Library (OMPL) not installed. \nYou can install OMPL through pip:\n   pip install ompl")


from robotblockset.mujoco.tools_pymujoco import get_robot_joints_data, get_body_descendants, get_geoms_of_body
from robotblockset.tools import check_option
from robotblockset.rbs_typing import JointConfigurationType

if TYPE_CHECKING:
    from robotblockset.robots import robot


[docs] def make_state_space(bounds: np.ndarray) -> ob.RealVectorStateSpace: """ Create a RealVector state space with the given bounds. Parameters ---------- bounds : np.ndarray Per-dimension lower/upper limits; shape ``(n, 2)``. Returns ------- ompl.base.RealVectorStateSpace State space with bounds applied. Raises ------ ValueError If ``bounds`` is not ``(n, 2)`` or any ``low > high``. """ arr = np.asarray(bounds, dtype=float) if arr.ndim != 2 or arr.shape[1] != 2: raise ValueError("bounds must have shape (n, 2)") if arr.shape[0] == 0: raise ValueError("bounds must define at least one dimension") if np.any(arr[:, 0] > arr[:, 1]): i = int(np.argmax(arr[:, 0] > arr[:, 1])) raise ValueError(f"low > high for dimension {i}: {arr[i]}") dim = int(arr.shape[0]) space = ob.RealVectorStateSpace(dim) rb = ob.RealVectorBounds(dim) for i, (lo, hi) in enumerate(arr): rb.setLow(i, float(lo)) rb.setHigh(i, float(hi)) space.setBounds(rb) return space
[docs] def vec_to_state(space: ob.StateSpace, v: Sequence[float]) -> ob.State: """ Convert a vector of coordinates to an OMPL state. Parameters ---------- space : ob.StateSpace Target state space (e.g., ``RealVectorStateSpace``). v : Sequence[float] Coordinates; length must equal ``space.getDimension()``. Returns ------- ompl.base.State Newly created state with values from ``v``. Raises ------ ValueError If ``len(v)`` does not match the space dimension. """ dim = space.getDimension() if len(v) != dim: raise ValueError(f"Expected vector of length {dim}, got {len(v)}") s = ob.State(space) for i, val in enumerate(v): s[i] = float(val) return s
[docs] def state_to_vec(s: ob.State, n: int) -> np.ndarray: """ Convert an OMPL state to a NumPy vector. Parameters ---------- s : ob.State State to convert. n : int Dimension of the state (e.g., ``space.getDimension()``). Returns ------- (n,) ndarray of float State coordinates as a NumPy array. """ return np.fromiter((s[i] for i in range(n)), dtype=float, count=n)
[docs] def make_mujoco_validity_fn(model: mujoco.MjModel, data: mujoco.MjData, robot_geoms_ids: Iterable[int], qaddr: Sequence[int], *, only_robot_contacts: bool = True) -> Callable[[ob.State], bool]: """ Build an OMPL state-validity function that uses MuJoCo contacts. The returned callable maps an OMPL state (joint vector) to ``True`` iff the MuJoCo scene has **no contacts**. By default, only contacts where **either geom belongs to the robot** are considered (set ``only_robot_contacts=False`` to consider all contacts). Parameters ---------- model : mujoco.MjModel Loaded MuJoCo model. data : mujoco.MjData A reference data object providing the template ``qpos`` (start pose). This object is **not** mutated. robot_geoms_ids : Iterable[int] IDs of geoms that belong to the robot (used to filter contacts). qaddr : Sequence[int] Indices into ``qpos`` for the planned joints (order must match the OMPL RealVector state). only_robot_contacts : bool, optional If ``True`` (default), only contacts where ``geom1`` or ``geom2`` is in ``robot_geom_ids`` are considered. If ``False``, *any* contact invalidates the state. Returns ------- Callable[[ompl.base.State], bool] A function that returns ``True`` for valid (collision-free) states. Notes ----- - A private ``MjData`` is created for thread-safety; MuJoCo data is not thread-safe to share across calls. - ``contact.dist`` is positive when separated, 0 at touching, and negative when penetrating. """ # Normalize inputs robot_geoms: Set[int] = {int(g) for g in robot_geoms_ids} qpos_template = np.array(data.qpos, copy=True) # Private work buffer for kinematics/contacts _data = copy(data) def is_valid(state: ob.State) -> bool: # Fill qpos from OMPL state q = qpos_template.copy() for i, adr in enumerate(qaddr): q[adr] = float(state[i]) _data.qpos[:] = q _data.qvel[:] = 0.0 # Compute kinematics + contacts mujoco.mj_forward(model, _data) # Reject if any relevant contact violates clearance for k in range(_data.ncon): c = _data.contact[k] if only_robot_contacts and (c.geom1 not in robot_geoms and c.geom2 not in robot_geoms): continue return False return True return is_valid
[docs] def set_optimizing_planner(ss: og.SimpleSetup, name: str, *, objective: Optional[ob.OptimizationObjective] = None) -> ob.Planner: """ Attach an **optimizing** OMPL planner to a ``SimpleSetup`` by name. This convenience maps common planner names (e.g., ``"BIT*"`` or ``"RRT#"``) to their Python classes in ``ompl.geometric`` and sets a path-length optimization objective by default. Parameters ---------- ss : og.SimpleSetup Problem container with a configured state space and validity checker. name : str Planner name (case-insensitive). Supported keys include: ``"PRM*"`` (PRMstar), ``"LazyPRM*"`` (LazyPRMstar), ``"RRT*"`` (RRTstar), ``"RRT#"`` (RRTsharp), ``"RRTX"`` (RRTXstatic), ``"Informed RRT*"`` (InformedRRTstar), ``"BIT*"`` (BITstar), ``"ABIT*"`` (ABITstar), ``"AIT*"`` (AITstar), ``"EIT*"`` (EITstar), ``"LBTRRT"`` (LBTRRT), ``"SST"`` (SST), ``"T-RRT"`` (TRRT), ``"SPARS"`` (SPARS), ``"SPARS2"`` (SPARStwo), ``"FMT*"`` (FMT), ``"ST-RRT*"`` (STRRTstar), meta-planners: ``"CForest"`` (CForest), ``"APS"`` / ``"AnytimePathShortening"``. objective : ob.OptimizationObjective, optional Custom objective (defaults to ``PathLengthOptimizationObjective``). Meta-planners and *-star variants expect objectives that obey the triangle inequality (e.g., path length). Returns ------- ompl.base.Planner The instantiated planner attached to ``ss`` via ``ss.setPlanner(...)``. Raises ------ ValueError If the planner name is unknown or not available in the current build. Notes ----- - Meta-planners require adding sub-planners: - ``CForest`` runs several optimal planners in parallel; add with ``planner.addPlanner(...)``. - ``AnytimePathShortening`` (``APS``) wraps one or more planners and repeatedly hybridizes/shortcuts; add with ``planner.addPlanner(...)``. - Some classes such as ``ABITstar``, ``AITstar``, ``EITstar``, and ``RRTXstatic`` may not be present in older OMPL Python wheels. """ si = ss.getSpaceInformation() # Default objective: path length if objective is None: objective = ob.PathLengthOptimizationObjective(si) ss.setOptimizationObjective(objective) key = name.strip().lower().replace(" ", "") key = key.replace("star", "*").replace("rrtsharp", "rrt#").replace("strrt*", "st-rrt*") table = { "prm*": og.PRMstar, # PRM* "lazyprm*": og.LazyPRMstar, # LazyPRM* "rrt*": og.RRTstar, # RRT* "rrt#": og.RRTsharp, # RRT# "rrtx": og.RRTXstatic, # RRTX (static variant) "informedrrt*": og.InformedRRTstar, # Informed RRT* "bit*": og.BITstar, # BIT* "abit*": og.ABITstar, # ABIT* "ait*": og.AITstar, # AIT* "eit*": og.EITstar, # EIT* "lbtrrt": og.LBTRRT, # LBTRRT "sst": og.SST, # Sparse Stable RRT "t-rrt": og.TRRT, # T-RRT "spars": og.SPARS, # SPARS "spars2": og.SPARStwo, # SPARS2 "fmt*": og.FMT, # FMT* "st-rrt*": og.STRRTstar, # ST-RRT* } # Meta-planners if key in ("cforest", "aps", "anytimepathshortening"): if key == "cforest": planner = og.CForest(si) # Add your sub-planners (examples): planner.addPlanner(og.RRTstar(si)) planner.addPlanner(og.BITstar(si)) else: planner = og.AnytimePathShortening(si) planner.addPlanner(og.BITstar(si)) planner.addPlanner(og.InformedRRTstar(si)) ss.setPlanner(planner) return planner # Regular planners if key not in table: raise ValueError(f"Unknown/unsupported planner '{name}'") PlannerCls = table[key] try: planner = PlannerCls(si) except Exception as e: raise ValueError(f"Planner '{name}' not available in this OMPL build") from e ss.setPlanner(planner) return planner
[docs] def plan_motion( q_start: JointConfigurationType, q_goal: JointConfigurationType, bounds: np.ndarray, validity_fn: Callable[[ob.State], bool], *, algorithm: Optional[str] = "RRTConnect", objective: Optional[ob.OptimizationObjective] = None, max_planning_time: float = 5.0, edge_resolution: float = 0.01, ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: """ Plan a collision-free joint-space path with OMPL and return both the raw and interpolated waypoint arrays. Parameters ---------- q_start : JointConfigurationType Start joint vector (length ``n``). Must lie within ``bounds``. q_goal : JointConfigurationType Goal joint vector (length ``n``). Must lie within ``bounds``. bounds : np.ndarray Joint limits for a RealVector state space; each row is ``[low, high]``. validity_fn : Callable[[ob.State], bool] State-validity checker that returns ``True`` iff the state is collision-free. (Typically calls MuJoCo to check contacts.) algorithm : str, optional Planner to use. If ``None``, no planner is set (use OMPL defaults). Supported optimization planners include: ``"PRM*"`` (PRMstar), ``"LazyPRM*"`` (LazyPRMstar), ``"RRT*"`` (RRTstar), ``"RRT#"`` (RRTsharp), ``"RRTX"`` (RRTXstatic), ``"Informed RRT*"`` (InformedRRTstar), ``"BIT*"`` (BITstar), ``"ABIT*"`` (ABITstar), ``"AIT*"`` (AITstar), ``"EIT*"`` (EITstar), ``"LBTRRT"`` (LBTRRT), ``"SST"`` (SST), ``"T-RRT"`` (TRRT), ``"SPARS"`` (SPARS), ``"SPARS2"`` (SPARStwo), ``"FMT*"`` (FMT), and ``"ST-RRT*"`` (STRRTstar). For the optimization planners, a path-length objective in joint space is used. objective : ob.OptimizationObjective, optional Custom objective (defaults to ``PathLengthOptimizationObjective``). Meta-planners and star-variant planners expect objectives that obey the triangle inequality (e.g., path length). max_planning_time : float, optional Initial solve time budget in seconds. The routine may call ``solve`` again to continue improving the solution. edge_resolution : float, optional Fraction of state space extent used for discrete validity checking along edges (smaller is safer but slower), e.g. ``0.01`` = 1% of extent. Returns ------- path_int : (m, n) ndarray or None Interpolated path waypoints after simplification (``m`` points, including start and goal). ``None`` if planning failed. waypoints : (k, n) ndarray or None Raw solution path waypoints before interpolation (``k`` points). ``None`` if planning failed. Raises ------ ValueError If an unsupported ``algorithm`` is requested. Notes ----- - This builds a RealVector state space from ``bounds``. - Post-processing uses ``SimpleSetup.simplifySolution()`` for portability across OMPL Python builds, then upsamples to 100 points via ``path.interpolate(100)``. """ # Set up the OMPL planning problem space = make_state_space(bounds) ss = og.SimpleSetup(space) # Set validity function ss.setStateValidityChecker(ob.StateValidityCheckerFn(validity_fn)) # Set planner if algorithm is None: pass elif check_option(algorithm, "RRTConnect"): planner = og.RRTConnect(ss.getSpaceInformation()) ss.setPlanner(planner) else: set_optimizing_planner(ss, algorithm, objective=objective) ss.getSpaceInformation().setStateValidityCheckingResolution(edge_resolution) # Define start/goal states start_state = vec_to_state(space, q_start) goal_state = vec_to_state(space, q_goal) ss.setStartAndGoalStates(start_state, goal_state, 1e-3) # Solve (allow a bit more time to improve) if ss.solve(max_planning_time): pass # ss.solve(2.0) # continues refinement else: return None, None # Raw solution waypoints path = ss.getSolutionPath() nq = space.getDimension() waypoints = [state_to_vec(path.getState(i), nq) for i in range(path.getStateCount())] # Path post-processing (portable across OMPL Python builds) ss.simplifySolution() path = ss.getSolutionPath() # Interpolated path with 100 points path.interpolate(100) path_int = [state_to_vec(path.getState(i), nq) for i in range(path.getStateCount())] return np.asarray(path_int, dtype=float).reshape(-1, nq), np.asarray(waypoints, dtype=float).reshape(-1, nq)
[docs] def plan_robot_motion( robot: "robot", q_start: JointConfigurationType, q_goal: JointConfigurationType, algorithm: Optional[str] = "RRTConnect", max_planning_time: float = 5.0, clearance: float = 0.0, only_robot_contacts: bool = True, edge_resolution: float = 0.01, validity_fn: Optional[Callable[[ob.State], bool]] = None, objective: Optional[ob.OptimizationObjective] = None, ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: """ Plan a collision-free joint-space motion for a single robot or a composite multi-robot object using MuJoCo for collision checks and OMPL for planning. The function temporarily **inflates the robot geoms' collision margins** by ``clearance`` during planning (for a safety buffer) and restores the original margins afterwards. Parameters ---------- robot : 'robot' robot Either a single robot object exposing ``scene``, ``JointNames``, ``BaseName``; or a composite with ``robots`` (each having those fields). q_start : JointConfigurationType Start joint configuration (order must match ``joint_names``). q_goal : JointConfigurationType Goal joint configuration. algorithm : str, optional {'RRTConnect', 'RRTstar', 'InformedRRTstar', 'BITstar', None}, optional Planner to use (forwarded to ``plan_motion``). Supportedplanners include: ``"PRM*"`` (PRMstar), ``"LazyPRM*"`` (LazyPRMstar), ``"RRT*"`` (RRTstar), ``"RRT#"`` (RRTsharp), ``"RRTX"`` (RRTXstatic), ``"Informed RRT*"`` (InformedRRTstar), ``"BIT*"`` (BITstar), ``"ABIT*"`` (ABITstar), ``"AIT*"`` (AITstar), ``"EIT*"`` (EITstar), ``"LBTRRT"`` (LBTRRT), ``"SST"`` (SST), ``"T-RRT"`` (TRRT), ``"SPARS"`` (SPARS), ``"SPARS2"`` (SPARStwo), ``"FMT*"`` (FMT), ``"ST-RRT*"`` (STRRTstar), For the optimization planners, a path-length objective in joint space is used. max_planning_time : float, optional Time budget (seconds) given to the planner. clearance : float, optional Extra safety distance [m]; implemented here by inflating robot geoms' MuJoCo ``margin`` during planning (default 0.0). only_robot_contacts : bool, optional If ``True`` (default), only robot contacts are considered. If ``False``, *any* contact invalidates the state. edge_resolution : float, optional along edges (smaller = denser checks). validity_fn : Callable[[ob.State], bool], optional Custom validity function (overrides the default MuJoCo-based checker). objective : ob.OptimizationObjective, optional Custom objective (defaults to ``PathLengthOptimizationObjective``). Returns ------- path_int : (m, n) ndarray or None Interpolated path from ``plan_motion`` (or ``None`` if planning failed). waypoints : (k, n) ndarray or None Raw solution waypoints from ``plan_motion`` (or ``None`` if failed). Raises ------ ValueError If joint dimensions do not match between inputs and model. """ # Resolve scene/model/data + aggregate joint names and robot bases if hasattr(robot, "robots"): robots = robot.robots _scene = robots[0].scene _model = _scene.model _data = _scene.data joint_names = [name for rbt in robots for name in rbt.JointNames] base_names: Iterable[str] = [rbt.BaseName for rbt in robots] else: _scene = robot.scene _model = _scene.model _data = _scene.data joint_names = list(robot.JointNames) base_names = [robot.BaseName] # Build joint addresses and bounds (user-provided helper) qaddr, bounds = get_robot_joints_data(_model, joint_names) n = bounds.shape[0] if len(q_start) != n or len(q_goal) != n: raise ValueError(f"Expected q_start/q_goal of length {n}; got {len(q_start)} and {len(q_goal)}") # Collect robot bodies and geoms (subtrees of the base bodies) robot_bodies_ids = get_body_descendants(_model, base_names, include_self=True, dedup=True) robot_geoms_ids = get_geoms_of_body(_model, robot_bodies_ids) # Validity function: if validity_fn is None: validity_fn = make_mujoco_validity_fn(_model, _data, robot_geoms_ids, qaddr, only_robot_contacts=only_robot_contacts) # --- Temporarily inflate geom margins by `clearance` and plan # Use array API if available; otherwise fall back to per-geom setter. prev_margins = None use_array_api = hasattr(_model, "geom_margin") try: if clearance > 0.0: if use_array_api: idx = np.asarray(robot_geoms_ids, dtype=int) prev_margins = _model.geom_margin[idx].copy() _model.geom_margin[idx] = clearance else: prev_margins = {g: _scene.model.geom(g).margin for g in robot_geoms_ids} for g in robot_geoms_ids: _scene.model.geom(g).margin = clearance # Call your planner (returns (path_int, waypoints)) path = plan_motion( q_start, q_goal, bounds, validity_fn=validity_fn, algorithm=algorithm, objective=objective, max_planning_time=max_planning_time, edge_resolution=edge_resolution, ) finally: # Restore original margins if prev_margins is not None: if use_array_api and isinstance(prev_margins, np.ndarray): _model.geom_margin[idx] = prev_margins else: for g, val in prev_margins.items(): _scene.model.geom(g).margin = float(val) return path