PYTHON-5215 Add an asyncio.Protocol implementation for KMS #2460
base: master
Conversation
I'm debugging two failures.

I realized the benchmark test wasn't actually triggering the protocol, so I'm tweaking things locally.

This has a couple of commits from #2467.

Okay, this is ready for a look. We might consider switching to the base

I still need to make a PR for the fix to the kms mock server's 404 response.

The CSOT failure is unrelated: PYTHON-5492
pymongo/network_layer.py
Outdated
# Reuse the active buffer if it has space.
if len(self._buffers):
    buffer = self._buffers[-1]
    if len(buffer.buffer) - buffer.end_index > sizehint:
If sizehint = -1, which signals that the buffer size can be arbitrary, this check will always succeed, potentially returning an empty buffer, which is an error. We need to check that sizehint is a positive number as well.
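A minimal sketch of the guarded check being suggested. The `_Buffer` dataclass, field names, and `BufferPool` wrapper here are assumptions modeled on the snippet above, not the actual pymongo implementation:

```python
from dataclasses import dataclass, field


@dataclass
class _Buffer:
    # Hypothetical buffer record: backing storage plus read/write cursors.
    buffer: bytearray
    start_index: int = 0
    end_index: int = 0


class BufferPool:
    def __init__(self) -> None:
        self._buffers: list[_Buffer] = []

    def get_buffer(self, sizehint: int) -> memoryview:
        # A sizehint of -1 means "any size is fine"; without the sizehint > 0
        # guard, the capacity check below would pass even when the active
        # buffer is full and hand back an empty view, which is an error.
        if sizehint > 0 and self._buffers:
            active = self._buffers[-1]
            if len(active.buffer) - active.end_index > sizehint:
                return memoryview(active.buffer)[active.end_index :]
        # Otherwise allocate a fresh buffer (16384 chosen to match the
        # default size discussed in this thread).
        fresh = _Buffer(bytearray(max(sizehint, 16384)))
        self._buffers.append(fresh)
        return memoryview(fresh.buffer)
```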
Updated
If we're always setting sizehint to be at least 16384, is this check worth doing in the first place? I'd expect us to rarely reuse the active buffer, since we'll usually have a buffer of size 16384 and a sizehint of 16384.
""" | ||
self.transport = transport # type: ignore[assignment] | ||
|
||
async def read(self, bytes_needed: int) -> bytes: |
To make sure I understand the intended flow here, is this example correct?

- We call kms_request and enter the while kms_context.bytes_needed > 0: loop.
- The first chunk of data, say 16 bytes' worth, is written into an existing buffer that still has space.
- PyMongoKMSProtocol.read() is called and immediately returns those 16 bytes. kms_context.bytes_needed updates to need 84 more bytes, for a total of 100.
- We call PyMongoKMSProtocol.read() again and wait on the _pending_listeners Future we create.
- The second chunk of data, the remaining 84 bytes, requires a new buffer since the active buffer is full.
- The Future is resolved with those bytes, which we return and feed into kms_context to complete the operation.
- We call kms_request and enter the while kms_context.bytes_needed > 0: loop.
- The first chunk of data, say 16 bytes' worth, is written into an existing buffer that still has space.
- PyMongoKMSProtocol.read() is called and immediately returns those 16 bytes, pushing the start_index up by 16.
- kms_context.bytes_needed updates to need 84 more bytes.
- We call PyMongoKMSProtocol.read() again and wait on the _pending_listeners Future we created.
- The second chunk of data, the remaining 84 bytes, may require a new buffer.
- If any bytes are available, we read up to the newly requested 84 bytes from the active buffer(s), advancing start_index and exhausting buffers as appropriate.
- Otherwise, we wait on the Future to be resolved, which will contain up to the requested bytes.
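The read path described above can be sketched as follows. Class and attribute names here (KMSReadSketch, _waiters) are illustrative assumptions, not pymongo's actual identifiers; the point is the two branches: drain buffered bytes immediately, or park a Future that data_received resolves later:

```python
import asyncio
from collections import deque


class KMSReadSketch:
    def __init__(self) -> None:
        self._buffer = bytearray()
        self._waiters: deque[asyncio.Future[bytes]] = deque()

    def data_received(self, data: bytes) -> None:
        if self._waiters:
            # A read() is pending: resolve the oldest waiter with this chunk.
            self._waiters.popleft().set_result(bytes(data))
        else:
            # No reader yet: stash the bytes for the next read() call.
            self._buffer.extend(data)

    async def read(self, bytes_needed: int) -> bytes:
        if self._buffer:
            # Drain up to bytes_needed from the buffer; a partial result is
            # fine, the caller's bytes_needed loop will ask again.
            chunk = bytes(self._buffer[:bytes_needed])
            del self._buffer[:bytes_needed]
            return chunk
        # Nothing buffered: wait for the next data_received.
        fut: asyncio.Future[bytes] = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        return await fut
```

This mirrors the example: a 16-byte first chunk is returned immediately, and the follow-up read suspends on a Future until the remaining bytes arrive.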
pymongo/network_layer.py
Outdated
async def async_sendall(conn: PyMongoProtocol, buf: bytes) -> None:
bytes_needed = self._pending_reads.popleft()
What happens if we need more bytes than we have? We've already popped the waiter and set its result to data, which can only read up to self._bytes_ready bytes. Are we relying on the kms_context.bytes_needed loop to call the protocol read() method again and create a new waiter?
Right, we give the partial result back to the kms context, and let it ask for more.
Maybe we could get better performance by doing more of the looping inside the Protocol, but KMS requests won't be a significant part of runtime anyway so not worth spending more time on it. Can you add a comment to this effect somewhere saying that we rely on the looping behavior for this to function correctly?
It's not really a question of perf, but the fact that the kms_request is blind until it knows the Content-Length, and we don't know what state it is in.
I added a comment.
test/asynchronous/test_collection.py
Outdated
@@ -335,6 +335,8 @@ async def test_create_index(self):
await db.test.create_index(["hello", ("world", DESCENDING)])
await db.test.create_index({"hello": 1}.items())  # type:ignore[arg-type]

# TODO: PYTHON-5491 - remove version max
This change should be in a separate PR.
Yeah it was, I just updated this branch.
pymongo/network_layer.py
Outdated
# Reuse the active buffer if it has space.
if len(self._buffers):
    buffer = self._buffers[-1]
    if len(buffer.buffer) - buffer.end_index > sizehint:
If we're always setting sizehint to be at least 16384, is this check worth doing in the first place? I'd expect us to rarely reuse the active buffer, since we'll usually have a buffer of size 16384 and a sizehint of 16384.
The actual sizehint in practice was on the order of the bytes being read from the buffer (typically less than 1000). Using the buffered protocol at all here is a bit of a mismatch, imho.

How long would refactoring to not use buffered take? No reason to use the lower-level API if we don't need to.

It's actually dead simple; I did it along the way when I was debugging a race condition.

I'll push a commit in the morning for comparison; we can always revert.

I'm happy with the simplification. The tests are passing locally; this is ready for another look.
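For reference, the difference between the two APIs being discussed can be illustrated with a minimal sketch (these classes are illustrative, not the PR's actual code): asyncio.BufferedProtocol makes the application own the receive buffer via get_buffer/buffer_updated, while plain asyncio.Protocol lets the event loop allocate and copy, delivering a ready-made bytes chunk:

```python
import asyncio


class _Sink:
    """Shared handler so the two styles below are directly comparable."""

    def __init__(self) -> None:
        self.received: list[bytes] = []

    def handle(self, data: bytes) -> None:
        self.received.append(data)


class BufferedStyle(_Sink, asyncio.BufferedProtocol):
    # Lower-level API: we own the receive buffer and its bookkeeping.
    def get_buffer(self, sizehint: int) -> memoryview:
        # Must hand the loop writable memory; sizehint may be -1.
        self._buf = bytearray(max(sizehint, 16384))
        return memoryview(self._buf)

    def buffer_updated(self, nbytes: int) -> None:
        self.handle(bytes(self._buf[:nbytes]))


class PlainStyle(_Sink, asyncio.Protocol):
    # Simpler API: the event loop allocates and copies for us.
    def data_received(self, data: bytes) -> None:
        self.handle(data)
```

The extra copy in PlainStyle is the usual argument for BufferedProtocol, which matters little here given that KMS requests are small and rare.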
Can you schedule a full Evergreen run? We should ensure there are no regressions introduced here by accident.
Are the benchmark results for KMS significantly different between the two Protocol implementations?
Full patch build: https://spruce.mongodb.com/version/689f5483e112170007b0ce9f/tasks?sorts=STATUS%3AASC%3BBASE_STATUS%3ADESC I updated the timings in the PR description; there's no significant change.
Okay, there is one legit bug in
See benchmark gist.
Benchmark Results:
Before: 4.93s, 5.26s
After: 4.93s, 5.05s
Depends on mongodb-labs/drivers-evergreen-tools#679