Skip to content

PYTHON-5215 Add an asyncio.Protocol implementation for KMS #2460

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a490cfd
PYTHON-5215 Add an asyncio.Protocol implementation for KMS
blink1073 Aug 6, 2025
404c1fc
cleanup
blink1073 Aug 6, 2025
e4a588b
restore comment
blink1073 Aug 6, 2025
2af62a3
fix close
blink1073 Aug 6, 2025
8494954
Merge branch 'master' of github.com:mongodb/mongo-python-driver into …
blink1073 Aug 6, 2025
0a43477
wip
blink1073 Aug 6, 2025
13537cb
wip
blink1073 Aug 7, 2025
18c51cb
wip
blink1073 Aug 7, 2025
24aa733
wip
blink1073 Aug 7, 2025
b33f78e
wip
blink1073 Aug 7, 2025
4b1bdd6
fixup
blink1073 Aug 7, 2025
fa0dd8d
always allow partial reads
blink1073 Aug 7, 2025
432380e
fixup
blink1073 Aug 7, 2025
484aa9f
cleanup
blink1073 Aug 8, 2025
e50685c
fix sync kms
blink1073 Aug 8, 2025
2622a7a
undo change to justfile
blink1073 Aug 8, 2025
73b4309
remove unused code
blink1073 Aug 8, 2025
0cbdd58
undo lock file changes
blink1073 Aug 8, 2025
6fe6ba3
use det branch
blink1073 Aug 8, 2025
6ed92bb
fix branch name
blink1073 Aug 8, 2025
971139c
fix buffer handling and close handling
blink1073 Aug 8, 2025
4f174f9
Merge branch 'master' of github.com:mongodb/mongo-python-driver into …
blink1073 Aug 8, 2025
db4332d
fix close conn behavior
blink1073 Aug 8, 2025
c2f6ae8
skip another test
blink1073 Aug 8, 2025
d9e65b6
skip tests
blink1073 Aug 8, 2025
4546f23
address review
blink1073 Aug 11, 2025
39b4526
use upstream d-e-t
blink1073 Aug 11, 2025
da04fc8
fix waiting logic
blink1073 Aug 11, 2025
28afc38
address review
blink1073 Aug 11, 2025
33add41
Merge branch 'master' into PYTHON-5215
blink1073 Aug 14, 2025
8b357cd
use the base Protocol
blink1073 Aug 15, 2025
0708278
fixups
blink1073 Aug 15, 2025
14a6199
Merge branch 'PYTHON-5215' of github.com:blink1073/mongo-python-drive…
blink1073 Aug 15, 2025
513e66a
Merge branch 'master' of github.com:mongodb/mongo-python-driver into …
blink1073 Aug 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 10 additions & 17 deletions pymongo/asynchronous/encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
from pymongo.asynchronous.cursor import AsyncCursor
from pymongo.asynchronous.database import AsyncDatabase
from pymongo.asynchronous.mongo_client import AsyncMongoClient
from pymongo.asynchronous.pool import AsyncBaseConnection
from pymongo.common import CONNECT_TIMEOUT
from pymongo.daemon import _spawn_daemon
from pymongo.encryption_options import AutoEncryptionOpts, RangeOpts
Expand All @@ -75,11 +76,11 @@
NetworkTimeout,
ServerSelectionTimeoutError,
)
from pymongo.network_layer import async_socket_sendall
from pymongo.network_layer import PyMongoKMSProtocol, async_receive_kms, async_sendall
from pymongo.operations import UpdateOne
from pymongo.pool_options import PoolOptions
from pymongo.pool_shared import (
_async_configured_socket,
_configured_protocol_interface,
_get_timeout_details,
_raise_connection_failure,
)
Expand All @@ -93,10 +94,8 @@
if TYPE_CHECKING:
from pymongocrypt.mongocrypt import MongoCryptKmsContext

from pymongo.pyopenssl_context import _sslConn
from pymongo.typings import _Address


_IS_SYNC = False

_HTTPS_PORT = 443
Expand All @@ -111,9 +110,10 @@
_KEY_VAULT_OPTS = CodecOptions(document_class=RawBSONDocument)


