Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions cmd/server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,14 @@ func (db *DB) QueryPackets(q PacketQuery) (*PacketResult, error) {
db.conn.QueryRow(countSQL, args...).Scan(&total)
}

// #1345: order by ingest id, NOT first_seen. PR #1233 made first_seen=rxTime,
// so buffered-then-uploaded observer packets with hours-old rxTime were
// sorting to the top/middle and hiding fresh ingest. Ordering by id keeps
// "latest activity" semantically equal to "what we ingested last" — which
// is what the packets page is showing. The `since=` filter still uses
// first_seen / observation timestamp, preserving "received-by-radio since X."
selectCols, observerJoin := db.transmissionBaseSQL()
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.first_seen %s LIMIT ? OFFSET ?",
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.id %s LIMIT ? OFFSET ?",
selectCols, observerJoin, w, q.Order)

qArgs := make([]interface{}, len(args))
Expand Down Expand Up @@ -1013,7 +1019,10 @@ func (db *DB) GetRecentTransmissionsForNode(pubkey string, limit int) ([]map[str

selectCols, observerJoin := db.transmissionBaseSQL()

querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.from_pubkey = ? ORDER BY t.first_seen DESC LIMIT ?",
// #1345: order by ingest id, not first_seen (=rxTime). Buffered observer
// uploads with old rxTime would otherwise displace fresh activity from
// the "recent transmissions for node" list.
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.from_pubkey = ? ORDER BY t.id DESC LIMIT ?",
selectCols, observerJoin)
args := []interface{}{pubkey, limit}

Expand Down Expand Up @@ -1968,7 +1977,8 @@ func (db *DB) QueryMultiNodePackets(pubkeys []string, limit, offset int, order,
db.conn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM transmissions t %s", w), args...).Scan(&total)

selectCols, observerJoin := db.transmissionBaseSQL()
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.first_seen %s LIMIT ? OFFSET ?",
// #1345: order by ingest id (see QueryPackets comment above).
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.id %s LIMIT ? OFFSET ?",
selectCols, observerJoin, w, order)

qArgs := make([]interface{}, len(args))
Expand Down
114 changes: 114 additions & 0 deletions cmd/server/packets_order_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package main

import (
"testing"
"time"
)

// TestQueryPacketsOrdersByIngestID is the regression test for issue #1345.
//
// PR #1233 changed `first_seen` to be the observer's receive time (rxTime),
// not the moment the server ingested the row. When an observer buffers
// offline and uploads hours later, its packets land with old first_seen
// values. The /api/packets handler previously ordered by
// `first_seen DESC`, so buffered uploads with old rxTime appeared at the
// bottom while older-ingested packets with newer rxTime took the top —
// users on the packets page saw "no recent activity" even though MQTT
// ingest was active.
//
// Fix: default ordering for /api/packets is `t.id DESC` (ingest order).
// This test inserts two rows where row order by id and order by
// first_seen DISAGREE, then asserts the result is ordered by id DESC.
func TestQueryPacketsOrdersByIngestID(t *testing.T) {
db := setupTestDB(t)
defer db.Close()

now := time.Now().UTC()
// Row A: ingested FIRST (lower id), rxTime "newer" (fresher first_seen)
freshFirstSeen := now.Add(-1 * time.Hour).Format(time.RFC3339)
// Row B: ingested SECOND (higher id), rxTime "older" — simulating a
// buffered observer upload that arrived after row A but contains a
// packet the radio received hours earlier.
bufferedFirstSeen := now.Add(-6 * time.Hour).Format(time.RFC3339)

if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type)
VALUES ('AA', 'hashfresh00000001', ?, 4)`, freshFirstSeen); err != nil {
t.Fatal(err)
}
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type)
VALUES ('BB', 'hashbuffered00002', ?, 4)`, bufferedFirstSeen); err != nil {
t.Fatal(err)
}

result, err := db.QueryPackets(PacketQuery{Limit: 50, Order: "DESC"})
if err != nil {
t.Fatal(err)
}
if len(result.Packets) != 2 {
t.Fatalf("expected 2 packets, got %d", len(result.Packets))
}
// With first_seen DESC (the bug), the order would be [fresh, buffered]
// because the fresh row has the newer rxTime. With the fix (id DESC),
// order is [buffered, fresh] because the buffered row was ingested
// second and has the higher id.
first, _ := result.Packets[0]["hash"].(string)
second, _ := result.Packets[1]["hash"].(string)
if first != "hashbuffered00002" || second != "hashfresh00000001" {
t.Errorf("expected order [buffered, fresh] by ingest id DESC, got [%s, %s]",
first, second)
}
}

