Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
457 changes: 414 additions & 43 deletions backend/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ rand = "0.8"
sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "chrono", "uuid", "macros", "migrate", "rust_decimal"] }
rust_decimal = { version = "1.33", features = ["serde-float", "maths"] }
dotenvy = "0.15"
reqwest = { version = "0.11", features = ["json", "rustls-tls"] }
hmac = "0.12"
sha2 = "0.10"
hex = "0.4"
Expand All @@ -29,7 +30,6 @@ jsonwebtoken = "9.0"
base64 = "0.21"
stellar-strkey = "0.0.8"
ed25519-dalek = { version = "2.1", features = ["pkcs8", "rand_core"] }

dashmap = "6"
prometheus = { version = "0.13", features = ["process"] }
once_cell = "1.19"
2 changes: 2 additions & 0 deletions backend/migrations/20260625000000_create_webhooks.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS webhook_dispatches;
DROP TABLE IF EXISTS webhook_endpoints;
25 changes: 25 additions & 0 deletions backend/migrations/20260625000000_create_webhooks.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Create webhook endpoints and dispatch records
CREATE TABLE IF NOT EXISTS webhook_endpoints (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
url TEXT NOT NULL,
secret TEXT NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS webhook_dispatches (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
endpoint_id UUID NOT NULL REFERENCES webhook_endpoints (id) ON DELETE CASCADE,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 5,
status TEXT NOT NULL DEFAULT 'pending',
last_error TEXT,
next_attempt_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_webhook_dispatches_status_next ON webhook_dispatches (status, next_attempt_at);
30 changes: 22 additions & 8 deletions backend/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ use tower_http::cors::CorsLayer;
use tracing::error;
use uuid::Uuid;


use crate::stellar_anchor::{AnchorPayout, AnchorRegistry};

use crate::WebhookDispatcherService;

use crate::auth::signature_auth_middleware;
use crate::cache::PlanCache;
use crate::kyc_webhook::kyc_webhook_handler;
Expand All @@ -28,6 +33,7 @@ use crate::stellar_anchor::AnchorRegistry;
use crate::ws::{ws_handler, KycUpdateEvent};
use crate::yield_calculator;


#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanBeneficiary {
pub address: String,
Expand All @@ -52,7 +58,6 @@ pub struct Plan {
pub struct AppState {
pub anchor: Arc<AnchorRegistry>,
pub db_pool: sqlx::PgPool,
pub kyc_tx: tokio::sync::broadcast::Sender<KycUpdateEvent>,
pub kyc_webhook_secret: Option<String>,
pub apy_config: yield_calculator::ApyConfig,
pub plan_cache: PlanCache,
Expand All @@ -64,7 +69,7 @@ pub struct PlanQuery {
pub beneficiary: Option<String>,
}

#[derive(Deserialize)]
#[derive(Deserialize, serde::Serialize)]
pub struct PingRequest {
pub owner: String,
pub signature: String,
Expand Down Expand Up @@ -144,14 +149,13 @@ pub fn create_router(state: Arc<AppState>) -> Router {
let public_routes = Router::new()
.route("/api/plans", get(get_plans))
.route("/api/anchor/payout-status", get(get_anchor_payouts))
.route("/api/kyc/webhook", post(kyc_webhook_handler))
.route("/api/kyc/webhook", post(kyc_webhook_handler))
.route("/api/kyc/status", get(get_kyc_status))
.route("/api/kyc/submit", post(submit_kyc))
.route("/api/kyc/upload", post(upload_kyc_document))
.route("/api/kyc/required", get(is_kyc_required))
.route("/api/kyc/requirements", get(get_kyc_requirements))
.route("/ws/kyc", get(ws_handler));

Router::new()
.merge(user_routes)
.merge(public_routes)
Expand Down Expand Up @@ -573,6 +577,18 @@ async fn create_plan(
beneficiaries: inserted_beneficiaries,
};

// 3. Enqueue webhook event for plan.created (non-blocking)
let payload_value = serde_json::to_value(&response).unwrap_or(serde_json::json!({}));
if let Err(e) = crate::WebhookDispatcherService::enqueue_event(
&state.db_pool,
"plan.created",
&payload_value,
)
.await
{
tracing::warn!("Failed to enqueue webhook for plan.created: {:?}", e);
}

(StatusCode::CREATED, Json(response)).into_response()
}

Expand Down Expand Up @@ -780,15 +796,14 @@ async fn ping_plan(
State(state): State<Arc<AppState>>,
Json(payload): Json<PingRequest>,
) -> impl IntoResponse {
// 1. Verify signature
// 1. Verify signature
if !verify_ping_signature(&payload.owner, &payload.signature, &payload.message) {
return (
StatusCode::UNAUTHORIZED,
Json(serde_json::json!({ "error": "Invalid signature" })),
)
.into_response();
}

// 2. Fetch the active plan from DB
let plan = match sqlx::query_as::<_, PlanRow>(
"SELECT * FROM plans WHERE owner_address = $1 AND is_active = true",
Expand All @@ -813,14 +828,13 @@ async fn ping_plan(
.into_response();
}
};

// 3. Calculate accumulated yield
let current_time = chrono::Utc::now().timestamp();
let elapsed = if current_time > plan.last_ping {
(current_time - plan.last_ping) as u64
} else {
0
};
};r

let mut new_accrued_yield: rust_decimal::Decimal = plan.accrued_yield;
if plan.earn_yield && elapsed > 0 {
Expand Down
17 changes: 17 additions & 0 deletions backend/src/inactivity_watchdog.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::WebhookDispatcherService;
use chrono::{DateTime, Utc};
use serde_json::json;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -146,6 +148,21 @@ impl InactivityWatchdogService {
inactivity_deadline_at = %plan.inactivity_deadline_at,
"Plan marked claimable by inactivity watchdog"
);

// enqueue webhook for claimable/claimed plans
let payload = serde_json::json!({
"plan_id": plan.id,
"user_id": plan.user_id,
"title": plan.title,
"inactivity_deadline_at": plan.inactivity_deadline_at,
});

if let Err(e) =
crate::WebhookDispatcherService::enqueue_event(&self.db, "plan.claimable", &payload)
.await
{
warn!("Failed to enqueue webhook for plan.claimable: {:?}", e);
}
}

tx.commit().await?;
Expand Down
5 changes: 4 additions & 1 deletion backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ pub mod cache;
pub mod config;
pub mod db;
pub mod inactivity_watchdog;

pub mod kyc_webhook;
pub mod metrics;
pub mod middleware;

pub mod stellar_anchor;
pub mod telemetry;
pub mod ws;
pub mod webhooks;
pub mod yield_calculator;

pub use api::{create_router, AppState, PlanResponse};
pub use cache::PlanCache;
pub use config::Config;
pub use db::DbManager;
pub use inactivity_watchdog::{InactivityWatchdogConfig, InactivityWatchdogService};
pub use webhooks::WebhookDispatcherService;
18 changes: 12 additions & 6 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize tracing logging
telemetry::init_tracing()?;


// Initialize Prometheus metrics
metrics::init();

//loading the .env

dotenvy::dotenv().ok();

// Load configuration
Expand All @@ -28,7 +30,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
inheritx_backend::PlanCache::disabled()
});

// Attempt to connect to PostgreSQL stub/real
// Connect to PostgreSQL and run migrations
let db_pool = match DbManager::create_pool(&config.database_url).await {
Ok(pool) => {
info!("Successfully connected to PostgreSQL database.");
Expand All @@ -39,35 +41,38 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

pool
}

Err(e) => {
error!(
"Failed to connect to PostgreSQL database ({}): {:?}",
config.database_url, e
);

std::process::exit(1);
}
};

// Initialize state skeleton
let (kyc_tx, _) = tokio::sync::broadcast::channel(100);
// Initialize state
let state = Arc::new(AppState {
anchor: Arc::new(inheritx_backend::stellar_anchor::AnchorRegistry::new()),
db_pool: db_pool.clone(),
kyc_tx,
kyc_webhook_secret: std::env::var("KYC_WEBHOOK_SECRET").ok(),
apy_config: inheritx_backend::yield_calculator::ApyConfig::from_env(),
plan_cache: plan_cache.clone(),
});

// Start inactivity watchdog
let inactivity_watchdog = Arc::new(InactivityWatchdogService::new(
db_pool.clone(),
plan_cache,
InactivityWatchdogConfig::from_env(),
));
inactivity_watchdog.start();

r
let webhook_dispatcher = Arc::new(inheritx_backend::WebhookDispatcherService::new(
db_pool.clone(),
));
webhook_dispatcher.start();

// Periodically refresh DB pool metrics
{
let pool = db_pool.clone();
Expand All @@ -80,6 +85,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});
}


// Create Axum application
let app = create_router(state);

Expand Down
Loading
Loading