diff --git a/cmd/server/db.go b/cmd/server/db.go index e5141076..e3739e68 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -493,8 +493,14 @@ func (db *DB) QueryPackets(q PacketQuery) (*PacketResult, error) { db.conn.QueryRow(countSQL, args...).Scan(&total) } + // #1345: order by ingest id, NOT first_seen. PR #1233 made first_seen=rxTime, + // so buffered-then-uploaded observer packets with hours-old rxTime were + // sorting to the top/middle and hiding fresh ingest. Ordering by id keeps + // "latest activity" semantically equal to "what we ingested last" — which + // is what the packets page is showing. The `since=` filter still uses + // first_seen / observation timestamp, preserving "received-by-radio since X." selectCols, observerJoin := db.transmissionBaseSQL() - querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.first_seen %s LIMIT ? OFFSET ?", + querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.id %s LIMIT ? OFFSET ?", selectCols, observerJoin, w, q.Order) qArgs := make([]interface{}, len(args)) @@ -1013,7 +1019,10 @@ func (db *DB) GetRecentTransmissionsForNode(pubkey string, limit int) ([]map[str selectCols, observerJoin := db.transmissionBaseSQL() - querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.from_pubkey = ? ORDER BY t.first_seen DESC LIMIT ?", + // #1345: order by ingest id, not first_seen (=rxTime). Buffered observer + // uploads with old rxTime would otherwise displace fresh activity from + // the "recent transmissions for node" list. + querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.from_pubkey = ? ORDER BY t.id DESC LIMIT ?", selectCols, observerJoin) args := []interface{}{pubkey, limit} @@ -1968,7 +1977,8 @@ func (db *DB) QueryMultiNodePackets(pubkeys []string, limit, offset int, order, db.conn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM transmissions t %s", w), args...).Scan(&total) selectCols, observerJoin := db.transmissionBaseSQL() - querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.first_seen %s LIMIT ? OFFSET ?", + // #1345: order by ingest id (see QueryPackets comment above). + querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.id %s LIMIT ? OFFSET ?", selectCols, observerJoin, w, order) qArgs := make([]interface{}, len(args)) diff --git a/cmd/server/packets_order_test.go b/cmd/server/packets_order_test.go new file mode 100644 index 00000000..8d62efb9 --- /dev/null +++ b/cmd/server/packets_order_test.go @@ -0,0 +1,114 @@ +package main + +import ( + "testing" + "time" +) + +// TestQueryPacketsOrdersByIngestID is the regression test for issue #1345. +// +// PR #1233 changed `first_seen` to be the observer's receive time (rxTime), +// not the moment the server ingested the row. When an observer buffers +// offline and uploads hours later, its packets land with old first_seen +// values. The /api/packets handler previously ordered by +// `first_seen DESC`, so buffered uploads with old rxTime appeared at the +// bottom while older-ingested packets with newer rxTime took the top — +// users on the packets page saw "no recent activity" even though MQTT +// ingest was active. +// +// Fix: default ordering for /api/packets is `t.id DESC` (ingest order). +// This test inserts two rows where row order by id and order by +// first_seen DISAGREE, then asserts the result is ordered by id DESC. +func TestQueryPacketsOrdersByIngestID(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + now := time.Now().UTC() + // Row A: ingested FIRST (lower id), rxTime "newer" (fresher first_seen) + freshFirstSeen := now.Add(-1 * time.Hour).Format(time.RFC3339) + // Row B: ingested SECOND (higher id), rxTime "older" — simulating a + // buffered observer upload that arrived after row A but contains a + // packet the radio received hours earlier. + bufferedFirstSeen := now.Add(-6 * time.Hour).Format(time.RFC3339) + + if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type) + VALUES ('AA', 'hashfresh00000001', ?, 4)`, freshFirstSeen); err != nil { + t.Fatal(err) + } + if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type) + VALUES ('BB', 'hashbuffered00002', ?, 4)`, bufferedFirstSeen); err != nil { + t.Fatal(err) + } + + result, err := db.QueryPackets(PacketQuery{Limit: 50, Order: "DESC"}) + if err != nil { + t.Fatal(err) + } + if len(result.Packets) != 2 { + t.Fatalf("expected 2 packets, got %d", len(result.Packets)) + } + // With first_seen DESC (the bug), the order would be [fresh, buffered] + // because the fresh row has the newer rxTime. With the fix (id DESC), + // order is [buffered, fresh] because the buffered row was ingested + // second and has the higher id. + first, _ := result.Packets[0]["hash"].(string) + second, _ := result.Packets[1]["hash"].(string) + if first != "hashbuffered00002" || second != "hashfresh00000001" { + t.Errorf("expected order [buffered, fresh] by ingest id DESC, got [%s, %s]", + first, second) + } +} + +// TestQueryPacketsSinceFilterUsesFirstSeen documents the chosen semantic for +// the `since=` query param: it still filters by `first_seen` (radio receive +// time), NOT by ingest time. Rationale: callers using `since=` expect +// "packets the network received since X" — buffered uploads of older +// packets should still be EXCLUDED from a `since=15min` view even if +// they were ingested in the last 15 minutes. Display order is by ingest +// id (issue #1345 fix); filter semantic is unchanged. +func TestQueryPacketsSinceFilterUsesFirstSeen(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + now := time.Now().UTC() + recent := now.Add(-30 * time.Minute).Format(time.RFC3339) + old := now.Add(-6 * time.Hour).Format(time.RFC3339) + sinceCutoff := now.Add(-1 * time.Hour).Format(time.RFC3339) + recentEpoch := now.Add(-30 * time.Minute).Unix() + oldEpoch := now.Add(-6 * time.Hour).Unix() + + if _, err := db.conn.Exec(`INSERT INTO observers (id, name, last_seen, first_seen, packet_count) + VALUES ('obs1', 'Obs1', ?, ?, 1)`, recent, recent); err != nil { + t.Fatal(err) + } + if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type) + VALUES ('AA', 'recentrx00000001', ?, 4)`, recent); err != nil { + t.Fatal(err) + } + // Buffered upload — ingested SECOND, but rxTime is 6h ago. + if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type) + VALUES ('BB', 'oldrxbuffered001', ?, 4)`, old); err != nil { + t.Fatal(err) + } + if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (1, 1, 10, -90, '[]', ?)`, recentEpoch); err != nil { + t.Fatal(err) + } + if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (2, 1, 10, -90, '[]', ?)`, oldEpoch); err != nil { + t.Fatal(err) + } + + result, err := db.QueryPackets(PacketQuery{Limit: 50, Order: "DESC", Since: sinceCutoff}) + if err != nil { + t.Fatal(err) + } + if len(result.Packets) != 1 { + t.Fatalf("since= should filter by first_seen (rxTime); expected 1 packet, got %d", + len(result.Packets)) + } + h, _ := result.Packets[0]["hash"].(string) + if h != "recentrx00000001" { + t.Errorf("expected the rxTime-recent packet, got %s", h) + } +} diff --git a/cmd/server/store.go b/cmd/server/store.go index e793af2c..bcefacfc 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -1373,6 +1373,20 @@ func (s *PacketStore) QueryPackets(q PacketQuery) *PacketResult { results := s.filterPackets(q) total := len(results) + // #1345: order by ingest id, not insertion-into-s.packets order. After + // Load() (which orders by first_seen ASC) the slice is mostly id-ordered + // EXCEPT where rxTime ≠ ingest time — exactly the buffered-observer-upload + // case that hides fresh activity. Sort by ID DESC so "page 0" is always + // the most-recently-ingested transmissions, matching the DB-path fix. + // Cost: O(n log n) on the filtered set per query; acceptable for the + // typical filter-then-paginate flow (filterPackets already O(n)). + sortedByID := make([]*StoreTx, len(results)) + copy(sortedByID, results) + sort.Slice(sortedByID, func(i, j int) bool { + return sortedByID[i].ID < sortedByID[j].ID + }) + results = sortedByID + // results is oldest-first (ASC). For DESC (default) read backwards from the tail; // for ASC read forwards. Both are O(page_size) — no sort copy needed. start := q.Offset @@ -1956,9 +1970,9 @@ func (s *PacketStore) QueryMultiNodePackets(pubkeys []string, limit, offset int, filtered = append(filtered, tx) } } - // Sort oldest-first to match pagination expectations (same as s.packets order). + // #1345: sort by ingest id, not first_seen (=rxTime). sort.Slice(filtered, func(i, j int) bool { - return filtered[i].FirstSeen < filtered[j].FirstSeen + return filtered[i].ID < filtered[j].ID }) total := len(filtered)