// TestQueryPacketsSinceFilterUsesFirstSeen documents the chosen semantic for
// the `since=` query param: it still filters by `first_seen` (radio receive
// time), NOT by ingest time. Rationale: callers using `since=` expect
// "packets the network received since X" — buffered uploads of older
// packets should still be EXCLUDED from a `since=15min` view even if
// they were ingested in the last 15 minutes. Display order is by ingest
// id (issue #1345 fix); filter semantic is unchanged.
func TestQueryPacketsSinceFilterUsesFirstSeen(t *testing.T) {
db := setupTestDB(t)
defer db.Close()

now := time.Now().UTC()
recent := now.Add(-30 * time.Minute).Format(time.RFC3339)
old := now.Add(-6 * time.Hour).Format(time.RFC3339)
sinceCutoff := now.Add(-1 * time.Hour).Format(time.RFC3339)
recentEpoch := now.Add(-30 * time.Minute).Unix()
oldEpoch := now.Add(-6 * time.Hour).Unix()

if _, err := db.conn.Exec(`INSERT INTO observers (id, name, last_seen, first_seen, packet_count)
VALUES ('obs1', 'Obs1', ?, ?, 1)`, recent, recent); err != nil {
t.Fatal(err)
}
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type)
VALUES ('AA', 'recentrx00000001', ?, 4)`, recent); err != nil {
t.Fatal(err)
}
// Buffered upload — ingested SECOND, but rxTime is 6h ago.
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type)
VALUES ('BB', 'oldrxbuffered001', ?, 4)`, old); err != nil {
t.Fatal(err)
}
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (1, 1, 10, -90, '[]', ?)`, recentEpoch); err != nil {
t.Fatal(err)
}
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (2, 1, 10, -90, '[]', ?)`, oldEpoch); err != nil {
t.Fatal(err)
}

result, err := db.QueryPackets(PacketQuery{Limit: 50, Order: "DESC", Since: sinceCutoff})
if err != nil {
t.Fatal(err)
}
if len(result.Packets) != 1 {
t.Fatalf("since= should filter by first_seen (rxTime); expected 1 packet, got %d",
len(result.Packets))
}
h, _ := result.Packets[0]["hash"].(string)
if h != "recentrx00000001" {
t.Errorf("expected the rxTime-recent packet, got %s", h)
}
}
18 changes: 16 additions & 2 deletions cmd/server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1373,6 +1373,20 @@ func (s *PacketStore) QueryPackets(q PacketQuery) *PacketResult {
results := s.filterPackets(q)
total := len(results)

// #1345: order by ingest id, not insertion-into-s.packets order. After
// Load() (which orders by first_seen ASC) the slice is mostly id-ordered
// EXCEPT where rxTime ≠ ingest time — exactly the buffered-observer-upload
// case that hides fresh activity. Sort by ID DESC so "page 0" is always
// the most-recently-ingested transmissions, matching the DB-path fix.
// Cost: O(n log n) on the filtered set per query; acceptable for the
// typical filter-then-paginate flow (filterPackets already O(n)).
sortedByID := make([]*StoreTx, len(results))
copy(sortedByID, results)
sort.Slice(sortedByID, func(i, j int) bool {
return sortedByID[i].ID < sortedByID[j].ID
})
results = sortedByID

// results is oldest-first (ASC). For DESC (default) read backwards from the tail;
// for ASC read forwards. Both are O(page_size) — no sort copy needed.
start := q.Offset
Expand Down Expand Up @@ -1956,9 +1970,9 @@ func (s *PacketStore) QueryMultiNodePackets(pubkeys []string, limit, offset int,
filtered = append(filtered, tx)
}
}
// Sort oldest-first to match pagination expectations (same as s.packets order).
// #1345: sort by ingest id, not first_seen (=rxTime).
sort.Slice(filtered, func(i, j int) bool {
return filtered[i].FirstSeen < filtered[j].FirstSeen
return filtered[i].ID < filtered[j].ID
})

total := len(filtered)
Expand Down