Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Add onchain_kpi_snapshots table

Revision ID: 006
Revises: 005
Create Date: 2026-06-28 00:00:00.000000

Schema
------
onchain_kpi_snapshots captures one row per calendar day with:
period_date VARCHAR(10) – YYYY-MM-DD, unique (prevents duplicate snapshots)
tvl_xlm FLOAT – total value locked (XLM)
volume_xlm FLOAT – 24-h on-chain volume (XLM)
active_rounds INTEGER – active quadratic-funding rounds
contribution_count INTEGER – contribution events for the period
extra_data JSON – forward-compatible KPI extension field
captured_at TIMESTAMPTZ – snapshot capture wall-clock time
created_at TIMESTAMPTZ – row insertion time
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

revision: str = "006"
down_revision: Union[str, None] = "005"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.create_table(
"onchain_kpi_snapshots",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("period_date", sa.String(length=10), nullable=False),
sa.Column("tvl_xlm", sa.Float(), nullable=False, server_default="0.0"),
sa.Column("volume_xlm", sa.Float(), nullable=False, server_default="0.0"),
sa.Column("active_rounds", sa.Integer(), nullable=False, server_default="0"),
sa.Column("contribution_count", sa.Integer(), nullable=False, server_default="0"),
sa.Column("extra_data", sa.JSON(), nullable=True),
sa.Column("captured_at", sa.DateTime(timezone=True), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("period_date", name="uq_onchain_kpi_snapshots_period_date"),
)
op.create_index(
"idx_onchain_kpi_snapshots_period_date",
"onchain_kpi_snapshots",
["period_date"],
)


def downgrade() -> None:
op.drop_index("idx_onchain_kpi_snapshots_period_date", table_name="onchain_kpi_snapshots")
op.drop_table("onchain_kpi_snapshots")
38 changes: 38 additions & 0 deletions apps/data-processing/src/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,44 @@ def __repr__(self):
return f"<AssetTrend(asset={self.asset}, metric={self.metric_name}, trend={self.trend_direction})>"


class OnchainKpiSnapshot(Base):
"""
Daily snapshot of core on-chain KPIs for trend analysis.

Schema
------
period_date DATE (UTC) – calendar day the snapshot covers (unique key)
tvl_xlm FLOAT – total value locked across monitored contracts (XLM)
volume_xlm FLOAT – 24-h payment/trade volume (XLM)
active_rounds INT – quadratic-funding rounds with status 'active'
contribution_count INT – total Contribute events ingested for the period
extra_data JSON – reserved for future KPIs without a schema change
captured_at DATETIME – wall-clock time the snapshot was written
created_at DATETIME – row insertion time (server default)
"""

__tablename__ = "onchain_kpi_snapshots"

id = Column(Integer, primary_key=True, autoincrement=True)
period_date = Column(String(10), nullable=False, unique=True, index=True) # YYYY-MM-DD
tvl_xlm = Column(Float, nullable=False, default=0.0)
volume_xlm = Column(Float, nullable=False, default=0.0)
active_rounds = Column(Integer, nullable=False, default=0)
contribution_count = Column(Integer, nullable=False, default=0)
extra_data = Column(JSON, nullable=True)
captured_at = Column(DateTime(timezone=True), nullable=False)
created_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)

def __repr__(self):
return (
f"<OnchainKpiSnapshot(period={self.period_date}, "
f"tvl={self.tvl_xlm:.2f}, volume={self.volume_xlm:.2f}, "
f"rounds={self.active_rounds}, contributions={self.contribution_count})>"
)


class RoundAnomalySignal(Base):
"""
Stores anomaly signals detected in quadratic funding rounds for maintainer review.
Expand Down
68 changes: 68 additions & 0 deletions apps/data-processing/src/db/postgres_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
NewsInsight,
AssetTrend,
RoundAnomalySignal,
OnchainKpiSnapshot,
)
from src.analytics.ner_service import NERService
from src.analytics.onchain_entity_linker import (
Expand Down Expand Up @@ -2067,3 +2068,70 @@ def cleanup_old_data(self, days: int = 30) -> Dict[str, int]:
"news_insights": 0,
"asset_trends": 0,
}

# ── On-chain KPI snapshots (#877) ────────────────────────────────────────

def save_onchain_kpi_snapshot(
self,
period_date: str,
tvl_xlm: float,
volume_xlm: float,
active_rounds: int,
contribution_count: int,
extra_data: Optional[Dict[str, Any]] = None,
captured_at: Optional[datetime] = None,
) -> Optional[OnchainKpiSnapshot]:
"""Persist a daily KPI snapshot; skips if one already exists for *period_date*.

