Skip to content

Commit cdbe957

Browse files
committed
polymorphism for reset available connections instead
1 parent c75262b commit cdbe957

File tree

6 files changed

+91
-81
lines changed

6 files changed

+91
-81
lines changed

redis/asyncio/connection.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838

3939
from redis.asyncio.retry import Retry
4040
from redis.backoff import NoBackoff
41-
from redis.connection import DEFAULT_RESP_VERSION, ConnectionsIndexer
41+
from redis.connection import DEFAULT_RESP_VERSION
4242
from redis.credentials import CredentialProvider, UsernamePasswordCredentialProvider
4343
from redis.exceptions import (
4444
AuthenticationError,
@@ -1118,7 +1118,6 @@ def __init__(
11181118
self,
11191119
connection_class: Type[AbstractConnection] = Connection,
11201120
max_connections: Optional[int] = None,
1121-
index_available_connections: bool = False,
11221121
**connection_kwargs,
11231122
):
11241123
max_connections = max_connections or 2**31
@@ -1129,11 +1128,14 @@ def __init__(
11291128
self.connection_kwargs = connection_kwargs
11301129
self.max_connections = max_connections
11311130

1131+
<<<<<<< HEAD
11321132
self._available_connections: ConnectionsIndexer = (
11331133
ConnectionsIndexer() if index_available_connections else []
11341134
)
1135+
=======
1136+
self._available_connections = self.reset_available_connections()
1137+
>>>>>>> bafbc03 (polymorphism for reset available connections instead)
11351138
self._in_use_connections: Set[AbstractConnection] = set()
1136-
self._index_available_connections = index_available_connections
11371139
self.encoder_class = self.connection_kwargs.get("encoder_class", Encoder)
11381140

11391141
def __repr__(self):
@@ -1143,11 +1145,12 @@ def __repr__(self):
11431145
)
11441146

11451147
def reset(self):
1146-
self._available_connections: ConnectionsIndexer | list = (
1147-
ConnectionsIndexer() if self._index_available_connections else []
1148-
)
1148+
self._available_connections = self.reset_available_connections()
11491149
self._in_use_connections = weakref.WeakSet()
11501150

1151+
def reset_available_connections(self):
1152+
return []
1153+
11511154
def can_get_connection(self) -> bool:
11521155
"""Return True if a connection can be retrieved from the pool."""
11531156
return (

redis/asyncio/sentinel.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
)
2222
from redis.commands import AsyncSentinelCommands
2323
from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
24+
from redis.sentinel import ConnectionsIndexer
2425
from redis.utils import str_if_bytes
2526

2627

@@ -163,6 +164,9 @@ def reset(self):
163164
self.master_address = None
164165
self.slave_rr_counter = None
165166

167+
def reset_available_connections(self):
168+
return ConnectionsIndexer()
169+
166170
def owns_connection(self, connection: Connection):
167171
check = not self.is_master or (
168172
self.is_master and self.master_address == (connection.host, connection.port)

redis/connection.py

Lines changed: 5 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@
66
import threading
77
import weakref
88
from abc import abstractmethod
9-
from collections import defaultdict
109
from itertools import chain
1110
from queue import Empty, Full, LifoQueue
1211
from time import time
13-
from typing import Any, Callable, Iterable, List, Optional, Sequence, Type, Union
12+
from typing import Any, Callable, List, Optional, Sequence, Type, Union
1413
from urllib.parse import parse_qs, unquote, urlparse
1514

1615
from ._cache import (
@@ -735,57 +734,6 @@ def _host_error(self):
735734
return f"{self.host}:{self.port}"
736735

737736

738-
class ConnectionsIndexer(Iterable):
739-
"""
740-
Data structure that simulates a list of available connections.
741-
Instead of list, we keep 2 additional DS to support O(1) operations
742-
on all of the class' methods.
743-
The first DS is indexed on the connection object's ID.
744-
The second DS is indexed on the address (ip and port) of the connection.
745-
"""
746-
747-
def __init__(self):
748-
# Map the id to the connection object
749-
self._id_to_connection = {}
750-
# Map the address to a dictionary of connections
751-
# The inner dictionary is a map between the object id to the object itself
752-
# Both of these DS support O(1) operations on all of the class' methods
753-
self._address_to_connections = defaultdict(dict)
754-
755-
def pop(self):
756-
try:
757-
_, connection = self._id_to_connection.popitem()
758-
del self._address_to_connections[(connection.host, connection.port)][
759-
id(connection)
760-
]
761-
except KeyError:
762-
# We are simulating a list, hence we raise IndexError
763-
# when there's no item in the dictionary
764-
raise IndexError()
765-
return connection
766-
767-
def append(self, connection: Connection):
768-
self._id_to_connection[id(connection)] = connection
769-
self._address_to_connections[(connection.host, connection.port)][
770-
id(connection)
771-
] = connection
772-
773-
def get_connection(self, host: str, port: int):
774-
try:
775-
_, connection = self._address_to_connections[(host, port)].popitem()
776-
del self._id_to_connection[id(connection)]
777-
except KeyError:
778-
return None
779-
return connection
780-
781-
def __iter__(self):
782-
# This is an O(1) operation in python3.7 and later
783-
return iter(self._id_to_connection.values())
784-
785-
def __len__(self):
786-
return len(self._id_to_connection)
787-
788-
789737
class SSLConnection(Connection):
790738
"""Manages SSL connections to and from the Redis server(s).
791739
This class extends the Connection class, adding SSL functionality, and making
@@ -1135,7 +1083,6 @@ def __init__(
11351083
self,
11361084
connection_class=Connection,
11371085
max_connections: Optional[int] = None,
1138-
index_available_connections: bool = False,
11391086
**connection_kwargs,
11401087
):
11411088
max_connections = max_connections or 2**31
@@ -1155,7 +1102,6 @@ def __init__(
11551102
# will notice the first thread already did the work and simply
11561103
# release the lock.
11571104
self._fork_lock = threading.Lock()
1158-
self._index_available_connections = index_available_connections
11591105
self.reset()
11601106

11611107
def __repr__(self) -> (str, str):
@@ -1177,9 +1123,7 @@ def cleanup(self, **options):
11771123
def reset(self) -> None:
11781124
self._lock = threading.Lock()
11791125
self._created_connections = 0
1180-
self._available_connections: ConnectionsIndexer | list = (
1181-
ConnectionsIndexer() if self._index_available_connections else []
1182-
)
1126+
self._available_connections = self.reset_available_connections()
11831127
self._in_use_connections = set()
11841128

11851129
# this must be the last operation in this method. while reset() is
@@ -1193,6 +1137,9 @@ def reset(self) -> None:
11931137
# reset() and they will immediately release _fork_lock and continue on.
11941138
self.pid = os.getpid()
11951139

1140+
def reset_available_connections(self):
1141+
return []
1142+
11961143
def _checkpid(self) -> None:
11971144
# _checkpid() attempts to keep ConnectionPool fork-safe on modern
11981145
# systems. this is called by all ConnectionPool methods that

redis/sentinel.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import random
22
import weakref
3-
from typing import Any, Optional
3+
from typing import Any, Iterable, Optional
4+
from collections import defaultdict
45

56
from redis.client import Redis
67
from redis.commands import SentinelCommands
@@ -155,6 +156,57 @@ def rotate_slaves(self):
155156
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
156157

157158

159+
class ConnectionsIndexer(Iterable):
160+
"""
161+
Data structure that stores available connections in a pool.
162+
Instead of list, we keep 2 additional DS to support O(1) operations
163+
on all of the class' methods.
164+
The first DS is indexed on the connection object's ID.
165+
The second DS is indexed on the address (ip and port) of the connection.
166+
"""
167+
168+
def __init__(self):
169+
# Map the id to the connection object
170+
self._id_to_connection = {}
171+
# Map the address to a dictionary of connections
172+
# The inner dictionary is a map between the object id to the object itself
173+
# Both of these DS support O(1) operations on all of the class' methods
174+
self._address_to_connections = defaultdict(dict)
175+
176+
def pop(self):
177+
try:
178+
_, connection = self._id_to_connection.popitem()
179+
del self._address_to_connections[(connection.host, connection.port)][
180+
id(connection)
181+
]
182+
except KeyError:
183+
# We are simulating a list, hence we raise IndexError
184+
# when there's no item in the dictionary
185+
raise IndexError()
186+
return connection
187+
188+
def append(self, connection: Connection):
189+
self._id_to_connection[id(connection)] = connection
190+
self._address_to_connections[(connection.host, connection.port)][
191+
id(connection)
192+
] = connection
193+
194+
def get_connection(self, host: str, port: int):
195+
try:
196+
_, connection = self._address_to_connections[(host, port)].popitem()
197+
del self._id_to_connection[id(connection)]
198+
except KeyError:
199+
return None
200+
return connection
201+
202+
def __iter__(self):
203+
# This is an O(1) operation in python3.7 and later
204+
return iter(self._id_to_connection.values())
205+
206+
def __len__(self):
207+
return len(self._id_to_connection)
208+
209+
158210
class SentinelConnectionPool(ConnectionPool):
159211
"""
160212
Sentinel backed connection pool.
@@ -181,7 +233,7 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
181233
service_name=service_name,
182234
sentinel_manager=sentinel_manager,
183235
)
184-
super().__init__(index_available_connections=True, **kwargs)
236+
super().__init__(**kwargs)
185237
self.connection_kwargs["connection_pool"] = self.proxy
186238
self.service_name = service_name
187239
self.sentinel_manager = sentinel_manager
@@ -198,6 +250,9 @@ def reset(self):
198250
super().reset()
199251
self.proxy.reset()
200252

253+
def reset_available_connections(self):
254+
return ConnectionsIndexer()
255+
201256
@property
202257
def master_address(self):
203258
return self.proxy.master_address

tests/test_connection.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from redis.backoff import NoBackoff
1111
from redis.connection import (
1212
Connection,
13-
ConnectionsIndexer,
1413
SSLConnection,
1514
UnixDomainSocketConnection,
1615
parse_url,
@@ -348,17 +347,3 @@ def test_unix_socket_connection_failure():
348347
== "Error 2 connecting to unix:///tmp/a.sock. No such file or directory."
349348
)
350349

351-
352-
def test_connections_indexer_operations():
353-
ci = ConnectionsIndexer()
354-
c1 = Connection(host="1", port=2)
355-
ci.append(c1)
356-
assert list(ci) == [c1]
357-
assert ci.pop() == c1
358-
359-
c2 = Connection(host="3", port=4)
360-
ci.append(c1)
361-
ci.append(c2)
362-
assert ci.get_connection("3", 4) == c2
363-
assert not ci.get_connection("5", 6)
364-
assert list(ci) == [c1]

tests/test_sentinel.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import redis.sentinel
66
from redis import exceptions
77
from redis.sentinel import (
8+
ConnectionsIndexer,
89
MasterNotFoundError,
910
Sentinel,
1011
SentinelConnectionPool,
@@ -266,3 +267,18 @@ def mock_disconnect():
266267

267268
assert calls == 1
268269
pool.disconnect()
270+
271+
272+
def test_connections_indexer_operations():
273+
ci = ConnectionsIndexer()
274+
c1 = Connection(host="1", port=2)
275+
ci.append(c1)
276+
assert list(ci) == [c1]
277+
assert ci.pop() == c1
278+
279+
c2 = Connection(host="3", port=4)
280+
ci.append(c1)
281+
ci.append(c2)
282+
assert ci.get_connection("3", 4) == c2
283+
assert not ci.get_connection("5", 6)
284+
assert list(ci) == [c1]

0 commit comments

Comments
 (0)