Source code for robotblockset.optitrack.optitrack_localization

"""OptiTrack Localization Module.

This module provides functionality for localizing rigid bodies using the OptiTrack NatNet system.
It includes methods to fetch real-time frame data, manage rigid body and marker sets, and perform
transformations between task spaces (e.g., world and optical frames).

The module integrates with the NatNet client to retrieve and process motion capture data.

Copyright (c) 2025- Jozef Stefan Institute

Authors: Leon Zlajpah.
"""

import time
import numpy as np
from time import sleep
import copy
from typing import Callable, Optional, Union

from robotblockset.rbs_typing import ArrayLike
from robotblockset.optitrack.natnet_client import NatNetClient
from robotblockset.tools import _struct, rbs_object, rbs_type, check_option, isvector
from robotblockset.transformations import map_pose, world2frame


class _default(_struct):
    """
    Default configuration class for the optitrack_localization.

    Attributes
    ----------
    TaskSpace : str
        The task space for pose representation. Default is "World".
    TaskPoseForm : str
        The pose form to use ("Pose", "Position", etc.). Default is "Pose".
    UpdateTime : float
        Time interval between updates. Default is 1.0 seconds.
    """

    def __init__(self) -> None:
        """
        Initialize the default OptiTrack localization settings.

        Returns
        -------
        None
            This constructor initializes the default configuration object in place.
        """
        self.TaskSpace = "World"
        self.TaskPoseForm = "Pose"
        self.UpdateTime = 1.0