Args:
period_date: ISO date string (YYYY-MM-DD) for the snapshot period.
tvl_xlm: Total value locked in XLM.
volume_xlm: 24-h volume in XLM.
active_rounds: Number of active funding rounds.
contribution_count: Total contribution events for the period.
extra_data: Optional dict of additional metrics.
captured_at: Snapshot capture timestamp (defaults to utcnow).

Returns:
The existing or newly created OnchainKpiSnapshot, or None on error.
"""
try:
with self.get_session() as session:
existing = session.query(OnchainKpiSnapshot).filter_by(
period_date=period_date
).first()
if existing:
logger.info("KPI snapshot already exists for %s – skipping", period_date)
return existing
snapshot = OnchainKpiSnapshot(
period_date=period_date,
tvl_xlm=tvl_xlm,
volume_xlm=volume_xlm,
active_rounds=active_rounds,
contribution_count=contribution_count,
extra_data=extra_data,
captured_at=captured_at or datetime.utcnow(),
)
session.add(snapshot)
session.flush()
logger.info("Saved KPI snapshot for %s", period_date)
return snapshot
except SQLAlchemyError as e:
logger.error("Failed to save KPI snapshot for %s: %s", period_date, e)
return None

def get_onchain_kpi_snapshots(
self, limit: int = 90
) -> List[OnchainKpiSnapshot]:
"""Return the most recent *limit* daily KPI snapshots, newest first."""
try:
with self.get_session() as session:
return (
session.query(OnchainKpiSnapshot)
.order_by(desc(OnchainKpiSnapshot.period_date))
.limit(limit)
.all()
)
except SQLAlchemyError as e:
logger.error("Failed to query KPI snapshots: %s", e)
return []
18 changes: 18 additions & 0 deletions apps/data-processing/src/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from src.db.postgres_service import PostgresService
from src.ingestion.rpc_benchmark import RPCProviderBenchmark
from src.round_analyzer import _round_analyzer_job
from src.snapshots.onchain_kpi_snapshot import run_onchain_kpi_snapshot_job


logger = setup_logger(__name__)
Expand Down Expand Up @@ -249,6 +250,14 @@ def _rpc_provider_benchmark_job() -> None:
except Exception as exc:
logger.error("RPC provider benchmark job failed: %s", exc, exc_info=True)

def _onchain_kpi_snapshot_job() -> None:
"""Scheduled wrapper for daily on-chain KPI snapshots (#877)."""
try:
run_onchain_kpi_snapshot_job()
except Exception as exc:
logger.error("On-chain KPI snapshot job failed: %s", exc, exc_info=True)


def _contributor_reputation_snapshot_job() -> None:
"""Scheduled wrapper for building contributor reputation snapshots.

Expand Down Expand Up @@ -358,6 +367,15 @@ def start(self):
replace_existing=True,
)

# ── On-chain KPI Snapshot: daily at 00:05 UTC (#877) ─────────────
self.scheduler.add_job(
func=_onchain_kpi_snapshot_job,
trigger=CronTrigger(hour=0, minute=5, timezone="UTC"),
id="onchain_kpi_snapshot_daily",
name="On-chain KPI Daily Snapshot",
replace_existing=True,
)

self.scheduler.start()
logger.info("✓ Analytics scheduler started")
logger.info(f" - Job: {market_job.name} | Next: {market_job.next_run_time}")
Expand Down
Empty file.
131 changes: 131 additions & 0 deletions apps/data-processing/src/snapshots/onchain_kpi_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""
Daily on-chain KPI snapshot – fetcher and scheduler job (#877).

Captures TVL, 24-h volume, active rounds, and contribution counts once per
UTC calendar day and persists them via PostgresService. Duplicate snapshots
for the same period_date are silently skipped.
"""

from __future__ import annotations

import os
from datetime import datetime, timezone
from typing import Any, Dict

from src.utils.logger import setup_logger
from src.db.postgres_service import PostgresService

logger = setup_logger(__name__)


def _fetch_kpis(period_date: str) -> Dict[str, Any]:
"""Fetch on-chain KPIs for *period_date* (YYYY-MM-DD).

Falls back to 0.0 / 0 for any metric that cannot be retrieved so the
snapshot is always written (allows partial data rather than no data).
"""
tvl_xlm = 0.0
volume_xlm = 0.0
active_rounds = 0
contribution_count = 0
extra: Dict[str, Any] = {}

# ── Volume + network stats via Stellar Horizon ───────────────────────────
try:
from src.ingestion.stellar_fetcher import StellarDataFetcher

horizon_url = os.getenv("HORIZON_URL")
network = os.getenv("STELLAR_NETWORK", "public")
fetcher = StellarDataFetcher(horizon_url=horizon_url, network=network)

vol = fetcher.get_asset_volume("XLM", hours=24)
volume_xlm = vol.total_volume if vol else 0.0

stats = fetcher.get_network_stats()
extra["latest_ledger"] = stats.get("latest_ledger", 0)
extra["protocol_version"] = stats.get("protocol_version", "")
except Exception as exc:
logger.warning("Could not fetch Stellar volume/stats: %s", exc)

# ── Active rounds + contribution count from contract events ─────────────
try:
service = PostgresService()
with service.get_session() as session:
from src.db.models import ProjectView, ContractEvent
from sqlalchemy import func as sa_func

active_rounds = (
session.query(sa_func.count(ProjectView.id))
.filter(ProjectView.status == "active")
.scalar()
or 0
)

# Contributions recorded for this calendar day
from datetime import date
day = date.fromisoformat(period_date)
day_start = datetime(day.year, day.month, day.day, tzinfo=timezone.utc)
day_end = datetime(day.year, day.month, day.day, 23, 59, 59, tzinfo=timezone.utc)

contribution_count = (
session.query(sa_func.count(ContractEvent.id))
.filter(
ContractEvent.event_type == "Contribute",
ContractEvent.timestamp >= day_start,
ContractEvent.timestamp <= day_end,
)
.scalar()
or 0
)

# TVL: sum of total_contributions across all projects
tvl_xlm = (
session.query(sa_func.coalesce(sa_func.sum(ProjectView.total_contributions), 0.0))
.scalar()
or 0.0
)
except Exception as exc:
logger.warning("Could not fetch round/contribution KPIs: %s", exc)

return {
"tvl_xlm": tvl_xlm,
"volume_xlm": volume_xlm,
"active_rounds": int(active_rounds),
"contribution_count": int(contribution_count),
"extra_data": extra or None,
}


def run_onchain_kpi_snapshot_job() -> None:
"""Fetch today's on-chain KPIs and persist a snapshot.

Intended to be called by the scheduler once per day. Safe to call
multiple times – duplicate snapshots for the same period are skipped.
"""
period_date = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
logger.info("Running on-chain KPI snapshot for %s", period_date)

try:
kpis = _fetch_kpis(period_date)
service = PostgresService()
snapshot = service.save_onchain_kpi_snapshot(
period_date=period_date,
tvl_xlm=kpis["tvl_xlm"],
volume_xlm=kpis["volume_xlm"],
active_rounds=kpis["active_rounds"],
contribution_count=kpis["contribution_count"],
extra_data=kpis["extra_data"],
captured_at=datetime.now(tz=timezone.utc),
)
if snapshot:
logger.info(
"KPI snapshot persisted: period=%s tvl=%.2f volume=%.2f "
"rounds=%d contributions=%d",
period_date,
kpis["tvl_xlm"],
kpis["volume_xlm"],
kpis["active_rounds"],
kpis["contribution_count"],
)
except Exception as exc:
logger.error("On-chain KPI snapshot job failed: %s", exc, exc_info=True)
Loading
Loading