refactor(ddp-streamer): backpressure, deterministic logout, metrics, fan-out tests#40358
refactor(ddp-streamer): backpressure, deterministic logout, metrics, fan-out tests#40358
Conversation
…fan-out tests
Internal refactor of ee/apps/ddp-streamer keeping the wire protocol identical
(DDP-over-WS, EJSON, login via Account.login, method fallback to MeteorService).
Heartbeat & backpressure:
- Split heartbeat into idleTimer / pongTimer. The pongTimer is armed only when
a server PING is sent and is cleared exclusively by an inbound PONG; other
inbound traffic no longer extends the deadline (a broken client that keeps
sending data but never replies to PING is now disconnected deterministically).
- Drop slow consumers: when ws.bufferedAmount exceeds MAX_BUFFERED_BYTES
(default 4 MiB, env override DDP_MAX_BUFFERED_BYTES), close with code 1013.
Fan-out path:
- Encapsulate the ws private _sender.sendFrame call behind RawSender so the
upgrade path to a different ws/uWS implementation is local. Apply the same
bufferedAmount guard at the fan-out level.
- Extract fanOutText from Stream.sendToManySubscriptions so the loop is
testable without instantiating the full Streamer base class.
Lifecycle:
- Replace setTimeout(1ms) on logout with setImmediate; ws frame ordering
guarantees the result/updated frames reach the wire before the close frame.
- Decode binary frames as UTF-8 in Server.parse instead of throwing 500
(some proxies/wrappers deliver UTF-8 JSON in binary frames).
Observability:
- Register ddp_method_total{namespace,status}, ddp_close_total{code} and
ddp_send_buffer_bytes{nodeID} (sampled every 5s).
- Method labels are bucketed by the prefix before ':' or '.' to keep
Prometheus cardinality bounded.
- Replace console.error/warn with @rocket.chat/logger in Client.ts and
Streamer.ts.
Tests:
- New specs: Client.spec, Streamer.spec, RawSender.spec.
- Server.spec extended to cover parse() and metric increments.
- Suite grows from 6 to 32 tests; coverage from ~10% to 63% (RawSender 94%,
Streamer 75%, Server 65%, Client 60%).
|
Looks like this PR is not ready to merge, because of the following issues:
Please fix the issues and try again If you have any trouble, please check the PR guidelines |
|
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #40358 +/- ##
===========================================
+ Coverage 69.97% 69.99% +0.02%
===========================================
Files 3301 3305 +4
Lines 120443 120700 +257
Branches 21559 21630 +71
===========================================
+ Hits 84281 84486 +205
- Misses 32862 32921 +59
+ Partials 3300 3293 -7
Flags with carried forward coverage won't be shown. Click here to find out more. 🚀 New features to boost your workflow:
|
Summary
Internal refactor of
ee/apps/ddp-streamerkeeping the wire protocol identical (DDP-over-WS, EJSON, login viaAccount.login, method fallback toMeteorService). Five focused changes split into one commit.Heartbeat & backpressure
idleTimer/pongTimer. ThepongTimeris armed only when the server sends a PING and is cleared exclusively by an inbound PONG. Other inbound traffic no longer extends the deadline — a broken client that keeps sending data but never replies to PING is now disconnected deterministically.ws.bufferedAmount > MAX_BUFFERED_BYTES(default 4 MiB, env overrideDDP_MAX_BUFFERED_BYTES), the socket is closed with code1013. Prevents heap blow-up on the streamer pod when a single peer stalls.Fan-out path
RawSenderhelper. The privatews._sender.sendFramecall is now isolated inlib/RawSender.ts, with the buffer-amount guard applied at fan-out too. The upgrade path to a differentws/uWS implementation is now local to one file.fanOutTextextracted fromStream.sendToManySubscriptionsso the loop is testable without instantiating the fullStreamerbase class.Lifecycle
setTimeout(1ms)→setImmediate.wsguarantees frame ordering on the wire, so theresult/updatedframes reach the wire before the close frame.Server.parsedecodes binary buffers as UTF-8 instead of throwing 500. Some proxies andArrayBufferpaths deliver text payloads in binary frames.Observability
ddp_method_total{namespace, status}— counter, labeled by method namespace prefix (before:/.) to keep Prometheus cardinality boundedddp_close_total{code}— counterddp_send_buffer_bytes{nodeID}— gauge sampled every 5 sconsole.error/warnwith@rocket.chat/loggerinClient.tsandStreamer.ts.Tests
Client.spec,Streamer.spec,RawSender.spec.Server.specextended to coverparse()and metric increments.Out of scope (future work)
wstouWebSockets.js.permessage-deflate.Test plan
yarn workspace @rocket.chat/ddp-streamer test— 32/32 passingyarn workspace @rocket.chat/ddp-streamer typecheck— cleanyarn workspace @rocket.chat/ddp-streamer lint— 0 errors (8 pre-existing warnings in untouched code)stream-room-messages, send messages, logout, reconnectddp_send_buffer_bytes,ddp_close_total{code="1013"}for one full traffic cycleusers_connected/users_loggedgauges