diff --git a/django-backend/soroscan/ingest/admin.py b/django-backend/soroscan/ingest/admin.py index 697a4ada..5a15ede0 100644 --- a/django-backend/soroscan/ingest/admin.py +++ b/django-backend/soroscan/ingest/admin.py @@ -36,6 +36,7 @@ ContractVerification, DataDeletionRequest, DataRetentionPolicy, + EventAggregation, EventSchema, IndexerState, IngestError, @@ -47,9 +48,11 @@ PIIField, RemediationIncident, RemediationRule, + SigningKey, Team, TeamMembership, TrackedContract, + TransactionCost, WebhookDeadLetter, WebhookDeliveryLog, WebhookSubscription, @@ -676,18 +679,14 @@ def ping_webhook(self, request, pk): "contract_id": webhook.contract.contract_id, "timestamp": timezone.now().isoformat(), } - payload_bytes = json.dumps(test_payload, sort_keys=True).encode("utf-8") + payload_str = json.dumps(test_payload, sort_keys=True) + payload_bytes = payload_str.encode("utf-8") algorithm = (webhook.signature_algorithm or WebhookSubscription.SIGNATURE_SHA256).lower() - digestmod = hashlib.sha1 if algorithm == WebhookSubscription.SIGNATURE_SHA1 else hashlib.sha256 - prefix = "sha1" if algorithm == WebhookSubscription.SIGNATURE_SHA1 else "sha256" - sig_hex = hmac.new( - webhook.secret.encode("utf-8"), - msg=payload_bytes, - digestmod=digestmod, - ).hexdigest() + from .webhook_signing import sign_webhook_payload + headers = { "Content-Type": "application/json", - "X-SoroScan-Signature": f"{prefix}={sig_hex}", + "X-SoroScan-Signature": sign_webhook_payload(payload_str, webhook.secret, algorithm=algorithm), "X-SoroScan-Timestamp": timezone.now().isoformat(), } try: @@ -1418,3 +1417,245 @@ def test_dedup_view(self, request, contract_id): json.dumps({"dedup_hash": dedup_hash, "material": material}), content_type="application/json", ) + + +# --------------------------------------------------------------------------- +# Transaction Cost Analysis (Issue #804) +# --------------------------------------------------------------------------- + + +@admin.register(TransactionCost) +class TransactionCostAdmin(admin.ModelAdmin): + list_display = [ + "tx_hash_short", + "contract", + "function_name", + "total_fee_stroops", + "is_outlier", + "ledger_sequence", + "created_at", + ] + list_filter = ["is_outlier", "function_name", "created_at"] + search_fields = [ + "tx_hash", + "contract__contract_id", + "contract__name", + "function_name", + ] + readonly_fields = [ + "tx_hash", + "contract", + "function_name", + "ledger_sequence", + "total_fee_stroops", + "cpu_instructions_used", + "memory_bytes_used", + "network_bytes_used", + "is_outlier", + "created_at", + ] + + def tx_hash_short(self, obj): + return obj.tx_hash[:16] + "..." + tx_hash_short.short_description = "TX Hash" + tx_hash_short.admin_order_field = "tx_hash" + + def has_add_permission(self, request): + return False + + def has_change_permission(self, request, obj=None): + return False + + def has_delete_permission(self, request, obj=None): + return False + + +# --------------------------------------------------------------------------- +# Event Aggregation Analytics (Issue #801) +# --------------------------------------------------------------------------- + + +@admin.register(EventAggregation) +class EventAggregationAdmin(admin.ModelAdmin): + list_display = [ + "contract", + "event_type", + "time_bucket", + "event_count", + "created_at", + ] + list_filter = ["event_type", "time_bucket"] + search_fields = [ + "contract__contract_id", + "contract__name", + "event_type", + ] + readonly_fields = [ + "contract", + "event_type", + "time_bucket", + "event_count", + "created_at", + ] + date_hierarchy = "time_bucket" + + def has_add_permission(self, request): + return False + + def has_change_permission(self, request, obj=None): + return False + + def has_delete_permission(self, request, obj=None): + return False + + +# --------------------------------------------------------------------------- +# Analytics Dashboard View +# --------------------------------------------------------------------------- + + +from django.template.response import TemplateResponse +from django.db.models import Count as _Count, Sum as _Sum +from django.urls import path as _path +from django.utils import timezone as _tz + + +def analytics_dashboard_view(request): + """ + Admin view for the analytics dashboard. + Displays summary widgets for event volume, active contracts, and event types. + """ + now = _tz.now() + day_ago = now - timedelta(days=1) + week_ago = now - timedelta(days=7) + month_ago = now - timedelta(days=30) + + total_events_24h = ( + EventAggregation.objects.filter(time_bucket__gte=day_ago) + .aggregate(total=_Sum("event_count"))["total"] or 0 + ) + total_events_7d = ( + EventAggregation.objects.filter(time_bucket__gte=week_ago) + .aggregate(total=_Sum("event_count"))["total"] or 0 + ) + total_events_30d = ( + EventAggregation.objects.filter(time_bucket__gte=month_ago) + .aggregate(total=_Sum("event_count"))["total"] or 0 + ) + + active_contracts_24h = ( + EventAggregation.objects.filter(time_bucket__gte=day_ago) + .values("contract_id") + .distinct() + .count() + ) + active_contracts_7d = ( + EventAggregation.objects.filter(time_bucket__gte=week_ago) + .values("contract_id") + .distinct() + .count() + ) + + daily_volume = list( + EventAggregation.objects.filter(time_bucket__gte=month_ago) + .values("time_bucket") + .annotate(count=_Sum("event_count")) + .order_by("time_bucket")[:30] + ) + + event_type_breakdown = list( + EventAggregation.objects.filter(time_bucket__gte=week_ago) + .values("event_type") + .annotate(count=_Sum("event_count")) + .order_by("-count")[:10] + ) + + top_contracts = list( + EventAggregation.objects.filter(time_bucket__gte=week_ago) + .values("contract__contract_id", "contract__name") + .annotate(count=_Sum("event_count")) + .order_by("-count")[:10] + ) + + extra_context = ( + request.contextual_help_context() + if hasattr(request, "contextual_help_context") + else {} + ) + context = { + **extra_context, + "title": "Analytics Dashboard", + "total_events_24h": total_events_24h, + "total_events_7d": total_events_7d, + "total_events_30d": total_events_30d, + "active_contracts_24h": active_contracts_24h, + "active_contracts_7d": active_contracts_7d, + "daily_volume": [ + { + "date": r["time_bucket"].strftime("%Y-%m-%d") if hasattr(r["time_bucket"], "strftime") else str(r["time_bucket"]), + "count": r["count"], + } + for r in daily_volume + ], + "event_type_breakdown": [ + {"event_type": r["event_type"], "count": r["count"]} + for r in event_type_breakdown + ], + "top_contracts": [ + { + "contract_id": r["contract__contract_id"], + "name": r["contract__name"], + "count": r["count"], + } + for r in top_contracts + ], + } + + return TemplateResponse(request, "admin/analytics_dashboard.html", context) + + +@admin.register(SigningKey) +class SigningKeyAdmin(admin.ModelAdmin): + list_display = ["subscription", "label", "is_active", "expires_at", "created_at"] + list_filter = ["is_active", "subscription"] + search_fields = ["label", "key"] + readonly_fields = ["key", "created_at"] + raw_id_fields = ["subscription"] + + fieldsets = [ + (None, {"fields": ["subscription", "label"]}), + ("Key Material", {"fields": ["key", "is_active", "expires_at", "created_at"]}), + ] + + def save_model(self, request, obj, form, change): + from .webhook_signing import generate_signing_key + + if not obj.key: + obj.key = generate_signing_key() + super().save_model(request, obj, form, change) + + def get_queryset(self, request): + return super().get_queryset(request).select_related("subscription") + + +# Register analytics dashboard URL with the admin site +from django.contrib import admin as _admin_site + +_original_get_urls = _admin_site.site.get_urls + + +def _extended_get_urls(): + from django.urls import path as _url_path + urls = _original_get_urls() + urls.insert( + 0, + _url_path( + "analytics-dashboard/", + _admin_site.site.admin_view(analytics_dashboard_view), + name="analytics-dashboard", + ), + ) + return urls + + +_admin_site.site.get_urls = _extended_get_urls diff --git a/django-backend/soroscan/ingest/migrations/0042_apiusagelog.py b/django-backend/soroscan/ingest/migrations/0042_apiusagelog.py new file mode 100644 index 00000000..e598ab5f --- /dev/null +++ b/django-backend/soroscan/ingest/migrations/0042_apiusagelog.py @@ -0,0 +1,73 @@ +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [ + ("ingest", "0041_eventdeduplicationconfig"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name="APIUsageLog", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("method", models.CharField(max_length=12)), + ("endpoint", models.CharField(db_index=True, max_length=255)), + ("path", models.CharField(max_length=512)), + ("status_code", models.PositiveSmallIntegerField(db_index=True)), + ("request_bytes", models.PositiveIntegerField(default=0)), + ("response_bytes", models.PositiveIntegerField(default=0)), + ("error_type", models.CharField(blank=True, db_index=True, max_length=64)), + ("timestamp", models.DateTimeField(auto_now_add=True, db_index=True)), + ( + "api_key", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="usage_logs", + to="ingest.apikey", + ), + ), + ( + "organization", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="api_usage_logs", + to="ingest.organization", + ), + ), + ( + "user", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="api_usage_logs", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "ordering": ["-timestamp"], + "indexes": [ + models.Index(fields=["organization", "timestamp"], name="ingest_apiu_organiz_f4df26_idx"), + models.Index(fields=["organization", "endpoint", "timestamp"], name="ingest_apiu_organiz_7f7db3_idx"), + models.Index(fields=["organization", "error_type", "timestamp"], name="ingest_apiu_organiz_76c671_idx"), + ], + }, + ), + ] diff --git a/django-backend/soroscan/ingest/migrations/0043_transactioncost.py b/django-backend/soroscan/ingest/migrations/0043_transactioncost.py new file mode 100644 index 00000000..79c6014a --- /dev/null +++ b/django-backend/soroscan/ingest/migrations/0043_transactioncost.py @@ -0,0 +1,65 @@ +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [ + ("ingest", "0042_apiusagelog"), + ] + + operations = [ + migrations.CreateModel( + name="TransactionCost", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("tx_hash", models.CharField(max_length=64, unique=True)), + ( + "function_name", + models.CharField(blank=True, db_index=True, max_length=128), + ), + ("ledger_sequence", models.PositiveIntegerField()), + ("total_fee_stroops", models.BigIntegerField()), + ("cpu_instructions_used", models.BigIntegerField(default=0)), + ("memory_bytes_used", models.BigIntegerField(default=0)), + ("network_bytes_used", models.BigIntegerField(default=0)), + ( + "is_outlier", + models.BooleanField(db_index=True, default=False), + ), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "contract", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="transaction_costs", + to="ingest.trackedcontract", + ), + ), + ], + options={ + "ordering": ["-created_at"], + "indexes": [ + models.Index( + fields=["contract", "-created_at"], + name="ingest_tran_contrac_6a6677_idx", + ), + models.Index( + fields=["function_name"], + name="ingest_tran_functio_3be0ea_idx", + ), + models.Index( + fields=["contract", "function_name", "created_at"], + name="ingest_tran_contrac_0f10c6_idx", + ), + ], + }, + ), + ] diff --git a/django-backend/soroscan/ingest/migrations/0044_eventaggregation.py b/django-backend/soroscan/ingest/migrations/0044_eventaggregation.py new file mode 100644 index 00000000..c8493cdd --- /dev/null +++ b/django-backend/soroscan/ingest/migrations/0044_eventaggregation.py @@ -0,0 +1,57 @@ +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [ + ("ingest", "0043_transactioncost"), + ] + + operations = [ + migrations.CreateModel( + name="EventAggregation", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "event_type", + models.CharField(db_index=True, max_length=128), + ), + ( + "time_bucket", + models.DateTimeField(db_index=True), + ), + ("event_count", models.PositiveIntegerField(default=0)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "contract", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="event_aggregations", + to="ingest.trackedcontract", + ), + ), + ], + options={ + "ordering": ["-time_bucket"], + "indexes": [ + models.Index( + fields=["contract", "time_bucket"], + name="ingest_even_contrac_6c3351_idx", + ), + models.Index( + fields=["event_type", "time_bucket"], + name="ingest_even_event_t_6ca77e_idx", + ), + ], + "unique_together": {("contract", "event_type", "time_bucket")}, + }, + ), + ] diff --git a/django-backend/soroscan/ingest/migrations/0045_signingkey.py b/django-backend/soroscan/ingest/migrations/0045_signingkey.py new file mode 100644 index 00000000..d9628e13 --- /dev/null +++ b/django-backend/soroscan/ingest/migrations/0045_signingkey.py @@ -0,0 +1,61 @@ +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [ + ("ingest", "0044_eventaggregation"), + ] + + operations = [ + migrations.CreateModel( + name="SigningKey", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("key", models.CharField(max_length=255, unique=True)), + ( + "label", + models.CharField( + blank=True, + help_text="Human-readable label for this key", + max_length=128, + ), + ), + ( + "is_active", + models.BooleanField(db_index=True, default=True), + ), + ("expires_at", models.DateTimeField()), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "subscription", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="signing_keys", + to="ingest.webhooksubscription", + ), + ), + ], + options={ + "ordering": ["-created_at"], + "indexes": [ + models.Index( + fields=["subscription", "is_active"], + name="ingest_sign_subscri_875e25_idx", + ), + models.Index( + fields=["expires_at"], + name="ingest_sign_expires_4ff03a_idx", + ), + ], + }, + ), + ] diff --git a/django-backend/soroscan/ingest/models.py b/django-backend/soroscan/ingest/models.py index d6e34083..581e0aee 100644 --- a/django-backend/soroscan/ingest/models.py +++ b/django-backend/soroscan/ingest/models.py @@ -176,6 +176,51 @@ def __str__(self): ) +class APIUsageLog(models.Model): + """Per-request API usage facts used for organization analytics.""" + + organization = models.ForeignKey( + Organization, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="api_usage_logs", + ) + user = models.ForeignKey( + User, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="api_usage_logs", + ) + api_key = models.ForeignKey( + "APIKey", + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="usage_logs", + ) + method = models.CharField(max_length=12) + endpoint = models.CharField(max_length=255, db_index=True) + path = models.CharField(max_length=512) + status_code = models.PositiveSmallIntegerField(db_index=True) + request_bytes = models.PositiveIntegerField(default=0) + response_bytes = models.PositiveIntegerField(default=0) + error_type = models.CharField(max_length=64, blank=True, db_index=True) + timestamp = models.DateTimeField(auto_now_add=True, db_index=True) + + class Meta: + ordering = ["-timestamp"] + indexes = [ + models.Index(fields=["organization", "timestamp"]), + models.Index(fields=["organization", "endpoint", "timestamp"]), + models.Index(fields=["organization", "error_type", "timestamp"]), + ] + + def __str__(self): + return f"{self.method} {self.endpoint} -> {self.status_code}" + + class Team(models.Model): """ Multi-tenant organization: groups users and shared tracked contracts. @@ -2190,46 +2235,95 @@ def __str__(self): return f"ABI v{self.version_number} for {self.contract.contract_id[:8]}... (ledger {self.valid_from_ledger}–{self.valid_to_ledger or '∞'})" -class BlacklistedContract(models.Model): +class TransactionCost(models.Model): + """ + Tracks Soroban transaction resource costs per contract interaction. + Enables cost analytics, outlier detection, and optimization suggestions. """ - Contracts whose events must not be indexed. - Any contract_id present in this table is silently skipped by the - ingestion loop, regardless of whether it also exists in - TrackedContract. A log entry is written each time a skip occurs - so operators can audit the decision. + tx_hash = models.CharField(max_length=64, unique=True) + contract = models.ForeignKey( + TrackedContract, + on_delete=models.CASCADE, + related_name="transaction_costs", + ) + function_name = models.CharField(max_length=128, blank=True, db_index=True) + ledger_sequence = models.PositiveIntegerField() + total_fee_stroops = models.BigIntegerField() + cpu_instructions_used = models.BigIntegerField(default=0) + memory_bytes_used = models.BigIntegerField(default=0) + network_bytes_used = models.BigIntegerField(default=0) + is_outlier = models.BooleanField(default=False, db_index=True) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + ordering = ["-created_at"] + indexes = [ + models.Index(fields=["contract", "-created_at"]), + models.Index(fields=["function_name"]), + models.Index(fields=["contract", "function_name", "created_at"]), + ] + + def __str__(self): + return f"${self.total_fee_stroops} stroops | {self.function_name} @ ledger {self.ledger_sequence}" + + +class EventAggregation(models.Model): + """ + Pre-computed event counters aggregated by contract, event type, and time bucket. + Updated hourly by aggregate_event_statistics for fast dashboard queries. """ - contract_id = models.CharField( - max_length=56, - unique=True, - db_index=True, - validators=[ - RegexValidator( - regex=r"^C[A-Z2-7]{55}$", - message="Contract address must start with 'C' and be exactly 56 characters using valid Base32 characters (A-Z, 2-7).", - ) - ], - help_text="Stellar contract address to block from indexing (C...)", + contract = models.ForeignKey( + TrackedContract, + on_delete=models.CASCADE, + related_name="event_aggregations", ) - reason = models.TextField( - blank=True, - help_text="Human-readable explanation of why this contract is blacklisted", + event_type = models.CharField(max_length=128, db_index=True) + time_bucket = models.DateTimeField(db_index=True) + event_count = models.PositiveIntegerField(default=0) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + ordering = ["-time_bucket"] + unique_together = [("contract", "event_type", "time_bucket")] + indexes = [ + models.Index(fields=["contract", "time_bucket"]), + models.Index(fields=["event_type", "time_bucket"]), + ] + + def __str__(self): + return f"{self.event_type}: {self.event_count} @ {self.time_bucket.isoformat()}" + + +class SigningKey(models.Model): + """ + Rotatable HMAC signing keys for webhook request authentication. + + Each webhook subscription can have multiple keys for zero-downtime + rotation. Expired keys are retained for a configurable grace period + (default 7 days) to allow subscribers to transition. + """ + + subscription = models.ForeignKey( + WebhookSubscription, + on_delete=models.CASCADE, + related_name="signing_keys", ) - added_by = models.ForeignKey( - User, - on_delete=models.SET_NULL, - null=True, - blank=True, - related_name="blacklisted_contracts", - help_text="User who added this entry", + key = models.CharField(max_length=255, unique=True) + label = models.CharField( + max_length=128, blank=True, help_text="Human-readable label for this key" ) - created_at = models.DateTimeField(auto_now_add=True, db_index=True) + is_active = models.BooleanField(default=True, db_index=True) + expires_at = models.DateTimeField() + created_at = models.DateTimeField(auto_now_add=True) class Meta: - verbose_name = "Blacklisted Contract" - verbose_name_plural = "Blacklisted Contracts" ordering = ["-created_at"] + indexes = [ + models.Index(fields=["subscription", "is_active"]), + models.Index(fields=["expires_at"]), + ] def __str__(self): - return f"Blacklisted({self.contract_id[:8]}...)" + return f"SigningKey for {self.subscription} ({'active' if self.is_active else 'expired'})" diff --git a/django-backend/soroscan/ingest/serializers.py b/django-backend/soroscan/ingest/serializers.py index c98b16f9..7a411933 100644 --- a/django-backend/soroscan/ingest/serializers.py +++ b/django-backend/soroscan/ingest/serializers.py @@ -14,6 +14,7 @@ ContractInvocation, ContractSource, ContractVerification, + EventAggregation, Organization, OrganizationBudget, OrganizationCostSnapshot, @@ -21,6 +22,7 @@ Team, TeamMembership, TrackedContract, + TransactionCost, WebhookSubscription, ) @@ -551,3 +553,71 @@ class Meta: "error_message", ] read_only_fields = ["id", "verified_at"] + + +class TransactionCostSerializer(serializers.ModelSerializer): + contract_id = serializers.CharField(source="contract.contract_id", read_only=True) + + class Meta: + model = TransactionCost + fields = [ + "id", + "tx_hash", + "contract_id", + "function_name", + "ledger_sequence", + "total_fee_stroops", + "cpu_instructions_used", + "memory_bytes_used", + "network_bytes_used", + "is_outlier", + "created_at", + ] + read_only_fields = fields + + +class CostAnalyticsQuerySerializer(serializers.Serializer): + contract_id = serializers.CharField(required=True) + groupby = serializers.ChoiceField( + choices=["function", "day"], default="function" + ) + range = serializers.ChoiceField( + choices=["7d", "30d", "90d"], default="7d" + ) + + +class EventAggregationSerializer(serializers.ModelSerializer): + contract_id = serializers.CharField(source="contract.contract_id", read_only=True) + + class Meta: + model = EventAggregation + fields = [ + "id", + "contract_id", + "event_type", + "time_bucket", + "event_count", + "created_at", + ] + read_only_fields = fields + + +class AnalyticsQuerySerializer(serializers.Serializer): + metric = serializers.ChoiceField( + choices=["event_volume", "active_contracts", "event_type_breakdown"], + default="event_volume", + ) + granularity = serializers.ChoiceField( + choices=["hourly", "daily", "weekly", "monthly"], + default="daily", + ) + range = serializers.ChoiceField( + choices=["7d", "30d", "90d", "1y"], + default="30d", + ) + contract_id = serializers.CharField(required=False, allow_blank=True) + export = serializers.ChoiceField( + choices=["csv", "json"], + required=False, + allow_blank=True, + ) diff --git a/django-backend/soroscan/ingest/stellar_client.py b/django-backend/soroscan/ingest/stellar_client.py index 2c5bc247..271476ff 100644 --- a/django-backend/soroscan/ingest/stellar_client.py +++ b/django-backend/soroscan/ingest/stellar_client.py @@ -36,6 +36,16 @@ class TransactionResult: result_xdr: Optional[str] = None +@dataclass +class CostData: + """Parsed cost information from Soroban transaction metadata.""" + + cpu_instructions: int + memory_bytes: int + network_bytes: int + total_fee_stroops: int + + @dataclass class InvocationData: """Parsed invocation metadata from transaction response.""" @@ -484,3 +494,111 @@ def get_invocation(self, tx_hash: str) -> InvocationData: error=str(e), ) + def extract_transaction_costs(self, tx_response) -> CostData: + """ + Extract resource fee information from a Soroban transaction response. + + Parses the sorobanMeta from the transaction result metadata to + extract CPU instructions, memory bytes, and network bytes used. + Falls back to the transaction fee field when meta parsing fails. + """ + try: + if isinstance(tx_response, dict): + soroban_meta = ( + tx_response.get("result", {}) + .get("result", {}) + .get("txMeta", {}) + .get("v3", {}) + .get("sorobanMeta", {}) + ) + if soroban_meta: + resources = soroban_meta.get("resources", {}) + return CostData( + cpu_instructions=int(resources.get("cpuInstructions", 0)), + memory_bytes=int(resources.get("memBytes", 0)), + network_bytes=int(resources.get("netBytes", 0)), + total_fee_stroops=int( + tx_response.get("tx", {}) + .get("fee", {}) + .get("amount", 0) + ), + ) + else: + soroban_meta = getattr(tx_response, "sorobanMeta", None) + if soroban_meta: + resources = getattr(soroban_meta, "resources", {}) + return CostData( + cpu_instructions=int(getattr(resources, "cpuInstructions", 0)), + memory_bytes=int(getattr(resources, "memBytes", 0)), + network_bytes=int(getattr(resources, "netBytes", 0)), + total_fee_stroops=int(getattr(tx_response, "fee", 0)), + ) + except (AttributeError, KeyError, TypeError, ValueError): + logger.debug("Could not parse sorobanMeta from tx_response", exc_info=True) + + try: + if isinstance(tx_response, dict): + fee_meta = tx_response.get("fee_charged") or tx_response.get("fee") + else: + fee_meta = getattr(tx_response, "fee_charged", None) or getattr( + tx_response, "fee", None + ) + if fee_meta is not None: + return CostData( + cpu_instructions=0, + memory_bytes=0, + network_bytes=0, + total_fee_stroops=int(fee_meta), + ) + except (AttributeError, TypeError, ValueError): + pass + + return CostData( + cpu_instructions=0, + memory_bytes=0, + network_bytes=0, + total_fee_stroops=0, + ) + + def get_transaction_cost(self, tx_hash: str) -> CostData: + """ + Fetch and parse cost data for a specific transaction. + + Uses the same caching layer as get_invocation to avoid + redundant RPC calls. + + Args: + tx_hash: Transaction hash to fetch + + Returns: + CostData with resource usage metrics + """ + try: + self._rate_limiter.acquire() + tx_response = self.server.get_transaction(tx_hash) + + if not tx_response or getattr(tx_response, "status", None) == "NOT_FOUND": + return CostData( + cpu_instructions=0, + memory_bytes=0, + network_bytes=0, + total_fee_stroops=0, + ) + + if hasattr(tx_response, "to_dict"): + raw = tx_response.to_dict() + else: + raw = tx_response + + return self.extract_transaction_costs(raw) + except Exception as e: + logger.exception( + "Failed to fetch transaction cost for tx_hash=%s", tx_hash + ) + return CostData( + cpu_instructions=0, + memory_bytes=0, + network_bytes=0, + total_fee_stroops=0, + ) + diff --git a/django-backend/soroscan/ingest/tasks.py b/django-backend/soroscan/ingest/tasks.py index 5ff58b81..e208380e 100644 --- a/django-backend/soroscan/ingest/tasks.py +++ b/django-backend/soroscan/ingest/tasks.py @@ -27,7 +27,7 @@ from celery.signals import task_postrun, task_prerun, task_retry from django.conf import settings from django.core.cache import cache -from django.db.models import Count, F, Max, Min +from django.db.models import Avg, Count, F, Max, Min, StdDev, Sum from django.utils import timezone from soroscan.circuit_breaker import execute_with_circuit_breaker @@ -50,9 +50,11 @@ TrackedContract, WebhookSubscription, IndexerState, + EventAggregation, EventSchema, RemediationRule, RemediationIncident, + SigningKey, AdminAction, ContractInvocation, ContractDependency, @@ -61,6 +63,7 @@ Organization, OrganizationBudget, OrganizationCostSnapshot, + TransactionCost, WebhookDeadLetter, ) from stellar_sdk import SorobanServer @@ -424,19 +427,24 @@ def _build_webhook_signature_header( algorithm = ( webhook.signature_algorithm or WebhookSubscription.SIGNATURE_SHA256 ).lower() - if algorithm == WebhookSubscription.SIGNATURE_SHA1: - digestmod = hashlib.sha1 - prefix = "sha1" - else: - digestmod = hashlib.sha256 - prefix = "sha256" - sig_hex = hmac.new( - webhook.secret.encode("utf-8"), - msg=payload_bytes, - digestmod=digestmod, - ).hexdigest() - return f"{prefix}={sig_hex}" + # Try to use the active SigningKey first; fall back to webhook.secret + signing_key = ( + SigningKey.objects.filter( + subscription=webhook, is_active=True, expires_at__gt=timezone.now() + ) + .order_by("-created_at") + .first() + ) + key_material = signing_key.key if signing_key else webhook.secret + + from .webhook_signing import sign_webhook_payload + + return sign_webhook_payload( + payload_bytes.decode("utf-8"), + key_material, + algorithm=algorithm, + ) def validate_contract_payload_schema( @@ -2170,6 +2178,31 @@ def ingest_latest_events() -> int: exc_info=True, ) + # --- TransactionCost tracking (Issue #804) --- + try: + cost_data = client.get_transaction_cost(event.tx_hash) + if cost_data.total_fee_stroops > 0: + TransactionCost.objects.get_or_create( + tx_hash=event.tx_hash, + defaults={ + "contract": contract, + "function_name": invocation_data.function_name + if invocation_data and invocation_data.success + else "", + "ledger_sequence": event.ledger, + "total_fee_stroops": cost_data.total_fee_stroops, + "cpu_instructions_used": cost_data.cpu_instructions, + "memory_bytes_used": cost_data.memory_bytes, + "network_bytes_used": cost_data.network_bytes, + }, + ) + except Exception: + logger.warning( + "Failed to record transaction cost for tx=%s", + event.tx_hash, + exc_info=True, + ) + event_record, created = ContractEvent.objects.get_or_create( tx_hash=event.tx_hash, ledger=event.ledger, @@ -2283,20 +2316,56 @@ def ingest_latest_events() -> int: @shared_task(name="ingest.tasks.aggregate_event_statistics") def aggregate_event_statistics() -> dict[str, Any]: """ - Perform analytics aggregation on ingested events (Low Priority). + Aggregate events into pre-computed buckets by contract, event type, and hour. + + Runs hourly to populate EventAggregation for fast dashboard queries. + Supports daily, weekly, monthly rollups derived from hourly buckets. """ _start = time.monotonic() m = _get_metrics() + now = timezone.now() + hour_ago = now - timedelta(hours=1) + bucket = hour_ago.replace(minute=0, second=0, microsecond=0) + + aggs = ( + ContractEvent.objects.filter( + timestamp__gte=bucket, + timestamp__lt=bucket + timedelta(hours=1), + ) + .values("contract_id", "event_type") + .annotate(count=Count("id")) + ) + + created = 0 + updated = 0 + for agg in aggs: + obj, was_created = EventAggregation.objects.update_or_create( + contract_id=agg["contract_id"], + event_type=agg["event_type"], + time_bucket=bucket, + defaults={"event_count": agg["count"]}, + ) + if was_created: + created += 1 + else: + updated += 1 - # Placeholder for actual aggregation logic total_events = ContractEvent.objects.count() active_contracts = TrackedContract.objects.filter(is_active=True).count() logger.info( - "Aggregated statistics: %d events across %d contracts", + "Aggregated %d event buckets (%d created, %d updated): %d total events, %d active contracts", + len(aggs), + created, + updated, total_events, active_contracts, - extra={"total_events": total_events, "active_contracts": active_contracts}, + extra={ + "buckets_created": created, + "buckets_updated": updated, + "total_events": total_events, + "active_contracts": active_contracts, + }, ) m.task_duration_seconds.labels(task_name="aggregate_event_statistics").observe( @@ -2304,9 +2373,83 @@ def aggregate_event_statistics() -> dict[str, Any]: ) return { + "buckets_created": created, + "buckets_updated": updated, "total_events": total_events, "active_contracts": active_contracts, - "timestamp": timezone.now().isoformat(), + "timestamp": now.isoformat(), + } + + +@shared_task(name="ingest.tasks.detect_event_anomalies") +def detect_event_anomalies() -> dict[str, Any]: + """ + Detect anomalous drops in event volume. + + Compares the last full hour of event counts against the previous + same-day-of-week hourly average. Flags contracts where volume + drops below the configurable ANOMALY_DROP_THRESHOLD_PCT threshold. + """ + from django.conf import settings + + _start = time.monotonic() + now = timezone.now() + bucket = (now - timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) + prev_hour = bucket - timedelta(hours=1) + threshold_pct = getattr(settings, "ANOMALY_DROP_THRESHOLD_PCT", 50) + + anomalies = [] + recent_rows = list( + EventAggregation.objects.filter(time_bucket=bucket) + .values("contract_id", "event_type") + .annotate(count=Sum("event_count")) + .values("contract_id", "event_type", "count") + ) + recent_counts = {(r["contract_id"], r["event_type"]): r["count"] for r in recent_rows} + + prev_rows = list( + EventAggregation.objects.filter(time_bucket=prev_hour) + .values("contract_id", "event_type") + .annotate(count=Sum("event_count")) + .values("contract_id", "event_type", "count") + ) + prev_counts = {(r["contract_id"], r["event_type"]): r["count"] for r in prev_rows} + + for (cid, etype), count in recent_counts.items(): + prev = prev_counts.get((cid, etype), 0) + if prev > 0: + drop_pct = (1 - count / prev) * 100 + if drop_pct > threshold_pct: + anomalies.append({ + "contract_id": cid, + "event_type": etype, + "current_count": count, + "previous_count": prev, + "drop_pct": round(drop_pct, 1), + }) + logger.warning( + "Anomaly detected: contract=%s event_type=%s dropped %.1f%%", + cid, etype, drop_pct, + extra={ + "contract_id": cid, + "event_type": etype, + "drop_pct": drop_pct, + "current_count": count, + "previous_count": prev, + }, + ) + + elapsed = time.monotonic() - _start + logger.info( + "Anomaly detection completed in %.2fs: %d anomalies found", + elapsed, + len(anomalies), + ) + + return { + "anomalies_found": len(anomalies), + "anomalies": anomalies, + "duration_seconds": round(elapsed, 3), } @@ -2364,6 +2507,41 @@ def warm_event_count_cache() -> dict[str, Any]: } +@shared_task(name="ingest.tasks.rotate_expired_signing_keys") +def rotate_expired_signing_keys() -> dict[str, Any]: + """ + Deactivate expired signing keys past the retention window. + + Keys whose ``expires_at`` is older than ``SIGNING_KEY_RETENTION_DAYS`` + are hard-deleted. Keys that have expired but are still within the + grace period are deactivated (``is_active = False``) so subscribers + have a window to update their secrets. + """ + now = timezone.now() + retention_cutoff = now - timedelta(days=settings.SIGNING_KEY_RETENTION_DAYS) + + # 1) Deactivate keys where expires_at is between now and retention_cutoff + expired_keys = SigningKey.objects.filter( + expires_at__lte=now, expires_at__gt=retention_cutoff, is_active=True + ) + deactivated_count = expired_keys.update(is_active=False) + + # 2) Hard-delete keys past the retention window + stale_keys = SigningKey.objects.filter(expires_at__lte=retention_cutoff) + deleted_count, _ = stale_keys.delete() + + logger.info( + "Signing key rotation: %d deactivated, %d deleted", + deactivated_count, + deleted_count, + ) + return { + "deactivated": deactivated_count, + "deleted": deleted_count, + "cutoff": retention_cutoff.isoformat(), + } + + @shared_task(name="ingest.tasks.log_daily_platform_stats") def log_daily_platform_stats() -> dict[str, Any]: """ @@ -3896,3 +4074,98 @@ def detect_contract_upgrades() -> dict[str, Any]: ).update(valid_to_ledger=ledger - 1) return summary + + +# --------------------------------------------------------------------------- +# Transaction Cost Analysis (Issue #804) +# --------------------------------------------------------------------------- + + +@shared_task(name="ingest.tasks.analyze_transaction_costs") +def analyze_transaction_costs() -> dict[str, Any]: + """ + Aggregate transaction costs by contract, function, and time period. + + Runs hourly to: + - Compute per-function cost statistics (avg, min, max, total) + - Flag outlier transactions (>2 stddev from mean) + - Generate trend data for the cost dashboard + + Returns: + dict with summary of aggregations performed + """ + _start = time.monotonic() + now = timezone.now() + seven_days_ago = now - timedelta(days=7) + thirty_days_ago = now - timedelta(days=30) + stats: dict[str, Any] = { + "transactions_analyzed": 0, + "outliers_flagged": 0, + "functions_aggregated": 0, + "contracts_analyzed": 0, + } + + # Analyze costs per contract + contracts = TrackedContract.objects.filter(is_active=True) + stats["contracts_analyzed"] = contracts.count() + + for contract in contracts: + costs_qs = TransactionCost.objects.filter(contract=contract) + + # Flag outliers: transactions with total_fee > 2 stddev from contract mean + agg = costs_qs.aggregate( + mean_fee=Avg("total_fee_stroops"), + stddev_fee=StdDev("total_fee_stroops"), + ) + mean_fee = agg["mean_fee"] or 0 + stddev_fee = agg["stddev_fee"] or 0 + threshold = mean_fee + 2 * stddev_fee + + if threshold > 0: + outliers = costs_qs.filter( + total_fee_stroops__gt=threshold, is_outlier=False + ) + count = outliers.update(is_outlier=True) + stats["outliers_flagged"] += count + + # Compute per-function statistics for the last 7 days + recent_costs = costs_qs.filter(created_at__gte=seven_days_ago) + function_stats = ( + recent_costs.values("function_name") + .annotate( + avg_cost=Avg("total_fee_stroops"), + min_cost=Min("total_fee_stroops"), + max_cost=Max("total_fee_stroops"), + total_cost=Sum("total_fee_stroops"), + call_count=Count("id"), + ) + .order_by("-total_cost") + ) + stats["functions_aggregated"] += len(function_stats) + + # Week-over-week trend comparison + last_week = now - timedelta(days=14) + this_week_costs = TransactionCost.objects.filter( + created_at__gte=seven_days_ago + ).aggregate(total=Sum("total_fee_stroops"))["total"] or 0 + prev_week_costs = TransactionCost.objects.filter( + created_at__gte=last_week, created_at__lt=seven_days_ago + ).aggregate(total=Sum("total_fee_stroops"))["total"] or 0 + + stats["total_cost_7d_stroops"] = this_week_costs + stats["total_cost_prev_7d_stroops"] = prev_week_costs + stats["week_over_week_change_pct"] = ( + ((this_week_costs - prev_week_costs) / prev_week_costs * 100) + if prev_week_costs > 0 + else 0 + ) + + elapsed = time.monotonic() - _start + logger.info( + "Transaction cost analysis completed in %.2fs: %s", + elapsed, + stats, + extra={"stats": stats, "duration_seconds": round(elapsed, 3)}, + ) + + return stats diff --git a/django-backend/soroscan/ingest/templates/admin/analytics_dashboard.html b/django-backend/soroscan/ingest/templates/admin/analytics_dashboard.html new file mode 100644 index 00000000..5e47ab61 --- /dev/null +++ b/django-backend/soroscan/ingest/templates/admin/analytics_dashboard.html @@ -0,0 +1,220 @@ +{% extends "admin/base_site.html" %} +{% load i18n static %} + +{% block title %}Analytics Dashboard{% endblock %} + +{% block extrastyle %} + {{ block.super }} + +{% endblock %} + +{% block content %} +

