Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 75 additions & 10 deletions cmd/ingestor/neighbor_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ import (
// pulse here is sufficient to keep the snapshot fresh.
const NeighborEdgesBuilderInterval = 60 * time.Second

// neighborBuilderMaxBatch caps how many observation rows a single
// delta tick may process (#1339). With max_open_conns=1, an unbounded
// scan on a multi-million-row table holds the SQLite write lock for
// minutes and starves MQTT ingest. The cap keeps each tick bounded;
// if a backlog accumulates, successive ticks drain it 50k rows at a
// time without ever blocking ingest for long.
const neighborBuilderMaxBatch = 50000

// neighborBuilderSlowTickThreshold is the per-tick wallclock budget
// for the builder. Exceeding it is logged loudly so operators can
// catch a regression of #1339 quickly. The full instrumentation
// framework is tracked in #1340.
const neighborBuilderSlowTickThreshold = 5 * time.Second

// payloadADVERT mirrors the constant in cmd/server/decoder.go.
// Duplicated rather than imported so the ingestor binary stays
// independent of the server package.
Expand All @@ -42,13 +56,25 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {
stop := make(chan struct{})
done := make(chan struct{})

// Synchronous warm-up: a single pass so the first server load
// after process start sees a populated table.
if n, err := s.buildAndPersistNeighborEdges(); err != nil {
log.Printf("[neighbor-build] initial build error: %v", err)
} else {
log.Printf("[neighbor-build] initial build: %d edges upserted", n)
// Synchronous warm-up: on a fresh DB this is a full scan; on a DB
// with persisted neighbor_edges (most restarts), the watermark
// short-circuits it into a delta scan. Loop until the per-tick
// batch cap stops triggering so we drain any backlog before
// returning — first server load needs a fully-populated table.
wuStart := time.Now()
var wuTotal int
for {
n, err := s.buildAndPersistNeighborEdges()
if err != nil {
log.Printf("[neighbor-build] initial build error: %v", err)
break
}
wuTotal += n
if n < neighborBuilderMaxBatch {
break
}
}
log.Printf("[neighbor-build] initial build: %d edges upserted in %s", wuTotal, time.Since(wuStart))

var stopOnce sync.Once
go func() {
Expand All @@ -58,10 +84,16 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {
for {
select {
case <-t.C:
if n, err := s.buildAndPersistNeighborEdges(); err != nil {
log.Printf("[neighbor-build] tick error: %v", err)
start := time.Now()
n, err := s.buildAndPersistNeighborEdges()
dur := time.Since(start)
if err != nil {
log.Printf("[neighbor-build] tick error after %s: %v", dur, err)
} else if n > 0 {
log.Printf("[neighbor-build] %d edges upserted", n)
log.Printf("[neighbor-build] tick: %d edges in %s (delta from watermark)", n, dur)
}
if dur > neighborBuilderSlowTickThreshold {
log.Printf("[neighbor-build] SLOW tick: %s — possible regression of #1339", dur)
}
case <-stop:
return
Expand All @@ -83,6 +115,21 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {
// observer↔last-hop on all packet types) and upserts them into
// neighbor_edges. Returns count of attempted upserts.
//
// Watermark / delta semantics (#1339): the builder derives a watermark
// from MAX(neighbor_edges.last_seen). On an empty edges table (fresh
// DB), watermark is 0 and the builder does a full warm-up scan. On
// every subsequent call, the SELECT is restricted to observations
// whose timestamp is strictly greater than the watermark, bounded by
// neighborBuilderMaxBatch. neighbor_edges itself is the persistence —
// no metadata table or in-memory state is required, and restarts
// resume cleanly from whatever the table reflects.
//
// Trade-off (documented for #1340 follow-up): an anomalously-old
// observation that arrives AFTER its timestamp has already been
// crossed by the watermark will be skipped. Acceptable for an
// approximate neighbor graph; a periodic full-rebuild can be added
// later if needed.
//
// Resolution of hop-prefix → full pubkey is done via a one-shot
// SELECT of (lowered) pubkey prefixes from nodes. Prefixes with
// multiple candidates are skipped (matches the conservative
Expand All @@ -93,6 +140,21 @@ func (s *Store) buildAndPersistNeighborEdges() (int, error) {
return 0, fmt.Errorf("build prefix index: %w", err)
}

// Derive the watermark from the existing edges table. RFC3339
// → epoch seconds so it can be compared against observations.timestamp
// (stored as INTEGER unix epoch). On an empty edges table both the
// query and the parse return zero → full warm-up scan.
var watermarkRFC sql.NullString
if err := s.db.QueryRow(`SELECT MAX(last_seen) FROM neighbor_edges`).Scan(&watermarkRFC); err != nil {
return 0, fmt.Errorf("read watermark: %w", err)
}
var watermarkEpoch int64
if watermarkRFC.Valid && watermarkRFC.String != "" {
if t, parseErr := time.Parse(time.RFC3339, watermarkRFC.String); parseErr == nil {
watermarkEpoch = t.Unix()
}
}

rows, err := s.db.Query(`SELECT
t.payload_type,
t.decoded_json,
Expand All @@ -102,7 +164,10 @@ func (s *Store) buildAndPersistNeighborEdges() (int, error) {
o.timestamp
FROM observations o
JOIN transmissions t ON t.id = o.transmission_id
LEFT JOIN observers obs ON obs.rowid = o.observer_idx`)
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
WHERE o.timestamp > ?
ORDER BY o.timestamp
LIMIT ?`, watermarkEpoch, neighborBuilderMaxBatch)
if err != nil {
return 0, fmt.Errorf("scan observations: %w", err)
}
Expand Down
195 changes: 195 additions & 0 deletions cmd/ingestor/neighbor_builder_delta_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package main

import (
"fmt"
"path/filepath"
"testing"
"time"
)

// TestNeighborEdgesBuilderDeltaScan enforces issue #1339:
// after the initial (warm-up) full build, subsequent ticks of
// buildAndPersistNeighborEdges MUST scan only observations newer
// than the most recent edge already persisted. The watermark is
// derived from MAX(neighbor_edges.last_seen) — neighbor_edges itself
// is the persistence, no separate metadata table.
//
// RED expectations:
// 1. After warm-up that produces edges, a second build with NO new
// observations is a fast no-op (<1s) and writes nothing.
// 2. After inserting K observations with timestamps strictly newer
// than the prior MAX(last_seen), the next build upserts exactly
// K edges in <1s.
// 3. Initial build (empty neighbor_edges) still does a full scan
// (warm-up preserved).
func TestNeighborEdgesBuilderDeltaScan(t *testing.T) {
if testing.Short() {
t.Skip("synthetic 100k-row benchmark; skipped in -short")
}

dir := t.TempDir()
dbPath := filepath.Join(dir, "delta.db")
store, err := OpenStore(dbPath)
if err != nil {
t.Fatalf("OpenStore: %v", err)
}
defer store.Close()

if _, err := store.db.Exec(
`INSERT INTO nodes (public_key, name) VALUES (?, ?), (?, ?)`,
"aaaaaaaaaa", "from-node",
"bbbbbbbbbb", "first-hop",
); err != nil {
t.Fatal(err)
}
if _, err := store.db.Exec(
`INSERT INTO observers (id, name) VALUES (?, ?)`,
"obs-1", "observer-1",
); err != nil {
t.Fatal(err)
}
var obsRowid int64
if err := store.db.QueryRow(`SELECT rowid FROM observers WHERE id = ?`, "obs-1").Scan(&obsRowid); err != nil {
t.Fatal(err)
}

// Baseline timestamps: a contiguous block ending at baselineMaxTs.
const baseline = 100_000
const baselineStartTs int64 = 1735689600 // 2025-01-01 UTC
baselineMaxTs := baselineStartTs + int64(baseline) - 1

tx, err := store.db.Begin()
if err != nil {
t.Fatal(err)
}
txStmt, err := tx.Prepare(`INSERT INTO transmissions
(raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, from_pubkey)
VALUES ('', ?, ?, 0, ?, 0, '{}', 'aaaaaaaaaa')`)
if err != nil {
t.Fatal(err)
}
obsStmt, err := tx.Prepare(`INSERT INTO observations
(transmission_id, observer_idx, path_json, timestamp) VALUES (?, ?, '["bb"]', ?)`)
if err != nil {
t.Fatal(err)
}
for i := 0; i < baseline; i++ {
res, err := txStmt.Exec(fmt.Sprintf("h%d", i), baselineStartTs+int64(i), payloadADVERT)
if err != nil {
t.Fatal(err)
}
txID, _ := res.LastInsertId()
if _, err := obsStmt.Exec(txID, obsRowid, baselineStartTs+int64(i)); err != nil {
t.Fatal(err)
}
}
if err := tx.Commit(); err != nil {
t.Fatal(err)
}

// Initial warm-up: drain to completion (StartNeighborEdgesBuilder
// does the same — call directly so the test doesn't depend on the
// goroutine harness). Full scan allowed because neighbor_edges
// starts empty.
for {
n, err := store.buildAndPersistNeighborEdges()
if err != nil {
t.Fatalf("warm-up build: %v", err)
}
if n == 0 || n < 50000 {
break
}
}
var edgesAfterWarmup int
if err := store.db.QueryRow(`SELECT COUNT(*) FROM neighbor_edges`).Scan(&edgesAfterWarmup); err != nil {
t.Fatal(err)
}
if edgesAfterWarmup == 0 {
t.Fatal("warm-up produced 0 edges; can't establish a watermark")
}
// Sanity: MAX(last_seen) should reflect the baseline tail timestamp.
var maxLastSeen string
if err := store.db.QueryRow(`SELECT MAX(last_seen) FROM neighbor_edges`).Scan(&maxLastSeen); err != nil {
t.Fatal(err)
}
wantMax := time.Unix(baselineMaxTs, 0).UTC().Format(time.RFC3339)
if maxLastSeen != wantMax {
t.Fatalf("MAX(last_seen) after warm-up: want %s, got %s", wantMax, maxLastSeen)
}

// Tick #2: NO new observations. Expect no-op + fast.
noopStart := time.Now()
n2, err := store.buildAndPersistNeighborEdges()
if err != nil {
t.Fatalf("noop build: %v", err)
}
noopDur := time.Since(noopStart)
if n2 != 0 {
t.Fatalf("expected 0 edges on empty-delta tick; got %d (#1339)", n2)
}
if noopDur > time.Second {
t.Fatalf("empty-delta build took %v; expected <1s — builder is "+
"still doing a full table scan. (#1339)", noopDur)
}

// Tick #3: insert K observations with timestamps strictly newer
// than baselineMaxTs.
const delta = 100
deltaStartTs := baselineMaxTs + 1
tx2, err := store.db.Begin()
if err != nil {
t.Fatal(err)
}
txStmt2, err := tx2.Prepare(`INSERT INTO transmissions
(raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, from_pubkey)
VALUES ('', ?, ?, 0, ?, 0, '{}', 'aaaaaaaaaa')`)
if err != nil {
t.Fatal(err)
}
obsStmt2, err := tx2.Prepare(`INSERT INTO observations
(transmission_id, observer_idx, path_json, timestamp) VALUES (?, ?, '["bb"]', ?)`)
if err != nil {
t.Fatal(err)
}
for i := 0; i < delta; i++ {
res, err := txStmt2.Exec(fmt.Sprintf("d%d", i), deltaStartTs+int64(i), payloadADVERT)
if err != nil {
t.Fatal(err)
}
txID, _ := res.LastInsertId()
if _, err := obsStmt2.Exec(txID, obsRowid, deltaStartTs+int64(i)); err != nil {
t.Fatal(err)
}
}
if err := tx2.Commit(); err != nil {
t.Fatal(err)
}

deltaStart := time.Now()
n3, err := store.buildAndPersistNeighborEdges()
if err != nil {
t.Fatalf("delta build: %v", err)
}
deltaDur := time.Since(deltaStart)
// Each ADVERT observation with a non-empty path produces 2 edge
// candidates (from↔hop[0] and observer↔hop[-1]). The watermark
// must clamp the scan to the delta rows ONLY — anything more
// proves the WHERE clause was bypassed.
if n3 != delta*2 {
t.Fatalf("expected %d edges upserted (delta only, 2 per advert obs); got %d. "+
"Builder must only scan observations with timestamp > MAX(neighbor_edges.last_seen). (#1339)",
delta*2, n3)
}
if deltaDur > 500*time.Millisecond {
t.Fatalf("delta build of %d rows took %v; expected <500ms. (#1339)", delta, deltaDur)
}

// Sanity: MAX(last_seen) advanced.
var maxLastSeen2 string
if err := store.db.QueryRow(`SELECT MAX(last_seen) FROM neighbor_edges`).Scan(&maxLastSeen2); err != nil {
t.Fatal(err)
}
if maxLastSeen2 <= maxLastSeen {
t.Fatalf("MAX(last_seen) did not advance: was %s, now %s", maxLastSeen, maxLastSeen2)
}
}