Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/ingestor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,10 @@ func main() {
// Neighbor-edges builder (#1287 — Option 4): ingestor owns
// neighbor_edges writes. Runs every 60s. Server reads the snapshot
// via cmd/server/neighbor_recomputer.go on the same cadence.
stopNeighborBuilder := store.StartNeighborEdgesBuilder(NeighborEdgesBuilderInterval)
neighborLookback := int64(cfg.NeighborEdgesDaysOrDefault()) * 86400
stopNeighborBuilder := store.StartNeighborEdgesBuilder(NeighborEdgesBuilderInterval, neighborLookback)
defer stopNeighborBuilder()
log.Printf("[neighbor-build] enabled (interval=%s)", NeighborEdgesBuilderInterval)
log.Printf("[neighbor-build] enabled (interval=%s, lookback=%dd)", NeighborEdgesBuilderInterval, cfg.NeighborEdgesDaysOrDefault())

channelKeys := loadChannelKeys(cfg, *configPath)
if len(channelKeys) > 0 {
Expand Down
20 changes: 11 additions & 9 deletions cmd/ingestor/neighbor_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type edgeRow struct {
// The function returns a stop closure. Initial build runs synchronously
// before the ticker starts so the server's first snapshot load picks
// up real data instead of an empty table.
func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {
func (s *Store) StartNeighborEdgesBuilder(interval time.Duration, lookbackSecs int64) func() {
if interval <= 0 {
interval = NeighborEdgesBuilderInterval
}
Expand All @@ -44,7 +44,7 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {

// Synchronous warm-up: a single pass so the first server load
// after process start sees a populated table.
if n, err := s.buildAndPersistNeighborEdges(); err != nil {
if n, err := s.buildAndPersistNeighborEdges(lookbackSecs); err != nil {
log.Printf("[neighbor-build] initial build error: %v", err)
} else {
log.Printf("[neighbor-build] initial build: %d edges upserted", n)
Expand All @@ -58,7 +58,7 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {
for {
select {
case <-t.C:
if n, err := s.buildAndPersistNeighborEdges(); err != nil {
if n, err := s.buildAndPersistNeighborEdges(lookbackSecs); err != nil {
log.Printf("[neighbor-build] tick error: %v", err)
} else if n > 0 {
log.Printf("[neighbor-build] %d edges upserted", n)
Expand All @@ -78,16 +78,17 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {
}
}

// buildAndPersistNeighborEdges scans transmissions + observations,
// extracts edge candidates (originator↔first-hop on ADVERTs;
// observer↔last-hop on all packet types) and upserts them into
// neighbor_edges. Returns count of attempted upserts.
// buildAndPersistNeighborEdges scans recent transmissions + observations
// (within lookbackSecs seconds) and upserts derived neighbor_edges rows.
// Using a bounded window avoids a full-table scan on large deployments;
// historical edges already in neighbor_edges are preserved by the upsert.
// Returns count of attempted upserts.
//
// Resolution of hop-prefix → full pubkey is done via a one-shot
// SELECT of (lowered) pubkey prefixes from nodes. Prefixes with
// multiple candidates are skipped (matches the conservative
// resolution rule in cmd/server/extractEdgesFromObs).
func (s *Store) buildAndPersistNeighborEdges() (int, error) {
func (s *Store) buildAndPersistNeighborEdges(lookbackSecs int64) (int, error) {
prefixIdx, err := buildPrefixIndex(s.db)
if err != nil {
return 0, fmt.Errorf("build prefix index: %w", err)
Expand All @@ -102,7 +103,8 @@ func (s *Store) buildAndPersistNeighborEdges() (int, error) {
o.timestamp
FROM observations o
JOIN transmissions t ON t.id = o.transmission_id
LEFT JOIN observers obs ON obs.rowid = o.observer_idx`)
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
WHERE o.timestamp > strftime('%s', 'now') - ?`, lookbackSecs)
if err != nil {
return 0, fmt.Errorf("scan observations: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/ingestor/neighbor_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"path/filepath"
"testing"
"time"
)

// TestNeighborEdgesBuilderUpsertsFromObservations enforces issue
Expand Down Expand Up @@ -61,12 +62,12 @@ func TestNeighborEdgesBuilderUpsertsFromObservations(t *testing.T) {
// to bbbbb… in the nodes table). Expected edge: a↔b.
if _, err := store.db.Exec(
`INSERT INTO observations (transmission_id, observer_idx, path_json, timestamp) VALUES (?, ?, ?, ?)`,
txID, obsRowid, `["bb"]`, int64(1735689600),
txID, obsRowid, `["bb"]`, time.Now().Unix(),
); err != nil {
t.Fatal(err)
}

n, err := store.buildAndPersistNeighborEdges()
n, err := store.buildAndPersistNeighborEdges(7 * 86400)
if err != nil {
t.Fatalf("buildAndPersistNeighborEdges: %v", err)
}
Expand Down
Loading