{% trans 'Analytics Dashboard' %}

+ +
+
+

{% trans 'Events (24h)' %}

+
{{ total_events_24h }}
+
+
+

{% trans 'Events (7d)' %}

+
{{ total_events_7d }}
+
+
+

{% trans 'Events (30d)' %}

+
{{ total_events_30d }}
+
+
+

{% trans 'Active Contracts (24h)' %}

+
{{ active_contracts_24h }}
+
+
+

{% trans 'Active Contracts (7d)' %}

+
{{ active_contracts_7d }}
+
+
+ +
+

{% trans 'Daily Event Volume (Last 30 Days)' %}

+ {% if daily_volume %} + {% with max_count=daily_volume.0.count %} +
+ {% for day in daily_volume %} +
+ {{ day.date }}: {{ day.count }} +
+ {% endfor %} +
+ {% endwith %} + {% else %} +

{% trans 'No data available for the last 30 days.' %}

+ {% endif %} +
+ +
+
+

{% trans 'Event Type Breakdown (7d)' %}

+ {% if event_type_breakdown %} + {% with max_count=event_type_breakdown.0.count %} + + + + + + + + + {% for item in event_type_breakdown %} + + + + + {% endfor %} + +
{% trans 'Event Type' %}{% trans 'Count' %}
{{ item.event_type }} +
+ {{ item.count }} +
+
+
+ {% endwith %} + {% else %} +

