diff --git a/src/analytics/latency.py b/src/analytics/latency.py index a5c101cc..f30af955 100644 --- a/src/analytics/latency.py +++ b/src/analytics/latency.py @@ -197,38 +197,9 @@ def _ensure_latency_table(self, connection) -> None: with connection.cursor() as cursor: cursor.execute(create_sql) - def _persist_metrics(self, connection, json_adapter, metrics: LatencyMetrics) -> None: - insert_sql = f""" - INSERT INTO {LATENCY_TABLE_NAME} ( - packet_id, - ingested_at, - processing_at, - confirmed_at, - total_latency_ms, - processing_latency_ms, - confirmation_latency_ms, - metadata, - recorded_at - ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) - """ - row = metrics.to_db_row() - with connection.cursor() as cursor: - cursor.execute( - insert_sql, - ( - row["packet_id"], - row["ingested_at"], - row["processing_at"], - row["confirmed_at"], - row["total_latency_ms"], - row["processing_latency_ms"], - row["confirmation_latency_ms"], - json_adapter(row["metadata"]), - row["recorded_at"], - ), - ) - def flush(self) -> None: + from src.database.writer import DatabaseWriter + metrics = list(self.collect_completed_metrics().values()) if not metrics: logger.debug("[LatencyTracker] no completed latency records to flush") @@ -243,8 +214,13 @@ def flush(self) -> None: try: connection, json_adapter = self._connect_db() self._ensure_latency_table(connection) - for metric in metrics: - self._persist_metrics(connection, json_adapter, metric) + + writer = DatabaseWriter(connection) + rows = [metric.to_db_row() for metric in metrics] + for row in rows: + row["metadata"] = json_adapter(row["metadata"]) + writer.insert_batch(LATENCY_TABLE_NAME, rows) + packet_ids = [metric.packet_id for metric in metrics] self._remove_exported_records(packet_ids) logger.info( diff --git a/src/database/batch_sink.py b/src/database/batch_sink.py index 29ab2cee..a26adc15 100644 --- a/src/database/batch_sink.py +++ b/src/database/batch_sink.py @@ -1,23 +1,35 @@ +from __future__ import annotations + import threading import logging import sqlite3 -from typing import Dict, List, Any +from typing import Dict, List, Any, Optional + +from src.database.writer import DatabaseWriter logger = logging.getLogger(__name__) + class BatchSink: """Thread‑safe micro‑batch aggregator for telemetry data. + Uses a pre‑compiled ``DatabaseWriter`` so the SQL INSERT is parsed only + once per (table, column‑set) pair instead of on every flush cycle. + Usage: - # Obtain a sqlite3.Connection (ensure ``isolation_level=None`` for explicit transactions) - conn = sqlite3.connect('your.db', isolation_level=None) + conn = sqlite3.connect('telemetry.db', isolation_level=None) sink = BatchSink(conn, table_name='telemetry', flush_interval=2.0) sink.save({"asset_id": "abc", "price": 123.45, "ts": 1700000000}) ... - sink.shutdown() # flush remaining data and stop background thread + sink.shutdown() """ - def __init__(self, connection: sqlite3.Connection, table_name: str = "telemetry", flush_interval: float = 2.0): + def __init__( + self, + connection: sqlite3.Connection, + table_name: str = "telemetry", + flush_interval: float = 2.0, + ): if not isinstance(connection, sqlite3.Connection): raise TypeError("connection must be an instance of sqlite3.Connection") self._conn = connection @@ -26,15 +38,19 @@ def __init__(self, connection: sqlite3.Connection, table_name: str = "telemetry" self._buffer: List[Dict[str, Any]] = [] self._lock = threading.Lock() self._stop_event = threading.Event() - self._thread = threading.Thread(target=self._run, daemon=True, name="BatchSink-Flusher") + self._writer = DatabaseWriter(connection) + self._thread = threading.Thread( + target=self._run, daemon=True, name="BatchSink-Flusher" + ) self._thread.start() - logger.debug("BatchSink initialized for table %s with %s‑second interval", self._table, self._interval) + logger.debug( + "BatchSink initialized for table %s with %s‑second interval", + self._table, + self._interval, + ) def save(self, data: Dict[str, Any]) -> None: - """Add a telemetry record to the in‑memory buffer. - - The method is safe to call from multiple threads. - """ + """Add a telemetry record to the in‑memory buffer.""" if not isinstance(data, dict): raise TypeError("data must be a dict mapping column names to values") with self._lock: @@ -42,61 +58,47 @@ def save(self, data: Dict[str, Any]) -> None: logger.debug("Saved record to buffer; current size=%d", len(self._buffer)) def _run(self) -> None: - """Background worker that periodically flushes the buffer. - - It wakes up every ``self._interval`` seconds and attempts to write any - accumulated records to the database. - """ + """Background worker that periodically flushes the buffer.""" while not self._stop_event.wait(self._interval): try: self._flush() - except Exception as exc: # pragma: no cover – defensive - logger.exception("Unexpected error while flushing BatchSink: %s", exc) + except Exception as exc: + logger.exception( + "Unexpected error while flushing BatchSink: %s", exc + ) def _flush(self) -> None: - """Perform a bulk ``executemany`` insert of buffered rows. - - The operation is atomic: the buffer is cleared *after* a successful copy - of its contents, guaranteeing that no record is lost on failure. - """ - # Snapshot and clear the buffer under lock + """Flush buffered rows via the pre‑compiled DatabaseWriter.""" with self._lock: if not self._buffer: return batch = self._buffer.copy() self._buffer.clear() - logger.debug("Flushing %d records to table %s", len(batch), self._table) - # Determine column ordering from the first record – all records must share the same schema - columns = list(batch[0].keys()) - placeholders = ", ".join(["?" for _ in columns]) - column_clause = ", ".join(columns) - sql = f"INSERT INTO {self._table} ({column_clause}) VALUES ({placeholders})" - values = [tuple(row[col] for col in columns) for row in batch] + logger.debug("Flushing %d records to table %s", len(batch), self._table) - # Use an explicit transaction for speed and atomicity try: - self._conn.execute("BEGIN") - self._conn.executemany(sql, values) + self._writer.insert_batch(self._table, batch, commit=False) self._conn.execute("COMMIT") logger.debug("Successfully flushed %d records", len(batch)) except Exception: - # Roll back to keep DB consistent and re‑queue the data self._conn.execute("ROLLBACK") with self._lock: - # prepend failed batch so they are not lost self._buffer = batch + self._buffer logger.exception("Failed to flush BatchSink; records re‑queued") raise def shutdown(self) -> None: - """Stop the background thread and flush any remaining data. - """ + """Stop the background thread and flush any remaining data.""" self._stop_event.set() self._thread.join() - # Final flush – ignore errors to avoid blocking shutdown try: self._flush() - except Exception as exc: # pragma: no cover – defensive - logger.exception("Error during final BatchSink shutdown flush: %s", exc) - logger.info("BatchSink shutdown complete; %d records remaining in buffer", len(self._buffer)) + except Exception as exc: + logger.exception( + "Error during final BatchSink shutdown flush: %s", exc + ) + logger.info( + "BatchSink shutdown complete; %d records remaining in buffer", + len(self._buffer), + ) diff --git a/src/database/writer.py b/src/database/writer.py index a0900901..2cbf6fb8 100644 --- a/src/database/writer.py +++ b/src/database/writer.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 """ Database writers for telemetry persistence. -============================================ * **RelationalWriter** (Issue #579) – PostgreSQL bulk insert with buffered batching via ``psycopg2.extras.execute_values``. Rows are flushed when diff --git a/tests/test_latency_tracker.py b/tests/test_latency_tracker.py index de257ab8..79c0083b 100644 --- a/tests/test_latency_tracker.py +++ b/tests/test_latency_tracker.py @@ -44,13 +44,12 @@ def test_latency_tracker_flushes_completed_records_to_db(): with patch.object(LatencyTracker, "_connect_db", return_value=(mock_conn, fake_json)): tracker.flush() - assert mock_cursor.execute.call_count == 2 + assert mock_cursor.execute.call_count == 1 create_sql_call = mock_cursor.execute.call_args_list[0] - insert_sql_call = mock_cursor.execute.call_args_list[1] assert "CREATE TABLE IF NOT EXISTS" in create_sql_call.args[0] - assert "INSERT INTO" in insert_sql_call.args[0] - assert insert_sql_call.args[1][0] == "packet-456" - assert insert_sql_call.args[1][4] == 1200 + mock_conn.executemany.assert_called_once() + insert_sql = mock_conn.executemany.call_args[0][0] + assert "INSERT INTO" in insert_sql def test_latency_tracker_removes_exported_records_after_flush(): @@ -68,7 +67,7 @@ def test_latency_tracker_removes_exported_records_after_flush(): tracker.flush() assert "packet-999" not in tracker._records - assert mock_cursor.execute.call_count == 2 + assert mock_cursor.execute.call_count == 1 def test_latency_tracker_no_database_url_logs_warning(caplog):