
Conversation

@withinboredom
Member

@withinboredom withinboredom commented Oct 11, 2025

This adds the bare necessary plumbing for notifications to work. Still a long way to go.

Summary by CodeRabbit

  • New Features

    • Subscriptions & webhooks for key changes (batching, retries, auth) and a SUB command.
    • Concurrent-safe prefix trie for efficient subscription matching.
    • Real-time notification dispatch across cluster nodes with delivery/retry semantics.
  • Improvements

    • Context-aware request handling and reduced startup log verbosity.
    • Stricter ACL ownership checks and more consistent key-based routing.
  • Tests

    • Extensive unit & integration tests covering subscriptions, notifications, trie, retry/backoff, and concurrency.
  • Chores

    • CI Go version and dependency updates; improved test harness/scripts.

Signed-off-by: Robert Landers <[email protected]>
Signed-off-by: Robert Landers <[email protected]>
Signed-off-by: Robert Landers <[email protected]>
@linear

linear bot commented Oct 11, 2025

ATLS-118 Notifications

@coderabbitai
Contributor

coderabbitai bot commented Oct 11, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a subscription/notification subsystem (proto, server, sender, magic-key handling), a new SUB command, key-name/type migration across consensus/kv APIs, a concurrent Trie, named locks, extensive tests and integration scenario, many API signature/type changes, dependency bumps, and assorted deletions/refactors.

Changes

Cohort / File(s) Summary
CI / Modules
\.github/workflows/go.yml, go.mod
Bump Go from 1.25.1 to 1.25.2 in the workflow; update multiple Go module versions and add zeebo/blake3.
New SUB command
atlas/commands/string.go, atlas/commands/sub-command.go, integration-tests/harness/client.go
Add SUB routing and SubCommand implementation (Parse/GetNext/Execute) to create Subscribe KV changes; harness client methods for Subscribe and SubscribeWithOptions.
Consensus protobuf changes
atlas/consensus/consensus.proto
Add Subscribe, SubscribeOptions, Notify, SubscriptionList; change several fields from string to bytes (table/key/prefix) and extend the KVChange oneof with sub and notify.
Notifications / subscriptions core
atlas/consensus/subscriptions.go, atlas/consensus/subscriptions_test.go, integration-tests/scenarios/notifications_test.go
New NotificationSender implementation (batched per-URL delivery, retries/backoff, GenerateNotification, magic-key handling, bucket logic) plus extensive unit and integration tests.
Server / Migration flow
atlas/consensus/server.go, atlas/consensus/majority-quorum.go, atlas/consensus/data-repository.go, atlas/consensus/broadcastQuorum.go
Propagate context into migration application; integrate notification generation/dispatch; add handling for KVChange_Sub/Notify; ACL ownership checks; replace some implementations (NewServer(), prefix-scan placeholder) and add broadcastQuorum.
Key/type migration (KeyName)
atlas/consensus/quorum.go, atlas/consensus/table-repository*.go, atlas/consensus/migration-repository.go, many consensus tests
Introduce KeyName []byte, change numerous method signatures and storage models to use KeyName/[]byte for table/key identifiers.
KV API changes
atlas/kv.go, atlas/kv/encoding.go, atlas/kv/badger.go, atlas/kv/pool.go
Replace builder-based key handling with consensus.KeyName usage; add Subscribe and SubscribeOptions; remove many legacy encoding/value/Record APIs; minor comment tweak in pool/badger.
Named locks and concurrency
atlas/consensus/named_locks.go, atlas/consensus/named_locks_test.go
Add per-name locker (namedLocker) with refcounting and tests.
Trie implementation
atlas/trie/trie.go, atlas/trie/trie_test.go
Add generic concurrent Trie[T] with Insert/PrefixesOf/LongestPrefixOf/Remove and comprehensive tests (a usage sketch follows this change list).
Commands & CLI updates
atlas/commands/key-command.go, atlas/commands/acl-command.go, atlas/commands/quorum-command.go, atlas/commands/key-command_test.go, atlas/commands/scan_commands.go
Replace kv.KeyBuilder returns with consensus.KeyName, update ACL/Key/Quorum command call sites and tests, adjust scanning to new PrefixScan signature and bytes joining.
Integration test harness
integration-tests/harness/node.go, integration-tests/harness/cluster.go
NewNode now accepts *testing.T; Node stores test handle and logs to t.Log; update callers.
Deleted / removed modules & tests
atlas/operations/table.go, atlas/operations/table_test.go, atlas/pool.go, atlas/test/util.go, atlas/nodes.go, various test removals
Remove table SQL parsing/alter logic and its tests; remove the atlas Param/pool helpers, the test util, and the Node type.
Caddy module & scripts
atlas/caddy/module/module.go, test_acl_e2e.sh
Lower startup log level and remove a debug println; redirect script prints to stderr, add session helpers, and improve result handling.
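
For orientation, here is a rough usage sketch of the new prefix trie for subscription matching. The New/Insert/PrefixesOf signatures below are assumptions inferred from the method names above, not the actual atlas/trie API.

```go
// Hypothetical usage of trie.Trie for matching subscriptions by key prefix.
// The signatures of New, Insert, and PrefixesOf are assumed for illustration.
subs := trie.New[string]()
subs.Insert("users.", "https://hooks.example/users")
subs.Insert("users.alice", "https://hooks.example/alice")

// On a write to "users.alice.profile", every subscriber whose prefix matches
// the key should be notified.
for _, url := range subs.PrefixesOf("users.alice.profile") {
	fmt.Println("notify", url)
}
```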

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant CLI as Atlas CLI
  participant Cmd as SubCommand
  participant KV as atlas/kv
  participant QM as QuorumManager
  participant Q as Quorum
  participant Srv as Consensus Server
  participant Notif as NotificationSender

  CLI->>Cmd: SUB <prefix> <url> [opts]
  Cmd->>KV: Subscribe(ctx, KeyName(prefix), url, opts)
  KV->>QM: GetQuorum(ctx, KeyName(prefix))
  QM-->>Q: Quorum
  Q->>Srv: AcceptMigration(ctx, KVChange{sub})
  Srv->>Notif: maybeHandleMagicKey(ctx, migration) / GenerateNotification
  Notif-->>Srv: enqueue/send (async)
  Srv-->>Q: Ack
  Q-->>Cmd: OK
  Cmd-->>CLI: Success
sequenceDiagram
  autonumber
  participant Server
  participant Notif as NotificationSender
  participant Webhook
  Server->>Notif: GenerateNotification(migration Set/Del/Acl)
  Note right of Notif: Batch per-URL (timer or size)
  par delivery per-URL
    Notif->>Webhook: POST batched notifications (with Auth header)
    Webhook-->>Notif: 2xx / 5xx
    alt non-2xx
      Notif->>Notif: retry with backoff (per options)
    end
  end
  Notif-->>Server: Log failures/success (async)

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Poem

I thump a line, the SUB is near,
Buckets hum and webhooks cheer.
Tries remember every trail,
Named locks guard each tiny vale.
Retry, batch, and carrot-sent delight. 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Docstring Coverage ⚠️ Warning: Docstring coverage is 12.68%, which is below the required threshold of 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Description Check ✅ Passed: Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed: The title clearly signals that this pull request introduces the initial implementation of notification functionality, which aligns with the extensive changes setting up subscription and notification plumbing across the codebase. It is concise and free of unrelated details or noise, and it directly reflects the main feature under development. The “part 1” suffix appropriately indicates that further work is expected without obscuring the primary intent.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/atls-118-notifications

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

Signed-off-by: Robert Landers <[email protected]>
Signed-off-by: Robert Landers <[email protected]>

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting

Comment on lines 51 to 63

key := builder.Build()
keyString := string(key)
tableName, ok := builder.TableName()
if !ok || tableName == "" {
if t, _, valid := kv.ParseTableRowKey(key); valid {
tableName = t
} else {
tableName = keyString
}
}

q, err := qm.GetQuorum(ctx, tableName)
q, err := qm.GetQuorum(ctx, keyString)
if err != nil {
return err
}

resp, err := q.WriteKey(ctx, &consensus.WriteKeyRequest{
Sender: nil,
Table: tableName,
Table: keyString,
Value: change,


P1 Badge Pass key instead of table when routing quorum operations

The recent refactor sends keyString to GetQuorum and into the WriteKeyRequest.Table field for all writes, reads, and deletes. The table name is now the entire key (e.g. users.alice) instead of the logical table (e.g. users). majorityQuorum.WriteKey treats the Table field as the unit of ownership and migration, so every distinct key now becomes its own table: the server will create a new table record, run table‑steal handshakes, and maintain a separate version counter per key. Table configuration (replication regions, ownership) is never consulted because it can no longer be looked up by name. This causes massive metadata bloat and bypasses any table‑level replication settings. The previous implementation extracted the table name from the key builder before calling GetQuorum; that behaviour should be preserved.

Useful? React with 👍 / 👎.

@coderabbitai coderabbitai bot added dependencies Pull requests that update a dependency file enhancement New feature or request labels Oct 11, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
.github/workflows/go.yml (1)

71-75: Keep all jobs on the same Go patch level.

Build and integration now install Go 1.25.2, but the lint job still pulls 1.25.1. Mixing toolchain patch levels makes the workflow flaky (e.g., if go env GOVERSION or module resolution diverges). Please bump the lint job to 1.25.2 as well.

🧹 Nitpick comments (2)
atlas/consensus/server.go (1)

380-386: Avoid spawning goroutines per mutation for notifications

Each SET/DEL/ACL change spins up a goroutine that immediately calls DefaultNotificationSender().Notify(...) and logs errors. This fans out goroutines on every migration replay, even during catch-up, and the async notifications race with the subsequent HandleNotifications() (which runs synchronously right after the switch). Consider queueing via the sender directly (or letting HandleNotifications drive dispatch) instead of launching ad-hoc goroutines per change.

Also applies to: 398-404, 505-511, 572-578
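
A rough sketch of the queue-based alternative, assuming a channel-backed worker; the Enqueue method and queue size below are illustrative and not part of the reviewed NotificationSender API:

```go
// Illustrative only: a single worker drains a bounded queue instead of
// spawning one goroutine per SET/DEL/ACL change during migration replay.
type queuedNotifier struct {
	queue chan *Migration
}

func newQueuedNotifier() *queuedNotifier {
	n := &queuedNotifier{queue: make(chan *Migration, 1024)}
	go n.run()
	return n
}

func (n *queuedNotifier) Enqueue(m *Migration) {
	select {
	case n.queue <- m:
	default:
		options.Logger.Warn("notification queue full, dropping change")
	}
}

func (n *queuedNotifier) run() {
	for m := range n.queue {
		if err := DefaultNotificationSender().Notify(m); err != nil {
			options.Logger.Error("failed to send notification", zap.Error(err))
		}
	}
}
```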

atlas/commands/sub-command.go (1)

143-165: Log quorum error without wrapping as errors.New

When quorum returns an error string, we wrap it with errors.New and lose stack/context. Prefer fmt.Errorf("...: %s", resp.Error), or simply log the raw string without synthesizing a new error object, so zap logs remain structured.
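
For example (resp.GetError() as the accessor and the message text are assumptions):

```go
// Keep the quorum error string as structured context rather than wrapping it
// in a synthetic error value via errors.New.
if respErr := resp.GetError(); respErr != "" {
	options.Logger.Error("quorum rejected subscription", zap.String("quorum_error", respErr))
	return nil, fmt.Errorf("quorum rejected subscription: %s", respErr)
}
```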

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 01413ce and fd801e6.

⛔ Files ignored due to path filters (2)
  • atlas/consensus/consensus.pb.go is excluded by !**/*.pb.go
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (15)
  • .github/workflows/go.yml (2 hunks)
  • atlas/caddy/module/module.go (1 hunks)
  • atlas/commands/string.go (1 hunks)
  • atlas/commands/sub-command.go (1 hunks)
  • atlas/consensus/consensus.proto (1 hunks)
  • atlas/consensus/data-repository.go (2 hunks)
  • atlas/consensus/majority-quorum.go (3 hunks)
  • atlas/consensus/server.go (9 hunks)
  • atlas/consensus/subscriptions.go (1 hunks)
  • atlas/consensus/subscriptions_test.go (1 hunks)
  • atlas/kv.go (3 hunks)
  • atlas/trie/trie.go (1 hunks)
  • atlas/trie/trie_test.go (1 hunks)
  • go.mod (4 hunks)
  • test_acl_e2e.sh (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
atlas/consensus/data-repository.go (1)
atlas/consensus/consensus.pb.go (4)
  • KVChange_Notification (1308-1310)
  • KVChange_Notification (1322-1322)
  • KVChange_Sub (1304-1306)
  • KVChange_Sub (1320-1320)
atlas/consensus/server.go (4)
atlas/consensus/consensus.pb.go (12)
  • Migration (1434-1445)
  • Migration (1458-1458)
  • Migration (1473-1475)
  • DataMigration_Change (1392-1394)
  • DataMigration_Change (1396-1396)
  • KVChange_Sub (1304-1306)
  • KVChange_Sub (1320-1320)
  • Notify (1056-1069)
  • Notify (1082-1082)
  • Notify (1097-1099)
  • KVChange_Notification (1308-1310)
  • KVChange_Notification (1322-1322)
atlas/kv/store.go (1)
  • Store (26-48)
atlas/consensus/subscriptions.go (1)
  • DefaultNotificationSender (49-51)
atlas/options/options.go (1)
  • Logger (28-28)
atlas/trie/trie_test.go (1)
atlas/trie/trie.go (1)
  • New (41-45)
atlas/kv.go (1)
atlas/consensus/consensus.pb.go (9)
  • WriteKeyRequest (2714-2721)
  • WriteKeyRequest (2734-2734)
  • WriteKeyRequest (2749-2751)
  • Table (1756-1770)
  • Table (1783-1783)
  • Table (1798-1800)
  • ReadKeyRequest (2473-2480)
  • ReadKeyRequest (2493-2493)
  • ReadKeyRequest (2508-2510)
atlas/commands/sub-command.go (3)
atlas/commands/string.go (2)
  • CommandString (44-51)
  • Command (29-42)
atlas/options/options.go (1)
  • Logger (28-28)
atlas/consensus/quorum.go (1)
  • GetDefaultQuorumManager (49-58)
atlas/consensus/majority-quorum.go (4)
atlas/consensus/server.go (1)
  • NewServer (48-50)
atlas/consensus/consensus.pb.go (12)
  • KVChange_Acl (1300-1302)
  • KVChange_Acl (1318-1318)
  • Record (556-566)
  • Record (579-579)
  • Record (594-596)
  • WriteKeyResponse (2774-2780)
  • WriteKeyResponse (2793-2793)
  • WriteKeyResponse (2808-2810)
  • KVChange_Notification (1308-1310)
  • KVChange_Notification (1322-1322)
  • KVChange_Sub (1304-1306)
  • KVChange_Sub (1320-1320)
atlas/kv/pool.go (1)
  • GetPool (65-69)
atlas/kv/store.go (1)
  • ErrKeyNotFound (96-96)
atlas/caddy/module/module.go (1)
atlas/options/options.go (1)
  • Logger (28-28)
atlas/commands/string.go (1)
atlas/commands/sub-command.go (1)
  • SubCommand (34-36)
atlas/consensus/subscriptions_test.go (5)
atlas/consensus/consensus.pb.go (74)
  • Migration (1434-1445)
  • Migration (1458-1458)
  • Migration (1473-1475)
  • MigrationVersion (196-204)
  • MigrationVersion (217-217)
  • MigrationVersion (232-234)
  • Migration_Data (1526-1528)
  • Migration_Data (1536-1536)
  • DataMigration (1324-1333)
  • DataMigration (1346-1346)
  • DataMigration (1361-1363)
  • DataMigration_Change (1392-1394)
  • DataMigration_Change (1396-1396)
  • KVChange (1178-1191)
  • KVChange (1204-1204)
  • KVChange (1219-1221)
  • KVChange_Set (1288-1290)
  • KVChange_Set (1312-1312)
  • SetChange (690-696)
  • SetChange (709-709)
  • SetChange (724-726)
  • Record (556-566)
  • Record (579-579)
  • Record (594-596)
  • Notify_Set (1160-1162)
  • Notify_Set (1172-1172)
  • KVChange_Del (1292-1294)
  • KVChange_Del (1314-1314)
  • DelChange (832-837)
  • DelChange (850-850)
  • DelChange (865-867)
  • Notify_Del (1164-1166)
  • Notify_Del (1174-1174)
  • KVChange_Acl (1300-1302)
  • KVChange_Acl (1318-1318)
  • AclChange (742-752)
  • AclChange (765-765)
  • AclChange (780-782)
  • AclChange_Addition (820-822)
  • AclChange_Addition (828-828)
  • ACL (444-451)
  • ACL (464-464)
  • ACL (479-481)
  • ACLData (2827-2834)
  • ACLData (2847-2847)
  • ACLData (2862-2864)
  • Notify_Acl (1168-1170)
  • Notify_Acl (1176-1176)
  • Migration_Schema (1522-1524)
  • Migration_Schema (1534-1534)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
  • SubscribeOptions (928-936)
  • SubscribeOptions (949-949)
  • SubscribeOptions (964-966)
  • Notify (1056-1069)
  • Notify (1082-1082)
  • Notify (1097-1099)
  • Table (1756-1770)
  • Table (1783-1783)
  • Table (1798-1800)
  • Migration_None (1530-1532)
  • Migration_None (1538-1538)
  • NilMigration (1398-1402)
  • NilMigration (1415-1415)
  • NilMigration (1430-1432)
  • KVChange_Sub (1304-1306)
  • KVChange_Sub (1320-1320)
  • SubscriptionList (876-882)
  • SubscriptionList (895-895)
  • SubscriptionList (910-912)
  • KVChange_Notification (1308-1310)
  • KVChange_Notification (1322-1322)
atlas/kv/encoding.go (1)
  • NewKeyBuilder (47-54)
atlas/options/options.go (2)
  • Options (30-45)
  • CurrentOptions (71-71)
atlas/trie/trie.go (1)
  • New (41-45)
atlas/kv/pool.go (3)
  • CreatePool (42-62)
  • DrainPool (185-195)
  • GetPool (65-69)
atlas/consensus/subscriptions.go (7)
atlas/consensus/consensus.pb.go (43)
  • Notify (1056-1069)
  • Notify (1082-1082)
  • Notify (1097-1099)
  • Migration (1434-1445)
  • Migration (1458-1458)
  • Migration (1473-1475)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
  • Table (1756-1770)
  • Table (1783-1783)
  • Table (1798-1800)
  • WriteKeyRequest (2714-2721)
  • WriteKeyRequest (2734-2734)
  • WriteKeyRequest (2749-2751)
  • Migration_Data (1526-1528)
  • Migration_Data (1536-1536)
  • KVChange_Set (1288-1290)
  • KVChange_Set (1312-1312)
  • KVChange_Notification (1308-1310)
  • KVChange_Notification (1322-1322)
  • Notify_Set (1160-1162)
  • Notify_Set (1172-1172)
  • KVChange_Del (1292-1294)
  • KVChange_Del (1314-1314)
  • Notify_Del (1164-1166)
  • Notify_Del (1174-1174)
  • KVChange_Acl (1300-1302)
  • KVChange_Acl (1318-1318)
  • Notify_Acl (1168-1170)
  • Notify_Acl (1176-1176)
  • Migration_None (1530-1532)
  • Migration_None (1538-1538)
  • Migration_Schema (1522-1524)
  • Migration_Schema (1534-1534)
  • KVChange_Sub (1304-1306)
  • KVChange_Sub (1320-1320)
  • SubscriptionList (876-882)
  • SubscriptionList (895-895)
  • SubscriptionList (910-912)
  • KVChange (1178-1191)
  • KVChange (1204-1204)
  • KVChange (1219-1221)
atlas/trie/trie.go (2)
  • Trie (23-28)
  • New (41-45)
atlas/options/options.go (1)
  • Logger (28-28)
atlas/consensus/quorum.go (1)
  • GetDefaultQuorumManager (49-58)
atlas/kv/encoding.go (2)
  • NewKeyBuilder (47-54)
  • Value (288-292)
atlas/kv/pool.go (1)
  • GetPool (65-69)
atlas/kv/store.go (1)
  • ErrKeyNotFound (96-96)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: build
  • GitHub Check: integration-test
  • GitHub Check: Analyze (go)

Comment on lines 344 to 360
dataMigration := migration.GetData()
mv := migration.GetVersion()

if halt, err := sender.maybeHandleMagicKey(ctx, migration); err != nil {
return err
} else if halt {
return nil
}

switch migrationType := dataMigration.GetSession().(type) {
case *DataMigration_Change:
switch op := migrationType.Change.GetOperation().(type) {
case *KVChange_Sub:
err := DefaultNotificationSender().Notify(migration)
if err != nil {
return err
}
case *KVChange_Notification:
Contributor


⚠️ Potential issue | 🔴 Critical

SUB migrations never persist subscriptions

When a KVChange_Sub comes through, we just call DefaultNotificationSender().Notify(migration) and return without ever writing the subscription anywhere. For non-magic prefixes maybeHandleMagicKey falls through, so nothing inserts the subscription into the backing store, and Notify on a *Migration does nothing (it expects a *Notify). Net result: subscriptions are silently dropped. We need to route SUB operations to the subscription registry (the trie/bucket logic in subscriptions.go), e.g. have maybeHandleMagicKey handle them or persist the subscription before returning.
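
One hedged way to wire this up, where AddSubscription stands in for whatever persists into the trie/bucket registry in subscriptions.go (the method name is hypothetical):

```go
// Sketch only: persist the subscription before acknowledging the migration,
// rather than handing the *Migration to Notify. AddSubscription is a
// hypothetical name for the subscription-registry persistence.
case *KVChange_Sub:
	if err := DefaultNotificationSender().AddSubscription(ctx, op.Sub); err != nil {
		return err
	}
```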

Comment on lines +296 to +303
func (j jsonReader) Read(p []byte) (n int, err error) {
if j.pos >= len(j.data) {
return 0, io.EOF
}
n = copy(p, j.data[j.pos:])
//j.pos += n
return n, nil
}
Contributor


⚠️ Potential issue | 🔴 Critical

Fix jsonReader to advance the read offset.

jsonReader.Read never increments pos, so every Read returns the same bytes and never reaches EOF. Any HTTP client using it (several tests here) will either hang or time out waiting for the request body to finish streaming, so these tests will consistently fail. Update the reader to advance its offset (and return EOF when done) before relying on it.

 func (j jsonReader) Read(p []byte) (n int, err error) {
 	if j.pos >= len(j.data) {
 		return 0, io.EOF
 	}
 	n = copy(p, j.data[j.pos:])
-	//j.pos += n
-	return n, nil
+	j.pos += n
+	return n, nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (j jsonReader) Read(p []byte) (n int, err error) {
if j.pos >= len(j.data) {
return 0, io.EOF
}
n = copy(p, j.data[j.pos:])
//j.pos += n
return n, nil
}
func (j jsonReader) Read(p []byte) (n int, err error) {
if j.pos >= len(j.data) {
return 0, io.EOF
}
n = copy(p, j.data[j.pos:])
j.pos += n
return n, nil
}
🤖 Prompt for AI Agents
In atlas/consensus/subscriptions_test.go around lines 296 to 303,
jsonReader.Read never advances j.pos so repeated reads return the same bytes and
never reach EOF; update the method to increment j.pos by the number of bytes
copied (j.pos += n) and if advancing consumes all data, return the bytes read
and io.EOF (otherwise return bytes read and nil) so callers properly see EOF
when the stream is exhausted.

Comment on lines +125 to +160
bodyBytes, err := json.Marshal(nl)
if err != nil {
options.Logger.Error("failed to marshal notification list", zap.Error(err))
return
}

client := &http.Client{
Timeout: 2 * time.Second,
}

for retries := next.sub.GetOptions().GetRetryAttempts(); retries > 0; retries-- {
body := bytes.NewReader(bodyBytes)

req, err := http.NewRequest("POST", next.sub.GetUrl(), body)
if err != nil {
options.Logger.Error("failed to create notification request", zap.Error(err))
return
}

resp, err := client.Do(req)
if err != nil {
options.Logger.Error("failed to send notification", zap.Error(err))
return
}
_ = resp.Body.Close()
options.Logger.Info("sent notification", zap.Int("status_code", resp.StatusCode))
if resp.StatusCode == http.StatusOK {
return
}
options.Logger.Warn("failed to send notification", zap.Int("status_code", resp.StatusCode))
retryBase := next.sub.GetOptions().RetryAfterBase.AsDuration()
if retryBase == 0 {
retryBase = 100 * time.Millisecond
}
time.Sleep(retryBase * time.Duration(next.sub.GetOptions().GetRetryAttempts()-retries+1))
}
Contributor


⚠️ Potential issue | 🟠 Major

Set Content-Type and Authorization headers before dispatching notifications.

The POST request we issue to subscriber webhooks never sets Content-Type or the optional auth token from SubscribeOptions. Any endpoint expecting JSON or bearer auth will reject these notifications (HTTP 401/415), so retries will exhaust without ever succeeding. Populate both headers prior to client.Do(...).

 						req, err := http.NewRequest("POST", next.sub.GetUrl(), body)
 						if err != nil {
 							options.Logger.Error("failed to create notification request", zap.Error(err))
 							return
 						}
+						req.Header.Set("Content-Type", "application/json")
+						if auth := next.sub.GetOptions().GetAuth(); auth != "" {
+							req.Header.Set("Authorization", auth)
+						}
 
 						resp, err := client.Do(req)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
bodyBytes, err := json.Marshal(nl)
if err != nil {
options.Logger.Error("failed to marshal notification list", zap.Error(err))
return
}
client := &http.Client{
Timeout: 2 * time.Second,
}
for retries := next.sub.GetOptions().GetRetryAttempts(); retries > 0; retries-- {
body := bytes.NewReader(bodyBytes)
req, err := http.NewRequest("POST", next.sub.GetUrl(), body)
if err != nil {
options.Logger.Error("failed to create notification request", zap.Error(err))
return
}
resp, err := client.Do(req)
if err != nil {
options.Logger.Error("failed to send notification", zap.Error(err))
return
}
_ = resp.Body.Close()
options.Logger.Info("sent notification", zap.Int("status_code", resp.StatusCode))
if resp.StatusCode == http.StatusOK {
return
}
options.Logger.Warn("failed to send notification", zap.Int("status_code", resp.StatusCode))
retryBase := next.sub.GetOptions().RetryAfterBase.AsDuration()
if retryBase == 0 {
retryBase = 100 * time.Millisecond
}
time.Sleep(retryBase * time.Duration(next.sub.GetOptions().GetRetryAttempts()-retries+1))
}
req, err := http.NewRequest("POST", next.sub.GetUrl(), body)
if err != nil {
options.Logger.Error("failed to create notification request", zap.Error(err))
return
}
req.Header.Set("Content-Type", "application/json")
if auth := next.sub.GetOptions().GetAuth(); auth != "" {
req.Header.Set("Authorization", auth)
}
resp, err := client.Do(req)
🤖 Prompt for AI Agents
In atlas/consensus/subscriptions.go around lines 125 to 160, the POST request
never sets the Content-Type or optional Authorization header, causing endpoints
that require JSON or bearer auth to reject requests; before calling
client.Do(...) set req.Header.Set("Content-Type", "application/json") and, if
next.sub.GetOptions().GetAuthToken() is non-empty, set
req.Header.Set("Authorization", "Bearer "+token) (ensure you create the body
reader per retry as is already done so headers apply to the fresh request).

Comment on lines +132 to +144
if !cur.end {
return false
}
cur.end = false

for i := len(path) - 1; i >= 0; i-- {
n := path[i].cur
if n.end || len(n.children) > 0 {
break
}
delete(path[i].parent.children, path[i].r)
}
return true
Contributor


⚠️ Potential issue | 🟠 Major

Clear stored values when removing a prefix.

Remove flips end but leaves cur.value untouched. If the same prefix is later reinserted, the old entries reappear alongside the new one, so the removal isn’t effective. Nil out the value slice when removing.

-	cur.end = false
+	cur.end = false
+	cur.value = nil
🤖 Prompt for AI Agents
In atlas/trie/trie.go around lines 132 to 144, the Remove logic unsets cur.end
but leaves cur.value populated which causes old entries to reappear if the
prefix is reinserted; update the removal sequence to nil out the node's value
(e.g., cur.value = nil) when clearing end, doing this before pruning parent
children so the stored value is fully cleared and can be garbage-collected.

Signed-off-by: Robert Landers <[email protected]>
Signed-off-by: Robert Landers <[email protected]>
Signed-off-by: Robert Landers <[email protected]>
Signed-off-by: Robert Landers <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
atlas/consensus/broadcastQuorum.go (1)

366-369: Fix data race when building the broadcast quorum snapshot.

GetBroadcastQuorum hands broadcastQuorum the live q.nodes map without holding the manager lock or copying. Any concurrent AddNode/RemoveNode mutates that map while PrefixScan iterates it, triggering concurrent map iteration and map write or data races. Take the read lock and return a deep copy of the topology instead of the shared map.

 func (q *defaultQuorumManager) GetBroadcastQuorum(ctx context.Context) (Quorum, error) {
-	return &broadcastQuorum{
-		nodes: q.nodes,
-	}, nil
+	q.mu.RLock()
+	defer q.mu.RUnlock()
+
+	nodesCopy := make(map[RegionName][]*QuorumNode, len(q.nodes))
+	for region, nodes := range q.nodes {
+		cloned := make([]*QuorumNode, len(nodes))
+		copy(cloned, nodes)
+		nodesCopy[region] = cloned
+	}
+
+	return &broadcastQuorum{
+		nodes: nodesCopy,
+	}, nil
 }
atlas/consensus/majority-quorum.go (3)

258-264: ReadKey: wrong behavior when Promised is true.

If phase-1 returns Promised=true, this node can serve the read (optionally after accepting missing migrations). Returning ErrStealTableOwnershipFailed is incorrect.

Apply:

-	if phase1.Promised {
-		return nil, ErrStealTableOwnershipFailed
-	}
+	if phase1.Promised {
+		// Optionally accept missing migrations before reading locally.
+		s := NewServer()
+		for _, mig := range phase1.GetSuccess().GetMissingMigrations() {
+			if _, err := s.AcceptMigration(ctx, &WriteMigrationRequest{Sender: currentNode, Migration: mig}); err != nil {
+				return nil, err
+			}
+		}
+		return s.ReadKey(ctx, in)
+	}

513-518: Set in.Sender = currentNode for DeleteKey, mirroring WriteKey.

Keeps metadata consistent across ops.

 	currentNode, err := nr.GetNodeById(options.CurrentOptions.ServerId)
 	if err != nil {
 		return nil, err
 	}
+	in.Sender = currentNode

547-558: DeleteKey: align behavior with WriteKey when not leader (forward to owner).

Currently returns ErrStealTableOwnershipFailed; forward to the owner like WriteKey does.

 	if !phase1.Promised {
 		table = phase1.GetFailure().GetTable()
 		// we are not the leader, so update our tr with the new table information
 		err = upsertTable(ctx, tr, table)
 		if err != nil {
 			return nil, err
 		}
-		return nil, ErrStealTableOwnershipFailed
+		owner := table.GetOwner()
+		qm := GetDefaultQuorumManager(ctx)
+		resp, err := qm.Send(owner, func(node *QuorumNode) (any, error) {
+			return node.DeleteKey(ctx, in, opts...)
+		})
+		if err != nil {
+			return nil, errors.Join(errors.New("failed to forward delete key to owner"), err)
+		}
+		if resp == nil {
+			return nil, errors.New("owner returned nil response")
+		}
+		return resp.(*WriteKeyResponse), nil
 	}
♻️ Duplicate comments (4)
atlas/kv.go (1)

30-70: Critical: Key passed as table identifier breaks table-based consensus.

All write and read operations in this file pass the full key (e.g., users.alice) directly to GetQuorum(ctx, key) and use it in the WriteKeyRequest.Table field where only the table name (e.g., users) should be used.

Impact:

  • Every distinct key becomes its own "table" for consensus purposes
  • Table-level ownership, versioning, and replication configuration is bypassed
  • Massive metadata bloat from per-key table records
  • Table-steal handshakes run per-key instead of per-table

Required fix: Extract the table name from the key before calling GetQuorum and populate WriteKeyRequest.Table with just the table portion of the key.

For example, if keys follow a pattern like table.row, extract table before passing to GetQuorum:

 func sendWrite(ctx context.Context, key consensus.KeyName, change *consensus.KVChange) error {
 	qm := consensus.GetDefaultQuorumManager(ctx)
 
-	q, err := qm.GetQuorum(ctx, key)
+	tableName := extractTableName(key) // Implement table extraction logic
+	q, err := qm.GetQuorum(ctx, tableName)
 	if err != nil {
 		return err
 	}
 
 	resp, err := q.WriteKey(ctx, &consensus.WriteKeyRequest{
 		Sender: nil,
-		Table:  key,
+		Table:  tableName,
 		Value:  change,
 	})

This issue affects WriteKey, DeleteKey, all ACL operations, GetKey, Subscribe, and indirectly PrefixScan.

Based on past review comments.

Also applies to: 72-88, 90-106, 108-124, 126-143, 145-162, 164-181, 183-201, 205-232
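
A minimal sketch of the extraction helper referenced in the diff above, assuming keys follow a table.row layout (imports bytes; the real rule should come from the existing key-builder/parse helpers rather than this hypothetical function):

```go
// extractTableName is illustrative: it assumes the logical table is whatever
// precedes the first '.' in the key, e.g. "users.alice" -> "users".
func extractTableName(key consensus.KeyName) consensus.KeyName {
	if i := bytes.IndexByte(key, '.'); i >= 0 {
		return key[:i]
	}
	return key // no row component; the key itself is the table name
}
```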

atlas/consensus/server.go (1)

1117-1117: Critical: GetTable called with full key instead of table name.

GetTable(req.GetKey()) passes the entire key (e.g., users.alice) where only the table name (e.g., users) should be extracted. This causes every key to become its own "table," bypassing table-level ownership and replication configuration, leading to metadata bloat and broken consensus semantics.

The key must be parsed to extract the table name before calling GetTable.

Based on past review comments.

atlas/consensus/subscriptions.go (1)

162-197: Set Content-Type and Authorization headers before dispatching notifications.

The HTTP POST request never sets Content-Type or the optional Authorization header from SubscribeOptions. Subscriber endpoints expecting JSON (with Content-Type: application/json) or bearer authentication will reject these notifications with HTTP 415 (Unsupported Media Type) or 401 (Unauthorized), causing retries to exhaust without ever succeeding.

Apply this diff to set both headers:

 					req, err := http.NewRequest("POST", next.sub.GetUrl(), body)
 					if err != nil {
 						options.Logger.Error("failed to create notification request", zap.Error(err))
 						return
 					}
+					req.Header.Set("Content-Type", "application/json")
+					if auth := next.sub.GetOptions().GetAuth(); auth != "" {
+						req.Header.Set("Authorization", auth)
+					}
 
 					resp, err := client.Do(req)

Based on past review comments.

atlas/consensus/subscriptions_test.go (1)

296-303: Fix jsonReader: advance offset and use pointer receiver.

Currently Read never advances and uses a value receiver, causing infinite body streams/timeouts.

Apply these changes:

-type jsonReader struct {
+type jsonReader struct {
 	data []byte
 	pos  int
 }
 
-func (j jsonReader) Read(p []byte) (n int, err error) {
+func (j *jsonReader) Read(p []byte) (n int, err error) {
 	if j.pos >= len(j.data) {
 		return 0, io.EOF
 	}
 	n = copy(p, j.data[j.pos:])
-	//j.pos += n
-	return n, nil
+	j.pos += n
+	if j.pos >= len(j.data) {
+		return n, io.EOF
+	}
+	return n, nil
 }

Update call sites to pass a pointer:

-req, err := http.NewRequest("POST", sub.GetUrl(), jsonReader{data: bodyBytes})
+req, err := http.NewRequest("POST", sub.GetUrl(), &jsonReader{data: bodyBytes})

(Do this in both tests where jsonReader is used.)

🧹 Nitpick comments (6)
atlas/consensus/server.go (1)

42-42: Consider making NodeTable immutable or documenting mutability rationale.

NodeTable changed from a constant to a mutable variable. If this identifier is not intended to change at runtime, consider using a const or explaining why mutability is needed. Accidental modification of this value could cause routing and consensus issues.

If immutability is desired:

-var NodeTable = KeyName("atlas.nodes")
+const NodeTableName = "atlas.nodes"
+var NodeTable = KeyName(NodeTableName)

Or add a comment explaining the mutability requirement.

atlas/consensus/subscriptions.go (1)

284-303: Add inline documentation and audit bucket tests for edge cases

  • Document the bit-manipulation in currentBucket/nextBucket (e.g., how bits.Len drives the power-of-2 prefix sizes); a standalone sketch of the idea follows this list.
  • Ensure the existing TestNotificationSender_CurrentBucket and TestNotificationSender_NextBucket cover empty keys and single‐byte keys.
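
Standalone illustration of the power-of-2 bucket idea (not the actual currentBucket/nextBucket code): bits.Len yields the largest power of two that does not exceed the key length, and that becomes the prefix (bucket) size, with nextBucket stepping down to the next smaller power.

```go
package main

import (
	"fmt"
	"math/bits"
)

// bucketLen returns the largest power of two <= keyLen, or 0 for empty keys.
func bucketLen(keyLen int) int {
	if keyLen <= 0 {
		return 0
	}
	return 1 << (bits.Len(uint(keyLen)) - 1)
}

func main() {
	for _, n := range []int{0, 1, 5, 8, 13} {
		fmt.Printf("key length %2d -> bucket size %d\n", n, bucketLen(n))
	}
	// key length  0 -> bucket size 0
	// key length  1 -> bucket size 1
	// key length  5 -> bucket size 4
	// key length  8 -> bucket size 8
	// key length 13 -> bucket size 8
}
```
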
atlas/consensus/named_locks.go (1)

89-97: Avoid recursion in lock path.

Recursive retry on addRef error can stack overflow under persistent contention (even if unlikely). Prefer a loop-based retry.

Example approach:

  • While holding l.mu, attempt addRef.
  • If it fails, release l.mu and retry acquisition in a loop.

No functional change otherwise.
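
A loose sketch of the loop-based retry; the namedLocker fields and the addRef error below are guesses for illustration, not the actual named_locks.go code:

```go
// Sketch only: retry acquisition in a loop instead of recursing when addRef
// fails because the lock entry is being torn down.
func (l *namedLocker) Lock(name string) *namedLock {
	for {
		l.mu.Lock()
		nl, ok := l.locks[name]
		if !ok {
			nl = &namedLock{}
			l.locks[name] = nl
		}
		err := nl.addRef()
		l.mu.Unlock()
		if err == nil {
			nl.mu.Lock()
			return nl
		}
		// Entry was concurrently released; loop and try again rather than
		// calling Lock recursively.
	}
}
```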

atlas/consensus/majority-quorum.go (3)

347-359: Forwarding log should not be at error level.

Forwarding to the owner is a normal path; use Info.

-		options.Logger.Error("forwarding write key to owner", zap.Int64("owner_id", owner.GetId()))
+		options.Logger.Info("forwarding write key to owner", zap.Int64("owner_id", owner.GetId()))

566-575: Use NewServer() for consistency when accepting migrations.

-	s := Server{}
+	s := NewServer()
 	for _, migration := range phase1.GetSuccess().GetMissingMigrations() {
 		_, err = s.AcceptMigration(ctx, &WriteMigrationRequest{
 			Sender:    currentNode,
 			Migration: migration,
 		})

218-221: Meta/Data store nil checks (optional hardening).

kv.GetPool() may be nil early in process lifetime. Consider guarding like in Gossip.

Optional, but improves resilience.

If you rely on initialization order to guarantee pool presence here, ignore.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between ea6d17e and 0b18917.

⛔ Files ignored due to path filters (1)
  • atlas/consensus/consensus.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (37)
  • atlas/caddy/module/module.go (2 hunks)
  • atlas/commands/acl-command.go (3 hunks)
  • atlas/commands/key-command.go (2 hunks)
  • atlas/commands/key-command_test.go (4 hunks)
  • atlas/commands/quorum-command.go (1 hunks)
  • atlas/commands/scan_commands.go (3 hunks)
  • atlas/commands/sub-command.go (1 hunks)
  • atlas/consensus/broadcastQuorum.go (1 hunks)
  • atlas/consensus/consensus.proto (5 hunks)
  • atlas/consensus/data-repository.go (2 hunks)
  • atlas/consensus/majority-quorum.go (13 hunks)
  • atlas/consensus/migration-repository-kv_test.go (17 hunks)
  • atlas/consensus/migration-repository.go (4 hunks)
  • atlas/consensus/named_locks.go (1 hunks)
  • atlas/consensus/named_locks_test.go (1 hunks)
  • atlas/consensus/quorum-integration_test.go (4 hunks)
  • atlas/consensus/quorum.go (5 hunks)
  • atlas/consensus/server.go (21 hunks)
  • atlas/consensus/server_acl_test.go (2 hunks)
  • atlas/consensus/subscriptions.go (1 hunks)
  • atlas/consensus/subscriptions_test.go (1 hunks)
  • atlas/consensus/table-repository-kv.go (18 hunks)
  • atlas/consensus/table-repository-kv_test.go (20 hunks)
  • atlas/consensus/table-repository.go (2 hunks)
  • atlas/kv.go (11 hunks)
  • atlas/kv/badger.go (0 hunks)
  • atlas/kv/encoding.go (5 hunks)
  • atlas/kv/pool.go (1 hunks)
  • atlas/nodes.go (0 hunks)
  • atlas/operations/table.go (0 hunks)
  • atlas/operations/table_test.go (0 hunks)
  • atlas/pool.go (0 hunks)
  • atlas/test/util.go (0 hunks)
  • integration-tests/harness/client.go (1 hunks)
  • integration-tests/harness/cluster.go (2 hunks)
  • integration-tests/harness/node.go (4 hunks)
  • integration-tests/scenarios/notifications_test.go (1 hunks)
💤 Files with no reviewable changes (6)
  • atlas/kv/badger.go
  • atlas/operations/table_test.go
  • atlas/pool.go
  • atlas/nodes.go
  • atlas/operations/table.go
  • atlas/test/util.go
✅ Files skipped from review due to trivial changes (1)
  • atlas/kv/pool.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • atlas/consensus/data-repository.go
  • atlas/caddy/module/module.go
🧰 Additional context used
🧬 Code graph analysis (22)
atlas/consensus/migration-repository.go (4)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/consensus.pb.go (6)
  • MigrationVersion (196-204)
  • MigrationVersion (217-217)
  • MigrationVersion (232-234)
  • Migration (1442-1453)
  • Migration (1466-1466)
  • Migration (1481-1483)
atlas/consensus/repository.go (1)
  • Prefix (49-51)
atlas/kv/encoding.go (1)
  • NewKeyBuilder (42-49)
integration-tests/harness/client.go (2)
atlas/kv.go (1)
  • Subscribe (262-286)
atlas/consensus/consensus.pb.go (3)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
integration-tests/harness/cluster.go (1)
integration-tests/harness/node.go (1)
  • NewNode (50-69)
atlas/consensus/quorum-integration_test.go (1)
atlas/consensus/quorum.go (2)
  • DescribeQuorum (782-790)
  • KeyName (62-62)
integration-tests/harness/node.go (1)
integration-tests/harness/config.go (1)
  • NodeConfig (29-38)
atlas/commands/quorum-command.go (1)
atlas/consensus/quorum.go (2)
  • DescribeQuorum (782-790)
  • KeyName (62-62)
atlas/commands/key-command.go (1)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
integration-tests/scenarios/notifications_test.go (3)
integration-tests/harness/cluster.go (2)
  • NewCluster (72-129)
  • ClusterConfig (65-70)
atlas/kv.go (1)
  • Subscribe (262-286)
atlas/consensus/consensus.pb.go (3)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
atlas/consensus/subscriptions_test.go (5)
atlas/consensus/consensus.pb.go (79)
  • Migration (1442-1453)
  • Migration (1466-1466)
  • Migration (1481-1483)
  • MigrationVersion (196-204)
  • MigrationVersion (217-217)
  • MigrationVersion (232-234)
  • Migration_Data (1534-1536)
  • Migration_Data (1544-1544)
  • DataMigration (1332-1341)
  • DataMigration (1354-1354)
  • DataMigration (1369-1371)
  • DataMigration_Change (1400-1402)
  • DataMigration_Change (1404-1404)
  • KVChange (1186-1199)
  • KVChange (1212-1212)
  • KVChange (1227-1229)
  • KVChange_Set (1296-1298)
  • KVChange_Set (1320-1320)
  • SetChange (690-696)
  • SetChange (709-709)
  • SetChange (724-726)
  • Record (556-566)
  • Record (579-579)
  • Record (594-596)
  • Record_Value (634-636)
  • Record_Value (642-642)
  • RawData (646-651)
  • RawData (664-664)
  • RawData (679-681)
  • Notify_Set (1168-1170)
  • Notify_Set (1180-1180)
  • KVChange_Del (1300-1302)
  • KVChange_Del (1322-1322)
  • DelChange (832-837)
  • DelChange (850-850)
  • DelChange (865-867)
  • Notify_Del (1172-1174)
  • Notify_Del (1182-1182)
  • KVChange_Acl (1308-1310)
  • KVChange_Acl (1326-1326)
  • AclChange (742-752)
  • AclChange (765-765)
  • AclChange (780-782)
  • AclChange_Addition (820-822)
  • AclChange_Addition (828-828)
  • ACL (444-451)
  • ACL (464-464)
  • ACL (479-481)
  • ACLData (2819-2826)
  • ACLData (2839-2839)
  • ACLData (2854-2856)
  • Notify_Acl (1176-1178)
  • Notify_Acl (1184-1184)
  • Migration_Schema (1530-1532)
  • Migration_Schema (1542-1542)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
  • SubscribeOptions (928-936)
  • SubscribeOptions (949-949)
  • SubscribeOptions (964-966)
  • Notify (1056-1070)
  • Notify (1083-1083)
  • Notify (1098-1100)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
  • Migration_None (1538-1540)
  • Migration_None (1546-1546)
  • NilMigration (1406-1410)
  • NilMigration (1423-1423)
  • NilMigration (1438-1440)
  • KVChange_Sub (1312-1314)
  • KVChange_Sub (1328-1328)
  • SubscriptionList (876-882)
  • SubscriptionList (895-895)
  • SubscriptionList (910-912)
  • KVChange_Notify (1316-1318)
  • KVChange_Notify (1330-1330)
atlas/kv.go (2)
  • Subscribe (262-286)
  • SubscribeOptions (256-260)
atlas/options/options.go (2)
  • Options (30-45)
  • CurrentOptions (71-71)
atlas/trie/trie.go (1)
  • New (41-45)
atlas/kv/pool.go (3)
  • CreatePool (42-62)
  • DrainPool (185-195)
  • GetPool (65-69)
atlas/consensus/server.go (7)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/options/options.go (2)
  • Logger (28-28)
  • CurrentOptions (71-71)
atlas/consensus/consensus.pb.go (23)
  • Migration (1442-1453)
  • Migration (1466-1466)
  • Migration (1481-1483)
  • Migration_Schema (1530-1532)
  • Migration_Schema (1542-1542)
  • Migration_Data (1534-1536)
  • Migration_Data (1544-1544)
  • KVChange_Sub (1312-1314)
  • KVChange_Sub (1328-1328)
  • Notify (1056-1070)
  • Notify (1083-1083)
  • Notify (1098-1100)
  • KVChange_Notify (1316-1318)
  • KVChange_Notify (1330-1330)
  • Node (2178-2188)
  • Node (2201-2201)
  • Node (2216-2218)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
  • Record (556-566)
  • Record (579-579)
  • Record (594-596)
atlas/kv/store.go (2)
  • Store (26-48)
  • Transaction (51-55)
atlas/consensus/subscriptions.go (1)
  • DefaultNotificationSender (58-60)
atlas/consensus/node-repository.go (1)
  • NewNodeRepository (44-53)
atlas/kv.go (2)
  • GetKey (183-201)
  • PrefixScan (236-254)
atlas/commands/acl-command.go (1)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/kv.go (2)
atlas/consensus/quorum.go (2)
  • KeyName (62-62)
  • GetDefaultQuorumManager (50-59)
atlas/consensus/consensus.pb.go (35)
  • KVChange (1186-1199)
  • KVChange (1212-1212)
  • KVChange (1227-1229)
  • KVChange_Set (1296-1298)
  • KVChange_Set (1320-1320)
  • SetChange (690-696)
  • SetChange (709-709)
  • SetChange (724-726)
  • WriteKeyRequest (2706-2713)
  • WriteKeyRequest (2726-2726)
  • WriteKeyRequest (2741-2743)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
  • KVChange_Acl (1308-1310)
  • KVChange_Acl (1326-1326)
  • ReadKeyRequest (2481-2487)
  • ReadKeyRequest (2500-2500)
  • ReadKeyRequest (2515-2517)
  • KVChange_Del (1300-1302)
  • KVChange_Del (1322-1322)
  • DelChange (832-837)
  • DelChange (850-850)
  • DelChange (865-867)
  • PrefixScanRequest (2594-2600)
  • PrefixScanRequest (2613-2613)
  • PrefixScanRequest (2628-2630)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
  • SubscribeOptions (928-936)
  • SubscribeOptions (949-949)
  • SubscribeOptions (964-966)
  • KVChange_Sub (1312-1314)
  • KVChange_Sub (1328-1328)
atlas/commands/scan_commands.go (2)
atlas/kv.go (1)
  • PrefixScan (236-254)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/migration-repository-kv_test.go (2)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/consensus.pb.go (9)
  • Migration (1442-1453)
  • Migration (1466-1466)
  • Migration (1481-1483)
  • MigrationVersion (196-204)
  • MigrationVersion (217-217)
  • MigrationVersion (232-234)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
atlas/consensus/table-repository.go (2)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/consensus.pb.go (6)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
  • TableGroup (1880-1886)
  • TableGroup (1899-1899)
  • TableGroup (1914-1916)
atlas/consensus/table-repository-kv_test.go (1)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/subscriptions.go (8)
atlas/consensus/repository.go (2)
  • Key (30-33)
  • Prefix (49-51)
atlas/consensus/consensus.pb.go (43)
  • Notify (1056-1070)
  • Notify (1083-1083)
  • Notify (1098-1100)
  • Migration (1442-1453)
  • Migration (1466-1466)
  • Migration (1481-1483)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
  • Notify_Set (1168-1170)
  • Notify_Set (1180-1180)
  • Notify_Acl (1176-1178)
  • Notify_Acl (1184-1184)
  • Notify_Del (1172-1174)
  • Notify_Del (1182-1182)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
  • WriteKeyRequest (2706-2713)
  • WriteKeyRequest (2726-2726)
  • WriteKeyRequest (2741-2743)
  • Migration_Data (1534-1536)
  • Migration_Data (1544-1544)
  • KVChange_Set (1296-1298)
  • KVChange_Set (1320-1320)
  • KVChange_Notify (1316-1318)
  • KVChange_Notify (1330-1330)
  • KVChange_Del (1300-1302)
  • KVChange_Del (1322-1322)
  • KVChange_Acl (1308-1310)
  • KVChange_Acl (1326-1326)
  • Migration_None (1538-1540)
  • Migration_None (1546-1546)
  • Migration_Schema (1530-1532)
  • Migration_Schema (1542-1542)
  • KVChange_Sub (1312-1314)
  • KVChange_Sub (1328-1328)
  • SubscriptionList (876-882)
  • SubscriptionList (895-895)
  • SubscriptionList (910-912)
  • KVChange (1186-1199)
  • KVChange (1212-1212)
  • KVChange (1227-1229)
atlas/trie/trie.go (2)
  • Trie (23-28)
  • New (41-45)
atlas/kv.go (3)
  • Subscribe (262-286)
  • GetKey (183-201)
  • WriteKey (30-48)
atlas/consensus/quorum.go (1)
  • GetDefaultQuorumManager (50-59)
atlas/kv/encoding.go (2)
  • NewKeyBuilder (42-49)
  • KeyBuilder (28-39)
atlas/kv/pool.go (1)
  • GetPool (65-69)
atlas/kv/store.go (1)
  • ErrKeyNotFound (96-96)
atlas/consensus/majority-quorum.go (7)
atlas/kv.go (2)
  • GetKey (183-201)
  • WriteKey (30-48)
atlas/options/options.go (1)
  • Logger (28-28)
atlas/consensus/quorum.go (2)
  • GetDefaultQuorumManager (50-59)
  • QuorumNode (162-166)
atlas/consensus/consensus.pb.go (12)
  • WriteKeyResponse (2766-2772)
  • WriteKeyResponse (2785-2785)
  • WriteKeyResponse (2800-2802)
  • KVChange_Acl (1308-1310)
  • KVChange_Acl (1326-1326)
  • Record (556-566)
  • Record (579-579)
  • Record (594-596)
  • KVChange_Notify (1316-1318)
  • KVChange_Notify (1330-1330)
  • KVChange_Sub (1312-1314)
  • KVChange_Sub (1328-1328)
atlas/consensus/server.go (1)
  • NewServer (48-50)
atlas/kv/pool.go (1)
  • GetPool (65-69)
atlas/kv/store.go (1)
  • ErrKeyNotFound (96-96)
atlas/consensus/server_acl_test.go (1)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/commands/sub-command.go (5)
atlas/commands/string.go (2)
  • CommandString (44-51)
  • Command (29-42)
atlas/kv.go (2)
  • Subscribe (262-286)
  • SubscribeOptions (256-260)
atlas/consensus/consensus.pb.go (6)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
  • SubscribeOptions (928-936)
  • SubscribeOptions (949-949)
  • SubscribeOptions (964-966)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/options/options.go (1)
  • Logger (28-28)
atlas/consensus/broadcastQuorum.go (4)
atlas/consensus/quorum.go (2)
  • RegionName (61-61)
  • QuorumNode (162-166)
atlas/consensus/consensus.pb.go (9)
  • Node (2178-2188)
  • Node (2201-2201)
  • Node (2216-2218)
  • PrefixScanRequest (2594-2600)
  • PrefixScanRequest (2613-2613)
  • PrefixScanRequest (2628-2630)
  • PrefixScanResponse (2646-2653)
  • PrefixScanResponse (2666-2666)
  • PrefixScanResponse (2681-2683)
atlas/kv.go (1)
  • PrefixScan (236-254)
atlas/options/options.go (1)
  • Logger (28-28)
atlas/consensus/table-repository-kv.go (3)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/consensus.pb.go (7)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
  • ReplicationLevel (43-43)
  • ReplicationLevel (75-77)
  • ReplicationLevel (79-81)
  • ReplicationLevel (88-90)
atlas/kv/encoding.go (1)
  • NewKeyBuilder (42-49)
🪛 GitHub Check: lint
atlas/consensus/named_locks.go

[failure] 54-54:
func (*namedLock).release is unused (unused)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Analyze (go)
🔇 Additional comments (27)
atlas/commands/quorum-command.go (1)

30-30: LGTM: Consistent KeyName migration.

The update to wrap table in consensus.KeyName() aligns with the broader migration to typed table identifiers throughout the codebase.

atlas/consensus/server_acl_test.go (2)

1-17: LGTM: Standard license header.

The license header follows the project's standard AGPL-3.0 format.


58-58: LGTM: Consistent KeyName usage.

The update to use KeyName("user.table") for the table name aligns with the broader migration to typed identifiers.

atlas/consensus/migration-repository.go (4)

38-44: LGTM: Interface updated for KeyName.

The migration of CommitAllMigrations and GetNextVersion to accept KeyName instead of string is consistent with the broader typed identifier migration across the codebase.


113-116: LGTM: Correct byte-based key construction.

The update to accept table []byte and use AppendBytes(table) is the appropriate way to handle byte-slice table identifiers in key construction.


226-226: LGTM: Implementation matches updated interface.

The method signature correctly implements the updated MigrationRepository interface with KeyName parameter.


263-263: LGTM: Implementation matches updated interface.

The method signature correctly implements the updated MigrationRepository interface with KeyName parameter.

atlas/commands/scan_commands.go (2)

26-26: LGTM: PrefixScan migrated to KeyName.

The update to use consensus.KeyName(prefix) is consistent with the broader migration from string-based to typed key identifiers.


38-38: LGTM: Correct use of bytes.Join for byte slice keys.

Using bytes.Join is appropriate since keys is now [][]byte (byte-based keys), replacing the previous string-based approach.

integration-tests/harness/cluster.go (2)

274-274: LGTM: Consistent testing context usage.

This call site correctly provides the *testing.T parameter to NewNode, consistent with the updated constructor signature.


120-120: All NewNode calls updated with *testing.T. The *testing.T parameter is passed at both cluster.go:120 and 274, and no other invocations exist.

atlas/commands/acl-command.go (3)

26-26: LGTM: Import updated for KeyName migration.

The import change from atlas/kv to atlas/consensus supports the migration to consensus.KeyName for key identifiers.


61-70: LGTM: Consistent KeyName usage in ACL grant.

The update to use consensus.KeyName(keyName) for key construction is consistent with the broader migration. The variable renaming from tableKey to keyName improves clarity.


119-119: LGTM: Consistent KeyName usage in ACL revoke.

The key construction using consensus.KeyName(tableKey) aligns with the updated ACL API signatures.

atlas/consensus/table-repository.go (2)

23-23: LGTM: GetTable interface updated for KeyName.

The signature change to accept KeyName instead of string aligns with the typed identifier migration throughout the consensus package.


33-33: LGTM: GetGroup interface updated for KeyName.

The signature change to accept KeyName instead of string maintains consistency with the GetTable update and the broader KeyName migration.

atlas/commands/key-command.go (1)

69-70: Downstream APIs accept consensus.KeyName
WriteKey, GetKey, and DeleteKey signatures have been updated to use consensus.KeyName.

atlas/consensus/table-repository-kv.go (1)

75-77: LGTM: KeyName migration implemented consistently.

The migration from string-based table names to byte-based KeyName is handled correctly throughout:

  • GetTable and GetGroup signatures updated to accept KeyName
  • Key construction uses AppendBytes for binary identifiers
  • Comparisons use bytes.Equal instead of string equality
  • Shard operations use bytes.Join for name assembly

The implementation is consistent and maintains correctness.

Also applies to: 255-255, 305-305, 344-344, 353-353, 362-362, 375-375, 384-384, 393-393, 402-423, 445-445, 450-450, 514-515, 539-539, 570-570, 608-608

atlas/consensus/consensus.proto (1)

45-45: LGTM: Proto schema changes support KeyName migration and subscription system.

The protobuf schema updates correctly introduce:

  • Subscription/notification messages (SubscriptionList, Subscribe, SubscribeOptions, Notify)
  • Binary identifier fields (e.g., MigrationVersion.tableName, Table.name, ReadKeyRequest.key are now bytes)
  • New KVChange variants (Subscribe sub = 5, Notify notify = 6)

The schema is consistent with the KeyName-based refactor and notification subsystem introduced across the codebase.

Also applies to: 106-134, 142-143, 201-201, 282-282, 294-294, 299-299, 305-305

atlas/consensus/subscriptions.go (2)

306-413: LGTM: Magic key handling implements subscription persistence and notification cascade.

The maybeHandleMagicKey method correctly handles:

  1. Subscriptions (SUB) (lines 336-370): Persists to meta store, inserts into trie for prefix matching, commits atomically
  2. Notifications (NOTIFY) (lines 371-406): Loads subscription list, deduplicates using version hash log (lines 458-462), sends to matching prefixes, cascades to lower-level buckets

The deduplication log (lines 464-468) prevents reprocessing and is bounded at 100 entries.

Observability note: The cascade logic (lines 415-444) writes notifications to exponentially smaller prefix buckets, ensuring subscribers at any prefix level receive updates. This implements at-least-once delivery semantics, relying on the version hash log to prevent duplicates.

Also applies to: 446-505
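
As a standalone illustration of the bounded version-hash log described above (imports sync; the names and 100-entry cap mirror the review text, not the actual subscriptions.go code):

```go
// dedupLog remembers the last `limit` version hashes so repeated deliveries
// of the same notification can be skipped.
type dedupLog struct {
	mu    sync.Mutex
	seen  map[string]struct{}
	order []string
	limit int
}

func (d *dedupLog) alreadyProcessed(hash string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[hash]; ok {
		return true
	}
	d.seen[hash] = struct{}{}
	d.order = append(d.order, hash)
	if len(d.order) > d.limit { // evict the oldest hash to stay bounded
		delete(d.seen, d.order[0])
		d.order = d.order[1:]
	}
	return false
}
```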


85-202: LGTM: Notification batching implements efficient delivery with retry logic.

The notification delivery mechanism correctly implements:

  • Batching: Per-URL queues with 100-item or 100ms thresholds (lines 105-120)
  • Concurrency control: Per-URL synchronization prevents simultaneous batches to the same endpoint (lines 93-101)
  • Retry logic: Configurable attempts with exponential backoff (lines 172-197)
  • Logging: Success/failure tracking for observability (lines 187-191)

The batching strategy balances delivery latency (100ms maximum) with efficiency (up to 100 notifications per batch).
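
A minimal sketch of the per-URL batching loop summarized above, using the 100-item / 100 ms thresholds (channel wiring and type names are illustrative; the real sender in subscriptions.go differs in detail):

package main

import (
	"fmt"
	"time"
)

type notification struct {
	URL  string
	Body []byte
}

// batchLoop drains one URL's queue, flushing whenever 100 items are pending
// or the 100ms ticker fires, whichever comes first.
func batchLoop(queue <-chan notification, flush func([]notification)) {
	const (
		maxBatch = 100
		maxWait  = 100 * time.Millisecond
	)
	ticker := time.NewTicker(maxWait)
	defer ticker.Stop()
	var pending []notification
	for {
		select {
		case n, ok := <-queue:
			if !ok {
				if len(pending) > 0 {
					flush(pending)
				}
				return
			}
			pending = append(pending, n)
			if len(pending) >= maxBatch {
				flush(pending)
				pending = nil
			}
		case <-ticker.C:
			if len(pending) > 0 {
				flush(pending)
				pending = nil
			}
		}
	}
}

func main() {
	q := make(chan notification, 8)
	go batchLoop(q, func(b []notification) { fmt.Println("flushing", len(b), "notification(s)") })
	q <- notification{URL: "http://example.invalid/hook", Body: []byte(`{"key":"user.1"}`)}
	time.Sleep(250 * time.Millisecond) // let the ticker flush the pending item in this demo
	close(q)
}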

atlas/consensus/server.go (1)

346-365: Subscription persistence and notification patterns are correct.

maybeHandleMagicKey skips “magic”‐key migrations, so the SUB case in server.go only runs once and writes via Notify. Async notifications for Set/Del/Acl run in goroutines and log errors via Logger.Error. No duplicate writes or blocking/race issues detected.

atlas/consensus/table-repository-kv_test.go (2)

172-173: KeyName usage and string conversions look correct.

Casting []byte names to string for comparisons is appropriate in tests; assert.Equal on []byte elsewhere is fine (uses deep-equal).

Also applies to: 208-209, 284-285, 336-344, 392-393


195-203: Index cleanup is implemented; test assertion will pass
The UpdateTable method invokes removeTableIndex on the existing table (old replication level) before re‐indexing, so stale regional indexes are cleaned up and the assert.Len(…, 0) check will succeed.

atlas/consensus/migration-repository-kv_test.go (1)

37-41: Adaptation to KeyName looks consistent.

All updated fields and calls use KeyName ([]byte) correctly; assertions compare byte slices or structs as expected. No issues found.

Also applies to: 45-50, 57-64, 76-80, 92-100, 140-147, 186-194, 223-231, 276-284, 293-301, 328-336, 366-377, 407-415

atlas/consensus/subscriptions_test.go (1)

768-773: Integer range loops supported (Go 1.25.2)
go.mod specifies Go 1.25.2, so integer range loops are valid. No changes needed.
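
For reference, the integer range form the tests rely on (valid since Go 1.22):

package main

import "fmt"

func main() {
	// "for i := range n" iterates n times with i = 0..n-1;
	// "for range n" iterates n times without binding the index.
	for i := range 3 {
		fmt.Println("attempt", i)
	}
	for range 2 {
		fmt.Println("retry")
	}
}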

atlas/consensus/majority-quorum.go (1)

334-338: Do not remove the ErrStealTableOwnershipFailed check; StealTableOwnership returns that error when proposals are rejected, and we must treat it non-critically here.

Likely an incorrect or invalid review comment.

Comment on lines +118 to +133
	err = atlas.Subscribe(ctx, consensus.KeyName(parsed.Prefix), parsed.URL, atlas.SubscribeOptions{
		RetryAttempts:  int(parsed.RetryAttempts),
		RetryAfterBase: parsed.RetryAfterBase,
		Auth:           parsed.Auth,
	})
	if err != nil {
		return nil, err
	}

	options.Logger.Info("created subscription",
		zap.String("prefix", parsed.Prefix),
		zap.String("url", parsed.URL),
		zap.Bool("batch", parsed.Batch),
		zap.Int32("retry_attempts", parsed.RetryAttempts),
		zap.Duration("retry_after_base", parsed.RetryAfterBase))


⚠️ Potential issue | 🟠 Major

Propagate the parsed Batch flag into SubscribeOptions

We parse NOBATCH into parsed.Batch, log it, then drop it on the floor—atlas.Subscribe is always invoked with the default batching behavior. Please extend atlas.SubscribeOptions to carry a batch flag and set it here so SUB … NOBATCH actually changes behavior.

-	err = atlas.Subscribe(ctx, consensus.KeyName(parsed.Prefix), parsed.URL, atlas.SubscribeOptions{
-		RetryAttempts:  int(parsed.RetryAttempts),
-		RetryAfterBase: parsed.RetryAfterBase,
-		Auth:           parsed.Auth,
-	})
+	err = atlas.Subscribe(ctx, consensus.KeyName(parsed.Prefix), parsed.URL, atlas.SubscribeOptions{
+		Batch:          parsed.Batch,
+		RetryAttempts:  int(parsed.RetryAttempts),
+		RetryAfterBase: parsed.RetryAfterBase,
+		Auth:           parsed.Auth,
+	})

Be sure to update atlas.SubscribeOptions/atlas.Subscribe accordingly so the consensus layer receives the correct batch value.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
err = atlas.Subscribe(ctx, consensus.KeyName(parsed.Prefix), parsed.URL, atlas.SubscribeOptions{
RetryAttempts: int(parsed.RetryAttempts),
RetryAfterBase: parsed.RetryAfterBase,
Auth: parsed.Auth,
})
if err != nil {
return nil, err
}
options.Logger.Info("created subscription",
zap.String("prefix", parsed.Prefix),
zap.String("url", parsed.URL),
zap.Bool("batch", parsed.Batch),
zap.Int32("retry_attempts", parsed.RetryAttempts),
zap.Duration("retry_after_base", parsed.RetryAfterBase))
err = atlas.Subscribe(ctx, consensus.KeyName(parsed.Prefix), parsed.URL, atlas.SubscribeOptions{
Batch: parsed.Batch,
RetryAttempts: int(parsed.RetryAttempts),
RetryAfterBase: parsed.RetryAfterBase,
Auth: parsed.Auth,
})
if err != nil {
return nil, err
}
🤖 Prompt for AI Agents
In atlas/commands/sub-command.go around lines 118 to 133, the parsed Batch flag
is logged but not forwarded into atlas.Subscribe; modify the call to pass the
batch value (e.g., Batch: parsed.Batch) in the atlas.SubscribeOptions so the
subscription honors NOBATCH, and then update the atlas.SubscribeOptions struct
definition and atlas.Subscribe signature/implementation in the atlas/ package to
include and propagate this boolean to the consensus layer so the batch behavior
is respected end-to-end.

Comment on lines +433 to 468
	case *KVChange_Acl:
		key := op.Acl.GetKey()
		var record Record
		store := kv.GetPool().DataStore()
		val, err := store.Get(ctx, key)
		if err != nil && errors.Is(err, kv.ErrKeyNotFound) {
			break
		}
		if err != nil {
			return &WriteKeyResponse{
				Success: false,
				Error:   fmt.Sprintf("failed to get key: %v", err),
			}, nil
		}
		err = proto.Unmarshal(val, &record)
		if err != nil {
			return nil, fmt.Errorf("failed to unmarshal record: %v", err)
		}
		if isOwner(ctx, &record) {
			break
		}
		return &WriteKeyResponse{
			Success: false,
			Error:   "principal isn't allowed to modify ACLs for this key",
		}, nil
	case *KVChange_Notify:
		// Notifications are internal system operations that bypass ACL checks
		// They are written to magic keys for subscription processing
		break
	case *KVChange_Sub:
		// Subscriptions are internal system operations that bypass ACL checks
		// They are written to magic keys for subscription storage
		break
	default:
		panic("unknown operation type")
	}

⚠️ Potential issue | 🟠 Major

Avoid panic on unknown operation types; return a proper error response.

Panics in quorum code can crash the server.

-	default:
-		panic("unknown operation type")
+	default:
+		return &WriteKeyResponse{Success: false, Error: "unknown operation type"}, nil
🤖 Prompt for AI Agents
In atlas/consensus/majority-quorum.go around lines 433 to 468, the default
branch currently calls panic("unknown operation type"); replace it with a
non-panicking error return: construct and return a WriteKeyResponse with
Success:false and a clear Error message (include the unexpected op type using %T
or similar) and a nil error value so the caller can handle it; optionally log
the unexpected type before returning. This prevents server crashes from
malformed or unrecognized operations and keeps behavior consistent with other
error paths.

Comment on lines +44 to +52
func (n *namedLock) addRef() error {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.refs++
	if n.refs == 0 {
		return errors.New("lock already released")
	}
	return nil
}

🛠️ Refactor suggestion | 🟠 Major

Harden addRef against resurrecting released locks.

Incrementing first then checking n.refs == 0 allows a transition from -1 → 0 and leaves the lock in an odd state. Check negative before incrementing.

 func (n *namedLock) addRef() error {
 	n.mu.Lock()
 	defer n.mu.Unlock()
-	n.refs++
-	if n.refs == 0 {
-		return errors.New("lock already released")
-	}
+	if n.refs < 0 {
+		return errors.New("lock already released")
+	}
+	n.refs++
 	return nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (n *namedLock) addRef() error {
n.mu.Lock()
defer n.mu.Unlock()
n.refs++
if n.refs == 0 {
return errors.New("lock already released")
}
return nil
}
func (n *namedLock) addRef() error {
n.mu.Lock()
defer n.mu.Unlock()
if n.refs < 0 {
return errors.New("lock already released")
}
n.refs++
return nil
}
🤖 Prompt for AI Agents
In atlas/consensus/named_locks.go around lines 44–52, addRef currently
increments n.refs before validating which allows a transition from -1→0 and
leaves the lock in an invalid state; change the logic to check for negative refs
first and return an error if n.refs < 0, then increment n.refs under the same
lock and return nil—i.e., acquire n.mu, if n.refs < 0 return errors.New("lock
already released"), otherwise n.refs++ and return nil.

Comment on lines +54 to +64
func (n *namedLock) release() {
	n.mu.Lock()
	n.refs--
	if n.refs == 0 {
		n.refs = -1
		n.mu.Unlock()
		n.onRelease()
	} else {
		n.mu.Unlock()
	}
}

⚠️ Potential issue | 🟠 Major

Unused method (*namedLock).release breaks lint; either use it or remove it.

Lint failure flags this as unused. Prefer using it inside namedLocker.unlock to centralize ref-count cleanup and map deletion through the existing onRelease hook.

Apply this diff to unlock to use release (and fix the lint error):

 func (l *namedLocker) unlock(name string) {
-	l.mu.Lock()
-	lock, ok := l.locks[name]
-	if !ok {
-		l.mu.Unlock()
-		return
-	}
-
-	// Decrement refs while holding l.mu
-	lock.mu.Lock()
-	lock.refs--
-	shouldCleanup := lock.refs == 0
-	if shouldCleanup {
-		lock.refs = -1
-		delete(l.locks, name)
-	}
-	lock.mu.Unlock()
-	l.mu.Unlock()
-
-	lock.lock.Unlock()
+	l.mu.Lock()
+	lock, ok := l.locks[name]
+	l.mu.Unlock()
+	if !ok {
+		return
+	}
+	// release() handles ref-counting and calls onRelease to delete from the map.
+	lock.release()
+	lock.lock.Unlock()
 }
🧰 Tools
🪛 GitHub Check: lint

[failure] 54-54:
func (*namedLock).release is unused (unused)

Comment on lines +341 to +346
	client := &http.Client{Timeout: 2 * time.Second}

	for retries := sub.GetOptions().GetRetryAttempts(); retries > 0; retries-- {
		req, err := http.NewRequest("POST", sub.GetUrl(), jsonReader{data: bodyBytes})
		require.NoError(t, err)


🛠️ Refactor suggestion | 🟠 Major

Update jsonReader call sites to use pointer.

Change both NewRequest calls to pass &jsonReader{data: bodyBytes} as per the reader fix.

Also applies to: 835-837

🤖 Prompt for AI Agents
In atlas/consensus/subscriptions_test.go around lines 341-346 (and also update
the other occurrence around lines 835-837), the http.NewRequest calls pass
jsonReader by value; update both call sites to pass a pointer instead (use
&jsonReader{data: bodyBytes}) so the request body implements io.ReadCloser
semantics expected by the fixed reader implementation. Ensure both POST request
constructions are changed to pass the address of the jsonReader.

Comment on lines +352 to +367
	if batch {
		cmd += " BATCH"
	}

	if retryAttempts > 0 {
		cmd += fmt.Sprintf(" RETRY %d", retryAttempts)
	}

	if retryAfter > 0 {
		cmd += fmt.Sprintf(" RETRY_AFTER %s", retryAfter.String())
	}

	if auth != "" {
		cmd += fmt.Sprintf(" AUTH %s", auth)
	}


⚠️ Potential issue | 🟠 Major

Respect NOBATCH when building the SUB command

batch=false is currently ignored, so the client never emits the NOBATCH modifier that the server-side parser understands. As a result, callers cannot turn batching off. Please emit NOBATCH when batching is disabled.

-	if batch {
-		cmd += " BATCH"
-	}
+	if !batch {
+		cmd += " NOBATCH"
+	}
🤖 Prompt for AI Agents
In integration-tests/harness/client.go around lines 352 to 367, the code only
appends " BATCH" when batch is true and never emits the " NOBATCH" modifier, so
callers cannot disable batching; modify the logic to append " NOBATCH" when
batch is false (i.e., add an else branch that appends " NOBATCH" in the same
position/format as the existing " BATCH" append), leaving the retry and auth
appends unchanged.

Comment on lines +65 to +74
	notificationMutex.Lock()
	for _, notif := range notifs {
		notifications[notif["event_id"].(string)] = notif
		t.Logf("Received notification: %+v", notif)
	}
	notificationMutex.Unlock()

	receivedNotifications.Add(int32(len(notifications)))
	t.Logf("Received %d notifications (total: %d)", len(notifications), receivedNotifications.Load())


⚠️ Potential issue | 🟠 Major

Count only newly received notifications.

receivedNotifications.Add(int32(len(notifications))) bumps the counter by the total map size each time, so duplicates or retries inflate the tally and the test may pass before all unique events arrive. Track the delta instead (and guard the type assertion to avoid panics).

-	notificationMutex.Lock()
-	for _, notif := range notifs {
-		notifications[notif["event_id"].(string)] = notif
-		t.Logf("Received notification: %+v", notif)
-	}
-	notificationMutex.Unlock()
-
-	receivedNotifications.Add(int32(len(notifications)))
+	notificationMutex.Lock()
+	before := len(notifications)
+	for _, notif := range notifs {
+		eventID, ok := notif["event_id"].(string)
+		if !ok {
+			t.Logf("Notification missing string event_id: %+v", notif)
+			continue
+		}
+		notifications[eventID] = notif
+		t.Logf("Received notification: %+v", notif)
+	}
+	delta := len(notifications) - before
+	notificationMutex.Unlock()
+
+	if delta > 0 {
+		receivedNotifications.Add(int32(delta))
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
notificationMutex.Lock()
for _, notif := range notifs {
notifications[notif["event_id"].(string)] = notif
t.Logf("Received notification: %+v", notif)
}
notificationMutex.Unlock()
receivedNotifications.Add(int32(len(notifications)))
t.Logf("Received %d notifications (total: %d)", len(notifications), receivedNotifications.Load())
notificationMutex.Lock()
before := len(notifications)
for _, notif := range notifs {
eventID, ok := notif["event_id"].(string)
if !ok {
t.Logf("Notification missing string event_id: %+v", notif)
continue
}
notifications[eventID] = notif
t.Logf("Received notification: %+v", notif)
}
delta := len(notifications) - before
notificationMutex.Unlock()
if delta > 0 {
receivedNotifications.Add(int32(delta))
}
t.Logf("Received %d notifications (total: %d)", len(notifications), receivedNotifications.Load())
🤖 Prompt for AI Agents
In integration-tests/scenarios/notifications_test.go around lines 65 to 74, the
test increments receivedNotifications by the entire map size each callback which
double-counts already-seen notifications and can panic on unsafe type
assertions; change the loop to count only newly added events by checking if
notifications[evtID] already exists before inserting, increment a local newCount
for each successful new insert, guard the map key extraction with a safe type
assertion (ok pattern) and skip non-string keys, then call
receivedNotifications.Add(int32(newCount)) and log the delta and new total.

Comment on lines +107 to +113
	for range 10 {
		_ = node0.Client().Subscribe("user.", server.URL)
		if err == nil {
			break
		}
		time.Sleep(1 * time.Second)
	}

⚠️ Potential issue | 🟠 Major

Actually check the Subscribe error.

The retry loop breaks on the outer err variable, but the call result is discarded, so the loop exits immediately and the test never verifies the subscription succeeded. Capture the return value and assert success after the loop.

-	for range 10 {
-		_ = node0.Client().Subscribe("user.", server.URL)
-		if err == nil {
-			break
-		}
-		time.Sleep(1 * time.Second)
-	}
+	for range 10 {
+		err = node0.Client().Subscribe("user.", server.URL)
+		if err == nil {
+			break
+		}
+		time.Sleep(1 * time.Second)
+	}
+	require.NoError(t, err, "Failed to subscribe to notifications")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for range 10 {
_ = node0.Client().Subscribe("user.", server.URL)
if err == nil {
break
}
time.Sleep(1 * time.Second)
}
for range 10 {
err = node0.Client().Subscribe("user.", server.URL)
if err == nil {
break
}
time.Sleep(1 * time.Second)
}
require.NoError(t, err, "Failed to subscribe to notifications")
🤖 Prompt for AI Agents
In integration-tests/scenarios/notifications_test.go around lines 107 to 113,
the Subscribe call discards its error and the loop checks an unrelated outer
variable; change the loop to capture the return value (err :=
node0.Client().Subscribe("user.", server.URL)) on each iteration, retry up to 10
times (for i := 0; i < 10; i++), break when err == nil, and after the loop
assert or fail the test if err != nil so the test verifies the subscription
actually succeeded.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 19

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
atlas/kv/encoding.go (2)

80-96: Fix off-by-one and parsing for table version.

  • Use i+1 < len(parts) before accessing parts[i+1].
  • For keyTableVersion, increment i and continue to avoid misclassifying tokens as extra.
-	if len(parts) >= i+1 {
+	if i+1 < len(parts) {
@@
-			if string(parts[i]) == keyTableVersion {
+			if string(parts[i]) == keyTableVersion {
 				var err error
 				builder.tableVersion, err = strconv.ParseInt(string(parts[i+1]), 10, 64)
 				if err != nil {
 					panic("invalid table version")
 				}
-			}
+				i += 1
+				continue
+			}

Also applies to: 114-121


253-261: Clone omits fields (isIndex, isUncommitted, migration, versions, node) and aliases slices.

Copy all fields and deep-copy slices to avoid shared mutation.

-func (kb *KeyBuilder) Clone() *KeyBuilder {
-	return &KeyBuilder{
-		isMeta: kb.isMeta,
-		table:  kb.table,
-		row:    kb.row,
-		extra:  kb.extra,
-	}
-}
+func (kb *KeyBuilder) Clone() *KeyBuilder {
+	// deep copy extra
+	extra := make([][]byte, len(kb.extra))
+	for i := range kb.extra {
+		if kb.extra[i] != nil {
+			cp := make([]byte, len(kb.extra[i]))
+			copy(cp, kb.extra[i])
+			extra[i] = cp
+		}
+	}
+	var migrationTableCopy []byte
+	if kb.migrationTable != nil {
+		migrationTableCopy = make([]byte, len(kb.migrationTable))
+		copy(migrationTableCopy, kb.migrationTable)
+	}
+	return &KeyBuilder{
+		isMeta:         kb.isMeta,
+		isIndex:        kb.isIndex,
+		isUncommitted:  kb.isUncommitted,
+		table:          kb.table,
+		row:            kb.row,
+		extra:          extra,
+		migrationTable: migrationTableCopy,
+		migrationVersion: kb.migrationVersion,
+		tableVersion:     kb.tableVersion,
+		node:             kb.node,
+	}
+}
atlas/consensus/quorum.go (3)

448-460: Fix Q1 assembly: invalid range over int64 and use filtered nodes (not q.nodes)

  • for i := range nodesPerQ1Region won’t compile (cannot range over int64).
  • Using q.nodes[region] ignores the earlier healthy-node filtering and table-region filtering done on nodes.
-	for _, region := range farRegions {
-		if int64(len(q.nodes[region])) < nodesPerQ1Region {
+	for _, region := range farRegions {
+		if int64(len(nodes[region])) < nodesPerQ1Region {
 			// this region cannot be selected, so we skip it
 			continue
 		}
 		if int64(len(selectedQ1Regions)) >= q1RegionCount {
 			// we have enough regions, so we can stop
 			break
 		}
 
 		selectedQ1Regions = append(selectedQ1Regions, region)
 	}
@@
-	for _, region := range selectedQ1Regions {
-		for i := range nodesPerQ1Region {
-			q1 = append(q1, q.nodes[region][i])
-		}
-	}
+	for _, region := range selectedQ1Regions {
+		for i := int64(0); i < nodesPerQ1Region; i++ {
+			q1 = append(q1, nodes[region][int(i)])
+		}
+	}
@@
-	for _, region := range selectedQ2Regions {
-		for i := int64(0); i < q.calculateLfn(nodes, region, Fn); i++ {
-			q2 = append(q2, q.nodes[region][i])
-		}
-	}
+	for _, region := range selectedQ2Regions {
+		ln := q.calculateLfn(nodes, region, Fn)
+		for i := int64(0); i < ln; i++ {
+			q2 = append(q2, nodes[region][int(i)])
+		}
+	}

Also applies to: 518-528


503-509: Infinite loop risk: decrement Fz but never assign it

When Fmax < 0 || Fmin < 0, next := Fz - 1 is computed but Fz isn’t updated, so the goto recalculate loop never changes state.

 	if Fmax < 0 || Fmin < 0 {
 		// we cannot form a quorum with the current settings, so we need to reduce Fz
 		next := Fz - 1
 		if next < 0 {
 			return nil, errors.New("unable to form a quorum")
 		}
-		goto recalculate
+		Fz = next
+		goto recalculate
 	}

710-716: Diagnostic Q1 assembly: invalid range over int64

Same compile error as the main path. Use a counted loop and guard index type.

-	for _, region := range selectedQ1Regions {
-		for i := range nodesPerQ1Region {
-			if i < int64(len(nodesCopy[region])) {
-				q1Nodes = append(q1Nodes, nodesCopy[region][i])
-			}
-		}
-	}
+	for _, region := range selectedQ1Regions {
+		for i := int64(0); i < nodesPerQ1Region && i < int64(len(nodesCopy[region])); i++ {
+			q1Nodes = append(q1Nodes, nodesCopy[region][int(i)])
+		}
+	}
atlas/consensus/majority-quorum.go (1)

227-236: Route by table, not full key
tr.GetTable(in.GetKey()) treats the entire key as a table, exploding metadata. Extract the table segment (the prefix before the row delimiter) and pass that to tr.GetTable. Since there’s no built-in parser for raw keys, implement a helper (or extend KeyBuilder) to split a key into (table, row) and use it here. Apply the same change at lines 238–249.

atlas/consensus/table-repository-kv.go (1)

109-137: GetTablesBatch uses table name as KV key; build meta key

txn.Get(ctx, name) uses the raw name, not meta:table:{name}. This will fail to fetch tables.

Apply this fix:

 for i, name := range names {
-    data, err := txn.Get(r.ctx, name)
+    key := kv.NewKeyBuilder().Meta().Append("table").AppendBytes(name).Build()
+    data, err := txn.Get(r.ctx, key)
     if err != nil {
         if errors.Is(err, kv.ErrKeyNotFound) {
             // Keep nil in results[i]
             continue
         }
-        return nil, fmt.Errorf("failed to get table %s: %w", name, err)
+        return nil, fmt.Errorf("failed to get table %s: %w", name, err)
     }
♻️ Duplicate comments (4)
atlas/consensus/subscriptions.go (1)

172-197: Set Content-Type/Authorization and don’t abort retries on transient errors

Headers are missing, and return on error exits the retry loop. Set headers and continue to retry. Also guard nil options.

-					for retries := next.sub.GetOptions().GetRetryAttempts(); retries > 0; retries-- {
+					opts := next.sub.GetOptions()
+					var attempts int32
+					if opts != nil && opts.GetRetryAttempts() > 0 {
+						attempts = opts.GetRetryAttempts()
+					} else {
+						attempts = 3
+					}
+					for retries := attempts; retries > 0; retries-- {
 						body := bytes.NewReader(bodyBytes)
 
 						req, err := http.NewRequest("POST", next.sub.GetUrl(), body)
 						if err != nil {
 							options.Logger.Error("failed to create notification request", zap.Error(err))
-							return
+							continue
 						}
+						req.Header.Set("Content-Type", "application/json")
+						if opts != nil && opts.GetAuth() != "" {
+							req.Header.Set("Authorization", opts.GetAuth())
+						}
 
 						resp, err := client.Do(req)
 						if err != nil {
 							options.Logger.Error("failed to send notification", zap.Error(err))
-							return
+							continue
 						}
 						_ = resp.Body.Close()
 						options.Logger.Info("sent notification", zap.Int("status_code", resp.StatusCode))
 						if resp.StatusCode == http.StatusOK {
 							return
 						}
 						options.Logger.Warn("failed to send notification", zap.Int("status_code", resp.StatusCode))
-						retryBase := next.sub.GetOptions().RetryAfterBase.AsDuration()
+						var retryBase time.Duration
+						if opts != nil && opts.GetRetryAfterBase() != nil {
+							retryBase = opts.RetryAfterBase.AsDuration()
+						}
 						if retryBase == 0 {
 							retryBase = 100 * time.Millisecond
 						}
-						time.Sleep(retryBase * time.Duration(next.sub.GetOptions().GetRetryAttempts()-retries+1))
+						time.Sleep(retryBase * time.Duration(attempts-retries+1))
 					}
atlas/kv.go (1)

50-61: Routing by full key as the table breaks ownership/metadata semantics

You pass the full key as Table (and to GetQuorum) for writes, reads, and deletes. This makes every key its own “table”, bypassing table-level config/replication and causing metadata bloat.

Action:

  • Derive a table identifier from the key and use that for quorum routing and WriteKeyRequest.Table. Keep the record’s logical key in the KVChange.

Sketch:

// New helper (suggested)
func tableFromKey(key consensus.KeyName) consensus.KeyName {
  // parse up to the table delimiter used by KeyBuilder; implement centrally
}

// Then:
tbl := tableFromKey(key)
q, err := qm.GetQuorum(ctx, tbl)
...
Table: tbl,

I can generate a concrete helper once the key encoding/delimiter is confirmed. Would you like me to add it?

Also applies to: 183-194, 205-217

atlas/consensus/server.go (1)

364-368: SUB operations still don't persist subscriptions

As flagged in the previous review, KVChange_Sub operations call DefaultNotificationSender().Notify(migration) and return without persisting the subscription to any backing store. The maybeHandleMagicKey call at line 355 may handle this, but if it doesn't (or falls through for non-magic prefixes), subscriptions will be lost.

atlas/consensus/subscriptions_test.go (1)

296-303: jsonReader is non-progressing; make Read stateful and advance pos

Read doesn’t update pos and uses a value receiver, so streams never reach EOF. Use a pointer receiver and advance pos.

Apply this diff:

-type jsonReader struct {
+type jsonReader struct {
     data []byte
     pos  int
 }
 
-func (j jsonReader) Read(p []byte) (n int, err error) {
+func (j *jsonReader) Read(p []byte) (n int, err error) {
     if j.pos >= len(j.data) {
         return 0, io.EOF
     }
     n = copy(p, j.data[j.pos:])
-    //j.pos += n
-    return n, nil
+    j.pos += n
+    if j.pos >= len(j.data) {
+        return n, io.EOF
+    }
+    return n, nil
 }
🧹 Nitpick comments (9)
atlas/kv/encoding.go (1)

188-191: Consider copying input in AppendBytes to avoid aliasing.

If callers reuse/mutate the slice, keys may change unexpectedly. Copy for safety.

-func (kb *KeyBuilder) AppendBytes(part []byte) *KeyBuilder {
-	kb.extra = append(kb.extra, part)
+func (kb *KeyBuilder) AppendBytes(part []byte) *KeyBuilder {
+	cp := make([]byte, len(part))
+	copy(cp, part)
+	kb.extra = append(kb.extra, cp)
 	return kb
 }
atlas/consensus/subscriptions.go (2)

364-373: Protect subscriptions trie with a mutex (likely not concurrent-safe)

Insert occurs here while reads happen in sendNotification without locks. Guard both insert and read.

-			s.subscriptions.Insert(op.Sub.Prefix, op.Sub)
+			s.mu.Lock()
+			s.subscriptions.Insert(op.Sub.Prefix, op.Sub)
+			s.mu.Unlock()

And in send path:

-	for _, sub := range s.subscriptions.PrefixesOf(op.Notify.Key) {
+	s.mu.Lock()
+	subs := s.subscriptions.PrefixesOf(op.Notify.Key)
+	s.mu.Unlock()
+	for _, sub := range subs {

131-160: Optional: encode binary keys/prefixes safely in JSON

string([]byte) may produce invalid UTF‑8. Prefer base32/base64 for Key and Origin.

Example:

- Key:     string(n.pub.GetKey()),
+ Key:     encoder.EncodeToString(n.pub.GetKey()),
- Origin:  string(n.sub.GetPrefix()),
+ Origin:  encoder.EncodeToString(n.sub.GetPrefix()),

Also applies to: 506-507
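
One concrete way to do this with the standard library (the encoder in the snippet above is a placeholder; base64.RawURLEncoding is just one reasonable choice):

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

type notificationPayload struct {
	Key    string `json:"key"`
	Origin string `json:"origin"`
}

func main() {
	key := []byte{0x01, 0xff, 'u', 's', 'e', 'r'} // arbitrary bytes, not valid UTF-8
	payload := notificationPayload{
		Key:    base64.RawURLEncoding.EncodeToString(key),
		Origin: base64.RawURLEncoding.EncodeToString([]byte("user.")),
	}
	out, _ := json.Marshal(payload)
	fmt.Println(string(out)) // {"key":"Af91c2Vy","origin":"dXNlci4"}
}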

atlas/consensus/majority-quorum.go (2)

347-359: Forwarding log level

Forwarding the write to the owner is expected; use Info/Debug rather than Error.

- options.Logger.Error("forwarding write key to owner", zap.Int64("owner_id", owner.GetId()))
+ options.Logger.Info("forwarding write key to owner", zap.Int64("owner_id", owner.GetId()))

232-236: Use NewServer() consistently

Construct Server with NewServer() to ensure internal fields (e.g., named locks) are initialized.

-	s := Server{}
+	s := NewServer()

Also applies to: 566-567

atlas/consensus/named_locks_test.go (1)

70-75: Avoid flaky timing; increase timeout or remove sleeps

100ms end-to-end with 3x 50ms sleeps is tight for CI. Bump timeout (e.g., 500ms–1s) or synchronize via channels/WaitGroups instead of Sleep to avoid flakes.
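
One way to drop the sleeps, sketched with a done channel and a generous timeout (the structure is illustrative, not the actual test):

package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})
	go func() {
		// stand-in for the work the test currently waits on with time.Sleep
		time.Sleep(50 * time.Millisecond)
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("work finished")
	case <-time.After(time.Second): // generous bound instead of a tight 100ms budget
		fmt.Println("timed out waiting for work")
	}
}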

atlas/commands/sub-command.go (2)

64-67: Fix misleading comment

Comment says “Default to non-batched” but Batch is set to true. Update the comment to reflect actual default.


81-86: Validate RETRY/RETRY_AFTER inputs

Guard against negative attempts/durations to avoid disabling retries or backoff anomalies.

- attempts, err := strconv.ParseInt(attemptsStr, 10, 32)
+ attempts, err := strconv.ParseInt(attemptsStr, 10, 32)
  if err != nil {
      return nil, errors.New("RETRY requires valid number")
  }
- parsed.RetryAttempts = int32(attempts)
+ if attempts <= 0 {
+     return nil, errors.New("RETRY attempts must be > 0")
+ }
+ parsed.RetryAttempts = int32(attempts)
- dur, err := time.ParseDuration(durationStr)
+ dur, err := time.ParseDuration(durationStr)
  if err != nil {
      return nil, err
  }
- parsed.RetryAfterBase = dur
+ if dur <= 0 {
+     return nil, errors.New("RETRY_AFTER must be > 0")
+ }
+ parsed.RetryAfterBase = dur

Also applies to: 93-99

atlas/consensus/subscriptions_test.go (1)

629-636: Ensure options.CurrentOptions is initialized in tests

Directly writing options.CurrentOptions.ServerId assumes non-nil. Initialize it in test setup or guard to avoid panics on nil.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between ea6d17e and e2a556c.

⛔ Files ignored due to path filters (1)
  • atlas/consensus/consensus.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (37)
  • atlas/caddy/module/module.go (2 hunks)
  • atlas/commands/acl-command.go (3 hunks)
  • atlas/commands/key-command.go (2 hunks)
  • atlas/commands/key-command_test.go (4 hunks)
  • atlas/commands/quorum-command.go (1 hunks)
  • atlas/commands/scan_commands.go (3 hunks)
  • atlas/commands/sub-command.go (1 hunks)
  • atlas/consensus/broadcastQuorum.go (1 hunks)
  • atlas/consensus/consensus.proto (5 hunks)
  • atlas/consensus/data-repository.go (2 hunks)
  • atlas/consensus/majority-quorum.go (13 hunks)
  • atlas/consensus/migration-repository-kv_test.go (17 hunks)
  • atlas/consensus/migration-repository.go (4 hunks)
  • atlas/consensus/named_locks.go (1 hunks)
  • atlas/consensus/named_locks_test.go (1 hunks)
  • atlas/consensus/quorum-integration_test.go (4 hunks)
  • atlas/consensus/quorum.go (5 hunks)
  • atlas/consensus/server.go (23 hunks)
  • atlas/consensus/server_acl_test.go (2 hunks)
  • atlas/consensus/subscriptions.go (1 hunks)
  • atlas/consensus/subscriptions_test.go (1 hunks)
  • atlas/consensus/table-repository-kv.go (18 hunks)
  • atlas/consensus/table-repository-kv_test.go (20 hunks)
  • atlas/consensus/table-repository.go (2 hunks)
  • atlas/kv.go (11 hunks)
  • atlas/kv/badger.go (0 hunks)
  • atlas/kv/encoding.go (5 hunks)
  • atlas/kv/pool.go (1 hunks)
  • atlas/nodes.go (0 hunks)
  • atlas/operations/table.go (0 hunks)
  • atlas/operations/table_test.go (0 hunks)
  • atlas/pool.go (0 hunks)
  • atlas/test/util.go (0 hunks)
  • integration-tests/harness/client.go (1 hunks)
  • integration-tests/harness/cluster.go (2 hunks)
  • integration-tests/harness/node.go (4 hunks)
  • integration-tests/scenarios/notifications_test.go (1 hunks)
💤 Files with no reviewable changes (6)
  • atlas/test/util.go
  • atlas/kv/badger.go
  • atlas/operations/table.go
  • atlas/operations/table_test.go
  • atlas/pool.go
  • atlas/nodes.go
✅ Files skipped from review due to trivial changes (1)
  • atlas/kv/pool.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • atlas/caddy/module/module.go
🧰 Additional context used
🧬 Code graph analysis (23)
atlas/consensus/table-repository.go (2)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/consensus.pb.go (6)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
  • TableGroup (1880-1886)
  • TableGroup (1899-1899)
  • TableGroup (1914-1916)
atlas/consensus/migration-repository.go (3)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/consensus.pb.go (6)
  • MigrationVersion (196-204)
  • MigrationVersion (217-217)
  • MigrationVersion (232-234)
  • Migration (1442-1453)
  • Migration (1466-1466)
  • Migration (1481-1483)
atlas/consensus/repository.go (1)
  • Prefix (49-51)
atlas/commands/quorum-command.go (1)
atlas/consensus/quorum.go (2)
  • DescribeQuorum (782-790)
  • KeyName (62-62)
atlas/consensus/data-repository.go (1)
atlas/consensus/consensus.pb.go (4)
  • KVChange_Notify (1316-1318)
  • KVChange_Notify (1330-1330)
  • KVChange_Sub (1312-1314)
  • KVChange_Sub (1328-1328)
integration-tests/scenarios/notifications_test.go (3)
integration-tests/harness/cluster.go (2)
  • NewCluster (72-129)
  • ClusterConfig (65-70)
atlas/kv.go (1)
  • Subscribe (262-286)
atlas/consensus/consensus.pb.go (3)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
atlas/consensus/quorum-integration_test.go (1)
atlas/consensus/quorum.go (2)
  • DescribeQuorum (782-790)
  • KeyName (62-62)
atlas/commands/sub-command.go (5)
atlas/commands/string.go (2)
  • CommandString (44-51)
  • Command (29-42)
atlas/kv.go (2)
  • Subscribe (262-286)
  • SubscribeOptions (256-260)
atlas/consensus/consensus.pb.go (6)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
  • SubscribeOptions (928-936)
  • SubscribeOptions (949-949)
  • SubscribeOptions (964-966)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/options/options.go (1)
  • Logger (28-28)
integration-tests/harness/cluster.go (1)
integration-tests/harness/node.go (1)
  • NewNode (50-69)
atlas/consensus/broadcastQuorum.go (4)
atlas/consensus/quorum.go (2)
  • RegionName (61-61)
  • QuorumNode (162-166)
atlas/consensus/consensus.pb.go (9)
  • Node (2178-2188)
  • Node (2201-2201)
  • Node (2216-2218)
  • PrefixScanRequest (2594-2600)
  • PrefixScanRequest (2613-2613)
  • PrefixScanRequest (2628-2630)
  • PrefixScanResponse (2646-2653)
  • PrefixScanResponse (2666-2666)
  • PrefixScanResponse (2681-2683)
atlas/kv.go (1)
  • PrefixScan (236-254)
atlas/options/options.go (1)
  • Logger (28-28)
atlas/consensus/table-repository-kv_test.go (1)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/migration-repository-kv_test.go (2)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/consensus.pb.go (9)
  • Migration (1442-1453)
  • Migration (1466-1466)
  • Migration (1481-1483)
  • MigrationVersion (196-204)
  • MigrationVersion (217-217)
  • MigrationVersion (232-234)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
integration-tests/harness/client.go (2)
atlas/kv.go (1)
  • Subscribe (262-286)
atlas/consensus/consensus.pb.go (3)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
integration-tests/harness/node.go (1)
integration-tests/harness/config.go (1)
  • NodeConfig (29-38)
atlas/kv.go (2)
atlas/consensus/quorum.go (2)
  • KeyName (62-62)
  • GetDefaultQuorumManager (50-59)
atlas/consensus/consensus.pb.go (38)
  • KVChange (1186-1199)
  • KVChange (1212-1212)
  • KVChange (1227-1229)
  • KVChange_Set (1296-1298)
  • KVChange_Set (1320-1320)
  • SetChange (690-696)
  • SetChange (709-709)
  • SetChange (724-726)
  • WriteKeyRequest (2706-2713)
  • WriteKeyRequest (2726-2726)
  • WriteKeyRequest (2741-2743)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
  • KVChange_Acl (1308-1310)
  • KVChange_Acl (1326-1326)
  • AclChange (742-752)
  • AclChange (765-765)
  • AclChange (780-782)
  • ReadKeyRequest (2481-2487)
  • ReadKeyRequest (2500-2500)
  • ReadKeyRequest (2515-2517)
  • KVChange_Del (1300-1302)
  • KVChange_Del (1322-1322)
  • DelChange (832-837)
  • DelChange (850-850)
  • DelChange (865-867)
  • PrefixScanRequest (2594-2600)
  • PrefixScanRequest (2613-2613)
  • PrefixScanRequest (2628-2630)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
  • SubscribeOptions (928-936)
  • SubscribeOptions (949-949)
  • SubscribeOptions (964-966)
  • KVChange_Sub (1312-1314)
  • KVChange_Sub (1328-1328)
atlas/commands/acl-command.go (1)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/subscriptions.go (7)
atlas/consensus/consensus.pb.go (43)
  • Notify (1056-1070)
  • Notify (1083-1083)
  • Notify (1098-1100)
  • Migration (1442-1453)
  • Migration (1466-1466)
  • Migration (1481-1483)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
  • Notify_Set (1168-1170)
  • Notify_Set (1180-1180)
  • Notify_Acl (1176-1178)
  • Notify_Acl (1184-1184)
  • Notify_Del (1172-1174)
  • Notify_Del (1182-1182)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
  • WriteKeyRequest (2706-2713)
  • WriteKeyRequest (2726-2726)
  • WriteKeyRequest (2741-2743)
  • Migration_Data (1534-1536)
  • Migration_Data (1544-1544)
  • KVChange_Set (1296-1298)
  • KVChange_Set (1320-1320)
  • KVChange_Notify (1316-1318)
  • KVChange_Notify (1330-1330)
  • KVChange_Del (1300-1302)
  • KVChange_Del (1322-1322)
  • KVChange_Acl (1308-1310)
  • KVChange_Acl (1326-1326)
  • Migration_None (1538-1540)
  • Migration_None (1546-1546)
  • Migration_Schema (1530-1532)
  • Migration_Schema (1542-1542)
  • KVChange_Sub (1312-1314)
  • KVChange_Sub (1328-1328)
  • SubscriptionList (876-882)
  • SubscriptionList (895-895)
  • SubscriptionList (910-912)
  • KVChange (1186-1199)
  • KVChange (1212-1212)
  • KVChange (1227-1229)
atlas/trie/trie.go (2)
  • Trie (23-28)
  • New (41-45)
atlas/options/options.go (1)
  • Logger (28-28)
atlas/consensus/quorum.go (1)
  • GetDefaultQuorumManager (50-59)
atlas/kv/encoding.go (2)
  • NewKeyBuilder (42-49)
  • KeyBuilder (28-39)
atlas/kv/pool.go (1)
  • GetPool (65-69)
atlas/kv/store.go (1)
  • ErrKeyNotFound (96-96)
atlas/consensus/subscriptions_test.go (5)
atlas/consensus/consensus.pb.go (71)
  • Migration (1442-1453)
  • Migration (1466-1466)
  • Migration (1481-1483)
  • MigrationVersion (196-204)
  • MigrationVersion (217-217)
  • MigrationVersion (232-234)
  • Migration_Data (1534-1536)
  • Migration_Data (1544-1544)
  • DataMigration (1332-1341)
  • DataMigration (1354-1354)
  • DataMigration (1369-1371)
  • DataMigration_Change (1400-1402)
  • DataMigration_Change (1404-1404)
  • KVChange (1186-1199)
  • KVChange (1212-1212)
  • KVChange (1227-1229)
  • KVChange_Set (1296-1298)
  • KVChange_Set (1320-1320)
  • SetChange (690-696)
  • SetChange (709-709)
  • SetChange (724-726)
  • Record (556-566)
  • Record (579-579)
  • Record (594-596)
  • Notify_Set (1168-1170)
  • Notify_Set (1180-1180)
  • KVChange_Del (1300-1302)
  • KVChange_Del (1322-1322)
  • DelChange (832-837)
  • DelChange (850-850)
  • DelChange (865-867)
  • Notify_Del (1172-1174)
  • Notify_Del (1182-1182)
  • KVChange_Acl (1308-1310)
  • KVChange_Acl (1326-1326)
  • AclChange (742-752)
  • AclChange (765-765)
  • AclChange (780-782)
  • AclChange_Addition (820-822)
  • AclChange_Addition (828-828)
  • ACL (444-451)
  • ACL (464-464)
  • ACL (479-481)
  • ACLData (2819-2826)
  • ACLData (2839-2839)
  • ACLData (2854-2856)
  • Notify_Acl (1176-1178)
  • Notify_Acl (1184-1184)
  • Migration_Schema (1530-1532)
  • Migration_Schema (1542-1542)
  • Subscribe (996-1003)
  • Subscribe (1016-1016)
  • Subscribe (1031-1033)
  • SubscribeOptions (928-936)
  • SubscribeOptions (949-949)
  • SubscribeOptions (964-966)
  • Notify (1056-1070)
  • Notify (1083-1083)
  • Notify (1098-1100)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
  • Migration_None (1538-1540)
  • Migration_None (1546-1546)
  • KVChange_Sub (1312-1314)
  • KVChange_Sub (1328-1328)
  • SubscriptionList (876-882)
  • SubscriptionList (895-895)
  • SubscriptionList (910-912)
  • KVChange_Notify (1316-1318)
  • KVChange_Notify (1330-1330)
atlas/options/options.go (2)
  • Options (30-45)
  • CurrentOptions (71-71)
atlas/trie/trie.go (1)
  • New (41-45)
atlas/kv/pool.go (3)
  • CreatePool (42-62)
  • DrainPool (185-195)
  • GetPool (65-69)
atlas/kv/encoding.go (1)
  • NewKeyBuilder (42-49)
atlas/commands/scan_commands.go (2)
atlas/kv.go (1)
  • PrefixScan (236-254)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/commands/key-command.go (1)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/server.go (6)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/consensus_grpc.pb.go (13)
  • UnimplementedConsensusServer (197-197)
  • UnimplementedConsensusServer (199-201)
  • UnimplementedConsensusServer (202-204)
  • UnimplementedConsensusServer (205-207)
  • UnimplementedConsensusServer (208-210)
  • UnimplementedConsensusServer (211-213)
  • UnimplementedConsensusServer (214-216)
  • UnimplementedConsensusServer (217-219)
  • UnimplementedConsensusServer (220-222)
  • UnimplementedConsensusServer (223-225)
  • UnimplementedConsensusServer (226-228)
  • UnimplementedConsensusServer (229-229)
  • UnimplementedConsensusServer (230-230)
atlas/kv/store.go (2)
  • Store (26-48)
  • Transaction (51-55)
atlas/consensus/subscriptions.go (1)
  • DefaultNotificationSender (58-60)
atlas/consensus/node-repository.go (1)
  • NewNodeRepository (44-53)
atlas/kv.go (2)
  • GetKey (183-201)
  • PrefixScan (236-254)
atlas/consensus/server_acl_test.go (1)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/table-repository-kv.go (3)
atlas/consensus/quorum.go (1)
  • KeyName (62-62)
atlas/consensus/consensus.pb.go (10)
  • Table (1764-1778)
  • Table (1791-1791)
  • Table (1806-1808)
  • ReplicationLevel (43-43)
  • ReplicationLevel (75-77)
  • ReplicationLevel (79-81)
  • ReplicationLevel (88-90)
  • TableGroup (1880-1886)
  • TableGroup (1899-1899)
  • TableGroup (1914-1916)
atlas/kv/encoding.go (1)
  • NewKeyBuilder (42-49)
atlas/consensus/majority-quorum.go (7)
atlas/kv.go (2)
  • GetKey (183-201)
  • WriteKey (30-48)
atlas/options/options.go (1)
  • Logger (28-28)
atlas/consensus/quorum.go (2)
  • GetDefaultQuorumManager (50-59)
  • QuorumNode (162-166)
atlas/consensus/consensus.pb.go (12)
  • WriteKeyResponse (2766-2772)
  • WriteKeyResponse (2785-2785)
  • WriteKeyResponse (2800-2802)
  • KVChange_Acl (1308-1310)
  • KVChange_Acl (1326-1326)
  • Record (556-566)
  • Record (579-579)
  • Record (594-596)
  • KVChange_Notify (1316-1318)
  • KVChange_Notify (1330-1330)
  • KVChange_Sub (1312-1314)
  • KVChange_Sub (1328-1328)
atlas/consensus/server.go (1)
  • NewServer (49-53)
atlas/kv/pool.go (1)
  • GetPool (65-69)
atlas/kv/store.go (1)
  • ErrKeyNotFound (96-96)
🪛 GitHub Actions: Go
integration-tests/scenarios/notifications_test.go

[error] 170-170: TestThreeNodeNotifications failed. Expected at least four notifications for user.* keys, but only 3 were received.

atlas/consensus/named_locks.go

[error] 54-54: golangci-lint: '(*namedLock).release' is unused (unused).

integration-tests/harness/node.go

[warning] 1-1: Broadcast to peer failed: rpc error: code = Unknown desc = version is nil (observed during node join/migration broadcasts). This may indicate transient cluster-state propagation issues.

🪛 GitHub Check: lint
atlas/consensus/named_locks.go

[failure] 54-54:
func (*namedLock).release is unused (unused)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Analyze (go)
🔇 Additional comments (32)
atlas/commands/key-command_test.go (2)

91-91: LGTM! Type conversion aligned with KeyName migration.

The change from builder.String() to string(builder) aligns with the broader migration to consensus.KeyName (byte-based keys). Direct casting is more idiomatic for byte slice to string conversions.

Also applies to: 108-108, 125-125


130-151: LGTM! Test updated to validate SCAN command parsing.

The test correctly validates the SCAN command structure:

  • Enforces minimum token length (2 tokens)
  • Selects and normalizes the prefix
  • Asserts the normalized prefix matches expectations

This aligns with the SCAN command implementation introduced in the PR.

atlas/consensus/data-repository.go (2)

110-113: LGTM! Passthrough handling for Notify/Sub operations.

The passthrough behavior for KVChange_Notify and KVChange_Sub is appropriate, as these operations represent ephemeral events rather than persistent data that requires deduplication.


152-157: LGTM! Consistent passthrough with clear documentation.

The comments explicitly document that notifications and subscriptions bypass content deduplication, which is the correct behavior for these event-based operations.

atlas/commands/acl-command.go (1)

26-26: LGTM! ACL commands migrated to consensus.KeyName.

The changes consistently migrate from kv.FromDottedKey to direct consensus.KeyName casting, aligning with the architectural shift to byte-based key identifiers throughout the consensus layer.

Also applies to: 61-70, 119-119

atlas/commands/scan_commands.go (1)

9-9: LGTM! SCAN command migrated to KeyName.

The changes correctly:

  • Convert the prefix to consensus.KeyName for the PrefixScan call
  • Use bytes.Join for formatting byte-based keys
  • Maintain existing control flow and error handling

This aligns with the broader KeyName migration.

Also applies to: 26-26, 38-38

integration-tests/harness/cluster.go (1)

120-120: LGTM! Test context propagated to nodes.

The addition of the t parameter to NewNode calls enables test-visible logging from nodes, improving test observability and debugging.

Also applies to: 274-274

integration-tests/harness/node.go (2)

32-32: LGTM! Test context integrated into Node.

The addition of the *testing.T field enables test-visible logging while maintaining backward compatibility through nil checks.

Also applies to: 47-47, 50-50, 67-67


142-153: LGTM! Enhanced logging with dual output.

The logging enhancement writes to both:

  1. Log file (existing behavior)
  2. Test logger via t.Log (new, for test visibility)

This improves debugging and test observability while maintaining file-based logs. The nil check on line 150 ensures graceful handling when t is not provided.

integration-tests/harness/client.go (2)

336-347: LGTM! Basic subscription support added.

The Subscribe method follows the existing command execution pattern and properly validates the response for an OK status.


349-378: LGTM! Subscription options support added.

The SubscribeWithOptions method correctly builds SUB commands with optional flags:

  • BATCH (boolean flag)
  • RETRY (with attempt count)
  • RETRY_AFTER (with duration)
  • AUTH (with token)

The command building logic is clear and follows the protocol requirements. Error handling is consistent with other client methods.

atlas/consensus/table-repository.go (1)

23-23: Verified KeyName usage: All GetTable/GetGroup implementations and callers use KeyName; no raw strings remain.

atlas/consensus/consensus.proto (1)

124-135: KVChange: sub/notify oneofs look consistent; no blocker

The new Subscribe/Notify variants and message shapes align with the new workflow. No changes requested.

Also applies to: 136-145

atlas/consensus/subscriptions.go (1)

325-331: Bounds check when slicing originalKey

+1 after len(prefix.Build()) assumes a delimiter and can slice past end. Add a guard or compute the offset explicitly.

Add a guard like:

pfx := prefix.Build()
if len(migration.GetVersion().GetTableName()) < len(pfx) {
  return true, fmt.Errorf("magic key too short")
}
offset := len(pfx)
if len(migration.GetVersion().GetTableName()) > offset && migration.GetVersion().GetTableName()[offset] == ':' { // or the real delimiter
  offset++
}
originalKey := migration.GetVersion().GetTableName()[offset:]
atlas/kv.go (2)

234-254: Broadcast PrefixScan path looks good

Using GetBroadcastQuorum and the updated proto fields is consistent with the new design.


256-286: Subscribe defaults and payload are reasonable

Sane defaults and correct serialization of retry base/auth. No changes requested.

atlas/consensus/server.go (13)

22-22: LGTM: Import and NodeTable type migration

The addition of the bytes import and the change of NodeTable from const string to var KeyName correctly support the KeyName migration across the codebase.

Also applies to: 42-42


46-52: LGTM: Named locking infrastructure

The addition of namedLocker to the Server struct and its initialization in NewServer enables per-table synchronization for concurrent operations, which is used consistently in StealTableOwnership, WriteMigration, and AcceptMigration.


56-57: LGTM: Consistent per-table locking

The per-table locking using namedLocker is correctly implemented across StealTableOwnership, WriteMigration, and AcceptMigration operations. The lock/unlock pattern with deferred unlock ensures proper synchronization and cleanup.

Also applies to: 229-230, 286-287


132-132: LGTM: Logging updated for KeyName type

The logging statements correctly use zap.ByteString for table names to accommodate the KeyName type ([]byte), ensuring proper binary data handling in logs.

Also applies to: 249-249, 339-339, 391-391, 409-409


301-301: LGTM: Correct atlas table prefix checking

The use of bytes.HasPrefix for checking the "atlas." prefix is the correct approach for KeyName ([]byte) comparisons, ensuring proper identification of metadata tables.

Also applies to: 910-910


319-319: LGTM: Context propagation

The addition of context parameters to applyMigration and applyKVDataMigration correctly enables proper context propagation for cancellation, tracing, and timeouts throughout the migration pipeline.

Also applies to: 332-332, 342-342, 351-351


355-359: LGTM: Magic key pre-check integration

The maybeHandleMagicKey call correctly handles special keys before normal processing, with proper error propagation and early return when the operation should be halted.


393-398: LGTM: Async notification dispatches

The asynchronous notification dispatches using goroutines are correctly implemented with error logging. While errors aren't propagated to the caller (by design for async operations), the logging provides visibility into notification failures without blocking the main operation flow.

Also applies to: 411-416, 518-523, 585-590
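
The dispatch pattern being described, reduced to a standalone sketch (the notify function, logger, and migration type stand in for the real ones):

package main

import (
	"context"
	"log"
	"time"
)

type migration struct{ Key string }

// sendNotification stands in for the real notification sender call.
func sendNotification(ctx context.Context, m migration) error {
	log.Printf("notifying subscribers about %s", m.Key)
	return nil
}

// dispatchAsync fires the notification in the background: the write path is
// never blocked, and failures are only logged, matching the best-effort
// semantics described above.
func dispatchAsync(ctx context.Context, m migration) {
	go func() {
		if err := sendNotification(ctx, m); err != nil {
			log.Printf("failed to send notification: %v", err)
		}
	}()
}

func main() {
	dispatchAsync(context.Background(), migration{Key: "user.42"})
	time.Sleep(10 * time.Millisecond) // only for this demo, so the goroutine gets to run
}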


597-597: LGTM: Notification flush at migration completion

The call to DefaultNotificationSender().HandleNotifications() at the end of applyKVDataMigration correctly ensures that all queued notifications are processed before the function completes.


784-830: LGTM: Immediate node broadcast after join

The addition of immediate gossip broadcasting after a node joins correctly accelerates node discovery across the cluster. The asynchronous execution with best-effort delivery (logged errors but no failures) is appropriate, as the normal gossip protocol will eventually ensure consistency even if some immediate broadcasts fail.


841-841: LGTM: gossipKey type updated

The change of the table field in gossipKey from string to KeyName correctly aligns with the KeyName migration across the codebase.


1233-1233: LGTM: PrefixScan updated for KeyName

The PrefixScan implementation correctly handles the KeyName type migration:

  • Proper nil checking for the prefix
  • Correct use of zap.ByteString for logging binary data
  • Return type changed from []string to [][]byte for owned keys
  • All type conversions handled appropriately

Also applies to: 1280-1285, 1300-1308, 1313-1313


988-988: LGTM: Logging updated for KeyName

The gossip completion logging correctly uses zap.ByteString for the table name, consistent with the KeyName type migration.

atlas/consensus/migration-repository-kv_test.go (1)

37-41: LGTM on KeyName migration in tests

Usage of KeyName and assertions align with the new []byte-based identifiers and API.

Also applies to: 45-50, 57-75, 76-80

atlas/commands/sub-command.go (1)

58-60: Confirm prefix normalization won’t corrupt keys

Using SelectNormalizedCommand for the prefix may alter case. If key prefixes are case-sensitive, prefer SelectCommand to preserve raw bytes.

atlas/consensus/subscriptions_test.go (1)

841-844: Pass jsonReader by pointer (headers test)

Use pointer to jsonReader to avoid non-progressing reads.

-req, err := http.NewRequest("POST", sub.GetUrl(), jsonReader{data: bodyBytes})
+req, err := http.NewRequest("POST", sub.GetUrl(), &jsonReader{data: bodyBytes})

Likely an incorrect or invalid review comment.

Comment on lines +69 to 71
func (c *KeyCommand) FromKey(key string) consensus.KeyName {
	return consensus.KeyName(key)
}

⚠️ Potential issue | 🔴 Critical

Fix invalid KeyName conversion in FromKey.

Return KeyName from []byte data, not string.

-func (c *KeyCommand) FromKey(key string) consensus.KeyName {
-	return consensus.KeyName(key)
-}
+func (c *KeyCommand) FromKey(key string) consensus.KeyName {
+	return consensus.KeyName([]byte(key))
+}
🤖 Prompt for AI Agents
In atlas/commands/key-command.go around lines 69 to 71, the FromKey function
currently accepts a string and converts it incorrectly; change the parameter to
accept the raw []byte key and return consensus.KeyName constructed from that
byte slice (i.e., update the signature to FromKey(key []byte) consensus.KeyName
and return consensus.KeyName(key)) so the KeyName is created from the byte data
rather than from a string.

table, _ := q.SelectNormalizedCommand(2)

q1, q2, err := consensus.DescribeQuorum(ctx, table)
q1, q2, err := consensus.DescribeQuorum(ctx, consensus.KeyName(table))

⚠️ Potential issue | 🔴 Critical

Fix invalid KeyName conversion (string → []byte).

KeyName is []byte. Casting a string directly won’t compile. Convert via []byte.

-	q1, q2, err := consensus.DescribeQuorum(ctx, consensus.KeyName(table))
+	q1, q2, err := consensus.DescribeQuorum(ctx, consensus.KeyName([]byte(table)))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
q1, q2, err := consensus.DescribeQuorum(ctx, consensus.KeyName(table))
q1, q2, err := consensus.DescribeQuorum(ctx, consensus.KeyName([]byte(table)))
🤖 Prompt for AI Agents
In atlas/commands/quorum-command.go around line 30, the call uses
consensus.KeyName(table) but KeyName is a []byte type; cast the string properly
by converting the string to a byte slice (e.g.,
consensus.KeyName([]byte(table))) so the types match and the code compiles.

Comment on lines +118 to +133
	err = atlas.Subscribe(ctx, consensus.KeyName(parsed.Prefix), parsed.URL, atlas.SubscribeOptions{
		RetryAttempts:  int(parsed.RetryAttempts),
		RetryAfterBase: parsed.RetryAfterBase,
		Auth:           parsed.Auth,
	})
	if err != nil {
		return nil, err
	}

	options.Logger.Info("created subscription",
		zap.String("prefix", parsed.Prefix),
		zap.String("url", parsed.URL),
		zap.Bool("batch", parsed.Batch),
		zap.Int32("retry_attempts", parsed.RetryAttempts),
		zap.Duration("retry_after_base", parsed.RetryAfterBase))


⚠️ Potential issue | 🟠 Major

NOBATCH is ignored; wire Batch through to Subscribe

Parse sets Batch but Execute never passes it; atlas.Subscribe hard-codes Batch=true, making NOBATCH ineffective. Pass Batch here and propagate in atlas.SubscribeOptions.

Apply this diff here:

-	err = atlas.Subscribe(ctx, consensus.KeyName(parsed.Prefix), parsed.URL, atlas.SubscribeOptions{
-		RetryAttempts:  int(parsed.RetryAttempts),
-		RetryAfterBase: parsed.RetryAfterBase,
-		Auth:           parsed.Auth,
-	})
+	err = atlas.Subscribe(ctx, consensus.KeyName(parsed.Prefix), parsed.URL, atlas.SubscribeOptions{
+		Batch:          parsed.Batch,
+		RetryAttempts:  int(parsed.RetryAttempts),
+		RetryAfterBase: parsed.RetryAfterBase,
+		Auth:           parsed.Auth,
+	})

And update atlas/kv.go accordingly (outside this file):

- type SubscribeOptions struct {
- 	RetryAttempts  int
- 	RetryAfterBase time.Duration
- 	Auth           string
- }
+ type SubscribeOptions struct {
+ 	Batch          bool
+ 	RetryAttempts  int
+ 	RetryAfterBase time.Duration
+ 	Auth           string
+ }

  func Subscribe(ctx context.Context, prefix consensus.KeyName, callbackUrl string, opts SubscribeOptions) error {
     if opts.RetryAttempts == 0 {
         opts.RetryAttempts = 3
     }
     if opts.RetryAfterBase == 0 {
         opts.RetryAfterBase = 100 * time.Millisecond
     }
     op := &consensus.KVChange{
         Operation: &consensus.KVChange_Sub{
             Sub: &consensus.Subscribe{
                 Url:    callbackUrl,
                 Prefix: prefix,
                 Options: &consensus.SubscribeOptions{
-                    Batch:          true,
+                    Batch:          opts.Batch,
                     RetryAttempts:  int32(opts.RetryAttempts),
                     RetryAfterBase: durationpb.New(opts.RetryAfterBase),
                     Auth:           opts.Auth,
                 },
             },
         },
     }
     return sendWrite(ctx, prefix, op)
 }
🤖 Prompt for AI Agents
In atlas/commands/sub-command.go around lines 118 to 133, the parsed.Batch flag
is set but never passed into atlas.Subscribe, so NOBATCH is ignored; update the
Subscribe call to include Batch: parsed.Batch in the atlas.SubscribeOptions you
construct. Also update the atlas.SubscribeOptions type and its usages in
atlas/kv.go to add a Batch bool field and ensure atlas.Subscribe consumes that
field (do not hard-code Batch=true) and propagates it into the subscription
creation logic; keep the field name and type consistent and update any call
sites/tests that build SubscribeOptions accordingly.

Comment on lines +433 to 468
	case *KVChange_Acl:
		key := op.Acl.GetKey()
		var record Record
		store := kv.GetPool().DataStore()
		val, err := store.Get(ctx, key)
		if err != nil && errors.Is(err, kv.ErrKeyNotFound) {
			break
		}
		if err != nil {
			return &WriteKeyResponse{
				Success: false,
				Error:   fmt.Sprintf("failed to get key: %v", err),
			}, nil
		}
		err = proto.Unmarshal(val, &record)
		if err != nil {
			return nil, fmt.Errorf("failed to unmarshal record: %v", err)
		}
		if isOwner(ctx, &record) {
			break
		}
		return &WriteKeyResponse{
			Success: false,
			Error:   "principal isn't allowed to modify ACLs for this key",
		}, nil
	case *KVChange_Notify:
		// Notifications are internal system operations that bypass ACL checks
		// They are written to magic keys for subscription processing
		break
	case *KVChange_Sub:
		// Subscriptions are internal system operations that bypass ACL checks
		// They are written to magic keys for subscription storage
		break
	default:
		panic("unknown operation type")
	}

⚠️ Potential issue | 🟠 Major

Avoid panic on unknown operation types; return an error response

Panicking on untrusted input can crash the node. Return a structured error.

-	default:
-		panic("unknown operation type")
+	default:
+		return &WriteKeyResponse{
+			Success: false,
+			Error:   fmt.Sprintf("unknown operation type: %T", in.GetValue().Operation),
+		}, nil
🤖 Prompt for AI Agents
In atlas/consensus/majority-quorum.go around lines 433 to 468, the default
branch panics on unknown operation types which can crash the node; change it to
return a structured error response instead. Replace the panic with a returned
*WriteKeyResponse{Success:false, Error: fmt.Sprintf("unknown operation type:
%T", op)} (or similar descriptive message) and nil error from the function so
callers receive a safe, non-panicking result; ensure the function's return
signature and any callers remain consistent with returning a WriteKeyResponse on
failure.

Comment on lines +557 to 558
		return nil, ErrStealTableOwnershipFailed
	}

⚠️ Potential issue | 🟠 Major

Deletes should forward to the owner (parity with writes)

On not‑promised, DeleteKey returns an error instead of forwarding to the owner like WriteKey does.

 	if !phase1.Promised {
-		table = phase1.GetFailure().GetTable()
-		// we are not the leader, so update our tr with the new table information
-		err = upsertTable(ctx, tr, table)
-		if err != nil {
-			return nil, err
-		}
-		return nil, ErrStealTableOwnershipFailed
+		table = phase1.GetFailure().GetTable()
+		// we are not the leader; update and forward to owner
+		if err = upsertTable(ctx, tr, table); err != nil {
+			return nil, err
+		}
+		owner := table.GetOwner()
+		qm := GetDefaultQuorumManager(ctx)
+		resp, err := qm.Send(owner, func(node *QuorumNode) (any, error) {
+			return node.DeleteKey(ctx, in, opts...)
+		})
+		if err != nil {
+			return nil, errors.Join(errors.New("failed to forward delete key to owner"), err)
+		}
+		if resp == nil {
+			return nil, errors.New("owner returned nil response")
+		}
+		return resp.(*WriteKeyResponse), nil
 	}

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In atlas/consensus/majority-quorum.go around lines 557-558, the DeleteKey path
currently returns ErrStealTableOwnershipFailed on a not-promised response
instead of forwarding the delete to the table owner like WriteKey does; update
the code to detect the owner (use the same owner/forwarding logic used by
WriteKey), perform the forward RPC/call to the owner for the delete operation,
and return that forwarded response (or translated error) instead of
ErrStealTableOwnershipFailed so deletes gain parity with writes.

Comment on lines +151 to +159
	encoder := base32.NewEncoding("0123456789ABCDEFGHJKMNPQRSTVWXYZ")

	nl = append(nl, &NotificationJson{
		Key:     string(n.pub.GetKey()),
		Version: n.pub.GetVersion(),
		Op:      opName,
		Origin:  string(n.sub.GetPrefix()),
		EventId: strings.ReplaceAll(encoder.EncodeToString(hasher.Sum(nil)), "=", ""),
	})

⚠️ Potential issue | 🔴 Critical

Use Write([]byte) instead of non-existent WriteString for blake3

blake3.Hasher doesn’t guarantee a WriteString method. Use Write([]byte(...)).

-						_, err := hasher.WriteString(n.pub.GetVersion())
+						_, err := hasher.Write([]byte(n.pub.GetVersion()))

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In atlas/consensus/subscriptions.go around lines 151-159, the code calls a
non-existent WriteString on a blake3.Hasher; replace that call with
hasher.Write([]byte(yourString)) and handle the returned (int, error)
appropriately (e.g., check error and propagate or log). Ensure you import no new
packages beyond existing ones and remove any WriteString usage so the hasher
uses the io.Writer Write([]byte) API.
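
A minimal sketch of the event-ID hashing via Write instead of WriteString, assuming the github.com/zeebo/blake3 package and the same base32 alphabet as the snippet above; the version string is made up:

package main

import (
	"encoding/base32"
	"fmt"

	"github.com/zeebo/blake3"
)

func main() {
	hasher := blake3.New()

	// blake3.Hasher implements io.Writer, so hash the version via Write([]byte(...)).
	if _, err := hasher.Write([]byte("example-version-42")); err != nil {
		panic(err)
	}

	// Same 32-character alphabet as the snippet above; NoPadding avoids having
	// to strip trailing '=' with strings.ReplaceAll.
	encoder := base32.NewEncoding("0123456789ABCDEFGHJKMNPQRSTVWXYZ").WithPadding(base32.NoPadding)
	fmt.Println(encoder.EncodeToString(hasher.Sum(nil)))
}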

Comment on lines +410 to +415
		default:
			panic("unsupported migration type")
		}
	default:
		panic("unsupported migration type")
	}

⚠️ Potential issue | 🟠 Major

Don’t panic on unsupported migrations; return an error

Panic in replication paths can bring down the node. Return an error.

-		default:
-			panic("unsupported migration type")
+		default:
+			return true, fmt.Errorf("unsupported data operation: %T", mig.Data.GetChange().GetOperation())
 	}

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +456 to +459
	_, err = hasher.WriteString(op.Notify.Version)
	if err != nil {
		return obj, err
	}

⚠️ Potential issue | 🔴 Critical

Second occurrence: use Write([]byte) instead of WriteString

Same hashing bug as above.

-	_, err = hasher.WriteString(op.Notify.Version)
+	_, err = hasher.Write([]byte(op.Notify.Version))
🤖 Prompt for AI Agents
In atlas/consensus/subscriptions.go around lines 456 to 459, the code uses
hasher.WriteString(op.Notify.Version) which reproduces the same hashing bug as
earlier; replace the WriteString call with
hasher.Write([]byte(op.Notify.Version)) and handle the returned (int, error) as
you do for other hasher writes (check/return the error if non-nil) so the
version bytes are hashed correctly.

Comment on lines +65 to +75
	notificationMutex.Lock()
	for _, notif := range notifs {
		notifications[notif["event_id"].(string)] = notif
		t.Logf("Received notification: %+v", notif)
	}
	notificationMutex.Unlock()

	receivedNotifications.Add(int32(len(notifications)))
	t.Logf("Received %d notifications (total: %d)", len(notifications), receivedNotifications.Load())

	w.WriteHeader(http.StatusOK)

⚠️ Potential issue | 🟠 Major

Count only newly received notifications.

receivedNotifications should reflect just the batch delta, but the current code adds the total map size on every callback. With multiple HTTP deliveries this inflates the counter (1+2+3…), so the test can pass even when fewer notifications arrive. Track and add only the truly new events inserted into the map.

-	notificationMutex.Lock()
-	for _, notif := range notifs {
-		notifications[notif["event_id"].(string)] = notif
-		t.Logf("Received notification: %+v", notif)
-	}
-	notificationMutex.Unlock()
-
-	receivedNotifications.Add(int32(len(notifications)))
-	t.Logf("Received %d notifications (total: %d)", len(notifications), receivedNotifications.Load())
+	notificationMutex.Lock()
+	newCount := 0
+	for _, notif := range notifs {
+		eventID, _ := notif["event_id"].(string)
+		if eventID == "" {
+			t.Logf("Skipping notification without event_id: %+v", notif)
+			continue
+		}
+		if _, exists := notifications[eventID]; !exists {
+			newCount++
+		}
+		notifications[eventID] = notif
+		t.Logf("Received notification: %+v", notif)
+	}
+	notificationMutex.Unlock()
+
+	if newCount > 0 {
+		receivedNotifications.Add(int32(newCount))
+	}
+	t.Logf("Received %d new notifications (total unique: %d, counter: %d)", newCount, len(notifications), receivedNotifications.Load())
🤖 Prompt for AI Agents
In integration-tests/scenarios/notifications_test.go around lines 65-75, the
code incorrectly adds the total map size to receivedNotifications on every
callback; change it to count only newly inserted notifications by computing a
newCount while holding notificationMutex (for each notif: if key doesn't exist
in notifications map then insert and increment newCount), release the mutex,
then call receivedNotifications.Add(newCount) and update the log to report
newCount and the new total via receivedNotifications.Load().

Comment on lines +107 to +115
	for range 10 {
		_ = node0.Client().Subscribe("user.", server.URL)
		if err == nil {
			break
		}
		time.Sleep(1 * time.Second)
	}

	t.Logf("Subscribed to notifications for prefix 'user.' at %s", server.URL)

⚠️ Potential issue | 🟠 Major

Propagate Subscribe errors during retry.

err never captures the Subscribe result, so the loop breaks immediately with the stale (and currently nil) value and we may proceed without a successful subscription—matching the pipeline failure where notifications were missing. Assign the error from Subscribe and assert success after the retries.

-	for range 10 {
-		_ = node0.Client().Subscribe("user.", server.URL)
-		if err == nil {
-			break
-		}
+	for range 10 {
+		err = node0.Client().Subscribe("user.", server.URL)
+		if err == nil {
+			break
+		}
 		time.Sleep(1 * time.Second)
 	}
+
+	require.NoError(t, err, "Failed to subscribe node 0 to notifications")
🤖 Prompt for AI Agents
In integration-tests/scenarios/notifications_test.go around lines 107 to 115,
the retry loop calls node0.Client().Subscribe but never assigns its return error
to err, so the loop breaks immediately using a stale nil value; change the call
to assign err = node0.Client().Subscribe("user.", server.URL) (or err := if err
is new in scope), keep the retry/backoff logic, and after the loop assert
success (e.g., if err != nil { t.Fatalf("failed to subscribe after retries: %v",
err) }) so failures are propagated rather than ignored.
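
A minimal sketch of the retry pattern with the error captured and checked after the loop; subscribe here is a made-up stub standing in for node0.Client().Subscribe:

package main

import (
	"errors"
	"fmt"
	"time"
)

// subscribe is a made-up stub that fails twice before succeeding, so the retry
// loop has something to do.
var calls int

func subscribe(prefix, url string) error {
	calls++
	if calls < 3 {
		return errors.New("node not ready")
	}
	return nil
}

func main() {
	var err error
	for range 10 {
		// Capture the result so the loop condition sees the latest attempt.
		err = subscribe("user.", "http://127.0.0.1:0/callback")
		if err == nil {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	if err != nil {
		fmt.Println("failed to subscribe after retries:", err)
		return
	}
	fmt.Println("subscribed after", calls, "attempts")
}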
