Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
name: CI

# Mirrors `just precommit` (fmt + clippy + test) and shards the test suite across
# multiple runners with cargo-nextest.

on:
push:
branches: [master]
pull_request:

# Cancel superseded runs for the same ref (e.g. new pushes to a PR).
concurrency:
group: ci-${{ github.ref }}
cancel-in-progress: true

env:
CARGO_TERM_COLOR: always
CARGO_INCREMENTAL: "0"
CARGO_NET_RETRY: "10"
RUST_BACKTRACE: "1"

jobs:
fmt:
name: rustfmt
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust (stable + rustfmt)
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt
# Same as `just fmt-check`.
- run: cargo fmt --all -- --check

clippy:
name: clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
# The `rocksdb` crate builds librocksdb-sys via bindgen, which needs libclang.
- name: Install system dependencies
run: sudo apt-get update && sudo apt-get install -y clang libclang-dev protobuf-compiler
- name: Install Rust (stable + clippy)
uses: dtolnay/rust-toolchain@stable
with:
components: clippy
- uses: Swatinem/rust-cache@v2
with:
shared-key: clippy
# Same as `just clippy`.
- run: cargo clippy --workspace --all-targets -- -D warnings

test:
name: test (shard ${{ matrix.partition }}/3)
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
partition: [1, 2, 3]
steps:
- uses: actions/checkout@v4
- name: Install system dependencies
run: sudo apt-get update && sudo apt-get install -y clang libclang-dev protobuf-compiler
- name: Install Rust (stable)
uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
shared-key: test
- name: Install cargo-nextest
uses: taiki-e/install-action@v2
with:
tool: nextest
# Split the suite across runners; nextest assigns each test to one shard.
- run: cargo nextest run --workspace --partition count:${{ matrix.partition }}/3

doctest:
name: doctests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install system dependencies
run: sudo apt-get update && sudo apt-get install -y clang libclang-dev protobuf-compiler
- name: Install Rust (stable)
uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
shared-key: doctest
# nextest does not run doctests, so cover them separately.
- run: cargo test --workspace --doc

# Single required status check that aggregates the matrix + other jobs, so branch
# protection only needs to require "CI success".
ci-success:
name: CI success
if: always()
needs: [fmt, clippy, test, doctest]
runs-on: ubuntu-latest
steps:
- name: Verify all jobs passed
run: |
if echo '${{ join(needs.*.result, ',') }}' | grep -qE 'failure|cancelled'; then
echo "One or more CI jobs failed."
exit 1
fi
echo "All CI jobs passed."
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ Thumbs.db
# Local data directories (RocksDB, archive segments, etc.)
data/

# Local code-review artifacts
reviews/

# Debug files
*.pdb

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ notepack = { git = "https://github.com/erskingardner/notepack" }
# HTTP server
axum = "0.8"
tower = "0.5"
tower-http = { version = "0.6", features = ["cors", "trace"] }
tower-http = { version = "0.6", features = ["cors", "trace", "timeout"] }

# Observability
tracing = "0.1"
Expand Down
56 changes: 53 additions & 3 deletions crates/pensieve-core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,41 @@ use notepack::{NoteBinary, NoteBuf}; // Required for Event::from_json()
/// - Event ID doesn't match computed hash
/// - Signature is invalid
pub fn validate_event(event_json: &str) -> Result<nostr::Event> {
// The nostr crate's Event::from_json validates ID and signature automatically
// IMPORTANT: nostr's `Event::from_json` only DESERIALIZES — it does not verify
// the ID hash or signature — so we must verify explicitly. Without this a
// tampered event would pass validation and be written to the canonical archive.
let event = nostr::Event::from_json(event_json)?;
verify_event_crypto(&event)?;
Ok(event)
}

/// Verify a deserialized event's ID hash and Schnorr signature.
///
/// `nostr::Event::from_json` does NOT check these, so every validation entry
/// point that accepts untrusted input must call this before trusting the event.
pub(crate) fn verify_event_crypto(event: &nostr::Event) -> Result<()> {
if !event.verify_id() {
return Err(crate::error::Error::InvalidEventId {
computed: "recomputed id differs".to_string(),
expected: event.id.to_hex(),
});
}
if !event.verify_signature() {
return Err(crate::error::Error::InvalidSignature(
"signature verification failed".to_string(),
));
}
Ok(())
}

/// Validates a NoteBuf event's ID and signature by converting through nostr crate.
///
/// This is useful when you have a NoteBuf from notepack and want to validate it.
pub fn validate_notebuf(note: &NoteBuf) -> Result<()> {
// Convert NoteBuf to JSON, then parse with nostr crate for validation
// Convert NoteBuf to JSON, then parse AND verify with the nostr crate.
let json = serde_json::to_string(note)?;
let _event = nostr::Event::from_json(&json)?;
let event = nostr::Event::from_json(&json)?;
verify_event_crypto(&event)?;
Ok(())
}

Expand Down Expand Up @@ -306,6 +329,33 @@ mod tests {
}
}

#[test]
fn test_validate_event_strictly_rejects_tampered_signature() {
// Strict guard for the invariant the live + negentropy ingest paths rely
// on: the nostr crate verifies signatures during deserialization, so
// validate_event() (used by the JSONL backfill) must REJECT a bad signature
// outright rather than return an unverified event. If a future nostr
// version stopped verifying, this test fails loudly.
let event = make_test_event("strict sig test", Kind::TextNote, vec![]);
let mut json: serde_json::Value = serde_json::to_value(&event).unwrap();
json["sig"] = serde_json::Value::String("a".repeat(128));
assert!(
validate_event(&json.to_string()).is_err(),
"validate_event must reject a tampered signature"
);
}

