Skip to content

Commit 7e84758

Browse files
SentinelManagedConnection searches for new master upon connection failure (#3560)
1 parent 513c8d0 commit 7e84758

File tree

6 files changed

+74
-22
lines changed

6 files changed

+74
-22
lines changed

redis/asyncio/connection.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,13 +295,18 @@ async def connect(self):
295295
"""Connects to the Redis server if not already connected"""
296296
await self.connect_check_health(check_health=True)
297297

298-
async def connect_check_health(self, check_health: bool = True):
298+
async def connect_check_health(
299+
self, check_health: bool = True, retry_socket_connect: bool = True
300+
):
299301
if self.is_connected:
300302
return
301303
try:
302-
await self.retry.call_with_retry(
303-
lambda: self._connect(), lambda error: self.disconnect()
304-
)
304+
if retry_socket_connect:
305+
await self.retry.call_with_retry(
306+
lambda: self._connect(), lambda error: self.disconnect()
307+
)
308+
else:
309+
await self._connect()
305310
except asyncio.CancelledError:
306311
raise # in 3.7 and earlier, this is an Exception, not BaseException
307312
except (socket.timeout, asyncio.TimeoutError):

redis/asyncio/sentinel.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@
1111
SSLConnection,
1212
)
1313
from redis.commands import AsyncSentinelCommands
14-
from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
15-
from redis.utils import str_if_bytes
14+
from redis.exceptions import (
15+
ConnectionError,
16+
ReadOnlyError,
17+
ResponseError,
18+
TimeoutError,
19+
)
1620

1721

1822
class MasterNotFoundError(ConnectionError):
@@ -37,11 +41,10 @@ def __repr__(self):
3741

3842
async def connect_to(self, address):
3943
self.host, self.port = address
40-
await super().connect()
41-
if self.connection_pool.check_connection:
42-
await self.send_command("PING")
43-
if str_if_bytes(await self.read_response()) != "PONG":
44-
raise ConnectionError("PING failed")
44+
await self.connect_check_health(
45+
check_health=self.connection_pool.check_connection,
46+
retry_socket_connect=False,
47+
)
4548

4649
async def _connect_retry(self):
4750
if self._reader:

redis/connection.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -378,13 +378,18 @@ def connect(self):
378378
"Connects to the Redis server if not already connected"
379379
self.connect_check_health(check_health=True)
380380

381-
def connect_check_health(self, check_health: bool = True):
381+
def connect_check_health(
382+
self, check_health: bool = True, retry_socket_connect: bool = True
383+
):
382384
if self._sock:
383385
return
384386
try:
385-
sock = self.retry.call_with_retry(
386-
lambda: self._connect(), lambda error: self.disconnect(error)
387-
)
387+
if retry_socket_connect:
388+
sock = self.retry.call_with_retry(
389+
lambda: self._connect(), lambda error: self.disconnect(error)
390+
)
391+
else:
392+
sock = self._connect()
388393
except socket.timeout:
389394
raise TimeoutError("Timeout connecting to server")
390395
except OSError as e:

redis/sentinel.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@
55
from redis.client import Redis
66
from redis.commands import SentinelCommands
77
from redis.connection import Connection, ConnectionPool, SSLConnection
8-
from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
9-
from redis.utils import str_if_bytes
8+
from redis.exceptions import (
9+
ConnectionError,
10+
ReadOnlyError,
11+
ResponseError,
12+
TimeoutError,
13+
)
1014

1115

1216
class MasterNotFoundError(ConnectionError):
@@ -35,11 +39,11 @@ def __repr__(self):
3539

3640
def connect_to(self, address):
3741
self.host, self.port = address
38-
super().connect()
39-
if self.connection_pool.check_connection:
40-
self.send_command("PING")
41-
if str_if_bytes(self.read_response()) != "PONG":
42-
raise ConnectionError("PING failed")
42+
43+
self.connect_check_health(
44+
check_health=self.connection_pool.check_connection,
45+
retry_socket_connect=False,
46+
)
4347

4448
def _connect_retry(self):
4549
if self._sock:

tests/test_asyncio/test_sentinel_managed_connection.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ async def mock_connect():
3333
conn._connect.side_effect = mock_connect
3434
await conn.connect()
3535
assert conn._connect.call_count == 3
36+
assert connection_pool.get_master_address.call_count == 3
3637
await conn.disconnect()
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import socket
2+
3+
from redis.retry import Retry
4+
from redis.sentinel import SentinelManagedConnection
5+
from redis.backoff import NoBackoff
6+
from unittest import mock
7+
8+
9+
def test_connect_retry_on_timeout_error(master_host):
10+
"""Test that the _connect function is retried in case of a timeout"""
11+
connection_pool = mock.Mock()
12+
connection_pool.get_master_address = mock.Mock(
13+
return_value=(master_host[0], master_host[1])
14+
)
15+
conn = SentinelManagedConnection(
16+
retry_on_timeout=True,
17+
retry=Retry(NoBackoff(), 3),
18+
connection_pool=connection_pool,
19+
)
20+
origin_connect = conn._connect
21+
conn._connect = mock.Mock()
22+
23+
def mock_connect():
24+
# connect only on the last retry
25+
if conn._connect.call_count <= 2:
26+
raise socket.timeout
27+
else:
28+
return origin_connect()
29+
30+
conn._connect.side_effect = mock_connect
31+
conn.connect()
32+
assert conn._connect.call_count == 3
33+
assert connection_pool.get_master_address.call_count == 3
34+
conn.disconnect()

0 commit comments

Comments
 (0)