Skip to content

Commit 8ea7fc1

Browse files
authored
feat: subscription improvements for memory usage (#71)
* feat: subscription improvements for memory usage * chore: update following review and bump version
1 parent ac2fbd3 commit 8ea7fc1

6 files changed

Lines changed: 287 additions & 149 deletions

File tree

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "zkverifyjs",
3-
"version": "2.0.0",
3+
"version": "2.1.0",
44
"description": "Submit proofs to zkVerify and query proof state with ease using our npm package.",
55
"author": "zkVerify <web3-platform@zkverify.io>",
66
"license": "GPL-3.0",

src/api/aggregation/index.test.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,7 @@ describe('subscribeToNewAggregationReceipts', () => {
142142
phase: expectedPhase,
143143
}),
144144
);
145-
expect(emitSpy).not.toHaveBeenCalledWith(ZkVerifyEvents.Unsubscribe);
146-
expect(mockApiUnsubscribe).not.toHaveBeenCalled();
145+
expect(mockApiUnsubscribe).toHaveBeenCalled();
147146
});
148147

149148
it('should reject immediately if aggregationId is provided without domainId', async () => {

src/api/aggregation/index.ts

Lines changed: 149 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,52 @@ export async function subscribeToNewAggregationReceipts(
3232
let domainId: string | undefined = undefined;
3333
let aggregationId: string | undefined = undefined;
3434
let timeoutId: NodeJS.Timeout | undefined;
35+
let unsubscribeFinalizedHeads: (() => void) | undefined;
36+
let isResolved = false;
37+
let isRejected = false;
38+
39+
const cleanup = () => {
40+
if (timeoutId) {
41+
clearTimeout(timeoutId);
42+
timeoutId = undefined;
43+
}
44+
if (unsubscribeFinalizedHeads) {
45+
try {
46+
unsubscribeFinalizedHeads();
47+
} catch (err) {
48+
console.debug('Error during finalized heads cleanup:', err);
49+
}
50+
unsubscribeFinalizedHeads = undefined;
51+
}
52+
};
53+
54+
const safeResolve = (value: EventEmitter) => {
55+
if (!isResolved && !isRejected) {
56+
isResolved = true;
57+
cleanup();
58+
resolve(value);
59+
}
60+
};
61+
62+
const safeReject = (error: unknown) => {
63+
if (!isResolved && !isRejected) {
64+
isRejected = true;
65+
cleanup();
66+
reject(error);
67+
}
68+
};
69+
70+
(emitter as EventEmitter & { _cleanup?: () => void })._cleanup = cleanup;
71+
72+
emitter.once(ZkVerifyEvents.Unsubscribe, cleanup);
3573

3674
if (options) {
3775
domainId = options.domainId?.toString().trim();
3876
if ('aggregationId' in options) {
3977
aggregationId = options.aggregationId?.toString().trim();
4078

4179
if (!domainId) {
42-
reject(
80+
safeReject(
4381
new Error(
4482
'Cannot filter by aggregationId without also providing domainId.',
4583
),
@@ -57,7 +95,7 @@ export async function subscribeToNewAggregationReceipts(
5795

5896
timeoutId = setTimeout(() => {
5997
unsubscribe(emitter);
60-
reject(
98+
safeReject(
6199
new Error(
62100
`Timeout exceeded: No event received within ${timeoutValue} ms`,
63101
),
@@ -66,83 +104,114 @@ export async function subscribeToNewAggregationReceipts(
66104
}
67105

68106
try {
69-
api.rpc.chain.subscribeFinalizedHeads(async (header) => {
70-
const blockHash = header.hash.toHex();
71-
const apiAt = await api.at(blockHash);
72-
const events =
73-
(await apiAt.query.system.events()) as unknown as Vec<EventRecord>;
74-
75-
events.forEach((record) => {
76-
const { event, phase } = record;
77-
78-
if (
79-
event.section === 'aggregate' &&
80-
event.method === 'NewAggregationReceipt'
81-
) {
82-
let currentDomainId: string | undefined;
83-
let currentAggregationId: string | undefined;
84-
85-
const eventData = event.data.toHuman
86-
? event.data.toHuman()
87-
: Array.from(event.data as Iterable<Codec>, (item: Codec) =>
88-
item.toString(),
89-
);
90-
91-
const eventObject = {
92-
event: ZkVerifyEvents.NewAggregationReceipt,
93-
blockHash,
94-
data: eventData,
95-
phase:
96-
phase && typeof phase.toJSON === 'function'
97-
? phase.toJSON()
98-
: phase?.toString() || '',
99-
};
100-
101-
try {
102-
currentDomainId = event.data[0]?.toString();
103-
currentAggregationId = event.data[1]?.toString();
104-
105-
if (!currentDomainId || !currentAggregationId) {
106-
reject(
107-
new Error(
108-
'Event data is missing required fields: domainId or aggregationId.',
109-
),
110-
);
107+
const subscriptionResult = api.rpc.chain.subscribeFinalizedHeads(
108+
async (header) => {
109+
const blockHash = header.hash.toHex();
110+
const apiAt = await api.at(blockHash);
111+
const events =
112+
(await apiAt.query.system.events()) as unknown as Vec<EventRecord>;
113+
114+
events.forEach((record) => {
115+
const { event, phase } = record;
116+
117+
if (
118+
event.section === 'aggregate' &&
119+
event.method === 'NewAggregationReceipt'
120+
) {
121+
let currentDomainId: string | undefined;
122+
let currentAggregationId: string | undefined;
123+
124+
const eventData = event.data.toHuman
125+
? event.data.toHuman()
126+
: Array.from(event.data as Iterable<Codec>, (item: Codec) =>
127+
item.toString(),
128+
);
129+
130+
const eventObject = {
131+
event: ZkVerifyEvents.NewAggregationReceipt,
132+
blockHash,
133+
data: eventData,
134+
phase:
135+
phase && typeof phase.toJSON === 'function'
136+
? phase.toJSON()
137+
: phase?.toString() || '',
138+
};
139+
140+
try {
141+
currentDomainId = event.data[0]?.toString();
142+
currentAggregationId = event.data[1]?.toString();
143+
144+
if (!currentDomainId || !currentAggregationId) {
145+
emitter.emit(
146+
ZkVerifyEvents.ErrorEvent,
147+
new Error(
148+
'Event data is missing required fields: domainId or aggregationId.',
149+
),
150+
);
151+
safeReject(
152+
new Error(
153+
'Event data is missing required fields: domainId or aggregationId.',
154+
),
155+
);
156+
return;
157+
}
158+
} catch (error) {
159+
emitter.emit(ZkVerifyEvents.ErrorEvent, error);
160+
safeReject(error);
111161
return;
112162
}
113-
} catch (error) {
114-
emitter.emit(ZkVerifyEvents.ErrorEvent, error);
115-
reject(error);
116-
return;
117-
}
118163

119-
if (!options || (!aggregationId && !domainId)) {
120-
emitter.emit(ZkVerifyEvents.NewAggregationReceipt, eventObject);
121-
callback(eventObject);
122-
} else if (
123-
domainId &&
124-
!aggregationId &&
125-
domainId === currentDomainId
126-
) {
127-
emitter.emit(ZkVerifyEvents.NewAggregationReceipt, eventObject);
128-
callback(eventObject);
129-
} else if (
130-
domainId === currentDomainId &&
131-
currentAggregationId === aggregationId
164+
if (!options || (!aggregationId && !domainId)) {
165+
emitter.emit(ZkVerifyEvents.NewAggregationReceipt, eventObject);
166+
callback(eventObject);
167+
} else if (
168+
domainId &&
169+
!aggregationId &&
170+
domainId === currentDomainId
171+
) {
172+
emitter.emit(ZkVerifyEvents.NewAggregationReceipt, eventObject);
173+
callback(eventObject);
174+
} else if (
175+
domainId === currentDomainId &&
176+
currentAggregationId === aggregationId
177+
) {
178+
emitter.emit(ZkVerifyEvents.NewAggregationReceipt, eventObject);
179+
callback(eventObject);
180+
cleanup();
181+
safeResolve(emitter);
182+
return;
183+
}
184+
}
185+
});
186+
},
187+
);
188+
189+
if (typeof subscriptionResult === 'function') {
190+
unsubscribeFinalizedHeads = subscriptionResult;
191+
} else if (
192+
subscriptionResult &&
193+
typeof subscriptionResult.then === 'function'
194+
) {
195+
subscriptionResult
196+
.then((unsubscribeFn: () => void) => {
197+
if (
198+
!isResolved &&
199+
!isRejected &&
200+
typeof unsubscribeFn === 'function'
132201
) {
133-
if (timeoutId) clearTimeout(timeoutId);
134-
emitter.emit(ZkVerifyEvents.NewAggregationReceipt, eventObject);
135-
callback(eventObject);
136-
resolve(emitter);
137-
return;
202+
unsubscribeFinalizedHeads = unsubscribeFn;
203+
}
204+
})
205+
.catch((error: unknown) => {
206+
if (!isResolved && !isRejected) {
207+
emitter.emit(ZkVerifyEvents.ErrorEvent, error);
208+
safeReject(error);
138209
}
139-
}
140-
});
141-
});
210+
});
211+
}
142212
} catch (error) {
143-
if (timeoutId) clearTimeout(timeoutId);
144213
emitter.emit(ZkVerifyEvents.ErrorEvent, error);
145-
reject(error);
214+
safeReject(error);
146215
}
147216

148217
return emitter;
@@ -158,6 +227,14 @@ export async function subscribeToNewAggregationReceipts(
158227
* @param {EventEmitter} emitter - The EventEmitter instance returned by the subscription.
159228
*/
160229
export function unsubscribe(emitter: EventEmitter): void {
230+
const emitterWithCleanup = emitter as EventEmitter & {
231+
_cleanup?: () => void;
232+
};
233+
if (emitterWithCleanup._cleanup) {
234+
emitterWithCleanup._cleanup();
235+
delete emitterWithCleanup._cleanup;
236+
}
237+
161238
emitter.emit(ZkVerifyEvents.Unsubscribe);
162239
emitter.removeAllListeners();
163240
}

0 commit comments

Comments
 (0)