Skip to content

Commit 0980f1b

Browse files
committed
Fix memory leaks in Driver class
- Remove waitForChannelReady from constructor to prevent Promise chain memory leaks - Add lazy initialization for discovery client (only created when needed) - Implement channel connectivity check in ready() method for non-discovery mode - Add test for memory leak prevention (100,000 Driver instances) - Use Promise.withResolvers() for cleaner promise handling - Properly handle AbortSignal cancellation in channel connectivity loop - Import ConnectivityState enum for type-safe state checks The main issue was that waitForChannelReady() in constructor created Promise chains with closures that held references to 'this', preventing garbage collection. Now channel connectivity is checked lazily only when ready() is called, and the promise chain is properly managed."
1 parent 46eff19 commit 0980f1b

File tree

4 files changed

+103
-34
lines changed

4 files changed

+103
-34
lines changed

packages/core/src/driver.test.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,30 @@ test('allows custom channel options override', () => {
5656

5757
driver.close()
5858
})
59+
60+
test('creating thousands of drivers with using does not leak memory', async () => {
61+
let iterations = 100000
62+
let memoryBefore = process.memoryUsage().heapUsed
63+
64+
for (let i = 0; i < iterations; i++) {
65+
using _driver = new Driver('grpc://localhost:2136/local', {
66+
'ydb.sdk.enable_discovery': false,
67+
})
68+
69+
if (i % 1000 === 0 && i > 0) {
70+
if (global.gc) {
71+
global.gc()
72+
}
73+
}
74+
}
75+
76+
if (global.gc) {
77+
global.gc()
78+
}
79+
80+
let memoryAfter = process.memoryUsage().heapUsed
81+
let memoryGrowth = memoryAfter - memoryBefore
82+
let memoryGrowthMB = memoryGrowth / (1024 * 1024)
83+
84+
expect(memoryGrowthMB).toBeLessThan(50)
85+
})