#[test]
fn test_validate_event_strictly_rejects_tampered_id() {
let event = make_test_event("strict id test", Kind::TextNote, vec![]);
let mut json: serde_json::Value = serde_json::to_value(&event).unwrap();
json["id"] = serde_json::Value::String("a".repeat(64));
assert!(
validate_event(&json.to_string()).is_err(),
"validate_event must reject a tampered id"
);
}

// =========================================================================
// validate_event_id tests
// =========================================================================
Expand Down
54 changes: 52 additions & 2 deletions crates/pensieve-core/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ pub use nostr_proto::{EventBatch, ProtoEvent, Tag};
/// - Event ID doesn't match computed hash
/// - Signature is invalid
pub fn validate_proto_event(proto: &ProtoEvent) -> Result<nostr::Event> {
// Convert ProtoEvent to JSON for validation via nostr crate
// This is the most reliable path since nostr crate validates everything
// Convert ProtoEvent to JSON, then parse AND verify. `from_json` only
// deserializes, so the ID hash + signature must be checked explicitly.
let json = proto_to_json(proto)?;
let event = nostr::Event::from_json(&json)?;
crate::event::verify_event_crypto(&event)?;
Ok(event)
}

Expand Down Expand Up @@ -412,4 +413,53 @@ mod tests {
let result = validate_proto_event(&proto);
assert!(result.is_err());
}

#[test]
fn test_validate_proto_event_rejects_tampered_signature() {
use nostr::{EventBuilder, Keys, Kind};
let keys = Keys::generate();
let event = EventBuilder::new(Kind::TextNote, "proto sig test")
.sign_with_keys(&keys)
.expect("sign");
let mut proto = ProtoEvent {
id: event.id.to_hex(),
pubkey: event.pubkey.to_hex(),
created_at: event.created_at.as_secs() as i64,
kind: event.kind.as_u16() as i32,
tags: vec![],
content: event.content.clone(),
sig: event.sig.to_string(),
};
// The untampered event validates...
assert!(validate_proto_event(&proto).is_ok());
// ...but a tampered signature must be rejected (proto backfill path).
proto.sig = "a".repeat(128);
assert!(
validate_proto_event(&proto).is_err(),
"tampered-signature proto event must be rejected"
);
}

#[test]
fn test_validate_proto_event_rejects_tampered_id() {
use nostr::{EventBuilder, Keys, Kind};
let keys = Keys::generate();
let event = EventBuilder::new(Kind::TextNote, "proto id test")
.sign_with_keys(&keys)
.expect("sign");
let mut proto = ProtoEvent {
id: event.id.to_hex(),
pubkey: event.pubkey.to_hex(),
created_at: event.created_at.as_secs() as i64,
kind: event.kind.as_u16() as i32,
tags: vec![],
content: event.content.clone(),
sig: event.sig.to_string(),
};
proto.id = "a".repeat(64);
assert!(
validate_proto_event(&proto).is_err(),
"tampered-id proto event must be rejected"
);
}
}
5 changes: 4 additions & 1 deletion crates/pensieve-ingest/src/bin/jsonl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ async fn index_segments_mode(
database: clickhouse_db.to_string(),
table: "events_local".to_string(),
batch_size: 10000,
reindex_queue_path: None,
};
let indexer = ClickHouseIndexer::new(ch_config)?;

Expand Down Expand Up @@ -708,7 +709,8 @@ fn init_pipeline(args: &Args, output: &Path) -> Result<PipelineComponents> {
segment_prefix: "segment".to_string(),
compress: !args.no_compress,
};
let segment_writer = Arc::new(SegmentWriter::new(segment_config, sealed_sender)?);
// Backfill marks archived itself (see below), so the writer gets no dedupe ref.
let segment_writer = Arc::new(SegmentWriter::new(segment_config, sealed_sender, None)?);

// Initialize ClickHouse indexer (optional)
let indexer_handle =
Expand All @@ -719,6 +721,7 @@ fn init_pipeline(args: &Args, output: &Path) -> Result<PipelineComponents> {
database: args.clickhouse_db.clone(),
table: "events_local".to_string(),
batch_size: 10000,
reindex_queue_path: None,
};
let indexer = ClickHouseIndexer::new(ch_config)?;
Some(indexer.start(receiver))
Expand Down
4 changes: 3 additions & 1 deletion crates/pensieve-ingest/src/bin/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,8 @@ fn init_pipeline(args: &Args) -> Result<PipelineComponents> {
"disabled"
}
);
let segment_writer = Arc::new(SegmentWriter::new(segment_config, sealed_sender)?);
// Backfill marks archived itself (see below), so the writer gets no dedupe ref.
let segment_writer = Arc::new(SegmentWriter::new(segment_config, sealed_sender, None)?);

// Initialize ClickHouse indexer (optional)
let indexer_handle =
Expand All @@ -740,6 +741,7 @@ fn init_pipeline(args: &Args) -> Result<PipelineComponents> {
database: args.clickhouse_db.clone(),
table: "events_local".to_string(),
batch_size: 10000,
reindex_queue_path: None,
};
let indexer = ClickHouseIndexer::new(ch_config)?;
Some(indexer.start(receiver))
Expand Down
Loading
Loading