Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 197 additions & 1 deletion src/ingestion/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,208 @@ def build_telemetry_segments(
return tuple(segments)


# ---------------------------------------------------------------------------
# Streaming JSON tokenizer — low-memory ingestion via ijson
# ---------------------------------------------------------------------------
# The functions below accept a *binary* file-like object (or any object whose
# ``read()`` returns bytes), or a raw bytes blob, and parse it incrementally
# using ijson's SAX-style event emitter.  The JSON document is never loaded as
# a whole: the tokenizer keeps one partial dict of scalar members per currently
# open object and yields a tuple as soon as a closed object is recognised as a
# ticker frame.
#
# Supported top-level shapes (mirrors the in-memory parser above):
# • A single frame object: {"asset_id": ..., "price": ..., "timestamp": ...}
# • A container wrapping a batch:
# {"frames": [...]} / {"tickers": [...]} / {"data": {"tickers": [...]}}
# • An array of frame objects at the root.
#
# Attribute extraction is deliberately limited to the same key priority lists
# (_ASSET_KEYS, _PRICE_KEYS, etc.) used by the existing in-memory parser so
# both paths produce identical output for the same data.
# ---------------------------------------------------------------------------

import io
from typing import BinaryIO


def _parse_frame_dict(
    frame: dict[str, object],
    *,
    drop_invalid: bool,
) -> TelemetryTuple | None:
    """Convert one candidate mapping into a :data:`TelemetryTuple`.

    Parameters
    ----------
    frame:
        Mapping to test with ``_looks_like_frame`` and, if accepted, to
        convert with ``_flatten_frame``.
    drop_invalid:
        Selects between "skip" and "raise" for mappings that cannot be
        converted.

    Returns
    -------
    TelemetryTuple | None
        The flattened tuple, or ``None`` when *drop_invalid* is ``True`` and
        the mapping was either not recognised or failed conversion.

    Raises
    ------
    ValueError
        When *drop_invalid* is ``False`` and the mapping is not recognised as
        a ticker frame.
    TypeError, ValueError
        Re-raised unchanged from ``_flatten_frame`` when *drop_invalid* is
        ``False``.
    """
    recognised = _looks_like_frame(frame)

    # Strict mode: an unrecognised shape is an error rather than a skip.
    if not recognised and not drop_invalid:
        raise ValueError(f"Unsupported ticker payload shape: {frame!r}")
    if not recognised:
        return None

    try:
        flattened = _flatten_frame(frame)
    except (TypeError, ValueError):
        if not drop_invalid:
            raise
        return None
    return flattened


def iter_price_events_from_stream(
    source: BinaryIO | bytes,
    *,
    drop_invalid: bool = False,
) -> Iterator[TelemetryTuple]:
    """Yield ticker tuples by streaming *source* through an ijson tokenizer.

    Every JSON object in the document is treated as a candidate frame. While
    an object is open, its *scalar* members are collected into a dict; when
    the object closes, the dict is tested with ``_looks_like_frame`` and, if
    accepted, converted and yielded. Objects that are not recognised (for
    example a wrapper such as ``{"frames": [...]}``) are skipped silently,
    independent of *drop_invalid*.

    Parameters
    ----------
    source:
        A binary file-like object (``read()`` must return ``bytes``) **or** a
        raw ``bytes`` / ``bytearray`` blob, which is wrapped in
        :class:`io.BytesIO`.
    drop_invalid:
        When ``True``, recognised frames that fail conversion are skipped.
        When ``False`` (default), the :exc:`ValueError` or :exc:`TypeError`
        raised during conversion propagates to the caller.

    Yields
    ------
    TelemetryTuple
        One tuple per recognised ticker frame, in document order of the
        closing brace of each object.

    Raises
    ------
    ModuleNotFoundError
        If ``ijson`` is not installed.

    Notes
    -----
    * Only scalar members are collected. A member whose value is an object
      or an array is not attached to its parent's dict; nested objects are
      evaluated as candidates in their own right.
    * Memory use follows the stack of currently open objects (one dict of
      scalar members each), not the size of the whole document.
    * ``ijson`` is imported lazily so callers that never use the streaming
      path do not pay the import cost.
    """
    try:
        import ijson  # type: ignore[import-untyped]
    except ModuleNotFoundError as exc:  # pragma: no cover
        raise ModuleNotFoundError(
            "Streaming JSON ingestion requires 'ijson'. "
            "Install it with: pip install ijson"
        ) from exc

    if isinstance(source, (bytes, bytearray)):
        source = io.BytesIO(source)

    # Events that carry a scalar value.  "double" is listed among ijson's
    # event names; it is accepted here so a float value is not dropped if a
    # backend reports it under that name.
    # NOTE(review): confirm against the installed ijson version.
    scalar_events = frozenset(
        ("null", "boolean", "integer", "double", "number", "string")
    )

    # One partial dict per currently open JSON object, innermost last.
    open_objects: list[dict[str, object]] = []
    # Key awaiting its value in the innermost open object, if any.
    current_key: str | None = None

    # ``ijson.parse`` yields ``(prefix, event, value)``; the prefix is not
    # needed because nesting is tracked by ``open_objects``.
    for _prefix, event, value in ijson.parse(source, use_float=True):
        if event == "map_key":
            current_key = value  # type: ignore[assignment]

        elif event in scalar_events:
            # Scalars inside arrays arrive with no pending key and are ignored.
            if open_objects and current_key is not None:
                open_objects[-1][current_key] = value
            current_key = None

        elif event == "start_map":
            open_objects.append({})
            current_key = None

        elif event == "end_map":
            current_key = None
            if not open_objects:
                # Defensive: a well-formed event stream never closes an
                # object that was not opened.
                continue
            candidate = open_objects.pop()
            # Pre-filter so that wrapper objects are skipped instead of
            # raising when ``drop_invalid`` is False.
            if _looks_like_frame(candidate):
                result = _parse_frame_dict(candidate, drop_invalid=drop_invalid)
                if result is not None:
                    yield result

        elif event in ("start_array", "end_array"):
            # Arrays are traversed transparently; objects inside them are
            # handled by the map events above.
            current_key = None

def build_segments_from_stream(
    source: BinaryIO | bytes,
    *,
    segment_size: int = DEFAULT_SEGMENT_SIZE,
    drop_invalid: bool = False,
) -> TelemetrySegmentBatch:
    """Stream-parse *source* and chunk the resulting tuples into segments.

    The input is tokenized incrementally by
    :func:`iter_price_events_from_stream`, but every parsed tuple is retained
    in the returned batch, so the result grows with the number of frames.

    Parameters
    ----------
    source:
        Binary file-like object or raw bytes containing a JSON payload.
    segment_size:
        Number of ticker tuples per segment.
    drop_invalid:
        Forwarded unchanged to :func:`iter_price_events_from_stream`.

    Returns
    -------
    TelemetrySegmentBatch
        A tuple of segments in stream order; each segment is a tuple of
        *segment_size* items, except that the last one may be shorter.

    Raises
    ------
    ValueError
        If *segment_size* is zero or negative.
    """
    if segment_size <= 0:
        raise ValueError("segment_size must be greater than zero")

    completed: list[TelemetrySegment] = []
    pending: list[TelemetryTuple] = []

    events = iter_price_events_from_stream(source, drop_invalid=drop_invalid)
    for event in events:
        pending.append(event)
        if len(pending) != segment_size:
            continue
        # Segment is full: freeze it and start collecting the next one.
        completed.append(tuple(pending))
        pending = []

    # Flush the trailing, partially filled segment.
    if pending:
        completed.append(tuple(pending))

    return tuple(completed)


# Public names exported by this module, kept in alphabetical order.
__all__ = [
    "DEFAULT_SEGMENT_SIZE",
    "TelemetrySegment",
    "TelemetrySegmentBatch",
    "TelemetryTuple",
    "build_segments_from_stream",
    "build_telemetry_segments",
    "flatten_telemetry_frames",
    "iter_flat_ticker_tuples",
    "iter_price_events_from_stream",
]
6 changes: 5 additions & 1 deletion src/ingestion/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,21 @@
TelemetrySegment,
TelemetrySegmentBatch,
TelemetryTuple,
build_segments_from_stream,
build_telemetry_segments,
flatten_telemetry_frames,
iter_flat_ticker_tuples,
iter_price_events_from_stream,
)

# Names re-exported by this module, kept in alphabetical order.
__all__ = [
    "DEFAULT_SEGMENT_SIZE",
    "TelemetrySegment",
    "TelemetrySegmentBatch",
    "TelemetryTuple",
    "build_segments_from_stream",
    "build_telemetry_segments",
    "flatten_telemetry_frames",
    "iter_flat_ticker_tuples",
    "iter_price_events_from_stream",
]
Loading