-
Notifications
You must be signed in to change notification settings - Fork 2.6k
fix: make sure scan iterator commands are always issued to the same replica #3769
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
fix: make sure scan iterator commands are always issued to the same replica #3769
Conversation
6499430
to
5ba56ef
Compare
Hi @rdelcampog, thank you for your contribution! We will review your change soon. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements a fix to ensure scan iterator commands (SCAN, SSCAN, HSCAN, ZSCAN) always use the same replica connection throughout their iteration when using Sentinel-managed connections, addressing issue #3197.
Key changes made:
- Added
iter_req_id
tracking to maintain connection consistency for scan operations - Modified scan iterator methods to generate unique IDs and clean up after completion
- Enhanced connection pools to support replica affinity during scan iterations
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
Show a summary per file
File | Description |
---|---|
redis/sentinel.py |
Added _iter_req_connections tracking and enhanced get_connection() with replica affinity logic |
redis/asyncio/sentinel.py |
Added async version of replica affinity tracking for AsyncSentinelConnectionPool |
redis/commands/core.py |
Modified all scan iterator methods to generate unique iter_req_id and ensure cleanup |
redis/client.py |
Added cleanup logic when SCAN commands complete (cursor returns to 0) |
redis/asyncio/client.py |
Added async version of cleanup logic for completed SCAN commands |
tests/test_sentinel_managed_connection.py |
Comprehensive test coverage for replica consistency behavior |
tests/test_asyncio/test_sentinel_managed_connection.py |
Async version of replica consistency tests |
Comments suppressed due to low confidence (1)
redis/asyncio/sentinel.py:1
- The
pytestmark = pytest.mark.asyncio
line was removed but this is needed for async tests to run properly. This line should be restored to ensure all async test functions in this file are automatically marked with the asyncio pytest marker.
import asyncio
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
try: | ||
replica_address = next(self.rotate_slaves()) | ||
connection.connect_to(replica_address) | ||
# Track this replica for future requests with this iter_req_id | ||
self._iter_req_connections[iter_req_id] = replica_address | ||
except (SlaveNotFoundError, StopIteration): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using next()
on rotate_slaves()
assumes it returns an iterator, but rotate_slaves()
returns a generator from proxy.rotate_slaves()
. This should handle the StopIteration exception properly or use a different approach to get a single replica address.
try: | |
replica_address = next(self.rotate_slaves()) | |
connection.connect_to(replica_address) | |
# Track this replica for future requests with this iter_req_id | |
self._iter_req_connections[iter_req_id] = replica_address | |
except (SlaveNotFoundError, StopIteration): | |
replica_address = next(self.rotate_slaves(), None) | |
if replica_address is not None: | |
connection.connect_to(replica_address) | |
# Track this replica for future requests with this iter_req_id | |
self._iter_req_connections[iter_req_id] = replica_address | |
else: |
Copilot uses AI. Check for mistakes.
if hasattr(connection, "connect_to"): | ||
# Let the connection establish to its target replica | ||
try: | ||
replica_address = await self.rotate_slaves().__anext__() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using __anext__()
directly on an async generator is not the recommended approach. Consider using anext()
builtin function instead: replica_address = await anext(self.rotate_slaves())
.
Copilot uses AI. Check for mistakes.
import random | ||
import time | ||
|
||
self.host = f"host-{random.randint(0, 10)}" | ||
self.port = time.time() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The imports random
and time
are placed inside the method rather than at the top of the file. These should be moved to the top-level imports for better code organization and consistency.
Copilot uses AI. Check for mistakes.
import random | ||
import time | ||
|
||
self.host = f"host-{random.randint(0, 10)}" | ||
self.port = time.time() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The imports random
and time
are placed inside the method rather than at the top of the file. These should be moved to the top-level imports for better code organization and consistency.
Copilot uses AI. Check for mistakes.
if hasattr(pool, "cleanup"): | ||
pool.cleanup(iter_req_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The variable pool
is not defined in this scope. It should be referencing conn.connection_pool
or self.connection_pool
to access the cleanup method.
if hasattr(pool, "cleanup"): | |
pool.cleanup(iter_req_id) | |
if hasattr(conn.connection_pool, "cleanup"): | |
conn.connection_pool.cleanup(iter_req_id) |
Copilot uses AI. Check for mistakes.
and len(result) >= 2 | ||
and result[0] == 0 | ||
): | ||
if hasattr(pool, "cleanup"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The variable pool
is not defined in this scope. It should be referencing conn.connection_pool
or self.connection_pool
to access the cleanup method.
Copilot uses AI. Check for mistakes.
Pull Request check-list
Please make sure to review and check all of these items:
NOTE: these things are not required to open a PR and can be done
afterwards / while the PR is open.
Description of change
Fixes #3197. Inspired on @agnesnatasya pull request here.
cc/ @dmaier-redislabs @gerzse as both participated in the other PR.