Skip to content

Commit 91a34ab

Browse files
authored
Merge pull request #437 Fixed leak sessions on asyncio timeout
2 parents 295fb74 + 15614e4 commit 91a34ab

File tree

6 files changed

+30
-13
lines changed

6 files changed

+30
-13
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed leak sessions on asyncio timeout
2+
13
## 3.12.2 ##
24
* Added support ydb github repo with own auth protobuf
35

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,11 @@ async def _send_loop(self, writer: "WriterAsyncIOStream"):
537537
m = await self._new_messages.get() # type: InternalMessage
538538
if m.seq_no > last_seq_no:
539539
writer.write([m])
540-
except Exception as e:
540+
except asyncio.CancelledError:
541+
# the loop task cancelled be parent code, for example for reconnection
542+
# no need to stop all work.
543+
raise
544+
except BaseException as e:
541545
self._stop(e)
542546
raise
543547

ydb/aio/credentials.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ async def _refresh(self):
8686
await asyncio.sleep(1)
8787
self._tp.submit(self._refresh)
8888

89+
except BaseException as e:
90+
self.last_error = str(e)
91+
raise
92+
8993
async def token(self):
9094
current_time = time.time()
9195
if current_time > self._refresh_in:

ydb/aio/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ async def __call__(
247247
wait_timeout = settings.timeout if settings else 10
248248
try:
249249
connection = await self._store.get(preferred_endpoint, fast_fail=fast_fail, wait_timeout=wait_timeout)
250-
except Exception:
250+
except BaseException:
251251
self._discovery.notify_disconnected()
252252
raise
253253

ydb/aio/resolver.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async def resolve(self):
3030
connection = conn_impl.Connection(endpoint, self._driver_config)
3131
try:
3232
await connection.connection_ready()
33-
except Exception:
33+
except BaseException:
3434
self._add_debug_details(
3535
'Failed to establish connection to YDB discovery endpoint: "%s". Check endpoint correctness.' % endpoint
3636
)
@@ -53,7 +53,7 @@ async def resolve(self):
5353
)
5454

5555
return resolved
56-
except Exception as e:
56+
except BaseException as e:
5757

5858
self._add_debug_details(
5959
'Failed to resolve endpoints for database %s. Endpoint: "%s". Error details:\n %s',

ydb/aio/table.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ async def retry_operation(callee, retry_settings=None, *args, **kwargs): # pyli
221221
else:
222222
try:
223223
return await next_opt.result
224-
except Exception as e: # pylint: disable=W0703
224+
except BaseException as e: # pylint: disable=W0703
225225
next_opt.set_exception(e)
226226

227227

@@ -236,7 +236,7 @@ def __init__(self, pool, timeout, retry_timeout):
236236
:param blocking: A flag that specifies that session acquire method should blocks
237237
:param timeout: A timeout in seconds for session acquire
238238
"""
239-
self._pool = pool
239+
self._pool: SessionPool = pool
240240
self._acquired = None
241241
self._timeout = timeout
242242
self._retry_timeout = retry_timeout
@@ -251,7 +251,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
251251

252252

253253
class SessionPool:
254-
def __init__(self, driver: ydb.pool.IConnectionPool, size: int, min_pool_size: int = 0):
254+
def __init__(self, driver: "ydb.aio.Driver", size: int, min_pool_size: int = 0):
255255
self._driver_await_timeout = 3
256256
self._should_stop = asyncio.Event()
257257
self._waiters = 0
@@ -286,7 +286,7 @@ async def wrapper_callee():
286286

287287
return await retry_operation(wrapper_callee, retry_settings)
288288

289-
def _create(self) -> ydb.ISession:
289+
def _create(self) -> Session:
290290
self._active_count += 1
291291
session = self._driver.table_client.session()
292292
self._logger.debug("Created session %s", session)
@@ -301,6 +301,9 @@ async def _init_session_logic(self, session: ydb.ISession) -> typing.Optional[yd
301301
self._logger.error("Failed to create session. Reason: %s", str(e))
302302
except Exception as e: # pylint: disable=W0703
303303
self._logger.exception("Failed to create session. Reason: %s", str(e))
304+
except BaseException as e: # pylint: disable=W0703
305+
self._logger.exception("Failed to create session. Reason (base exception): %s", str(e))
306+
raise
304307

305308
return None
306309

@@ -324,7 +327,7 @@ async def _prepare_session(self, timeout, retry_num) -> ydb.ISession:
324327
if not new_sess:
325328
self._destroy(session)
326329
return new_sess
327-
except Exception as e:
330+
except BaseException as e:
328331
self._destroy(session)
329332
raise e
330333

@@ -338,7 +341,7 @@ async def _get_session_from_queue(self, timeout: float):
338341
_, session = task_wait.result()
339342
return session
340343

341-
async def acquire(self, timeout: float = None, retry_timeout: float = None, retry_num: int = None) -> ydb.ISession:
344+
async def acquire(self, timeout: float = None, retry_timeout: float = None, retry_num: int = None) -> Session:
342345

343346
if self._should_stop.is_set():
344347
self._logger.error("Take session from closed session pool")
@@ -408,7 +411,10 @@ def _destroy(self, session: ydb.ISession, wait_for_del: bool = False):
408411
asyncio.ensure_future(coro)
409412
return None
410413

411-
async def release(self, session: ydb.ISession):
414+
async def release(self, session: Session):
415+
self._release_nowait(session)
416+
417+
def _release_nowait(self, session: Session):
412418
self._logger.debug("Put on session %s", session.session_id)
413419
if session.closing():
414420
self._destroy(session)
@@ -421,7 +427,8 @@ async def release(self, session: ydb.ISession):
421427
self._destroy(session)
422428
return False
423429

424-
await self._active_queue.put((time.time() + 10 * 60, session))
430+
# self._active_queue has no size limit, it means that put_nowait will be successfully always
431+
self._active_queue.put_nowait((time.time() + 10 * 60, session))
425432
self._logger.debug("Session returned to queue: %s", session.session_id)
426433

427434
async def _pick_for_keepalive(self):
@@ -445,7 +452,7 @@ async def _send_keep_alive(self, session: ydb.ISession):
445452
await session.keep_alive(self._req_settings)
446453
try:
447454
await self.release(session)
448-
except Exception: # pylint: disable=W0703
455+
except BaseException: # pylint: disable=W0703
449456
self._destroy(session)
450457

451458
async def _keep_alive_loop(self):

0 commit comments

Comments
 (0)