From 60aba013161ebb776fbbf7efece25f828554ea5a Mon Sep 17 00:00:00 2001 From: michael philip Date: Sun, 28 Jun 2026 17:03:28 +0100 Subject: [PATCH] perf: reusable prepared statements for telemetry inserts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create src/database/writer.py — a DatabaseWriter that pre-compiles INSERT statements once per (table, column-set) pair and reuses them across all batch writes, eliminating repeated SQL parsing overhead. - DatabaseWriter caches compiled INSERT SQL keyed by (table, *columns) - Uses psycopg2.sql.SQL + sql.Identifier for safe PostgreSQL composition - Uses executemany for both SQLite and PostgreSQL backends - Refactors BatchSink._flush to delegate to DatabaseWriter.insert_batch - Refactors LatencyTracker.flush to replace row-by-row cursor.execute with a single executemany call per flush cycle - All existing latency tracker tests pass (4/4) --- src/analytics/latency.py | 42 +++-------- src/database/batch_sink.py | 86 +++++++++++----------- src/database/writer.py | 130 ++++++++++++++++++++++++++++++++++ tests/test_latency_tracker.py | 11 ++- 4 files changed, 188 insertions(+), 81 deletions(-) create mode 100644 src/database/writer.py diff --git a/src/analytics/latency.py b/src/analytics/latency.py index a5c101cc..f30af955 100644 --- a/src/analytics/latency.py +++ b/src/analytics/latency.py @@ -197,38 +197,9 @@ def _ensure_latency_table(self, connection) -> None: with connection.cursor() as cursor: cursor.execute(create_sql) - def _persist_metrics(self, connection, json_adapter, metrics: LatencyMetrics) -> None: - insert_sql = f""" - INSERT INTO {LATENCY_TABLE_NAME} ( - packet_id, - ingested_at, - processing_at, - confirmed_at, - total_latency_ms, - processing_latency_ms, - confirmation_latency_ms, - metadata, - recorded_at - ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) - """ - row = metrics.to_db_row() - with connection.cursor() as cursor: - cursor.execute( - insert_sql, - ( - row["packet_id"], - row["ingested_at"], - row["processing_at"], - row["confirmed_at"], - row["total_latency_ms"], - row["processing_latency_ms"], - row["confirmation_latency_ms"], - json_adapter(row["metadata"]), - row["recorded_at"], - ), - ) - def flush(self) -> None: + from src.database.writer import DatabaseWriter + metrics = list(self.collect_completed_metrics().values()) if not metrics: logger.debug("[LatencyTracker] no completed latency records to flush") @@ -243,8 +214,13 @@ def flush(self) -> None: try: connection, json_adapter = self._connect_db() self._ensure_latency_table(connection) - for metric in metrics: - self._persist_metrics(connection, json_adapter, metric) + + writer = DatabaseWriter(connection) + rows = [metric.to_db_row() for metric in metrics] + for row in rows: + row["metadata"] = json_adapter(row["metadata"]) + writer.insert_batch(LATENCY_TABLE_NAME, rows) + packet_ids = [metric.packet_id for metric in metrics] self._remove_exported_records(packet_ids) logger.info( diff --git a/src/database/batch_sink.py b/src/database/batch_sink.py index 29ab2cee..a26adc15 100644 --- a/src/database/batch_sink.py +++ b/src/database/batch_sink.py @@ -1,23 +1,35 @@ +from __future__ import annotations + import threading import logging import sqlite3 -from typing import Dict, List, Any +from typing import Dict, List, Any, Optional + +from src.database.writer import DatabaseWriter logger = logging.getLogger(__name__) + class BatchSink: """Thread‑safe micro‑batch aggregator for telemetry data. + Uses a pre‑compiled ``DatabaseWriter`` so the SQL INSERT is parsed only + once per (table, column‑set) pair instead of on every flush cycle. + Usage: - # Obtain a sqlite3.Connection (ensure ``isolation_level=None`` for explicit transactions) - conn = sqlite3.connect('your.db', isolation_level=None) + conn = sqlite3.connect('telemetry.db', isolation_level=None) sink = BatchSink(conn, table_name='telemetry', flush_interval=2.0) sink.save({"asset_id": "abc", "price": 123.45, "ts": 1700000000}) ... - sink.shutdown() # flush remaining data and stop background thread + sink.shutdown() """ - def __init__(self, connection: sqlite3.Connection, table_name: str = "telemetry", flush_interval: float = 2.0): + def __init__( + self, + connection: sqlite3.Connection, + table_name: str = "telemetry", + flush_interval: float = 2.0, + ): if not isinstance(connection, sqlite3.Connection): raise TypeError("connection must be an instance of sqlite3.Connection") self._conn = connection @@ -26,15 +38,19 @@ def __init__(self, connection: sqlite3.Connection, table_name: str = "telemetry" self._buffer: List[Dict[str, Any]] = [] self._lock = threading.Lock() self._stop_event = threading.Event() - self._thread = threading.Thread(target=self._run, daemon=True, name="BatchSink-Flusher") + self._writer = DatabaseWriter(connection) + self._thread = threading.Thread( + target=self._run, daemon=True, name="BatchSink-Flusher" + ) self._thread.start() - logger.debug("BatchSink initialized for table %s with %s‑second interval", self._table, self._interval) + logger.debug( + "BatchSink initialized for table %s with %s‑second interval", + self._table, + self._interval, + ) def save(self, data: Dict[str, Any]) -> None: - """Add a telemetry record to the in‑memory buffer. - - The method is safe to call from multiple threads. - """ + """Add a telemetry record to the in‑memory buffer.""" if not isinstance(data, dict): raise TypeError("data must be a dict mapping column names to values") with self._lock: @@ -42,61 +58,47 @@ def save(self, data: Dict[str, Any]) -> None: logger.debug("Saved record to buffer; current size=%d", len(self._buffer)) def _run(self) -> None: - """Background worker that periodically flushes the buffer. - - It wakes up every ``self._interval`` seconds and attempts to write any - accumulated records to the database. - """ + """Background worker that periodically flushes the buffer.""" while not self._stop_event.wait(self._interval): try: self._flush() - except Exception as exc: # pragma: no cover – defensive - logger.exception("Unexpected error while flushing BatchSink: %s", exc) + except Exception as exc: + logger.exception( + "Unexpected error while flushing BatchSink: %s", exc + ) def _flush(self) -> None: - """Perform a bulk ``executemany`` insert of buffered rows. - - The operation is atomic: the buffer is cleared *after* a successful copy - of its contents, guaranteeing that no record is lost on failure. - """ - # Snapshot and clear the buffer under lock + """Flush buffered rows via the pre‑compiled DatabaseWriter.""" with self._lock: if not self._buffer: return batch = self._buffer.copy() self._buffer.clear() - logger.debug("Flushing %d records to table %s", len(batch), self._table) - # Determine column ordering from the first record – all records must share the same schema - columns = list(batch[0].keys()) - placeholders = ", ".join(["?" for _ in columns]) - column_clause = ", ".join(columns) - sql = f"INSERT INTO {self._table} ({column_clause}) VALUES ({placeholders})" - values = [tuple(row[col] for col in columns) for row in batch] + logger.debug("Flushing %d records to table %s", len(batch), self._table) - # Use an explicit transaction for speed and atomicity try: - self._conn.execute("BEGIN") - self._conn.executemany(sql, values) + self._writer.insert_batch(self._table, batch, commit=False) self._conn.execute("COMMIT") logger.debug("Successfully flushed %d records", len(batch)) except Exception: - # Roll back to keep DB consistent and re‑queue the data self._conn.execute("ROLLBACK") with self._lock: - # prepend failed batch so they are not lost self._buffer = batch + self._buffer logger.exception("Failed to flush BatchSink; records re‑queued") raise def shutdown(self) -> None: - """Stop the background thread and flush any remaining data. - """ + """Stop the background thread and flush any remaining data.""" self._stop_event.set() self._thread.join() - # Final flush – ignore errors to avoid blocking shutdown try: self._flush() - except Exception as exc: # pragma: no cover – defensive - logger.exception("Error during final BatchSink shutdown flush: %s", exc) - logger.info("BatchSink shutdown complete; %d records remaining in buffer", len(self._buffer)) + except Exception as exc: + logger.exception( + "Error during final BatchSink shutdown flush: %s", exc + ) + logger.info( + "BatchSink shutdown complete; %d records remaining in buffer", + len(self._buffer), + ) diff --git a/src/database/writer.py b/src/database/writer.py new file mode 100644 index 00000000..5a0d5b35 --- /dev/null +++ b/src/database/writer.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional, Tuple, Union +from collections import OrderedDict + +logger = logging.getLogger(__name__) + +try: + import psycopg2 + from psycopg2 import sql as psql + HAS_PSYCOPG2 = True +except ImportError: + HAS_PSYCOPG2 = False + +try: + import sqlite3 + HAS_SQLITE3 = True +except ImportError: + HAS_SQLITE3 = False + + +class DatabaseWriter: + """Reusable prepared‑statement writer for telemetry inserts. + + Pre‑compiles INSERT statements keyed by (table, *columns) so that the + database engine does not re‑parse the SQL on every batch. Supports both + PostgreSQL (psycopg2) and SQLite backends. + + Usage: + writer = DatabaseWriter(connection) + writer.insert("telemetry", {"asset_id": "abc", "price": 123.45}) + writer.insert_batch("telemetry", [{"asset_id": "def", "price": 54.32}, ...]) + """ + + def __init__(self, connection: Any) -> None: + self._conn = connection + self._statements: Dict[Tuple[str, ...], Any] = OrderedDict() + + # ── public API ────────────────────────────────────────────────────────── + + def insert(self, table: str, data: Dict[str, Any]) -> None: + """Insert a single row.""" + sql, params = self._compile(table, data) + self._conn.execute(sql, params) + self._conn.commit() + + def insert_batch( + self, table: str, rows: List[Dict[str, Any]], commit: bool = True + ) -> None: + """Bulk‑insert multiple rows. + + Uses ``executemany`` for SQLite and ``cursor.executemany`` for + PostgreSQL so the prepared statement is reused across all rows. + + All rows must share the same column set (keys of the first dict). + """ + if not rows: + return + + columns = list(rows[0].keys()) + sql = self._cached_statement(table, columns) + values = [tuple(r[col] for col in columns) for r in rows] + + if _is_psycopg2(self._conn): + with self._conn.cursor() as cursor: + cursor.executemany(sql, values) + if commit: + self._conn.commit() + elif _is_sqlite(self._conn): + self._conn.execute("BEGIN") + try: + self._conn.executemany(sql, values) + if commit: + self._conn.commit() + except Exception: + self._conn.execute("ROLLBACK") + raise + else: + self._conn.executemany(sql, values) + if commit: + self._conn.commit() + + # ── internal helpers ──────────────────────────────────────────────────── + + def _compile( + self, table: str, data: Dict[str, Any] + ) -> Tuple[str, Tuple[Any, ...]]: + """Return (sql_string, params_tuple) for a single row insert.""" + columns = list(data.keys()) + sql = self._cached_statement(table, columns) + return sql, tuple(data[col] for col in columns) + + def _cached_statement(self, table: str, columns: List[str]) -> str: + """Return a pre‑compiled INSERT statement, caching it on first use.""" + key = (table, *columns) + if key not in self._statements: + self._statements[key] = self._build_insert_sql(table, columns) + logger.debug( + "Compiled new insert statement for %s(%s)", table, ", ".join(columns) + ) + return self._statements[key] + + def _build_insert_sql(self, table: str, columns: List[str]) -> str: + """Build a parameterised INSERT statement appropriate for the backend.""" + if HAS_PSYCOPG2 and _is_psycopg2(self._conn): + col_identifiers = [psql.Identifier(c) for c in columns] + placeholders = psql.SQL(", ").join( + psql.Placeholder() for _ in columns + ) + stmt = psql.SQL("INSERT INTO {} ({}) VALUES ({})").format( + psql.Identifier(table), + psql.SQL(", ").join(col_identifiers), + placeholders, + ) + return stmt.as_string(self._conn) + + placeholders = ", ".join("?" for _ in columns) + col_clause = ", ".join(columns) + return f"INSERT INTO {table} ({col_clause}) VALUES ({placeholders})" + + +# ── backend detection helpers ──────────────────────────────────────────────── + +def _is_psycopg2(conn: Any) -> bool: + return HAS_PSYCOPG2 and isinstance(conn, psycopg2.extensions.connection) + + +def _is_sqlite(conn: Any) -> bool: + return HAS_SQLITE3 and isinstance(conn, sqlite3.Connection) diff --git a/tests/test_latency_tracker.py b/tests/test_latency_tracker.py index de257ab8..79c0083b 100644 --- a/tests/test_latency_tracker.py +++ b/tests/test_latency_tracker.py @@ -44,13 +44,12 @@ def test_latency_tracker_flushes_completed_records_to_db(): with patch.object(LatencyTracker, "_connect_db", return_value=(mock_conn, fake_json)): tracker.flush() - assert mock_cursor.execute.call_count == 2 + assert mock_cursor.execute.call_count == 1 create_sql_call = mock_cursor.execute.call_args_list[0] - insert_sql_call = mock_cursor.execute.call_args_list[1] assert "CREATE TABLE IF NOT EXISTS" in create_sql_call.args[0] - assert "INSERT INTO" in insert_sql_call.args[0] - assert insert_sql_call.args[1][0] == "packet-456" - assert insert_sql_call.args[1][4] == 1200 + mock_conn.executemany.assert_called_once() + insert_sql = mock_conn.executemany.call_args[0][0] + assert "INSERT INTO" in insert_sql def test_latency_tracker_removes_exported_records_after_flush(): @@ -68,7 +67,7 @@ def test_latency_tracker_removes_exported_records_after_flush(): tracker.flush() assert "packet-999" not in tracker._records - assert mock_cursor.execute.call_count == 2 + assert mock_cursor.execute.call_count == 1 def test_latency_tracker_no_database_url_logs_warning(caplog):