From d6cf2d0fc0607db4105a490666082797821db8ba Mon Sep 17 00:00:00 2001 From: Jeff Gardner <202880+erskingardner@users.noreply.github.com> Date: Thu, 21 May 2026 08:25:12 +0200 Subject: [PATCH 01/11] Update default and negentropy relay lists Add relay.divine.video, relay.momostr.pink, and relay.ditto.pub to the default seed relays and drop the now-dead relay.nostr.band. Add relay.divine.video and nos.lol to the negentropy (NIP-77) sync defaults, and refresh the associated docs, CLI help, and tests. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/pensieve-ingest/src/main.rs | 6 ++++-- crates/pensieve-ingest/src/source/relay.rs | 4 +++- crates/pensieve-ingest/src/sync/negentropy.rs | 20 ++++++++++++++++--- pensieve-deploy/README.md | 1 + 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/crates/pensieve-ingest/src/main.rs b/crates/pensieve-ingest/src/main.rs index 6976679..06cbe57 100644 --- a/crates/pensieve-ingest/src/main.rs +++ b/crates/pensieve-ingest/src/main.rs @@ -51,13 +51,15 @@ use tracing_subscriber::EnvFilter; fn default_seed_relays() -> Vec { vec![ "wss://relay.damus.io".to_string(), - "wss://relay.nostr.band".to_string(), "wss://nos.lol".to_string(), "wss://relay.snort.social".to_string(), "wss://purplepag.es".to_string(), "wss://relay.primal.net".to_string(), "wss://nostr.wine".to_string(), "wss://relay.nostr.bg".to_string(), + "wss://relay.divine.video".to_string(), + "wss://relay.momostr.pink".to_string(), + "wss://relay.ditto.pub".to_string(), ] } @@ -212,7 +214,7 @@ struct Args { /// Path to negentropy trusted relays file (one URL per line, # for comments). /// - /// If not specified, uses default trusted relays (relay.damus.io, relay.primal.net). + /// If not specified, uses default trusted relays (relay.damus.io, relay.divine.video, nos.lol). /// Only relays that support NIP-77 should be listed. #[arg(long)] negentropy_relays_file: Option, diff --git a/crates/pensieve-ingest/src/source/relay.rs b/crates/pensieve-ingest/src/source/relay.rs index 7b80459..ba0be0d 100644 --- a/crates/pensieve-ingest/src/source/relay.rs +++ b/crates/pensieve-ingest/src/source/relay.rs @@ -91,13 +91,15 @@ impl Default for RelayConfig { Self { seed_relays: vec![ "wss://relay.damus.io".to_string(), - "wss://relay.nostr.band".to_string(), "wss://nos.lol".to_string(), "wss://relay.snort.social".to_string(), "wss://purplepag.es".to_string(), "wss://relay.primal.net".to_string(), "wss://nostr.wine".to_string(), "wss://relay.nostr.bg".to_string(), + "wss://relay.divine.video".to_string(), + "wss://relay.momostr.pink".to_string(), + "wss://relay.ditto.pub".to_string(), ], discovery_enabled: true, max_relays: 30, diff --git a/crates/pensieve-ingest/src/sync/negentropy.rs b/crates/pensieve-ingest/src/sync/negentropy.rs index c68ab2b..c071650 100644 --- a/crates/pensieve-ingest/src/sync/negentropy.rs +++ b/crates/pensieve-ingest/src/sync/negentropy.rs @@ -53,9 +53,16 @@ pub struct NegentropySyncConfig { impl Default for NegentropySyncConfig { fn default() -> Self { Self { - // Only Damus confirmed to support NIP-77 + // Relays confirmed to support NIP-77: + // - relay.damus.io + // - relay.divine.video + // - nos.lol // Primal does NOT support NIP-77 (connects but ignores NEG-OPEN) - relays: vec!["wss://relay.damus.io".to_string()], + relays: vec![ + "wss://relay.damus.io".to_string(), + "wss://relay.divine.video".to_string(), + "wss://nos.lol".to_string(), + ], interval: Duration::from_secs(1800), // 30 minutes lookback: Duration::from_secs(14 * 24 * 3600), // 14 days protocol_timeout: Duration::from_secs(900), // 15 min for full sync @@ -643,7 +650,14 @@ mod tests { #[test] fn test_config_defaults() { let config = NegentropySyncConfig::default(); - assert_eq!(config.relays.len(), 1); + assert_eq!( + config.relays, + vec![ + "wss://relay.damus.io".to_string(), + "wss://relay.divine.video".to_string(), + "wss://nos.lol".to_string(), + ] + ); assert_eq!(config.interval, Duration::from_secs(1800)); assert_eq!(config.lookback, Duration::from_secs(14 * 24 * 3600)); } diff --git a/pensieve-deploy/README.md b/pensieve-deploy/README.md index a1d5183..5ea7b50 100644 --- a/pensieve-deploy/README.md +++ b/pensieve-deploy/README.md @@ -658,6 +658,7 @@ Add `.onion` relay addresses to your seed file: wss://relay.damus.io wss://nos.lol wss://relay.primal.net +wss://relay.divine.video # Tor relays (routed through proxy) # Note: .onion relays use ws:// not wss:// (Tor provides encryption) From 6201a14f51a6d25ec82af0d6a85b3a9c99c5e510 Mon Sep 17 00:00:00 2001 From: Jeff Gardner <202880+erskingardner@users.noreply.github.com> Date: Thu, 21 May 2026 08:45:38 +0200 Subject: [PATCH 02/11] Harden HTTP tier: strip article HTML, SSRF guard, constant-time tokens MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - pensieve-preview: filter raw/inline HTML out of kind-30023 article markdown so attacker-controlled content can no longer inject "); - assert!(result.contains("")); + // A benign inline tag is dropped too. + let bold = render_markdown("hello world"); + assert!(!bold.contains("")); + assert!(bold.contains("world")); } // -- extract_tag_value() tests -- diff --git a/crates/pensieve-preview/src/routes/og.rs b/crates/pensieve-preview/src/routes/og.rs index de3a905..f8bf61a 100644 --- a/crates/pensieve-preview/src/routes/og.rs +++ b/crates/pensieve-preview/src/routes/og.rs @@ -96,9 +96,38 @@ fn extract_avatar_url(content: &resolve::ResolvedContent) -> Option { /// Fetch an avatar image from a URL, with a timeout. /// /// Returns the raw image bytes, or `None` if the fetch fails. +/// +/// SSRF guard: the `picture` URL is attacker-controlled (it comes from a Nostr +/// profile event), so before fetching we require an `http(s)` scheme, resolve the +/// host and reject any address that is not globally routable (loopback, private, +/// link-local/metadata, CGNAT, etc.), and disable redirect following so a public +/// URL cannot bounce to an internal one. (A determined attacker could still race +/// DNS between our resolution and reqwest's; pinning the resolved IP would close +/// that, but redirect-disable + resolve-check covers the practical cases.) async fn fetch_avatar(url: &str) -> Option> { + let parsed = reqwest::Url::parse(url).ok()?; + if !matches!(parsed.scheme(), "http" | "https") { + return None; + } + let host = parsed.host_str()?; + let port = parsed.port_or_known_default().unwrap_or(443); + + // Resolve the host and ensure every resolved address is globally routable. + let mut resolved_any = false; + for addr in tokio::net::lookup_host((host, port)).await.ok()? { + resolved_any = true; + if !is_global_ip(addr.ip()) { + tracing::debug!(%host, ip = %addr.ip(), "blocked avatar fetch to non-global address"); + return None; + } + } + if !resolved_any { + return None; + } + let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(5)) + .redirect(reqwest::redirect::Policy::none()) .build() .ok()?; @@ -117,6 +146,39 @@ async fn fetch_avatar(url: &str) -> Option> { Some(bytes.to_vec()) } +/// Whether an IP address is globally routable (i.e. safe to fetch from). +/// +/// Conservatively rejects loopback, private, link-local (incl. 169.254.169.254 +/// cloud metadata), CGNAT, documentation, unspecified, and IPv6 ULA/link-local. +fn is_global_ip(ip: std::net::IpAddr) -> bool { + match ip { + std::net::IpAddr::V4(v4) => is_global_v4(v4), + std::net::IpAddr::V6(v6) => { + // IPv4-mapped IPv6 (::ffff:a.b.c.d) must be judged by its IPv4 value. + if let Some(v4) = v6.to_ipv4_mapped() { + return is_global_v4(v4); + } + !(v6.is_loopback() + || v6.is_unspecified() + || (v6.segments()[0] & 0xfe00) == 0xfc00 // ULA fc00::/7 + || (v6.segments()[0] & 0xffc0) == 0xfe80) // link-local fe80::/10 + } + } +} + +/// IPv4 global-routability check. +fn is_global_v4(v4: std::net::Ipv4Addr) -> bool { + let o = v4.octets(); + !(v4.is_private() + || v4.is_loopback() + || v4.is_link_local() + || v4.is_broadcast() + || v4.is_documentation() + || v4.is_unspecified() + || o[0] == 0 + || (o[0] == 100 && (o[1] & 0xc0) == 64)) // CGNAT 100.64.0.0/10 +} + /// Font family string for SVG text (sans single quotes that confuse `format!`). const FONT_FAMILY: &str = "Inter, -apple-system, BlinkMacSystemFont, Segoe UI, Roboto, sans-serif"; diff --git a/crates/pensieve-serve/src/auth.rs b/crates/pensieve-serve/src/auth.rs index 38b6b98..ec1694c 100644 --- a/crates/pensieve-serve/src/auth.rs +++ b/crates/pensieve-serve/src/auth.rs @@ -34,10 +34,40 @@ pub async fn require_auth( } }; - if !state.config.api_tokens.contains(token) { + if !token_is_valid(&state.config.api_tokens, token) { tracing::debug!("invalid api token"); return Err(ApiError::Unauthorized); } Ok(next.run(request).await) } + +/// Constant-time check that `token` matches one of the configured tokens. +/// +/// Iterates the entire set without early exit and compares each candidate with a +/// constant-time byte comparison, so response timing does not reveal how many +/// leading characters of a guessed token were correct (which a naive +/// `HashSet::contains` / `==` would leak). +fn token_is_valid(tokens: &std::collections::HashSet, token: &str) -> bool { + let mut valid = false; + for known in tokens { + // Bitwise `|=` (not `||`) so the comparison runs for every token. + valid |= constant_time_eq(known.as_bytes(), token.as_bytes()); + } + valid +} + +/// Constant-time equality for two byte slices. +/// +/// Returns early only on a length mismatch (token length is not a meaningful +/// secret); for equal lengths the comparison time is independent of the content. +fn constant_time_eq(a: &[u8], b: &[u8]) -> bool { + if a.len() != b.len() { + return false; + } + let mut diff = 0u8; + for (x, y) in a.iter().zip(b.iter()) { + diff |= x ^ y; + } + diff == 0 +} diff --git a/crates/pensieve-serve/src/error.rs b/crates/pensieve-serve/src/error.rs index 3267411..9e272c1 100644 --- a/crates/pensieve-serve/src/error.rs +++ b/crates/pensieve-serve/src/error.rs @@ -60,7 +60,8 @@ impl IntoResponse for ApiError { ( StatusCode::INTERNAL_SERVER_ERROR, "database_error", - Some(format!("Database error: {}", err)), + // Do not leak SQL/schema internals to clients; full detail is logged above. + Some("A database error occurred".to_string()), ) } Self::Serialization(err) => { From cdeb097501ea28bbf48a9d69adda29433e5e465e Mon Sep 17 00:00:00 2001 From: Jeff Gardner <202880+erskingardner@users.noreply.github.com> Date: Thu, 21 May 2026 09:04:24 +0200 Subject: [PATCH 03/11] Externalize preview JS and lock CSP to script-src 'self' Move all interactivity (image lightbox, copy-to-clipboard, broken-image hiding, home search) into a same-origin /app.js using event delegation and data- attributes, and replace every inline event handler. This removes the image-grid onclick sink where an event-content URL containing a single quote could break out of the JS string (stored XSS), and lets the CSP drop script-src 'unsafe-inline' in favor of 'self'. Also corrects doc comments that claimed "no JavaScript execution" / "all content escaped", which no longer matched reality. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/pensieve-preview/src/lib.rs | 6 +- .../pensieve-preview/src/render/components.rs | 12 +-- crates/pensieve-preview/src/render/content.rs | 6 +- crates/pensieve-preview/src/render/profile.rs | 6 +- crates/pensieve-preview/src/routes/home.rs | 20 +---- crates/pensieve-preview/src/routes/llms.rs | 2 +- crates/pensieve-preview/src/routes/mod.rs | 79 +++++++++++++++++++ 7 files changed, 99 insertions(+), 32 deletions(-) diff --git a/crates/pensieve-preview/src/lib.rs b/crates/pensieve-preview/src/lib.rs index 1abc8a5..afc771a 100644 --- a/crates/pensieve-preview/src/lib.rs +++ b/crates/pensieve-preview/src/lib.rs @@ -26,9 +26,11 @@ //! //! # Security //! -//! - All dynamic content is HTML-escaped by maud +//! - User-controlled text is HTML-escaped by maud; article markdown is rendered +//! with raw/inline HTML stripped //! - URLs are validated (HTTPS/HTTP only) before use in attributes -//! - Strict Content-Security-Policy: no JavaScript execution +//! - Strict Content-Security-Policy: scripts only from same-origin `/app.js`, +//! with no inline scripts or event handlers //! - X-Frame-Options: DENY prevents clickjacking pub mod config; diff --git a/crates/pensieve-preview/src/render/components.rs b/crates/pensieve-preview/src/render/components.rs index 335987b..9e949a9 100644 --- a/crates/pensieve-preview/src/render/components.rs +++ b/crates/pensieve-preview/src/render/components.rs @@ -160,9 +160,10 @@ body{background:#0a0a0f;color:#e5e5e5} /// Content-Security-Policy header value. /// -/// Allows inline styles and a small inline script for copy-to-clipboard. -/// No external scripts, no iframes, only HTTPS images. -pub const CSP_HEADER: &str = "default-src 'none'; style-src 'unsafe-inline'; script-src 'unsafe-inline'; img-src https: data:; connect-src 'self'; form-action 'none'; frame-ancestors 'none'"; +/// Scripts are served same-origin from `/app.js`; there are no inline scripts or +/// inline event handlers, so `script-src` is locked to `'self'`. Inline styles +/// are still allowed. No iframes; images limited to HTTPS and data URIs. +pub const CSP_HEADER: &str = "default-src 'none'; style-src 'unsafe-inline'; script-src 'self'; img-src https: data:; connect-src 'self'; form-action 'none'; frame-ancestors 'none'"; /// Render the full HTML page shell with ``, OG tags, and body content. pub fn page_shell( @@ -207,6 +208,7 @@ pub fn page_shell( // these pages in search results initially. style { (PreEscaped(PAGE_CSS)) } + script src="/app.js" defer {} } body { main { (body_content) } @@ -255,7 +257,7 @@ pub fn author_header(metadata: Option<&ProfileMetadata>, npub: &str, base_url: & (initial.as_str()) @if let Some(pic_url) = picture { @if is_safe_url(pic_url) { - img src=(pic_url) alt=(name) loading="lazy" onerror="this.style.display='none'"; + img src=(pic_url) alt=(name) loading="lazy" data-hide-on-error="1"; } } } @@ -268,7 +270,7 @@ pub fn author_header(metadata: Option<&ProfileMetadata>, npub: &str, base_url: & } div class="author-npub" { span class="author-npub-text" title=(npub) { (truncated_npub) } - button class="copy-btn" onclick=(format!("navigator.clipboard.writeText('{}').then(()=>{{this.innerHTML='Copied!';setTimeout(()=>this.innerHTML='{}',1500)}})", npub, ICON_COPY.replace('"', """))) title="Copy npub" { + button class="copy-btn" data-copy=(npub) title="Copy npub" { (PreEscaped(ICON_COPY)) } } diff --git a/crates/pensieve-preview/src/render/content.rs b/crates/pensieve-preview/src/render/content.rs index fef6d91..afd36db 100644 --- a/crates/pensieve-preview/src/render/content.rs +++ b/crates/pensieve-preview/src/render/content.rs @@ -168,7 +168,7 @@ pub fn render_content( } // Lightbox overlay (hidden by default, activated by JS) - div class="lightbox-overlay" id="lightbox" onclick="this.classList.remove('active')" { + div class="lightbox-overlay" id="lightbox" { img id="lightbox-img" src="" alt=""; } } @@ -184,7 +184,7 @@ fn render_image_grid(urls: &[String]) -> Markup { html! { div class=(grid_class) { @for (i, url) in urls.iter().take(display_count).enumerate() { - div class="media-item" onclick=(format!("document.getElementById('lightbox-img').src='{}';document.getElementById('lightbox').classList.add('active')", url)) { + div class="media-item" data-full=(url) { img src=(url) alt="" loading="lazy"; // Show +N overlay on the last visible image if there are more @if overflow > 0 && i == display_count - 1 { @@ -422,7 +422,7 @@ fn render_quote_card(quoted: &QuotedEvent, base_url: &str) -> Markup { (initial.as_str()) @if let Some(pic_url) = author_pic { @if is_safe_url(pic_url) { - img src=(pic_url) alt="" loading="lazy" onerror="this.style.display='none'"; + img src=(pic_url) alt="" loading="lazy" data-hide-on-error="1"; } } } diff --git a/crates/pensieve-preview/src/render/profile.rs b/crates/pensieve-preview/src/render/profile.rs index b36ff11..ab4698c 100644 --- a/crates/pensieve-preview/src/render/profile.rs +++ b/crates/pensieve-preview/src/render/profile.rs @@ -55,7 +55,7 @@ pub fn render( div class="profile-banner-fallback" { @if let Some(banner_url) = metadata.banner.as_deref() { @if is_safe_url(banner_url) { - img class="profile-banner" src=(banner_url) alt="" loading="lazy" onerror="this.style.display='none'"; + img class="profile-banner" src=(banner_url) alt="" loading="lazy" data-hide-on-error="1"; } } } @@ -66,7 +66,7 @@ pub fn render( (initial.as_str()) @if let Some(pic_url) = metadata.picture.as_deref() { @if is_safe_url(pic_url) { - img src=(pic_url) alt=(name) loading="lazy" onerror="this.style.display='none'"; + img src=(pic_url) alt=(name) loading="lazy" data-hide-on-error="1"; } } } @@ -81,7 +81,7 @@ pub fn render( div class="author-npub" { span class="author-npub-text" title=(npub) { (truncate_npub_display(npub)) } - button class="copy-btn" onclick=(format!("navigator.clipboard.writeText('{}').then(()=>{{this.innerHTML='Copied!';setTimeout(()=>this.innerHTML='{}',1500)}})", npub, ICON_COPY.replace('"', """))) title="Copy npub" { + button class="copy-btn" data-copy=(npub) title="Copy npub" { (PreEscaped(ICON_COPY)) } } diff --git a/crates/pensieve-preview/src/routes/home.rs b/crates/pensieve-preview/src/routes/home.rs index a92a2ce..7735f0e 100644 --- a/crates/pensieve-preview/src/routes/home.rs +++ b/crates/pensieve-preview/src/routes/home.rs @@ -21,6 +21,7 @@ pub async fn home_page() -> impl IntoResponse { meta property="og:type" content="website"; style { (PreEscaped(PAGE_CSS)) } style { (PreEscaped(HOME_CSS)) } + script src="/app.js" defer {} } body { main class="home" { @@ -31,8 +32,7 @@ pub async fn home_page() -> impl IntoResponse { "Fast, static preview pages for any Nostr event." } - form class="home-search" action="/" method="GET" - onsubmit="return handleLookup(event)" { + form class="home-search" action="/" method="GET" { input class="home-input" type="text" name="q" placeholder="Paste an identifier" @@ -41,22 +41,6 @@ pub async fn home_page() -> impl IntoResponse { autofocus; button class="home-btn" type="submit" { "Look up" } } - script { (PreEscaped(r#" -var inp=document.querySelector('.home-input'); -inp.addEventListener('input',function(){ - var v=this.value; - if(v.toLowerCase().startsWith('nostr:')){ - this.value=v.slice(6); - } -}); -function handleLookup(e){ - e.preventDefault(); - var v=inp.value.trim().replace(/\s+/g,''); - if(v)window.location.href='/'+encodeURIComponent(v); - return false; -} -"#)) } - div class="home-how" { p { "Add any Nostr identifier to the URL:" diff --git a/crates/pensieve-preview/src/routes/llms.rs b/crates/pensieve-preview/src/routes/llms.rs index 44e3562..a9f3a92 100644 --- a/crates/pensieve-preview/src/routes/llms.rs +++ b/crates/pensieve-preview/src/routes/llms.rs @@ -137,7 +137,7 @@ and tag specifications — useful for understanding event content and structure. - Engagement counts are approximate (based on ingested data) - Zap amounts are in millisatoshis (divide by 1000 for sats) - HTML pages include Open Graph and Twitter Card meta tags -- All dynamic content is HTML-escaped for XSS protection +- User-controlled text is HTML-escaped and article markdown is rendered with raw HTML stripped, for XSS protection ## Links diff --git a/crates/pensieve-preview/src/routes/mod.rs b/crates/pensieve-preview/src/routes/mod.rs index 7d6f503..f603e01 100644 --- a/crates/pensieve-preview/src/routes/mod.rs +++ b/crates/pensieve-preview/src/routes/mod.rs @@ -30,6 +30,7 @@ pub fn router(state: AppState) -> Router { .route("/robots.txt", get(robots_txt)) .route("/favicon.svg", get(favicon_svg)) .route("/favicon.ico", get(favicon_svg)) + .route("/app.js", get(app_js)) .route("/llms.txt", get(llms::llms_txt)) .route("/llms-full.txt", get(llms::llms_full_txt)) .route("/og/{identifier}", get(og::og_image_handler)) @@ -58,3 +59,81 @@ async fn favicon_svg() -> impl IntoResponse { FAVICON_SVG, ) } + +/// Client-side behavior served as a same-origin script. +/// +/// All interactivity (image lightbox, copy-to-clipboard buttons, hiding broken +/// images, and the home-page search box) lives here so the Content-Security-Policy +/// can forbid inline scripts and inline event handlers (`script-src 'self'`). +/// Everything is wired with event delegation + `data-` attributes, so no +/// attacker-controlled value is ever interpolated into executable JS. +const APP_JS: &str = r#"(function () { + "use strict"; + // 'error' events don't bubble, so hide broken images via a capture-phase listener. + document.addEventListener("error", function (e) { + var t = e.target; + if (t && t.tagName === "IMG" && t.hasAttribute("data-hide-on-error")) { + t.style.display = "none"; + } + }, true); + + document.addEventListener("click", function (e) { + var el = e.target; + if (!el || !el.closest) return; + + var item = el.closest(".media-item[data-full]"); + if (item) { + var img = document.getElementById("lightbox-img"); + var box = document.getElementById("lightbox"); + if (img && box) { + img.src = item.getAttribute("data-full"); + box.classList.add("active"); + } + return; + } + + if (el.closest(".lightbox-overlay")) { + var overlay = document.getElementById("lightbox"); + if (overlay) overlay.classList.remove("active"); + return; + } + + var copy = el.closest(".copy-btn[data-copy]"); + if (copy && navigator.clipboard) { + var orig = copy.innerHTML; + navigator.clipboard.writeText(copy.getAttribute("data-copy")).then(function () { + copy.innerHTML = "Copied!"; + setTimeout(function () { copy.innerHTML = orig; }, 1500); + }); + return; + } + }); + + var inp = document.querySelector(".home-input"); + if (inp) { + inp.addEventListener("input", function () { + var v = this.value; + if (v.toLowerCase().indexOf("nostr:") === 0) this.value = v.slice(6); + }); + } + var form = document.querySelector(".home-search"); + if (form) { + form.addEventListener("submit", function (e) { + e.preventDefault(); + var q = inp ? inp.value.trim().replace(/\s+/g, "") : ""; + if (q) window.location.href = "/" + encodeURIComponent(q); + }); + } +})(); +"#; + +/// Serve the client-side script. +async fn app_js() -> impl IntoResponse { + ( + [ + ("content-type", "application/javascript; charset=utf-8"), + ("cache-control", "public, max-age=86400"), + ], + APP_JS, + ) +} From 52e7f58016cfe1f1f282592042223562c4f3961d Mon Sep 17 00:00:00 2001 From: Jeff Gardner <202880+erskingardner@users.noreply.github.com> Date: Thu, 21 May 2026 09:07:04 +0200 Subject: [PATCH 04/11] Bound request work: batch mention profiles, cap them, add request timeout The JSON event endpoint resolved one ClickHouse profile query per p-tag serially, so an event with thousands of p tags caused a query amplification DoS. Resolve mentions with a single batched fetch_profiles() query, dedupe, and cap at 50. Add a 10s request TimeoutLayer to both the preview and API servers to bound slow/expensive requests. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 1 + Cargo.toml | 2 +- crates/pensieve-preview/src/main.rs | 8 +++++- crates/pensieve-preview/src/query.rs | 31 ++++++++++++++++++++++ crates/pensieve-preview/src/routes/json.rs | 15 ++++++++--- crates/pensieve-serve/src/main.rs | 8 +++++- 6 files changed, 59 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 505162a..201af45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4381,6 +4381,7 @@ dependencies = [ "http-body 1.0.1", "iri-string", "pin-project-lite", + "tokio", "tower", "tower-layer", "tower-service", diff --git a/Cargo.toml b/Cargo.toml index 3b8397d..29f9ba3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ notepack = { git = "https://github.com/erskingardner/notepack" } # HTTP server axum = "0.8" tower = "0.5" -tower-http = { version = "0.6", features = ["cors", "trace"] } +tower-http = { version = "0.6", features = ["cors", "trace", "timeout"] } # Observability tracing = "0.1" diff --git a/crates/pensieve-preview/src/main.rs b/crates/pensieve-preview/src/main.rs index c08ccfd..b3edc5d 100644 --- a/crates/pensieve-preview/src/main.rs +++ b/crates/pensieve-preview/src/main.rs @@ -6,6 +6,7 @@ use axum::http::Request; use clap::Parser; use tower_http::cors::{Any, CorsLayer}; +use tower_http::timeout::TimeoutLayer; use tower_http::trace::{DefaultOnFailure, DefaultOnRequest, DefaultOnResponse, TraceLayer}; use tracing::Level; use tracing_subscriber::EnvFilter; @@ -77,7 +78,12 @@ async fn main() -> anyhow::Result<()> { .allow_origin(Any) .allow_methods(Any) .allow_headers(Any), - ); + ) + // Cap total request handling time to bound slow/expensive requests. + .layer(TimeoutLayer::with_status_code( + axum::http::StatusCode::REQUEST_TIMEOUT, + std::time::Duration::from_secs(10), + )); // Start server let listener = tokio::net::TcpListener::bind(&bind_addr).await?; diff --git a/crates/pensieve-preview/src/query.rs b/crates/pensieve-preview/src/query.rs index 292627e..fc4c966 100644 --- a/crates/pensieve-preview/src/query.rs +++ b/crates/pensieve-preview/src/query.rs @@ -144,6 +144,37 @@ pub async fn fetch_profile( Ok(result) } +/// Batch-fetch full profiles (kind 0) for a set of pubkeys in a single query. +/// +/// Returns a map of pubkey (hex) -> [`ProfileRow`]. Pubkeys without a profile +/// are omitted. Used to resolve mentioned pubkeys without an N+1 query. +pub async fn fetch_profiles( + client: &Client, + pubkeys: &[String], +) -> Result, PreviewError> { + if pubkeys.is_empty() { + return Ok(std::collections::HashMap::new()); + } + + let placeholders: Vec<&str> = pubkeys.iter().map(|_| "?").collect(); + let query_str = format!( + "SELECT pubkey, argMax(id, created_at) AS event_id, \ + argMax(content, created_at) AS content \ + FROM events_local \ + WHERE pubkey IN ({}) AND kind = 0 \ + GROUP BY pubkey", + placeholders.join(", ") + ); + + let mut query = client.query(&query_str); + for pk in pubkeys { + query = query.bind(pk); + } + + let rows = query.fetch_all::().await?; + Ok(rows.into_iter().map(|r| (r.pubkey.clone(), r)).collect()) +} + /// Fetch a replaceable event by (kind, pubkey, d_tag). /// /// Used for addressable events like NIP-23 articles (kind 30023), diff --git a/crates/pensieve-preview/src/routes/json.rs b/crates/pensieve-preview/src/routes/json.rs index 7936d98..4145fcd 100644 --- a/crates/pensieve-preview/src/routes/json.rs +++ b/crates/pensieve-preview/src/routes/json.rs @@ -70,17 +70,26 @@ pub async fn json_handler_inner( engagement, .. } => { - // Build mentions_metadata: fetch full profiles for all p-tagged pubkeys - let mentioned_pubkeys: Vec = event + // Build mentions_metadata: resolve p-tagged pubkeys' profiles. + // Cap the count and batch into a single query so a malicious event + // with thousands of `p` tags can't trigger an N+1 query amplification. + const MAX_MENTIONS: usize = 50; + let mut mentioned_pubkeys: Vec = event .tags .iter() .filter(|t| t.len() >= 2 && t[0] == "p" && t[1].len() == 64) .map(|t| t[1].clone()) .collect(); + mentioned_pubkeys.sort(); + mentioned_pubkeys.dedup(); + mentioned_pubkeys.truncate(MAX_MENTIONS); + let profiles = query::fetch_profiles(&state.clickhouse, &mentioned_pubkeys) + .await + .unwrap_or_default(); let mut mentions_map = serde_json::Map::new(); for pk in &mentioned_pubkeys { - if let Ok(Some(profile_row)) = query::fetch_profile(&state.clickhouse, pk).await { + if let Some(profile_row) = profiles.get(pk) { let meta = ProfileMetadata::from_json(&profile_row.content); mentions_map.insert( pk.clone(), diff --git a/crates/pensieve-serve/src/main.rs b/crates/pensieve-serve/src/main.rs index c5abb51..cc61cee 100644 --- a/crates/pensieve-serve/src/main.rs +++ b/crates/pensieve-serve/src/main.rs @@ -6,6 +6,7 @@ use axum::http::Request; use clap::Parser; use tower_http::cors::{Any, CorsLayer}; +use tower_http::timeout::TimeoutLayer; use tower_http::trace::{DefaultOnFailure, DefaultOnRequest, DefaultOnResponse, TraceLayer}; use tracing::Level; use tracing_subscriber::EnvFilter; @@ -70,7 +71,12 @@ async fn main() -> anyhow::Result<()> { .allow_origin(Any) .allow_methods(Any) .allow_headers(Any), - ); + ) + // Cap total request handling time to bound slow/expensive requests. + .layer(TimeoutLayer::with_status_code( + axum::http::StatusCode::REQUEST_TIMEOUT, + std::time::Duration::from_secs(10), + )); // Start server let listener = tokio::net::TcpListener::bind(&bind_addr).await?; From f229b5dd4a5ed41d965dec2d75cf37234d92d437 Mon Sep 17 00:00:00 2001 From: Jeff Gardner <202880+erskingardner@users.noreply.github.com> Date: Thu, 21 May 2026 09:18:47 +0200 Subject: [PATCH 05/11] Make the archive write path crash-safe (fix silent event loss) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The live daemon only marked events `Archived` on the final shutdown seal; mid-stream seals went to ClickHouse but never to the dedupe index, and the `Pending` marker was written (non-durably) before the event bytes left an 8 MB in-memory buffer. A non-graceful crash therefore lost the buffered events while their markers survived, so they were treated as "seen" forever and never re-fetched — defeating the archive-first guarantee. Changes: - Dedupe `Pending` is now tracked in memory only; only durably-sealed events are persisted (as `Archived`). On a crash the in-memory markers vanish with the lost buffer, so those events are simply re-fetched and de-duplicated by ClickHouse's ReplacingMergeTree. Legacy on-disk `Pending` values still read as "seen", so no migration/re-fetch storm. - seal() now fsyncs the segment file and marks its events `Archived` on EVERY seal (the writer holds a dedupe ref), not just the final one. - Negentropy only advances sync-state for events confirmed durably `Archived`, so a crash can't leave sync-state claiming an unarchived event (H4). Backfill binaries keep marking archived explicitly (writer dedupe ref = None). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/pensieve-ingest/src/bin/jsonl.rs | 3 +- crates/pensieve-ingest/src/bin/proto.rs | 3 +- crates/pensieve-ingest/src/main.rs | 9 +- crates/pensieve-ingest/src/pipeline/dedupe.rs | 93 +++++++++++++++---- .../pensieve-ingest/src/pipeline/segment.rs | 56 ++++++++--- crates/pensieve-ingest/src/sync/negentropy.rs | 58 +++++++++--- 6 files changed, 166 insertions(+), 56 deletions(-) diff --git a/crates/pensieve-ingest/src/bin/jsonl.rs b/crates/pensieve-ingest/src/bin/jsonl.rs index f2d4b01..2cd167a 100644 --- a/crates/pensieve-ingest/src/bin/jsonl.rs +++ b/crates/pensieve-ingest/src/bin/jsonl.rs @@ -708,7 +708,8 @@ fn init_pipeline(args: &Args, output: &Path) -> Result { segment_prefix: "segment".to_string(), compress: !args.no_compress, }; - let segment_writer = Arc::new(SegmentWriter::new(segment_config, sealed_sender)?); + // Backfill marks archived itself (see below), so the writer gets no dedupe ref. + let segment_writer = Arc::new(SegmentWriter::new(segment_config, sealed_sender, None)?); // Initialize ClickHouse indexer (optional) let indexer_handle = diff --git a/crates/pensieve-ingest/src/bin/proto.rs b/crates/pensieve-ingest/src/bin/proto.rs index 1df8698..92d96df 100644 --- a/crates/pensieve-ingest/src/bin/proto.rs +++ b/crates/pensieve-ingest/src/bin/proto.rs @@ -729,7 +729,8 @@ fn init_pipeline(args: &Args) -> Result { "disabled" } ); - let segment_writer = Arc::new(SegmentWriter::new(segment_config, sealed_sender)?); + // Backfill marks archived itself (see below), so the writer gets no dedupe ref. + let segment_writer = Arc::new(SegmentWriter::new(segment_config, sealed_sender, None)?); // Initialize ClickHouse indexer (optional) let indexer_handle = diff --git a/crates/pensieve-ingest/src/main.rs b/crates/pensieve-ingest/src/main.rs index 06cbe57..232e1fa 100644 --- a/crates/pensieve-ingest/src/main.rs +++ b/crates/pensieve-ingest/src/main.rs @@ -543,6 +543,7 @@ async fn main() -> Result<()> { Some(Arc::new(NegentropySyncer::new( negentropy_config, sync_state, + Some(Arc::clone(&dedupe)), ))) } else { None @@ -958,16 +959,14 @@ async fn main() -> Result<()> { } } - // Seal final segment + // Seal final segment. Its events are marked archived inside seal() now (the + // writer holds a dedupe reference), so we don't mark them again here. if let Some(sealed) = segment_writer.seal()? { tracing::info!( segment_number = sealed.segment_number, event_count = sealed.event_count, "sealed final segment" ); - - // Mark as archived - dedupe.mark_archived(sealed.event_ids.iter())?; } // Flush dedupe @@ -1077,7 +1076,7 @@ fn init_pipeline(args: &Args) -> Result { ); let segment_writer = Arc::new( - SegmentWriter::new(segment_config, sealed_sender) + SegmentWriter::new(segment_config, sealed_sender, Some(Arc::clone(&dedupe))) .with_context(|| "Failed to create segment writer")?, ); diff --git a/crates/pensieve-ingest/src/pipeline/dedupe.rs b/crates/pensieve-ingest/src/pipeline/dedupe.rs index c118e57..28ca278 100644 --- a/crates/pensieve-ingest/src/pipeline/dedupe.rs +++ b/crates/pensieve-ingest/src/pipeline/dedupe.rs @@ -20,6 +20,7 @@ use crate::Result; use parking_lot::Mutex; use rocksdb::{DBWithThreadMode, MultiThreaded, Options, WriteBatch, WriteOptions}; +use std::collections::HashSet; use std::path::Path; use std::sync::Arc; @@ -27,7 +28,10 @@ use std::sync::Arc; #[repr(u8)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum EventStatus { - /// Event is being processed (written to segment but not yet sealed). + /// Legacy on-disk status written by older builds. New builds track in-flight + /// events in memory and only ever persist `Archived`; this variant is kept so + /// that existing databases (which contain `Pending` values) still read as + /// "already seen". Pending = 1, /// Event has been archived (segment sealed and uploaded). Archived = 2, @@ -54,12 +58,20 @@ impl EventStatus { /// when multiple sources concurrently attempt to claim the same event ID. pub struct DedupeIndex { db: Arc>, - /// Mutex for atomic check-and-mark operations. + /// In-flight event IDs: claimed (written to the current, unsealed segment) but + /// not yet durably archived. Tracked in memory ONLY — never persisted. /// - /// This ensures that when multiple sources (e.g., live ingestion and negentropy - /// sync) concurrently process the same event, only one "wins" and marks it pending. - /// Without this, a race between check and put could allow duplicates. - write_lock: Mutex<()>, + /// This is deliberate for crash-safety. On a crash, the unsealed segment's + /// buffered bytes are lost; because these markers live only in memory, they are + /// lost too, so the affected events are simply "not seen" on the next start and + /// get re-fetched (by live ingestion and negentropy), then de-duplicated + /// downstream by ClickHouse's ReplacingMergeTree. Only durably-sealed events are + /// written to disk as `Archived`, and that on-disk state is what actually + /// suppresses re-fetching. + /// + /// The mutex also serializes check-and-mark so two sources (live and negentropy) + /// cannot both claim the same novel event ID. + pending: Mutex>, } impl DedupeIndex { @@ -110,7 +122,7 @@ impl DedupeIndex { Ok(Self { db: Arc::new(db), - write_lock: Mutex::new(()), + pending: Mutex::new(HashSet::new()), }) } @@ -139,6 +151,10 @@ impl DedupeIndex { /// This is optimized for the common case where events are new. /// Uses bloom filters for fast rejection of seen events. pub fn is_new(&self, event_id: &[u8; 32]) -> Result { + // An event is "not new" if it's either durably on disk or currently in-flight. + if self.pending.lock().contains(event_id) { + return Ok(false); + } Ok(self.get_status(event_id)?.is_none()) } @@ -154,16 +170,18 @@ impl DedupeIndex { pub fn check_and_mark_pending(&self, event_id: &[u8; 32]) -> Result { // Hold the lock for the entire check-and-mark operation to prevent races. // Without this, two sources could both see "not exists" and both write. - let _guard = self.write_lock.lock(); + let mut pending = self.pending.lock(); - // Check if it exists (uses bloom filter for fast path) + // Already durably recorded on disk? (`Archived`, or a legacy `Pending` + // value from an older build — both mean "we already have this".) if self.get_status(event_id)?.is_some() { return Ok(false); } - // Mark as pending - self.db.put(event_id, [EventStatus::Pending.to_byte()])?; - Ok(true) + // Claim it in-flight. This is in memory only and becomes a durable + // `Archived` entry when the segment is sealed (see `mark_archived`). + // `insert` returns false if another source already claimed it this run. + Ok(pending.insert(*event_id)) } /// Mark multiple events as archived (batch operation). @@ -178,18 +196,25 @@ impl DedupeIndex { I: Iterator, { let mut batch = WriteBatch::default(); - let mut count = 0usize; + let mut archived: Vec<[u8; 32]> = Vec::new(); for event_id in event_ids { batch.put(event_id, [EventStatus::Archived.to_byte()]); - count += 1; + archived.push(*event_id); } - if count > 0 { + if !archived.is_empty() { let mut write_opts = WriteOptions::default(); - write_opts.set_sync(true); // Ensure durability + write_opts.set_sync(true); // Durable before we drop the in-flight markers self.db.write_opt(batch, &write_opts)?; - tracing::debug!("Marked {} events as archived", count); + + // Now that the IDs are durably `Archived`, remove them from the + // in-flight set so the in-memory set only ever holds unsealed events. + let mut pending = self.pending.lock(); + for id in &archived { + pending.remove(id); + } + tracing::debug!("Marked {} events as archived", archived.len()); } Ok(()) @@ -259,8 +284,10 @@ mod tests { // Different ID should return true assert!(index.check_and_mark_pending(&id2).unwrap()); - // Check status - assert_eq!(index.get_status(&id1).unwrap(), Some(EventStatus::Pending)); + // In-flight events are tracked in memory (not persisted), so they read as + // "not new" but have no on-disk status until the segment is sealed. + assert!(!index.is_new(&id1).unwrap()); + assert_eq!(index.get_status(&id1).unwrap(), None); } #[test] @@ -294,4 +321,32 @@ mod tests { index.check_and_mark_pending(&id1).unwrap(); assert!(!index.is_new(&id1).unwrap()); } + + #[test] + fn test_pending_not_persisted_but_archived_is() { + let tmp = TempDir::new().unwrap(); + let id1 = test_event_id(1); + let id2 = test_event_id(2); + + { + let index = DedupeIndex::open(tmp.path()).unwrap(); + // Claim both in-flight, then durably archive only id1. + assert!(index.check_and_mark_pending(&id1).unwrap()); + assert!(index.check_and_mark_pending(&id2).unwrap()); + index.mark_archived([&id1].into_iter()).unwrap(); + } + + // Re-opening simulates a restart: in-memory in-flight state is gone, so an + // unsealed event (id2) becomes re-fetchable while an archived one (id1) + // stays suppressed. This is the crash-safety guarantee. + let index = DedupeIndex::open(tmp.path()).unwrap(); + assert!( + !index.is_new(&id1).unwrap(), + "archived event must stay seen" + ); + assert!( + index.is_new(&id2).unwrap(), + "unsealed event must be re-fetchable after restart" + ); + } } diff --git a/crates/pensieve-ingest/src/pipeline/segment.rs b/crates/pensieve-ingest/src/pipeline/segment.rs index 845b0d9..b529e63 100644 --- a/crates/pensieve-ingest/src/pipeline/segment.rs +++ b/crates/pensieve-ingest/src/pipeline/segment.rs @@ -35,6 +35,8 @@ use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use super::dedupe::DedupeIndex; + /// Configuration for the segment writer. #[derive(Debug, Clone)] pub struct SegmentConfig { @@ -167,6 +169,11 @@ pub struct SegmentWriter { /// Wrapped in Arc for sharing with background compression threads. total_compressed_bytes: Arc, sealed_sender: Option>, + /// Optional dedupe index. When present, every sealed segment's events are + /// marked durably `Archived` here (after the segment file is fsync'd), which is + /// what makes mid-stream seals crash-safe. Backfill binaries pass `None` and + /// mark archived themselves. + dedupe: Option>, } impl SegmentWriter { @@ -176,9 +183,12 @@ impl SegmentWriter { /// /// * `config` - Configuration for the writer /// * `sealed_sender` - Optional channel to send sealed segment notifications + /// * `dedupe` - Optional dedupe index; when set, each sealed segment's events + /// are durably marked `Archived` after the file is fsync'd pub fn new( config: SegmentConfig, sealed_sender: Option>, + dedupe: Option>, ) -> Result { // Create output directory if it doesn't exist fs::create_dir_all(&config.output_dir)?; @@ -201,6 +211,7 @@ impl SegmentWriter { total_bytes: AtomicUsize::new(0), total_compressed_bytes: Arc::new(AtomicUsize::new(0)), sealed_sender, + dedupe, }) } @@ -330,24 +341,39 @@ impl SegmentWriter { /// for marking as archived). The ClickHouse notification is sent after /// compression completes (from the background thread). pub fn seal(&self) -> Result> { - let mut current = self.current.lock(); - - let segment = match current.take() { - Some(s) => s, - None => return Ok(None), - }; + let segment = { + let mut current = self.current.lock(); + match current.take() { + Some(s) => s, + None => return Ok(None), + } + }; // release the `current` lock before durable I/O and dedupe writes - // Flush and close the writer let CurrentSegment { - mut writer, + writer, path, event_count, size_bytes, event_ids, } = segment; - writer.flush()?; - drop(writer); + // Flush the buffer and fsync so the segment bytes are durable on disk + // BEFORE we record its events as archived. Otherwise a machine crash could + // leave events marked `Archived` (hence never re-fetched) while their bytes + // were still only in the OS page cache. + let file = writer + .into_inner() + .map_err(|e| Error::Segment(format!("failed to flush segment on seal: {e}")))?; + file.sync_all()?; + drop(file); + + // Record the segment's events as durably archived (and clear their in-flight + // markers). Doing this on EVERY seal — not just the final one — is what makes + // mid-stream sealed events durable; a crash before this point leaves them + // re-fetchable rather than silently lost. + if let Some(dedupe) = &self.dedupe { + dedupe.mark_archived(event_ids.iter())?; + } let segment_number = self.segment_number.fetch_add(1, Ordering::SeqCst); let sealed_at = Utc::now(); @@ -555,7 +581,7 @@ mod tests { ..Default::default() }; - let writer = SegmentWriter::new(config, None).unwrap(); + let writer = SegmentWriter::new(config, None, None).unwrap(); writer.write(test_event(1)).unwrap(); let stats = writer.stats(); @@ -572,7 +598,7 @@ mod tests { ..Default::default() }; - let writer = SegmentWriter::new(config, None).unwrap(); + let writer = SegmentWriter::new(config, None, None).unwrap(); // Write enough events to trigger seal for i in 0..20 { @@ -592,7 +618,7 @@ mod tests { ..Default::default() }; - let writer = SegmentWriter::new(config, None).unwrap(); + let writer = SegmentWriter::new(config, None, None).unwrap(); writer.write(test_event(1)).unwrap(); writer.seal().unwrap(); @@ -610,7 +636,7 @@ mod tests { ..Default::default() }; - let writer = SegmentWriter::new(config, None).unwrap(); + let writer = SegmentWriter::new(config, None, None).unwrap(); writer.write(test_event(1)).unwrap(); writer.seal().unwrap(); @@ -646,7 +672,7 @@ mod tests { }; let (sender, receiver) = crossbeam_channel::unbounded(); - let writer = SegmentWriter::new(config, Some(sender)).unwrap(); + let writer = SegmentWriter::new(config, Some(sender), None).unwrap(); writer.write(test_event(1)).unwrap(); writer.seal().unwrap(); diff --git a/crates/pensieve-ingest/src/sync/negentropy.rs b/crates/pensieve-ingest/src/sync/negentropy.rs index c071650..ea13d45 100644 --- a/crates/pensieve-ingest/src/sync/negentropy.rs +++ b/crates/pensieve-ingest/src/sync/negentropy.rs @@ -21,6 +21,7 @@ use super::SyncStateDb; use crate::Result; use crate::logging::compact_error; +use crate::pipeline::{DedupeIndex, EventStatus}; use metrics::{counter, gauge, histogram}; use nostr_sdk::prelude::*; use std::fmt::Debug; @@ -225,15 +226,28 @@ pub struct NegentropySyncer { config: NegentropySyncConfig, sync_state: Arc, running: Arc, + /// Optional dedupe index, used to confirm an event is DURABLY archived before + /// advancing sync-state. Without this gate, sync-state could record an event + /// that was only written to an unsealed segment; a crash would then drop it + /// from the archive *and* stop negentropy from ever re-fetching it (H4). + dedupe: Option>, } impl NegentropySyncer { /// Create a new negentropy syncer. - pub fn new(config: NegentropySyncConfig, sync_state: Arc) -> Self { + /// + /// `dedupe` should be the shared dedupe index so sync-state is only advanced + /// for events that are durably archived. Pass `None` only in tests. + pub fn new( + config: NegentropySyncConfig, + sync_state: Arc, + dedupe: Option>, + ) -> Self { Self { config, sync_state, running: Arc::new(AtomicBool::new(false)), + dedupe, } } @@ -448,21 +462,35 @@ impl NegentropySyncer { for event in events { match event_handler(&event) { Ok(true) => { - // Event processed successfully, NOW record in sync state. - // This ensures we only record events that were actually archived. - if let Err(e) = self - .sync_state - .record(event.id.as_bytes(), event.created_at.as_secs()) - { - tracing::warn!( - event_id = %event.id, - kind = event.kind.as_u16(), - pubkey = %event.pubkey, - error = %compact_error(&e), - "failed to record negentropy event in sync state" - ); + // Only advance sync-state once the event is DURABLY + // archived (its segment sealed + fsync'd). Recording an + // event that is merely in-flight would let a crash drop + // it from the archive while sync-state still claims we + // have it — negentropy would then never re-fetch it (H4). + // Not-yet-durable events are simply re-evaluated on the + // next cycle and recorded once their segment seals. + let durable = match &self.dedupe { + Some(dedupe) => matches!( + dedupe.get_status(event.id.as_bytes()), + Ok(Some(EventStatus::Archived)) + ), + None => true, + }; + if durable { + if let Err(e) = self + .sync_state + .record(event.id.as_bytes(), event.created_at.as_secs()) + { + tracing::warn!( + event_id = %event.id, + kind = event.kind.as_u16(), + pubkey = %event.pubkey, + error = %compact_error(&e), + "failed to record negentropy event in sync state" + ); + } + events_succeeded += 1; } - events_succeeded += 1; } Ok(false) => { tracing::debug!("Event handler signaled stop"); From 46ed6025d1b983374e0fdcbb9f8916442f1f3b3e Mon Sep 17 00:00:00 2001 From: Jeff Gardner <202880+erskingardner@users.noreply.github.com> Date: Thu, 21 May 2026 09:20:43 +0200 Subject: [PATCH 06/11] Harden ingest edge cases: seal ratio, torn frames, transient dedupe errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Guard the compression-ratio log against divide-by-zero on an empty/forced seal (it ran in a detached thread, so a panic there would have silently killed compression for that segment). - Bound the segment frame length on read: a crash mid-write can leave a partial length prefix, so cap a single frame at 16 MB and stop reading rather than attempting a multi-gigabyte allocation on a torn segment. - Stop dropping a live or negentropy event when the dedupe check itself errors (a transient RocksDB failure). Treat it as novel instead — a possible duplicate is reconciled downstream by ClickHouse, but a dropped event is lost. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/pensieve-ingest/src/main.rs | 9 +++++++-- crates/pensieve-ingest/src/pipeline/clickhouse.rs | 12 ++++++++++++ crates/pensieve-ingest/src/pipeline/segment.rs | 8 +++++++- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/crates/pensieve-ingest/src/main.rs b/crates/pensieve-ingest/src/main.rs index 232e1fa..603894e 100644 --- a/crates/pensieve-ingest/src/main.rs +++ b/crates/pensieve-ingest/src/main.rs @@ -764,7 +764,9 @@ async fn main() -> Result<()> { error = %compact_error(&e), "negentropy dedupe check failed" ); - false + // Treat as novel on a transient dedupe error rather than + // dropping a possibly-new event; ClickHouse de-dups by id. + true } }; @@ -866,7 +868,10 @@ async fn main() -> Result<()> { error = %compact_error(&e), "dedupe check failed for live event" ); - false // Treat as duplicate on error + // Treat as novel on a (rare, transient) dedupe error: writing a + // possible duplicate is safe (ClickHouse de-dups by id), whereas + // dropping it would silently lose a potentially-new event. + true } }; diff --git a/crates/pensieve-ingest/src/pipeline/clickhouse.rs b/crates/pensieve-ingest/src/pipeline/clickhouse.rs index 76fc0ee..8c6f8b3 100644 --- a/crates/pensieve-ingest/src/pipeline/clickhouse.rs +++ b/crates/pensieve-ingest/src/pipeline/clickhouse.rs @@ -229,6 +229,18 @@ impl ClickHouseIndexer { let len = u32::from_le_bytes(len_buf) as usize; + // Guard against a torn/corrupt frame (e.g. a crash mid-write that left a + // partial length prefix): a single packed event is never this large, so + // treat an absurd length as end-of-data instead of allocating gigabytes. + const MAX_EVENT_BYTES: usize = 16 * 1024 * 1024; + if len > MAX_EVENT_BYTES { + tracing::warn!( + len, + "segment frame length exceeds sane maximum; stopping read (likely torn segment)" + ); + break; + } + // Read notepack bytes let mut data = vec![0u8; len]; reader.read_exact(&mut data)?; diff --git a/crates/pensieve-ingest/src/pipeline/segment.rs b/crates/pensieve-ingest/src/pipeline/segment.rs index b529e63..e4bc55f 100644 --- a/crates/pensieve-ingest/src/pipeline/segment.rs +++ b/crates/pensieve-ingest/src/pipeline/segment.rs @@ -409,13 +409,19 @@ impl SegmentWriter { ); } + // Guard against divide-by-zero on an empty/forced seal. + let ratio_pct = if size_bytes > 0 { + (compressed_bytes as f64 / size_bytes as f64) * 100.0 + } else { + 0.0 + }; tracing::info!( "Sealed segment {}: {} events, {} bytes -> {} bytes ({:.1}%) at {}", segment_number, event_count, size_bytes, compressed_bytes, - (compressed_bytes as f64 / size_bytes as f64) * 100.0, + ratio_pct, gz_path.display() ); From 9fa60f9c42b1a8ee4e799e3c1f2e7e9aafb5db6b Mon Sep 17 00:00:00 2001 From: Jeff Gardner <202880+erskingardner@users.noreply.github.com> Date: Thu, 21 May 2026 09:24:46 +0200 Subject: [PATCH 07/11] Make ClickHouse indexing self-healing and bounded Previously a failed segment insert was logged and dropped (with a TODO), the configured batch_size was unused (the whole segment was inserted in one request), and the indexer shared its counters across threads via an unsafe raw-pointer cast. - Retry each segment with bounded exponential backoff; on exhaustion increment clickhouse_insert_errors_total and append the segment path to a persistent reindex queue. The queue is drained on the next start, so ClickHouse self-heals from the canonical archive (idempotent via ReplacingMergeTree). - Insert events in batch_size-sized chunks instead of one giant request. - Replace the unsafe AtomicUsize pointer cast with Arc. The live ingester enables the reindex queue (in the segment output dir); backfill binaries leave it disabled. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/pensieve-ingest/src/bin/jsonl.rs | 2 + crates/pensieve-ingest/src/bin/proto.rs | 1 + crates/pensieve-ingest/src/main.rs | 1 + .../src/pipeline/clickhouse.rs | 210 ++++++++++++++---- 4 files changed, 169 insertions(+), 45 deletions(-) diff --git a/crates/pensieve-ingest/src/bin/jsonl.rs b/crates/pensieve-ingest/src/bin/jsonl.rs index 2cd167a..b41d3fe 100644 --- a/crates/pensieve-ingest/src/bin/jsonl.rs +++ b/crates/pensieve-ingest/src/bin/jsonl.rs @@ -290,6 +290,7 @@ async fn index_segments_mode( database: clickhouse_db.to_string(), table: "events_local".to_string(), batch_size: 10000, + reindex_queue_path: None, }; let indexer = ClickHouseIndexer::new(ch_config)?; @@ -720,6 +721,7 @@ fn init_pipeline(args: &Args, output: &Path) -> Result { database: args.clickhouse_db.clone(), table: "events_local".to_string(), batch_size: 10000, + reindex_queue_path: None, }; let indexer = ClickHouseIndexer::new(ch_config)?; Some(indexer.start(receiver)) diff --git a/crates/pensieve-ingest/src/bin/proto.rs b/crates/pensieve-ingest/src/bin/proto.rs index 92d96df..c053936 100644 --- a/crates/pensieve-ingest/src/bin/proto.rs +++ b/crates/pensieve-ingest/src/bin/proto.rs @@ -741,6 +741,7 @@ fn init_pipeline(args: &Args) -> Result { database: args.clickhouse_db.clone(), table: "events_local".to_string(), batch_size: 10000, + reindex_queue_path: None, }; let indexer = ClickHouseIndexer::new(ch_config)?; Some(indexer.start(receiver)) diff --git a/crates/pensieve-ingest/src/main.rs b/crates/pensieve-ingest/src/main.rs index 603894e..14bb51c 100644 --- a/crates/pensieve-ingest/src/main.rs +++ b/crates/pensieve-ingest/src/main.rs @@ -1094,6 +1094,7 @@ fn init_pipeline(args: &Args) -> Result { database: args.clickhouse_db.clone(), table: "events_local".to_string(), batch_size: 10000, + reindex_queue_path: Some(args.output_dir.join(".clickhouse_reindex_queue")), }; let indexer = ClickHouseIndexer::new(ch_config) .with_context(|| "Failed to create ClickHouse indexer")?; diff --git a/crates/pensieve-ingest/src/pipeline/clickhouse.rs b/crates/pensieve-ingest/src/pipeline/clickhouse.rs index 8c6f8b3..7341225 100644 --- a/crates/pensieve-ingest/src/pipeline/clickhouse.rs +++ b/crates/pensieve-ingest/src/pipeline/clickhouse.rs @@ -22,11 +22,12 @@ use crate::{Error, Result}; use clickhouse::{Client, Row}; use crossbeam_channel::Receiver; use flate2::read::GzDecoder; +use metrics::counter; use notepack::NoteParser; use serde::{Deserialize, Serialize}; use std::fs::File; use std::io::{BufReader, Read}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::thread; @@ -45,6 +46,11 @@ pub struct ClickHouseConfig { /// Batch size for inserts pub batch_size: usize, + + /// Optional path to a newline-delimited file of segment paths that failed to + /// index. Failed segments are appended here and re-indexed on the next start, + /// so ClickHouse self-heals from the canonical archive. + pub reindex_queue_path: Option, } impl Default for ClickHouseConfig { @@ -54,6 +60,7 @@ impl Default for ClickHouseConfig { database: "nostr".to_string(), table: "events_local".to_string(), batch_size: 10000, + reindex_queue_path: None, } } } @@ -76,8 +83,8 @@ pub struct ClickHouseIndexer { client: Client, config: ClickHouseConfig, running: Arc, - segments_indexed: AtomicUsize, - events_indexed: AtomicUsize, + segments_indexed: Arc, + events_indexed: Arc, } impl ClickHouseIndexer { @@ -98,8 +105,8 @@ impl ClickHouseIndexer { client, config, running: Arc::new(AtomicBool::new(false)), - segments_indexed: AtomicUsize::new(0), - events_indexed: AtomicUsize::new(0), + segments_indexed: Arc::new(AtomicUsize::new(0)), + events_indexed: Arc::new(AtomicUsize::new(0)), }) } @@ -110,16 +117,12 @@ impl ClickHouseIndexer { let client = self.client.clone(); let config = self.config.clone(); let running = Arc::clone(&self.running); - let segments_indexed = &self.segments_indexed as *const AtomicUsize as usize; - let events_indexed = &self.events_indexed as *const AtomicUsize as usize; + let segments_indexed = Arc::clone(&self.segments_indexed); + let events_indexed = Arc::clone(&self.events_indexed); self.running.store(true, Ordering::SeqCst); thread::spawn(move || { - // SAFETY: We're passing raw pointers but they're valid for the lifetime of the indexer - let segments_indexed = unsafe { &*(segments_indexed as *const AtomicUsize) }; - let events_indexed = unsafe { &*(events_indexed as *const AtomicUsize) }; - tracing::info!("ClickHouse indexer thread started"); // Create a runtime for async operations @@ -128,6 +131,10 @@ impl ClickHouseIndexer { .build() .expect("Failed to create tokio runtime"); + // Re-index any segments that failed in a previous run before consuming + // new ones, so ClickHouse catches up with the archive. + Self::drain_reindex_queue(&client, &config, &rt, &segments_indexed, &events_indexed); + while running.load(Ordering::SeqCst) { match receiver.recv_timeout(std::time::Duration::from_secs(1)) { Ok(sealed) => { @@ -138,7 +145,7 @@ impl ClickHouseIndexer { sealed.path.display() ); - match rt.block_on(Self::index_segment(&client, &config, &sealed)) { + match Self::index_with_retry(&client, &config, &rt, &sealed) { Ok(count) => { segments_indexed.fetch_add(1, Ordering::Relaxed); events_indexed.fetch_add(count, Ordering::Relaxed); @@ -149,12 +156,15 @@ impl ClickHouseIndexer { ); } Err(e) => { + // Retries exhausted: record the failure and queue the + // segment for later reindex (the archive is canonical). + counter!("clickhouse_insert_errors_total").increment(1); tracing::error!( - "Failed to index segment {}: {}", - sealed.segment_number, - e + segment = sealed.segment_number, + error = %compact_error(&e), + "failed to index segment after retries; queued for reindex" ); - // TODO: Implement retry logic + Self::enqueue_failed(&config, &sealed.path); } } } @@ -177,28 +187,153 @@ impl ClickHouseIndexer { self.running.store(false, Ordering::SeqCst); } - /// Index a single segment into ClickHouse. - async fn index_segment( + /// Read a segment file and insert its events into ClickHouse (in batches). + async fn index_path(client: &Client, config: &ClickHouseConfig, path: &Path) -> Result { + let events = Self::read_segment(path)?; + Self::insert_events(client, config, &events).await + } + + /// Insert events in `batch_size`-sized chunks so a single insert request never + /// holds an entire 256 MB segment, and a transient failure only affects one + /// chunk (the whole segment is re-indexed idempotently via ReplacingMergeTree). + async fn insert_events( client: &Client, config: &ClickHouseConfig, - sealed: &SealedSegment, + events: &[EventRow], ) -> Result { - let events = Self::read_segment(&sealed.path)?; - if events.is_empty() { return Ok(0); } + let batch = config.batch_size.max(1); + for chunk in events.chunks(batch) { + let mut inserter = client.insert(&config.table)?; + for event in chunk { + inserter.write(event).await?; + } + inserter.end().await?; + } + Ok(events.len()) + } - // Batch insert - let mut inserter = client.insert(&config.table)?; + /// Index a sealed segment with bounded retries + exponential backoff. + fn index_with_retry( + client: &Client, + config: &ClickHouseConfig, + rt: &tokio::runtime::Runtime, + sealed: &SealedSegment, + ) -> Result { + const MAX_ATTEMPTS: u32 = 3; + let mut attempt = 0; + loop { + match rt.block_on(Self::index_path(client, config, &sealed.path)) { + Ok(count) => return Ok(count), + Err(e) => { + attempt += 1; + if attempt >= MAX_ATTEMPTS { + return Err(e); + } + let backoff = std::time::Duration::from_secs(2u64.pow(attempt)); + tracing::warn!( + segment = sealed.segment_number, + attempt, + error = %compact_error(&e), + "clickhouse index attempt failed; retrying in {:?}", + backoff + ); + std::thread::sleep(backoff); + } + } + } + } - for event in &events { - inserter.write(event).await?; + /// Append a failed segment's path to the reindex queue (best-effort). + fn enqueue_failed(config: &ClickHouseConfig, path: &Path) { + let Some(queue) = &config.reindex_queue_path else { + return; + }; + use std::io::Write; + let result = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(queue) + .and_then(|mut f| writeln!(f, "{}", path.display())); + match result { + Ok(()) => tracing::info!(segment = %path.display(), "queued segment for reindex"), + Err(e) => { + tracing::warn!(error = %e, queue = %queue.display(), "failed to enqueue segment for reindex") + } } + } - inserter.end().await?; + /// Re-index every segment listed in the reindex queue, rewriting the queue with + /// the ones that still fail. Runs once at indexer start. + fn drain_reindex_queue( + client: &Client, + config: &ClickHouseConfig, + rt: &tokio::runtime::Runtime, + segments_indexed: &AtomicUsize, + events_indexed: &AtomicUsize, + ) { + let Some(queue) = &config.reindex_queue_path else { + return; + }; + if !queue.exists() { + return; + } + let contents = match std::fs::read_to_string(queue) { + Ok(c) => c, + Err(e) => { + tracing::warn!(error = %e, queue = %queue.display(), "failed to read reindex queue"); + return; + } + }; + let mut paths: Vec = contents + .lines() + .map(|l| l.trim().to_string()) + .filter(|l| !l.is_empty()) + .collect(); + paths.sort(); + paths.dedup(); + if paths.is_empty() { + let _ = std::fs::remove_file(queue); + return; + } - Ok(events.len()) + tracing::info!(count = paths.len(), "draining clickhouse reindex queue"); + let mut still_failing: Vec = Vec::new(); + for p in paths { + let path = Path::new(&p); + if !path.exists() { + tracing::warn!(segment = %p, "queued segment no longer exists; dropping from queue"); + continue; + } + match rt.block_on(Self::index_path(client, config, path)) { + Ok(count) => { + segments_indexed.fetch_add(1, Ordering::Relaxed); + events_indexed.fetch_add(count, Ordering::Relaxed); + tracing::info!(segment = %p, count, "reindexed queued segment"); + } + Err(e) => { + tracing::error!(segment = %p, error = %compact_error(&e), "reindex still failing"); + still_failing.push(p); + } + } + } + + let write_result = if still_failing.is_empty() { + std::fs::remove_file(queue).or_else(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + Ok(()) + } else { + Err(e) + } + }) + } else { + std::fs::write(queue, format!("{}\n", still_failing.join("\n"))) + }; + if let Err(e) = write_result { + tracing::warn!(error = %e, queue = %queue.display(), "failed to update reindex queue"); + } } /// Read a notepack segment file and parse into EventRows. @@ -300,25 +435,10 @@ impl ClickHouseIndexer { /// Index a segment file directly (for batch operations without channel). pub async fn index_segment_file(&self, path: &Path) -> Result { - let events = Self::read_segment(path)?; - - if events.is_empty() { - return Ok(0); - } - - let mut inserter = self.client.insert(&self.config.table)?; - - for event in &events { - inserter.write(event).await?; - } - - inserter.end().await?; - - self.events_indexed - .fetch_add(events.len(), Ordering::Relaxed); + let count = Self::index_path(&self.client, &self.config, path).await?; + self.events_indexed.fetch_add(count, Ordering::Relaxed); self.segments_indexed.fetch_add(1, Ordering::Relaxed); - - Ok(events.len()) + Ok(count) } /// Get statistics about the indexer. From b78f78263cee406b4fd7a40a3dfc0f775787edde Mon Sep 17 00:00:00 2001 From: Jeff Gardner <202880+erskingardner@users.noreply.github.com> Date: Thu, 21 May 2026 09:29:18 +0200 Subject: [PATCH 08/11] Verify event id + signature in backfill validation (fix accepted forgeries) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added strict regression tests asserting that tampered-signature and tampered-id events are rejected on every validation entry point. They revealed a real hole: `nostr::Event::from_json` only deserializes — it does NOT verify the ID hash or signature — so validate_event() (JSONL backfill) and validate_proto_event() (protobuf backfill) were returning Ok for forged events, which would then be written to the canonical archive. Fix: add an explicit verify_event_crypto() (verify_id + verify_signature) and call it from validate_event, validate_notebuf, and validate_proto_event. The live and negentropy paths were already safe (nostr-relay-pool verifies before emitting events); these tests also guard that shared invariant against future dependency changes. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/pensieve-core/src/event.rs | 56 +++++++++++++++++++++++++++++-- crates/pensieve-core/src/proto.rs | 54 +++++++++++++++++++++++++++-- 2 files changed, 105 insertions(+), 5 deletions(-) diff --git a/crates/pensieve-core/src/event.rs b/crates/pensieve-core/src/event.rs index 9be8eaf..8f6daf1 100644 --- a/crates/pensieve-core/src/event.rs +++ b/crates/pensieve-core/src/event.rs @@ -22,18 +22,41 @@ use notepack::{NoteBinary, NoteBuf}; // Required for Event::from_json() /// - Event ID doesn't match computed hash /// - Signature is invalid pub fn validate_event(event_json: &str) -> Result { - // The nostr crate's Event::from_json validates ID and signature automatically + // IMPORTANT: nostr's `Event::from_json` only DESERIALIZES — it does not verify + // the ID hash or signature — so we must verify explicitly. Without this a + // tampered event would pass validation and be written to the canonical archive. let event = nostr::Event::from_json(event_json)?; + verify_event_crypto(&event)?; Ok(event) } +/// Verify a deserialized event's ID hash and Schnorr signature. +/// +/// `nostr::Event::from_json` does NOT check these, so every validation entry +/// point that accepts untrusted input must call this before trusting the event. +pub(crate) fn verify_event_crypto(event: &nostr::Event) -> Result<()> { + if !event.verify_id() { + return Err(crate::error::Error::InvalidEventId { + computed: "recomputed id differs".to_string(), + expected: event.id.to_hex(), + }); + } + if !event.verify_signature() { + return Err(crate::error::Error::InvalidSignature( + "signature verification failed".to_string(), + )); + } + Ok(()) +} + /// Validates a NoteBuf event's ID and signature by converting through nostr crate. /// /// This is useful when you have a NoteBuf from notepack and want to validate it. pub fn validate_notebuf(note: &NoteBuf) -> Result<()> { - // Convert NoteBuf to JSON, then parse with nostr crate for validation + // Convert NoteBuf to JSON, then parse AND verify with the nostr crate. let json = serde_json::to_string(note)?; - let _event = nostr::Event::from_json(&json)?; + let event = nostr::Event::from_json(&json)?; + verify_event_crypto(&event)?; Ok(()) } @@ -306,6 +329,33 @@ mod tests { } } + #[test] + fn test_validate_event_strictly_rejects_tampered_signature() { + // Strict guard for the invariant the live + negentropy ingest paths rely + // on: the nostr crate verifies signatures during deserialization, so + // validate_event() (used by the JSONL backfill) must REJECT a bad signature + // outright rather than return an unverified event. If a future nostr + // version stopped verifying, this test fails loudly. + let event = make_test_event("strict sig test", Kind::TextNote, vec![]); + let mut json: serde_json::Value = serde_json::to_value(&event).unwrap(); + json["sig"] = serde_json::Value::String("a".repeat(128)); + assert!( + validate_event(&json.to_string()).is_err(), + "validate_event must reject a tampered signature" + ); + } + + #[test] + fn test_validate_event_strictly_rejects_tampered_id() { + let event = make_test_event("strict id test", Kind::TextNote, vec![]); + let mut json: serde_json::Value = serde_json::to_value(&event).unwrap(); + json["id"] = serde_json::Value::String("a".repeat(64)); + assert!( + validate_event(&json.to_string()).is_err(), + "validate_event must reject a tampered id" + ); + } + // ========================================================================= // validate_event_id tests // ========================================================================= diff --git a/crates/pensieve-core/src/proto.rs b/crates/pensieve-core/src/proto.rs index f606bf7..0b83871 100644 --- a/crates/pensieve-core/src/proto.rs +++ b/crates/pensieve-core/src/proto.rs @@ -29,10 +29,11 @@ pub use nostr_proto::{EventBatch, ProtoEvent, Tag}; /// - Event ID doesn't match computed hash /// - Signature is invalid pub fn validate_proto_event(proto: &ProtoEvent) -> Result { - // Convert ProtoEvent to JSON for validation via nostr crate - // This is the most reliable path since nostr crate validates everything + // Convert ProtoEvent to JSON, then parse AND verify. `from_json` only + // deserializes, so the ID hash + signature must be checked explicitly. let json = proto_to_json(proto)?; let event = nostr::Event::from_json(&json)?; + crate::event::verify_event_crypto(&event)?; Ok(event) } @@ -412,4 +413,53 @@ mod tests { let result = validate_proto_event(&proto); assert!(result.is_err()); } + + #[test] + fn test_validate_proto_event_rejects_tampered_signature() { + use nostr::{EventBuilder, Keys, Kind}; + let keys = Keys::generate(); + let event = EventBuilder::new(Kind::TextNote, "proto sig test") + .sign_with_keys(&keys) + .expect("sign"); + let mut proto = ProtoEvent { + id: event.id.to_hex(), + pubkey: event.pubkey.to_hex(), + created_at: event.created_at.as_secs() as i64, + kind: event.kind.as_u16() as i32, + tags: vec![], + content: event.content.clone(), + sig: event.sig.to_string(), + }; + // The untampered event validates... + assert!(validate_proto_event(&proto).is_ok()); + // ...but a tampered signature must be rejected (proto backfill path). + proto.sig = "a".repeat(128); + assert!( + validate_proto_event(&proto).is_err(), + "tampered-signature proto event must be rejected" + ); + } + + #[test] + fn test_validate_proto_event_rejects_tampered_id() { + use nostr::{EventBuilder, Keys, Kind}; + let keys = Keys::generate(); + let event = EventBuilder::new(Kind::TextNote, "proto id test") + .sign_with_keys(&keys) + .expect("sign"); + let mut proto = ProtoEvent { + id: event.id.to_hex(), + pubkey: event.pubkey.to_hex(), + created_at: event.created_at.as_secs() as i64, + kind: event.kind.as_u16() as i32, + tags: vec![], + content: event.content.clone(), + sig: event.sig.to_string(), + }; + proto.id = "a".repeat(64); + assert!( + validate_proto_event(&proto).is_err(), + "tampered-id proto event must be rejected" + ); + } } From 32dfb5600fc61ef0881458c4b4713d7d071cfaa3 Mon Sep 17 00:00:00 2001 From: Jeff Gardner <202880+erskingardner@users.noreply.github.com> Date: Thu, 21 May 2026 09:40:07 +0200 Subject: [PATCH 09/11] Ignore local reviews/ artifacts Keep local code-review output (e.g. reviews/review.md) out of version control. Co-Authored-By: Claude Opus 4.7 (1M context) --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 67bcff6..b72af20 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,9 @@ Thumbs.db # Local data directories (RocksDB, archive segments, etc.) data/ +# Local code-review artifacts +reviews/ + # Debug files *.pdb From 48487645ca9ef8dd3c2634715460992a7ecca44d Mon Sep 17 00:00:00 2001 From: Jeff Gardner <202880+erskingardner@users.noreply.github.com> Date: Thu, 21 May 2026 11:02:56 +0200 Subject: [PATCH 10/11] Add GitHub Actions CI Mirror `just precommit` in CI: rustfmt check, clippy (-D warnings), and the test suite via cargo-nextest sharded across 3 runners, plus a separate doctest job. Installs libclang for the RocksDB build and caches the cargo build. A ci-success job aggregates the matrix for a single required status check. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/ci.yml | 105 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..08b2e87 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,105 @@ +name: CI + +# Mirrors `just precommit` (fmt + clippy + test) and shards the test suite across +# multiple runners with cargo-nextest. + +on: + push: + branches: [master] + pull_request: + +# Cancel superseded runs for the same ref (e.g. new pushes to a PR). +concurrency: + group: ci-${{ github.ref }} + cancel-in-progress: true + +env: + CARGO_TERM_COLOR: always + CARGO_INCREMENTAL: "0" + CARGO_NET_RETRY: "10" + RUST_BACKTRACE: "1" + +jobs: + fmt: + name: rustfmt + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Rust (stable + rustfmt) + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt + # Same as `just fmt-check`. + - run: cargo fmt --all -- --check + + clippy: + name: clippy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + # The `rocksdb` crate builds librocksdb-sys via bindgen, which needs libclang. + - name: Install system dependencies + run: sudo apt-get update && sudo apt-get install -y clang libclang-dev + - name: Install Rust (stable + clippy) + uses: dtolnay/rust-toolchain@stable + with: + components: clippy + - uses: Swatinem/rust-cache@v2 + with: + shared-key: clippy + # Same as `just clippy`. + - run: cargo clippy --workspace --all-targets -- -D warnings + + test: + name: test (shard ${{ matrix.partition }}/3) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + partition: [1, 2, 3] + steps: + - uses: actions/checkout@v4 + - name: Install system dependencies + run: sudo apt-get update && sudo apt-get install -y clang libclang-dev + - name: Install Rust (stable) + uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + with: + shared-key: test + - name: Install cargo-nextest + uses: taiki-e/install-action@v2 + with: + tool: nextest + # Split the suite across runners; nextest assigns each test to one shard. + - run: cargo nextest run --workspace --partition count:${{ matrix.partition }}/3 + + doctest: + name: doctests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install system dependencies + run: sudo apt-get update && sudo apt-get install -y clang libclang-dev + - name: Install Rust (stable) + uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + with: + shared-key: doctest + # nextest does not run doctests, so cover them separately. + - run: cargo test --workspace --doc + + # Single required status check that aggregates the matrix + other jobs, so branch + # protection only needs to require "CI success". + ci-success: + name: CI success + if: always() + needs: [fmt, clippy, test, doctest] + runs-on: ubuntu-latest + steps: + - name: Verify all jobs passed + run: | + if echo '${{ join(needs.*.result, ',') }}' | grep -qE 'failure|cancelled'; then + echo "One or more CI jobs failed." + exit 1 + fi + echo "All CI jobs passed." From eeafa2208c210257752ba1b00072aa70f830d822 Mon Sep 17 00:00:00 2001 From: Jeff Gardner <202880+erskingardner@users.noreply.github.com> Date: Thu, 21 May 2026 11:17:05 +0200 Subject: [PATCH 11/11] ci: install protobuf-compiler for the pensieve-core build pensieve-core's build script uses prost-build, which requires `protoc`. Without it every workspace build (clippy, tests, doctests) failed at the build script; add protobuf-compiler to the apt install alongside libclang. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 08b2e87..9be4514 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,7 +39,7 @@ jobs: - uses: actions/checkout@v4 # The `rocksdb` crate builds librocksdb-sys via bindgen, which needs libclang. - name: Install system dependencies - run: sudo apt-get update && sudo apt-get install -y clang libclang-dev + run: sudo apt-get update && sudo apt-get install -y clang libclang-dev protobuf-compiler - name: Install Rust (stable + clippy) uses: dtolnay/rust-toolchain@stable with: @@ -60,7 +60,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Install system dependencies - run: sudo apt-get update && sudo apt-get install -y clang libclang-dev + run: sudo apt-get update && sudo apt-get install -y clang libclang-dev protobuf-compiler - name: Install Rust (stable) uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 @@ -79,7 +79,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Install system dependencies - run: sudo apt-get update && sudo apt-get install -y clang libclang-dev + run: sudo apt-get update && sudo apt-get install -y clang libclang-dev protobuf-compiler - name: Install Rust (stable) uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2