{% trans 'No event data for the last 7 days.' %}

+ {% endif %} +
+ +
+

{% trans 'Top Contracts (7d)' %}

+ {% if top_contracts %} + {% with max_count=top_contracts.0.count %} + + + + + + + + + {% for item in top_contracts %} + + + + + {% endfor %} + +
{% trans 'Contract' %}{% trans 'Count' %}
{{ item.name|default:item.contract_id }} +
+ {{ item.count }} +
+
+
+ {% endwith %} + {% else %} +

{% trans 'No contract data for the last 7 days.' %}

+ {% endif %} +
+
+{% endblock %} diff --git a/django-backend/soroscan/ingest/tests/conftest.py b/django-backend/soroscan/ingest/tests/conftest.py index 47c91b06..feaaac26 100644 --- a/django-backend/soroscan/ingest/tests/conftest.py +++ b/django-backend/soroscan/ingest/tests/conftest.py @@ -1,5 +1,5 @@ import pytest -from .factories import UserFactory, TrackedContractFactory +from .factories import UserFactory, TrackedContractFactory, WebhookSubscriptionFactory @pytest.fixture def user(): @@ -8,3 +8,7 @@ def user(): @pytest.fixture def contract(user): return TrackedContractFactory(owner=user) + +@pytest.fixture +def webhook_subscription(contract): + return WebhookSubscriptionFactory(contract=contract) diff --git a/django-backend/soroscan/ingest/tests/test_analytics.py b/django-backend/soroscan/ingest/tests/test_analytics.py new file mode 100644 index 00000000..a7d76248 --- /dev/null +++ b/django-backend/soroscan/ingest/tests/test_analytics.py @@ -0,0 +1,354 @@ +from unittest.mock import patch + +import pytest +from django.contrib.auth import get_user_model +from django.urls import reverse +from django.utils import timezone +from rest_framework import status +from rest_framework.test import APIClient + +from soroscan.ingest.models import EventAggregation, TrackedContract +from soroscan.ingest.tasks import aggregate_event_statistics, detect_event_anomalies + +from .factories import ContractEventFactory, TrackedContractFactory, UserFactory + +User = get_user_model() + + +@pytest.fixture +def api_client(): + return APIClient() + + +@pytest.fixture +def user(): + return UserFactory() + + +@pytest.fixture +def authenticated_client(api_client, user): + api_client.force_authenticate(user=user) + return api_client + + +@pytest.fixture +def contract(user): + return TrackedContractFactory(owner=user) + + +@pytest.mark.django_db +class TestEventAggregationModel: + def test_create_aggregation(self, contract): + bucket = timezone.now().replace(minute=0, second=0, microsecond=0) + agg = EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=42, + ) + assert agg.event_type == "transfer" + assert agg.event_count == 42 + assert str(agg) == f"transfer: 42 @ {bucket.isoformat()}" + + def test_unique_together(self, contract): + bucket = timezone.now().replace(minute=0, second=0, microsecond=0) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=10, + ) + with pytest.raises(Exception): + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=20, + ) + + +@pytest.mark.django_db +class TestAggregateEventStatisticsTask: + def test_aggregate_empty(self): + result = aggregate_event_statistics() + assert result["buckets_created"] == 0 + assert result["buckets_updated"] == 0 + + def test_aggregate_with_events(self, contract): + now = timezone.now() + bucket = (now - timezone.timedelta(hours=1)).replace( + minute=0, second=0, microsecond=0 + ) + ContractEventFactory.create_batch( + 5, + contract=contract, + event_type="transfer", + timestamp=bucket + timezone.timedelta(minutes=30), + ) + ContractEventFactory.create_batch( + 3, + contract=contract, + event_type="mint", + timestamp=bucket + timezone.timedelta(minutes=45), + ) + + result = aggregate_event_statistics() + + assert result["buckets_created"] == 2 + assert EventAggregation.objects.count() == 2 + + transfer_agg = EventAggregation.objects.get( + contract=contract, event_type="transfer" + ) + assert transfer_agg.event_count == 5 + + mint_agg = EventAggregation.objects.get( + contract=contract, event_type="mint" + ) + assert mint_agg.event_count == 3 + + +@pytest.mark.django_db +class TestDetectEventAnomaliesTask: + def test_no_anomalies(self, contract): + now = timezone.now() + bucket = now.replace(minute=0, second=0, microsecond=0) + prev_bucket = bucket - timezone.timedelta(hours=1) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=prev_bucket, + event_count=100, + ) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=90, + ) + + with patch("soroscan.ingest.tasks.timezone.now", return_value=bucket + timezone.timedelta(hours=1)): + result = detect_event_anomalies() + assert result["anomalies_found"] == 0 + + def test_anomaly_detected(self, contract): + now = timezone.now() + bucket = now.replace(minute=0, second=0, microsecond=0) + prev_bucket = bucket - timezone.timedelta(hours=1) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=prev_bucket, + event_count=100, + ) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=10, + ) + + with patch("soroscan.ingest.tasks.timezone.now", return_value=bucket + timezone.timedelta(hours=1)): + result = detect_event_anomalies() + assert result["anomalies_found"] == 1 + assert result["anomalies"][0]["drop_pct"] > 50 + + +@pytest.mark.django_db +class TestAnalyticsAPI: + def test_analytics_event_volume(self, authenticated_client, contract): + bucket = timezone.now().replace(minute=0, second=0, microsecond=0) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=100, + ) + + url = reverse("analytics-list") + response = authenticated_client.get( + url, + { + "metric": "event_volume", + "granularity": "daily", + "range": "7d", + }, + ) + + assert response.status_code == status.HTTP_200_OK + assert response.data["metric"] == "event_volume" + assert len(response.data["data"]) >= 1 + + def test_analytics_active_contracts(self, authenticated_client, contract): + bucket = timezone.now().replace(minute=0, second=0, microsecond=0) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=100, + ) + + url = reverse("analytics-list") + response = authenticated_client.get( + url, + { + "metric": "active_contracts", + "granularity": "daily", + "range": "7d", + }, + ) + + assert response.status_code == status.HTTP_200_OK + assert response.data["metric"] == "active_contracts" + + def test_analytics_event_type_breakdown( + self, authenticated_client, contract + ): + bucket = timezone.now().replace(minute=0, second=0, microsecond=0) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=100, + ) + EventAggregation.objects.create( + contract=contract, + event_type="mint", + time_bucket=bucket, + event_count=50, + ) + + url = reverse("analytics-list") + response = authenticated_client.get( + url, + { + "metric": "event_type_breakdown", + "range": "7d", + }, + ) + + assert response.status_code == status.HTTP_200_OK + assert len(response.data["data"]) == 2 + + def test_analytics_csv_export(self, authenticated_client, contract): + bucket = timezone.now().replace(minute=0, second=0, microsecond=0) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=100, + ) + + url = reverse("analytics-list") + response = authenticated_client.get( + url, + { + "metric": "event_volume", + "granularity": "daily", + "range": "7d", + "export": "csv", + }, + ) + + assert response.status_code == status.HTTP_200_OK + assert response["Content-Type"] == "text/csv" + assert "filename=" in response["Content-Disposition"] + + def test_analytics_json_export(self, authenticated_client, contract): + bucket = timezone.now().replace(minute=0, second=0, microsecond=0) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=100, + ) + + url = reverse("analytics-list") + response = authenticated_client.get( + url, + { + "metric": "event_volume", + "granularity": "daily", + "range": "7d", + "export": "json", + }, + ) + + assert response.status_code == status.HTTP_200_OK + assert "data" in response.data + + def test_analytics_invalid_params(self, authenticated_client): + url = reverse("analytics-list") + response = authenticated_client.get(url, {"metric": "invalid"}) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +@pytest.mark.django_db +class TestAnalyticsOverview: + def test_overview(self, authenticated_client, contract): + bucket = timezone.now().replace(minute=0, second=0, microsecond=0) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=100, + ) + + url = reverse("analytics-overview") + response = authenticated_client.get(url) + + assert response.status_code == status.HTTP_200_OK + assert "total_events_24h" in response.data + assert "active_contracts_24h" in response.data + assert "top_event_types" in response.data + assert "top_contracts" in response.data + + def test_overview_no_data(self, authenticated_client): + url = reverse("analytics-overview") + response = authenticated_client.get(url) + + assert response.status_code == status.HTTP_200_OK + assert response.data["total_events_24h"] == 0 + + +@pytest.mark.django_db +class TestAnalyticsAnomalies: + def test_anomalies_endpoint(self, authenticated_client, contract): + now = timezone.now() + bucket = now.replace(minute=0, second=0, microsecond=0) + prev_bucket = bucket - timezone.timedelta(hours=1) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=prev_bucket, + event_count=100, + ) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=10, + ) + + url = reverse("analytics-anomalies") + with patch("soroscan.ingest.tasks.detect_event_anomalies", return_value={"anomalies": [{"contract_id": "test", "event_type": "transfer", "current_count": 10, "previous_count": 100, "drop_pct": 90.0}]}): + response = authenticated_client.get(url) + + assert response.status_code == status.HTTP_200_OK + assert "anomalies" in response.data + + +@pytest.mark.django_db +class TestEventAggregationAdmin: + def test_admin_list_view(self, authenticated_client, contract): + bucket = timezone.now().replace(minute=0, second=0, microsecond=0) + EventAggregation.objects.create( + contract=contract, + event_type="transfer", + time_bucket=bucket, + event_count=100, + ) + + url = reverse("admin:ingest_eventaggregation_changelist") + response = authenticated_client.get(url) + assert response.status_code in (200, 302) diff --git a/django-backend/soroscan/ingest/tests/test_transaction_cost.py b/django-backend/soroscan/ingest/tests/test_transaction_cost.py new file mode 100644 index 00000000..fcd9e5cf --- /dev/null +++ b/django-backend/soroscan/ingest/tests/test_transaction_cost.py @@ -0,0 +1,318 @@ +from unittest.mock import MagicMock, patch + +import pytest +from django.contrib.auth import get_user_model +from django.urls import reverse +from django.utils import timezone +from rest_framework import status +from rest_framework.test import APIClient + +from soroscan.ingest.models import TrackedContract, TransactionCost +from soroscan.ingest.stellar_client import CostData, SorobanClient +from soroscan.ingest.tasks import analyze_transaction_costs + +from .factories import TrackedContractFactory, UserFactory + +User = get_user_model() + + +@pytest.fixture +def api_client(): + return APIClient() + + +@pytest.fixture +def user(): + return UserFactory() + + +@pytest.fixture +def authenticated_client(api_client, user): + api_client.force_authenticate(user=user) + return api_client + + +@pytest.fixture +def contract(user): + return TrackedContractFactory(owner=user) + + +@pytest.mark.django_db +class TestCostExtraction: + def test_extract_transaction_costs_with_soroban_meta(self): + client = SorobanClient( + rpc_url="https://testnet.stellar.org", + network_passphrase="Test SDF Network ; September 2015", + ) + tx_response = { + "result": { + "result": { + "txMeta": { + "v3": { + "sorobanMeta": { + "resources": { + "cpuInstructions": 5000000, + "memBytes": 204800, + "netBytes": 1024, + } + } + } + } + } + }, + "tx": {"fee": {"amount": 1500000}}, + } + + cost = client.extract_transaction_costs(tx_response) + assert cost.cpu_instructions == 5000000 + assert cost.memory_bytes == 204800 + assert cost.network_bytes == 1024 + assert cost.total_fee_stroops == 1500000 + + def test_extract_transaction_costs_fallback_to_fee(self): + client = SorobanClient( + rpc_url="https://testnet.stellar.org", + network_passphrase="Test SDF Network ; September 2015", + ) + tx_response = {"status": "SUCCESS", "fee_charged": 100000} + + cost = client.extract_transaction_costs(tx_response) + assert cost.cpu_instructions == 0 + assert cost.memory_bytes == 0 + assert cost.network_bytes == 0 + assert cost.total_fee_stroops == 100000 + + def test_extract_transaction_costs_empty_response(self): + client = SorobanClient( + rpc_url="https://testnet.stellar.org", + network_passphrase="Test SDF Network ; September 2015", + ) + cost = client.extract_transaction_costs({}) + assert cost.total_fee_stroops == 0 + assert cost.cpu_instructions == 0 + + +@pytest.mark.django_db +class TestTransactionCostModel: + def test_create_transaction_cost(self, contract): + cost = TransactionCost.objects.create( + tx_hash="abc123def456", + contract=contract, + function_name="transfer", + ledger_sequence=123456, + total_fee_stroops=1500000, + cpu_instructions_used=5000000, + memory_bytes_used=204800, + network_bytes_used=1024, + ) + assert cost.tx_hash == "abc123def456" + assert cost.contract == contract + assert cost.function_name == "transfer" + assert cost.total_fee_stroops == 1500000 + assert str(cost) == "$1500000 stroops | transfer @ ledger 123456" + + def test_unique_tx_hash_constraint(self, contract): + TransactionCost.objects.create( + tx_hash="unique_hash", contract=contract, ledger_sequence=1, total_fee_stroops=100 + ) + with pytest.raises(Exception): + TransactionCost.objects.create( + tx_hash="unique_hash", contract=contract, ledger_sequence=2, total_fee_stroops=200 + ) + + +@pytest.mark.django_db +class TestTransactionCostAdmin: + def test_admin_list_view(self, authenticated_client, contract): + TransactionCost.objects.create( + tx_hash="tx1", contract=contract, ledger_sequence=1, total_fee_stroops=100 + ) + TransactionCost.objects.create( + tx_hash="tx2", contract=contract, ledger_sequence=2, total_fee_stroops=200 + ) + url = reverse("admin:ingest_transactioncost_changelist") + response = authenticated_client.get(url) + assert response.status_code in (200, 302) + + +@pytest.mark.django_db +class TestCostAnalyticsAPI: + def test_cost_analytics_by_function(self, authenticated_client, contract): + TransactionCost.objects.create( + tx_hash="tx1", contract=contract, function_name="transfer", + ledger_sequence=1, total_fee_stroops=1000, + ) + TransactionCost.objects.create( + tx_hash="tx2", contract=contract, function_name="transfer", + ledger_sequence=2, total_fee_stroops=2000, + ) + TransactionCost.objects.create( + tx_hash="tx3", contract=contract, function_name="mint", + ledger_sequence=3, total_fee_stroops=500, + ) + + url = reverse("cost-analytics-list") + response = authenticated_client.get( + url, {"contract_id": contract.contract_id, "groupby": "function"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.data["data"] + assert len(data) == 2 + + transfer_data = next(d for d in data if d["function"] == "transfer") + assert transfer_data["callCount"] == 2 + assert transfer_data["avgCost"] == 1500.0 + assert transfer_data["totalCost"] == 3000.0 + + mint_data = next(d for d in data if d["function"] == "mint") + assert mint_data["callCount"] == 1 + assert mint_data["totalCost"] == 500.0 + + def test_cost_analytics_by_day(self, authenticated_client, contract): + TransactionCost.objects.create( + tx_hash="tx1", contract=contract, function_name="transfer", + ledger_sequence=1, total_fee_stroops=1000, + ) + + url = reverse("cost-analytics-list") + response = authenticated_client.get( + url, {"contract_id": contract.contract_id, "groupby": "day"} + ) + + assert response.status_code == status.HTTP_200_OK + assert len(response.data["data"]) == 1 + + def test_cost_analytics_contract_not_found(self, authenticated_client): + url = reverse("cost-analytics-list") + response = authenticated_client.get( + url, {"contract_id": "CNONEXISTENT1234567890123456789012345678901234567890123"} + ) + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_cost_analytics_invalid_params(self, authenticated_client): + url = reverse("cost-analytics-list") + response = authenticated_client.get(url, {}) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_cost_trends(self, authenticated_client, contract): + TransactionCost.objects.create( + tx_hash="tx1", contract=contract, function_name="transfer", + ledger_sequence=1, total_fee_stroops=1000, + ) + + url = reverse("cost-analytics-trends") + response = authenticated_client.get(url) + + assert response.status_code == status.HTTP_200_OK + assert "current_7d_total_stroops" in response.data + + def test_cost_suggestions(self, authenticated_client, contract): + for i in range(10): + TransactionCost.objects.create( + tx_hash=f"tx{i}", contract=contract, function_name="transfer", + ledger_sequence=i, total_fee_stroops=1000 * (i + 1), + ) + + url = reverse("cost-analytics-suggestions") + response = authenticated_client.get(url) + + assert response.status_code == status.HTTP_200_OK + assert "suggestions" in response.data + + +@pytest.mark.django_db +class TestAnalyzeTransactionCostsTask: + def test_analyze_task_with_no_data(self): + result = analyze_transaction_costs() + assert result["contracts_analyzed"] == 0 + assert result["transactions_analyzed"] == 0 + assert result["outliers_flagged"] == 0 + + def test_analyze_task_with_data(self, contract): + TransactionCost.objects.create( + tx_hash="tx1", contract=contract, function_name="transfer", + ledger_sequence=1, total_fee_stroops=1000, + ) + TransactionCost.objects.create( + tx_hash="tx2", contract=contract, function_name="transfer", + ledger_sequence=2, total_fee_stroops=2000, + ) + TransactionCost.objects.create( + tx_hash="tx3", contract=contract, function_name="mint", + ledger_sequence=3, total_fee_stroops=500, + ) + + result = analyze_transaction_costs() + assert result["contracts_analyzed"] >= 1 + assert result["transactions_analyzed"] >= 0 + + def test_analyze_task_flags_outliers(self, contract): + for i in range(20): + TransactionCost.objects.create( + tx_hash=f"tx{i}", contract=contract, function_name="transfer", + ledger_sequence=i, total_fee_stroops=1000, + ) + TransactionCost.objects.create( + tx_hash="outlier_tx", contract=contract, function_name="transfer", + ledger_sequence=99, total_fee_stroops=100000, + ) + + result = analyze_transaction_costs() + outliers = TransactionCost.objects.filter(contract=contract, is_outlier=True) + assert outliers.count() >= 1 + assert result["outliers_flagged"] >= 1 + + +@pytest.mark.django_db +class TestGetTransactionCost: + def test_get_transaction_cost_success(self): + client = SorobanClient( + rpc_url="https://testnet.stellar.org", + network_passphrase="Test SDF Network ; September 2015", + ) + mock_tx_response = MagicMock() + mock_tx_response.status = "SUCCESS" + mock_tx_response.to_dict.return_value = { + "result": { + "result": { + "txMeta": { + "v3": { + "sorobanMeta": { + "resources": { + "cpuInstructions": 3000000, + "memBytes": 100000, + "netBytes": 512, + } + } + } + } + } + }, + "tx": {"fee": {"amount": 800000}}, + } + + client.server = MagicMock() + client.server.get_transaction.return_value = mock_tx_response + client._rate_limiter = MagicMock() + + cost = client.get_transaction_cost("some_tx_hash") + assert cost.cpu_instructions == 3000000 + assert cost.memory_bytes == 100000 + assert cost.total_fee_stroops == 800000 + + def test_get_transaction_cost_not_found(self): + client = SorobanClient( + rpc_url="https://testnet.stellar.org", + network_passphrase="Test SDF Network ; September 2015", + ) + mock_tx_response = MagicMock() + mock_tx_response.status = "NOT_FOUND" + + client.server = MagicMock() + client.server.get_transaction.return_value = mock_tx_response + client._rate_limiter = MagicMock() + + cost = client.get_transaction_cost("nonexistent_tx") + assert cost.total_fee_stroops == 0 + assert cost.cpu_instructions == 0 diff --git a/django-backend/soroscan/ingest/tests/test_webhook_signing.py b/django-backend/soroscan/ingest/tests/test_webhook_signing.py new file mode 100644 index 00000000..dd16665e --- /dev/null +++ b/django-backend/soroscan/ingest/tests/test_webhook_signing.py @@ -0,0 +1,223 @@ +"""Tests for webhook signing module and SigningKey model.""" + +import json +from datetime import timedelta + +import pytest +from django.utils import timezone + +from soroscan.ingest.webhook_signing import ( + generate_signing_key, + sign_webhook_payload, + verify_webhook_signature, + verify_webhook_request, +) + + +class TestSigningFunctions: + def test_generate_key_hex_length(self): + key = generate_signing_key() + assert len(key) == 64 + + def test_generate_key_unique(self): + keys = {generate_signing_key() for _ in range(100)} + assert len(keys) == 100 + + def test_sign_and_verify_sha256(self): + key = generate_signing_key() + payload = '{"event_type": "transfer", "amount": "100"}' + sig = sign_webhook_payload(payload, key, algorithm="sha256") + assert sig.startswith("sha256=") + assert verify_webhook_signature(payload, sig, key) is True + + def test_sign_and_verify_sha1(self): + key = generate_signing_key() + payload = '{"event_type": "transfer", "amount": "100"}' + sig = sign_webhook_payload(payload, key, algorithm="sha1") + assert sig.startswith("sha1=") + assert verify_webhook_signature(payload, sig, key) is True + + def test_wrong_key_rejected(self): + key = generate_signing_key() + wrong_key = generate_signing_key() + payload = '{"event_type": "transfer"}' + sig = sign_webhook_payload(payload, key) + assert verify_webhook_signature(payload, sig, wrong_key) is False + + def test_tampered_payload_rejected(self): + key = generate_signing_key() + payload = '{"event_type": "transfer"}' + sig = sign_webhook_payload(payload, key) + assert verify_webhook_signature('{"event_type": "mint"}', sig, key) is False + + def test_tampered_signature_rejected(self): + key = generate_signing_key() + payload = '{"event_type": "transfer"}' + sig = sign_webhook_payload(payload, key) + bad_sig = "sha256=0000000000000000000000000000000000000000000000000000" + assert verify_webhook_signature(payload, bad_sig, key) is False + + def test_empty_signature_rejected(self): + key = generate_signing_key() + payload = '{"event_type": "transfer"}' + assert verify_webhook_signature(payload, "", key) is False + + def test_none_signature_rejected(self): + key = generate_signing_key() + payload = '{"event_type": "transfer"}' + assert verify_webhook_signature(payload, None, key) is False + + def test_invalid_format_signature_rejected(self): + key = generate_signing_key() + payload = '{"event_type": "transfer"}' + assert verify_webhook_signature(payload, "not-a-valid-format", key) is False + + def test_verify_webhook_request_bytes(self): + key = generate_signing_key() + payload = '{"event": "test"}' + sig = sign_webhook_payload(payload, key) + assert verify_webhook_request(payload.encode("utf-8"), sig, key) is True + + def test_verify_webhook_request_no_header(self): + key = generate_signing_key() + payload = b'{"event": "test"}' + assert verify_webhook_request(payload, None, key) is False + + def test_json_payload_with_unicode(self): + key = generate_signing_key() + payload = json.dumps({"note": "héllo üñîçödé ✓"}, sort_keys=True) + sig = sign_webhook_payload(payload, key) + assert verify_webhook_signature(payload, sig, key) is True + + +@pytest.mark.django_db +class TestSigningKeyModel: + def test_create_signing_key(self, webhook_subscription): + from soroscan.ingest.models import SigningKey + from soroscan.ingest.webhook_signing import generate_signing_key + + key = SigningKey.objects.create( + subscription=webhook_subscription, + key=generate_signing_key(), + label="test-key", + expires_at=timezone.now() + timedelta(days=30), + ) + assert key.is_active is True + assert str(key).startswith("SigningKey for") + assert "active" in str(key) + + def test_expired_key_str(self, webhook_subscription): + from soroscan.ingest.models import SigningKey + from soroscan.ingest.webhook_signing import generate_signing_key + + key = SigningKey.objects.create( + subscription=webhook_subscription, + key=generate_signing_key(), + label="expired-key", + expires_at=timezone.now() - timedelta(days=1), + is_active=False, + ) + assert "expired" in str(key) + + def test_multiple_keys_per_subscription(self, webhook_subscription): + from soroscan.ingest.models import SigningKey + from soroscan.ingest.webhook_signing import generate_signing_key + + for i in range(3): + SigningKey.objects.create( + subscription=webhook_subscription, + key=generate_signing_key(), + label=f"key-{i}", + expires_at=timezone.now() + timedelta(days=30), + ) + assert webhook_subscription.signing_keys.count() == 3 + + +@pytest.mark.django_db +class TestRotateExpiredSigningKeysTask: + def test_rotate_task_deactivates_expired_keys(self, webhook_subscription): + from soroscan.ingest.models import SigningKey + from soroscan.ingest.tasks import rotate_expired_signing_keys + from soroscan.ingest.webhook_signing import generate_signing_key + + # Create a key that expired 1 day ago (still within 7-day retention) + key = SigningKey.objects.create( + subscription=webhook_subscription, + key=generate_signing_key(), + label="expired-key", + expires_at=timezone.now() - timedelta(days=1), + is_active=True, + ) + result = rotate_expired_signing_keys() + key.refresh_from_db() + assert key.is_active is False + assert result["deactivated"] == 1 + + def test_rotate_task_deletes_very_old_keys(self, webhook_subscription): + from soroscan.ingest.models import SigningKey + from soroscan.ingest.tasks import rotate_expired_signing_keys + from soroscan.ingest.webhook_signing import generate_signing_key + + # Create a key that expired 30 days ago (past retention) + key = SigningKey.objects.create( + subscription=webhook_subscription, + key=generate_signing_key(), + label="stale-key", + expires_at=timezone.now() - timedelta(days=30), + is_active=True, + ) + result = rotate_expired_signing_keys() + assert SigningKey.objects.filter(pk=key.pk).exists() is False + assert result["deleted"] == 1 + + def test_rotate_ignores_active_keys(self, webhook_subscription): + from soroscan.ingest.models import SigningKey + from soroscan.ingest.tasks import rotate_expired_signing_keys + from soroscan.ingest.webhook_signing import generate_signing_key + + SigningKey.objects.create( + subscription=webhook_subscription, + key=generate_signing_key(), + label="active-key", + expires_at=timezone.now() + timedelta(days=30), + is_active=True, + ) + result = rotate_expired_signing_keys() + assert result["deactivated"] == 0 + assert result["deleted"] == 0 + + +@pytest.mark.django_db +class TestWebhookDispatchWithSigningKey: + def test_dispatch_uses_signing_key_when_available( + self, webhook_subscription, mocker + ): + from soroscan.ingest.models import SigningKey + from soroscan.ingest.tasks import _build_webhook_signature_header + from soroscan.ingest.webhook_signing import generate_signing_key + + # Create an active signing key + signing_key = SigningKey.objects.create( + subscription=webhook_subscription, + key=generate_signing_key(), + label="active-key", + expires_at=timezone.now() + timedelta(days=30), + is_active=True, + ) + + payload = b'{"event": "test"}' + sig = _build_webhook_signature_header(webhook_subscription, payload) + + # Verify the signature with the signing key + assert verify_webhook_signature(payload.decode(), sig, signing_key.key) is True + + def test_dispatch_falls_back_to_webhook_secret(self, webhook_subscription): + from soroscan.ingest.tasks import _build_webhook_signature_header + + payload = b'{"event": "test"}' + sig = _build_webhook_signature_header(webhook_subscription, payload) + + # Verify with the webhook's own secret + assert verify_webhook_signature( + payload.decode(), sig, webhook_subscription.secret + ) is True diff --git a/django-backend/soroscan/ingest/urls.py b/django-backend/soroscan/ingest/urls.py index d5173aa1..83b6ecb4 100644 --- a/django-backend/soroscan/ingest/urls.py +++ b/django-backend/soroscan/ingest/urls.py @@ -6,8 +6,10 @@ from .views import ( APIKeyViewSet, + AnalyticsViewSet, ContractEventViewSet, ContractInvocationViewSet, + CostAnalyticsViewSet, TeamViewSet, TrackedContractViewSet, admin_ingest_errors_view, @@ -39,6 +41,8 @@ router.register(r"webhooks", WebhookSubscriptionViewSet, basename="webhook") router.register(r"api-keys", APIKeyViewSet, basename="apikey") router.register(r"teams", TeamViewSet, basename="team") +router.register(r"analytics/costs", CostAnalyticsViewSet, basename="cost-analytics") +router.register(r"analytics", AnalyticsViewSet, basename="analytics") urlpatterns = [ path("contracts//timeline/", contract_timeline_view, name="contract-timeline"), diff --git a/django-backend/soroscan/ingest/views.py b/django-backend/soroscan/ingest/views.py index 44aacb9a..e6728c0f 100644 --- a/django-backend/soroscan/ingest/views.py +++ b/django-backend/soroscan/ingest/views.py @@ -1,19 +1,22 @@ """ API Views for SoroScan event ingestion. """ +import csv import hashlib import hmac import json import logging import re import time -from datetime import timedelta +from datetime import datetime, time as datetime_time, timedelta from django.conf import settings -from django.db.models import Count, Max, Min, Q, Avg +from django.db.models import Avg, Count, Max, Min, Q, StdDev, Sum, Variance from django.db.models.functions import Cast +from django.http import HttpResponse from django.shortcuts import get_object_or_404, redirect from django.utils import timezone +from django.utils.dateparse import parse_date, parse_datetime from django_filters.rest_framework import DjangoFilterBackend from drf_spectacular.utils import extend_schema, inline_serializer from rest_framework import serializers, status, viewsets @@ -31,6 +34,7 @@ from .cache_utils import cache_result, get_or_set_json, query_cache_ttl, stable_cache_key from .models import ( APIKey, + APIUsageLog, AdminAction, ArchivedEventBatch, ContractEvent, @@ -40,22 +44,27 @@ Organization, OrganizationCostSnapshot, OrganizationBudget, - OrganizationMembership, + Organization, IngestError, IndexerState, Team, TeamMembership, + EventAggregation, TrackedContract, + TransactionCost, WebhookDeliveryLog, WebhookSubscription, ) from .cache_utils import get_cached_contract from .serializers import ( APIKeySerializer, + AnalyticsQuerySerializer, ContractEventSerializer, ContractInvocationSerializer, ContractSourceSerializer, ContractVerificationSerializer, + CostAnalyticsQuerySerializer, + EventAggregationSerializer, EventSearchSerializer, OrganizationBudgetSerializer, OrganizationCorsSerializer, @@ -64,6 +73,7 @@ TeamMemberAddSerializer, TeamSerializer, TrackedContractSerializer, + TransactionCostSerializer, WebhookSubscriptionSerializer, ) from .stellar_client import SorobanClient @@ -1581,6 +1591,263 @@ def rate_limit_analytics_view(request): ) +def _parse_usage_datetime(value: str | None, *, end_of_day: bool = False): + if not value: + return None + parsed = parse_datetime(value) + if parsed is None: + parsed_date = parse_date(value) + if parsed_date is None: + return None + parsed = datetime.combine( + parsed_date, + datetime_time.max if end_of_day else datetime_time.min, + ) + if timezone.is_naive(parsed): + parsed = timezone.make_aware(parsed, timezone.get_current_timezone()) + return parsed + + +def _accessible_organizations(user): + if user.is_staff: + return Organization.objects.all() + return Organization.objects.filter(Q(owner=user) | Q(memberships__user=user)).distinct() + + +def _usage_time_range(request): + start_param = request.query_params.get("start") + end_param = request.query_params.get("end") + start = _parse_usage_datetime(start_param) + end = _parse_usage_datetime(end_param, end_of_day=True) + + if start_param and start is None: + return None, None, Response( + {"error": "start must be an ISO-8601 datetime or YYYY-MM-DD date"}, + status=status.HTTP_400_BAD_REQUEST, + ) + if end_param and end is None: + return None, None, Response( + {"error": "end must be an ISO-8601 datetime or YYYY-MM-DD date"}, + status=status.HTTP_400_BAD_REQUEST, + ) + + if start is None: + try: + days = int(request.query_params.get("days", 30)) + except (TypeError, ValueError): + return None, None, Response( + {"error": "days must be an integer"}, + status=status.HTTP_400_BAD_REQUEST, + ) + if days <= 0 or days > 366: + return None, None, Response( + {"error": "days must be between 1 and 366"}, + status=status.HTTP_400_BAD_REQUEST, + ) + start = timezone.now() - timedelta(days=days) + + if end is None: + end = timezone.now() + + if start > end: + return None, None, Response( + {"error": "start must be before or equal to end"}, + status=status.HTTP_400_BAD_REQUEST, + ) + + return start, end, None + + +def _organization_usage_payload(request): + organizations = _accessible_organizations(request.user) + organization_id = request.query_params.get("organization_id") + if organization_id: + try: + organization_id = int(organization_id) + except (TypeError, ValueError): + return None, Response( + {"error": "organization_id must be an integer"}, + status=status.HTTP_400_BAD_REQUEST, + ) + organizations = organizations.filter(pk=organization_id) + if not organizations.exists(): + return None, Response( + {"error": "organization not found or access denied"}, + status=status.HTTP_404_NOT_FOUND, + ) + + start, end, error = _usage_time_range(request) + if error: + return None, error + + usage = APIUsageLog.objects.filter( + organization__in=organizations, + timestamp__gte=start, + timestamp__lte=end, + ) + webhook_deliveries = WebhookDeliveryLog.objects.filter( + subscription__contract__organization__in=organizations, + timestamp__gte=start, + timestamp__lte=end, + ) + + endpoint_rows = list( + usage.values("endpoint", "method") + .annotate( + requests=Count("id"), + errors=Count("id", filter=Q(status_code__gte=400)), + request_bytes=Sum("request_bytes"), + response_bytes=Sum("response_bytes"), + ) + .order_by("-requests", "endpoint", "method") + ) + for row in endpoint_rows: + row["request_bytes"] = row["request_bytes"] or 0 + row["response_bytes"] = row["response_bytes"] or 0 + row["data_transferred_bytes"] = row["request_bytes"] + row["response_bytes"] + + error_rows = list( + usage.exclude(error_type="") + .values("error_type") + .annotate(count=Count("id")) + .order_by("-count", "error_type") + ) + + webhook_rows = list( + webhook_deliveries.values( + "subscription_id", + "subscription__contract__contract_id", + "subscription__contract__name", + ) + .annotate( + deliveries=Count("id"), + successes=Count("id", filter=Q(success=True)), + failures=Count("id", filter=Q(success=False)), + payload_bytes=Sum("payload_bytes"), + avg_latency_ms=Avg("latency_ms"), + ) + .order_by("-deliveries", "subscription_id") + ) + for row in webhook_rows: + deliveries = row["deliveries"] or 0 + successes = row["successes"] or 0 + row["payload_bytes"] = row["payload_bytes"] or 0 + row["success_rate_percent"] = round((successes / deliveries) * 100.0, 2) if deliveries else None + + totals = usage.aggregate( + requests=Count("id"), + request_bytes=Sum("request_bytes"), + response_bytes=Sum("response_bytes"), + errors=Count("id", filter=Q(status_code__gte=400)), + ) + webhook_totals = webhook_deliveries.aggregate( + deliveries=Count("id"), + failures=Count("id", filter=Q(success=False)), + payload_bytes=Sum("payload_bytes"), + ) + request_bytes = totals["request_bytes"] or 0 + response_bytes = totals["response_bytes"] or 0 + + payload = { + "generated_at": timezone.now(), + "time_range": {"start": start, "end": end}, + "organizations": list(organizations.values("id", "name", "slug")), + "totals": { + "requests": totals["requests"] or 0, + "errors": totals["errors"] or 0, + "request_bytes": request_bytes, + "response_bytes": response_bytes, + "data_transferred_bytes": request_bytes + response_bytes, + "webhook_deliveries": webhook_totals["deliveries"] or 0, + "webhook_failures": webhook_totals["failures"] or 0, + "webhook_payload_bytes": webhook_totals["payload_bytes"] or 0, + }, + "requests_per_endpoint": endpoint_rows, + "errors_by_type": error_rows, + "webhook_deliveries": webhook_rows, + } + return payload, None + + +def _usage_payload_as_csv(payload): + response = HttpResponse(content_type="text/csv") + response["Content-Disposition"] = 'attachment; filename="api-usage-analytics.csv"' + writer = csv.writer(response) + + writer.writerow(["section", "metric", "value"]) + for key, value in payload["totals"].items(): + writer.writerow(["totals", key, value]) + + writer.writerow([]) + writer.writerow(["endpoint", "method", "requests", "errors", "request_bytes", "response_bytes", "data_transferred_bytes"]) + for row in payload["requests_per_endpoint"]: + writer.writerow([ + row["endpoint"], + row["method"], + row["requests"], + row["errors"], + row["request_bytes"], + row["response_bytes"], + row["data_transferred_bytes"], + ]) + + writer.writerow([]) + writer.writerow(["error_type", "count"]) + for row in payload["errors_by_type"]: + writer.writerow([row["error_type"], row["count"]]) + + writer.writerow([]) + writer.writerow(["subscription_id", "contract_id", "contract_name", "deliveries", "successes", "failures", "payload_bytes", "avg_latency_ms", "success_rate_percent"]) + for row in payload["webhook_deliveries"]: + writer.writerow([ + row["subscription_id"], + row["subscription__contract__contract_id"], + row["subscription__contract__name"], + row["deliveries"], + row["successes"], + row["failures"], + row["payload_bytes"], + row["avg_latency_ms"], + row["success_rate_percent"], + ]) + + return response + + +@extend_schema( + responses=inline_serializer( + name="OrganizationAPIUsageAnalyticsResponse", + fields={ + "generated_at": serializers.DateTimeField(), + "time_range": serializers.JSONField(), + "organizations": serializers.JSONField(), + "totals": serializers.JSONField(), + "requests_per_endpoint": serializers.JSONField(), + "errors_by_type": serializers.JSONField(), + "webhook_deliveries": serializers.JSONField(), + }, + ) +) +@api_view(["GET"]) +@permission_classes([IsAuthenticated]) +def organization_api_usage_analytics_view(request, format=None): + """ + Return API usage analytics for organizations visible to the current user. + + Query params: + - organization_id: optional organization filter + - start / end: ISO-8601 datetime or YYYY-MM-DD + - days: relative lookback when start is omitted, default 30 + - format=csv or export=csv: return a CSV export + """ + payload, error = _organization_usage_payload(request) + if error: + return error + if format == "csv" or request.query_params.get("format") == "csv" or request.query_params.get("export") == "csv": + return _usage_payload_as_csv(payload) + return Response(payload) + + # --------------------------------------------------------------------------- # Issue #280: GDPR — deletion requests & compliance export # --------------------------------------------------------------------------- @@ -2041,127 +2308,556 @@ def contract_identity_view(request): # --------------------------------------------------------------------------- -# Issue #491: EXPLAIN ANALYZE endpoint for query debugging +# Transaction Cost Analytics (Issue #804) # --------------------------------------------------------------------------- -_ALLOWED_STATEMENTS = re.compile( - r"^\s*(SELECT|WITH|EXPLAIN)\b", - re.IGNORECASE, -) +class CostAnalyticsViewSet(viewsets.ViewSet): + """ + ViewSet for transaction cost analytics. -@extend_schema( - request=inline_serializer( - name="DBExplainRequest", - fields={ - "query": serializers.CharField( - help_text="SQL SELECT statement to explain", - ), - "analyze": serializers.BooleanField( - default=False, - help_text="If true, runs EXPLAIN ANALYZE instead of EXPLAIN", - ), - }, - ), - responses={ - 200: inline_serializer( - name="DBExplainResponse", + Endpoints: + - GET /api/analytics/costs/ - Cost breakdown by function or day + - GET /api/analytics/costs/trends/ - Week-over-week and month-over-month trends + - GET /api/analytics/costs/suggestions/ - Optimization suggestions + """ + + permission_classes = [IsAuthenticated] + + @extend_schema( + parameters=[CostAnalyticsQuerySerializer], + responses=inline_serializer( + name="CostAnalyticsResponse", fields={ - "query_plan": serializers.CharField(), - "analyzed": serializers.BooleanField(), + "data": serializers.ListField( + child=inline_serializer( + name="CostBreakdownItem", + fields={ + "function": serializers.CharField(required=False), + "date": serializers.CharField(required=False), + "avgCost": serializers.FloatField(), + "minCost": serializers.FloatField(), + "maxCost": serializers.FloatField(), + "totalCost": serializers.FloatField(), + "callCount": serializers.IntegerField(), + }, + ) + ), + "contract_id": serializers.CharField(), + "range": serializers.CharField(), }, ), - 400: inline_serializer( - name="DBExplainError", - fields={"error": serializers.CharField()}, - ), - }, -) -@api_view(["POST"]) -@permission_classes([IsAuthenticated]) -@throttle_classes([UserRateThrottle]) -def db_explain_view(request): - """ - POST /api/admin/db/explain/ + ) + def list(self, request): + """ + GET /api/analytics/costs/ - Returns the query execution plan for a given SQL SELECT statement. - Secured to admin (staff) users only and rate-limited. - """ - if not request.user or not request.user.is_staff: - return Response( - {"error": "Admin access required."}, - status=status.HTTP_403_FORBIDDEN, - ) + Query params: + - contract_id (required): Contract ID to analyze + - groupby (optional): "function" (default) or "day" + - range (optional): "7d" (default), "30d", or "90d" + """ + serializer = CostAnalyticsQuerySerializer(data=request.query_params) + serializer.is_valid(raise_exception=True) - sql = (request.data.get("query") or "").strip() - if not sql: - return Response( - {"error": "A SQL query is required."}, - status=status.HTTP_400_BAD_REQUEST, + contract_id = serializer.validated_data["contract_id"] + groupby = serializer.validated_data["groupby"] + range_days = {"7d": 7, "30d": 30, "90d": 90}[serializer.validated_data["range"]] + + contract = get_cached_contract(contract_id) + if not contract: + return Response( + {"detail": "Contract not found."}, + status=status.HTTP_404_NOT_FOUND, + ) + + since = timezone.now() - timedelta(days=range_days) + qs = TransactionCost.objects.filter( + contract=contract, created_at__gte=since ) - if not _ALLOWED_STATEMENTS.match(sql): - return Response( - {"error": "Only SELECT, WITH, and EXPLAIN statements are allowed."}, - status=status.HTTP_400_BAD_REQUEST, + from django.db.models import Avg, Count, Max, Min, Sum + from django.db.models.functions import TruncDate + + if groupby == "function": + results = ( + qs.values("function_name") + .annotate( + avg_cost=Avg("total_fee_stroops"), + min_cost=Min("total_fee_stroops"), + max_cost=Max("total_fee_stroops"), + total_cost=Sum("total_fee_stroops"), + call_count=Count("id"), + ) + .order_by("-total_cost") + ) + data = [ + { + "function": r["function_name"], + "avgCost": round(float(r["avg_cost"]), 2) if r["avg_cost"] else 0, + "minCost": float(r["min_cost"]) if r["min_cost"] else 0, + "maxCost": float(r["max_cost"]) if r["max_cost"] else 0, + "totalCost": float(r["total_cost"]) if r["total_cost"] else 0, + "callCount": r["call_count"], + } + for r in results + ] + else: + results = ( + qs.annotate(date=TruncDate("created_at")) + .values("date") + .annotate( + avg_cost=Avg("total_fee_stroops"), + min_cost=Min("total_fee_stroops"), + max_cost=Max("total_fee_stroops"), + total_cost=Sum("total_fee_stroops"), + call_count=Count("id"), + ) + .order_by("date") + ) + data = [ + { + "date": r["date"].isoformat() if r["date"] else "", + "avgCost": round(float(r["avg_cost"]), 2) if r["avg_cost"] else 0, + "minCost": float(r["min_cost"]) if r["min_cost"] else 0, + "maxCost": float(r["max_cost"]) if r["max_cost"] else 0, + "totalCost": float(r["total_cost"]) if r["total_cost"] else 0, + "callCount": r["call_count"], + } + for r in results + ] + + return Response({ + "data": data, + "contract_id": contract_id, + "range": serializer.validated_data["range"], + }) + + @extend_schema( + responses=inline_serializer( + name="CostTrendsResponse", + fields={ + "current_7d_total_stroops": serializers.FloatField(), + "previous_7d_total_stroops": serializers.FloatField(), + "week_over_week_change_pct": serializers.FloatField(), + "current_30d_total_stroops": serializers.FloatField(), + "previous_30d_total_stroops": serializers.FloatField(), + "month_over_month_change_pct": serializers.FloatField(), + }, ) + ) + @action(detail=False, methods=["get"]) + def trends(self, request): + """ + GET /api/analytics/costs/trends/ - analyze = bool(request.data.get("analyze", False)) - prefix = "EXPLAIN ANALYZE" if analyze else "EXPLAIN" + Returns week-over-week and month-over-month cost trends. + """ + now = timezone.now() + + # Week-over-week + seven_days_ago = now - timedelta(days=7) + fourteen_days_ago = now - timedelta(days=14) + current_week = TransactionCost.objects.filter( + created_at__gte=seven_days_ago + ).aggregate(total=Sum("total_fee_stroops"))["total"] or 0 + prev_week = TransactionCost.objects.filter( + created_at__gte=fourteen_days_ago, + created_at__lt=seven_days_ago, + ).aggregate(total=Sum("total_fee_stroops"))["total"] or 0 + + # Month-over-month + thirty_days_ago = now - timedelta(days=30) + sixty_days_ago = now - timedelta(days=60) + current_month = TransactionCost.objects.filter( + created_at__gte=thirty_days_ago + ).aggregate(total=Sum("total_fee_stroops"))["total"] or 0 + prev_month = TransactionCost.objects.filter( + created_at__gte=sixty_days_ago, + created_at__lt=thirty_days_ago, + ).aggregate(total=Sum("total_fee_stroops"))["total"] or 0 + + def pct_change(current, previous): + if previous > 0: + return round((current - previous) / previous * 100, 2) + return 0.0 + + return Response({ + "current_7d_total_stroops": float(current_week), + "previous_7d_total_stroops": float(prev_week), + "week_over_week_change_pct": pct_change(current_week, prev_week), + "current_30d_total_stroops": float(current_month), + "previous_30d_total_stroops": float(prev_month), + "month_over_month_change_pct": pct_change(current_month, prev_month), + }) - try: - from django.db import connection + @extend_schema( + responses=inline_serializer( + name="CostSuggestionsResponse", + fields={ + "suggestions": serializers.ListField( + child=inline_serializer( + name="OptimizationSuggestion", + fields={ + "function_name": serializers.CharField(), + "avg_cost_stroops": serializers.FloatField(), + "max_cost_stroops": serializers.FloatField(), + "call_count": serializers.IntegerField(), + "cost_variance": serializers.FloatField(), + "suggestion": serializers.CharField(), + }, + ) + ) + }, + ) + ) + @action(detail=False, methods=["get"]) + def suggestions(self, request): + """ + GET /api/analytics/costs/suggestions/ - with connection.cursor() as cursor: - try: - cursor.execute(f"{prefix} {sql}") - except Exception: - if analyze: - # SQLite doesn't support EXPLAIN ANALYZE; fall back to EXPLAIN - cursor.execute(f"EXPLAIN {sql}") - else: - raise - plan = "\n".join( - " ".join(str(cell) for cell in row) for row in cursor.fetchall() + Returns optimization suggestions for high-variance functions. + Identifies functions with high cost variance that could be optimized. + """ + seven_days_ago = timezone.now() - timedelta(days=7) + function_stats = ( + TransactionCost.objects.filter(created_at__gte=seven_days_ago) + .values("function_name") + .annotate( + avg_cost=Avg("total_fee_stroops"), + max_cost=Max("total_fee_stroops"), + min_cost=Min("total_fee_stroops"), + total_cost=Sum("total_fee_stroops"), + call_count=Count("id"), + cost_stddev=StdDev("total_fee_stroops"), ) - except Exception as exc: - return Response( - {"error": f"Failed to execute EXPLAIN: {exc}"}, - status=status.HTTP_400_BAD_REQUEST, + .filter(call_count__gte=5) + .order_by("-cost_stddev") ) - return Response({"query_plan": plan, "analyzed": analyze}) + suggestions = [] + for r in function_stats: + avg = float(r["avg_cost"] or 0) + stddev = float(r["cost_stddev"] or 0) + variance = stddev / avg if avg > 0 else 0 + max_cost = float(r["max_cost"] or 0) + + if variance > 0.5 and max_cost > avg * 2: + suggestion = ( + f"High cost variance detected for '{r['function_name']}'. " + f"Max cost ({max_cost:.0f} stroops) is >2x average ({avg:.0f} stroops). " + "Review parameter sizes and loop bounds for optimization opportunities." + ) + elif avg > 1000000: + suggestion = ( + f"'{r['function_name']}' has high average cost ({avg:.0f} stroops). " + "Consider caching results or batching calls to reduce fees." + ) + else: + continue + + suggestions.append({ + "function_name": r["function_name"], + "avg_cost_stroops": avg, + "max_cost_stroops": max_cost, + "call_count": r["call_count"], + "cost_variance": round(variance, 4), + "suggestion": suggestion, + }) + + return Response({"suggestions": suggestions}) # --------------------------------------------------------------------------- -# Issue #488: Cache stats endpoint +# Analytics Dashboard (Issue #801) # --------------------------------------------------------------------------- -@extend_schema( - responses=inline_serializer( - name="CacheStatsResponse", - fields={ - "backend": serializers.CharField(), - "default_ttl": serializers.IntegerField(), - "status": serializers.CharField(), - }, - ), -) -@api_view(["GET"]) -@permission_classes([IsAuthenticated]) -@throttle_classes([UserRateThrottle]) -def cache_stats_view(request): + +class AnalyticsViewSet(viewsets.ViewSet): """ - GET /api/cache/stats/ + ViewSet for event analytics and reporting. - Returns cache hit/miss statistics and current cache backend info. + Endpoints: + - GET /api/analytics/ - Time-series event data with filtering + - GET /api/analytics/overview/ - Dashboard summary widgets + - GET /api/analytics/anomalies/ - Detected volume anomalies """ - from django.core.cache import cache as django_cache - backend_info = str(type(django_cache._cache).__name__) + permission_classes = [IsAuthenticated] - return Response({ - "backend": backend_info, - "default_ttl": getattr(settings, "QUERY_CACHE_TTL_SECONDS", 60), - "status": "ok", - }) + def _time_bucket_trunc(self, granularity: str): + from django.db.models.functions import TruncDay, TruncHour, TruncMonth, TruncWeek + return { + "hourly": TruncHour("time_bucket"), + "daily": TruncDay("time_bucket"), + "weekly": TruncWeek("time_bucket"), + "monthly": TruncMonth("time_bucket"), + }[granularity] + + def _range_days(self, range_param: str) -> int: + return {"7d": 7, "30d": 30, "90d": 90, "1y": 365}[range_param] + + @extend_schema( + parameters=[AnalyticsQuerySerializer], + responses=inline_serializer( + name="AnalyticsResponse", + fields={ + "metric": serializers.CharField(), + "granularity": serializers.CharField(), + "range": serializers.CharField(), + "data": serializers.ListField( + child=inline_serializer( + name="AnalyticsDataPoint", + fields={ + "timestamp": serializers.CharField(), + "contract_id": serializers.CharField(required=False), + "event_type": serializers.CharField(required=False), + "count": serializers.IntegerField(), + }, + ) + ), + }, + ), + ) + def list(self, request): + """ + GET /api/analytics/ + + Query params: + - metric: event_volume (default), active_contracts, event_type_breakdown + - granularity: hourly, daily (default), weekly, monthly + - range: 7d, 30d (default), 90d, 1y + - contract_id: optional filter by specific contract + - export: csv or json (triggers file download) + """ + serializer = AnalyticsQuerySerializer(data=request.query_params) + serializer.is_valid(raise_exception=True) + + metric = serializer.validated_data["metric"] + granularity = serializer.validated_data["granularity"] + range_days = self._range_days(serializer.validated_data["range"]) + contract_id = serializer.validated_data.get("contract_id") + export_format = serializer.validated_data.get("export") + + since = timezone.now() - timedelta(days=range_days) + qs = EventAggregation.objects.filter(time_bucket__gte=since) + + if contract_id: + qs = qs.filter(contract__contract_id=contract_id) + + trunc = self._time_bucket_trunc(granularity) + + if metric == "active_contracts": + results = ( + qs.annotate(bucket=trunc) + .values("bucket") + .annotate(count=Count("contract_id", distinct=True)) + .order_by("bucket") + ) + data = [ + {"timestamp": r["bucket"].isoformat(), "count": r["count"]} + for r in results + ] + elif metric == "event_type_breakdown": + results = ( + qs.values("event_type") + .annotate(count=Sum("event_count")) + .order_by("-count") + ) + data = [ + {"event_type": r["event_type"], "count": r["count"]} + for r in results + ] + else: + results = ( + qs.annotate(bucket=trunc) + .values("bucket", "contract__contract_id") + .annotate(count=Sum("event_count")) + .order_by("bucket") + ) + data = [ + { + "timestamp": r["bucket"].isoformat(), + "contract_id": r["contract__contract_id"], + "count": r["count"], + } + for r in results + ] + + if export_format == "csv": + return self._export_csv(metric, data) + + if export_format == "json": + return self._export_json(metric, data) + + return Response({ + "metric": metric, + "granularity": granularity, + "range": serializer.validated_data["range"], + "data": data, + }) + + def _export_csv(self, metric: str, data: list) -> Response: + import csv + from io import StringIO + + buf = StringIO() + writer = csv.writer(buf) + if metric == "event_type_breakdown": + writer.writerow(["event_type", "count"]) + for row in data: + writer.writerow([row["event_type"], row["count"]]) + elif metric == "active_contracts": + writer.writerow(["timestamp", "active_contracts"]) + for row in data: + writer.writerow([row["timestamp"], row["count"]]) + else: + writer.writerow(["timestamp", "contract_id", "count"]) + for row in data: + writer.writerow([row["timestamp"], row["contract_id"], row["count"]]) + + response = Response( + buf.getvalue(), + content_type="text/csv", + headers={ + "Content-Disposition": f'attachment; filename="analytics_{metric}.csv"' + }, + ) + return response + + def _export_json(self, metric: str, data: list) -> Response: + response = Response( + {"metric": metric, "data": data}, + content_type="application/json", + headers={ + "Content-Disposition": f'attachment; filename="analytics_{metric}.json"' + }, + ) + return response + + @extend_schema( + responses=inline_serializer( + name="AnalyticsOverviewResponse", + fields={ + "total_events_24h": serializers.IntegerField(), + "total_events_7d": serializers.IntegerField(), + "active_contracts_24h": serializers.IntegerField(), + "active_contracts_7d": serializers.IntegerField(), + "top_event_types": serializers.ListField( + child=inline_serializer( + name="TopEventType", + fields={ + "event_type": serializers.CharField(), + "count": serializers.IntegerField(), + }, + ) + ), + "top_contracts": serializers.ListField( + child=inline_serializer( + name="TopContract", + fields={ + "contract_id": serializers.CharField(), + "count": serializers.IntegerField(), + }, + ) + ), + }, + ) + ) + @action(detail=False, methods=["get"]) + def overview(self, request): + """ + GET /api/analytics/overview/ + + Returns summary widgets for the analytics dashboard: + - Total events in last 24h and 7d + - Active contracts in last 24h and 7d + - Top event types by volume + - Top contracts by event count + """ + now = timezone.now() + day_ago = now - timedelta(days=1) + week_ago = now - timedelta(days=7) + + total_24h = ( + EventAggregation.objects.filter(time_bucket__gte=day_ago) + .aggregate(total=Sum("event_count"))["total"] or 0 + ) + total_7d = ( + EventAggregation.objects.filter(time_bucket__gte=week_ago) + .aggregate(total=Sum("event_count"))["total"] or 0 + ) + + active_24h = ( + EventAggregation.objects.filter(time_bucket__gte=day_ago) + .values("contract_id") + .distinct() + .count() + ) + active_7d = ( + EventAggregation.objects.filter(time_bucket__gte=week_ago) + .values("contract_id") + .distinct() + .count() + ) + + top_event_types = list( + EventAggregation.objects.filter(time_bucket__gte=week_ago) + .values("event_type") + .annotate(count=Sum("event_count")) + .order_by("-count")[:10] + ) + + top_contracts = list( + EventAggregation.objects.filter(time_bucket__gte=week_ago) + .values("contract__contract_id") + .annotate(count=Sum("event_count")) + .order_by("-count")[:10] + ) + + return Response({ + "total_events_24h": total_24h, + "total_events_7d": total_7d, + "active_contracts_24h": active_24h, + "active_contracts_7d": active_7d, + "top_event_types": [ + {"event_type": r["event_type"], "count": r["count"]} + for r in top_event_types + ], + "top_contracts": [ + {"contract_id": r["contract__contract_id"], "count": r["count"]} + for r in top_contracts + ], + }) + + @extend_schema( + responses=inline_serializer( + name="AnomaliesResponse", + fields={ + "anomalies": serializers.ListField( + child=inline_serializer( + name="AnomalyItem", + fields={ + "contract_id": serializers.CharField(), + "event_type": serializers.CharField(), + "current_count": serializers.IntegerField(), + "previous_count": serializers.IntegerField(), + "drop_pct": serializers.FloatField(), + }, + ) + ) + }, + ) + ) + @action(detail=False, methods=["get"]) + def anomalies(self, request): + """ + GET /api/analytics/anomalies/ + + Returns detected event volume anomalies by comparing the last + full hour against the previous hour. + """ + from soroscan.ingest.tasks import detect_event_anomalies + + result = detect_event_anomalies() + return Response({"anomalies": result.get("anomalies", [])}) diff --git a/django-backend/soroscan/ingest/webhook_signing.py b/django-backend/soroscan/ingest/webhook_signing.py new file mode 100644 index 00000000..684a6c0e --- /dev/null +++ b/django-backend/soroscan/ingest/webhook_signing.py @@ -0,0 +1,112 @@ +""" +HMAC-SHA256 request signing and verification for webhook security. + +Provides outbound webhook signing (X-SoroScan-Signature header) and +verification utilities for webhook subscribers. +""" + +import hashlib +import hmac +import secrets +from typing import Optional + + +def generate_signing_key() -> str: + """ + Generate a cryptographically secure signing key. + + Returns a hex-encoded string of 32 random bytes (64 hex chars). + """ + return secrets.token_hex(32) + + +def sign_webhook_payload(payload: str, key: str, algorithm: str = "sha256") -> str: + """ + Sign a webhook payload using HMAC. + + Args: + payload: The JSON-serialized payload string to sign. + key: The signing key (hex-encoded string). + algorithm: Hash algorithm to use ("sha256" or "sha1"). + + Returns: + Signature string in the format "{algorithm}={hex_digest}". + + Example: + >>> sign_webhook_payload('{"event":"transfer"}', my_key) + 'sha256=abc123def456...' + """ + digestmod = hashlib.sha256 if algorithm == "sha256" else hashlib.sha1 + prefix = "sha256" if algorithm == "sha256" else "sha1" + sig_hex = hmac.new( + key.encode("utf-8"), + payload.encode("utf-8"), + digestmod=digestmod, + ).hexdigest() + return f"{prefix}={sig_hex}" + + +def verify_webhook_signature( + payload: str, signature_header: str, secret_key: str +) -> bool: + """ + Verify a webhook payload against the X-SoroScan-Signature header. + + Args: + payload: The raw JSON payload string. + signature_header: The X-SoroScan-Signature header value + (e.g. "sha256=abc123..."). + secret_key: The shared signing key. + + Returns: + True if the signature is valid, False otherwise. + + Example: + >>> is_valid = verify_webhook_signature( + ... '{"event":"transfer"}', + ... 'sha256=abc123...', + ... my_key + ... ) + """ + if not signature_header: + return False + try: + if "=" not in signature_header: + return False + algorithm_part, signature = signature_header.split("=", 1) + digestmod = ( + hashlib.sha256 if algorithm_part == "sha256" else hashlib.sha1 + ) + expected_sig = hmac.new( + secret_key.encode("utf-8"), + payload.encode("utf-8"), + digestmod=digestmod, + ).hexdigest() + return hmac.compare_digest(expected_sig, signature) + except (ValueError, AttributeError): + return False + + +def verify_webhook_request( + payload: bytes, + signature_header: Optional[str], + secret_key: str, +) -> bool: + """ + Verify a webhook request from raw bytes payload and headers. + + Convenience wrapper that decodes bytes and delegates to + verify_webhook_signature. + + Args: + payload: Raw bytes of the request body. + signature_header: The X-SoroScan-Signature header value or None. + secret_key: The shared signing key. + + Returns: + True if the signature is valid, False otherwise. + """ + if not signature_header: + return False + payload_str = payload.decode("utf-8") if isinstance(payload, bytes) else payload + return verify_webhook_signature(payload_str, signature_header, secret_key) diff --git a/django-backend/soroscan/middleware.py b/django-backend/soroscan/middleware.py index ee9e7a4c..97d01723 100644 --- a/django-backend/soroscan/middleware.py +++ b/django-backend/soroscan/middleware.py @@ -58,6 +58,111 @@ def __call__(self, request): return response +class APIUsageAnalyticsMiddleware: + """Persist API request usage facts for organization analytics.""" + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + response = self.get_response(request) + + if request.path.startswith(("/api/", "/graphql/")): + self._record_usage(request, response) + + return response + + def _record_usage(self, request, response) -> None: + try: + from soroscan.ingest.models import APIKey, APIUsageLog, OrganizationMembership + + api_key = self._get_api_key(request, APIKey) + user = getattr(request, "user", None) + if not getattr(user, "is_authenticated", False): + user = getattr(api_key, "user", None) + + organization = None + if api_key and api_key.team_id: + organization = getattr(api_key.team, "organization", None) + if organization is None and user is not None: + membership = ( + OrganizationMembership.objects.select_related("organization") + .filter(user=user) + .order_by("organization_id") + .first() + ) + organization = membership.organization if membership else None + + APIUsageLog.objects.create( + organization=organization, + user=user, + api_key=api_key, + method=request.method, + endpoint=self._endpoint_name(request), + path=request.path[:512], + status_code=getattr(response, "status_code", 0) or 0, + request_bytes=self._request_bytes(request), + response_bytes=self._response_bytes(response), + error_type=self._error_type(response), + ) + except Exception: + logger.exception("Failed to record API usage analytics") + + @staticmethod + def _get_api_key(request, api_key_model): + auth = request.META.get("HTTP_AUTHORIZATION", "") + key = "" + if auth.lower().startswith("apikey "): + key = auth[7:].strip() + if not key: + key = request.GET.get("api_key", "") + if not key: + return None + return ( + api_key_model.objects.select_related("user", "team", "team__organization") + .filter(key=key, is_active=True) + .first() + ) + + @staticmethod + def _endpoint_name(request) -> str: + match = getattr(request, "resolver_match", None) + if match: + route = getattr(match, "route", "") + if route: + return route[:255] + if match.view_name: + return match.view_name[:255] + return request.path[:255] + + @staticmethod + def _request_bytes(request) -> int: + try: + return max(0, int(request.META.get("CONTENT_LENGTH") or 0)) + except (TypeError, ValueError): + return 0 + + @staticmethod + def _response_bytes(response) -> int: + try: + return max(0, int(response.get("Content-Length") or 0)) + except (TypeError, ValueError): + pass + if getattr(response, "streaming", False): + return 0 + content = getattr(response, "content", b"") + return len(content or b"") + + @staticmethod + def _error_type(response) -> str: + status_code = getattr(response, "status_code", 0) or 0 + if status_code < 400: + return "" + if status_code >= 500: + return f"server_error_{status_code}" + return f"client_error_{status_code}" + + class ReverseProxyFixedIPMiddleware: """ Middleware to handle rate limiting behind a reverse proxy. @@ -203,73 +308,3 @@ def __call__(self, request): response["Link"] = f'<{config.get("replacement", "")}>; rel="replacement"' break return response - - -_STATIC_PATH_PREFIXES = ("/static/", "/media/", "/favicon.ico") - -ip_logger = logging.getLogger("soroscan.ip_access") - - -class ClientIPLoggingMiddleware: - """ - Log the client IP address, HTTP method, and request path for every - incoming API request. - - The client IP is read from REMOTE_ADDR, which is expected to already - be set correctly by ReverseProxyFixedIPMiddleware when running behind - a proxy. Static-asset paths are excluded to avoid log noise. - """ - - def __init__(self, get_response): - self.get_response = get_response - - def __call__(self, request): - path = request.path - if not path.startswith(_STATIC_PATH_PREFIXES): - client_ip = request.META.get("REMOTE_ADDR", "unknown") - ip_logger.info( - "%s %s from %s", - request.method, - path, - client_ip, - extra={ - "client_ip": client_ip, - "method": request.method, - "path": path, - }, - ) - return self.get_response(request) - - -class CacheBustingMiddleware: - """ - Add Cache-Control headers to API responses. - - Supports cache busting via: - - ``Cache-Control: no-cache`` header from client - - ``X-Cache-Bust`` header from client - - ``Last-Modified`` / ``ETag`` conditional request support - """ - - CACHE_CONTROL_PATHS = ("/api/", "/graphql/") - - def __init__(self, get_response): - self.get_response = get_response - - def __call__(self, request): - # Check if client requests cache bypass - cache_bust = ( - request.headers.get("Cache-Control") == "no-cache" - or request.headers.get("X-Cache-Bust") == "1" - ) - - response = self.get_response(request) - - if any(request.path.startswith(p) for p in self.CACHE_CONTROL_PATHS): - if cache_bust: - response["Cache-Control"] = "no-cache, no-store, must-revalidate" - response["Pragma"] = "no-cache" - else: - response["Cache-Control"] = "private, max-age=0" - - return response \ No newline at end of file diff --git a/django-backend/soroscan/settings.py b/django-backend/soroscan/settings.py index c10a8612..8c87aed7 100644 --- a/django-backend/soroscan/settings.py +++ b/django-backend/soroscan/settings.py @@ -125,6 +125,7 @@ def _load_software_version() -> str: "django.middleware.gzip.GZipMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", + "soroscan.middleware.APIUsageAnalyticsMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", # PrometheusAfterMiddleware must be last to record response codes/latencies. @@ -371,8 +372,28 @@ def _load_software_version() -> str: "task": "ingest.tasks.warm_event_count_cache", "schedule": 300, # every 5 minutes }, + "analyze-transaction-costs": { + "task": "ingest.tasks.analyze_transaction_costs", + "schedule": 3600, # hourly + }, + "detect-event-anomalies": { + "task": "ingest.tasks.detect_event_anomalies", + "schedule": 3600, # hourly + }, + "rotate-expired-signing-keys": { + "task": "ingest.tasks.rotate_expired_signing_keys", + "schedule": 86400, # daily + }, } +# Analytics / Anomaly Detection Configuration +# Threshold for event volume drop anomaly detection (percentage) +ANOMALY_DROP_THRESHOLD_PCT = env.int("ANOMALY_DROP_THRESHOLD_PCT", default=50) + +# Signing Key Rotation Configuration +# Number of days to retain expired signing keys after rotation +SIGNING_KEY_RETENTION_DAYS = env.int("SIGNING_KEY_RETENTION_DAYS", default=7) + # Data Retention Configuration # Number of days to retain deduplication logs before cleanup DEDUP_LOG_RETENTION_DAYS = env("DEDUP_LOG_RETENTION_DAYS", default=90, cast=int) @@ -650,10 +671,3 @@ def _load_software_version() -> str: "replacement": "/graphql/" } } - -if 'test' in sys.argv: - CACHES = { - 'default': { - 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', - } - } diff --git a/django-backend/soroscan/settings_test.py b/django-backend/soroscan/settings_test.py index def0eae8..62618f00 100644 --- a/django-backend/soroscan/settings_test.py +++ b/django-backend/soroscan/settings_test.py @@ -211,4 +211,7 @@ } MAX_REQUEST_BODY_SIZE = 10485760 -DEPRECATED_ENDPOINTS = {} \ No newline at end of file +DEPRECATED_ENDPOINTS = {} + +# Signing Key Rotation +SIGNING_KEY_RETENTION_DAYS = 7 \ No newline at end of file diff --git a/django-backend/soroscan/urls.py b/django-backend/soroscan/urls.py index 75348e05..3226e012 100644 --- a/django-backend/soroscan/urls.py +++ b/django-backend/soroscan/urls.py @@ -22,7 +22,7 @@ audit_trail_view, cache_stats_view, contract_status, - db_explain_view, + organization_api_usage_analytics_view, rate_limit_analytics_view, webhook_batch_delivery_status_view, webhook_delivery_metrics_view, @@ -49,6 +49,8 @@ path("api/audit-trail/", audit_trail_view, name="audit-trail"), path("api/contracts/status/", contract_status, name="contract-status"), path("api/analytics/rate-limits/", rate_limit_analytics_view, name="rate-limit-analytics"), + path("api/analytics/api-usage/", organization_api_usage_analytics_view, name="organization-api-usage-analytics"), + path("api/analytics/api-usage.csv/", organization_api_usage_analytics_view, {"format": "csv"}, name="organization-api-usage-analytics-csv"), path("api/meta/db-pool/", db_pool_stats_view, name="db-pool-stats"), path("api/dev/summary/", dev_summary_view, name="dev-summary"), path("api/admin/db/explain/", db_explain_view, name="admin-db-explain"), @@ -79,4 +81,3 @@ # Silk profiling UI — available only when ENABLE_SILK is set if getattr(settings, "ENABLE_SILK", False): urlpatterns += [path("silk/", include("silk.urls", namespace="silk"))] - diff --git a/sdk/python/soroscan/__init__.py b/sdk/python/soroscan/__init__.py index ed59b5f8..63f9ebfa 100644 --- a/sdk/python/soroscan/__init__.py +++ b/sdk/python/soroscan/__init__.py @@ -12,6 +12,7 @@ AsyncContractQueryBuilder, ) from soroscan.pagination import AsyncPaginator, Paginator +from soroscan.webhooks import verify_webhook, verify_webhook_signature from soroscan.exceptions import ( SoroScanAPIError, SoroScanAuthError, @@ -39,6 +40,8 @@ "AsyncContractQueryBuilder", "Paginator", "AsyncPaginator", + "verify_webhook", + "verify_webhook_signature", "ContractEvent", "TrackedContract", "WebhookSubscription", diff --git a/sdk/python/soroscan/webhooks.py b/sdk/python/soroscan/webhooks.py new file mode 100644 index 00000000..99fb7921 --- /dev/null +++ b/sdk/python/soroscan/webhooks.py @@ -0,0 +1,102 @@ +""" +Webhook signature verification utilities for the SoroScan Python SDK. + +Provides helpers to verify HMAC signatures on incoming webhooks. +""" + +import hashlib +import hmac +from typing import Dict, Optional, Union + + +def verify_webhook_signature( + payload: Union[str, bytes], + signature_header_value: Optional[str], + secret_key: str, + known_algorithms: tuple = ("sha256", "sha1"), +) -> bool: + """ + Verify the X-SoroScan-Signature header of an incoming webhook. + + Args: + payload: The raw webhook body (string or bytes). + signature_header_value: + The value of the ``X-SoroScan-Signature`` header + (e.g. ``"sha256=abc123..."``). If ``None`` verification fails. + secret_key: The shared HMAC signing key (hex-encoded string). + known_algorithms: + Acceptable hash prefixes. Defaults to ``("sha256", "sha1")``. + + Returns: + ``True`` if the signature is valid, ``False`` otherwise. + + Usage:: + + from soroscan.webhooks import verify_webhook_signature + + is_valid = verify_webhook_signature( + payload=request.body, + signature_header_value=request.headers.get("X-SoroScan-Signature"), + secret_key="", + ) + if not is_valid: + raise PermissionError("Invalid webhook signature") + """ + if not signature_header_value: + return False + + try: + if "=" not in signature_header_value: + return False + + algorithm, signature = signature_header_value.split("=", 1) + if algorithm not in known_algorithms: + return False + + if isinstance(payload, bytes): + payload_str = payload.decode("utf-8") + else: + payload_str = payload + + digestmod = hashlib.sha256 if algorithm == "sha256" else hashlib.sha1 + expected = hmac.new( + secret_key.encode("utf-8"), + payload_str.encode("utf-8"), + digestmod=digestmod, + ).hexdigest() + + return hmac.compare_digest(expected, signature) + except (ValueError, AttributeError, TypeError): + return False + + +def verify_webhook( + payload: Union[str, bytes], + headers: Dict[str, str], + secret_key: str, +) -> bool: + """ + Convenience wrapper that extracts the signature header from a + headers dict. + + Args: + payload: The raw webhook body (string or bytes). + headers: A dict-like object of request headers (case-insensitive + keys recommended). + secret_key: The shared HMAC signing key. + + Returns: + ``True`` if the signature is valid, ``False`` otherwise. + """ + header_value = None + for key, value in headers.items(): + if key.lower() == "x-soroscan-signature": + header_value = value + break + return verify_webhook_signature(payload, header_value, secret_key) + + +__all__ = [ + "verify_webhook_signature", + "verify_webhook", +] diff --git a/sdk/typescript/src/webhooks.ts b/sdk/typescript/src/webhooks.ts new file mode 100644 index 00000000..a2305f1d --- /dev/null +++ b/sdk/typescript/src/webhooks.ts @@ -0,0 +1,84 @@ +import { createHmac, timingSafeEqual } from "node:crypto"; + +/** + * Verify the X-SoroScan-Signature header of an incoming webhook. + * + * @param payload - The raw webhook body as a string. + * @param signatureHeader - The value of the X-SoroScan-Signature header + * (e.g. `"sha256=abc123..."`). + * @param secretKey - The shared HMAC signing key. + * @returns `true` if the signature is valid, `false` otherwise. + * + * @example + * ```ts + * import { verifyWebhookSignature } from "@soroscan/sdk/webhooks"; + * + * const isValid = verifyWebhookSignature( + * JSON.stringify(req.body), + * req.headers["x-soroscan-signature"], + * process.env.SOROSCAN_WEBHOOK_SECRET!, + * ); + * ``` + */ +export function verifyWebhookSignature( + payload: string, + signatureHeader: string | undefined | null, + secretKey: string, +): boolean { + if (!signatureHeader) { + return false; + } + + const eqIndex = signatureHeader.indexOf("="); + if (eqIndex === -1) { + return false; + } + + const algorithm = signatureHeader.slice(0, eqIndex); + const signature = signatureHeader.slice(eqIndex + 1); + + let hashAlgo: string; + if (algorithm === "sha256") { + hashAlgo = "sha256"; + } else if (algorithm === "sha1") { + hashAlgo = "sha1"; + } else { + return false; + } + + const expected = createHmac(hashAlgo, secretKey) + .update(payload) + .digest("hex"); + + try { + const expectedBuf = Buffer.from(expected, "utf-8"); + const sigBuf = Buffer.from(signature, "utf-8"); + return ( + expectedBuf.length === sigBuf.length && + timingSafeEqual(expectedBuf, sigBuf) + ); + } catch { + return false; + } +} + +/** + * Convenience wrapper that extracts the signature header from a headers + * object (case-insensitive). + * + * @param payload - The raw webhook body as a string. + * @param headers - An object of request headers. + * @param secretKey - The shared HMAC signing key. + * @returns `true` if the signature is valid, `false` otherwise. + */ +export function verifyWebhook( + payload: string, + headers: Record, + secretKey: string, +): boolean { + const headerValue = Object.entries(headers).find( + ([key]) => key.toLowerCase() === "x-soroscan-signature", + )?.[1]; + const signatureValue = Array.isArray(headerValue) ? headerValue[0] : headerValue; + return verifyWebhookSignature(payload, signatureValue, secretKey); +}