@@ -40,14 +40,15 @@ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
40
40
self ._should_stop = threading .Event ()
41
41
self ._lock = threading .RLock ()
42
42
43
- def _create_new_session (self , timeout : float ):
43
+ def _create_new_session (self , timeout : Optional [ float ] ):
44
44
session = QuerySession (self ._driver )
45
45
session .create (settings = BaseRequestSettings ().with_timeout (timeout ))
46
46
logger .debug (f"New session was created for pool. Session id: { session ._state .session_id } " )
47
47
return session
48
48
49
- def acquire (self , timeout : float ) -> QuerySession :
50
- acquired = self ._lock .acquire (timeout = timeout )
49
+ def acquire (self , timeout : Optional [float ] = None ) -> QuerySession :
50
+ lock_acquire_timeout = timeout if timeout is not None else - 1
51
+ acquired = self ._lock .acquire (timeout = lock_acquire_timeout )
51
52
try :
52
53
if self ._should_stop .is_set ():
53
54
logger .error ("An attempt to take session from closed session pool." )
@@ -76,7 +77,7 @@ def acquire(self, timeout: float) -> QuerySession:
76
77
77
78
logger .debug (f"Session pool is not large enough: { self ._current_size } < { self ._size } , will create new one." )
78
79
finish = time .monotonic ()
79
- time_left = timeout - (finish - start )
80
+ time_left = timeout - (finish - start ) if timeout is not None else None
80
81
session = self ._create_new_session (time_left )
81
82
82
83
self ._current_size += 1
@@ -89,7 +90,7 @@ def release(self, session: QuerySession) -> None:
89
90
self ._queue .put_nowait (session )
90
91
logger .debug ("Session returned to queue: %s" , session ._state .session_id )
91
92
92
- def checkout (self , timeout : float = 10 ) -> "SimpleQuerySessionCheckout" :
93
+ def checkout (self , timeout : Optional [ float ] = None ) -> "SimpleQuerySessionCheckout" :
93
94
"""WARNING: This API is experimental and could be changed.
94
95
Return a Session context manager, that opens session on enter and closes session on exit.
95
96
"""
@@ -109,7 +110,7 @@ def retry_operation_sync(self, callee: Callable, retry_settings: Optional[RetryS
109
110
retry_settings = RetrySettings () if retry_settings is None else retry_settings
110
111
111
112
def wrapped_callee ():
112
- with self .checkout () as session :
113
+ with self .checkout (timeout = retry_settings . max_session_acquire_timeout ) as session :
113
114
return callee (session , * args , ** kwargs )
114
115
115
116
return retry_operation_sync (wrapped_callee , retry_settings )
@@ -137,14 +138,15 @@ def execute_with_retries(
137
138
retry_settings = RetrySettings () if retry_settings is None else retry_settings
138
139
139
140
def wrapped_callee ():
140
- with self .checkout () as session :
141
+ with self .checkout (timeout = retry_settings . max_session_acquire_timeout ) as session :
141
142
it = session .execute (query , parameters , * args , ** kwargs )
142
143
return [result_set for result_set in it ]
143
144
144
145
return retry_operation_sync (wrapped_callee , retry_settings )
145
146
146
- def stop (self , timeout = - 1 ):
147
- acquired = self ._lock .acquire (timeout = timeout )
147
+ def stop (self , timeout = None ):
148
+ acquire_timeout = timeout if timeout is not None else - 1
149
+ acquired = self ._lock .acquire (timeout = acquire_timeout )
148
150
try :
149
151
self ._should_stop .set ()
150
152
while True :
@@ -170,7 +172,7 @@ def __del__(self):
170
172
171
173
172
174
class SimpleQuerySessionCheckout :
173
- def __init__ (self , pool : QuerySessionPool , timeout : float ):
175
+ def __init__ (self , pool : QuerySessionPool , timeout : Optional [ float ] ):
174
176
self ._pool = pool
175
177
self ._timeout = timeout
176
178
self ._session = None
0 commit comments