From 4e3a837ac9c026017896d0523c7823dc8729370f Mon Sep 17 00:00:00 2001 From: Alimzy Date: Thu, 25 Jun 2026 10:36:15 +0100 Subject: [PATCH 1/5] feat(webhooks): add outgoing webhook dispatcher, DB schema, and enqueue points --- backend/Cargo.lock | 456 ++++++++++++++++-- backend/Cargo.toml | 4 + .../20260625000000_create_webhooks.down.sql | 2 + .../20260625000000_create_webhooks.up.sql | 25 + backend/src/api.rs | 17 +- backend/src/inactivity_watchdog.rs | 14 + backend/src/lib.rs | 2 + backend/src/main.rs | 4 + backend/src/webhooks.rs | 165 +++++++ 9 files changed, 660 insertions(+), 29 deletions(-) create mode 100644 backend/migrations/20260625000000_create_webhooks.down.sql create mode 100644 backend/migrations/20260625000000_create_webhooks.up.sql create mode 100644 backend/src/webhooks.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 47f278cd0..a1520c9ab 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -94,10 +94,10 @@ dependencies = [ "bytes", "form_urlencoded", "futures-util", - "http", - "http-body", + "http 1.4.2", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.10.1", "hyper-util", "itoa", "matchit", @@ -110,7 +110,7 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tower", "tower-layer", @@ -126,12 +126,12 @@ checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.4.2", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", - "sync_wrapper", + "sync_wrapper 1.0.2", "tower-layer", "tower-service", "tracing", @@ -160,6 +160,12 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.13.0" @@ -296,6 +302,26 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -465,6 +491,27 @@ dependencies = [ "spin", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -595,6 +642,25 @@ dependencies = [ "r-efi 6.0.0", ] +[[package]] +name = "h2" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -671,6 +737,17 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.4.2" @@ -681,6 +758,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -688,7 +776,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.4.2", ] [[package]] @@ -699,8 +787,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.4.2", + "http-body 1.0.1", "pin-project-lite", ] @@ -716,6 +804,30 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.10.1" @@ -726,8 +838,8 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "http", - "http-body", + "http 1.4.2", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -736,6 +848,33 @@ dependencies = [ "tokio", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "rustls", + "tokio", + "tokio-rustls", +] + +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.32", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-util" version = "0.1.20" @@ -743,9 +882,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ "bytes", - "http", - "http-body", - "hyper", + "http 1.4.2", + "http-body 1.0.1", + "hyper 1.10.1", "pin-project-lite", "tokio", "tower-service", @@ -896,10 +1035,14 @@ dependencies = [ "axum", "chrono", "dotenvy", + "hex", + "hmac", "rand", + "reqwest", "rust_decimal", "serde", "serde_json", + "sha2", "sqlx", "thiserror", "tokio", @@ -910,6 +1053,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "ipnet" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" + [[package]] name = "itoa" version = "1.0.18" @@ -954,7 +1103,7 @@ version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f02ab6bace2054fb888a3c16f990117b579d14a3088e472d63c6011fa185c9d3" dependencies = [ - "bitflags", + "bitflags 2.13.0", "libc", "plain", "redox_syscall 0.8.1", @@ -1061,7 +1210,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http", + "http 1.4.2", "httparse", "memchr", "mime", @@ -1069,6 +1218,23 @@ dependencies = [ "version_check", ] +[[package]] +name = "native-tls" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nom" version = "7.1.3" @@ -1140,6 +1306,49 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "openssl" +version = "0.10.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77823a27f0babb03091cb9ed9ef80af3b39dbc82f97e8fa530374b7dafd87a45" +dependencies = [ + "bitflags 2.13.0", + "cfg-if", + "foreign-types", + "libc", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.118", +] + +[[package]] +name = "openssl-probe" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" + +[[package]] +name = "openssl-sys" +version = "0.9.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b47e7e6bb2c38cd930d25a23b40fa52e068c10e85f3e03a7f5ba5aaca5713695" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -1342,7 +1551,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.13.0", ] [[package]] @@ -1351,7 +1560,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b44b894f2a6e36457d665d1e08c3866add6ed5e70050c1b4ba8a8ddedb02ce7" dependencies = [ - "bitflags", + "bitflags 2.13.0", ] [[package]] @@ -1380,6 +1589,50 @@ dependencies = [ "bytecheck", ] +[[package]] +name = "reqwest" +version = "0.11.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper-rustls", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 0.1.2", + "system-configuration", + "tokio", + "tokio-native-tls", + "tokio-rustls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", + "winreg", +] + [[package]] name = "ring" version = "0.17.14" @@ -1466,7 +1719,7 @@ version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ - "bitflags", + "bitflags 2.13.0", "errno", "libc", "linux-raw-sys", @@ -1479,6 +1732,7 @@ version = "0.21.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ + "log", "ring", "rustls-webpki", "sct", @@ -1515,6 +1769,15 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" +[[package]] +name = "schannel" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1537,6 +1800,29 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "security-framework" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" +dependencies = [ + "bitflags 2.13.0", + "core-foundation 0.10.1", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.228" @@ -1678,6 +1964,16 @@ version = "1.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ed6a63f02c8539c91a8685a86f4099661ba3da017932f6ebbea6de3f0fa7c90" +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.4" @@ -1821,7 +2117,7 @@ checksum = "1ed31390216d20e538e447a7a9b959e06ed9fc51c37b514b46eb758016ecd418" dependencies = [ "atoi", "base64", - "bitflags", + "bitflags 2.13.0", "byteorder", "bytes", "chrono", @@ -1865,7 +2161,7 @@ checksum = "7c824eb80b894f926f89a0b9da0c7f435d27cdd35b8c655b114e58223918577e" dependencies = [ "atoi", "base64", - "bitflags", + "bitflags 2.13.0", "byteorder", "chrono", "crc", @@ -1967,6 +2263,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "sync_wrapper" version = "1.0.2" @@ -1984,6 +2286,27 @@ dependencies = [ "syn 2.0.118", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation 0.9.4", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tap" version = "1.0.1" @@ -2069,7 +2392,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.4", "tokio-macros", "windows-sys 0.61.2", ] @@ -2085,6 +2408,26 @@ dependencies = [ "syn 2.0.118", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.18" @@ -2096,6 +2439,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml_datetime" version = "1.1.1+spec-1.1.0" @@ -2135,7 +2491,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tower-layer", "tower-service", @@ -2148,10 +2504,10 @@ version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ - "bitflags", + "bitflags 2.13.0", "bytes", - "http", - "http-body", + "http 1.4.2", + "http-body 1.0.1", "pin-project-lite", "tower", "tower-layer", @@ -2247,6 +2603,12 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "typenum" version = "1.20.1" @@ -2307,6 +2669,7 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", ] [[package]] @@ -2351,6 +2714,15 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -2386,6 +2758,16 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "503b14d284f2c8dac03b819967e155ea753f573586193b2b2c95990cb5d69280" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.125" @@ -2418,6 +2800,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "web-sys" +version = "0.3.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6430a72df5eb332242960fe84b3002a241163998241eb596d4f739b9757061d" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.4" @@ -2650,6 +3042,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wit-bindgen" version = "0.57.1" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 575e94a61..3b721f0b6 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -20,3 +20,7 @@ rand = "0.8" sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "chrono", "uuid", "macros", "migrate"] } rust_decimal = { version = "1.33", features = ["serde-float"] } dotenvy = "0.15" +reqwest = { version = "0.11", features = ["json", "rustls-tls"] } +hmac = "0.12" +sha2 = "0.10" +hex = "0.4" diff --git a/backend/migrations/20260625000000_create_webhooks.down.sql b/backend/migrations/20260625000000_create_webhooks.down.sql new file mode 100644 index 000000000..5f23e9d80 --- /dev/null +++ b/backend/migrations/20260625000000_create_webhooks.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS webhook_dispatches; +DROP TABLE IF EXISTS webhook_endpoints; diff --git a/backend/migrations/20260625000000_create_webhooks.up.sql b/backend/migrations/20260625000000_create_webhooks.up.sql new file mode 100644 index 000000000..39ebb80a6 --- /dev/null +++ b/backend/migrations/20260625000000_create_webhooks.up.sql @@ -0,0 +1,25 @@ +-- Create webhook endpoints and dispatch records +CREATE TABLE IF NOT EXISTS webhook_endpoints ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT NOT NULL, + url TEXT NOT NULL, + secret TEXT NOT NULL, + is_active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS webhook_dispatches ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + endpoint_id UUID NOT NULL REFERENCES webhook_endpoints (id) ON DELETE CASCADE, + event_type TEXT NOT NULL, + payload JSONB NOT NULL, + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 5, + status TEXT NOT NULL DEFAULT 'pending', + last_error TEXT, + next_attempt_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_webhook_dispatches_status_next ON webhook_dispatches (status, next_attempt_at); diff --git a/backend/src/api.rs b/backend/src/api.rs index d92a5e0e5..f0685a95b 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use tower_http::cors::{Any, CorsLayer}; use crate::stellar_anchor::{AnchorPayout, AnchorRegistry}; +use crate::WebhookDispatcherService; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PlanBeneficiary { @@ -42,7 +43,7 @@ pub struct PlanQuery { pub owner: Option, } -#[derive(Deserialize)] +#[derive(Deserialize, serde::Serialize)] pub struct PingRequest { pub owner: String, } @@ -78,6 +79,12 @@ async fn create_plan( State(_state): State>, Json(payload): Json, ) -> impl IntoResponse { + // enqueue webhook event for plan.created + let payload_value = serde_json::to_value(&payload).unwrap_or(serde_json::json!({})); + if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event(&_state.db_pool, "plan.created", &payload_value).await { + tracing::warn!("Failed to enqueue webhook for plan.created: {:?}", e); + } + (StatusCode::CREATED, Json(payload)) } @@ -97,7 +104,13 @@ async fn ping_plan( State(_state): State>, Json(_payload): Json, ) -> impl IntoResponse { - (StatusCode::NOT_IMPLEMENTED, "Ping logic not implemented") + // enqueue webhook event for plan.pinged + let payload_value = serde_json::to_value(&_payload).unwrap_or(serde_json::json!({})); + if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event(&_state.db_pool, "plan.pinged", &payload_value).await { + tracing::warn!("Failed to enqueue webhook for plan.pinged: {:?}", e); + } + + (StatusCode::OK, "Ping accepted") } // Handler: Trigger Payout diff --git a/backend/src/inactivity_watchdog.rs b/backend/src/inactivity_watchdog.rs index 1286b437d..d874eb088 100644 --- a/backend/src/inactivity_watchdog.rs +++ b/backend/src/inactivity_watchdog.rs @@ -1,5 +1,7 @@ use chrono::{DateTime, Utc}; use sqlx::PgPool; +use serde_json::json; +use crate::WebhookDispatcherService; use std::sync::Arc; use std::time::Duration; use tokio::time::MissedTickBehavior; @@ -113,6 +115,18 @@ impl InactivityWatchdogService { inactivity_deadline_at = %plan.inactivity_deadline_at, "Plan marked claimable by inactivity watchdog" ); + + // enqueue webhook for claimable/claimed plans + let payload = serde_json::json!({ + "plan_id": plan.id, + "user_id": plan.user_id, + "title": plan.title, + "inactivity_deadline_at": plan.inactivity_deadline_at, + }); + + if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event(&self.db, "plan.claimable", &payload).await { + warn!("Failed to enqueue webhook for plan.claimable: {:?}", e); + } } tx.commit().await?; diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 04c48d5d6..ca367e3b0 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -5,8 +5,10 @@ pub mod inactivity_watchdog; pub mod stellar_anchor; pub mod telemetry; pub mod yield_calculator; +pub mod webhooks; pub use api::{create_router, AppState}; pub use config::Config; pub use db::DbManager; pub use inactivity_watchdog::{InactivityWatchdogConfig, InactivityWatchdogService}; +pub use webhooks::{WebhookDispatcherService}; diff --git a/backend/src/main.rs b/backend/src/main.rs index da6c5eefb..1009a13d1 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -51,6 +51,10 @@ async fn main() -> Result<(), Box> { )); inactivity_watchdog.start(); + // Start webhook dispatcher + let webhook_dispatcher = Arc::new(inheritx_backend::WebhookDispatcherService::new(db_pool.clone())); + webhook_dispatcher.start(); + // Create Axum application let app = create_router(state); diff --git a/backend/src/webhooks.rs b/backend/src/webhooks.rs new file mode 100644 index 000000000..647805623 --- /dev/null +++ b/backend/src/webhooks.rs @@ -0,0 +1,165 @@ +use hmac::{Hmac, Mac}; +use reqwest::Client; +use serde_json::Value; +use sha2::Sha256; +use sqlx::{PgPool, Row, Postgres, Transaction}; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; +use tracing::{error, info, warn}; + +type HmacSha256 = Hmac; + +pub struct WebhookDispatcherService { + db: PgPool, + client: Client, +} + +impl WebhookDispatcherService { + pub fn new(db: PgPool) -> Self { + Self { + db, + client: Client::new(), + } + } + + pub fn start(self: Arc) { + tokio::spawn(async move { + loop { + if let Err(e) = self.run_once().await { + error!("Webhook dispatcher run failed: {e}"); + } + + sleep(Duration::from_secs(5)).await; + } + }); + } + + async fn run_once(&self) -> Result<(), sqlx::Error> { + let mut tx: Transaction<'_, Postgres> = self.db.begin().await?; + + // select pending dispatches ready to run + let rows = sqlx::query( + "SELECT wd.id, wd.endpoint_id, wd.event_type, wd.payload, wd.attempts, we.url, we.secret + FROM webhook_dispatches wd + JOIN webhook_endpoints we ON wd.endpoint_id = we.id + WHERE wd.status = 'pending' AND (wd.next_attempt_at IS NULL OR wd.next_attempt_at <= NOW()) + ORDER BY wd.created_at ASC + LIMIT 25 FOR UPDATE SKIP LOCKED", + ) + .fetch_all(&mut tx) + .await?; + + for row in rows { + let id: uuid::Uuid = row.get("id"); + let url: String = row.get("url"); + let secret: Option = row.get("secret"); + let payload: Value = row.get("payload"); + let attempts: i32 = row.get("attempts"); + let event_type: String = row.get("event_type"); + + let body = serde_json::to_vec(&payload).unwrap_or_default(); + + // compute HMAC-SHA256 signature + let key = secret.unwrap_or_default(); + let mut mac = HmacSha256::new_from_slice(key.as_bytes()).expect("HMAC can take key of any size"); + mac.update(&body); + let signature = hex::encode(mac.finalize().into_bytes()); + + let res = self + .client + .post(&url) + .header("X-Event-Type", event_type.clone()) + .header("X-Signature", format!("sha256={}", signature)) + .json(&payload) + .send() + .await; + + match res { + Ok(resp) if resp.status().is_success() => { + sqlx::query("UPDATE webhook_dispatches SET status = 'success', updated_at = NOW() WHERE id = $1") + .bind(id) + .execute(&mut tx) + .await?; + info!("Webhook dispatch {} succeeded", id); + } + Ok(mut resp) => { + let status = resp.status().as_u16(); + let text = resp.text().await.unwrap_or_default(); + warn!("Webhook dispatch {} returned status {}: {}", id, status, text); + Self::handle_failure(&mut tx, id, attempts, Some(format!("status {}: {}", status, text))).await?; + } + Err(e) => { + warn!("Webhook dispatch {} request error: {}", id, e); + Self::handle_failure(&mut tx, id, attempts, Some(e.to_string())).await?; + } + } + } + + tx.commit().await?; + Ok(()) + } + + async fn handle_failure(tx: &mut Transaction<'_, Postgres>, id: uuid::Uuid, attempts: i32, last_error: Option) -> Result<(), sqlx::Error> { + let next_attempts = attempts + 1; + let max_attempts_row = sqlx::query("SELECT max_attempts FROM webhook_dispatches WHERE id = $1") + .bind(id) + .fetch_optional(&mut *tx) + .await?; + + let max_attempts: i32 = if let Some(row) = max_attempts_row { + row.get::, _>("max_attempts").unwrap_or(5) + } else { + 5 + }; + + if next_attempts >= max_attempts { + // mark failed + sqlx::query("UPDATE webhook_dispatches SET attempts = $1, status = 'failed', last_error = $2, updated_at = NOW() WHERE id = $3") + .bind(next_attempts) + .bind(last_error) + .bind(id) + .execute(&mut *tx) + .await?; + } else { + let backoff_secs = 2u64.pow(next_attempts as u32); + sqlx::query("UPDATE webhook_dispatches SET attempts = $1, last_error = $2, next_attempt_at = (NOW() + ($3 || ' seconds')::interval), updated_at = NOW() WHERE id = $4") + .bind(next_attempts) + .bind(last_error) + .bind(backoff_secs as i64) + .bind(id) + .execute(&mut *tx) + .await?; + } + + Ok(()) + } + + // Enqueue a payload for all active endpoints + pub async fn enqueue_event(db: &PgPool, event_type: &str, payload: &Value) -> Result<(), sqlx::Error> { + let mut tx = db.begin().await?; + let endpoints = sqlx::query("SELECT id FROM webhook_endpoints WHERE is_active = true").fetch_all(&mut tx).await?; + for ep in endpoints { + let endpoint_id: uuid::Uuid = ep.get("id"); + sqlx::query("INSERT INTO webhook_dispatches (endpoint_id, event_type, payload) VALUES ($1, $2, $3)") + .bind(endpoint_id) + .bind(event_type) + .bind(payload.clone()) + .execute(&mut tx) + .await?; + } + + tx.commit().await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + // light compile-time sanity checks + use super::*; + #[test] + fn smoke() { + let _ = WebhookDispatcherService::new(sqlx::PgPool::connect_lazy("postgres://localhost").unwrap()); + } +} From 13e7f7337b253ff72d1e91b20ecf4c457b76ca01 Mon Sep 17 00:00:00 2001 From: Alimzy Date: Thu, 25 Jun 2026 15:17:35 +0100 Subject: [PATCH 2/5] style: apply rustfmt formatting --- backend/src/api.rs | 16 ++++++++-- backend/src/inactivity_watchdog.rs | 12 ++++++-- backend/src/lib.rs | 6 ++-- backend/src/main.rs | 4 ++- backend/src/webhooks.rs | 48 ++++++++++++++++++++++-------- 5 files changed, 64 insertions(+), 22 deletions(-) diff --git a/backend/src/api.rs b/backend/src/api.rs index 80d8a9a38..607090df4 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -87,7 +87,13 @@ async fn create_plan( ) -> impl IntoResponse { // enqueue webhook event for plan.created let payload_value = serde_json::to_value(&payload).unwrap_or(serde_json::json!({})); - if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event(&_state.db_pool, "plan.created", &payload_value).await { + if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event( + &_state.db_pool, + "plan.created", + &payload_value, + ) + .await + { tracing::warn!("Failed to enqueue webhook for plan.created: {:?}", e); } @@ -112,7 +118,13 @@ async fn ping_plan( ) -> impl IntoResponse { // enqueue webhook event for plan.pinged let payload_value = serde_json::to_value(&_payload).unwrap_or(serde_json::json!({})); - if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event(&_state.db_pool, "plan.pinged", &payload_value).await { + if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event( + &_state.db_pool, + "plan.pinged", + &payload_value, + ) + .await + { tracing::warn!("Failed to enqueue webhook for plan.pinged: {:?}", e); } diff --git a/backend/src/inactivity_watchdog.rs b/backend/src/inactivity_watchdog.rs index d874eb088..7e9846eb4 100644 --- a/backend/src/inactivity_watchdog.rs +++ b/backend/src/inactivity_watchdog.rs @@ -1,7 +1,7 @@ +use crate::WebhookDispatcherService; use chrono::{DateTime, Utc}; -use sqlx::PgPool; use serde_json::json; -use crate::WebhookDispatcherService; +use sqlx::PgPool; use std::sync::Arc; use std::time::Duration; use tokio::time::MissedTickBehavior; @@ -124,7 +124,13 @@ impl InactivityWatchdogService { "inactivity_deadline_at": plan.inactivity_deadline_at, }); - if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event(&self.db, "plan.claimable", &payload).await { + if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event( + &self.db, + "plan.claimable", + &payload, + ) + .await + { warn!("Failed to enqueue webhook for plan.claimable: {:?}", e); } } diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 64a789604..55bab146f 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -2,15 +2,13 @@ pub mod api; pub mod config; pub mod db; pub mod inactivity_watchdog; -pub mod kyc_webhook; pub mod stellar_anchor; pub mod telemetry; -pub mod ws; -pub mod yield_calculator; pub mod webhooks; +pub mod yield_calculator; pub use api::{create_router, AppState}; pub use config::Config; pub use db::DbManager; pub use inactivity_watchdog::{InactivityWatchdogConfig, InactivityWatchdogService}; -pub use webhooks::{WebhookDispatcherService}; +pub use webhooks::WebhookDispatcherService; diff --git a/backend/src/main.rs b/backend/src/main.rs index 492cf5c0f..50beb0bdb 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -55,7 +55,9 @@ async fn main() -> Result<(), Box> { inactivity_watchdog.start(); // Start webhook dispatcher - let webhook_dispatcher = Arc::new(inheritx_backend::WebhookDispatcherService::new(db_pool.clone())); + let webhook_dispatcher = Arc::new(inheritx_backend::WebhookDispatcherService::new( + db_pool.clone(), + )); webhook_dispatcher.start(); // Create Axum application diff --git a/backend/src/webhooks.rs b/backend/src/webhooks.rs index 647805623..3e45386ab 100644 --- a/backend/src/webhooks.rs +++ b/backend/src/webhooks.rs @@ -2,7 +2,7 @@ use hmac::{Hmac, Mac}; use reqwest::Client; use serde_json::Value; use sha2::Sha256; -use sqlx::{PgPool, Row, Postgres, Transaction}; +use sqlx::{PgPool, Postgres, Row, Transaction}; use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; @@ -62,7 +62,8 @@ impl WebhookDispatcherService { // compute HMAC-SHA256 signature let key = secret.unwrap_or_default(); - let mut mac = HmacSha256::new_from_slice(key.as_bytes()).expect("HMAC can take key of any size"); + let mut mac = + HmacSha256::new_from_slice(key.as_bytes()).expect("HMAC can take key of any size"); mac.update(&body); let signature = hex::encode(mac.finalize().into_bytes()); @@ -86,8 +87,17 @@ impl WebhookDispatcherService { Ok(mut resp) => { let status = resp.status().as_u16(); let text = resp.text().await.unwrap_or_default(); - warn!("Webhook dispatch {} returned status {}: {}", id, status, text); - Self::handle_failure(&mut tx, id, attempts, Some(format!("status {}: {}", status, text))).await?; + warn!( + "Webhook dispatch {} returned status {}: {}", + id, status, text + ); + Self::handle_failure( + &mut tx, + id, + attempts, + Some(format!("status {}: {}", status, text)), + ) + .await?; } Err(e) => { warn!("Webhook dispatch {} request error: {}", id, e); @@ -100,12 +110,18 @@ impl WebhookDispatcherService { Ok(()) } - async fn handle_failure(tx: &mut Transaction<'_, Postgres>, id: uuid::Uuid, attempts: i32, last_error: Option) -> Result<(), sqlx::Error> { + async fn handle_failure( + tx: &mut Transaction<'_, Postgres>, + id: uuid::Uuid, + attempts: i32, + last_error: Option, + ) -> Result<(), sqlx::Error> { let next_attempts = attempts + 1; - let max_attempts_row = sqlx::query("SELECT max_attempts FROM webhook_dispatches WHERE id = $1") - .bind(id) - .fetch_optional(&mut *tx) - .await?; + let max_attempts_row = + sqlx::query("SELECT max_attempts FROM webhook_dispatches WHERE id = $1") + .bind(id) + .fetch_optional(&mut *tx) + .await?; let max_attempts: i32 = if let Some(row) = max_attempts_row { row.get::, _>("max_attempts").unwrap_or(5) @@ -136,9 +152,15 @@ impl WebhookDispatcherService { } // Enqueue a payload for all active endpoints - pub async fn enqueue_event(db: &PgPool, event_type: &str, payload: &Value) -> Result<(), sqlx::Error> { + pub async fn enqueue_event( + db: &PgPool, + event_type: &str, + payload: &Value, + ) -> Result<(), sqlx::Error> { let mut tx = db.begin().await?; - let endpoints = sqlx::query("SELECT id FROM webhook_endpoints WHERE is_active = true").fetch_all(&mut tx).await?; + let endpoints = sqlx::query("SELECT id FROM webhook_endpoints WHERE is_active = true") + .fetch_all(&mut tx) + .await?; for ep in endpoints { let endpoint_id: uuid::Uuid = ep.get("id"); sqlx::query("INSERT INTO webhook_dispatches (endpoint_id, event_type, payload) VALUES ($1, $2, $3)") @@ -160,6 +182,8 @@ mod tests { use super::*; #[test] fn smoke() { - let _ = WebhookDispatcherService::new(sqlx::PgPool::connect_lazy("postgres://localhost").unwrap()); + let _ = WebhookDispatcherService::new( + sqlx::PgPool::connect_lazy("postgres://localhost").unwrap(), + ); } } From e48d406025d5b59e8736e72f77259550ee16f88a Mon Sep 17 00:00:00 2001 From: Alimzy Date: Fri, 26 Jun 2026 15:11:44 +0100 Subject: [PATCH 3/5] fix: repair lockfile and resolve backend syntax issues --- backend/Cargo.lock | 93 +++++++++++++++++++++------------------------- backend/src/api.rs | 2 +- 2 files changed, 44 insertions(+), 51 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index b10faafdb..60303fe03 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -52,9 +52,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.102" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +checksum = "2a4385e2e34eb35d6b3efe798b9eb88096925d87726c0798709bf56d9ed84af3" [[package]] name = "arrayvec" @@ -1083,9 +1083,9 @@ checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "js-sys" -version = "0.3.102" +version = "0.3.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03d04c30968dffe80775bd4d7fb676131cd04a1fb46d2686dbffbaec2d9dfd31" +checksum = "53b44bfcdb3f8d5837a46dae1ca9660a837176eee74a28b229bc626816589102" dependencies = [ "cfg-if", "futures-util", @@ -1640,7 +1640,7 @@ version = "0.11.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" dependencies = [ - "base64", + "base64 0.21.7", "bytes", "encoding_rs", "futures-core", @@ -2508,46 +2508,40 @@ dependencies = [ ] [[package]] -[[package]] -name = "try-lock" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" - -[[package]] -name = "tungstenite" +name = "tokio-tungstenite" version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" dependencies = [ - "byteorder", - "bytes", - "data-encoding", - "http", - "httparse", + "futures-util", "log", - "rand 0.8.6", - "sha1", - "thiserror 1.0.69", - "url", - "utf-8", + "tokio", + "tungstenite 0.21.0", ] [[package]] -name = "tungstenite" +name = "tokio-tungstenite" version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c01152af293afb9c7c2a57e4b559c5620b421f6d133261c60dd2d0cdb38e6b8" +checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c" dependencies = [ - "bytes", - "data-encoding", - "http", - "httparse", + "futures-util", "log", - "rand 0.9.4", - "sha1", - "thiserror 2.0.18", + "tokio", + "tungstenite 0.29.0", ] + +[[package]] +name = "tokio-util" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", ] [[package]] @@ -2701,7 +2695,6 @@ dependencies = [ "tracing-serde", ] -[[package]] [[package]] name = "try-lock" version = "0.2.5" @@ -2717,7 +2710,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 1.4.2", "httparse", "log", "rand 0.8.6", @@ -2735,7 +2728,7 @@ checksum = "6c01152af293afb9c7c2a57e4b559c5620b421f6d133261c60dd2d0cdb38e6b8" dependencies = [ "bytes", "data-encoding", - "http", + "http 1.4.2", "httparse", "log", "rand 0.9.4", @@ -2826,9 +2819,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.23.3" +version = "1.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "144d6b123cef80b301b8f72a9e2ca4370ddec21950d0a103dd22c437006d2db7" +checksum = "bf80a72845275afea99e7f2b434723d3bc7e38470fcd1c7ed39a599c73319a53" dependencies = [ "getrandom 0.4.3", "js-sys", @@ -2886,9 +2879,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.125" +version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ddb3f79143bced6de84270411622a2699cee572fc0875aeaf1e7867cf9fca1a" +checksum = "4b067c0c11094aef6b7a801c1e34a26affafdf3d051dba08456b868789aaf9a4" dependencies = [ "cfg-if", "once_cell", @@ -2900,9 +2893,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.75" +version = "0.4.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "503b14d284f2c8dac03b819967e155ea753f573586193b2b2c95990cb5d69280" +checksum = "c62df1340f32221cb9c54d6a27b030e3dba64361d4a95bed55f9aacb44da291d" dependencies = [ "js-sys", "wasm-bindgen", @@ -2910,9 +2903,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.125" +version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e21a184b13fb19e157296e2c46056aec9092264fab83e4ba59e68c61b323c3d" +checksum = "167ce5e579f6bcf889c4f7175a8a5a585de84e8ff93976ce393efa5f2837aab1" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2920,9 +2913,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.125" +version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fecefd9c35bd935a20fc3fc344b5f29138961e4f47fb03297d88f2587afb5ebd" +checksum = "f3997c7839262f4ef12cf90b818d6340c18e80f263f1a94bf157d0ec4420380e" dependencies = [ "bumpalo", "proc-macro2", @@ -2933,18 +2926,18 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.125" +version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23939e44bb9a5d7576fa2b563dc2e136628f1224e88a8deed09e04858b77871f" +checksum = "dc1b4cb0cc549fcf58d7dfc081778139b3d283a081644e833e84682ad71cea24" dependencies = [ "unicode-ident", ] [[package]] name = "web-sys" -version = "0.3.102" +version = "0.3.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6430a72df5eb332242960fe84b3002a241163998241eb596d4f739b9757061d" +checksum = "8622dcb61c0bcc9fffa6938bed81210af2da9a7e4a1a834b2e37a59b6dfb6141" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/backend/src/api.rs b/backend/src/api.rs index 7e277f5b7..f4dac9baa 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -11,8 +11,8 @@ use tower_http::cors::{Any, CorsLayer}; use crate::kyc_webhook::kyc_webhook_handler; use crate::stellar_anchor::{AnchorPayout, AnchorRegistry}; -use crate::WebhookDispatcherService; use crate::ws::{ws_handler, KycUpdateEvent}; +use crate::WebhookDispatcherService; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PlanBeneficiary { From 4f707a34ec92b3d0ec9fb92b74a0105356478b2b Mon Sep 17 00:00:00 2001 From: Alimzy Date: Fri, 26 Jun 2026 16:44:10 +0100 Subject: [PATCH 4/5] fix: remove duplicate closing brace in api.rs --- backend/src/api.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/api.rs b/backend/src/api.rs index f4dac9baa..09c6de3db 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -327,7 +327,6 @@ async fn create_plan( (StatusCode::CREATED, Json(response)).into_response() } -} // Handler: Get Plans // Contributors: Implement plan retrieval, filtering by owner, and apply on-the-fly yield accumulation From 4eb43ca15698e01deefcda1f00bab38f98fcfebe Mon Sep 17 00:00:00 2001 From: Alimzy Date: Fri, 3 Jul 2026 02:39:38 +0100 Subject: [PATCH 5/5] fix webhook dispatcher build --- backend/src/api.rs | 10 +-- backend/src/inactivity_watchdog.rs | 9 +-- backend/src/main.rs | 11 ++- backend/src/webhooks.rs | 104 ++++++++++++++++++----------- backend/tests/api_tests.rs | 1 - backend/tests/kyc_webhook_test.rs | 3 - 6 files changed, 74 insertions(+), 64 deletions(-) diff --git a/backend/src/api.rs b/backend/src/api.rs index 09c6de3db..9d008e635 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -9,9 +9,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use tower_http::cors::{Any, CorsLayer}; -use crate::kyc_webhook::kyc_webhook_handler; use crate::stellar_anchor::{AnchorPayout, AnchorRegistry}; -use crate::ws::{ws_handler, KycUpdateEvent}; + use crate::WebhookDispatcherService; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -38,7 +37,6 @@ pub struct Plan { pub struct AppState { pub anchor: Arc, pub db_pool: sqlx::PgPool, - pub kyc_tx: tokio::sync::broadcast::Sender, pub kyc_webhook_secret: Option, } @@ -73,8 +71,6 @@ pub fn create_router(state: Arc) -> Router { .route("/api/plans/ping", post(ping_plan)) .route("/api/plans/payout", post(trigger_payout)) .route("/api/anchor/payout-status", get(get_anchor_payouts)) - .route("/api/kyc/webhook", post(kyc_webhook_handler)) - .route("/ws/kyc", get(ws_handler)) .layer(cors) .with_state(state) } @@ -315,7 +311,7 @@ async fn create_plan( // 3. Enqueue webhook event for plan.created (non-blocking) let payload_value = serde_json::to_value(&response).unwrap_or(serde_json::json!({})); - if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event( + if let Err(e) = crate::WebhookDispatcherService::enqueue_event( &state.db_pool, "plan.created", &payload_value, @@ -346,7 +342,7 @@ async fn ping_plan( ) -> impl IntoResponse { // enqueue webhook event for plan.pinged let payload_value = serde_json::to_value(&_payload).unwrap_or(serde_json::json!({})); - if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event( + if let Err(e) = crate::WebhookDispatcherService::enqueue_event( &_state.db_pool, "plan.pinged", &payload_value, diff --git a/backend/src/inactivity_watchdog.rs b/backend/src/inactivity_watchdog.rs index 7e9846eb4..eaf8ce262 100644 --- a/backend/src/inactivity_watchdog.rs +++ b/backend/src/inactivity_watchdog.rs @@ -124,12 +124,9 @@ impl InactivityWatchdogService { "inactivity_deadline_at": plan.inactivity_deadline_at, }); - if let Err(e) = inheritx_backend::WebhookDispatcherService::enqueue_event( - &self.db, - "plan.claimable", - &payload, - ) - .await + if let Err(e) = + crate::WebhookDispatcherService::enqueue_event(&self.db, "plan.claimable", &payload) + .await { warn!("Failed to enqueue webhook for plan.claimable: {:?}", e); } diff --git a/backend/src/main.rs b/backend/src/main.rs index 50beb0bdb..261cc76e9 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -11,13 +11,13 @@ async fn main() -> Result<(), Box> { // Initialize tracing logging telemetry::init_tracing()?; - //loading the .env + // Load environment dotenvy::dotenv().ok(); // Load configuration let config = Config::load()?; - // Attempt to connect to PostgreSQL stub/real + // Connect to PostgreSQL and run migrations let db_pool = match DbManager::create_pool(&config.database_url).await { Ok(pool) => { info!("Successfully connected to PostgreSQL database."); @@ -28,26 +28,23 @@ async fn main() -> Result<(), Box> { pool } - Err(e) => { error!( "Failed to connect to PostgreSQL database ({}): {:?}", config.database_url, e ); - std::process::exit(1); } }; - // Initialize state skeleton - let (kyc_tx, _) = tokio::sync::broadcast::channel(100); + // Initialize state let state = Arc::new(AppState { anchor: Arc::new(inheritx_backend::stellar_anchor::AnchorRegistry::new()), db_pool: db_pool.clone(), - kyc_tx, kyc_webhook_secret: std::env::var("KYC_WEBHOOK_SECRET").ok(), }); + // Start inactivity watchdog let inactivity_watchdog = Arc::new(InactivityWatchdogService::new( db_pool.clone(), InactivityWatchdogConfig::from_env(), diff --git a/backend/src/webhooks.rs b/backend/src/webhooks.rs index 3e45386ab..cc0f2d374 100644 --- a/backend/src/webhooks.rs +++ b/backend/src/webhooks.rs @@ -2,12 +2,14 @@ use hmac::{Hmac, Mac}; use reqwest::Client; use serde_json::Value; use sha2::Sha256; -use sqlx::{PgPool, Postgres, Row, Transaction}; +use sqlx::PgPool; use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; use tracing::{error, info, warn}; +use sqlx::Row; + type HmacSha256 = Hmac; pub struct WebhookDispatcherService { @@ -36,18 +38,17 @@ impl WebhookDispatcherService { } async fn run_once(&self) -> Result<(), sqlx::Error> { - let mut tx: Transaction<'_, Postgres> = self.db.begin().await?; - - // select pending dispatches ready to run let rows = sqlx::query( "SELECT wd.id, wd.endpoint_id, wd.event_type, wd.payload, wd.attempts, we.url, we.secret FROM webhook_dispatches wd JOIN webhook_endpoints we ON wd.endpoint_id = we.id - WHERE wd.status = 'pending' AND (wd.next_attempt_at IS NULL OR wd.next_attempt_at <= NOW()) + WHERE wd.status = 'pending' + AND (wd.next_attempt_at IS NULL OR wd.next_attempt_at <= NOW()) ORDER BY wd.created_at ASC - LIMIT 25 FOR UPDATE SKIP LOCKED", + LIMIT 25 + FOR UPDATE SKIP LOCKED", ) - .fetch_all(&mut tx) + .fetch_all(&self.db) .await?; for row in rows { @@ -78,10 +79,14 @@ impl WebhookDispatcherService { match res { Ok(resp) if resp.status().is_success() => { - sqlx::query("UPDATE webhook_dispatches SET status = 'success', updated_at = NOW() WHERE id = $1") - .bind(id) - .execute(&mut tx) - .await?; + sqlx::query( + "UPDATE webhook_dispatches + SET status = 'success', updated_at = NOW() + WHERE id = $1", + ) + .bind(id) + .execute(&self.db) + .await?; info!("Webhook dispatch {} succeeded", id); } Ok(mut resp) => { @@ -91,8 +96,9 @@ impl WebhookDispatcherService { "Webhook dispatch {} returned status {}: {}", id, status, text ); + Self::handle_failure( - &mut tx, + &self.db, id, attempts, Some(format!("status {}: {}", status, text)), @@ -101,26 +107,27 @@ impl WebhookDispatcherService { } Err(e) => { warn!("Webhook dispatch {} request error: {}", id, e); - Self::handle_failure(&mut tx, id, attempts, Some(e.to_string())).await?; + + Self::handle_failure(&self.db, id, attempts, Some(e.to_string())).await?; } } } - tx.commit().await?; Ok(()) } async fn handle_failure( - tx: &mut Transaction<'_, Postgres>, + db: &PgPool, id: uuid::Uuid, attempts: i32, last_error: Option, ) -> Result<(), sqlx::Error> { let next_attempts = attempts + 1; + let max_attempts_row = sqlx::query("SELECT max_attempts FROM webhook_dispatches WHERE id = $1") .bind(id) - .fetch_optional(&mut *tx) + .fetch_optional(db) .await?; let max_attempts: i32 = if let Some(row) = max_attempts_row { @@ -130,22 +137,36 @@ impl WebhookDispatcherService { }; if next_attempts >= max_attempts { - // mark failed - sqlx::query("UPDATE webhook_dispatches SET attempts = $1, status = 'failed', last_error = $2, updated_at = NOW() WHERE id = $3") - .bind(next_attempts) - .bind(last_error) - .bind(id) - .execute(&mut *tx) - .await?; + sqlx::query( + "UPDATE webhook_dispatches + SET attempts = $1, + status = 'failed', + last_error = $2, + updated_at = NOW() + WHERE id = $3", + ) + .bind(next_attempts) + .bind(last_error) + .bind(id) + .execute(db) + .await?; } else { let backoff_secs = 2u64.pow(next_attempts as u32); - sqlx::query("UPDATE webhook_dispatches SET attempts = $1, last_error = $2, next_attempt_at = (NOW() + ($3 || ' seconds')::interval), updated_at = NOW() WHERE id = $4") - .bind(next_attempts) - .bind(last_error) - .bind(backoff_secs as i64) - .bind(id) - .execute(&mut *tx) - .await?; + + sqlx::query( + "UPDATE webhook_dispatches + SET attempts = $1, + last_error = $2, + next_attempt_at = (NOW() + ($3 || ' seconds')::interval), + updated_at = NOW() + WHERE id = $4", + ) + .bind(next_attempts) + .bind(last_error) + .bind(backoff_secs as i64) + .bind(id) + .execute(db) + .await?; } Ok(()) @@ -157,29 +178,32 @@ impl WebhookDispatcherService { event_type: &str, payload: &Value, ) -> Result<(), sqlx::Error> { - let mut tx = db.begin().await?; let endpoints = sqlx::query("SELECT id FROM webhook_endpoints WHERE is_active = true") - .fetch_all(&mut tx) + .fetch_all(db) .await?; + for ep in endpoints { let endpoint_id: uuid::Uuid = ep.get("id"); - sqlx::query("INSERT INTO webhook_dispatches (endpoint_id, event_type, payload) VALUES ($1, $2, $3)") - .bind(endpoint_id) - .bind(event_type) - .bind(payload.clone()) - .execute(&mut tx) - .await?; + + sqlx::query( + "INSERT INTO webhook_dispatches (endpoint_id, event_type, payload) + VALUES ($1, $2, $3)", + ) + .bind(endpoint_id) + .bind(event_type) + .bind(payload.clone()) + .execute(db) + .await?; } - tx.commit().await?; Ok(()) } } #[cfg(test)] mod tests { - // light compile-time sanity checks use super::*; + #[test] fn smoke() { let _ = WebhookDispatcherService::new( diff --git a/backend/tests/api_tests.rs b/backend/tests/api_tests.rs index 2bca0e46e..516186213 100644 --- a/backend/tests/api_tests.rs +++ b/backend/tests/api_tests.rs @@ -11,7 +11,6 @@ use tower::ServiceExt; // for oneshot fn setup_app() -> axum::Router { let state = Arc::new(AppState { anchor: Arc::new(inheritx_backend::stellar_anchor::AnchorRegistry::new()), - kyc_tx: tokio::sync::broadcast::channel(16).0, db_pool: PgPoolOptions::new() .connect_lazy("postgres://postgres:password@localhost/test") .unwrap(), diff --git a/backend/tests/kyc_webhook_test.rs b/backend/tests/kyc_webhook_test.rs index 2801a1cd2..0885534ad 100644 --- a/backend/tests/kyc_webhook_test.rs +++ b/backend/tests/kyc_webhook_test.rs @@ -25,12 +25,9 @@ fn test_state(secret: Option<&str>) -> std::sync::Arc