async def _connect_kms(address: _Address, opts: PoolOptions) -> Union[socket.socket, _sslConn]:
async def _connect_kms(address: _Address, opts: PoolOptions) -> AsyncBaseConnection:
try:
return await _async_configured_socket(address, opts)
interface = await _configured_protocol_interface(address, opts, PyMongoKMSProtocol)
return AsyncBaseConnection(interface, opts)
except Exception as exc:
_raise_connection_failure(address, exc, timeout_details=_get_timeout_details(opts))

Expand Down Expand Up @@ -198,18 +198,11 @@ async def kms_request(self, kms_context: MongoCryptKmsContext) -> None:
try:
conn = await _connect_kms(address, opts)
try:
await async_socket_sendall(conn, message)
await async_sendall(conn.conn.get_conn, message)
while kms_context.bytes_needed > 0:
# CSOT: update timeout.
conn.settimeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0))
if _IS_SYNC:
data = conn.recv(kms_context.bytes_needed)
else:
from pymongo.network_layer import ( # type: ignore[attr-defined]
async_receive_data_socket,
)

data = await async_receive_data_socket(conn, kms_context.bytes_needed)
conn.set_conn_timeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0))
data = await async_receive_kms(conn, kms_context.bytes_needed)
if not data:
raise OSError("KMS connection closed")
kms_context.feed(data)
Expand All @@ -228,7 +221,7 @@ async def kms_request(self, kms_context: MongoCryptKmsContext) -> None:
address, exc, msg_prefix=msg_prefix, timeout_details=_get_timeout_details(opts)
)
finally:
conn.close()
await conn.close_conn(None)
except MongoCryptError:
raise # Propagate MongoCryptError errors directly.
except Exception as exc:
Expand Down
161 changes: 90 additions & 71 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,89 @@ def _set_non_inheritable_non_atomic(fd: int) -> None: # noqa: ARG001
_IS_SYNC = False


class AsyncConnection:
class AsyncBaseConnection:
"""A base connection object for server and kms connections."""

def __init__(self, conn: AsyncNetworkingInterface, opts: PoolOptions):
self.conn = conn
self.socket_checker: SocketChecker = SocketChecker()
self.cancel_context: _CancellationContext = _CancellationContext()
self.is_sdam = False
self.closed = False
self.last_timeout: float | None = None
self.more_to_come = False
self.opts = opts
self.max_wire_version = -1

def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
if timeout == self.last_timeout:
return
self.last_timeout = timeout
self.conn.get_conn.settimeout(timeout)

def apply_timeout(
self, client: AsyncMongoClient[Any], cmd: Optional[MutableMapping[str, Any]]
) -> Optional[float]:
# CSOT: use remaining timeout when set.
timeout = _csot.remaining()
if timeout is None:
# Reset the socket timeout unless we're performing a streaming monitor check.
if not self.more_to_come:
self.set_conn_timeout(self.opts.socket_timeout)
return None
# RTT validation.
rtt = _csot.get_rtt()
if rtt is None:
rtt = self.connect_rtt
max_time_ms = timeout - rtt
if max_time_ms < 0:
timeout_details = _get_timeout_details(self.opts)
formatted = format_timeout_details(timeout_details)
# CSOT: raise an error without running the command since we know it will time out.
errmsg = f"operation would exceed time limit, remaining timeout:{timeout:.5f} <= network round trip time:{rtt:.5f} {formatted}"
if self.max_wire_version != -1:
raise ExecutionTimeout(
errmsg,
50,
{"ok": 0, "errmsg": errmsg, "code": 50},
self.max_wire_version,
)
else:
raise TimeoutError(errmsg)
if cmd is not None:
cmd["maxTimeMS"] = int(max_time_ms * 1000)
self.set_conn_timeout(timeout)
return timeout

async def close_conn(self, reason: Optional[str]) -> None:
"""Close this connection with a reason."""
if self.closed:
return
await self._close_conn()

async def _close_conn(self) -> None:
"""Close this connection."""
if self.closed:
return
self.closed = True
self.cancel_context.cancel()
# Note: We catch exceptions to avoid spurious errors on interpreter
# shutdown.
try:
await self.conn.close()
except Exception: # noqa: S110
pass

def conn_closed(self) -> bool:
"""Return True if we know socket has been closed, False otherwise."""
if _IS_SYNC:
return self.socket_checker.socket_closed(self.conn.get_conn)
else:
return self.conn.is_closing()


