# Source code for robotblockset.cameras.realsense

"""RealSense RGB-D camera backend.

This module defines a RealSense RGB-D camera wrapper built on ``pyrealsense2``.
It provides synchronized color and depth acquisition, optional point-cloud and confidence-map generation,
camera intrinsics access, and post-processing utilities such as depth alignment and hole filling.
The implementation exposes RealSense devices through the shared robotblockset RGB-D camera interface
for consistent use in perception and calibration pipelines.

Key functionalities include:
- RealSense device integration via the ``RGBDCamera`` interface.
- Configurable color/depth streaming resolution and frame rate.
- Synchronized RGB, depth, and optional confidence-map acquisition.
- Optional point-cloud generation from aligned depth and color data.
- Camera intrinsics retrieval from active device stream profiles.
- Depth post-processing support, including alignment and hole filling.

Copyright (c) 2026 Jozef Stefan Institute

Authors: Leon Zlajpah.
"""

from __future__ import annotations

import time
from typing import Any, Optional

import cv2
import numpy as np
from robotblockset.cameras.interfaces import RGBDCamera
from robotblockset.cameras.image_converter import ImageConverter
from robotblockset.rbs_typing import (
    CameraIntrinsicsMatrixType,
    CameraResolutionType,
    NumpyConfidenceMapType,
    NumpyDepthMapType,
    NumpyFloatImageType,
    NumpyIntImageType,
    PointCloud,
)

import pyrealsense2 as rs  # type: ignore

# from loguru import logger
from robotblockset.tools import get_logger

logger = get_logger(__name__)


class Realsense(RGBDCamera):
    """Wrapper around the pyrealsense2 library to use the RealSense cameras (tested for the D415 and D435).

    Design decisions we made for this class:

    * Depth and color fps are the same
    * Depth resolution is automatically set
    * Depth frames are always aligned to color frames
    * Hole filling is enabled by default
    * Getting the confidence map is optional (disabled by default). The L515 supports a built-in
      confidence map, but the D415 and D435 do not. We do not support the L515 at this time.
    """

    # Built-in resolutions (16:9 aspect ratio) for convenience
    # for all resolutions see: realsense_scan_profiles.py
    RESOLUTION_1080 = (1920, 1080)
    RESOLUTION_720 = (1280, 720)
    RESOLUTION_540 = (960, 540)
    RESOLUTION_480 = (848, 480)
