Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 9 additions & 33 deletions src/analytics/latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,38 +197,9 @@ def _ensure_latency_table(self, connection) -> None:
with connection.cursor() as cursor:
cursor.execute(create_sql)

def _persist_metrics(self, connection, json_adapter, metrics: LatencyMetrics) -> None:
insert_sql = f"""
INSERT INTO {LATENCY_TABLE_NAME} (
packet_id,
ingested_at,
processing_at,
confirmed_at,
total_latency_ms,
processing_latency_ms,
confirmation_latency_ms,
metadata,
recorded_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
row = metrics.to_db_row()
with connection.cursor() as cursor:
cursor.execute(
insert_sql,
(
row["packet_id"],
row["ingested_at"],
row["processing_at"],
row["confirmed_at"],
row["total_latency_ms"],
row["processing_latency_ms"],
row["confirmation_latency_ms"],
json_adapter(row["metadata"]),
row["recorded_at"],
),
)

def flush(self) -> None:
from src.database.writer import DatabaseWriter

metrics = list(self.collect_completed_metrics().values())
if not metrics:
logger.debug("[LatencyTracker] no completed latency records to flush")
Expand All @@ -243,8 +214,13 @@ def flush(self) -> None:
try:
connection, json_adapter = self._connect_db()
self._ensure_latency_table(connection)
for metric in metrics:
self._persist_metrics(connection, json_adapter, metric)

writer = DatabaseWriter(connection)
rows = [metric.to_db_row() for metric in metrics]
for row in rows:
row["metadata"] = json_adapter(row["metadata"])
writer.insert_batch(LATENCY_TABLE_NAME, rows)

packet_ids = [metric.packet_id for metric in metrics]
self._remove_exported_records(packet_ids)
logger.info(
Expand Down
86 changes: 44 additions & 42 deletions src/database/batch_sink.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
from __future__ import annotations

import threading
import logging
import sqlite3
from typing import Dict, List, Any
from typing import Dict, List, Any, Optional

from src.database.writer import DatabaseWriter

logger = logging.getLogger(__name__)


class BatchSink:
"""Thread‑safe micro‑batch aggregator for telemetry data.

Uses a pre‑compiled ``DatabaseWriter`` so the SQL INSERT is parsed only
once per (table, column‑set) pair instead of on every flush cycle.

Usage:
# Obtain a sqlite3.Connection (ensure ``isolation_level=None`` for explicit transactions)
conn = sqlite3.connect('your.db', isolation_level=None)
conn = sqlite3.connect('telemetry.db', isolation_level=None)
sink = BatchSink(conn, table_name='telemetry', flush_interval=2.0)
sink.save({"asset_id": "abc", "price": 123.45, "ts": 1700000000})
...
sink.shutdown() # flush remaining data and stop background thread
sink.shutdown()
"""

def __init__(self, connection: sqlite3.Connection, table_name: str = "telemetry", flush_interval: float = 2.0):
def __init__(
self,
connection: sqlite3.Connection,
table_name: str = "telemetry",
flush_interval: float = 2.0,
):
if not isinstance(connection, sqlite3.Connection):
raise TypeError("connection must be an instance of sqlite3.Connection")
self._conn = connection
Expand All @@ -26,77 +38,67 @@ def __init__(self, connection: sqlite3.Connection, table_name: str = "telemetry"
self._buffer: List[Dict[str, Any]] = []
self._lock = threading.Lock()
self._stop_event = threading.Event()
self._thread = threading.Thread(target=self._run, daemon=True, name="BatchSink-Flusher")
self._writer = DatabaseWriter(connection)
self._thread = threading.Thread(
target=self._run, daemon=True, name="BatchSink-Flusher"
)
self._thread.start()
logger.debug("BatchSink initialized for table %s with %s‑second interval", self._table, self._interval)
logger.debug(
"BatchSink initialized for table %s with %s‑second interval",
self._table,
self._interval,
)

def save(self, data: Dict[str, Any]) -> None:
"""Add a telemetry record to the in‑memory buffer.