class AsyncConnection(AsyncBaseConnection):
"""Store a connection with some metadata.

:param conn: a raw connection object
Expand All @@ -142,29 +224,27 @@ def __init__(
id: int,
is_sdam: bool,
):
super().__init__(conn, pool.opts)
self.pool_ref = weakref.ref(pool)
self.conn = conn
self.address = address
self.id = id
self.address: tuple[str, int] = address
self.id: int = id
self.is_sdam = is_sdam
self.closed = False
self.last_checkin_time = time.monotonic()
self.performed_handshake = False
self.is_writable: bool = False
self.max_wire_version = MAX_WIRE_VERSION
self.max_bson_size = MAX_BSON_SIZE
self.max_message_size = MAX_MESSAGE_SIZE
self.max_write_batch_size = MAX_WRITE_BATCH_SIZE
self.max_bson_size: int = MAX_BSON_SIZE
self.max_message_size: int = MAX_MESSAGE_SIZE
self.max_write_batch_size: int = MAX_WRITE_BATCH_SIZE
self.supports_sessions = False
self.hello_ok: bool = False
self.is_mongos = False
self.is_mongos: bool = False
self.op_msg_enabled = False
self.listeners = pool.opts._event_listeners
self.enabled_for_cmap = pool.enabled_for_cmap
self.enabled_for_logging = pool.enabled_for_logging
self.compression_settings = pool.opts._compression_settings
self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None
self.socket_checker: SocketChecker = SocketChecker()
self.oidc_token_gen_id: Optional[int] = None
# Support for mechanism negotiation on the initial handshake.
self.negotiated_mechs: Optional[list[str]] = None
Expand All @@ -175,9 +255,6 @@ def __init__(
self.pool_gen = pool.gen
self.generation = self.pool_gen.get_overall()
self.ready = False
self.cancel_context: _CancellationContext = _CancellationContext()
self.opts = pool.opts
self.more_to_come: bool = False
# For load balancer support.
self.service_id: Optional[ObjectId] = None
self.server_connection_id: Optional[int] = None
Expand All @@ -193,44 +270,6 @@ def __init__(
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None

def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
if timeout == self.last_timeout:
return
self.last_timeout = timeout
self.conn.get_conn.settimeout(timeout)

def apply_timeout(
self, client: AsyncMongoClient[Any], cmd: Optional[MutableMapping[str, Any]]
) -> Optional[float]:
# CSOT: use remaining timeout when set.
timeout = _csot.remaining()
if timeout is None:
# Reset the socket timeout unless we're performing a streaming monitor check.
if not self.more_to_come:
self.set_conn_timeout(self.opts.socket_timeout)
return None
# RTT validation.
rtt = _csot.get_rtt()
if rtt is None:
rtt = self.connect_rtt
max_time_ms = timeout - rtt
if max_time_ms < 0:
timeout_details = _get_timeout_details(self.opts)
formatted = format_timeout_details(timeout_details)
# CSOT: raise an error without running the command since we know it will time out.
errmsg = f"operation would exceed time limit, remaining timeout:{timeout:.5f} <= network round trip time:{rtt:.5f} {formatted}"
raise ExecutionTimeout(
errmsg,
50,
{"ok": 0, "errmsg": errmsg, "code": 50},
self.max_wire_version,
)
if cmd is not None:
cmd["maxTimeMS"] = int(max_time_ms * 1000)
self.set_conn_timeout(timeout)
return timeout

def pin_txn(self) -> None:
self.pinned_txn = True
assert not self.pinned_cursor
Expand Down Expand Up @@ -574,26 +613,6 @@ async def close_conn(self, reason: Optional[str]) -> None:
error=reason,
)

async def _close_conn(self) -> None:
"""Close this connection."""
if self.closed:
return
self.closed = True
self.cancel_context.cancel()
# Note: We catch exceptions to avoid spurious errors on interpreter
# shutdown.
try:
await self.conn.close()
except Exception: # noqa: S110
pass

def conn_closed(self) -> bool:
"""Return True if we know socket has been closed, False otherwise."""
if _IS_SYNC:
return self.socket_checker.socket_closed(self.conn.get_conn)
else:
return self.conn.is_closing()

def send_cluster_time(
self,
command: MutableMapping[str, Any],
Expand Down
Loading
Loading