packages/core/src/driver.ts

Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { loggers } from '@ydbjs/debug'
1212
import { YDBError } from '@ydbjs/error'
1313
import { type RetryConfig, defaultRetryConfig, retry } from '@ydbjs/retry'
1414
import {
15+
type Channel,
1516
type ChannelOptions,
1617
type Client,
1718
ClientError,
@@ -21,13 +22,13 @@ import {
2122
Status,
2223
composeClientMiddleware,
2324
createClientFactory,
24-
waitForChannelReady,
2525
} from 'nice-grpc'
2626
import pkg from '../package.json' with { type: 'json' }
2727
import { type Connection, LazyConnection } from './conn.js'
2828
import { debug } from './middleware.js'
2929
import { ConnectionPool } from './pool.js'
3030
import { detectRuntime } from './runtime.js'
31+
import { ConnectivityState } from '@grpc/grpc-js/build/src/connectivity-state.js'
3132

3233
let dbg = loggers.driver
3334

@@ -170,24 +171,11 @@ export class Driver implements Disposable {
170171

171172
this.#pool = new ConnectionPool(channelCredentials, this.options.channelOptions)
172173

173-
this.#discoveryClient = createClientFactory()
174-
.use(this.#middleware)
175-
.create(DiscoveryServiceDefinition, this.#connection.channel)
176-
177174
if (this.options['ydb.sdk.enable_discovery'] === false) {
178175
dbg.log('discovery disabled, using single endpoint')
179-
waitForChannelReady(
180-
this.#connection.channel,
181-
new Date(Date.now() + (this.options['ydb.sdk.ready_timeout_ms'] || 10000))
182-
)
183-
.then(() => {
184-
dbg.log('single endpoint ready')
185-
return this.#ready.resolve()
186-
})
187-
.catch((error) => {
188-
dbg.log('single endpoint failed to become ready: %O', error)
189-
this.#ready.reject(error)
190-
})
176+
// Channel will be lazily created on first use
177+
// Readiness check is skipped to avoid memory leaks from Promise chains
178+
this.#ready.resolve()
191179
}
192180

193181
if (this.options['ydb.sdk.enable_discovery'] === true) {
@@ -242,6 +230,16 @@ export class Driver implements Disposable {
242230
return this.cs.protocol === 'https:' || this.cs.protocol === 'grpcs:'
243231
}
244232

233+
get #getDiscoveryClient(): Client<typeof DiscoveryServiceDefinition> {
234+
if (this.#discoveryClient === null) {
235+
dbg.log('creating discovery client')
236+
this.#discoveryClient = createClientFactory()
237+
.use(this.#middleware)
238+
.create(DiscoveryServiceDefinition, this.#connection.channel)
239+
}
240+
return this.#discoveryClient
241+
}
242+
245243
async #discovery(signal: AbortSignal): Promise<void> {
246244
dbg.log('starting discovery for database: %s', this.database)
247245

@@ -255,7 +253,7 @@ export class Driver implements Disposable {
255253

256254
let result = await retry(retryConfig, async (signal) => {
257255
dbg.log('attempting to list endpoints for database: %s', this.database)
258-
let response = await this.#discoveryClient.listEndpoints({ database: this.database }, { signal })
256+
let response = await this.#getDiscoveryClient.listEndpoints({ database: this.database }, { signal })
259257
if (!response.operation) {
260258
throw new ClientError(
261259
DiscoveryServiceDefinition.listEndpoints.path,
@@ -289,19 +287,63 @@ export class Driver implements Disposable {
289287

290288
async ready(signal?: AbortSignal): Promise<void> {
291289
dbg.log('waiting for driver to become ready')
292-
signal = signal
293-
? AbortSignal.any([signal, AbortSignal.timeout(this.options['ydb.sdk.ready_timeout_ms']!)])
294-
: AbortSignal.timeout(this.options['ydb.sdk.ready_timeout_ms']!)
290+
291+
let timeoutMs = this.options['ydb.sdk.ready_timeout_ms']!
292+
let effectiveSignal = signal
293+
? AbortSignal.any([signal, AbortSignal.timeout(timeoutMs)])
294+
: AbortSignal.timeout(timeoutMs)
295295

296296
try {
297-
await abortable(signal, this.#ready.promise)
297+
await abortable(effectiveSignal, this.#ready.promise)
298+
299+
if (this.options['ydb.sdk.enable_discovery'] === false) {
300+
dbg.log('checking channel connectivity for single endpoint mode')
301+
await this.#checkChannelConnectivity(this.#connection.channel, timeoutMs, effectiveSignal)
302+
}
303+
298304
dbg.log('driver is ready')
299305
} catch (error) {
300306
dbg.log('driver failed to become ready: %O', error)
301307
throw error
302308
}
303309
}
304310

311+
async #checkChannelConnectivity(channel: Channel, timeoutMs: number, signal: AbortSignal): Promise<void> {
312+
let deadline = new Date(Date.now() + timeoutMs)
313+
314+
while (true) {
315+
if (signal.aborted) {
316+
throw signal.reason || new Error('Aborted while waiting for channel connectivity')
317+
}
318+
319+
let state = channel.getConnectivityState(true) // true = try to connect
320+
dbg.log('channel connectivity state: %d', state)
321+
322+
if (state === ConnectivityState.READY) {
323+
dbg.log('channel is ready')
324+
return
325+
}
326+
327+
if (state === ConnectivityState.SHUTDOWN) {
328+
throw new Error('Channel is shutdown')
329+
}
330+
331+
let { promise, resolve, reject } = Promise.withResolvers<void>()
332+
channel.watchConnectivityState(state, deadline, (err?: Error) => {
333+
if (err) {
334+
dbg.log('channel connectivity state change timeout: %O', err)
335+
reject(err)
336+
} else {
337+
dbg.log('channel connectivity state changed')
338+
resolve()
339+
}
340+
})
341+
342+
// oxlint-disable-next-line no-await-in-loop
343+
await abortable(signal, promise)
344+
}
345+
}
346+
305347
close(): void {
306348
dbg.log('closing driver')
307349
if (this.#rediscoverTimer) {

vitest.config.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,27 @@ import { defineConfig } from 'vitest/config'
22

33
export default defineConfig({
44
test: {
5+
execArgv: ['--expose-gc'],
6+
vmMemoryLimit: '300Mb',
57
projects: [
68
{
79
test: {
810
name: {
911
label: 'uni',
10-
color: 'yellow'
12+
color: 'yellow',
1113
},
1214
include: ['packages/*/src/**/*.test.ts'],
1315
environment: 'node',
1416
benchmark: {
15-
include: ['packages/*/src/**/*.bench.ts']
16-
}
17+
include: ['packages/*/src/**/*.bench.ts'],
18+
},
1719
},
1820
},
1921
{
2022
test: {
2123
name: {
2224
label: 'int',
23-
color: 'blue'
25+
color: 'blue',
2426
},
2527
include: ['packages/*/tests/**/*.test.ts'],
2628
environment: 'node',
@@ -34,7 +36,7 @@ export default defineConfig({
3436
test: {
3537
name: {
3638
label: 'e2e',
37-
color: 'magenta'
39+
color: 'magenta',
3840
},
3941
include: ['e2e/**/*.test.ts'],
4042
environment: 'node',
@@ -43,8 +45,8 @@ export default defineConfig({
4345
hookTimeout: 30000,
4446
maxConcurrency: 1,
4547
benchmark: {
46-
include: ['e2e/**/*.bench.ts']
47-
}
48+
include: ['e2e/**/*.bench.ts'],
49+
},
4850
},
4951
},
5052
],
@@ -59,7 +61,7 @@ export default defineConfig({
5961
'**/vitest.*',
6062
'**/*.test.ts',
6163
'**/*.bench.ts',
62-
]
63-
}
64+
],
65+
},
6466
},
6567
})

vitest.setup.ydb.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,8 @@ export async function setup(project: TestProject) {
3232
return
3333
}
3434

35-
let ports = ['2135', '2136', '8765', '9092'].map((port) => `--publish ${port}`).join(' ')
36-
3735
// prettier-ignore
38-
let container = await $`docker run --rm --detach --hostname localhost --platform linux/amd64 ${ports} ydbplatform/local-ydb:25.2`.text()
36+
let container = await $`docker run --rm --detach --hostname localhost --platform linux/amd64 --publish 2135:2135 --publish 2136:2136 --publish 8765:8765 --publish 9092:9092 ydbplatform/local-ydb:25.2.1`.text()
3937
containerID = container.trim()
4038

4139
let signal = AbortSignal.timeout(30 * 1000)

0 commit comments

Comments
 (0)