The method is safe to call from multiple threads.
"""
"""Add a telemetry record to the in‑memory buffer."""
if not isinstance(data, dict):
raise TypeError("data must be a dict mapping column names to values")
with self._lock:
self._buffer.append(data)
logger.debug("Saved record to buffer; current size=%d", len(self._buffer))

def _run(self) -> None:
"""Background worker that periodically flushes the buffer.

It wakes up every ``self._interval`` seconds and attempts to write any
accumulated records to the database.
"""
"""Background worker that periodically flushes the buffer."""
while not self._stop_event.wait(self._interval):
try:
self._flush()
except Exception as exc: # pragma: no cover – defensive
logger.exception("Unexpected error while flushing BatchSink: %s", exc)
except Exception as exc:
logger.exception(
"Unexpected error while flushing BatchSink: %s", exc
)

def _flush(self) -> None:
"""Perform a bulk ``executemany`` insert of buffered rows.

The operation is atomic: the buffer is cleared *after* a successful copy
of its contents, guaranteeing that no record is lost on failure.
"""
# Snapshot and clear the buffer under lock
"""Flush buffered rows via the pre‑compiled DatabaseWriter."""
with self._lock:
if not self._buffer:
return
batch = self._buffer.copy()
self._buffer.clear()
logger.debug("Flushing %d records to table %s", len(batch), self._table)

# Determine column ordering from the first record – all records must share the same schema
columns = list(batch[0].keys())
placeholders = ", ".join(["?" for _ in columns])
column_clause = ", ".join(columns)
sql = f"INSERT INTO {self._table} ({column_clause}) VALUES ({placeholders})"
values = [tuple(row[col] for col in columns) for row in batch]
logger.debug("Flushing %d records to table %s", len(batch), self._table)

# Use an explicit transaction for speed and atomicity
try:
self._conn.execute("BEGIN")
self._conn.executemany(sql, values)
self._writer.insert_batch(self._table, batch, commit=False)
self._conn.execute("COMMIT")
logger.debug("Successfully flushed %d records", len(batch))
except Exception:
# Roll back to keep DB consistent and re‑queue the data
self._conn.execute("ROLLBACK")
with self._lock:
# prepend failed batch so they are not lost
self._buffer = batch + self._buffer
logger.exception("Failed to flush BatchSink; records re‑queued")
raise

def shutdown(self) -> None:
"""Stop the background thread and flush any remaining data.
"""
"""Stop the background thread and flush any remaining data."""
self._stop_event.set()
self._thread.join()
# Final flush – ignore errors to avoid blocking shutdown
try:
self._flush()
except Exception as exc: # pragma: no cover – defensive
logger.exception("Error during final BatchSink shutdown flush: %s", exc)
logger.info("BatchSink shutdown complete; %d records remaining in buffer", len(self._buffer))
except Exception as exc:
logger.exception(
"Error during final BatchSink shutdown flush: %s", exc
)
logger.info(
"BatchSink shutdown complete; %d records remaining in buffer",
len(self._buffer),
)
130 changes: 130 additions & 0 deletions src/database/writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional, Tuple, Union
from collections import OrderedDict

logger = logging.getLogger(__name__)

try:
import psycopg2
from psycopg2 import sql as psql
HAS_PSYCOPG2 = True
except ImportError:
HAS_PSYCOPG2 = False

try:
import sqlite3
HAS_SQLITE3 = True
except ImportError:
HAS_SQLITE3 = False


class DatabaseWriter:
"""Reusable prepared‑statement writer for telemetry inserts.

Pre‑compiles INSERT statements keyed by (table, *columns) so that the
database engine does not re‑parse the SQL on every batch. Supports both
PostgreSQL (psycopg2) and SQLite backends.

Usage:
writer = DatabaseWriter(connection)
writer.insert("telemetry", {"asset_id": "abc", "price": 123.45})
writer.insert_batch("telemetry", [{"asset_id": "def", "price": 54.32}, ...])
"""

def __init__(self, connection: Any) -> None:
self._conn = connection
self._statements: Dict[Tuple[str, ...], Any] = OrderedDict()

# ── public API ──────────────────────────────────────────────────────────

def insert(self, table: str, data: Dict[str, Any]) -> None:
"""Insert a single row."""
sql, params = self._compile(table, data)
self._conn.execute(sql, params)
self._conn.commit()

def insert_batch(
self, table: str, rows: List[Dict[str, Any]], commit: bool = True
) -> None:
"""Bulk‑insert multiple rows.

