From 33fe938da343e72aae2c646dedef753a540b7be2 Mon Sep 17 00:00:00 2001
From: Vaishnavi Desai
Date: Sun, 28 Jun 2026 16:04:25 +0530
Subject: [PATCH] feat(ingestion): streaming JSON tokenizer for high-volume ingestion (#500)

---
 src/ingestion/parser.py  | 180 +++++++++++++++++++++++++++++++++++++++
 src/ingestion/parsers.py |   4 +
 tests/test_parser.py     | 170 +++++++++++++++++++++++++++++++++++-
 3 files changed, 353 insertions(+), 1 deletion(-)

diff --git a/src/ingestion/parser.py b/src/ingestion/parser.py
index 6930420d..70a5e81d 100644
--- a/src/ingestion/parser.py
+++ b/src/ingestion/parser.py
@@ -239,12 +239,192 @@ def build_telemetry_segments(
     return tuple(segments)
 
 
+# ---------------------------------------------------------------------------
+# Streaming JSON tokenizer — low-memory ingestion via ijson
+# ---------------------------------------------------------------------------
+# The functions below accept a *binary* file-like object (or any object whose
+# ``read()`` returns bytes) and parse it incrementally using ijson's SAX-style
+# event emitter. The JSON document is never materialised as a whole; only the
+# scalar members of the currently open objects are buffered, and each object
+# is yielded as soon as it closes and is recognised as a ticker frame.
+#
+# Supported top-level shapes (mirrors the in-memory parser above):
+#   • A single frame object: {"asset_id": ..., "price": ..., "timestamp": ...}
+#   • A container wrapping a batch:
+#       {"frames": [...]} / {"tickers": [...]} / {"data": {"tickers": [...]}}
+#   • An array of frame objects at the root.
+#
+# Attribute extraction is deliberately limited to the same key priority lists
+# (_ASSET_KEYS, _PRICE_KEYS, etc.) used by the existing in-memory parser so
+# both paths produce identical output for the same data.
+# ---------------------------------------------------------------------------
+
+import io
+from typing import BinaryIO
+
+
+def _parse_frame_dict(
+    frame: dict[str, object],
+    *,
+    drop_invalid: bool,
+) -> TelemetryTuple | None:
+    """Convert one streamed JSON object into a ticker tuple, if it is a frame.
+
+    Returns ``None`` when ``_looks_like_frame`` rejects *frame* (the streaming
+    tokenizer hands every closed object to this helper, including wrapper
+    objects such as ``{"frames": [...]}``), or when ``_flatten_frame`` fails
+    and *drop_invalid* is ``True``. With *drop_invalid* ``False`` the
+    :exc:`TypeError` / :exc:`ValueError` from ``_flatten_frame`` propagates.
+    """
+    if not _looks_like_frame(frame):
+        return None
+    try:
+        return _flatten_frame(frame)
+    except (TypeError, ValueError):
+        if drop_invalid:
+            return None
+        raise
+
+
+def iter_price_events_from_stream(
+    source: BinaryIO | bytes | bytearray,
+    *,
+    drop_invalid: bool = False,
+) -> Iterator[TelemetryTuple]:
+    """Yield ticker tuples by streaming *source* through an ijson tokenizer.
+
+    Parameters
+    ----------
+    source:
+        A binary file-like object (``read()`` must return ``bytes``) **or** a
+        raw ``bytes`` / ``bytearray`` blob. The data must be valid JSON.
+    drop_invalid:
+        When ``True``, recognised frames that fail conversion are silently
+        skipped. When ``False`` (default), the :exc:`ValueError` or
+        :exc:`TypeError` raised for the first such frame propagates.
+        Objects that ``_looks_like_frame`` rejects are skipped in both modes:
+        the tokenizer cannot tell a malformed frame from a wrapper object.
+
+    Yields
+    ------
+    TelemetryTuple
+        ``(asset_id, price, timestamp, sequence, flags)`` for each recognised
+        ticker frame found anywhere in the JSON structure.
+
+    Raises
+    ------
+    ModuleNotFoundError
+        If ``ijson`` is not installed.
+
+    Notes
+    -----
+    * Only the scalar members of the currently open objects are buffered, so
+      memory does not grow with the number of frames in the document.
+    * ``ijson`` is imported lazily so callers that never use the streaming path
+      do not pay the import cost.
+    """
+    try:
+        import ijson  # type: ignore[import-untyped]
+    except ModuleNotFoundError as exc:  # pragma: no cover
+        raise ModuleNotFoundError(
+            "Streaming JSON ingestion requires 'ijson'. "
+            "Install it with: pip install ijson"
+        ) from exc
+
+    if isinstance(source, (bytes, bytearray)):
+        source = io.BytesIO(source)
+
+    # ``ijson.parse`` yields ``(prefix, event, value)`` triples. The prefix is
+    # not needed: nesting is tracked with one partial dict per open object.
+    # Nested objects are collected separately and never attached to their
+    # parent, so each dict only holds the scalar members seen at its own level.
+    open_objects: list[dict[str, object]] = []
+    current_key: str | None = None
+
+    for _prefix, event, value in ijson.parse(source, use_float=True):
+        if event == "map_key":
+            current_key = value  # type: ignore[assignment]
+        elif event == "start_map":
+            open_objects.append({})
+            current_key = None
+        elif event == "end_map":
+            # ijson only emits balanced events, so the stack cannot be empty.
+            candidate = open_objects.pop()
+            current_key = None
+            result = _parse_frame_dict(candidate, drop_invalid=drop_invalid)
+            if result is not None:
+                yield result
+        elif event in ("start_array", "end_array"):
+            # Arrays are traversed transparently: objects inside them arrive as
+            # their own map events, and scalar elements are dropped because the
+            # pending key is cleared here.
+            current_key = None
+        elif current_key is not None:
+            # Every remaining event carries a scalar value. ``current_key`` is
+            # only set by ``map_key``, so an object is guaranteed to be open.
+            open_objects[-1][current_key] = value
+            current_key = None
+
+
+def build_segments_from_stream(
+    source: BinaryIO | bytes | bytearray,
+    *,
+    segment_size: int = DEFAULT_SEGMENT_SIZE,
+    drop_invalid: bool = False,
+) -> TelemetrySegmentBatch:
+    """Parse *source* with a streaming tokenizer and group results into segments.
+
+    This is the streaming counterpart of :func:`build_telemetry_segments`. It
+    reads the JSON document incrementally via :func:`iter_price_events_from_stream`,
+    so the raw document is never held in memory. The returned batch still
+    contains every extracted tuple, so memory grows with the number of frames.
+
+    Parameters
+    ----------
+    source:
+        Binary file-like object or raw bytes containing a JSON payload.
+    segment_size:
+        Maximum number of ticker tuples per segment. Must be positive.
+    drop_invalid:
+        Forwarded to :func:`iter_price_events_from_stream`.
+
+    Returns
+    -------
+    TelemetrySegmentBatch
+        A tuple of :data:`TelemetrySegment` tuples; every segment holds
+        *segment_size* tuples except possibly the last, which may be shorter.
+
+    Raises
+    ------
+    ValueError
+        If *segment_size* is not positive.
+    """
+    if segment_size <= 0:
+        raise ValueError("segment_size must be greater than zero")
+
+    segments: list[TelemetrySegment] = []
+    current: list[TelemetryTuple] = []
+
+    for frame in iter_price_events_from_stream(source, drop_invalid=drop_invalid):
+        current.append(frame)
+        if len(current) == segment_size:
+            segments.append(tuple(current))
+            current = []
+
+    if current:
+        segments.append(tuple(current))
+
+    return tuple(segments)
+
+
 __all__ = [
     "DEFAULT_SEGMENT_SIZE",
     "TelemetrySegment",
     "TelemetrySegmentBatch",
     "TelemetryTuple",
+    "build_segments_from_stream",
     "build_telemetry_segments",
     "flatten_telemetry_frames",
     "iter_flat_ticker_tuples",
+    "iter_price_events_from_stream",
 ]
diff --git a/src/ingestion/parsers.py b/src/ingestion/parsers.py
index 767a85b1..8010f0b1 100644
--- a/src/ingestion/parsers.py
+++ b/src/ingestion/parsers.py
@@ -11,9 +11,11 @@
     TelemetrySegment,
     TelemetrySegmentBatch,
     TelemetryTuple,
+    build_segments_from_stream,
     build_telemetry_segments,
     flatten_telemetry_frames,
     iter_flat_ticker_tuples,
+    iter_price_events_from_stream,
 )
 
 __all__ = [
@@ -21,7 +23,9 @@
     "TelemetrySegment",
     "TelemetrySegmentBatch",
     "TelemetryTuple",
+    "build_segments_from_stream",
     "build_telemetry_segments",
     "flatten_telemetry_frames",
     "iter_flat_ticker_tuples",
+    "iter_price_events_from_stream",
 ]
diff --git a/tests/test_parser.py b/tests/test_parser.py
index 6ffb5c16..70b3e28e 100644
--- a/tests/test_parser.py
+++ b/tests/test_parser.py
@@ -1,8 +1,15 @@
 from __future__ import annotations
 
+import io
+import json
 import unittest
 
-from ingestion.parser import build_telemetry_segments, flatten_telemetry_frames
+from ingestion.parser import (
+    build_segments_from_stream,
+    build_telemetry_segments,
+    flatten_telemetry_frames,
+    iter_price_events_from_stream,
+)
 
 
 class ParserTests(unittest.TestCase):
@@ -90,5 +97,166 @@ def test_build_telemetry_segments_rejects_non_positive_segment_size(self) -> Non
             _ = build_telemetry_segments([], segment_size=0)
 
 
+# ---------------------------------------------------------------------------
+# Streaming tokenizer tests
+# ---------------------------------------------------------------------------
+
+
+def _to_stream(obj: object) -> io.BytesIO:
+    """Serialise *obj* to JSON and wrap it in a BytesIO stream."""
+    return io.BytesIO(json.dumps(obj).encode())
+
+
+class StreamingParserTests(unittest.TestCase):
+    """Tests for iter_price_events_from_stream / build_segments_from_stream."""
+
+    # ------------------------------------------------------------------
+    # iter_price_events_from_stream
+    # ------------------------------------------------------------------
+
+    def test_stream_single_bare_frame(self) -> None:
+        """A bare frame object at the root is yielded directly."""
+        payload = {"asset_id": "xlm-usd", "price": 0.1025, "timestamp": 1710000000}
+        result = list(iter_price_events_from_stream(_to_stream(payload)))
+        self.assertEqual(result, [("XLM-USD", 0.1025, 1710000000, 0, 0)])
+
+    def test_stream_bytes_input(self) -> None:
+        """Raw bytes are accepted in addition to file-like objects."""
+        payload = {"asset": "btc-usd", "price": 64000.0, "timestamp": 1}
+        raw = json.dumps(payload).encode()
+        result = list(iter_price_events_from_stream(raw))
+        self.assertEqual(result, [("BTC-USD", 64000.0, 1, 0, 0)])
+
+    def test_stream_array_of_frames(self) -> None:
+        """A top-level JSON array of frame objects is fully consumed."""
+        payload = [
+            {"asset": "xlm-usd", "price": 0.11, "timestamp": 1},
+            {"asset": "btc-usd", "price": 1.22, "timestamp": 2},
+            {"asset": "eth-usd", "price": 2.33, "timestamp": 3},
+        ]
+        result = list(iter_price_events_from_stream(_to_stream(payload)))
+        self.assertEqual(
+            result,
+            [
+                ("XLM-USD", 0.11, 1, 0, 0),
+                ("BTC-USD", 1.22, 2, 0, 0),
+                ("ETH-USD", 2.33, 3, 0, 0),
+            ],
+        )
+
+    def test_stream_frames_wrapper_key(self) -> None:
+        """``frames`` batch key is unwrapped transparently."""
+        payload = {
+            "frames": [
+                {"pair": "ngn-xlm", "value": "0.00045", "ts": 100, "nonce": 5, "flag_bits": "1"},
+                {"pair": "kes-xlm", "value": "0.00031", "ts": 101},
+            ]
+        }
+        result = list(iter_price_events_from_stream(_to_stream(payload)))
+        self.assertEqual(
+            result,
+            [
+                ("NGN-XLM", 0.00045, 100, 5, 1),
+                ("KES-XLM", 0.00031, 101, 0, 0),
+            ],
+        )
+
+    def test_stream_nested_data_tickers_wrapper(self) -> None:
+        """Frames nested under ``data.tickers`` are found without extra hints."""
+        payload = {"data": {"tickers": [{"asset": "xlm-usd", "price": 0.11, "timestamp": 1}]}}
+        result = list(iter_price_events_from_stream(_to_stream(payload)))
+        self.assertEqual(result, [("XLM-USD", 0.11, 1, 0, 0)])
+
+    def test_stream_all_optional_fields_default_to_zero(self) -> None:
+        """sequence and flags default to 0 when absent in the stream."""
+        payload = {"symbol": "ghs-xlm", "last_price": "0.00020", "event_time": 999}
+        result = list(iter_price_events_from_stream(_to_stream(payload)))
+        self.assertEqual(result, [("GHS-XLM", 0.0002, 999, 0, 0)])
+
+    def test_stream_string_numeric_fields_are_coerced(self) -> None:
+        """Numeric fields encoded as JSON strings are coerced to the right types."""
+        payload = {
+            "asset_id": "eth-usd",
+            "price": "3150.75",
+            "timestamp": "1710000000999",
+            "sequence": "7",
+            "flags": "2",
+        }
+        result = list(iter_price_events_from_stream(_to_stream(payload)))
+        self.assertEqual(result, [("ETH-USD", 3150.75, 1710000000999, 7, 2)])
+
+    def test_stream_drop_invalid_skips_bad_frames(self) -> None:
+        """Frames missing required fields are skipped when drop_invalid=True."""
+        payload = [
+            {"asset": "xlm-usd", "price": 0.11, "timestamp": 1},  # valid
+            {"asset": "bad-frame", "timestamp": 2},  # missing price
+            {"asset": "eth-usd", "price": "3.50", "time": "3"},  # valid
+        ]
+        result = list(
+            iter_price_events_from_stream(_to_stream(payload), drop_invalid=True)
+        )
+        self.assertEqual(
+            result,
+            [
+                ("XLM-USD", 0.11, 1, 0, 0),
+                ("ETH-USD", 3.5, 3, 0, 0),
+            ],
+        )
+
+    def test_stream_matches_in_memory_parser_output(self) -> None:
+        """Streaming and in-memory parsers produce identical results."""
+        frames = [
+            {"asset_id": "xlm-usd", "price": 0.1025, "timestamp": 1710000000123, "seq": 101},
+            {"asset_id": "btc-usd", "price": 64000.25, "timestamp": 1710000000456, "flags": 3},
+            {"pair": "eth-usd", "value": "3150.75", "ts": 1710000000999, "nonce": 7, "flag_bits": "2"},
+        ]
+        # in-memory path
+        expected = flatten_telemetry_frames([frames])
+        # streaming path
+        streamed = tuple(iter_price_events_from_stream(_to_stream(frames)))
+        self.assertEqual(streamed, expected)
+
+    # ------------------------------------------------------------------
+    # build_segments_from_stream
+    # ------------------------------------------------------------------
+
+    def test_build_segments_from_stream_groups_into_fixed_size_batches(self) -> None:
+        payload = [
+            {"asset": "xlm-usd", "price": 0.11, "timestamp": 1},
+            {"asset": "btc-usd", "price": 1.22, "timestamp": 2},
+            {"asset": "eth-usd", "price": 2.33, "timestamp": 3},
+        ]
+        result = build_segments_from_stream(_to_stream(payload), segment_size=2)
+        self.assertEqual(
+            result,
+            (
+                (("XLM-USD", 0.11, 1, 0, 0), ("BTC-USD", 1.22, 2, 0, 0)),
+                (("ETH-USD", 2.33, 3, 0, 0),),
+            ),
+        )
+
+    def test_build_segments_from_stream_rejects_non_positive_segment_size(self) -> None:
+        with self.assertRaisesRegex(ValueError, "segment_size"):
+            build_segments_from_stream(io.BytesIO(b"[]"), segment_size=0)
+
+    def test_build_segments_from_stream_empty_source_returns_empty_batch(self) -> None:
+        result = build_segments_from_stream(_to_stream([]))
+        self.assertEqual(result, ())
+
+    def test_build_segments_from_stream_single_segment_when_fewer_than_size(self) -> None:
+        payload = [{"asset": "xlm-usd", "price": 0.5, "timestamp": 1}]
+        result = build_segments_from_stream(_to_stream(payload), segment_size=10)
+        self.assertEqual(result, ((("XLM-USD", 0.5, 1, 0, 0),),))
+
+    def test_build_segments_matches_in_memory_segments(self) -> None:
+        """Streaming segments equal in-memory segments for the same data."""
+        frames = [
+            {"asset": "xlm-usd", "price": 0.11, "timestamp": i} for i in range(7)
+        ]
+        in_memory = build_telemetry_segments([frames], segment_size=3)
+        streaming = build_segments_from_stream(_to_stream(frames), segment_size=3)
+        self.assertEqual(streaming, in_memory)
+
+
 if __name__ == "__main__":
     _ = unittest.main()