[docs] def __init__( self, resolution: CameraResolutionType = RESOLUTION_1080, fps: int = 30, enable_depth: bool = True, enable_pointcloud: bool = True, enable_confidence_map: bool = False, enable_hole_filling: bool = True, serial_number: Optional[str] = None, ) -> None: """ Initialize a RealSense RGB-D camera wrapper. Parameters ---------- resolution : CameraResolutionType, optional CameraResolutionType, default=RESOLUTION_1080 Color stream resolution ``(width, height)`` in pixels. fps : int, optional int, default=30 Target frame rate for color and depth streams. enable_depth : bool, optional bool, default=True Enable depth acquisition. enable_pointcloud : bool, optional bool, default=True Enable point-cloud computation from depth and color. enable_confidence_map : bool, optional bool, default=False Enable confidence map computation based on stereo infrared frames. enable_hole_filling : bool, optional bool, default=True Enable RealSense hole-filling post-processing on depth frames. serial_number : str, optional Device serial number. If omitted, the default device is used. Raises ------ ValueError If point cloud or confidence map is enabled while depth is disabled. RuntimeError If pipeline startup fails due to invalid configuration or device issues. 
""" self._resolution = resolution self._fps = fps self._depth_enabled = enable_depth self._pointcloud_enabled = enable_pointcloud if self._pointcloud_enabled and not self._depth_enabled: raise ValueError("enable_point_cloud can only be True if enable_depth is also True") self._confidence_enabled = enable_confidence_map if self._confidence_enabled and not self._depth_enabled: raise ValueError("enable_confidence_map can only be True if enable_depth is also True") self.hole_filling_enabled = enable_hole_filling self.serial_number = serial_number config = rs.config() if serial_number is not None: # Note: Invalid serial_number leads to RuntimeError for pipeline.start(config) config.enable_device(serial_number) config.enable_stream(rs.stream.color, resolution[0], resolution[1], rs.format.rgb8, fps) # Use max resolution that can handle the fps for depth (will be change by align_transform) depth_resolution = Realsense.RESOLUTION_720 if fps <= 30 else Realsense.RESOLUTION_480 if self._depth_enabled: config.enable_stream( rs.stream.depth, depth_resolution[0], depth_resolution[1], rs.format.z16, fps, ) if self._confidence_enabled: config.enable_stream( rs.stream.infrared, 1, depth_resolution[0], depth_resolution[1], rs.format.y8, fps, ) config.enable_stream( rs.stream.infrared, 2, depth_resolution[0], depth_resolution[1], rs.format.y8, fps, ) # Avoid having to reconnect the USB cable, see https://github.com/IntelRealSense/librealsense/issues/6628#issuecomment-646558144 ctx = rs.context() devices = ctx.query_devices() for dev in devices: dev.hardware_reset() self.pipeline = rs.pipeline() self.pipeline.start(config) profile = self.pipeline.get_active_profile() device = profile.get_device() self.Name = device.get_info(rs.camera_info.name).replace(" ", "_") # Get intrinsics matrix color_profile = rs.video_stream_profile(profile.get_stream(rs.stream.color)) intrinsics = color_profile.get_intrinsics() self._intrinsics_matrix = np.array( [ [intrinsics.fx, 0, intrinsics.ppx], [0, 
intrinsics.fy, intrinsics.ppy], [0, 0, 1], ] ) if self._depth_enabled: device = profile.get_device() depth_sensor = device.first_depth_sensor() self.depth_factor = depth_sensor.get_depth_scale() self._setup_depth_transforms() self.colorizer = rs.colorizer() self.colorizer.set_option(rs.option.color_scheme, 2) # 2 = White to Black
def __enter__(self) -> RGBDCamera: """ Enter context manager. Returns ------- RGBDCamera The current camera instance. """ return self def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: """ Exit context manager and stop the RealSense pipeline. Parameters ---------- exc_type : Any Exception type raised inside the context, if any. exc_value : Any Exception instance raised inside the context, if any. traceback : Any Exception traceback, if any. Returns ------- None """ self.pipeline.stop() def _setup_depth_transforms(self) -> None: """ Initialize depth-alignment and depth-filter transforms. Returns ------- None """ # Configure depth filters and transfrom, adapted from: # https://github.com/IntelRealSense/librealsense/blob/master/wrappers/python/examples/align-depth2color.py # https://github.com/IntelRealSense/librealsense/blob/jupyter/notebooks/depth_filters.ipynb self.align_transform = rs.align(rs.stream.color) self.hole_filling = rs.hole_filling_filter() @property def fps(self) -> int: """ Return the configured camera frame rate. Returns ------- int Frame rate in frames per second. """ return self._fps @property def resolution(self) -> CameraResolutionType: """ Return the configured image resolution. Returns ------- CameraResolutionType Resolution ``(width, height)`` in pixels. """ return self._resolution
[docs] def intrinsics_matrix(self) -> CameraIntrinsicsMatrixType: """ Return the color camera intrinsics matrix. Returns ------- CameraIntrinsicsMatrixType Camera matrix with shape ``(3, 3)``. """ return self._intrinsics_matrix
def _grab_images(self) -> None: """ Grab and cache the latest composite frame and derived depth data. Returns ------- None """ self._composite_frame = self.pipeline.wait_for_frames() if not self._depth_enabled: return try: aligned_frames = self.align_transform.process(self._composite_frame) except RuntimeError as e: # Sometimes, the realsense SDK throws an error with aligning RGB and depth. # This can happen if the CPU is busy: https://github.com/IntelRealSense/librealsense/issues/6628#issuecomment-647379900 # A solution is to try again. Here, we only try again once; if the error occurs again, we raise it # and let the user deal with it. logger.error(f"Error while grabbing images:\n{e}.\nWill retry in 1 second.") time.sleep(1) aligned_frames = self.align_transform.process(self._composite_frame) self._depth_frame = aligned_frames.get_depth_frame() if self.hole_filling_enabled: self._depth_frame = self.hole_filling.process(self._depth_frame) # Compute point cloud. if self._pointcloud_enabled: self._point_cloud = self._compute_point_cloud(aligned_frames) if not self._confidence_enabled: return self._infrared_frame_1 = aligned_frames.get_infrared_frame(1) self._infrared_frame_2 = aligned_frames.get_infrared_frame(2) def _compute_point_cloud(self, aligned_frames: Any) -> PointCloud: """ Compute a colored point cloud from aligned depth and color frames. Parameters ---------- aligned_frames : Any Composite frame with depth aligned to color. Returns ------- PointCloud Point cloud with 3D positions and RGB colors. 
""" pcd = rs.pointcloud() color_frame = aligned_frames.get_color_frame() pcd.map_to(color_frame) points = pcd.calculate(self._depth_frame) v, t = points.get_vertices(), points.get_texture_coordinates() vertices = np.asanyarray(v).view(np.float32).reshape(-1, 3) tex_coords = np.asanyarray(t).view(np.float32).reshape(-1, 2) color_image: NumpyIntImageType = np.asanyarray(color_frame.get_data()) h, w, _ = color_image.shape # tex_coords shape is (N, 2) u = tex_coords[:, 0] v = tex_coords[:, 1] # convert to pixel coordinates x = np.clip((u * w).astype(np.int32), 0, w - 1) y = np.clip((v * h).astype(np.int32), 0, h - 1) # index color_image colors = color_image[y, x] # shape (N, 3) return PointCloud(vertices, colors) def _retrieve_rgb_image(self) -> NumpyFloatImageType: """ Return the latest RGB frame as normalized float image. Returns ------- NumpyFloatImageType RGB image in floating-point representation. """ image = self._retrieve_rgb_image_as_int() return ImageConverter.from_numpy_int_format(image).image_in_numpy_format def _retrieve_rgb_image_as_int(self) -> NumpyIntImageType: """ Return the latest RGB frame as integer image. Returns ------- NumpyIntImageType RGB image in integer representation. Raises ------ RuntimeError If no frame has been grabbed yet. """ if not isinstance(self._composite_frame, rs.composite_frame): raise RuntimeError("_grab_images must be called before retrieving images") color_frame = self._composite_frame.get_color_frame() image: NumpyIntImageType = np.asanyarray(color_frame.get_data()) return image def _retrieve_depth_map(self) -> NumpyDepthMapType: """ Return the latest metric depth map. Returns ------- NumpyDepthMapType Depth map in meters. Raises ------ RuntimeError If depth is disabled. 
""" if not self._depth_enabled: raise RuntimeError("Cannot retrieve depth data if depth is disabled") frame = self._depth_frame image = np.asanyarray(frame.get_data()).astype(np.float32) return image * self.depth_factor def _retrieve_depth_image(self) -> NumpyIntImageType: """ Return the latest colorized depth image. Returns ------- NumpyIntImageType Colorized depth image. Raises ------ RuntimeError If depth is disabled. """ if not self._depth_enabled: raise RuntimeError("Cannot retrieve depth data if depth is disabled") frame = self._depth_frame frame_colorized = self.colorizer.colorize(frame) image = np.asanyarray(frame_colorized.get_data()) # this is uint8 with 3 channels return image def _retrieve_colored_point_cloud(self) -> PointCloud: """ Return the latest computed colored point cloud. Returns ------- PointCloud Colored point cloud. Raises ------ RuntimeError If point-cloud computation is disabled. """ if not self._pointcloud_enabled: raise RuntimeError("Cannot retrieve point cloud if point cloud is disabled") return self._point_cloud def _retrieve_confidence_map(self) -> NumpyConfidenceMapType: """ Return a confidence map derived from stereo disparity filtering. Returns ------- NumpyConfidenceMapType Confidence map normalized to ``[0, 1]``. Raises ------ RuntimeError If confidence retrieval is disabled or no frame is available. """ # Compute confidence map based on the disparity between the two IR images. 
if not self._confidence_enabled: raise RuntimeError("Cannot retrieve confidence data if confidence is disabled") if not isinstance(self._composite_frame, rs.composite_frame): raise RuntimeError("_grab_images must be called before retrieving images") ir1_frame = self._infrared_frame_1 ir2_frame = self._infrared_frame_2 # Convert images to numpy ir1 = np.asanyarray(ir1_frame.get_data()) ir2 = np.asanyarray(ir2_frame.get_data()) # default values for SGBM according to OpenCV docs wls_lambda = 8000.0 wls_sigma = 1.5 if not hasattr(self, "_stereo_sgbm"): max_disp = 160 # must be divisible by 16 window_size = 3 p1 = 216 # 24 * window_size ** 2 p2 = 864 # 96 * window_size ** 2 pre_filter_cap = 63 self._stereo_sgbm = cv2.StereoSGBM.create( minDisparity=0, numDisparities=max_disp, blockSize=window_size, P1=p1, P2=p2, preFilterCap=pre_filter_cap, mode=cv2.StereoSGBM_MODE_SGBM_3WAY, ) left_matcher = self._stereo_sgbm wls_filter = cv2.ximgproc.createDisparityWLSFilter(left_matcher) right_matcher = cv2.ximgproc.createRightMatcher(left_matcher) left_disp = left_matcher.compute(ir1, ir2).astype(np.float32) / 16.0 right_disp = right_matcher.compute(ir2, ir1).astype(np.float32) / 16.0 wls_filter.setLambda(wls_lambda) wls_filter.setSigmaColor(wls_sigma) wls_filter.filter(left_disp, ir1, disparity_map_right=right_disp) confidence_map = wls_filter.getConfidenceMap() return confidence_map / 255.0
if __name__ == "__main__":
    import robotblockset.cameras.manual_test_hw as test

    camera = Realsense(fps=30, resolution=Realsense.RESOLUTION_1080, enable_hole_filling=True)

    # Perform tests
    test.manual_test_camera(camera)
    test.manual_test_rgb_camera(camera)
    test.manual_test_depth_camera(camera)
    test.profile_rgb_throughput(camera)
    test.profile_rgbd_throughput(camera)

    # Live viewer
    for window_name in ("RealSense RGB", "RealSense Depth Image", "RealSense Depth Map"):
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)

    while True:
        rgb = camera.get_rgb_image_as_int()
        rgb = ImageConverter.from_numpy_int_format(rgb).image_in_opencv_format
        depth_img = camera._retrieve_depth_image()
        depth_map = camera._retrieve_depth_map()

        cv2.imshow("RealSense RGB", rgb)
        cv2.imshow("RealSense Depth Image", depth_img)
        cv2.imshow("RealSense Depth Map", depth_map)
        if cv2.waitKey(1) == ord("q"):
            break