[docs] class optitrack_localization(rbs_object): """ A class for managing the localization of objects (rigid bodies) using OptiTrack NatNet. Attributes ---------- host_ip : str IP address of the OptiTrack host. client_ip : str IP address of the client. multicast_ip : str Multicast IP address for OptiTrack communication. natnet_model : MoCapModel, optional Model description from the OptiTrack system. frame : MoCapFrame, optional Latest frame captured from OptiTrack. bodies : list List of rigid body names. n_bodies : int Number of rigid bodies. markersets : list List of marker set names. n_markersets : int Number of marker sets. n_markers : int Total number of markers. n_markers_in_set : list List with the number of markers in each marker set. BodyID : int ID for the platform's rigid body. last_valid_body_data : RigidBody, optional Last valid rigid body data for the platform. last_valid_body_frame_id : int, optional Last frame ID with valid rigid body data. last_valid_body_frame_time : float, optional Last frame time with valid rigid body data. TWorld : np.ndarray Transformation matrix between optical and world frames. client : NatNetClient The NatNet client used to interface with OptiTrack. """
[docs] def __init__(self, host_ip: str = "127.0.0.1", client_ip: str = "127.0.0.1", multicast_ip: str = "239.255.42.99", frame_callback: Optional[Callable[..., None]] = None, body_callback: Optional[Callable[..., None]] = None) -> None: """ Initializes the optitrack_localization class by connecting to the OptiTrack system. Parameters ---------- host_ip : str, optional IP address of the OptiTrack host. Default is "127.0.0.1". client_ip : str, optional IP address of the client. Default is "127.0.0.1". multicast_ip : str, optional Multicast IP address for OptiTrack communication. Default is "239.255.42.99". frame_callback : Callable[..., None], optional Callback function to process new frames. body_callback : Callable[..., None], optional Callback function to process rigid body data. """ self.Name = "OptiTrack" self.host_ip = host_ip self.client_ip = client_ip self.multicast_ip = multicast_ip self._default = _default() # Default options self.natnet_model = None # OptiTrack model self.frame = None # OptiTrack captured frame self.bodies = {} # Optitrack bodies self.n_bodies = 0 # number of rigid bodies self.markersets = {} # Optitrack marker sets self.n_markersets = 0 # number of marker sets self.n_markers = 0 # number of all markers self.n_markers_in_set = [] # number of markers in each marker set self.BodyID = 0 # rigidbody id for platform self.last_valid_body_data = None # last valid rigidbody data for platform self.last_valid_body_frame_id = None # last frame id with valid rigidybody data for platform self.last_valid_body_frame_time = None # last frame time with valid rigidybody data for platform self.TWorld = np.eye(4) # transformation between optical and world frame self.last_update = time.time() self.client = NatNetClient() self.client.localIPAddress = self.client_ip self.client.serverIPAddress = self.host_ip self.client.multicastAddress = "239.255.42.99" if frame_callback is not None: self.client.newFrameListener = frame_callback if body_callback is not None: self.client.rigidBodyListener = body_callback self.client.run() i = 0 while self.client.Model is None: i += 1 if i > 3: raise ConnectionError("Client did not get the model") self.client.GetModel() sleep(0.1) i = 0 while self.frame is None: i += 1 if i > 3: raise ConnectionError("Client did not get the frame") self.GetFrame() sleep(0.1) self.natnet_model = self.client.GetModelDescription() if self.natnet_model.RigidBodyCount < 1: self.WarningMessage("No rigid bodies defined") else: self.n_bodies = self.natnet_model.RigidBodyCount self.bodies = self.natnet_model.GetRigidBodyNames() if self.natnet_model.MarkerSetCount < 1: self.WarningMessage("No marker sets defined") else: self.n_markersets = self.natnet_model.MarkerSetCount self.markersets = self.natnet_model.GetMarkerSetNames() self.n_markers_in_set = self.natnet_model.GetMarkerSetMarkerCount() self.n_markers = sum(self.n_markers_in_set[:-1])
[docs] def isReady(self) -> bool: """ Checks if the OptiTrack system is ready (connected). Returns ------- bool True if the system is connected and ready, False otherwise. """ return self.client.isConnected()
[docs] def InitObject(self) -> None: """ Initializes the object for various attributes. Returns ------- None This method does not return any value. It modifies the internal state of the object. """ pass
[docs] def SetBodyID(self, id: Union[int, str] = 0) -> None: """ Sets the ID of the rigid body to be used for localization. Parameters ---------- id : Union[int, str], optional The rigid body ID or name to set. Default is 0. Raises ------ ValueError If the rigid body ID is invalid. """ if isinstance(id, str): if id in self.bodies: id = self.name2id("body", id) else: raise ValueError(f"Rigid body {id} does not exist") if id < self.frame.RigidBodyCount: self.BodyID = id self.last_valid_body_data = None self.GetFrame() if self.last_valid_body_data is None: print(f"No valid data for platform Rigidbody {id}") else: raise ValueError("Wrong RigidBody ID for platform")
@property def x(self) -> np.ndarray: """ Returns the current pose (position and orientation) of the rigid body. Returns ------- np.ndarray The pose as a 7-element array [position, orientation]. """ return copy.deepcopy(self.GetPose(out="x")) @property def p(self) -> np.ndarray: """ Returns the current position of the rigid body. Returns ------- np.ndarray The position as a 3-element array. """ return copy.deepcopy(self.GetPose(out="p")) @property def Q(self) -> np.ndarray: """ Returns the current orientation of the rigid body. Returns ------- np.ndarray The orientation as a 4-element array (quaternion). """ return copy.deepcopy(self.GetPose(out="Q")) @property def R(self) -> np.ndarray: """ Returns the current rotation matrix of the rigid body. Returns ------- np.ndarray The rotation matrix as a 3x3 matrix. """ return copy.deepcopy(self.GetPose(out="R")) @property def T(self) -> np.ndarray: """ Returns the current transformation matrix of the rigid body. Returns ------- np.ndarray The transformation matrix as a 4x4 matrix. """ return copy.deepcopy(self.GetPose(state="Actual", task_space="World", out="T"))
[docs] def spatial(self, x: ArrayLike) -> np.ndarray: """ Validates the shape of the input `x` and returns it in an appropriate format. Parameters ---------- x : ArrayLike The input array representing a spatial quantity, which can be one of the following shapes: - (7,) : pose (position and quaternion) - (4, 4) : transformation matrix - (3,) : position vector - (4,) : quaternion - (3, 3) : rotation matrix - (6,) : twist (linear and angular velocity) - (3, 4) : homogeneous matrix without the last row (assumed to be 3x4) Returns ------- np.ndarray The input `x` in the validated shape, possibly modified if the shape was (3, 4). Raises ------ TypeError If the input `x` does not have a valid shape. """ x = rbs_type(x) # Check for valid shapes if x.shape == (7,) or x.shape == (4, 4) or x.shape == (3,) or x.shape == (4,) or x.shape == (3, 3) or x.shape == (6,): return x elif x.shape == (3, 4): x = np.vstack((x, np.array([0, 0, 0, 1]))) return x else: raise TypeError("Parameter has not proper shape")
[docs] def SetWorld(self, x: ArrayLike) -> None: """ Sets the transformation between the world frame and the optical frame. Parameters ---------- x : ArrayLike The transformation matrix or pose to set. """ _x = self.spatial(x) if _x.shape == (4, 4): _T = _x elif isvector(_x, dim=7): _T = map_pose(x=_x, out="T") else: raise ValueError(f"Base pose shape {_x.shape} not supported") self.TWorld = _T
[docs] def GetFrameTime(self) -> float: """ Retrieves the time of the current frame if available. This method checks if the `frame` attribute is not `None` and returns the time of the current frame. If the `frame` is `None`, it returns a default value of 0. Returns ------- float The time of the current frame. Returns 0 if the `frame` is not set. """ if self.frame is not None: return self.frame.Time else: return 0
[docs] def GetPose(self, out: Optional[str] = None, task_space: Optional[str] = None) -> np.ndarray: """ Gets the pose of the rigid body in the specified task space. Parameters ---------- out : str, optional The output format ("Pose", "Position", "Quaternion", "RotationMatrix", etc.). task_space : str, optional The task space frame ("World", "Optical"). Returns ------- np.ndarray The pose in the requested format. """ if out is None: out = self._default.TaskPoseForm if task_space is None: task_space = self._default.TaskSpace self.GetFrame() _x = self.last_valid_body_data.x if check_option(task_space, "World"): _x = world2frame(_x, self.TWorld) elif check_option(task_space, "Optical"): pass else: raise ValueError(f"Task space '{task_space}' not supported in GetPose") return map_pose(x=_x, out=out)
[docs] def GetPos(self, out: str = "p", task_space: Optional[str] = None) -> np.ndarray: """ Get the platform's position in the specified task space. Parameters ---------- out : str, optional The output form of the position. By default, it returns the position ("p"). Other possible values: "Position". task_space : str, optional The task space frame in which to return the position. By default, it is "World". Other options may include "Robot" or "Object". Returns ------- np.ndarray The platform's position, represented as a 3-element array (3,). Raises ------ ValueError If the `out` parameter is not one of the supported values. """ if out in ["Position", "p"]: return self.GetPose(out=out, task_space=task_space) else: raise ValueError(f"Output form '{out}' not supported in GetPos")
[docs] def GetOri(self, out: str = "Q", task_space: Optional[str] = None) -> np.ndarray: """ Get the platform's orientation in the specified task space. Parameters ---------- out : str, optional The output form of the orientation. By default, it returns the orientation as a quaternion ("Q"). Other possible values: "Quaternion", "RotationMatrix", "R". task_space : str, optional The task space frame in which to return the orientation. By default, it is "World". Other options may include "Robot" or "Object". Returns ------- np.ndarray The platform's orientation, either as a quaternion (4,) or rotation matrix (3, 3). Raises ------ ValueError If the `out` parameter is not one of the supported values. """ if out in ["Quaternion", "Q", "RotationMatrix", "R"]: return self.GetPose(out=out, task_space=task_space) else: raise ValueError(f"Output form '{out}' not supported in GetOri")
[docs] def GetRigidBodyPose(self, id: Union[int, str] = 0, task_space: Optional[str] = None, out: Optional[str] = None) -> np.ndarray: """ Get the pose of a specific rigid body in the specified task space. Parameters ---------- id : Union[int, str], optional The ID of the rigid body. By default, it is 0. If a string is provided and it corresponds to a body name, it will be converted to the corresponding ID. task_space : str, optional The task space frame in which to return the pose. By default, it is the task space defined in the default settings. out : str, optional The output form of the pose. By default, it is set to the task pose form defined in the default settings. Returns ------- np.ndarray The pose of the rigid body, represented as a 7-element array (position + orientation). Raises ------ ValueError If the `id` is incorrect, or if the rigid body does not exist. """ if out is None: out = self._default.TaskPoseForm if isinstance(id, str): if id in self.bodies: id = self.name2id("body", id) else: raise ValueError(f"Rigid body {id} does not exist") if id < self.frame.RigidBodyCount: return self.GetRigidBodies(task_space=task_space, out=out)[id] else: raise ValueError("Wrong RigidBody ID for platform")
# ----------------------------------------------------------
[docs] def GetModel(self) -> None: """ Fetch the model from OptiTrack. This method retrieves the current model from the OptiTrack system via the `client`. It waits briefly after fetching the model and then retrieves the model description. Returns ------- None """ self.client.GetModel() sleep(0.1) self.natnet_model = self.client.GetModelDescription()
[docs] def GetFrame(self) -> None: """ Fetch the latest frame from OptiTrack. This method retrieves the latest frame of data from the OptiTrack system. It checks if the frame contains valid data for the rigid body specified by the `BodyID` and updates relevant fields accordingly. Returns ------- None """ self.client.GetFrame() self.frame = self.client.Frame if self.frame is not None: if (self.frame.RigidBodyCount > self.BodyID) and (self.frame.RigidBodies[self.BodyID].Tracked): self.last_valid_body_data = self.frame.RigidBodies[self.BodyID] self.last_valid_body_frame_id = self.frame.FrameID self.last_valid_body_frame_time = self.frame.Time
[docs] def GetRigidBodies(self, task_space: Optional[str] = None, out: str = "x") -> np.ndarray: """ Retrieves the poses of all rigid bodies. Parameters ---------- task_space : str, optional The task space frame ("World", "Optical"). out : str, optional The output format ("Pose", "Position", "Quaternion", etc.). Returns ------- np.ndarray The poses of all rigid bodies. """ if task_space is None: task_space = self._default.TaskSpace self.GetFrame() _x = self.frame.RigidBodies_x if check_option(task_space, "World"): if isvector(_x): _x = world2frame(_x, self.TWorld) else: for i in range(_x.shape[0]): _x[i, :] = world2frame(_x[i, :], self.TWorld) elif check_option(task_space, "Optical"): pass else: raise ValueError(f"Task space '{task_space}' not supported in GetPose") return map_pose(x=_x, out=out)
[docs] def GetMarkers(self, markerset_id: Optional[Union[int, str]] = None, task_space: Optional[str] = None, out: str = "p") -> Optional[np.ndarray]: """ Retrieves the marker positions for a specified marker set. Parameters ---------- markerset_id : Union[int, str], optional The marker set ID or name. task_space : str, optional The task space frame ("World", "Optical"). out : str, optional The output format ("Position", "p"). Returns ------- np.ndarray or None The marker positions in the requested format, or None if unavailable. """ if task_space is None: task_space = self._default.TaskSpace self.client.GetFrame() if isinstance(markerset_id, str): if markerset_id in self.markersets: id = self.name2id("markerset", markerset_id) else: raise ValueError(f"MarkerSet {markerset_id} does not exist") else: id = markerset_id try: _p = self.client.Frame.MarkerSets[id].Markers_p if check_option(task_space, "World"): if isvector(_p): _p = world2frame(_p, self.TWorld) else: for i in range(_p.shape[0]): _p[i, :] = world2frame(_p[i, :], self.TWorld) elif check_option(task_space, "Optical"): pass else: raise ValueError(f"Task space '{task_space}' not supported in GetPose") return map_pose(p=_p, out=out) except Exception: return None
[docs] def name2id(self, typ: str = "body", name: Optional[str] = None) -> Optional[int]: """ Converts the name of an object (rigid body or marker set) to its corresponding ID. Parameters ---------- typ : str, optional The type of object ("body" or "markerset"). name : str, optional The name of the object. Returns ------- Optional[int] The ID of the object if it exists, otherwise None. Raises ------ ValueError If the object type is not supported. """ idx = None if name is not None: if check_option(typ, "body"): if name in self.bodies: idx = self.bodies.index(name) elif check_option(typ, "markerset"): if name in self.markersets: idx = self.markersets.index(name) else: raise ValueError(f"Object type '{typ}' not supported in name2id") return idx
[docs] def receiveNewFrame(frameNumber: int, markerSetCount: int, unlabeledMarkersCount: int, rigidBodyCount: int, skeletonCount: int, labeledMarkerCount: int, timecode: int, timecodeSub: int, timestamp: float, isRecording: bool, trackedModelsChanged: bool) -> None: print("Received frame", frameNumber)
[docs] def receiveRigidBodyFrame(id: int, position: ArrayLike, rotation: ArrayLike) -> None: print("Received frame for rigid body", id)
# if __name__ == "__main__": if __name__ == "__main__": np.set_printoptions(formatter={"float": "{: 0.4f}".format}) opti = optitrack_localization() opti.GetFrame() for i in range(2): sleep(0.5) opti.GetFrame() print(f"Frame: {opti.frame.FrameID}\nRigidBodies:\n", opti.frame.RigidBodies_x) print("Markers:\n", opti.frame.MarkerSets[-1].Markers_p) opti.SetWorld([0, 0, 10, 1, 0, 0, 0]) print(opti.TWorld) print(f"Frame: {opti.frame.FrameID} RigidBodies:\n", opti.GetRigidBodies())