Uses ``executemany`` for SQLite and ``cursor.executemany`` for
PostgreSQL so the prepared statement is reused across all rows.

All rows must share the same column set (keys of the first dict).
"""
if not rows:
return

columns = list(rows[0].keys())
sql = self._cached_statement(table, columns)
values = [tuple(r[col] for col in columns) for r in rows]

if _is_psycopg2(self._conn):
with self._conn.cursor() as cursor:
cursor.executemany(sql, values)
if commit:
self._conn.commit()
elif _is_sqlite(self._conn):
self._conn.execute("BEGIN")
try:
self._conn.executemany(sql, values)
if commit:
self._conn.commit()
except Exception:
self._conn.execute("ROLLBACK")
raise
else:
self._conn.executemany(sql, values)
if commit:
self._conn.commit()

# ── internal helpers ────────────────────────────────────────────────────

def _compile(
self, table: str, data: Dict[str, Any]
) -> Tuple[str, Tuple[Any, ...]]:
"""Return (sql_string, params_tuple) for a single row insert."""
columns = list(data.keys())
sql = self._cached_statement(table, columns)
return sql, tuple(data[col] for col in columns)

def _cached_statement(self, table: str, columns: List[str]) -> str:
"""Return a pre‑compiled INSERT statement, caching it on first use."""
key = (table, *columns)
if key not in self._statements:
self._statements[key] = self._build_insert_sql(table, columns)
logger.debug(
"Compiled new insert statement for %s(%s)", table, ", ".join(columns)
)
return self._statements[key]

def _build_insert_sql(self, table: str, columns: List[str]) -> str:
"""Build a parameterised INSERT statement appropriate for the backend."""
if HAS_PSYCOPG2 and _is_psycopg2(self._conn):
col_identifiers = [psql.Identifier(c) for c in columns]
placeholders = psql.SQL(", ").join(
psql.Placeholder() for _ in columns
)
stmt = psql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
psql.Identifier(table),
psql.SQL(", ").join(col_identifiers),
placeholders,
)
return stmt.as_string(self._conn)

placeholders = ", ".join("?" for _ in columns)
col_clause = ", ".join(columns)
return f"INSERT INTO {table} ({col_clause}) VALUES ({placeholders})"


# ── backend detection helpers ────────────────────────────────────────────────

def _is_psycopg2(conn: Any) -> bool:
return HAS_PSYCOPG2 and isinstance(conn, psycopg2.extensions.connection)


def _is_sqlite(conn: Any) -> bool:
return HAS_SQLITE3 and isinstance(conn, sqlite3.Connection)
11 changes: 5 additions & 6 deletions tests/test_latency_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,12 @@ def test_latency_tracker_flushes_completed_records_to_db():
with patch.object(LatencyTracker, "_connect_db", return_value=(mock_conn, fake_json)):
tracker.flush()

assert mock_cursor.execute.call_count == 2
assert mock_cursor.execute.call_count == 1
create_sql_call = mock_cursor.execute.call_args_list[0]
insert_sql_call = mock_cursor.execute.call_args_list[1]
assert "CREATE TABLE IF NOT EXISTS" in create_sql_call.args[0]
assert "INSERT INTO" in insert_sql_call.args[0]
assert insert_sql_call.args[1][0] == "packet-456"
assert insert_sql_call.args[1][4] == 1200
mock_conn.executemany.assert_called_once()
insert_sql = mock_conn.executemany.call_args[0][0]
assert "INSERT INTO" in insert_sql


def test_latency_tracker_removes_exported_records_after_flush():
Expand All @@ -68,7 +67,7 @@ def test_latency_tracker_removes_exported_records_after_flush():
tracker.flush()

assert "packet-999" not in tracker._records
assert mock_cursor.execute.call_count == 2
assert mock_cursor.execute.call_count == 1


def test_latency_tracker_no_database_url_logs_warning(caplog):
Expand Down