Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 0170cdf

Browse files
committedNov 24, 2022
fix(core): Adjust connection timeout and retry logic
Do not use the session timeout as connection timeout. This value is too large and results in a bad non-responsive server holding up the client long enough for the session to timeout. Use the KazooRetry object to use an increasing backoff timeout and cycle over all servers quickly, working around bad servers with minimal impact.
1 parent 4b13135 commit 0170cdf

File tree

3 files changed

+115
-3
lines changed

3 files changed

+115
-3
lines changed
 

‎kazoo/protocol/connection.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,9 @@ def _connect_attempt(self, host, hostip, port, retry):
614614

615615
try:
616616
self._xid = 0
617-
read_timeout, connect_timeout = self._connect(host, hostip, port)
617+
read_timeout, connect_timeout = self._connect(
618+
host, hostip, port, timeout=retry.cur_delay
619+
)
618620
read_timeout = read_timeout / 1000.0
619621
connect_timeout = connect_timeout / 1000.0
620622
retry.reset()
@@ -685,7 +687,7 @@ def _connect_attempt(self, host, hostip, port, retry):
685687
if self._socket is not None:
686688
self._socket.close()
687689

688-
def _connect(self, host, hostip, port):
690+
def _connect(self, host, hostip, port, timeout):
689691
client = self.client
690692
self.logger.info(
691693
"Connecting to %s(%s):%s, use_ssl: %r",
@@ -705,7 +707,7 @@ def _connect(self, host, hostip, port):
705707
with self._socket_error_handling():
706708
self._socket = self.handler.create_connection(
707709
address=(hostip, port),
708-
timeout=client._session_timeout / 1000.0,
710+
timeout=timeout,
709711
use_ssl=self.client.use_ssl,
710712
keyfile=self.client.keyfile,
711713
certfile=self.client.certfile,

‎kazoo/retry.py

+4
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ def copy(self):
109109
obj.retry_exceptions = self.retry_exceptions
110110
return obj
111111

112+
@property
113+
def cur_delay(self):
114+
return self._cur_delay
115+
112116
def __call__(self, func, *args, **kwargs):
113117
"""Call a function with arguments until it completes without
114118
throwing a Kazoo exception

‎kazoo/tests/test_connection.py

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
from unittest import mock
2+
3+
import pytest
4+
5+
from kazoo import retry
6+
from kazoo.handlers import threading
7+
from kazoo.protocol import connection
8+
from kazoo.protocol import states
9+
10+
11+
@mock.patch("kazoo.protocol.connection.ConnectionHandler._expand_client_hosts")
12+
def test_retry_logic(mock_expand):
13+
mock_client = mock.Mock()
14+
mock_client._state = states.KeeperState.CLOSED
15+
mock_client._session_id = None
16+
mock_client._session_passwd = b"\x00" * 16
17+
mock_client._stopped.is_set.return_value = False
18+
mock_client.handler.timeout_exception = threading.KazooTimeoutError
19+
mock_client.handler.create_connection.side_effect = (
20+
threading.KazooTimeoutError()
21+
)
22+
test_retry = retry.KazooRetry(
23+
max_tries=6,
24+
delay=1.0,
25+
backoff=2,
26+
max_delay=30.0,
27+
max_jitter=0.0,
28+
sleep_func=lambda _x: None,
29+
)
30+
test_cnx = connection.ConnectionHandler(
31+
client=mock_client,
32+
retry_sleeper=test_retry,
33+
)
34+
mock_expand.return_value = [
35+
("a", "1.1.1.1", 2181),
36+
("b", "2.2.2.2", 2181),
37+
("c", "3.3.3.3", 2181),
38+
]
39+
40+
with pytest.raises(retry.RetryFailedError):
41+
test_retry(test_cnx._connect_loop, test_retry)
42+
43+
assert mock_client.handler.create_connection.call_args_list[:3] == [
44+
mock.call(
45+
address=("1.1.1.1", 2181),
46+
timeout=1.0,
47+
use_ssl=mock.ANY,
48+
keyfile=mock.ANY,
49+
certfile=mock.ANY,
50+
ca=mock.ANY,
51+
keyfile_password=mock.ANY,
52+
verify_certs=mock.ANY,
53+
),
54+
mock.call(
55+
address=("2.2.2.2", 2181),
56+
timeout=1.0,
57+
use_ssl=mock.ANY,
58+
keyfile=mock.ANY,
59+
certfile=mock.ANY,
60+
ca=mock.ANY,
61+
keyfile_password=mock.ANY,
62+
verify_certs=mock.ANY,
63+
),
64+
mock.call(
65+
address=("3.3.3.3", 2181),
66+
timeout=1.0,
67+
use_ssl=mock.ANY,
68+
keyfile=mock.ANY,
69+
certfile=mock.ANY,
70+
ca=mock.ANY,
71+
keyfile_password=mock.ANY,
72+
verify_certs=mock.ANY,
73+
),
74+
]
75+
assert mock_client.handler.create_connection.call_args_list[-3:] == [
76+
mock.call(
77+
address=("1.1.1.1", 2181),
78+
timeout=30.0,
79+
use_ssl=mock.ANY,
80+
keyfile=mock.ANY,
81+
certfile=mock.ANY,
82+
ca=mock.ANY,
83+
keyfile_password=mock.ANY,
84+
verify_certs=mock.ANY,
85+
),
86+
mock.call(
87+
address=("2.2.2.2", 2181),
88+
timeout=30.0,
89+
use_ssl=mock.ANY,
90+
keyfile=mock.ANY,
91+
certfile=mock.ANY,
92+
ca=mock.ANY,
93+
keyfile_password=mock.ANY,
94+
verify_certs=mock.ANY,
95+
),
96+
mock.call(
97+
address=("3.3.3.3", 2181),
98+
timeout=30.0,
99+
use_ssl=mock.ANY,
100+
keyfile=mock.ANY,
101+
certfile=mock.ANY,
102+
ca=mock.ANY,
103+
keyfile_password=mock.ANY,
104+
verify_certs=mock.ANY,
105+
),
106+
]

0 commit comments

Comments
 (0)
Please sign in to comment.