subscribe to chain#13
Conversation
|
Caution Review failedThe pull request is closed. ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughThis PR adds CometBFT-based settlement streaming across the RFQ toolkit. New settlement decoders in Go, Python, and TypeScript subscribe to on-chain transaction events, parse settlements, filter by maker execution, and emit settlement updates. Framework and example integration layers wire these into existing market maker flows with deduplication, concurrent handling, and origin tracking. ChangesCometBFT settlement streaming integration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (4)
examples/ts-retail/main.ts (1)
346-357: 💤 Low valueChain stream handle is discarded; no shutdown/cleanup.
streamChainSettlements(...)returns the underlyingWebSocket, but the return value isn't retained, so the socket can't be closed on shutdown likesettlementWs. Capturing it (e.g., into a module-level variable) would let you close it cleanly and mirror the off-chain path's lifecycle.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@examples/ts-retail/main.ts` around lines 346 - 357, The call to streamChainSettlements(...) discards the returned WebSocket so it cannot be closed on shutdown; capture its return (e.g., assign to the existing module-level settlementWs or a new variable) when CHAIN_COMETBFT_ENDPOINT is set, and ensure the same shutdown/cleanup logic that closes settlementWs for the WS_URL path is applied to that captured socket (use the same onError/onSettlement handlers like printSettlement and close the socket during shutdown).examples/ts-retail/chain-settlement.ts (1)
63-102: ⚖️ Poor tradeoffNo reconnection or close handling for the settlement stream.
The
wsconnection has noclose/reconnect handling, so a transient drop silently stops settlement delivery for the lifetime of the process. For a long-running settlement streamer, consider reconnecting on close (with backoff) and resubscribing. Acceptable to defer for an example, but worth noting.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@examples/ts-retail/chain-settlement.ts` around lines 63 - 102, streamChainSettlements currently opens a WebSocket but does not handle "close" or reconnection, so transient drops stop delivery; add a reconnection/backoff loop (or internal retry logic) that listens for "close" and "error" on the WebSocket created in streamChainSettlements, implements exponential backoff (configurable via StreamOptions e.g., maxRetries and initialDelay), and on reconnect re-create the ws, re-subscribe with the same query (the JSON-RPC subscribe payload currently built in streamChainSettlements), reattach the same "message" parsing and settlementFromEvents handling, and ensure you stop retrying on explicit cancellation/cleanup to avoid duplicate handlers or multiple concurrent sockets.examples/go-mm/chainsettlement/settlement.go (1)
44-63: ⚡ Quick winGuard the outbound send with
ctx.Done()to avoid a blocked goroutine.If the consumer stops draining
out(e.g. the example exits on a stream error),out <- ...blocks indefinitely, sodefer client.Stop()never runs and the subscription goroutine leaks. Select onctx.Done()during the send.♻️ Proposed change
- out <- convertSettlementToMakerUpdate(settlement, event.Events) + select { + case out <- convertSettlementToMakerUpdate(settlement, event.Events): + case <-ctx.Done(): + return nil + }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@examples/go-mm/chainsettlement/settlement.go` around lines 44 - 63, The send to out in the subscription loop can block and leak the goroutine; modify the code inside the for/select (the branch that calls out <- convertSettlementToMakerUpdate(...)) to perform a non-blocking/send-with-cancellation using ctx: replace the direct send with a select that returns if ctx.Done() is closed and otherwise sends the value to out; keep the existing checks around settlementFromCometEvents, makerHasTraded, and the error logging, and use the same converted value from convertSettlementToMakerUpdate when sending.examples/go-mm/main-grpc/main.go (1)
458-473: 💤 Low valueNested goroutines leave
settlementChunclosed.When
StreamMakerSettlementsreturns (error or ctx cancel),settlementChis never closed, so the inner forwarding goroutine ranging over it never exits. It's harmless whilelog.Fataltears the process down, but consider closing the channel after the stream returns so the forwarder terminates cleanly. The extra wrapping goroutine can also be flattened.♻️ Optional cleanup
if cometBFTEndpoint != "" { go func() { settlementCh := make(chan *pb.RFQSettlementMakerUpdate, 100) go func() { for settlement := range settlementCh { respCh <- &pb.MakerStreamResponse{ MessageType: "settlement_update_chain", Settlement: settlement, } } }() - if err := chainsettlement.StreamMakerSettlements(baseCtx, cometBFTEndpoint, contractAddr, makerAddr, settlementCh); err != nil { + defer close(settlementCh) + if err := chainsettlement.StreamMakerSettlements(baseCtx, cometBFTEndpoint, contractAddr, makerAddr, settlementCh); err != nil { errCh <- err } }() }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@examples/go-mm/main-grpc/main.go` around lines 458 - 473, The nested goroutine leaves settlementCh unclosed and the inner forwarder ranging over it never exits; refactor to flatten the wrapper: create settlementCh := make(chan *pb.RFQSettlementMakerUpdate, 100), start a single forwarder goroutine that ranges over settlementCh and forwards to respCh, then call chainsettlement.StreamMakerSettlements(baseCtx, cometBFTEndpoint, contractAddr, makerAddr, settlementCh) in the same outer goroutine and ensure you close(settlementCh) after StreamMakerSettlements returns (use defer close(settlementCh) immediately after creating settlementCh or explicitly close once StreamMakerSettlements finishes), and if StreamMakerSettlements returns an error send it to errCh; ensure you don’t close settlementCh twice.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@examples/go-mm/chainsettlement/settlement.go`:
- Around line 271-279: convertSettlementToMakerUpdate currently keys
resultsByMaker by quote.Maker which causes overwrites when a maker has multiple
quotes; change the map to key by a per-quote unique identifier (e.g.,
quote.Nonce, quote.Signature or quote.Id) and build resultsByQuoteID from
settlement.QuoteResults using that identifier, then pass
resultsByQuoteID[quote.<idField>] into convertSettlementQuote for each
settlement.Quote; update references to rfqSettlement, resultsByMaker,
rfqQuoteResult and convertSettlementQuote accordingly and ensure you pick the
actual unique field present on the quote/quote result structs.
In `@examples/python-mm/chain_settlement.py`:
- Around line 119-128: In settlement_from_events, json.loads(raw) can raise
JSONDecodeError and currently crashes the stream; wrap the json.loads call in a
try/except that catches json.JSONDecodeError, mirror the behavior used by
load_json_attr (return None on parse failure or log and return None), and ensure
you reference the function settlement_from_events and the helper
first_event_value when implementing the safe parse so malformed settlement
payloads do not raise uncaught exceptions.
In `@examples/python-mm/mark_quote_loop.py`:
- Line 210: The local variable comet_bft_endpoint is defined inside
_print_startup but used in main, causing a NameError; fix by moving the
environment read into main (set comet_bft_endpoint =
os.getenv("CHAIN_COMETBFT_ENDPOINT") in main) and pass that value into
_print_startup as a parameter (update _print_startup signature and its call
sites), or alternatively remove the local assignment inside _print_startup and
add a comet_bft_endpoint parameter so main can supply the value; update all
references in main to use the local variable passed into/returned from those
functions (functions to edit: _print_startup and main).
In `@src/rfq_test/clients/chain_settlement.py`:
- Around line 46-85: stream_maker_settlements currently opens a single WebSocket
via websockets.connect(ws_url) and will exit if the connection drops, so add
reconnection logic with exponential backoff (similar to
BaseStreamClient._reconnect) around the connect/receive loop: wrap the
connect-and-read block in a retry loop that catches connection and recv errors,
waits with increasing backoff (reset on successful connect), and re-establishes
the websocket; retain existing handling for message parsing and use the same
symbols (stream_maker_settlements, comet_ws_url, settlement_from_events,
maker_has_traded, settlement_to_maker_update, out_queue, stop) so the function
continues to filter and enqueue maker updates but survives transient network
failures.
- Around line 222-227: The current code calls .get() on action["limit"] and
action["market"] without verifying types, which will raise AttributeError if
those entries are present but not dicts; update the block handling
unfilled_action (references: unfilled_action, action,
RFQSettlementLimitActionType, RFQSettlementMarketActionType) to first check that
action.get("limit") is an instance of dict before accessing .get() and only call
unfilled_action.limit.CopyFrom(...) when it is valid; do the same for
action.get("market") (e.g., isinstance(..., dict)), and skip or log/ignore
malformed entries instead of accessing attributes on non-dict values.
In `@src/rfq_test/clients/websocket.py`:
- Around line 847-850: The loop in _forward_chain_settlements currently awaits
items and calls _queue_settlement_update without handling exceptions, so wrap
the while True body (after awaiting self._chain_settlement_queue.get() and
around the call to self._queue_settlement_update) in a try/except that catches
Exception, logs the full exception (including traceback) via the instance logger
(e.g. self._logger.error or similar) and then continues the loop; ensure
asyncio.CancelledError is not swallowed (re-raise it) so task cancellation still
works.
---
Nitpick comments:
In `@examples/go-mm/chainsettlement/settlement.go`:
- Around line 44-63: The send to out in the subscription loop can block and leak
the goroutine; modify the code inside the for/select (the branch that calls out
<- convertSettlementToMakerUpdate(...)) to perform a
non-blocking/send-with-cancellation using ctx: replace the direct send with a
select that returns if ctx.Done() is closed and otherwise sends the value to
out; keep the existing checks around settlementFromCometEvents, makerHasTraded,
and the error logging, and use the same converted value from
convertSettlementToMakerUpdate when sending.
In `@examples/go-mm/main-grpc/main.go`:
- Around line 458-473: The nested goroutine leaves settlementCh unclosed and the
inner forwarder ranging over it never exits; refactor to flatten the wrapper:
create settlementCh := make(chan *pb.RFQSettlementMakerUpdate, 100), start a
single forwarder goroutine that ranges over settlementCh and forwards to respCh,
then call chainsettlement.StreamMakerSettlements(baseCtx, cometBFTEndpoint,
contractAddr, makerAddr, settlementCh) in the same outer goroutine and ensure
you close(settlementCh) after StreamMakerSettlements returns (use defer
close(settlementCh) immediately after creating settlementCh or explicitly close
once StreamMakerSettlements finishes), and if StreamMakerSettlements returns an
error send it to errCh; ensure you don’t close settlementCh twice.
In `@examples/ts-retail/chain-settlement.ts`:
- Around line 63-102: streamChainSettlements currently opens a WebSocket but
does not handle "close" or reconnection, so transient drops stop delivery; add a
reconnection/backoff loop (or internal retry logic) that listens for "close" and
"error" on the WebSocket created in streamChainSettlements, implements
exponential backoff (configurable via StreamOptions e.g., maxRetries and
initialDelay), and on reconnect re-create the ws, re-subscribe with the same
query (the JSON-RPC subscribe payload currently built in
streamChainSettlements), reattach the same "message" parsing and
settlementFromEvents handling, and ensure you stop retrying on explicit
cancellation/cleanup to avoid duplicate handlers or multiple concurrent sockets.
In `@examples/ts-retail/main.ts`:
- Around line 346-357: The call to streamChainSettlements(...) discards the
returned WebSocket so it cannot be closed on shutdown; capture its return (e.g.,
assign to the existing module-level settlementWs or a new variable) when
CHAIN_COMETBFT_ENDPOINT is set, and ensure the same shutdown/cleanup logic that
closes settlementWs for the WS_URL path is applied to that captured socket (use
the same onError/onSettlement handlers like printSettlement and close the socket
during shutdown).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6d613b41-cc97-4630-b4e1-147e775045f4
📒 Files selected for processing (19)
examples/go-mm/chainsettlement/settlement.goexamples/go-mm/chainsettlement/settlement_test.goexamples/go-mm/go.modexamples/go-mm/main-grpc/main.goexamples/go-mm/main/main.goexamples/go-mm/queue/fifo.goexamples/python-mm/chain_settlement.pyexamples/python-mm/main-grpc.pyexamples/python-mm/main.pyexamples/python-mm/mark_quote_loop.pyexamples/ts-retail/chain-settlement.tsexamples/ts-retail/main.tssrc/rfq_test/actors/market_maker.pysrc/rfq_test/clients/chain_settlement.pysrc/rfq_test/clients/websocket.pysrc/rfq_test/config.pysrc/rfq_test/models/config.pytests/test_config.pytests/test_websocket_headers.py
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@examples/python-mm/main.py`:
- Around line 158-163: The loop unpacks client.get_next_event() into (msg_type,
data) but get_next_event may return None on timeout (or raise
asyncio.TimeoutError), causing a TypeError; update the while loop to await
client.get_next_event(timeout=60.0), check if the result is None before
unpacking (or catch asyncio.TimeoutError) and continue on timeout, and only
unpack into msg_type and data when the returned value is not None; reference
client.get_next_event and the IndexerTimeoutError handling to replace the direct
unpack with a guarded check.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: eb0cbbd5-c8b1-4d29-a98d-dc298f4aa423
📒 Files selected for processing (4)
examples/python-mm/main-grpc.pyexamples/python-mm/main.pyexamples/python-mm/mark_quote_loop.pysrc/rfq_test/clients/websocket.py
🚧 Files skipped from review as they are similar to previous changes (2)
- examples/python-mm/main-grpc.py
- src/rfq_test/clients/websocket.py
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
examples/python-mm/main-grpc.py (1)
373-381:⚠️ Potential issue | 🟠 Major | ⚡ Quick winSurface failures from the chain settlement task.
settlement_taskis started here but never awaited or checked afterward. Ifstream_maker_settlements()exits with an exception, chain settlement streaming dies silently and this example keeps running as if nothing happened.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@examples/python-mm/main-grpc.py` around lines 373 - 381, The settlement_task created for stream_maker_settlements is launched but never observed, so any exception inside stream_maker_settlements will be silently lost; update the caller to monitor and surface failures by either awaiting settlement_task (or awaiting a supervisor coroutine) or attaching a done callback that inspects settlement_task.exception() and logs/raises it; reference the created task name settlement_task and the producer function stream_maker_settlements and ensure the handler uses settlement_queue/settlement_stop context to clean up on failure (log the error and propagate or shut down the example run instead of ignoring the exception).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@examples/python-mm/main-grpc.py`:
- Around line 398-402: The current branch that handles settlement_read
short-circuits with a continue and can starve stream_read; after you process
settlement (use settlement = settlement_read.result(), settlements_seen.add and
print_settlement_update as shown) do not immediately continue—add logic to also
check if stream_read is in done, consume stream_read.result(), handle it with
the existing stream-handling routine (the same code path used when stream_read
finishes first), and then recreate both asyncio.create_task calls
(settlement_queue.get() and the stream read task) as needed; ensure you only
continue after both completed-item handlers are processed so RFQ stream messages
are not postponed.
---
Outside diff comments:
In `@examples/python-mm/main-grpc.py`:
- Around line 373-381: The settlement_task created for stream_maker_settlements
is launched but never observed, so any exception inside stream_maker_settlements
will be silently lost; update the caller to monitor and surface failures by
either awaiting settlement_task (or awaiting a supervisor coroutine) or
attaching a done callback that inspects settlement_task.exception() and
logs/raises it; reference the created task name settlement_task and the producer
function stream_maker_settlements and ensure the handler uses
settlement_queue/settlement_stop context to clean up on failure (log the error
and propagate or shut down the example run instead of ignoring the exception).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 23fda4a5-f3a8-40dd-b0ae-f4136ed9161a
📒 Files selected for processing (5)
examples/python-mm/main-grpc.pyexamples/python-mm/main.pyexamples/python-mm/mark_quote_loop.pysrc/rfq_test/clients/chain_settlement.pysrc/rfq_test/clients/websocket.py
🚧 Files skipped from review as they are similar to previous changes (4)
- examples/python-mm/main.py
- src/rfq_test/clients/websocket.py
- examples/python-mm/mark_quote_loop.py
- src/rfq_test/clients/chain_settlement.py
Allows to stream settlements directly from the comet BFT chain endpoint

Summary by CodeRabbit
New Features
Configuration
Tests