From c7f96895e39114c3a97af570703abb27c64d3bcd Mon Sep 17 00:00:00 2001 From: Joseph Mikhail Date: Tue, 17 Feb 2026 17:30:36 -0800 Subject: [PATCH] Carry Ben non-CLI/non-telemetry changes from main base --- dsg-jit/dsg_jit/datasets/__init__.py | 23 - dsg-jit/dsg_jit/datasets/kitti_odometry.py | 230 ------ dsg-jit/dsg_jit/datasets/tum_rgbd.py | 258 ------ dsg-jit/dsg_jit/sensors/__init__.py | 0 dsg-jit/dsg_jit/sensors/base.py | 189 ----- dsg-jit/dsg_jit/sensors/camera.py | 243 ------ dsg-jit/dsg_jit/sensors/conversion.py | 904 --------------------- dsg-jit/dsg_jit/sensors/fusion.py | 487 ----------- dsg-jit/dsg_jit/sensors/imu.py | 214 ----- dsg-jit/dsg_jit/sensors/integration.py | 89 -- dsg-jit/dsg_jit/sensors/lidar.py | 217 ----- dsg-jit/dsg_jit/sensors/streams.py | 269 ------ 12 files changed, 3123 deletions(-) delete mode 100644 dsg-jit/dsg_jit/datasets/__init__.py delete mode 100644 dsg-jit/dsg_jit/datasets/kitti_odometry.py delete mode 100644 dsg-jit/dsg_jit/datasets/tum_rgbd.py delete mode 100644 dsg-jit/dsg_jit/sensors/__init__.py delete mode 100644 dsg-jit/dsg_jit/sensors/base.py delete mode 100644 dsg-jit/dsg_jit/sensors/camera.py delete mode 100644 dsg-jit/dsg_jit/sensors/conversion.py delete mode 100644 dsg-jit/dsg_jit/sensors/fusion.py delete mode 100644 dsg-jit/dsg_jit/sensors/imu.py delete mode 100644 dsg-jit/dsg_jit/sensors/integration.py delete mode 100644 dsg-jit/dsg_jit/sensors/lidar.py delete mode 100644 dsg-jit/dsg_jit/sensors/streams.py diff --git a/dsg-jit/dsg_jit/datasets/__init__.py b/dsg-jit/dsg_jit/datasets/__init__.py deleted file mode 100644 index ee3ee2e..0000000 --- a/dsg-jit/dsg_jit/datasets/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -Lightweight dataset loaders for DSG-JIT. - -This module provides convenience helpers to load popular SLAM / VO datasets -into simple Python structures that can be wired into the `sensors.*` and -`world.*` subsystems (e.g. camera, LiDAR, pose sequences). - -The goal is to: - -* Keep I/O and parsing logic separate from the core optimization engine. -* Provide small, explicit dataclasses for each dataset family. -* Avoid heavy dependencies (no hard requirement on OpenCV, etc.). -""" - -from .tum_rgbd import TumRgbdFrame, load_tum_rgbd_sequence -from .kitti_odometry import KittiOdomFrame, load_kitti_odometry_sequence - -__all__ = [ - "TumRgbdFrame", - "load_tum_rgbd_sequence", - "KittiOdomFrame", - "load_kitti_odometry_sequence", -] \ No newline at end of file diff --git a/dsg-jit/dsg_jit/datasets/kitti_odometry.py b/dsg-jit/dsg_jit/datasets/kitti_odometry.py deleted file mode 100644 index 68223c2..0000000 --- a/dsg-jit/dsg_jit/datasets/kitti_odometry.py +++ /dev/null @@ -1,230 +0,0 @@ -from __future__ import annotations - -import os -from dataclasses import dataclass -from pathlib import Path -from typing import List, Optional, Tuple - - -@dataclass -class KittiOdomFrame: - """ - Single frame from the KITTI Odometry dataset. - - This dataclass provides file paths and optional ground-truth pose for a - particular frame index in a given sequence. - - :param seq: - KITTI odometry sequence ID (e.g. ``"00"``, ``"05"``). - :type seq: str - - :param idx: - Integer frame index within the sequence. - :type idx: int - - :param t: - Approximate timestamp in seconds. Many downstream pipelines assume - KITTI odometry runs at 10 Hz, so a common convention is - ``t = idx / 10.0``. - :type t: float - - :param left_path: - Path to the left camera image (``image_0``) for this frame. - :type left_path: str - - :param right_path: - Optional path to the right camera image (``image_1``). May be ``None`` - if not available or desired. - :type right_path: Optional[str] - - :param velo_path: - Optional path to the LiDAR point cloud (``velodyne``). May be ``None`` - if not available or desired. - :type velo_path: Optional[str] - - :param T_w_cam0: - Optional 4x4 homogeneous transform from camera-0 to world frame as a - flattened 16-element tuple in row-major order. This is derived from - the official ``poses/.txt`` file if available. - :type T_w_cam0: Optional[Tuple[float, ...]] - """ - - seq: str - idx: int - t: float - left_path: str - right_path: Optional[str] = None - velo_path: Optional[str] = None - T_w_cam0: Optional[Tuple[float, ...]] = None - - -def _load_kitti_poses(poses_path: Path) -> List[Tuple[float, ...]]: - """ - Load KITTI odometry ground-truth poses from a ``poses/.txt`` file. - - Each line is expected to contain 12 numbers corresponding to the first - three rows of a 4x4 homogeneous transform: - - ``r11 r12 r13 tx r21 r22 r23 ty r31 r32 r33 tz`` - - This helper converts each pose to a flattened 4x4 matrix with a final - row ``[0, 0, 0, 1]``. - - :param poses_path: - Path to the KITTI poses file for a sequence. - :type poses_path: pathlib.Path - - :return: - List of flattened 4x4 transforms in row-major order, one per frame. - :rtype: List[Tuple[float, ...]] - """ - poses: List[Tuple[float, ...]] = [] - - if not poses_path.exists(): - return poses - - with poses_path.open("r") as f: - for line in f: - line = line.strip() - if not line: - continue - vals = [float(x) for x in line.split()] - if len(vals) != 12: - continue - - r11, r12, r13, tx = vals[0:4] - r21, r22, r23, ty = vals[4:8] - r31, r32, r33, tz = vals[8:12] - mat4 = ( - r11, r12, r13, tx, - r21, r22, r23, ty, - r31, r32, r33, tz, - 0.0, 0.0, 0.0, 1.0, - ) - poses.append(mat4) - - return poses - - -def load_kitti_odometry_sequence( - root: str | os.PathLike, - seq: str, - load_right: bool = True, - load_velodyne: bool = False, - with_poses: bool = True, - max_frames: Optional[int] = None, -) -> List[KittiOdomFrame]: - """ - Load a KITTI Odometry sequence into a list of frames. - - This helper assumes the standard KITTI directory structure: - - .. code-block:: - - root/ - sequences/ - 00/ - image_0/ - image_1/ - velodyne/ - calib.txt - poses/ - 00.txt - - Only metadata (paths and ground-truth transforms) is loaded; images and - point clouds are not read into memory. - - :param root: - Path to the KITTI odometry dataset root directory. - :type root: Union[str, os.PathLike] - - :param seq: - Sequence ID string (e.g. ``"00"``, ``"01"``). - :type seq: str - - :param load_right: - Whether to populate ``right_path`` pointing to ``image_1``. If ``False``, - the field will always be ``None``. - :type load_right: bool - - :param load_velodyne: - Whether to populate ``velo_path`` pointing to ``velodyne`` scans. If - ``False``, the field will always be ``None``. - :type load_velodyne: bool - - :param with_poses: - Whether to attempt to load ground-truth poses from ``poses/.txt``. - :type with_poses: bool - - :param max_frames: - Optional maximum number of frames to return. If ``None``, all available - frames in the left camera directory are used. - :type max_frames: Optional[int] - - :return: - List of :class:`KittiOdomFrame` entries with timestamps, paths, and - optionally ground-truth transforms. - :rtype: List[KittiOdomFrame] - """ - root_path = Path(root) - seq_str = f"{int(seq):02d}" # normalize "0" -> "00" - - seq_dir = root_path / "sequences" / seq_str - left_dir = seq_dir / "image_0" - right_dir = seq_dir / "image_1" - velo_dir = seq_dir / "velodyne" - - if not left_dir.exists(): - raise FileNotFoundError(f"Left image directory not found: {left_dir}") - - # Determine frame indices from left camera images - left_files = sorted(left_dir.glob("*.png")) - if not left_files: - raise FileNotFoundError(f"No left images found in {left_dir}") - - # Optional poses - poses: List[Tuple[float, ...]] = [] - if with_poses: - poses_path = root_path / "poses" / f"{seq_str}.txt" - poses = _load_kitti_poses(poses_path) - - frames: List[KittiOdomFrame] = [] - - for idx, left_path in enumerate(left_files): - t = idx / 10.0 # KITTI odometry is ~10 Hz; good enough for indexing. - - right_path: Optional[str] = None - velo_path: Optional[str] = None - T_w_cam0: Optional[Tuple[float, ...]] = None - - if load_right: - candidate = right_dir / left_path.name - if candidate.exists(): - right_path = str(candidate) - - if load_velodyne: - # KITTI velodyne uses "*.bin" with same numeric frame index - stem = left_path.stem # e.g. "000000" - candidate = velo_dir / f"{stem}.bin" - if candidate.exists(): - velo_path = str(candidate) - - if with_poses and idx < len(poses): - T_w_cam0 = poses[idx] - - frames.append( - KittiOdomFrame( - seq=seq_str, - idx=idx, - t=t, - left_path=str(left_path), - right_path=right_path, - velo_path=velo_path, - T_w_cam0=T_w_cam0, - ) - ) - - if max_frames is not None and len(frames) >= max_frames: - break - - return frames \ No newline at end of file diff --git a/dsg-jit/dsg_jit/datasets/tum_rgbd.py b/dsg-jit/dsg_jit/datasets/tum_rgbd.py deleted file mode 100644 index 9cdbe39..0000000 --- a/dsg-jit/dsg_jit/datasets/tum_rgbd.py +++ /dev/null @@ -1,258 +0,0 @@ -from __future__ import annotations - -import os -from dataclasses import dataclass -from pathlib import Path -from typing import Dict, Iterable, List, Optional, Sequence, Tuple - - -@dataclass -class TumRgbdFrame: - """ - Single RGB-D frame from a TUM RGB-D sequence. - - This dataclass stores only light-weight metadata: timestamps and relative - file paths. Consumers are responsible for actually loading images/depth - (e.g., via OpenCV or Pillow) if desired. - - :param t: - Timestamp in seconds (as parsed from the TUM text files). - :type t: float - - :param rgb_path: - Relative or absolute path to the RGB image file corresponding to this - frame. Typically something like ``"rgb/1341847980.722988.png"``. - :type rgb_path: str - - :param depth_path: - Optional path to the depth image associated with this frame. May be - ``None`` if depth is not available or ``use_depth=False`` was passed - to the loader. - :type depth_path: Optional[str] - - :param pose_quat: - Optional ground-truth pose as a 7-tuple - ``(tx, ty, tz, qx, qy, qz, qw)`` in TUM convention. May be ``None`` - if ground truth is unavailable or alignment was disabled. - :type pose_quat: Optional[Tuple[float, float, float, float, float, float, float]] - """ - - t: float - rgb_path: str - depth_path: Optional[str] = None - pose_quat: Optional[Tuple[float, float, float, float, float, float, float]] = None - - -def _parse_tum_list_file(path: Path) -> List[Tuple[float, str]]: - """ - Parse a TUM-style list file (e.g. ``rgb.txt`` or ``depth.txt``). - - Each non-comment line is expected to have: - - `` `` - - :param path: - Path to the TUM list file. - :type path: pathlib.Path - - :return: - A list of ``(timestamp, relative_path)`` pairs sorted by time. - :rtype: List[Tuple[float, str]] - """ - items: List[Tuple[float, str]] = [] - - with path.open("r") as f: - for line in f: - line = line.strip() - if not line or line.startswith("#"): - continue - parts = line.split() - if len(parts) < 2: - continue - t = float(parts[0]) - rel = parts[1] - items.append((t, rel)) - - items.sort(key=lambda x: x[0]) - return items - - -def _parse_tum_groundtruth(path: Path) -> List[Tuple[float, Tuple[float, float, float, float, float, float, float]]]: - """ - Parse a TUM groundtruth file (``groundtruth.txt``). - - Lines follow the standard format: - - ``timestamp tx ty tz qx qy qz qw`` - - :param path: - Path to ``groundtruth.txt``. - :type path: pathlib.Path - - :return: - A list of ``(timestamp, (tx, ty, tz, qx, qy, qz, qw))`` tuples sorted by - time. - :rtype: List[Tuple[float, Tuple[float, float, float, float, float, float, float]]] - """ - poses: List[Tuple[float, Tuple[float, float, float, float, float, float, float]]] = [] - - with path.open("r") as f: - for line in f: - line = line.strip() - if not line or line.startswith("#"): - continue - parts = line.split() - if len(parts) != 8: - continue - t = float(parts[0]) - tx, ty, tz = (float(parts[1]), float(parts[2]), float(parts[3])) - qx, qy, qz, qw = (float(parts[4]), float(parts[5]), float(parts[6]), float(parts[7])) - poses.append((t, (tx, ty, tz, qx, qy, qz, qw))) - - poses.sort(key=lambda x: x[0]) - return poses - - -def _associate_by_timestamp( - primary: Sequence[Tuple[float, str]], - secondary: Sequence[Tuple[float, str]], - max_diff: float = 0.02, -) -> Dict[int, Optional[int]]: - """ - Greedy association of two timestamped streams by nearest neighbor. - - Used to match RGB and depth streams, or frames to ground truth. For each - index in ``primary``, we find the closest timestamp in ``secondary`` within - a given maximum difference. - - :param primary: - Primary sequence of ``(timestamp, path)`` pairs. - :type primary: Sequence[Tuple[float, str]] - - :param secondary: - Secondary sequence of ``(timestamp, path)`` pairs. - :type secondary: Sequence[Tuple[float, str]] - - :param max_diff: - Maximum allowed time difference in seconds. If the closest match - exceeds this, the association will be ``None``. - :type max_diff: float - - :return: - A dictionary mapping indices in ``primary`` to indices in - ``secondary`` or ``None`` if no suitable match was found. - :rtype: Dict[int, Optional[int]] - """ - assoc: Dict[int, Optional[int]] = {} - - j = 0 - for i, (t0, _) in enumerate(primary): - # Advance j while secondary[j] is before t0 - while j + 1 < len(secondary) and secondary[j + 1][0] <= t0: - j += 1 - # Check nearest of j or j+1 - best_j = None - best_dt = float("inf") - for jj in (j, j + 1): - if 0 <= jj < len(secondary): - dt = abs(secondary[jj][0] - t0) - if dt < best_dt: - best_dt = dt - best_j = jj - assoc[i] = best_j if (best_j is not None and best_dt <= max_diff) else None - - return assoc - - -def load_tum_rgbd_sequence( - root: str | os.PathLike, - use_depth: bool = True, - use_groundtruth: bool = False, - max_frames: Optional[int] = None, - max_time_diff: float = 0.02, -) -> List[TumRgbdFrame]: - """ - Load a TUM RGB-D sequence directory into a list of frames. - - The directory is expected to contain standard TUM files such as - ``rgb.txt``, ``depth.txt``, and optionally ``groundtruth.txt``. This - loader parses metadata and returns a list of :class:`TumRgbdFrame` - instances, but does not actually load images or depth maps. - - :param root: - Path to the TUM sequence root directory. - :type root: Union[str, os.PathLike] - - :param use_depth: - Whether to attempt to associate depth frames from ``depth.txt`` with - each RGB frame. - :type use_depth: bool - - :param use_groundtruth: - Whether to attempt to associate ground-truth poses from - ``groundtruth.txt`` with each RGB frame. - :type use_groundtruth: bool - - :param max_frames: - Optional maximum number of frames to return. If ``None``, the full - sequence is loaded. - :type max_frames: Optional[int] - - :param max_time_diff: - Maximum allowed absolute difference in timestamps (seconds) when - associating RGB with depth and ground truth. - :type max_time_diff: float - - :return: - A list of TUM RGB-D frames with timestamps, file paths, and optionally - ground-truth poses. - :rtype: List[TumRgbdFrame] - """ - root_path = Path(root) - - rgb_entries = _parse_tum_list_file(root_path / "rgb.txt") - - depth_entries: List[Tuple[float, str]] = [] - if use_depth and (root_path / "depth.txt").exists(): - depth_entries = _parse_tum_list_file(root_path / "depth.txt") - - gt_entries: List[Tuple[float, Tuple[float, float, float, float, float, float, float]]] = [] - if use_groundtruth and (root_path / "groundtruth.txt").exists(): - gt_entries = _parse_tum_groundtruth(root_path / "groundtruth.txt") - - depth_assoc: Dict[int, Optional[int]] = {} - gt_assoc: Dict[int, Optional[int]] = {} - - if depth_entries: - depth_assoc = _associate_by_timestamp(rgb_entries, depth_entries, max_diff=max_time_diff) - - if gt_entries: - # Convert to (t, dummy_path) to reuse association logic - gt_ts_paths = [(t, "") for (t, _pose) in gt_entries] - gt_assoc = _associate_by_timestamp(rgb_entries, gt_ts_paths, max_diff=max_time_diff) - - frames: List[TumRgbdFrame] = [] - - for i, (t_rgb, rgb_rel) in enumerate(rgb_entries): - depth_rel: Optional[str] = None - pose_quat: Optional[Tuple[float, float, float, float, float, float, float]] = None - - if depth_entries and i in depth_assoc and depth_assoc[i] is not None: - depth_rel = depth_entries[depth_assoc[i]][1] - - if gt_entries and i in gt_assoc and gt_assoc[i] is not None: - pose_quat = gt_entries[gt_assoc[i]][1] - - frames.append( - TumRgbdFrame( - t=t_rgb, - rgb_path=str(root_path / rgb_rel), - depth_path=str(root_path / depth_rel) if depth_rel is not None else None, - pose_quat=pose_quat, - ) - ) - - if max_frames is not None and len(frames) >= max_frames: - break - - return frames \ No newline at end of file diff --git a/dsg-jit/dsg_jit/sensors/__init__.py b/dsg-jit/dsg_jit/sensors/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/dsg-jit/dsg_jit/sensors/base.py b/dsg-jit/dsg_jit/sensors/base.py deleted file mode 100644 index a39d7af..0000000 --- a/dsg-jit/dsg_jit/sensors/base.py +++ /dev/null @@ -1,189 +0,0 @@ -# Copyright (c) 2025. -# This file is part of DSG-JIT, released under the Business Source License 1.1. -""" -Abstract base classes and shared interfaces for DSG-JIT sensor modules. - -This module defines the foundational API that all sensor types in DSG-JIT -(IMU, LiDAR, RGB cameras, range sensors, etc.) are expected to implement. -By unifying the contract between sensor objects and the rest of the system, -DSG-JIT enables plug-and-play multi-sensor integration without requiring -special-case logic for each modality. - -The goal of this module is to answer a simple question: - - “What does it mean to be a sensor in DSG-JIT?” - -------------------------------------------------------------------------------- -Core Components -------------------------------------------------------------------------------- - -**BaseSensor** - The abstract parent class for all sensors. It typically defines: - - - ``initialize()`` — optional setup before streaming begins - - ``read()`` — return a *single typed measurement* - - ``close()`` — release hardware or simulation resources - - Subclasses (e.g., ``IMUSensor``, ``LiDARSensor``, ``CameraSensor``) - implement their own measurement-specific logic, but all present - a consistent surface to the rest of DSG-JIT. - -**BaseMeasurement** - A typed container for the output of a sensor. - Every measurement has: - - - ``timestamp`` - - device-specific payload (e.g., accelerations, images, point clouds) - - The purpose of this common type is to make downstream modules— - factor graphs, dynamic scene graphs, training loops, logging tools— - treat all measurements uniformly. - -------------------------------------------------------------------------------- -Design Goals -------------------------------------------------------------------------------- - -1. **Unified sensor API** - All sensors behave the same from the perspective of DSG-JIT’s - world model, optimization routines, and streaming helpers. - -2. **Compatibility with synchronous & asynchronous streams** - The interfaces defined here are intentionally minimal so that - ``streams.py`` can wrap *any* sensor using either Python loops or - asyncio-based background tasks. - -3. **Future-proof extensibility** - The goal is for users to implement custom sensors (GPS, UWB, RADAR, - event cameras, tactile sensors, etc.) by subclassing - ``BaseSensor`` and returning custom ``BaseMeasurement`` types. - -4. **Separation of concerns** - This module defines *contracts*, not implementation logic. - Sensor-specific math, calibration, conversion, or projection - live in their respective modules (e.g., ``lidar.py``, ``camera.py``). - -------------------------------------------------------------------------------- -Summary -------------------------------------------------------------------------------- - -The ``base`` module provides the essential abstraction layer required for -building robust, modular, multi-sensor dynamic scene graphs. It ensures that -all data entering DSG-JIT—regardless of modality—flows through a consistent, -well-structured interface suitable for high-frequency SLAM, perception, -and future real-time scene generation. -""" - -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any, List, Optional - -from dsg_jit.core.types import Factor -from dsg_jit.world.model import WorldModel -from dsg_jit.world.dynamic_scene_graph import DynamicSceneGraph - -@dataclass -class BaseMeasurement: - """ - Generic typed measurement used by all DSG-JIT sensor backends. - - This class represents a single sensor sample emitted by a device or - by a stream wrapper. All concrete sensor measurement types - (e.g., :class:`CameraMeasurement`, :class:`LidarMeasurement`, - :class:`IMUMeasurement`) should inherit from this class. - - :param t: Discrete timestamp or frame index associated with the sample. - :type t: int - :param source: Name of the sensor that produced this measurement - (e.g., ``"front_cam"``, ``"lidar_0"``, ``"imu_main"``). - :type source: str - :param data: Raw modality-specific payload. Subclasses may refine this type. - For example, a camera may store an ndarray, LiDAR may store a point cloud, - and IMU may store (accel, gyro) tuples. - :type data: Any - :param meta: Optional metadata, such as exposure time, pose hints, or flags. - :type meta: dict or None - """ - - t: int - source: str - data: Any - meta: Optional[dict] = None - -@dataclass -class SensorReading: - """ - Lightweight container for a single sensor measurement. - - :param t: Discrete time index or timestamp at which the reading was - taken. For now this is typically an integer matching the DSG's - time index. - :type t: int - :param data: Raw or minimally processed sensor payload. The exact - structure depends on the sensor type (e.g. a scalar range, - a 3D point, an image array, etc.). - :type data: Any - """ - t: int - data: Any - - -class Sensor: - """ - Abstract base class for all DSG-JIT sensors. - - Concrete subclasses implement :meth:`build_factors` to turn a - :class:`SensorReading` into one or more :class:`core.types.Factor` - instances, which can then be added to a :class:`world.model.WorldModel`. - - Sensors are intended to be *stateless* with respect to optimization: - they describe how to map readings into factors, but do not own any - variables themselves. - - Typical usage pattern:: - - sensor = SomeSensor(agent_id="robot0", ...) - reading = SensorReading(t=3, data=raw_measurement) - factors = sensor.build_factors(world_model, dsg, reading) - for f in factors: - world_model.fg.add_factor(f) - - :param name: Human-readable name for this sensor instance. - :type name: str - :param agent_id: Identifier of the agent this sensor is mounted on, - e.g. ``"robot0"``. Used to resolve pose nodes in the DSG. - :type agent_id: str - """ - - def __init__(self, name: str, agent_id: str) -> None: - self.name = name - self.agent_id = agent_id - - # You can keep this as a simple duck-typed interface instead of - # using abc.ABC to avoid import overheads. - def build_factors( - self, - wm: WorldModel, - dsg: DynamicSceneGraph, - reading: SensorReading, - ) -> List[Factor]: - """ - Convert a sensor reading into factor(s) to be added to the world. - - Subclasses must implement this to return a list of - :class:`core.types.Factor` objects whose ``var_ids`` and - ``params`` are consistent with the residuals registered in the - world's factor graph. - - :param wm: World model into which new factors will be added. - :type wm: world.model.WorldModel - :param dsg: Dynamic scene graph providing access to pose node ids - for this sensor's agent at the requested time index. - :type dsg: world.dynamic_scene_graph.DynamicSceneGraph - :param reading: Sensor reading to convert. - :type reading: SensorReading - :return: List of factor objects ready to be added to - :attr:`wm.fg`. - :rtype: list[core.types.Factor] - """ - raise NotImplementedError \ No newline at end of file diff --git a/dsg-jit/dsg_jit/sensors/camera.py b/dsg-jit/dsg_jit/sensors/camera.py deleted file mode 100644 index 9cb1662..0000000 --- a/dsg-jit/dsg_jit/sensors/camera.py +++ /dev/null @@ -1,243 +0,0 @@ -# Copyright (c) 2025. -# This file is part of DSG-JIT, released under the Business Source License 1.1. -""" -Camera sensor abstractions and utilities for DSG-JIT. - -This module defines lightweight, JAX-friendly camera interfaces that can be -plugged into dynamic scene graph (DSG) pipelines. The goal is to provide a -clean separation between: - - * **Raw image acquisition** (e.g., from a hardware driver, simulator, or - prerecorded dataset), and - * **Downstream SLAM / DSG consumers** that only need structured frames - (RGB or grayscale) with timestamps and metadata. - -The module typically exposes: - - * ``CameraFrame``: - A simple dataclass-like structure representing a single image frame. - It usually stores image data (RGB or grayscale), optional depth, and - a timestamp or frame index. - - * ``CameraSensor``: - A wrapper around an arbitrary user-provided capture function. The - capture function may return NumPy arrays or JAX arrays; the wrapper - normalizes these and provides a consistent interface for reading - frames in synchronous loops or via the generic sensor streams. - - * Optional utilities for: - - Converting RGB frames to grayscale. - - Normalizing/typing images for consumption by JAX or downstream - vision modules. - - Integrating with the generic sensor streaming API (synchronous or - asynchronous). - -These camera abstractions are intentionally minimal and do *not* perform -full visual odometry or object detection. Instead, they are designed as -building blocks for higher-level modules (e.g., visual SLAM, semantic -DSG layers) that can interpret camera data and inject new nodes and -factors into the scene graph or factor graph. - -The design philosophy is: - - * Keep camera handling **stateless** where possible. - * Make it easy to wrap *any* existing camera source (OpenCV, ROS, custom - simulator, etc.) behind a small capture function. - * Ensure that integration with DSG-JIT remains explicit and composable, - so users can swap cameras or add multiple sensors without changing - core SLAM logic. -""" - -from typing import Optional, Callable, AsyncIterator, Dict, Any -from dataclasses import dataclass -import numpy as np -import time - - -@dataclass -class CameraIntrinsics: - """ - Pinhole camera intrinsics. - - :param width: Image width in pixels. - :param height: Image height in pixels. - :param fx: Focal length in x direction. - :param fy: Focal length in y direction. - :param cx: Principal point x coordinate. - :param cy: Principal point y coordinate. - """ - width: int - height: int - fx: float - fy: float - cx: float - cy: float - - -@dataclass -class CameraFrame: - """ - A single camera frame. - - :param image: The image array. - :param timestamp: Timestamp of the frame capture. - :param frame_id: Optional frame identifier. - :param color_space: Color space of the image, either "rgb" or "gray". - """ - image: np.ndarray - timestamp: float - frame_id: Optional[str] - color_space: str - -@dataclass -class CameraMeasurement: - """ - High-level camera measurement suitable for feeding into SLAM / DSG layers. - - This wraps a low-level :class:`CameraFrame` with additional metadata - such as sensor ID, extrinsics, and an optional sequence index. It is - intentionally minimal and can be extended by applications as needed. - - :param frame: - The underlying camera frame (image, timestamp, color space, etc.). - :param sensor_id: - Identifier for the camera (e.g., ``"cam0"``). Useful when multiple - cameras are present. - :param T_cam_body: - Optional 4x4 homogeneous transform from body frame to camera frame. - If omitted, downstream consumers may assume an identity transform or - use a configured default. - :param seq: - Optional sequence index (e.g., frame counter) for convenience. - :param metadata: - Optional free-form dictionary for extra information - (exposure, gain, rolling-shutter parameters, etc.). - """ - frame: "CameraFrame" - sensor_id: str = "cam0" - T_cam_body: Optional[np.ndarray] = None - seq: Optional[int] = None - metadata: Optional[Dict[str, Any]] = None - - -def is_rgb(frame: CameraFrame) -> bool: - """ - Check if the frame is in RGB color space. - - :param frame: The camera frame to check. - :returns: True if the frame's color space is "rgb" (case-insensitive), False otherwise. - """ - return frame.color_space.lower() == "rgb" - - -def is_gray(frame: CameraFrame) -> bool: - """ - Check if the frame is in grayscale color space. - - :param frame: The camera frame to check. - :returns: True if the frame's color space is one of "gray", "grey", or "grayscale" (case-insensitive), False otherwise. - """ - return frame.color_space.lower() in ("gray", "grey", "grayscale") - - -def to_grayscale(frame: CameraFrame) -> CameraFrame: - """ - Convert an RGB frame to grayscale. If the frame is already grayscale, returns it unchanged. - - For RGB frames, assumes image shape is (H, W, 3) and applies standard luminance weights [0.299, 0.587, 0.114] - to convert to grayscale with shape (H, W). The returned image is float32 in [0, 1] if the input was uint8. - - :param frame: The input camera frame. - :returns: A new CameraFrame in grayscale color space with the same timestamp and frame_id. - """ - if is_gray(frame): - return frame - img = np.asarray(frame.image) - if img.dtype == np.uint8: - img = img.astype(np.float32) / 255.0 - gray_img = np.dot(img[..., :3], [0.299, 0.587, 0.114]) - return CameraFrame( - image=gray_img, - timestamp=frame.timestamp, - frame_id=frame.frame_id, - color_space="gray" - ) - - -def to_rgb(frame: CameraFrame) -> CameraFrame: - """ - Convert a grayscale frame to RGB by stacking the gray channel three times along the last axis. - If the frame is already RGB, returns it unchanged. - - :param frame: The input camera frame. - :returns: A new CameraFrame in RGB color space with the same timestamp and frame_id. - """ - if is_rgb(frame): - return frame - img = np.asarray(frame.image) - if img.ndim == 2: - rgb_img = np.stack([img, img, img], axis=-1) - else: - rgb_img = img - return CameraFrame( - image=rgb_img, - timestamp=frame.timestamp, - frame_id=frame.frame_id, - color_space="rgb" - ) - - -@dataclass -class SyncCamera: - """ - Synchronous camera source. - - :param intrinsics: Camera intrinsics. - :param read_fn: Callable that returns an image as a numpy array. - :param color_space: Color space of the images produced by read_fn, default is "rgb". - """ - intrinsics: CameraIntrinsics - read_fn: Callable[[], np.ndarray] - color_space: str = "rgb" - - def read(self) -> CameraFrame: - """ - Read a single frame from the camera. - - :returns: A CameraFrame containing the image, current timestamp, no frame_id, and the configured color_space. - """ - img = self.read_fn() - return CameraFrame( - image=img, - timestamp=time.time(), - frame_id=None, - color_space=self.color_space - ) - - -@dataclass -class AsyncCamera: - """ - Asynchronous camera source. - - :param intrinsics: Camera intrinsics. - :param aiter_fn: Callable that returns an async iterator yielding images as numpy arrays. - :param color_space: Color space of the images produced by aiter_fn, default is "rgb". - """ - intrinsics: CameraIntrinsics - aiter_fn: Callable[[], AsyncIterator[np.ndarray]] - color_space: str = "rgb" - - async def frames(self) -> AsyncIterator[CameraFrame]: - """ - Asynchronously iterate over frames from the camera. - - :returns: An async iterator yielding CameraFrame objects with current timestamp, no frame_id, and the configured color_space. - """ - async for img in self.aiter_fn(): - yield CameraFrame( - image=img, - timestamp=time.time(), - frame_id=None, - color_space=self.color_space - ) diff --git a/dsg-jit/dsg_jit/sensors/conversion.py b/dsg-jit/dsg_jit/sensors/conversion.py deleted file mode 100644 index 6a1aae7..0000000 --- a/dsg-jit/dsg_jit/sensors/conversion.py +++ /dev/null @@ -1,904 +0,0 @@ -# Copyright (c) 2025 Tanner Kocher -# SPDX-License-Identifier: Business Source License 1.1 - -""" -Conversion utilities from sensor measurements to factor-graph factors. - -This module implements the "measurement conversion layer" for DSG-JIT: given -typed sensor measurements (IMU, LiDAR, cameras, simple range sensors, etc.), -it produces factor descriptions that can be attached to the core -:class:`core.factor_graph.FactorGraph` or to higher-level world/scene-graph -abstractions. - -The core idea is to keep sensor-facing code and factor-graph-facing code -decoupled: - -* Sensor modules (``sensors.camera``, ``sensors.imu``, ``sensors.lidar``, - ``sensors.streams``, …) produce strongly-typed measurement objects. -* This module converts those measurements into small - :class:`MeasurementFactor` records describing: - - - ``factor_type`` (string key for the residual) - - ``var_ids`` (tuple of variable node ids to connect) - - ``params`` (dictionary passed into the residual function) - -Downstream code can then: - -* Construct :class:`core.types.Factor` objects from these records. -* Call :meth:`core.factor_graph.FactorGraph.add_factor`. -* Or wrap them in higher-level helpers in :mod:`world.scene_graph`. - -This keeps sensor integration "plug-and-play" while preserving a clean, -minimal interface for the optimization engine. -""" - -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any, Dict, Iterable, List, Sequence, Tuple, Mapping, Optional - -import jax.numpy as jnp -import time -import numpy as np - -from dsg_jit.core.factor_graph import FactorGraph -from dsg_jit.core.types import Factor - - -from dsg_jit.sensors.camera import CameraMeasurement, CameraFrame -from dsg_jit.sensors.imu import IMUMeasurement -from dsg_jit.sensors.lidar import LidarMeasurement - - -@dataclass -class MeasurementFactor: - """ - Lightweight description of a factor generated from a sensor measurement. - - This is intentionally decoupled from :class:`core.types.Factor` so that - the conversion layer does not depend on internal factor-graph details. - Call :func:`measurement_factors_to_graph_factors` to turn these into - concrete :class:`Factor` instances, or use - :func:`apply_measurement_factors` to add them directly to a - :class:`FactorGraph`. - - :param factor_type: String key for the residual function - (e.g. ``"range_1d"``, ``"pose_landmark_bearing"``, - ``"voxel_point_obs"``, etc.). This must correspond to a type - previously registered via - :meth:`core.factor_graph.FactorGraph.register_residual`. - :type factor_type: str - :param var_ids: Tuple of variable node ids that this factor connects, - in the order expected by the residual function. - :type var_ids: tuple[int, ...] - :param params: Parameter dictionary passed into the residual function. - Entries are typically NumPy/JAX arrays or scalar floats - (e.g. ``{"measurement": ..., "sigma": ...}``). - :type params: dict[str, Any] - """ - - factor_type: str - var_ids: Tuple[int, ...] - params: Dict[str, Any] - - -# --------------------------------------------------------------------------- -# Range / 1D distance measurements -# --------------------------------------------------------------------------- - -def range_1d_to_factor( - pose_id: int, - place_id: int, - distance: float, - sigma: float = 0.05, - factor_type: str = "range_1d", -) -> MeasurementFactor: - """ - Convert a scalar range measurement (1D) into a factor description. - - This is the generic bridge used by simple range sensors (e.g. the - 1D range DSG experiment), where you have: - - pose_x - place_x ≈ distance - - and a residual function of type ``factor_type`` that expects a - ``"measurement"`` and optionally a ``"sigma"`` or ``"weight"`` in - its parameter dictionary. - - :param pose_id: Node id of the pose variable in the factor graph. - :type pose_id: int - :param place_id: Node id of the place or landmark variable in the - factor graph. - :type place_id: int - :param distance: Measured distance between the pose and the place. - :type distance: float - :param sigma: Measurement noise standard deviation. If your residual - uses ``weight`` instead, you can convert this using - ``1.0 / (sigma ** 2)`` when creating the factor. - :type sigma: float - :param factor_type: String key for the residual function. This must - match a factor type registered into the :class:`FactorGraph`, - for example ``"range_1d"``. - :type factor_type: str - :return: A :class:`MeasurementFactor` describing the range constraint. - :rtype: MeasurementFactor - """ - params: Dict[str, Any] = { - "measurement": jnp.array([distance], dtype=jnp.float32), - "sigma": float(sigma), - } - return MeasurementFactor( - factor_type=factor_type, - var_ids=(pose_id, place_id), - params=params, - ) - - -# --------------------------------------------------------------------------- -# Bearing measurements (camera-like, 2D/3D directions) -# --------------------------------------------------------------------------- - -def bearing_to_factor( - pose_id: int, - landmark_id: int, - bearing_vec: jnp.ndarray, - sigma: float = 0.01, - factor_type: str = "pose_landmark_bearing", -) -> MeasurementFactor: - """ - Convert a bearing vector to a factor description. - - This is suitable for camera-like observations where you have a unit - bearing vector from the camera pose to a landmark in either camera - or world coordinates, and a residual function that enforces angular - consistency between the predicted bearing and the measured one. - - :param pose_id: Node id of the camera/pose variable. - :type pose_id: int - :param landmark_id: Node id of the landmark/point variable. - :type landmark_id: int - :param bearing_vec: Measured bearing direction as a vector. This - should typically be normalized (unit length) and have shape - ``(2,)`` or ``(3,)``, depending on the residual implementation. - :type bearing_vec: jax.numpy.ndarray - :param sigma: Measurement noise standard deviation in angular units - (radians or an equivalent bearing metric). - :type sigma: float - :param factor_type: String key for the residual function (e.g. - ``"pose_landmark_bearing"``). Must match a registered factor - type in the :class:`FactorGraph`. - :type factor_type: str - :return: A :class:`MeasurementFactor` describing the bearing - constraint. - :rtype: MeasurementFactor - """ - bearing_vec = jnp.asarray(bearing_vec, dtype=jnp.float32) - params: Dict[str, Any] = { - "bearing_meas": bearing_vec, - "sigma": float(sigma), - } - return MeasurementFactor( - factor_type=factor_type, - var_ids=(pose_id, landmark_id), - params=params, - ) - - -# --------------------------------------------------------------------------- -# Camera measurements (bearing-style observations) -# --------------------------------------------------------------------------- - -def camera_bearings_to_factors( - pose_id: int, - landmark_ids: Sequence[int], - measurement: CameraMeasurement, - sigma: float = 0.01, - factor_type: str = "pose_landmark_bearing", -) -> List[MeasurementFactor]: - """ - Convert a :class:`sensors.camera.CameraMeasurement` containing bearing - directions into a list of measurement factors. - - This helper assumes that the camera front-end has already extracted - unit bearing vectors from image data (e.g. via feature detection and - calibration), and that these bearings have been associated with a set - of landmark ids. For each ``landmark_id`` and corresponding row in - ``measurement.bearings``, we construct a bearing factor using - :func:`bearing_to_factor`. - - :param pose_id: Node id of the camera/pose variable in the factor graph. - :type pose_id: int - :param landmark_ids: Sequence of landmark node ids, one per bearing - vector in ``measurement.bearings``. - :type landmark_ids: Sequence[int] - :param measurement: Camera measurement containing an array of bearing - vectors in ``measurement.bearings`` with shape ``(N, D)`` where - ``D`` is typically 2 or 3. - :type measurement: CameraMeasurement - :param sigma: Bearing noise standard deviation passed through to - :func:`bearing_to_factor`. - :type sigma: float - :param factor_type: Factor type string for the bearing residual, - usually ``"pose_landmark_bearing"``. - :type factor_type: str - :return: List of :class:`MeasurementFactor` objects, one per - (pose, landmark) bearing observation. - :rtype: list[MeasurementFactor] - :raises ValueError: If the number of landmark ids does not match the - number of bearing vectors stored in the measurement. - """ - bearings = jnp.asarray(measurement.bearings, dtype=jnp.float32) - if bearings.ndim != 2: - raise ValueError( - f"measurement.bearings must have shape (N, D), got {bearings.shape}" - ) - if len(landmark_ids) != bearings.shape[0]: - raise ValueError( - f"len(landmark_ids)={len(landmark_ids)} does not match " - f"measurement.bearings.shape[0]={bearings.shape[0]}" - ) - - factors: List[MeasurementFactor] = [] - for lid, b in zip(landmark_ids, bearings): - factors.append( - bearing_to_factor( - pose_id=pose_id, - landmark_id=int(lid), - bearing_vec=b, - sigma=sigma, - factor_type=factor_type, - ) - ) - return factors - - -# --------------------------------------------------------------------------- -# Voxel / point-based observations (e.g. LiDAR, depth, point clouds) -# --------------------------------------------------------------------------- - -def voxel_point_obs_factor( - voxel_id: int, - point_world: jnp.ndarray, - sigma: float = 0.05, - factor_type: str = "voxel_point_obs", -) -> MeasurementFactor: - """ - Convert a single 3D point observation into a voxel-point factor. - - This is intended for mapping-style sensors (LiDAR, depth cameras, - RGB-D, stereo) where you receive one or more 3D points in world - coordinates and want to attach them to a voxel cell center. - - :param voxel_id: Node id of the voxel variable (``voxel_cell``) - in the factor graph. - :type voxel_id: int - :param point_world: Observed 3D point in world coordinates with shape - ``(3,)``. - :type point_world: jax.numpy.ndarray - :param sigma: Noise level for this observation in world units - (meters, etc.). - :type sigma: float - :param factor_type: Factor type string (e.g. ``"voxel_point_obs"``) - corresponding to the voxel-point residual used in your - measurement model. - :type factor_type: str - :return: A :class:`MeasurementFactor` describing the voxel-point - observation. - :rtype: MeasurementFactor - """ - point_world = jnp.asarray(point_world, dtype=jnp.float32).reshape(3,) - params: Dict[str, Any] = { - "point_world": point_world, - "sigma": float(sigma), - } - return MeasurementFactor( - factor_type=factor_type, - var_ids=(voxel_id,), - params=params, - ) - - -def lidar_scan_to_voxel_factors( - voxel_ids: Sequence[int], - points_world: jnp.ndarray, - sigma: float = 0.05, - factor_type: str = "voxel_point_obs", -) -> List[MeasurementFactor]: - """ - Convert a LiDAR point cloud into per-voxel observation factors. - - This helper assumes that a pre-processing step has already: - - * Projected the LiDAR ranges into 3D points in world coordinates. - * Associated each point with a voxel id (e.g. via a voxel grid index). - - Given a list of voxel node ids and a matching array of 3D points, - this function returns one :class:`MeasurementFactor` per point. - - :param voxel_ids: Iterable of voxel node ids, one per point. - :type voxel_ids: Sequence[int] - :param points_world: Array of 3D points in world coordinates with - shape ``(N, 3)``, where ``N == len(voxel_ids)``. - :type points_world: jax.numpy.ndarray - :param sigma: Noise level for each point in world units. - :type sigma: float - :param factor_type: Factor type string used for all voxel-point - factors, typically ``"voxel_point_obs"``. - :type factor_type: str - :return: A list of :class:`MeasurementFactor` objects, one for each - point/voxel pair. - :rtype: list[MeasurementFactor] - """ - points_world = jnp.asarray(points_world, dtype=jnp.float32) - if points_world.ndim != 2 or points_world.shape[1] != 3: - raise ValueError( - f"points_world must have shape (N, 3), got {points_world.shape}" - ) - if len(voxel_ids) != points_world.shape[0]: - raise ValueError( - f"voxel_ids length {len(voxel_ids)} does not match " - f"points_world.shape[0] {points_world.shape[0]}" - ) - - factors: List[MeasurementFactor] = [] - for vid, pt in zip(voxel_ids, points_world): - factors.append( - voxel_point_obs_factor( - voxel_id=int(vid), - point_world=pt, - sigma=sigma, - factor_type=factor_type, - ) - ) - return factors - - -# --------------------------------------------------------------------------- -# LiDAR scan measurements -# --------------------------------------------------------------------------- - -def lidar_measurement_to_voxel_factors( - measurement: LidarMeasurement, - sigma: float = 0.05, - factor_type: str = "voxel_point_obs", -) -> List[MeasurementFactor]: - """ - Convert a :class:`sensors.lidar.LidarMeasurement` into voxel-point - observation factors. - - This helper assumes that the LiDAR front-end has already projected raw - ranges into 3D world coordinates and, optionally, associated each - point with a voxel id. If ``measurement.voxel_ids`` is provided, it is - used directly; otherwise, the caller is expected to supply voxel - associations separately. - - Concretely, this is a thin wrapper around - :func:`lidar_scan_to_voxel_factors`, using - ``measurement.points_world`` and ``measurement.voxel_ids``. - - :param measurement: LiDAR measurement containing a point cloud in - world coordinates and optional voxel assignments. - :type measurement: LidarMeasurement - :param sigma: Noise level for each point in world units, forwarded to - :func:`voxel_point_obs_factor`. - :type sigma: float - :param factor_type: Factor type string used for all voxel-point - factors, typically ``"voxel_point_obs"``. - :type factor_type: str - :return: List of :class:`MeasurementFactor` objects describing the - LiDAR point cloud constraints. - :rtype: list[MeasurementFactor] - :raises ValueError: If ``measurement.voxel_ids`` is ``None`` or its - length does not match the number of points. - """ - points_world = jnp.asarray(measurement.points_world, dtype=jnp.float32) - if points_world.ndim != 2 or points_world.shape[1] != 3: - raise ValueError( - f"measurement.points_world must have shape (N, 3), got {points_world.shape}" - ) - - if measurement.voxel_ids is None: - raise ValueError( - "measurement.voxel_ids is None; voxel associations are required " - "to build voxel-point factors." - ) - - voxel_ids_seq: Sequence[int] = list(measurement.voxel_ids) - if len(voxel_ids_seq) != points_world.shape[0]: - raise ValueError( - f"len(measurement.voxel_ids)={len(voxel_ids_seq)} does not match " - f"measurement.points_world.shape[0]={points_world.shape[0]}" - ) - - return lidar_scan_to_voxel_factors( - voxel_ids=voxel_ids_seq, - points_world=points_world, - sigma=sigma, - factor_type=factor_type, - ) - - -# --------------------------------------------------------------------------- -# IMU measurements (placeholder / future preintegration) -# --------------------------------------------------------------------------- - -def imu_to_factors_placeholder( - pose_ids: Sequence[int], - imu_meas: IMUMeasurement, -) -> List[MeasurementFactor]: - """ - Placeholder conversion from IMU measurement to factor descriptions. - - In a full SLAM system, IMU data is typically handled via *preintegration* - over multiple high-rate samples to produce a single inertial factor - between two poses. That logic is non-trivial and highly application - specific, so this function serves as a placeholder and example. - - For now, it returns an empty list and is intended to be replaced with - a proper preintegration pipeline in future work. - - :param pose_ids: Sequence of pose node ids between which an IMU factor - would be created (e.g. previous pose id and current pose id). - :type pose_ids: Sequence[int] - :param imu_meas: A single IMU measurement containing accelerometer and - gyroscope readings along with a timestamp. - :type imu_meas: IMUMeasurement - :return: An empty list. Replace with application-specific IMU factor - generation as needed. - :rtype: list[MeasurementFactor] - """ - _ = pose_ids, imu_meas # avoid unused variable warnings - return [] - - -# --------------------------------------------------------------------------- -# Helpers to apply MeasurementFactor objects to a FactorGraph -# --------------------------------------------------------------------------- - -def measurement_factors_to_graph_factors( - meas_factors: Iterable[MeasurementFactor], -) -> List[Factor]: - """ - Convert a sequence of :class:`MeasurementFactor` objects to concrete - :class:`core.types.Factor` instances. - - Unique factor ids are generated using a simple running index; if you - need stable ids, you can post-process the factors or construct - them manually instead. - - :param meas_factors: Iterable of :class:`MeasurementFactor` objects - returned by the conversion helpers in this module. - :type meas_factors: Iterable[MeasurementFactor] - :return: A list of :class:`Factor` instances suitable for adding to - a :class:`FactorGraph`. - :rtype: list[Factor] - """ - graph_factors: List[Factor] = [] - for idx, mf in enumerate(meas_factors): - fid = f"meas_{idx}" - graph_factors.append( - Factor( - id=fid, - type=mf.factor_type, - var_ids=tuple(mf.var_ids), - params=dict(mf.params), - ) - ) - return graph_factors - - -def apply_measurement_factors( - fg: FactorGraph, - meas_factors: Iterable[MeasurementFactor], -) -> None: - """ - Add a sequence of measurement-derived factors to a factor graph. - - This is a thin convenience wrapper around - :func:`measurement_factors_to_graph_factors` and - :meth:`core.factor_graph.FactorGraph.add_factor`. - - :param fg: The factor graph to which the new factors should be added. - :type fg: FactorGraph - :param meas_factors: Iterable of :class:`MeasurementFactor` objects. - :type meas_factors: Iterable[MeasurementFactor] - :return: This function has no return value; it mutates ``fg`` in-place - by adding new factors. - :rtype: None - """ - for factor in measurement_factors_to_graph_factors(meas_factors): - fg.add_factor(factor) - -# --------------------------------------------------------------------------- -# IMU → SE3 delta / odometry-style increment -# --------------------------------------------------------------------------- - -def integrate_imu_delta( - imu: IMUMeasurement, - dt: float, - gravity: jnp.ndarray | None = None, -) -> jnp.ndarray: - """ - Integrate a single IMU sample into a small SE(3) increment (se(3) vector). - - This is a **very** simple, single-step integrator meant as a starting - point / placeholder. For real applications, a proper preintegration - scheme or filter should be used instead. - - The returned 6D vector is in the form:: - - [dtx, dty, dtz, drx, dry, drz] - - where ``dt*`` is the approximate translational displacement in the IMU - frame and ``dr*`` is a small-angle approximation for the rotation. - - :param imu: IMU measurement containing linear acceleration and angular - velocity in the sensor frame. - :param dt: Time step in seconds between this sample and the previous one. - :param gravity: Optional gravity vector (in sensor frame). If provided, - it is subtracted from the measured acceleration before integration. - If ``None``, no gravity compensation is performed. - :returns: A length-6 JAX array representing a small SE(3) increment in - the IMU frame. - """ - acc = jnp.asarray(imu.accel, dtype=jnp.float32) - gyro = jnp.asarray(imu.gyro, dtype=jnp.float32) - - if gravity is not None: - gravity = jnp.asarray(gravity, dtype=jnp.float32) - acc = acc - gravity - - # Very crude single-step integration: - # v ≈ a * dt - # p ≈ 0.5 * a * dt^2 - # θ ≈ ω * dt - dp = 0.5 * acc * (dt ** 2) - dtheta = gyro * dt - - return jnp.concatenate([dp, dtheta], axis=0) - - -# --------------------------------------------------------------------------- -# RANGE → 3D point -# --------------------------------------------------------------------------- - -@dataclass -class RangeMeasurement: - """ - Simple 1D range measurement along a known unit ray. - - :param distance: Measured distance along the ray, in meters. - :param ray_dir: Unit direction vector in the sensor frame, shape (3,). - """ - distance: float - ray_dir: jnp.ndarray - - -def range_to_point_sensor(m: RangeMeasurement) -> jnp.ndarray: - """ - Convert a range measurement into a 3D point in the sensor frame. - - :param m: Range measurement with a scalar distance and unit ray direction - in the sensor frame. - :returns: A 3D point (shape ``(3,)``) in the sensor frame. - """ - d = float(m.distance) - dir_s = jnp.asarray(m.ray_dir, dtype=jnp.float32) - return d * dir_s - - -def range_to_point_world( - m: RangeMeasurement, - T_world_sensor: jnp.ndarray, -) -> jnp.ndarray: - """ - Convert a range measurement into a 3D point in world coordinates. - - This assumes you already have the sensor pose ``T_world_sensor`` as a - homogeneous transform matrix of shape ``(4, 4)``. The point is first - constructed in the sensor frame and then transformed into the world. - - :param m: Range measurement in the sensor frame. - :param T_world_sensor: Homogeneous transform from sensor to world, - shape ``(4, 4)``. - :returns: A 3D point (shape ``(3,)``) in world coordinates. - """ - p_s = range_to_point_sensor(m) - p_s_h = jnp.concatenate([p_s, jnp.array([1.0], dtype=jnp.float32)], axis=0) - p_w_h = T_world_sensor @ p_s_h - return p_w_h[:3] - - -# --------------------------------------------------------------------------- -# LIDAR → point cloud -# --------------------------------------------------------------------------- - -def lidar_scan_to_points_sensor( - scan: LidarMeasurement, -) -> jnp.ndarray: - """ - Convert a planar LiDAR scan into 3D points in the sensor frame. - - This function assumes a 2D planar LiDAR mounted in the ``x-y`` plane, - with all points lying at ``z=0`` in the sensor frame:: - - x = r * cos(theta) - y = r * sin(theta) - z = 0 - - :param scan: LiDAR measurement containing per-beam ranges and angles. - :returns: Array of points of shape ``(N, 3)`` in the sensor frame. - """ - ranges = jnp.asarray(scan.ranges, dtype=jnp.float32) - angles = jnp.asarray(scan.angles, dtype=jnp.float32) - - x = ranges * jnp.cos(angles) - y = ranges * jnp.sin(angles) - z = jnp.zeros_like(x) - - return jnp.stack([x, y, z], axis=-1) - - -def lidar_scan_to_points_world( - scan: LidarMeasurement, - T_world_sensor: jnp.ndarray, -) -> jnp.ndarray: - """ - Convert a planar LiDAR scan into 3D points in world coordinates. - - :param scan: LiDAR measurement in the sensor frame. - :param T_world_sensor: Homogeneous transform from sensor to world, - shape ``(4, 4)``. - :returns: Array of points of shape ``(N, 3)`` in world coordinates. - """ - pts_s = lidar_scan_to_points_sensor(scan) # (N, 3) - ones = jnp.ones((pts_s.shape[0], 1), dtype=jnp.float32) - pts_s_h = jnp.concatenate([pts_s, ones], axis=1) # (N, 4) - pts_w_h = (T_world_sensor @ pts_s_h.T).T # (N, 4) - return pts_w_h[:, :3] - - -# --------------------------------------------------------------------------- -# CAMERA → rays / bearings -# --------------------------------------------------------------------------- - -def pixel_to_ray_camera( - cam: CameraMeasurement, - u: float, - v: float, -) -> jnp.ndarray: - """ - Convert a pixel coordinate into a normalized ray in the camera frame. - - The intrinsics are assumed to follow the usual pinhole model:: - - x = (u - cx) / fx - y = (v - cy) / fy - ray_cam = [x, y, 1] - ray_cam /= ||ray_cam|| - - :param cam: Camera measurement object providing intrinsics. - :param u: Pixel x-coordinate (column index). - :param v: Pixel y-coordinate (row index). - :returns: A unit 3D vector (shape ``(3,)``) in the camera frame. - """ - fx = float(cam.intrinsics.fx) - fy = float(cam.intrinsics.fy) - cx = float(cam.intrinsics.cx) - cy = float(cam.intrinsics.cy) - - x = (u - cx) / fx - y = (v - cy) / fy - ray = jnp.array([x, y, 1.0], dtype=jnp.float32) - ray = ray / jnp.linalg.norm(ray) - return ray - - -def pixels_to_rays_camera( - cam: CameraMeasurement, - pixels: Iterable[Tuple[float, float]], -) -> jnp.ndarray: - """ - Convert a collection of pixel coordinates into rays in the camera frame. - - :param cam: Camera measurement object providing intrinsics. - :param pixels: Iterable of ``(u, v)`` pixel coordinates. - :returns: Array of unit 3D vectors of shape ``(N, 3)`` in the camera frame. - """ - rays = [pixel_to_ray_camera(cam, u, v) for (u, v) in pixels] - return jnp.stack(rays, axis=0) - - -def camera_depth_to_points_sensor( - cam: CameraMeasurement, - depth: jnp.ndarray, -) -> jnp.ndarray: - """ - Convert a depth image into a 3D point cloud in the camera frame. - - The depth image is assumed to have the same width/height as specified - by the camera intrinsics. For each pixel ``(u, v)`` with depth ``d``, - we compute a ray via :func:`pixel_to_ray_camera` and place the point - at ``d * ray``. - - :param cam: Camera measurement with intrinsics. - :param depth: Depth map of shape ``(H, W)`` in meters. - :returns: Array of points of shape ``(H*W, 3)`` in the camera frame. - Invalid or zero depths are skipped. - """ - H, W = depth.shape - points = [] - - for v in range(H): - for u in range(W): - d = float(depth[v, u]) - if d <= 0.0: - continue - ray = pixel_to_ray_camera(cam, float(u), float(v)) - p = d * ray - points.append(p) - - if not points: - return jnp.zeros((0, 3), dtype=jnp.float32) - - return jnp.stack(points, axis=0) - -# Conversion helpers - -def raw_sample_to_camera_measurement( - sample: Mapping[str, Any], - sensor_id: str = "cam0", - T_cam_body: Optional[np.ndarray] = None, - seq: Optional[int] = None, -) -> CameraMeasurement: - """ - Convert a raw camera sample dictionary into a CameraMeasurement. - - Expected keys in ``sample``: - - - ``"t"`` (optional): float timestamp. If missing, the current time is used. - - ``"frame_id"`` (optional): string frame identifier. - - ONE of the following image-like entries (optional): - * ``"image"``: (H, W) or (H, W, 3) array-like. - - Optional directional data (for feature-based SLAM): - * ``"bearings"``: (N, 3) array-like of unit directions. - * ``"dirs"``: (N, 3) array-like. - * ``"rays"``: (N, 3) array-like. - - Any directional data is stored in the returned measurement's ``metadata`` - under the corresponding key (e.g. ``metadata["bearings"]``). - - :param sample: - Raw sample dictionary produced by a sensor stream (e.g. ReadingStream - or FunctionStream) in the experiments. - :param sensor_id: - Identifier for this camera (e.g. ``"cam0"``). - :param T_cam_body: - Optional 4x4 homogeneous transform from body frame to camera frame. - :param seq: - Optional sequence index (frame counter). - - :returns: - A :class:`CameraMeasurement` containing a :class:`CameraFrame` plus - any directional data in the ``metadata`` field. - """ - # Timestamp / frame id - t = float(sample.get("t", time.time())) - frame_id = sample.get("frame_id", None) - - # Image (optional) - if "image" in sample: - img = np.asarray(sample["image"]) - else: - # If no image is present, create a dummy 1x1 grayscale image. - # This lets us still pass a valid CameraFrame downstream. - img = np.zeros((1, 1), dtype=np.float32) - - # Decide color space from shape - if img.ndim == 3 and img.shape[-1] == 3: - color_space = "rgb" - else: - color_space = "gray" - - frame = CameraFrame( - image=img, - timestamp=t, - frame_id=frame_id, - color_space=color_space, - ) - - # Collect directional data (bearings, dirs, rays) into metadata - metadata: Dict[str, Any] = {} - - for key in ("bearings", "dirs", "rays"): - if key in sample: - metadata[key] = np.asarray(sample[key]) - - # Optionally keep any other extra keys under metadata["extra"] - for k, v in sample.items(): - if k not in ("t", "frame_id", "image", "bearings", "dirs", "rays"): - metadata.setdefault("extra", {})[k] = v - - if not metadata: - metadata = None # keep clean if there is nothing to store - - return CameraMeasurement( - frame=frame, - sensor_id=sensor_id, - T_cam_body=T_cam_body, - seq=seq, - metadata=metadata, - ) - -def raw_sample_to_lidar_measurement(sample: Mapping[str, Any]) -> LidarMeasurement: - """Convert a raw LiDAR sample dictionary into a :class:`LidarMeasurement`. - - Expected keys in ``sample``: - - - ``"ranges"``: 1D array-like of LiDAR ranges. (required) - - ``"angles"``: 1D array-like of bearing angles (radians), same length as ``ranges``. (optional) - - ``"directions"``: (N, 3) array-like of direction vectors. (optional) - - ``"t"`` (optional): float timestamp. - - ``"frame_id"`` (optional): string frame identifier. - - This is consistent with the rest of the LiDAR utilities in this module, - which assume a planar LiDAR described by ``ranges`` and ``angles``. - - :param sample: Raw sample dictionary produced by a sensor stream. - :returns: A :class:`LidarMeasurement` instance. - :raises KeyError: If required keys are missing from the sample. - """ - if "ranges" not in sample: - raise KeyError("raw_sample_to_lidar_measurement expected 'ranges' in sample") - - ranges = jnp.array(sample["ranges"], dtype=jnp.float32) - - # Determine directions (dirs) if present, else compute from angles if present, else None - dirs = None - if "directions" in sample: - dirs = jnp.array(sample["directions"], dtype=jnp.float32) - elif "angles" in sample: - angles = jnp.array(sample["angles"], dtype=jnp.float32) - # Planar lidar: directions in the x-y plane - dirs = jnp.stack( - [jnp.cos(angles), jnp.sin(angles), jnp.zeros_like(angles)], axis=-1 - ) - else: - dirs = None - - return LidarMeasurement( - ranges=ranges, - directions=dirs, - t=sample.get("t", None), - frame_id=sample.get("frame_id", "lidar"), - metadata=None, - ) - -def raw_sample_to_imu_measurement(sample: Mapping[str, Any]) -> IMUMeasurement: - """ - Convert a raw dict from a stream into an IMUMeasurement. - - Expected keys in ``sample``: - - - "t": float timestamp (seconds). - - "accel": array-like linear acceleration in the IMU frame. - - "gyro": array-like angular velocity in the IMU frame. - - "dt" (optional): time step in seconds since the previous sample. - If omitted, a default value is used. - - :param sample: Raw sample dictionary produced by a sensor stream. - :return: An :class:`IMUMeasurement` instance. - """ - t = float(sample.get("t", 0.0)) - accel = jnp.array(sample["accel"], dtype=jnp.float32) - gyro = jnp.array(sample["gyro"], dtype=jnp.float32) - - # Use provided dt if available, otherwise fall back to a reasonable default - dt = float(sample.get("dt", 0.1)) - - return IMUMeasurement( - timestamp=t, - accel=accel, - gyro=gyro, - dt=dt, - ) \ No newline at end of file diff --git a/dsg-jit/dsg_jit/sensors/fusion.py b/dsg-jit/dsg_jit/sensors/fusion.py deleted file mode 100644 index 943f2d7..0000000 --- a/dsg-jit/dsg_jit/sensors/fusion.py +++ /dev/null @@ -1,487 +0,0 @@ -# Copyright (c) 2025. -# This file is part of DSG-JIT, released under the Business Source License 1.1. - -"""Sensor fusion utilities for DSG-JIT. - -This module provides a small, opinionated *fusion core* that sits between -raw sensor streams (e.g. LiDAR, cameras, IMUs) and the rest of the -DSG-JIT world/scene-graph stack. - -The goals of this layer are: - -- Provide a common abstraction to register named sensors and pull data - from synchronous streams. -- Convert raw samples into strongly-typed measurement objects defined in - :mod:`sensors.camera`, :mod:`sensors.lidar`, and :mod:`sensors.imu`. -- Dispatch the resulting measurements to user-provided callbacks that - can update a :class:`world.model.WorldModel` or - :class:`world.dynamic_scene_graph.DynamicSceneGraph`. - -This is intentionally lightweight: - -- It does **not** assume any particular factor-graph structure. -- It does **not** run its own background threads or event loops. -- It is designed so that experiments can start simple (single-threaded - polling) while leaving room to grow into a more complex async or - multi-rate fusion system later on. - -Typical usage from an experiment:: - - from sensors.streams import ReadingStream - from sensors.camera import CameraMeasurement - from sensors.lidar import LidarMeasurement - from sensors.fusion import SensorFusionManager - - fusion = SensorFusionManager() - - # 1) Register a camera and a lidar stream - fusion.register_sensor( - name="cam0", - modality="camera", - stream=ReadingStream(camera_read_fn), - ) - fusion.register_sensor( - name="lidar0", - modality="lidar", - stream=ReadingStream(lidar_read_fn), - ) - - # 2) Connect fusion to the world model via a callback - def on_measurement(meas): - if isinstance(meas, CameraMeasurement): - # add a bearing / reprojection factor, etc. - ... - elif isinstance(meas, LidarMeasurement): - # add range factors or occupancy updates - ... - - fusion.register_callback(on_measurement) - - # 3) Poll sensors in your main loop - for step in range(100): - fusion.poll_once() - # run optimization, render, etc. - -In the future we can add: - -- Async helpers that drive :class:`sensors.streams.AReadingStream`. -- Tighter integration with the dynamic scene graph (e.g. per-agent queues). -- Higher-level policies (e.g. downsampling, time sync, etc.). -""" - -from __future__ import annotations - -import jax.numpy as jnp - -from dataclasses import dataclass, field -from typing import Any, Callable, Dict, Iterable, List, Optional -from typing import TYPE_CHECKING - -from dsg_jit.sensors.camera import CameraMeasurement -from dsg_jit.sensors.lidar import LidarMeasurement -from dsg_jit.sensors.imu import IMUMeasurement - -if TYPE_CHECKING: - from dsg_jit.world.model import WorldModel - -from dsg_jit.sensors.streams import BaseSensorStream, ReadingStream -from dsg_jit.sensors.base import BaseMeasurement - -from dsg_jit.sensors.conversion import ( - raw_sample_to_camera_measurement, - raw_sample_to_lidar_measurement, - imu_to_factors_placeholder, -) - -@dataclass -class FusedPoseEstimate: - """ - Fused SE(3) pose estimate produced by SensorFusionManager. - - :param t: Time index (discrete step or timestamp) associated with this - estimate. - :param pose_se3: 6D se(3) pose vector in world coordinates, typically - ``(tx, ty, tz, rx, ry, rz)``. - :param covariance: Optional 6x6 covariance matrix for the fused pose. - :param source_counts: Optional dictionary tracking how many measurements - contributed from each sensor type (e.g. ``{"imu": 10, "lidar": 2}``). - """ - - t: float | int - pose_se3: jnp.ndarray - covariance: Optional[jnp.ndarray] = None - source_counts: Dict[str, int] = field(default_factory=dict) - -@dataclass -class RegisteredSensor: - """Bookkeeping structure for a registered sensor. - - :param name: Logical name of the sensor (e.g. ``"lidar0"``). - :param modality: String describing the modality, e.g. ``"camera"``, - ``"lidar"``, or ``"imu"``. This is used to choose a default - converter when one is not provided. - :param stream: Underlying sensor stream used to pull raw samples. - :param converter: Function that maps a raw sample from ``stream`` to - a :class:`~sensors.base.BaseMeasurement` instance. - """ - - name: str - modality: str - stream: BaseSensorStream - converter: Callable[[Any], BaseMeasurement] - - -class SensorFusionManager: - """Central registry and dispatcher for sensor measurements. - - The fusion manager is intentionally minimal: it owns no threads and does - not know about factor graphs or optimization. It simply: - - - Keeps track of registered sensors. - - Polls synchronous streams on demand. - - Converts raw samples to measurement objects. - - Broadcasts those measurements to user callbacks. - - Experiments are free to use this in a tight, single-threaded loop, or - to build more advanced async / multi-rate infrastructure on top. - - :param default_callbacks: Optional iterable of callbacks to register - at construction time. Each callback will be called as - ``callback(measurement)`` whenever a new - :class:`~sensors.base.BaseMeasurement` is produced. - """ - - def __init__( - self, - default_callbacks: Optional[Iterable[Callable[[BaseMeasurement], None]]] = None, - world_model: Optional["WorldModel"] = None, - auto_register_world_callbacks: bool = True, - ) -> None: - """Create a new fusion manager. - - :param default_callbacks: Optional iterable of callbacks to register - immediately. Each callback will be invoked as ``cb(measurement)`` - for every measurement produced by :meth:`poll_once` or - :meth:`push_measurement`. - :param world_model: Optional :class:`world.model.WorldModel` instance - to be associated with this fusion manager. If provided and - ``auto_register_world_callbacks`` is ``True``, the manager will - automatically register per-modality callbacks that forward - measurements into the world model. - :param auto_register_world_callbacks: If ``True`` and ``world_model`` - is not ``None``, register default world-model callbacks for - camera, LiDAR, and IMU measurements (when the corresponding - handler methods exist on the world model). - """ - - self._sensors: Dict[str, RegisteredSensor] = {} - self._callbacks: List[Callable[[BaseMeasurement], None]] = [] - self._fused_history: List[ - tuple[float | int, jnp.ndarray, Optional[jnp.ndarray], Dict[str, int]] - ] = [] - self._world_model: Optional["WorldModel"] = world_model - - if default_callbacks is not None: - for cb in default_callbacks: - self.register_callback(cb) - - if self._world_model is not None and auto_register_world_callbacks: - # Automatically hook camera / LiDAR / IMU measurements into the - # world model using any available handler methods. - self.attach_world_model(self._world_model, register_default_callbacks=True) - def record_fused_pose( - self, - t: float | int, - pose_se3: jnp.ndarray, - covariance: Optional[jnp.ndarray] = None, - source_counts: Optional[Dict[str, int]] = None, - ) -> None: - """ - Append a fused SE(3) pose estimate to the internal history buffer. - - This does not perform any fusion by itself; instead it allows an - external estimator (e.g. an EKF or factor-graph solver) to publish - its current best pose into the fusion manager so that callers can - retrieve it via :meth:`get_latest_pose`. - - :param t: Time index or timestamp associated with the estimate. - :param pose_se3: 6D se(3) pose vector in world coordinates. - :param covariance: Optional 6x6 covariance matrix for the estimate. - :param source_counts: Optional dictionary describing how many - measurements from each sensor type contributed to this pose. - """ - if source_counts is None: - source_counts = {} - self._fused_history.append((t, pose_se3, covariance, source_counts)) - - # ------------------------------------------------------------------ - # World model integration hooks - # ------------------------------------------------------------------ - - def attach_world_model( - self, - world_model: "WorldModel", - register_default_callbacks: bool = True, - ) -> None: - """Attach a :class:`world.model.WorldModel` to this fusion manager. - - When a world model is attached and ``register_default_callbacks`` is - ``True``, the manager will install per-modality callbacks that route - camera, LiDAR, and IMU measurements into the world model using any - available handler methods. - - Expected handler names on ``world_model`` are, for example: - - * ``apply_camera_measurement`` or ``add_camera_measurement`` - * ``apply_lidar_measurement`` or ``add_lidar_measurement`` - * ``apply_imu_measurement`` or ``add_imu_measurement`` - - Only handlers that actually exist and are callable will be used. - - :param world_model: World model instance to associate. - :param register_default_callbacks: If ``True``, automatically - register default callbacks that forward measurements into the - world model. - """ - - self._world_model = world_model - if register_default_callbacks: - self._register_world_callbacks() - - @staticmethod - def _resolve_world_handler(world_model: Any, names: List[str]) -> Optional[Callable[..., Any]]: - """Return the first callable attribute on ``world_model`` matching ``names``. - - :param world_model: World model object to inspect. - :param names: Ordered list of attribute names to try. - :return: The first callable attribute found, or ``None`` if no - suitable handler exists. - """ - - for name in names: - if hasattr(world_model, name): - fn = getattr(world_model, name) - if callable(fn): - return fn - return None - - def _register_world_callbacks(self) -> None: - """Register per-modality callbacks that forward into the world model. - - This inspects the attached world model (if any) for known handler - methods and wraps them in measurement-type checks. It is safe to - call repeatedly; new callbacks are simply appended to the internal - callback list. - """ - - wm = self._world_model - if wm is None: - return - - # Camera measurements - cam_handler = self._resolve_world_handler( - wm, - ["apply_camera_measurement", "add_camera_measurement"], - ) - if cam_handler is not None: - - def _cb_camera(meas: BaseMeasurement) -> None: - if isinstance(meas, CameraMeasurement): - cam_handler(meas) - - self.register_callback(_cb_camera) - - # LiDAR measurements - lidar_handler = self._resolve_world_handler( - wm, - ["apply_lidar_measurement", "add_lidar_measurement"], - ) - if lidar_handler is not None: - - def _cb_lidar(meas: BaseMeasurement) -> None: - if isinstance(meas, LidarMeasurement): - lidar_handler(meas) - - self.register_callback(_cb_lidar) - - # IMU measurements - imu_handler = self._resolve_world_handler( - wm, - ["apply_imu_measurement", "add_imu_measurement"], - ) - if imu_handler is not None: - - def _cb_imu(meas: BaseMeasurement) -> None: - if isinstance(meas, IMUMeasurement): - imu_handler(meas) - - self.register_callback(_cb_imu) - - # ------------------------------------------------------------------ - # Registration and configuration - # ------------------------------------------------------------------ - - def register_sensor( - self, - name: str, - modality: str, - stream: BaseSensorStream, - converter: Optional[Callable[[Any], BaseMeasurement]] = None, - ) -> None: - """Register a new sensor with the fusion manager. - - If ``converter`` is not provided, a default converter will be - chosen based on ``modality``. - - :param name: Logical sensor name, e.g. ``"cam0"`` or ``"lidar_front"``. - :param modality: Modality string (``"camera"``, ``"lidar"``, - ``"imu"``, or a custom value). Custom values must provide an - explicit ``converter``. - :param stream: Sensor stream object used to read raw samples. - :param converter: Optional function that converts raw samples from - ``stream`` into measurement objects. - - :raises ValueError: If a sensor with the same name is already - registered, or if no default converter exists for the given - modality and no explicit converter is provided. - """ - - if name in self._sensors: - raise ValueError(f"Sensor '{name}' is already registered") - - if converter is None: - converter = self._infer_default_converter(modality) - - self._sensors[name] = RegisteredSensor( - name=name, - modality=modality, - stream=stream, - converter=converter, - ) - - def _infer_default_converter(self, modality: str) -> Callable[[Any], BaseMeasurement]: - """Return a default converter for a given modality. - - :param modality: Modality string such as ``"camera"``, ``"lidar"``, - or ``"imu"``. - :return: Callable that converts raw samples for the given modality - into measurement objects. - - :raises ValueError: If a default converter is not known for the - given modality. - """ - - m = modality.lower() - if m == "camera": - return raw_sample_to_camera_measurement - if m == "lidar": - return raw_sample_to_lidar_measurement - if m == "imu": - return imu_to_factors_placeholder - - raise ValueError( - "No default converter for modality '" + modality + "'. " - "Please provide an explicit converter when registering this sensor." - ) - - def register_callback(self, callback: Callable[[BaseMeasurement], None]) -> None: - """Register a callback to receive all fused measurements. - - :param callback: Function that accepts a - :class:`~sensors.base.BaseMeasurement` instance. It will be - called for *every* measurement produced by - :meth:`poll_once` or :meth:`push_measurement`. - """ - - self._callbacks.append(callback) - - # ------------------------------------------------------------------ - # Core polling / dispatch API - # ------------------------------------------------------------------ - - def poll_once(self) -> int: - """Poll all registered *synchronous* streams exactly once. - - For each sensor whose stream is an instance of - :class:`~sensors.streams.ReadingStream`, this method will: - - 1. Call ``stream.read()`` to obtain a raw sample. - 2. If a non-``None`` sample is returned, convert it to a - measurement via the registered converter. - 3. Dispatch the measurement to all registered callbacks. - - Asynchronous streams are not handled here; they can be consumed - separately using their own ``async for`` loops. - - :return: The total number of measurements produced and - dispatched during this call. - """ - - count = 0 - for rs in self._sensors.values(): - if not isinstance(rs.stream, ReadingStream): - # Async streams are handled outside of this helper. - continue - - sample = rs.stream.read() - if sample is None: - continue - - meas = rs.converter(sample) - self._dispatch(meas) - count += 1 - - return count - - def push_measurement(self, measurement: BaseMeasurement) -> None: - """Inject a pre-constructed measurement into the fusion pipeline. - - This is useful when the experiment already builds measurement - objects directly (e.g. from a simulator) and only wants to reuse - the callback dispatch mechanism. - - :param measurement: Measurement instance to dispatch to all - registered callbacks. - """ - - self._dispatch(measurement) - - def get_latest_pose(self) -> Optional[FusedPoseEstimate]: - """ - Return the most recent fused SE(3) pose estimate, if available. - - This is a thin convenience wrapper used by integration layers - (e.g. world models or dynamic scene graphs) to pull a single, - canonical pose update out of the fusion buffer. - - :returns: A :class:`FusedPoseEstimate` instance if any fused result - has been produced, or ``None`` if the manager has not yet - emitted a pose. - """ - if not self._fused_history: - return None - - # Assuming you internally store a list of (t, pose, cov, meta). - t, pose_se3, cov, source_counts = self._fused_history[-1] - - return FusedPoseEstimate( - t=t, - pose_se3=pose_se3, - covariance=cov, - source_counts=source_counts, - ) - - # ------------------------------------------------------------------ - # Internal helpers - # ------------------------------------------------------------------ - - def _dispatch(self, measurement: BaseMeasurement) -> None: - """Call all registered callbacks with ``measurement``. - - :param measurement: Measurement instance to forward to callbacks. - """ - - for cb in self._callbacks: - cb(measurement) - - diff --git a/dsg-jit/dsg_jit/sensors/imu.py b/dsg-jit/dsg_jit/sensors/imu.py deleted file mode 100644 index 52975f4..0000000 --- a/dsg-jit/dsg_jit/sensors/imu.py +++ /dev/null @@ -1,214 +0,0 @@ -# Copyright (c) 2025. -# This file is part of DSG-JIT, released under the Business Source License 1.1. -""" -Inertial measurement unit (IMU) abstractions and simple integration utilities. - -This module provides a small set of IMU-related types and helpers that make -it easy to wire IMU data into DSG-JIT experiments and factor graphs. The -focus is on **structured measurements** and **simple integration**, rather -than on a full production-grade inertial navigation system. - -The module typically exposes: - - * ``IMUMeasurement``: - A lightweight container for a single IMU sample, including body-frame - linear acceleration, angular velocity, and an associated ``dt`` (time - delta). A timestamp can also be stored for logging or alignment with - other sensors. - - * ``IMUSensor``: - A thin wrapper around a user-defined sampling function. The sampling - function returns ``IMUMeasurement`` instances, and the wrapper - provides: - - A ``read()`` method for synchronous polling. - - An iterator interface for use in simple loops. - - Compatibility with the generic sensor streaming helpers. - - * ``integrate_imu_naive``: - A toy integrator that demonstrates how to accumulate IMU measurements - into a position and velocity estimate. It assumes that: - - Acceleration is already expressed in the world frame, and - - Gravity has been compensated externally. - This function is intended for didactic experiments and not as a - replacement for a full IMU preintegration pipeline. - -Usage patterns: - - * Wrap any IMU source (hardware driver, simulator, dataset) with an - ``IMUSensor`` so that you always work with ``IMUMeasurement`` objects. - * For demonstration or unit tests, use ``integrate_imu_naive`` to produce - rough pose/velocity estimates and then inject those as odometry factors - into a factor graph. - * For more advanced use cases, replace the naive integrator with a - preintegration module, but keep the same ``IMUMeasurement`` and - ``IMUSensor`` interfaces so the rest of the system remains unchanged. - -The design goal is to make IMU handling: - - * Explicit and transparent (no hidden global state). - * Composable with other sensors (cameras, range sensors, etc.). - * Easy to plug into the existing DSG-JIT world model and optimization - stack without requiring changes to core solvers. -""" - -from __future__ import annotations - -from dataclasses import dataclass -from typing import Iterable, Optional, Protocol - -import jax.numpy as jnp -import numpy as np - -@dataclass -class IMUMeasurement: - """Single IMU sample consisting of specific force and angular velocity. - - This is a lightweight container for raw IMU readings that can be produced by - hardware drivers, simulators, or log readers and then consumed by the - factor-graph/DSG layers. - - The convention assumed here is: - - * ``accel`` is specific force in the IMU body frame, in ``m/s^2``. - * ``gyro`` is angular velocity in the IMU body frame, in ``rad/s``. - * ``dt`` is the time delta since the previous sample, in seconds. - * ``timestamp`` is an optional absolute time in seconds. - - :param accel: Linear acceleration in the IMU/body frame, shape ``(3,)``. - :param gyro: Angular velocity in the IMU/body frame, shape ``(3,)``. - :param dt: Time delta since the previous sample, in seconds. - :param timestamp: Optional absolute timestamp in seconds. - """ - - accel: jnp.ndarray - gyro: jnp.ndarray - dt: float - timestamp: Optional[float] = None - - def as_numpy(self) -> tuple["np.ndarray", "np.ndarray"]: # type: ignore[name-defined] - """Return the acceleration and gyro as NumPy arrays. - - This is a small convenience helper for users who want to interoperate - with NumPy-based tooling. If NumPy is not available, a :class:`RuntimeError` - is raised. - - :return: Tuple ``(accel_np, gyro_np)`` as NumPy arrays. - :raises RuntimeError: If NumPy is not installed or import failed. - """ - - if np is None: # type: ignore[truthy-function] - raise RuntimeError("NumPy is not available in this environment.") - return np.asarray(self.accel), np.asarray(self.gyro) - - -class IMUSampleFn(Protocol): - """Protocol for callables that produce :class:`IMUMeasurement` samples. - - This is primarily used to type-annotate IMU sensor wrappers. - - :return: A new :class:`IMUMeasurement` instance. - """ - - def __call__(self) -> IMUMeasurement: - ... - - -# ``BaseSensor`` is optional here to keep this module usable in isolation. -try: # pragma: no cover - import is environment dependent - from .base import BaseSensor # type: ignore -except Exception: # pragma: no cover - BaseSensor = object # type: ignore[misc,assignment] - - -class IMUSensor(BaseSensor): # type: ignore[misc] - """Generic IMU sensor wrapper. - - This class adapts any callable that returns :class:`IMUMeasurement` into the - common sensor interface used by DSG-JIT. It is intentionally minimal: it does - not attempt to manage threads, buffering, or synchronization—those concerns - are handled by the sensor stream utilities in :mod:`sensors.streams`. - - :param name: Human-readable sensor name. - :param sample_fn: Callable producing a single :class:`IMUMeasurement` per call. - """ - - def __init__(self, name: str, sample_fn: IMUSampleFn) -> None: - # ``BaseSensor`` may or may not define an ``__init__``; call it if present. - if hasattr(super(), "__init__"): - try: - super().__init__(name=name) # type: ignore[call-arg] - except TypeError: - # Fallback if BaseSensor has a different signature. - super().__init__() # type: ignore[misc] - - self.name: str = name - self._sample_fn: IMUSampleFn = sample_fn - - def read(self) -> IMUMeasurement: - """Read a single IMU measurement. - - This will typically perform a blocking read from a hardware device, a - simulator, or a log file, depending on how ``sample_fn`` is implemented. - - :return: The next :class:`IMUMeasurement` sample. - """ - - return self._sample_fn() - - def __iter__(self) -> Iterable[IMUMeasurement]: - """Create an iterator over IMU samples. - - This is primarily useful when coupling the sensor with an asynchronous - or synchronous sensor stream helper that consumes an iterator. - - :return: Infinite iterator yielding :class:`IMUMeasurement` objects. - """ - - while True: - yield self.read() - - -def integrate_imu_naive( - measurements: Iterable[IMUMeasurement], - v0: Optional[jnp.ndarray] = None, - p0: Optional[jnp.ndarray] = None, -) -> tuple[jnp.ndarray, jnp.ndarray]: - """Naively integrate IMU measurements to update position and velocity. - - This helper performs a very simple, orientation-agnostic integration of a - sequence of IMU samples. It assumes that the acceleration vectors are - already expressed in the world frame and that gravity has been compensated - for. As such, **it is not a replacement for proper IMU preintegration**, but - it is convenient for toy 1D/3D experiments and testing the data flow from - sensors into the optimizer. - - The integration scheme is: - - .. math:: - - v_{k+1} &= v_k + a_k \\delta t_k\\ - p_{k+1} &= p_k + v_{k+1} \\delta t_k\\ - - :param measurements: Iterable of :class:`IMUMeasurement` objects, in time order. - :param v0: Optional initial velocity, shape ``(3,)``. Defaults to zeros. - :param p0: Optional initial position, shape ``(3,)``. Defaults to zeros. - :return: Tuple ``(p, v)`` with the final position and velocity. - """ - - if v0 is None: - v = jnp.zeros(3, dtype=jnp.float32) - else: - v = jnp.asarray(v0, dtype=jnp.float32) - - if p0 is None: - p = jnp.zeros(3, dtype=jnp.float32) - else: - p = jnp.asarray(p0, dtype=jnp.float32) - - for meas in measurements: - a = jnp.asarray(meas.accel, dtype=jnp.float32) - dt = float(meas.dt) - v = v + a * dt - p = p + v * dt - - return p, v diff --git a/dsg-jit/dsg_jit/sensors/integration.py b/dsg-jit/dsg_jit/sensors/integration.py deleted file mode 100644 index fdc09ab..0000000 --- a/dsg-jit/dsg_jit/sensors/integration.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright (c) 2025. -# This file is part of DSG-JIT, released under the Business Source License 1.1. -""" -Helpers for wiring sensor fusion results into DSG-JIT world models. - -This module provides small, stateless utilities that take fused pose -estimates from :mod:`sensors.fusion` and apply them to a -:class:`world.model.WorldModel` or dynamic scene graph. - -By keeping this logic in a separate integration layer, we avoid coupling -the sensor stack directly to the optimization core, while still making it -very easy for users to build real-time or batch pipelines. -""" - -from typing import Optional - -import jax.numpy as jnp - -from dsg_jit.world.model import WorldModel -from dsg_jit.sensors.fusion import SensorFusionManager, FusedPoseEstimate - - -def apply_fused_pose_to_world( - world: WorldModel, - fusion: SensorFusionManager, - agent_id: str, - t: int, - fused: Optional[FusedPoseEstimate] = None, -) -> int: - """ - Apply a fused SE(3) pose estimate to the world model for a given agent. - - If a pose for ``(agent_id, t)`` already exists in the world, this function - **updates** its value in-place. Otherwise, it **creates** a new agent pose - variable via :meth:`world.model.WorldModel.add_agent_pose`. - - :param world: World model whose factor graph should be updated. - :param fusion: Sensor fusion manager providing fused pose estimates. - :param agent_id: String identifier for the agent (e.g. ``"robot0"``). - :param t: Discrete timestep index for this update. - :param fused: Optional fused pose estimate. If ``None``, this function - calls :meth:`SensorFusionManager.get_latest_pose` internally. - :returns: The integer node id (``int(NodeId)``) corresponding to the - agent's pose variable at time ``t``. - :raises ValueError: If no fused estimate is available. - """ - if fused is None: - fused = fusion.get_latest_pose() - - if fused is None: - raise ValueError("No fused pose estimate available to apply to world.") - - pose_vec = jnp.asarray(fused.pose_se3) - - # Check if we already have a pose for this (agent, t). - if agent_id in world.agent_pose_ids and t in world.agent_pose_ids[agent_id]: - nid = world.agent_pose_ids[agent_id][t] - world.fg.variables[nid].value = pose_vec - else: - nid = world.add_agent_pose(agent_id=agent_id, t=t, value=pose_vec) - - return int(nid) - - -def apply_trajectory_to_world( - world: WorldModel, - agent_id: str, - trajectory: dict[int, jnp.ndarray], -) -> None: - """ - Bulk-apply a discrete trajectory to the world model as agent poses. - - This is useful for offline pipelines, where you already have a fused - trajectory (e.g. from a batch fusion run) and simply want to seed or - refresh the world model with those states. - - :param world: World model to update. - :param agent_id: String identifier for the agent. - :param trajectory: Mapping from timestep ``t`` to 6D se(3) pose vectors - in world coordinates. - :returns: ``None``. All updates are applied in-place. - """ - for t, pose_vec in sorted(trajectory.items()): - pose_vec = jnp.asarray(pose_vec) - if agent_id in world.agent_pose_ids and t in world.agent_pose_ids[agent_id]: - nid = world.agent_pose_ids[agent_id][t] - world.fg.variables[nid].value = pose_vec - else: - world.add_agent_pose(agent_id=agent_id, t=t, value=pose_vec) \ No newline at end of file diff --git a/dsg-jit/dsg_jit/sensors/lidar.py b/dsg-jit/dsg_jit/sensors/lidar.py deleted file mode 100644 index ebe9802..0000000 --- a/dsg-jit/dsg_jit/sensors/lidar.py +++ /dev/null @@ -1,217 +0,0 @@ -# Copyright (c) 2025. -# This file is part of DSG-JIT, released under the Business Source License 1.1. -""" -LiDAR sensor abstractions and utilities for DSG-JIT. - -This module defines a lightweight representation of LiDAR data—either -1D range scans, 2D planar scans, or sparse 3D point samples—and provides -a simple, JAX-friendly interface for integrating such scans into the -factor graph or dynamic scene graph layers. - -The module includes: - - * **LiDARScan** - A minimal container representing a single LiDAR measurement: - - Ranges (1D, 2D, or 3D depending on sensor type) - - Optional beam angles or directions - - Optional timestamp - The structure is intentionally simple so that downstream algorithms - (e.g., geometry-based localization, place association, or occupancy - grid inference) can build on top of it without being locked into a - particular LiDAR model. - - * **LiDARSensor** - A wrapper around any user-provided LiDAR capture function. This allows - plugging in real hardware, ROS topics, simulation engines, or synthetic - data generators. The wrapper returns ``LiDARScan`` objects and is fully - compatible with both synchronous and asynchronous streaming utilities. - - * **Helper transforms** - Utilities for: - - Converting range/angle scans into 2D or 3D point clouds. - - Projecting LiDAR points into world coordinates given a robot pose. - - Preparing LiDAR-derived factors for integration into the factor - graph (e.g., bearing-range residuals, scan-matching constraints). - -Design philosophy: - - * Keep the LiDAR model minimal and general. - * Allow users to choose how scans translate into DSG elements (places, - objects, layout nodes, occupancy voxels). - * Cleanly interoperate with the broader DSG-JIT world model, enabling: - - scan→point cloud→place association - - scan→object detection integration - - scan→unknown-space discovery - - scan→range constraints between robot poses and map nodes - -This module intentionally avoids tying LiDAR data to a specific SLAM method -(e.g., ICP, NDT, LOAM); instead, it provides a consistent base layer for -future plugins and extensions. -""" - -from __future__ import annotations - -from dataclasses import dataclass -from typing import Dict, List, Optional - -import jax.numpy as jnp - -from dsg_jit.core.types import Factor -from dsg_jit.slam.measurements import sigma_to_weight -from dsg_jit.world.model import WorldModel -from dsg_jit.world.dynamic_scene_graph import DynamicSceneGraph -from dsg_jit.world.scene_graph import SceneGraphWorld -from dsg_jit.sensors.base import Sensor, SensorReading - -@dataclass -class LidarMeasurement: - """ - Lightweight container for a single LiDAR scan. - - This structure is deliberately minimal and JAX-friendly. It can be used - for 1D range scans, 2D planar scans, or sparse 3D point samples, and - is meant to serve as an intermediate representation between raw sensor - data and DSG-JIT factors (e.g., range/bearing residuals, voxel updates). - - :param ranges: - LiDAR ranges in sensor coordinates. - - Typical shapes: - - * 1D scan: ``(N,)`` where each entry is a range sample. - * 2D grid: ``(H, W)`` for image-like range sensors. - * Sparse 3D: ``(N,)`` used together with ``directions`` below. - - Values are typically in meters. - :type ranges: jax.numpy.ndarray - - :param directions: - Optional unit vectors giving the direction of each range sample in - the sensor frame. - - Typical shapes: - - * 1D scan: ``(N, 3)`` where each row is a 3D direction vector. - * 2D grid: ``(H, W, 3)`` matching the shape of ``ranges``. - - If ``None``, downstream code is expected to infer directions based - on sensor intrinsics (e.g., azimuth/elevation tables or a pinhole - camera model). - :type directions: Optional[jax.numpy.ndarray] - - :param t: - Optional timestamp (e.g., in seconds). This can be used to align the - measurement with the dynamic scene graph time index or other sensors. - :type t: Optional[float] - - :param frame_id: - Identifier for the sensor frame from which this measurement was - taken (e.g. ``"lidar_front"``). This is useful when a robot carries - multiple LiDAR units or when extrinsic calibration is maintained - per frame. - :type frame_id: str - - :param metadata: - Optional dictionary for any additional information (e.g., intensity - values, per-beam noise estimates, or scan ID). This field is not - used by the core library but can be useful for higher-level - perception algorithms. - :type metadata: Optional[dict] - """ - - ranges: jnp.ndarray - directions: Optional[jnp.ndarray] = None - t: Optional[float] = None - frame_id: str = "lidar" - metadata: Optional[dict] = None - -@dataclass -class RangeSensorConfig: - """ - Configuration for a simple range-only sensor. - - :param noise_sigma: Standard deviation of the range measurement, in - the same units as the world coordinates (typically meters). - :type noise_sigma: float - :param target_node: Node id of the 3D target in the - :class:`SceneGraphWorld` (e.g. a room or object center). - :type target_node: int - """ - noise_sigma: float - target_node: int - - -class RangeSensor(Sensor): - """ - Simple range-only sensor attached to an agent. - - Given a :class:`SensorReading` whose ``data`` field is a scalar - range, this sensor produces a single ``"range"`` factor connecting - the agent's pose at time ``t`` to a 3D target node. - - It expects: - - * the agent trajectory to be registered in the - :class:`DynamicSceneGraph` under the same ``agent_id`` used to - construct this sensor, and - * the target node id to be present in the underlying - :class:`SceneGraphWorld`. - - The factor uses :func:`slam.measurements.range_residual` and is - registered under the factor type ``"range"``. - """ - - def __init__( - self, - name: str, - agent_id: str, - config: RangeSensorConfig, - sg: SceneGraphWorld, - ) -> None: - super().__init__(name=name, agent_id=agent_id) - self.config = config - self.sg = sg # used to look up the target's variable index - - def build_factors( - self, - wm: WorldModel, - dsg: DynamicSceneGraph, - reading: SensorReading, - ) -> List[Factor]: - """ - Build a single range factor from a reading. - - :param wm: World model whose factor graph already contains the - agent poses and the target node declared in the config. - :type wm: world.model.WorldModel - :param dsg: Dynamic scene graph providing the pose node id for - ``(agent_id, reading.t)``. - :type dsg: world.dynamic_scene_graph.DynamicSceneGraph - :param reading: Range measurement; ``reading.data`` is assumed - to be a scalar float. - :type reading: sensors.base.SensorReading - :return: List containing a single ``"range"`` factor. - :rtype: list[core.types.Factor] - """ - t = reading.t - range_meas = float(reading.data) - - # Resolve pose node id from the DSG trajectory. - pose_nid = dsg.world.pose_trajectory[(self.agent_id, t)] - - # Variable ids: [pose, target] - var_ids = (pose_nid, self.config.target_node) - - # Params for range_residual. - params: Dict[str, jnp.ndarray] = { - "range": jnp.array(range_meas, dtype=jnp.float32), - "weight": sigma_to_weight(self.config.noise_sigma), - } - - factor = Factor( - id=len(wm.fg.factors), - type="range", - var_ids=var_ids, - params=params, - ) - return [factor] \ No newline at end of file diff --git a/dsg-jit/dsg_jit/sensors/streams.py b/dsg-jit/dsg_jit/sensors/streams.py deleted file mode 100644 index 16f99a6..0000000 --- a/dsg-jit/dsg_jit/sensors/streams.py +++ /dev/null @@ -1,269 +0,0 @@ -# Copyright (c) 2025. -# This file is part of DSG-JIT, released under the Business Source License 1.1. -""" -Generic sensor streaming utilities for DSG-JIT. - -This module provides a unified abstraction layer for handling live or -simulated sensor data streams—synchronous or asynchronous—so that all -sensor types (IMU, LiDAR, cameras, range sensors, etc.) can plug into -DSG-JIT’s dynamic scene graph pipeline without requiring unique logic for -each device. - -The key goal is to decouple **how data is produced** (polling, callbacks, -asynchronous feeds) from **how DSG-JIT consumes that data** (factor graph -updates, node creation, temporal linking). - -The module typically defines: - - * **SynchronousStreams** - - A minimal wrapper around a sensor object that exposes a ``read()`` - method. - - Designed for simple for-loops or offline dataset playback. - - Ensures each call returns a typed measurement (e.g., IMUMeasurement, - CameraFrame, LiDARScan). - - * **AsynchronousStreams** - - Provides asyncio-based background tasks that fetch sensor data - without blocking the main SLAM/DSG loop. - - Each sensor type is polled in a separate coroutine, and the latest - measurement is stored internally for consumption. - - Enables future real-time extensions (e.g., multi-sensor fusion, - asynchronous factor graph updates). - - * **StreamHandle** - - An ergonomic wrapper exposing: - ``.latest()`` – retrieve most recent measurement. - ``.reset()`` – clear buffer. - ``.close()`` – stop background tasks. - - Useful for synchronized fusion pipelines where multiple sensors must - provide data simultaneously. - -Design philosophy: - - * Treat *all* sensors uniformly—same streaming API regardless of modality. - * Allow both synchronous and asynchronous execution with zero change to - downstream SLAM/DSG logic. - * Enable future “plug-and-play” sensor integration for real robots, - simulators, prerecorded logs, or unit test data. - -This module does not itself perform SLAM or scene graph updates—it only -defines the mechanism by which sensors deliver data into such pipelines. -""" - -from typing import Iterator, AsyncIterator, Optional, Any -from abc import ABC -from dsg_jit.sensors.base import SensorReading - -class BaseSensorStream(ABC): - """ - Abstract base class for all sensor streams used by DSG-JIT. - - This defines the minimal interface that synchronous and asynchronous - sensor streams must support so they can be registered with - :class:`sensors.fusion.SensorFusionManager`. - - Subclasses may implement *either* sync or async behavior: - - - Synchronous streams must implement :meth:`read`. - - Asynchronous streams must implement :meth:`__aiter__`. - - Implementations may choose to support both, but it is not required. - """ - - def read(self) -> Optional[Any]: - """ - Return the next sample from the stream, or ``None`` if no sample - is currently available. - - Sync-only streams MUST implement this. Async-only streams may raise - ``NotImplementedError``. - """ - raise NotImplementedError("This stream does not support sync read().") - - def __aiter__(self) -> AsyncIterator[Any]: - """ - Asynchronous iteration interface. Async-only streams MUST implement - this. Sync-only streams may raise ``NotImplementedError``. - """ - raise NotImplementedError("This stream does not support async iteration.") - - def close(self) -> None: - """ - Optional cleanup hook. - - Streams that hold system resources (files, sockets, hardware handles) - should override this to release them. - """ - return None - -class ReadingStream(BaseSensorStream): - """Synchronous stream of :class:`SensorReading` objects. - - Concrete subclasses implement :meth:`__iter__` to yield readings one by one. - - Typical usage:: - - stream = FileRangeStream("ranges.txt") - for reading in stream: - process(reading) - """ - - def __init__(self) -> None: - """ - Initialize a new reading stream. - - Subclasses typically only need to override :meth:`__iter__`. This base - constructor sets up internal state for :meth:`read`. - """ - self._iter: Optional[Iterator[SensorReading]] = None - - def read(self) -> Optional[SensorReading]: - """ - Return the next sensor reading from the stream, or ``None`` if the - underlying iterator is exhausted. - - This method adapts the iterator protocol (``__iter__``) to the - :class:`BaseSensorStream` synchronous ``read`` API, so that - :class:`~sensors.fusion.SensorFusionManager` and other callers can - treat all synchronous streams uniformly. - - :return: Next sensor reading, or ``None`` if no further data is - available. - :rtype: Optional[SensorReading] - """ - if self._iter is None: - self._iter = iter(self) - try: - return next(self._iter) - except StopIteration: - return None - - def __iter__(self) -> Iterator[SensorReading]: - """Iterate over sensor readings. - - :return: Iterator over :class:`SensorReading` objects. - :rtype: Iterator[SensorReading] - """ - raise NotImplementedError - -class FunctionStream(ReadingStream): - """ - Synchronous stream that pulls samples from a user-provided callback - function instead of a file or iterator. - - The callback must return either: - - a SensorReading - - None (to indicate end of data) - """ - - def __init__(self, fn): - super().__init__() - self.fn = fn - - def __iter__(self): - while True: - out = self.fn() - if out is None: - return - yield out - -class FileRangeStream(ReadingStream): - """Synchronous stream backed by a plain-text file. - - Each line in the file is interpreted as a single floating-point range - measurement. The line index is used as the time step ``t``. - - :param path: Path to a text file containing one numeric value per line. - :type path: str - """ - - def __init__(self, path: str): - """Initialize the file-backed range stream. - - :param path: Path to a text file containing one numeric value per line. - :type path: str - """ - super().__init__() - self.path = path - - def __iter__(self) -> Iterator[SensorReading]: - """Yield sensor readings from the underlying file. - - Each yielded reading has ``t`` equal to the zero-based line index and - ``data`` equal to the parsed floating-point value. - - :return: Iterator over :class:`SensorReading` objects. - :rtype: Iterator[SensorReading] - """ - with open(self.path) as f: - for t, line in enumerate(f): - yield SensorReading(t=t, data=float(line.strip())) - - -class AsyncReadingStream(BaseSensorStream): - """Asynchronous stream of :class:`SensorReading` objects. - - Subclasses implement :meth:`__aiter__` to provide an async iterator over - sensor readings. This is useful when integrating with an asyncio-based - event loop or real-time sensor drivers. - """ - - def __aiter__(self) -> AsyncIterator[SensorReading]: - """Return an asynchronous iterator over sensor readings. - - :return: Asynchronous iterator over :class:`SensorReading` objects. - :rtype: AsyncIterator[SensorReading] - """ - raise NotImplementedError - - -import asyncio - - -class AsyncFileRangeStream(AsyncReadingStream): - """Asynchronous file-backed range stream. - - This behaves like :class:`FileRangeStream`, but exposes an async iterator - interface so it can be consumed with ``async for``. An optional ``delay`` - can be used to simulate a sensor publishing at a fixed rate. - - .. note:: - - File I/O is still performed using the standard blocking ``open`` call. - For small logs and simple simulations this is usually fine, but for - large files or strict real-time requirements you may prefer to replace - this with a true async file reader. - - :param path: Path to a text file containing one numeric value per line. - :type path: str - :param delay: Optional delay between consecutive readings, in seconds. - :type delay: float or None - """ - - def __init__(self, path: str, delay: Optional[float] = None): - """Initialize the asynchronous file-backed range stream. - - :param path: Path to a text file containing one numeric value per line. - :type path: str - :param delay: Optional delay between consecutive readings, in seconds. - :type delay: float or None - """ - self.path = path - self.delay = delay - - async def __aiter__(self) -> AsyncIterator[SensorReading]: - """Asynchronously iterate over sensor readings. - - If ``delay`` is set, the coroutine sleeps for that many seconds - between consecutive readings. - - :return: Asynchronous iterator over :class:`SensorReading` objects. - :rtype: AsyncIterator[SensorReading] - """ - # Note: we use regular file I/O here; for most offline logs this is - # sufficient and keeps the dependency surface small. - with open(self.path) as f: - for t, line in enumerate(f): - if self.delay is not None: - await asyncio.sleep(self.delay) - yield SensorReading(t=t, data=float(line.strip())) \ No newline at end of file