feat(automation): add EventTrigger for Soroban event monitoring#327
feat(automation): add EventTrigger for Soroban event monitoring#327salazarsebas wants to merge 1 commit into
Conversation
Implement EventTrigger to monitor Soroban contract events via RPC polling, filter by contract ID and topics, invoke callbacks on matching events, and gracefully reconnect after RPC connection dropouts. Closes Galaxy-KJ#302
📝 WalkthroughWalkthroughAdds a new ChangesEventTrigger: Soroban event-based automation trigger
Sequence Diagram(s)sequenceDiagram
participant Caller
participant EventTrigger
participant ContractEventMonitor
Caller->>EventTrigger: startListening(filter, callback)
EventTrigger->>ContractEventMonitor: subscribeToEvents(subscription)
ContractEventMonitor-->>EventTrigger: subscriptionId
ContractEventMonitor->>EventTrigger: onEvent(ContractEventDetail)
EventTrigger->>EventTrigger: matchesFilter(event, filter)
EventTrigger-->>Caller: callback(event)
ContractEventMonitor->>EventTrigger: onError(err)
EventTrigger->>ContractEventMonitor: unsubscribe(subscriptionId)
EventTrigger->>EventTrigger: handleConnectionError → scheduleReconnect
EventTrigger->>EventTrigger: attemptReconnect → subscribe
Caller->>EventTrigger: stopListening()
EventTrigger->>ContractEventMonitor: unsubscribe(subscriptionId)
EventTrigger->>EventTrigger: clearReconnectTimer, reset state
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ESLint
packages/core/automation/index.tsOops! Something went wrong! :( ESLint: 9.39.2 YAMLException: Cannot read config file: /.eslintrc.js.bak 1 | module.exports = { packages/core/automation/jest.config.cjsOops! Something went wrong! :( ESLint: 9.39.2 YAMLException: Cannot read config file: /.eslintrc.js.bak 1 | module.exports = { packages/core/automation/src/test/event-trigger.test.tsOops! Something went wrong! :( ESLint: 9.39.2 YAMLException: Cannot read config file: /.eslintrc.js.bak 1 | module.exports = {
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/core/automation/src/triggers/event-trigger.ts`:
- Around line 18-22: `EventTriggerOptions` is exported with reconnect settings,
but `EventTrigger` does not accept or use them, so the public API and
implementation are out of sync. Update the `EventTrigger` constructor (and any
call sites/defaults inside `event-trigger.ts`) to accept the full
`EventTriggerOptions` object and apply `maxReconnectAttempts` and
`reconnectDelayMs` when configuring reconnection behavior, or remove those
unused fields from `EventTriggerOptions` if they are not meant to be supported.
Keep the `EventTriggerOptions` and `EventTrigger` symbols aligned so the exposed
API matches what the class actually uses.
- Around line 61-66: In startListening() on EventTrigger, the trigger state is
being committed before subscribe() succeeds, so a failed first subscribe leaves
isListening/currentFilter/callback set and can still leave reconnect handling
active. Update startListening() to only mark the trigger active after
subscribe(filter, callback) completes, and if subscribe throws, roll back any
partial state and clear reconnectAttempts so the instance stays idle. Use the
EventTrigger methods startListening() and subscribe() to locate the fix.
- Around line 101-120: The subscribe flow in EventTrigger.subscribe can complete
after stopListening() has already been called, leaving a leaked listener and
stale callback. Update the subscription lifecycle so late resolves are ignored:
check the trigger’s stopped/listening state before assigning this.subscriptionId
and before wiring the callback path, and make sure a resolved subscribeToEvents
result is immediately unsubscribed if stopListening() has already run. Use the
existing subscribe(), stopListening(), and this.subscriptionId handling in
EventTrigger to locate the fix.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 289e01bc-80df-4da8-822c-0ad57b90cae5
📒 Files selected for processing (5)
packages/core/automation/index.tspackages/core/automation/jest.config.cjspackages/core/automation/src/test/event-trigger.test.tspackages/core/automation/src/triggers/event-trigger.tspackages/core/automation/tsconfig.json
| export interface EventTriggerOptions { | ||
| rpcUrl?: string; | ||
| maxReconnectAttempts?: number; | ||
| reconnectDelayMs?: number; | ||
| } |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Wire EventTriggerOptions into the constructor or drop the export.
EventTriggerOptions advertises configurable maxReconnectAttempts and reconnectDelayMs, but the constructor only accepts rpcUrl and always uses the hardcoded defaults. Consumers cannot actually tune reconnection behavior, so the new public API does not match the shipped implementation.
Suggested fix
- constructor(
- rpcUrl?: string,
- monitor?: ContractEventMonitor
- ) {
+ constructor(
+ optionsOrRpcUrl?: string | EventTriggerOptions,
+ monitor?: ContractEventMonitor
+ ) {
+ const resolvedOptions =
+ typeof optionsOrRpcUrl === 'string'
+ ? { rpcUrl: optionsOrRpcUrl }
+ : (optionsOrRpcUrl ?? {});
+
this.options = {
- rpcUrl: rpcUrl ?? DEFAULT_RPC_URL,
- maxReconnectAttempts: DEFAULT_MAX_RECONNECT_ATTEMPTS,
- reconnectDelayMs: DEFAULT_RECONNECT_DELAY_MS,
+ rpcUrl: resolvedOptions.rpcUrl ?? DEFAULT_RPC_URL,
+ maxReconnectAttempts:
+ resolvedOptions.maxReconnectAttempts ?? DEFAULT_MAX_RECONNECT_ATTEMPTS,
+ reconnectDelayMs:
+ resolvedOptions.reconnectDelayMs ?? DEFAULT_RECONNECT_DELAY_MS,
};
this.monitor = monitor ?? new ContractEventMonitor(this.options.rpcUrl);
}Also applies to: 38-47
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/core/automation/src/triggers/event-trigger.ts` around lines 18 - 22,
`EventTriggerOptions` is exported with reconnect settings, but `EventTrigger`
does not accept or use them, so the public API and implementation are out of
sync. Update the `EventTrigger` constructor (and any call sites/defaults inside
`event-trigger.ts`) to accept the full `EventTriggerOptions` object and apply
`maxReconnectAttempts` and `reconnectDelayMs` when configuring reconnection
behavior, or remove those unused fields from `EventTriggerOptions` if they are
not meant to be supported. Keep the `EventTriggerOptions` and `EventTrigger`
symbols aligned so the exposed API matches what the class actually uses.
| this.currentFilter = filter; | ||
| this.callback = callback; | ||
| this.isListening = true; | ||
| this.reconnectAttempts = 0; | ||
|
|
||
| await this.subscribe(filter, callback); |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Roll back state when the first subscription attempt fails.
startListening() marks the trigger active before awaiting subscribe(). If the initial subscribeToEvents() throws, the promise rejects but isListening, currentFilter, and callback stay set. With the current ContractEventMonitor contract, that failed subscribe can also fire onError before rethrowing, so you end up with a rejected startListening() call and a background reconnect timer still running.
Suggested fix
this.currentFilter = filter;
this.callback = callback;
this.isListening = true;
this.reconnectAttempts = 0;
- await this.subscribe(filter, callback);
+ try {
+ await this.subscribe(filter, callback);
+ } catch (error) {
+ this.clearReconnectTimer();
+ this.subscriptionId = undefined;
+ this.isListening = false;
+ this.callback = undefined;
+ this.currentFilter = undefined;
+ this.reconnectAttempts = 0;
+ throw error;
+ }
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| this.currentFilter = filter; | |
| this.callback = callback; | |
| this.isListening = true; | |
| this.reconnectAttempts = 0; | |
| await this.subscribe(filter, callback); | |
| this.currentFilter = filter; | |
| this.callback = callback; | |
| this.isListening = true; | |
| this.reconnectAttempts = 0; | |
| try { | |
| await this.subscribe(filter, callback); | |
| } catch (error) { | |
| this.clearReconnectTimer(); | |
| this.subscriptionId = undefined; | |
| this.isListening = false; | |
| this.callback = undefined; | |
| this.currentFilter = undefined; | |
| this.reconnectAttempts = 0; | |
| throw error; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/core/automation/src/triggers/event-trigger.ts` around lines 61 - 66,
In startListening() on EventTrigger, the trigger state is being committed before
subscribe() succeeds, so a failed first subscribe leaves
isListening/currentFilter/callback set and can still leave reconnect handling
active. Update startListening() to only mark the trigger active after
subscribe(filter, callback) completes, and if subscribe throws, roll back any
partial state and clear reconnectAttempts so the instance stays idle. Use the
EventTrigger methods startListening() and subscribe() to locate the fix.
| private async subscribe( | ||
| filter: EventFilter, | ||
| callback: (event: ContractEventDetail) => void | ||
| ): Promise<void> { | ||
| const subscription: EventSubscription = { | ||
| id: '', | ||
| contractId: filter.contractId, | ||
| eventTypes: filter.topics.length > 0 ? filter.topics : undefined, | ||
| onEvent: (event: ContractEventDetail) => { | ||
| if (this.matchesFilter(event, filter)) { | ||
| callback(event); | ||
| } | ||
| }, | ||
| onError: (error: Error) => { | ||
| this.handleConnectionError(error); | ||
| }, | ||
| }; | ||
|
|
||
| this.subscriptionId = await this.monitor.subscribeToEvents(subscription); | ||
| } |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Guard late subscription completions after stopListening().
This method stores subscriptionId only after the awaited RPC call returns. If stopListening() runs while that call is in flight, it has nothing to unsubscribe; when the promise resolves, a live subscription is leaked and the closed-over callback can still fire even though the trigger was stopped.
Suggested fix
export class EventTrigger {
+ private listenGeneration = 0;
private readonly monitor: ContractEventMonitor;
private readonly options: Required<EventTriggerOptions>;
private subscriptionId?: string;
@@
async startListening(
filter: EventFilter,
callback: (event: ContractEventDetail) => void
): Promise<void> {
+ const generation = ++this.listenGeneration;
if (this.isListening) {
throw new Error('EventTrigger is already listening');
}
@@
- await this.subscribe(filter, callback);
+ await this.subscribe(filter, callback, generation);
}
@@
stopListening(): void {
+ this.listenGeneration += 1;
this.clearReconnectTimer();
@@
private async subscribe(
filter: EventFilter,
- callback: (event: ContractEventDetail) => void
+ callback: (event: ContractEventDetail) => void,
+ generation: number
): Promise<void> {
@@
- this.subscriptionId = await this.monitor.subscribeToEvents(subscription);
+ const subscriptionId = await this.monitor.subscribeToEvents(subscription);
+ if (!this.isListening || generation !== this.listenGeneration) {
+ this.monitor.unsubscribe(subscriptionId);
+ return;
+ }
+ this.subscriptionId = subscriptionId;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| private async subscribe( | |
| filter: EventFilter, | |
| callback: (event: ContractEventDetail) => void | |
| ): Promise<void> { | |
| const subscription: EventSubscription = { | |
| id: '', | |
| contractId: filter.contractId, | |
| eventTypes: filter.topics.length > 0 ? filter.topics : undefined, | |
| onEvent: (event: ContractEventDetail) => { | |
| if (this.matchesFilter(event, filter)) { | |
| callback(event); | |
| } | |
| }, | |
| onError: (error: Error) => { | |
| this.handleConnectionError(error); | |
| }, | |
| }; | |
| this.subscriptionId = await this.monitor.subscribeToEvents(subscription); | |
| } | |
| private async subscribe( | |
| filter: EventFilter, | |
| callback: (event: ContractEventDetail) => void, | |
| generation: number | |
| ): Promise<void> { | |
| const subscription: EventSubscription = { | |
| id: '', | |
| contractId: filter.contractId, | |
| eventTypes: filter.topics.length > 0 ? filter.topics : undefined, | |
| onEvent: (event: ContractEventDetail) => { | |
| if (this.matchesFilter(event, filter)) { | |
| callback(event); | |
| } | |
| }, | |
| onError: (error: Error) => { | |
| this.handleConnectionError(error); | |
| }, | |
| }; | |
| const subscriptionId = await this.monitor.subscribeToEvents(subscription); | |
| if (!this.isListening || generation !== this.listenGeneration) { | |
| this.monitor.unsubscribe(subscriptionId); | |
| return; | |
| } | |
| this.subscriptionId = subscriptionId; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/core/automation/src/triggers/event-trigger.ts` around lines 101 -
120, The subscribe flow in EventTrigger.subscribe can complete after
stopListening() has already been called, leaving a leaked listener and stale
callback. Update the subscription lifecycle so late resolves are ignored: check
the trigger’s stopped/listening state before assigning this.subscriptionId and
before wiring the callback path, and make sure a resolved subscribeToEvents
result is immediately unsubscribed if stopListening() has already run. Use the
existing subscribe(), stopListening(), and this.subscriptionId handling in
EventTrigger to locate the fix.
Summary
Implements
EventTriggerfor monitoring Soroban contract events and firing automation callbacks when matching events occur (e.g. Deposit, Liquidation).EventTriggerwithstartListening(filter, callback)usingContractEventMonitorfrom@galaxy-kj/core-stellar-sdk/sorobanemitSimulatedEvent()Changes
packages/core/automation/src/triggers/event-trigger.ts— new EventTrigger classpackages/core/automation/src/test/event-trigger.test.ts— unit tests (7 cases)packages/core/automation/index.ts— export EventTrigger and typespackages/core/automation/jest.config.cjs— stellar-sdk/soroban module mapper for testspackages/core/automation/tsconfig.json— soroban subpath resolutionTest plan
npx jest src/test/event-trigger.test.ts— all 7 tests passCloses #302
Summary by CodeRabbit
New Features
Bug Fixes
Tests