diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 54d0e684..6a9dcafe 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -200,9 +200,10 @@ func main() { // Neighbor-edges builder (#1287 — Option 4): ingestor owns // neighbor_edges writes. Runs every 60s. Server reads the snapshot // via cmd/server/neighbor_recomputer.go on the same cadence. - stopNeighborBuilder := store.StartNeighborEdgesBuilder(NeighborEdgesBuilderInterval) + neighborLookback := int64(cfg.NeighborEdgesDaysOrDefault()) * 86400 + stopNeighborBuilder := store.StartNeighborEdgesBuilder(NeighborEdgesBuilderInterval, neighborLookback) defer stopNeighborBuilder() - log.Printf("[neighbor-build] enabled (interval=%s)", NeighborEdgesBuilderInterval) + log.Printf("[neighbor-build] enabled (interval=%s, lookback=%dd)", NeighborEdgesBuilderInterval, cfg.NeighborEdgesDaysOrDefault()) channelKeys := loadChannelKeys(cfg, *configPath) if len(channelKeys) > 0 { diff --git a/cmd/ingestor/neighbor_builder.go b/cmd/ingestor/neighbor_builder.go index ff93d9df..35626398 100644 --- a/cmd/ingestor/neighbor_builder.go +++ b/cmd/ingestor/neighbor_builder.go @@ -35,7 +35,7 @@ type edgeRow struct { // The function returns a stop closure. Initial build runs synchronously // before the ticker starts so the server's first snapshot load picks // up real data instead of an empty table. -func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() { +func (s *Store) StartNeighborEdgesBuilder(interval time.Duration, lookbackSecs int64) func() { if interval <= 0 { interval = NeighborEdgesBuilderInterval } @@ -44,7 +44,7 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() { // Synchronous warm-up: a single pass so the first server load // after process start sees a populated table. - if n, err := s.buildAndPersistNeighborEdges(); err != nil { + if n, err := s.buildAndPersistNeighborEdges(lookbackSecs); err != nil { log.Printf("[neighbor-build] initial build error: %v", err) } else { log.Printf("[neighbor-build] initial build: %d edges upserted", n) @@ -58,7 +58,7 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() { for { select { case <-t.C: - if n, err := s.buildAndPersistNeighborEdges(); err != nil { + if n, err := s.buildAndPersistNeighborEdges(lookbackSecs); err != nil { log.Printf("[neighbor-build] tick error: %v", err) } else if n > 0 { log.Printf("[neighbor-build] %d edges upserted", n) @@ -78,16 +78,17 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() { } } -// buildAndPersistNeighborEdges scans transmissions + observations, -// extracts edge candidates (originator↔first-hop on ADVERTs; -// observer↔last-hop on all packet types) and upserts them into -// neighbor_edges. Returns count of attempted upserts. +// buildAndPersistNeighborEdges scans recent transmissions + observations +// (within lookbackSecs seconds) and upserts derived neighbor_edges rows. +// Using a bounded window avoids a full-table scan on large deployments; +// historical edges already in neighbor_edges are preserved by the upsert. +// Returns count of attempted upserts. // // Resolution of hop-prefix → full pubkey is done via a one-shot // SELECT of (lowered) pubkey prefixes from nodes. Prefixes with // multiple candidates are skipped (matches the conservative // resolution rule in cmd/server/extractEdgesFromObs). -func (s *Store) buildAndPersistNeighborEdges() (int, error) { +func (s *Store) buildAndPersistNeighborEdges(lookbackSecs int64) (int, error) { prefixIdx, err := buildPrefixIndex(s.db) if err != nil { return 0, fmt.Errorf("build prefix index: %w", err) @@ -102,7 +103,8 @@ func (s *Store) buildAndPersistNeighborEdges() (int, error) { o.timestamp FROM observations o JOIN transmissions t ON t.id = o.transmission_id - LEFT JOIN observers obs ON obs.rowid = o.observer_idx`) + LEFT JOIN observers obs ON obs.rowid = o.observer_idx + WHERE o.timestamp > strftime('%s', 'now') - ?`, lookbackSecs) if err != nil { return 0, fmt.Errorf("scan observations: %w", err) } diff --git a/cmd/ingestor/neighbor_builder_test.go b/cmd/ingestor/neighbor_builder_test.go index 65b63be7..b756f47f 100644 --- a/cmd/ingestor/neighbor_builder_test.go +++ b/cmd/ingestor/neighbor_builder_test.go @@ -3,6 +3,7 @@ package main import ( "path/filepath" "testing" + "time" ) // TestNeighborEdgesBuilderUpsertsFromObservations enforces issue @@ -61,12 +62,12 @@ func TestNeighborEdgesBuilderUpsertsFromObservations(t *testing.T) { // to bbbbb… in the nodes table). Expected edge: a↔b. if _, err := store.db.Exec( `INSERT INTO observations (transmission_id, observer_idx, path_json, timestamp) VALUES (?, ?, ?, ?)`, - txID, obsRowid, `["bb"]`, int64(1735689600), + txID, obsRowid, `["bb"]`, time.Now().Unix(), ); err != nil { t.Fatal(err) } - n, err := store.buildAndPersistNeighborEdges() + n, err := store.buildAndPersistNeighborEdges(7 * 86400) if err != nil { t.Fatalf("buildAndPersistNeighborEdges: %v", err) }