From d748c4dc0c2e9d7613abed8f7415eea373d49c6e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 19 Dec 2025 23:07:09 +0000 Subject: [PATCH 1/3] Initial plan From 72b33652f95dbf62002c5b55686c61e0fb844974 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 19 Dec 2025 23:14:05 +0000 Subject: [PATCH 2/3] Merge add_http_multipart_transport branch Co-authored-by: magicmark <1590756+magicmark@users.noreply.github.com> --- .github/workflows/tests.yml | 4 +- docs/advanced/async_advanced_usage.rst | 24 +- docs/advanced/async_permanent_session.rst | 77 +- docs/code_examples/http_multipart_async.py | 37 + .../reconnecting_mutation_http.py | 10 +- .../code_examples/reconnecting_mutation_ws.py | 10 +- docs/modules/gql.rst | 1 + docs/modules/transport_http_multipart.rst | 7 + docs/transports/async_transports.rst | 1 + docs/transports/http_multipart.rst | 159 ++++ gql/cli.py | 5 +- gql/client.py | 36 +- gql/transport/http_multipart_transport.py | 323 ++++++++ gql/utilities/get_introspection_query_ast.py | 9 + setup.py | 6 +- tests/conftest.py | 1 + tests/starwars/test_dsl.py | 48 ++ tests/test_cli.py | 2 + tests/test_http_multipart_transport.py | 691 ++++++++++++++++++ tox.ini | 5 +- 20 files changed, 1394 insertions(+), 62 deletions(-) create mode 100644 docs/code_examples/http_multipart_async.py create mode 100644 docs/modules/transport_http_multipart.rst create mode 100644 docs/transports/http_multipart.rst create mode 100644 gql/transport/http_multipart_transport.py create mode 100644 tests/test_http_multipart_transport.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8463ac00..37b381c5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -11,7 +11,7 @@ jobs: strategy: max-parallel: 4 matrix: - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "pypy3.10"] + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14", "pypy3.10"] os: [ubuntu-24.04, windows-latest] exclude: - os: windows-latest @@ -22,6 +22,8 @@ jobs: python-version: "3.11" - os: windows-latest python-version: "3.13" + - os: windows-latest + python-version: "3.14" - os: windows-latest python-version: "pypy3.10" diff --git a/docs/advanced/async_advanced_usage.rst b/docs/advanced/async_advanced_usage.rst index 4164cb37..78952d0f 100644 --- a/docs/advanced/async_advanced_usage.rst +++ b/docs/advanced/async_advanced_usage.rst @@ -6,7 +6,7 @@ Async advanced usage It is possible to send multiple GraphQL queries (query, mutation or subscription) in parallel, on the same websocket connection, using asyncio tasks. -In order to retry in case of connection failure, we can use the great `backoff`_ module. +In order to retry in case of connection failure, we can use the great `tenacity`_ module. .. code-block:: python @@ -28,10 +28,22 @@ In order to retry in case of connection failure, we can use the great `backoff`_ async for result in session.subscribe(subscription2): print(result) - # Then create a couroutine which will connect to your API and run all your queries as tasks. - # We use a `backoff` decorator to reconnect using exponential backoff in case of connection failure. - - @backoff.on_exception(backoff.expo, Exception, max_time=300) + # Then create a couroutine which will connect to your API and run all your + # queries as tasks. We use a `tenacity` retry decorator to reconnect using + # exponential backoff in case of connection failure. 
+ + from tenacity import ( + retry, + retry_if_exception_type, + stop_after_delay, + wait_exponential, + ) + + @retry( + retry=retry_if_exception_type(Exception), + stop=stop_after_delay(300), # max_time in seconds + wait=wait_exponential(), + ) async def graphql_connection(): transport = WebsocketsTransport(url="wss://YOUR_URL") @@ -54,4 +66,4 @@ Subscriptions tasks can be stopped at any time by running task.cancel() -.. _backoff: https://github.com/litl/backoff +.. _tenacity: https://github.com/jd/tenacity diff --git a/docs/advanced/async_permanent_session.rst b/docs/advanced/async_permanent_session.rst index e42010cf..885d2fd2 100644 --- a/docs/advanced/async_permanent_session.rst +++ b/docs/advanced/async_permanent_session.rst @@ -36,19 +36,22 @@ Retries Connection retries ^^^^^^^^^^^^^^^^^^ -With :code:`reconnecting=True`, gql will use the `backoff`_ module to repeatedly try to connect with -exponential backoff and jitter with a maximum delay of 60 seconds by default. +With :code:`reconnecting=True`, gql will use the `tenacity`_ module to repeatedly +try to connect with exponential backoff and jitter with a maximum delay of +60 seconds by default. You can change the default reconnecting profile by providing your own -backoff decorator to the :code:`retry_connect` argument. +retry decorator (from tenacity) to the :code:`retry_connect` argument. .. code-block:: python + from tenacity import retry, retry_if_exception_type, wait_exponential + # Here wait maximum 5 minutes between connection retries - retry_connect = backoff.on_exception( - backoff.expo, # wait generator (here: exponential backoff) - Exception, # which exceptions should cause a retry (here: everything) - max_value=300, # max wait time in seconds + retry_connect = retry( + # which exceptions should cause a retry (here: everything) + retry=retry_if_exception_type(Exception), + wait=wait_exponential(max=300), # max wait time in seconds ) session = await client.connect_async( reconnecting=True, @@ -66,32 +69,49 @@ There is no retry in case of a :code:`TransportQueryError` exception as it indic the connection to the backend is working correctly. You can change the default execute retry profile by providing your own -backoff decorator to the :code:`retry_execute` argument. +retry decorator (from tenacity) to the :code:`retry_execute` argument. .. code-block:: python + from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, + ) + # Here Only 3 tries for execute calls - retry_execute = backoff.on_exception( - backoff.expo, - Exception, - max_tries=3, + retry_execute = retry( + retry=retry_if_exception_type(Exception), + stop=stop_after_attempt(3), + wait=wait_exponential(), ) session = await client.connect_async( reconnecting=True, retry_execute=retry_execute, ) -If you don't want any retry on the execute calls, you can disable the retries with :code:`retry_execute=False` +If you don't want any retry on the execute calls, you can disable the retries +with :code:`retry_execute=False` .. note:: If you want to retry even with :code:`TransportQueryError` exceptions, - then you need to make your own backoff decorator on your own method: + then you need to make your own retry decorator (from tenacity) on your own method: .. 
code-block:: python - @backoff.on_exception(backoff.expo, - Exception, - max_tries=3) + from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, + ) + + @retry( + retry=retry_if_exception_type(Exception), + stop=stop_after_attempt(3), + wait=wait_exponential(), + ) async def execute_with_retry(session, query): return await session.execute(query) @@ -100,14 +120,25 @@ Subscription retries There is no :code:`retry_subscribe` as it is not feasible with async generators. If you want retries for your subscriptions, then you can do it yourself -with backoff decorators on your methods. +with retry decorators (from tenacity) on your methods. .. code-block:: python - @backoff.on_exception(backoff.expo, - Exception, - max_tries=3, - giveup=lambda e: isinstance(e, TransportQueryError)) + from tenacity import ( + retry, + retry_if_exception_type, + retry_unless_exception_type, + stop_after_attempt, + wait_exponential, + ) + from gql.transport.exceptions import TransportQueryError + + @retry( + retry=retry_if_exception_type(Exception) + & retry_unless_exception_type(TransportQueryError), + stop=stop_after_attempt(3), + wait=wait_exponential(), + ) async def execute_subscription1(session): async for result in session.subscribe(subscription1): print(result) @@ -123,4 +154,4 @@ Console example .. literalinclude:: ../code_examples/console_async.py .. _difficult to manage: https://github.com/graphql-python/gql/issues/179 -.. _backoff: https://github.com/litl/backoff +.. _tenacity: https://github.com/jd/tenacity diff --git a/docs/code_examples/http_multipart_async.py b/docs/code_examples/http_multipart_async.py new file mode 100644 index 00000000..d6d6e372 --- /dev/null +++ b/docs/code_examples/http_multipart_async.py @@ -0,0 +1,37 @@ +import asyncio +import logging + +from gql import Client, gql +from gql.transport.http_multipart_transport import HTTPMultipartTransport + +logging.basicConfig(level=logging.INFO) + + +async def main(): + + transport = HTTPMultipartTransport(url="https://gql-book-server.fly.dev/graphql") + + # Using `async with` on the client will start a connection on the transport + # and provide a `session` variable to execute queries on this connection + async with Client( + transport=transport, + ) as session: + + # Request subscription + subscription = gql( + """ + subscription { + book { + title + author + } + } + """ + ) + + # Subscribe and receive streaming updates + async for result in session.subscribe(subscription): + print(f"Received: {result}") + + +asyncio.run(main()) diff --git a/docs/code_examples/reconnecting_mutation_http.py b/docs/code_examples/reconnecting_mutation_http.py index 5deb5063..1eaf0111 100644 --- a/docs/code_examples/reconnecting_mutation_http.py +++ b/docs/code_examples/reconnecting_mutation_http.py @@ -1,7 +1,7 @@ import asyncio import logging -import backoff +from tenacity import retry, retry_if_exception_type, wait_exponential from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport @@ -17,11 +17,9 @@ async def main(): client = Client(transport=transport) - retry_connect = backoff.on_exception( - backoff.expo, - Exception, - max_value=10, - jitter=None, + retry_connect = retry( + retry=retry_if_exception_type(Exception), + wait=wait_exponential(max=10), ) session = await client.connect_async(reconnecting=True, retry_connect=retry_connect) diff --git a/docs/code_examples/reconnecting_mutation_ws.py b/docs/code_examples/reconnecting_mutation_ws.py index d7e7cfe2..4d083d54 100644 --- 
a/docs/code_examples/reconnecting_mutation_ws.py +++ b/docs/code_examples/reconnecting_mutation_ws.py @@ -1,7 +1,7 @@ import asyncio import logging -import backoff +from tenacity import retry, retry_if_exception_type, wait_exponential from gql import Client, gql from gql.transport.websockets import WebsocketsTransport @@ -17,11 +17,9 @@ async def main(): client = Client(transport=transport) - retry_connect = backoff.on_exception( - backoff.expo, - Exception, - max_value=10, - jitter=None, + retry_connect = retry( + retry=retry_if_exception_type(Exception), + wait=wait_exponential(max=10), ) session = await client.connect_async(reconnecting=True, retry_connect=retry_connect) diff --git a/docs/modules/gql.rst b/docs/modules/gql.rst index 035f196f..6937286e 100644 --- a/docs/modules/gql.rst +++ b/docs/modules/gql.rst @@ -29,6 +29,7 @@ Sub-Packages transport_common_adapters_aiohttp transport_common_adapters_websockets transport_exceptions + transport_http_multipart transport_phoenix_channel_websockets transport_requests transport_httpx diff --git a/docs/modules/transport_http_multipart.rst b/docs/modules/transport_http_multipart.rst new file mode 100644 index 00000000..0e91e0af --- /dev/null +++ b/docs/modules/transport_http_multipart.rst @@ -0,0 +1,7 @@ +gql.transport.http\_multipart\_transport module +=============================================== + +.. automodule:: gql.transport.http_multipart_transport + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/transports/async_transports.rst b/docs/transports/async_transports.rst index ba5ca136..7e81fd35 100644 --- a/docs/transports/async_transports.rst +++ b/docs/transports/async_transports.rst @@ -11,6 +11,7 @@ Async transports are transports which are using an underlying async library. The aiohttp httpx_async + http_multipart websockets aiohttp_websockets phoenix diff --git a/docs/transports/http_multipart.rst b/docs/transports/http_multipart.rst new file mode 100644 index 00000000..416f82c9 --- /dev/null +++ b/docs/transports/http_multipart.rst @@ -0,0 +1,159 @@ +.. _http_multipart_transport: + +HTTPMultipartTransport +====================== + +This transport implements GraphQL subscriptions over HTTP using the `multipart subscription protocol`_ +as implemented by Apollo GraphOS Router and other compatible servers. + +This provides an HTTP-based alternative to WebSocket transports for receiving streaming +subscription updates. It's particularly useful when: + +- WebSocket connections are not available or blocked by infrastructure +- You want to use standard HTTP with existing load balancers and proxies +- The backend implements the multipart subscription protocol + +Reference: :class:`gql.transport.http_multipart_transport.HTTPMultipartTransport` + +.. note:: + + This transport is specifically designed for GraphQL subscriptions. While it can handle + queries and mutations via the ``execute()`` method, standard HTTP transports like + :ref:`AIOHTTPTransport ` are more efficient for those operations. + +.. literalinclude:: ../code_examples/http_multipart_async.py + +How It Works +------------ + +The transport sends a standard HTTP POST request with an ``Accept`` header indicating +support for multipart responses: + +.. code-block:: text + + Accept: multipart/mixed;subscriptionSpec="1.0", application/json + +The server responds with a ``multipart/mixed`` content type and streams subscription +updates as separate parts in the response body. Each part contains a JSON payload +with GraphQL execution results. 
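For readers who want to see what this exchange looks like without the transport, the sketch below drives the same protocol with plain ``aiohttp`` calls. It is illustrative only: it reuses the exact ``Accept`` value the transport sends, but omits the transport's status checks, heartbeat filtering and error handling, and ``SERVER_URL`` plus the sample subscription are placeholders you must replace.

.. code-block:: python

    import asyncio
    import json

    import aiohttp

    ACCEPT = "multipart/mixed;boundary=graphql;subscriptionSpec=1.0,application/json"


    async def raw_subscribe(url, query):
        # POST the subscription and read each multipart part as it arrives
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url,
                json={"query": query},
                headers={"Content-Type": "application/json", "Accept": ACCEPT},
            ) as response:
                reader = aiohttp.MultipartReader.from_response(response)
                while True:
                    part = await reader.next()
                    if part is None:
                        break  # final boundary reached
                    message = json.loads(await part.text())
                    if message:  # empty JSON objects are heartbeats
                        print(message["payload"])


    asyncio.run(raw_subscribe(
        "https://SERVER_URL/graphql",
        "subscription { book { title author } }",
    ))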
+ +Protocol Details +---------------- + +**Message Format** + +Each message part follows this structure: + +.. code-block:: text + + --graphql + Content-Type: application/json + + {"payload": {"data": {...}, "errors": [...]}} + +**Heartbeats** + +Servers may send empty JSON objects (``{}``) as heartbeat messages to keep the +connection alive. These are automatically filtered out by the transport. + +**Error Handling** + +The protocol distinguishes between two types of errors: + +- **GraphQL errors**: Returned within the ``payload`` property alongside data +- **Transport errors**: Returned with a top-level ``errors`` field and ``null`` payload + +**End of Stream** + +The subscription ends when the server sends the final boundary marker: + +.. code-block:: text + + --graphql-- + +Authentication +-------------- + +Authentication works the same as with :ref:`AIOHTTPTransport `. + +Using HTTP Headers +^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL:SERVER_PORT/graphql', + headers={'Authorization': 'Bearer YOUR_TOKEN'} + ) + +Using HTTP Cookies +^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + transport = HTTPMultipartTransport( + url=url, + cookies={"session_id": "your_session_cookie"} + ) + +Or use a cookie jar to save and reuse cookies: + +.. code-block:: python + + import aiohttp + + jar = aiohttp.CookieJar() + transport = HTTPMultipartTransport( + url=url, + client_session_args={'cookie_jar': jar} + ) + +Configuration +------------- + +Timeout Settings +^^^^^^^^^^^^^^^^ + +Set a timeout for the HTTP request: + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + timeout=30 # 30 second timeout + ) + +SSL Configuration +^^^^^^^^^^^^^^^^^ + +Control SSL certificate verification: + +.. code-block:: python + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + ssl=False # Disable SSL verification (not recommended for production) + ) + +Or provide a custom SSL context: + +.. code-block:: python + + import ssl + + ssl_context = ssl.create_default_context() + ssl_context.load_cert_chain('client.crt', 'client.key') + + transport = HTTPMultipartTransport( + url='https://SERVER_URL/graphql', + ssl=ssl_context + ) + +Limitations +----------- + +- This transport requires the server to implement the multipart subscription protocol +- Long-lived connections may be terminated by intermediate proxies or load balancers +- Some server configurations may not support HTTP/1.1 chunked transfer encoding required for streaming + +.. 
_multipart subscription protocol: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol diff --git a/gql/cli.py b/gql/cli.py index 37be3656..01dfb20f 100644 --- a/gql/cli.py +++ b/gql/cli.py @@ -140,7 +140,9 @@ def get_parser(with_examples: bool = False) -> ArgumentParser: - input_value_deprecation:false to omit deprecated input fields - specified_by_url:true - schema_description:true - - directive_is_repeatable:true""" + - directive_is_repeatable:true + - input_object_one_of:true + """ ), dest="schema_download", ) @@ -430,6 +432,7 @@ def get_introspection_args(args: Namespace) -> Dict: "directive_is_repeatable", "schema_description", "input_value_deprecation", + "input_object_one_of", ] if args.schema_download is not None: diff --git a/gql/client.py b/gql/client.py index e17a0b7c..93c1078c 100644 --- a/gql/client.py +++ b/gql/client.py @@ -21,7 +21,6 @@ overload, ) -import backoff from anyio import fail_after from graphql import ( ExecutionResult, @@ -31,6 +30,13 @@ parse, validate, ) +from tenacity import ( + retry, + retry_if_exception_type, + retry_unless_exception_type, + stop_after_attempt, + wait_exponential, +) from .graphql_request import GraphQLRequest, support_deprecated_request from .transport.async_transport import AsyncTransport @@ -1902,11 +1908,12 @@ def __init__( """ :param client: the :class:`client ` used. :param retry_connect: Either a Boolean to activate/deactivate the retries - for the connection to the transport OR a backoff decorator to - provide specific retries parameters for the connections. + for the connection to the transport OR a retry decorator + (e.g., from tenacity) to provide specific retries parameters + for the connections. :param retry_execute: Either a Boolean to activate/deactivate the retries - for the execute method OR a backoff decorator to - provide specific retries parameters for this method. + for the execute method OR a retry decorator (e.g., from tenacity) + to provide specific retries parameters for this method. 
""" self.client = client self._connect_task = None @@ -1917,10 +1924,9 @@ def __init__( if retry_connect is True: # By default, retry again and again, with maximum 60 seconds # between retries - self.retry_connect = backoff.on_exception( - backoff.expo, - Exception, - max_value=60, + self.retry_connect = retry( + retry=retry_if_exception_type(Exception), + wait=wait_exponential(max=60), ) elif retry_connect is False: self.retry_connect = lambda e: e @@ -1930,11 +1936,11 @@ def __init__( if retry_execute is True: # By default, retry 5 times, except if we receive a TransportQueryError - self.retry_execute = backoff.on_exception( - backoff.expo, - Exception, - max_tries=5, - giveup=lambda e: isinstance(e, TransportQueryError), + self.retry_execute = retry( + retry=retry_if_exception_type(Exception) + & retry_unless_exception_type(TransportQueryError), + stop=stop_after_attempt(5), + wait=wait_exponential(), ) elif retry_execute is False: self.retry_execute = lambda e: e @@ -1943,7 +1949,7 @@ def __init__( self.retry_execute = retry_execute # Creating the _execute_with_retries and _connect_with_retries methods - # using the provided backoff decorators + # using the provided retry decorators self._execute_with_retries = self.retry_execute(self._execute_once) self._connect_with_retries = self.retry_connect(self.transport.connect) diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py new file mode 100644 index 00000000..a869d36f --- /dev/null +++ b/gql/transport/http_multipart_transport.py @@ -0,0 +1,323 @@ +""" +HTTP Multipart Transport for GraphQL Subscriptions + +This transport implements support for GraphQL subscriptions over HTTP using +the multipart subscription protocol as implemented by Apollo GraphOS Router +and other compatible servers. + +Reference: +https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol +""" + +import asyncio +import json +import logging +from ssl import SSLContext +from typing import Any, AsyncGenerator, Callable, Dict, Optional, Union + +import aiohttp +from aiohttp.client_reqrep import Fingerprint +from aiohttp.helpers import BasicAuth +from aiohttp.typedefs import LooseCookies, LooseHeaders +from graphql import ExecutionResult +from multidict import CIMultiDictProxy + +from gql.graphql_request import GraphQLRequest +from gql.transport.async_transport import AsyncTransport +from gql.transport.common.aiohttp_closed_event import create_aiohttp_closed_event +from gql.transport.exceptions import ( + TransportAlreadyConnected, + TransportClosed, + TransportConnectionFailed, + TransportProtocolError, + TransportServerError, +) + +log = logging.getLogger(__name__) + + +class HTTPMultipartTransport(AsyncTransport): + """ + Async Transport for GraphQL subscriptions using the multipart subscription protocol. + + This transport sends GraphQL subscription queries via HTTP POST and receives + streaming multipart/mixed responses, where each part contains a JSON payload + with GraphQL execution results. This protocol is implemented by Apollo GraphOS + Router and other compatible servers. 
+ """ + + def __init__( + self, + url: str, + headers: Optional[LooseHeaders] = None, + cookies: Optional[LooseCookies] = None, + auth: Optional[BasicAuth] = None, + ssl: Union[SSLContext, bool, Fingerprint] = True, + timeout: Optional[int] = None, + ssl_close_timeout: Optional[Union[int, float]] = 10, + json_serialize: Callable = json.dumps, + json_deserialize: Callable = json.loads, + client_session_args: Optional[Dict[str, Any]] = None, + ) -> None: + """ + Initialize the HTTP Multipart transport. + + :param url: The GraphQL server URL (http or https) + :param headers: Dict of HTTP Headers + :param cookies: Dict of HTTP cookies + :param auth: BasicAuth object for HTTP authentication + :param ssl: SSL context or validation mode + :param timeout: Request timeout in seconds + :param ssl_close_timeout: Timeout for SSL connection close + :param json_serialize: JSON serializer function + :param json_deserialize: JSON deserializer function + :param client_session_args: Extra args for aiohttp.ClientSession + """ + self.url = url + self.headers = headers or {} + self.cookies = cookies + self.auth = auth + self.ssl = ssl + self.timeout = timeout + self.ssl_close_timeout = ssl_close_timeout + self.json_serialize = json_serialize + self.json_deserialize = json_deserialize + self.client_session_args = client_session_args or {} + + self.session: Optional[aiohttp.ClientSession] = None + self.response_headers: Optional[CIMultiDictProxy[str]] = None + + async def connect(self) -> None: + """Create an aiohttp ClientSession.""" + if self.session is not None: + raise TransportAlreadyConnected("Transport is already connected") + + client_session_args: Dict[str, Any] = { + "cookies": self.cookies, + "headers": self.headers, + "auth": self.auth, + "json_serialize": self.json_serialize, + } + + if self.timeout is not None: + client_session_args["timeout"] = aiohttp.ClientTimeout(total=self.timeout) + + client_session_args.update(self.client_session_args) + + log.debug("Connecting HTTP Multipart transport") + self.session = aiohttp.ClientSession(**client_session_args) + + async def close(self) -> None: + """Close the aiohttp session.""" + if self.session is not None: + log.debug("Closing HTTP Multipart transport") + + if ( + self.client_session_args + and self.client_session_args.get("connector_owner") is False + ): + log.debug("connector_owner is False -> not closing connector") + else: + closed_event = create_aiohttp_closed_event(self.session) + await self.session.close() + try: + await asyncio.wait_for(closed_event.wait(), self.ssl_close_timeout) + except asyncio.TimeoutError: + pass + + self.session = None + + async def subscribe( + self, + request: GraphQLRequest, + ) -> AsyncGenerator[ExecutionResult, None]: + """ + Execute a GraphQL subscription and yield results from multipart response. 
+ + :param request: GraphQL request to execute + :yields: ExecutionResult objects as they arrive in the multipart stream + """ + if self.session is None: + raise TransportClosed("Transport is not connected") + + payload = request.payload + if log.isEnabledFor(logging.DEBUG): + log.debug(">>> %s", self.json_serialize(payload)) + + headers = { + "Content-Type": "application/json", + "Accept": ( + "multipart/mixed;boundary=graphql;" + "subscriptionSpec=1.0,application/json" + ), + } + + try: + # Make the POST request + async with self.session.post( + self.url, + json=payload, + headers=headers, + ssl=self.ssl, + ) as response: + # Save response headers + self.response_headers = response.headers + + # Check for errors + if response.status >= 400: + error_text = await response.text() + raise TransportServerError( + f"Server returned {response.status}: {error_text}", + response.status, + ) + + initial_content_type = response.headers.get("Content-Type", "") + if ( + ("multipart/mixed" not in initial_content_type) + or ("boundary=graphql" not in initial_content_type) + or ("subscriptionSpec=1.0" not in initial_content_type) + or ("application/json" not in initial_content_type) + ): + raise TransportProtocolError( + f"Unexpected content-type: {initial_content_type}. " + "Server may not support the multipart subscription protocol." + ) + + # Parse multipart response + async for result in self._parse_multipart_response(response): + yield result + + except (TransportServerError, TransportProtocolError): + # Let these exceptions propagate without wrapping + raise + except Exception as e: + raise TransportConnectionFailed(str(e)) from e + + async def _parse_multipart_response( + self, + response: aiohttp.ClientResponse, + ) -> AsyncGenerator[ExecutionResult, None]: + """ + Parse a multipart response stream and yield execution results. + + Uses aiohttp's built-in MultipartReader to handle the multipart protocol. + + :param response: The aiohttp response object + :yields: ExecutionResult objects + """ + # Use aiohttp's built-in multipart reader + reader = aiohttp.MultipartReader.from_response(response) + + # Iterate through each part in the multipart response + while True: + try: + part = await reader.next() + except Exception: + # reader.next() throws on empty parts at the end of the stream. + # (some servers may send this.) + # see: https://github.com/aio-libs/aiohttp/pull/11857 + # As an ugly workaround for now, we can check if we've reached + # EOF and assume this was the case. + if reader.at_eof(): + break + + # Otherwise, re-raise unexpected errors + raise + + if part is None: + # No more parts + break + + # Skip nested multipart (not expected in GraphQL subscriptions) + if isinstance(part, aiohttp.MultipartReader): + continue + + result = await self._parse_multipart_part(part) + if result: + yield result + + async def _parse_multipart_part( + self, part: aiohttp.BodyPartReader + ) -> Optional[ExecutionResult]: + """ + Parse a single part from a multipart response. + + :param part: aiohttp BodyPartReader for the part + :return: ExecutionResult or None if part is empty/heartbeat + """ + # Verify the part has the correct content type + content_type = part.headers.get(aiohttp.hdrs.CONTENT_TYPE, "") + if not content_type.startswith("application/json"): + raise TransportProtocolError( + f"Unexpected part content-type: {content_type}. " + "Expected 'application/json'." 
+ ) + + try: + # Read the part content as text + body = await part.text() + body = body.strip() + + if log.isEnabledFor(logging.DEBUG): + log.debug("<<< %s", body or "(empty body, skipping)") + + if not body: + return None + + # Parse JSON body using custom deserializer + data = self.json_deserialize(body) + + # Handle heartbeats - empty JSON objects + if not data: + log.debug("Received heartbeat, ignoring") + return None + + # The multipart subscription protocol wraps data in a "payload" property + if "payload" not in data: + log.warning("Invalid response: missing 'payload' field") + return None + + payload = data["payload"] + + # Check for transport-level errors (payload is null) + if payload is None: + # If there are errors, this is a transport-level error + errors = data.get("errors") + if errors: + error_messages = [ + error.get("message", "Unknown transport error") + for error in errors + ] + + for message in error_messages: + log.error(f"Transport error: {message}") + + raise TransportServerError("\n\n".join(error_messages)) + else: + # Null payload without errors - just skip this part + return None + + # Extract GraphQL data from payload + return ExecutionResult( + data=payload.get("data"), + errors=payload.get("errors"), + extensions=payload.get("extensions"), + ) + except json.JSONDecodeError as e: + log.warning( + f"Failed to parse JSON: {e}, body: {body[:100] if body else ''}" + ) + return None + + async def execute( + self, + request: GraphQLRequest, + ) -> ExecutionResult: + """ + :raises: NotImplementedError - This transport only supports subscriptions + """ + raise NotImplementedError( + "The HTTP multipart transport does not support queries or " + "mutations. Use HTTPTransport for queries and mutations, or use " + "subscribe() for subscriptions." + ) diff --git a/gql/utilities/get_introspection_query_ast.py b/gql/utilities/get_introspection_query_ast.py index 0422a225..8d981600 100644 --- a/gql/utilities/get_introspection_query_ast.py +++ b/gql/utilities/get_introspection_query_ast.py @@ -11,6 +11,8 @@ def get_introspection_query_ast( directive_is_repeatable: bool = False, schema_description: bool = False, input_value_deprecation: bool = True, + input_object_one_of: bool = False, + *, type_recursion_level: int = 7, ) -> DocumentNode: """Get a query for introspection as a document using the DSL module. 
@@ -68,6 +70,13 @@ def get_introspection_query_ast( ) if descriptions: fragment_FullType.select(ds.__Type.description) + if input_object_one_of: + try: + fragment_FullType.select(ds.__Type.isOneOf) + except AttributeError: # pragma: no cover + raise NotImplementedError( + "isOneOf is only supported from graphql-core version 3.3.0a7" + ) if specified_by_url: fragment_FullType.select(ds.__Type.specifiedByURL) diff --git a/setup.py b/setup.py index 39a6e453..f4508b86 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ install_requires = [ "graphql-core>=3.3.0a3,<3.4", "yarl>=1.6,<2.0", - "backoff>=1.11.1,<3.0", + "tenacity>=9.1.2,<10.0", "anyio>=3.0,<5", "typing_extensions>=4.0.0; python_version<'3.11'", ] @@ -16,8 +16,9 @@ tests_requires = [ "parse==1.20.2", + "packaging>=21.0", "pytest==8.3.4", - "pytest-asyncio==0.25.3", + "pytest-asyncio==1.2.0", "pytest-console-scripts==1.4.1", "pytest-cov==6.0.0", "vcrpy==7.0.0", @@ -95,6 +96,7 @@ "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", "Programming Language :: Python :: Implementation :: PyPy", ], keywords="api graphql protocol rest relay gql client", diff --git a/tests/conftest.py b/tests/conftest.py index cef561f7..9de910ed 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -128,6 +128,7 @@ async def ssl_aiohttp_server(): "gql.transport.appsync", "gql.transport.common.base", "gql.transport.httpx", + "gql.transport.http_multipart_transport", "gql.transport.phoenix_channel_websockets", "gql.transport.requests", "gql.transport.websockets", diff --git a/tests/starwars/test_dsl.py b/tests/starwars/test_dsl.py index 7f042a07..ca9137a7 100644 --- a/tests/starwars/test_dsl.py +++ b/tests/starwars/test_dsl.py @@ -15,11 +15,15 @@ NonNullTypeNode, NullValueNode, Undefined, +) +from graphql import __version__ as graphql_version +from graphql import ( build_ast_schema, parse, print_ast, ) from graphql.utilities import get_introspection_query +from packaging import version from gql import Client, gql from gql.dsl import ( @@ -1084,6 +1088,50 @@ def test_get_introspection_query_ast(option): ) +@pytest.mark.skipif( + version.parse(graphql_version) < version.parse("3.3.0a7"), + reason="Requires graphql-core >= 3.3.0a7", +) +@pytest.mark.parametrize("option", [True, False]) +def test_get_introspection_query_ast_is_one_of(option): + + introspection_query = print_ast( + gql( + get_introspection_query( + input_value_deprecation=option, + ) + ).document + ) + + # Because the option does not exist yet in graphql-core, + # we add it manually here for now + if option: + introspection_query = introspection_query.replace( + "fields", + "isOneOf\n fields", + ) + + dsl_introspection_query = get_introspection_query_ast( + input_value_deprecation=option, + input_object_one_of=option, + type_recursion_level=9, + ) + + assert introspection_query == print_ast(dsl_introspection_query) + + +@pytest.mark.skipif( + version.parse(graphql_version) >= version.parse("3.3.0a7"), + reason="Test only for older graphql-core versions < 3.3.0a7", +) +def test_get_introspection_query_ast_is_one_of_not_implemented_yet(): + + with pytest.raises(NotImplementedError): + get_introspection_query_ast( + input_object_one_of=True, + ) + + def test_typename_aliased(ds): query = """ hero { diff --git a/tests/test_cli.py b/tests/test_cli.py index 4c6b7d15..df613afc 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -407,6 +407,7 @@ def 
test_cli_parse_schema_download(parser): "specified_by_url:True", "schema_description:true", "directive_is_repeatable:true", + "input_object_one_of:true", "--print-schema", ] ) @@ -419,6 +420,7 @@ def test_cli_parse_schema_download(parser): "specified_by_url": True, "schema_description": True, "directive_is_repeatable": True, + "input_object_one_of": True, } assert introspection_args == expected_args diff --git a/tests/test_http_multipart_transport.py b/tests/test_http_multipart_transport.py new file mode 100644 index 00000000..0784f08f --- /dev/null +++ b/tests/test_http_multipart_transport.py @@ -0,0 +1,691 @@ +import asyncio +import json +from unittest.mock import AsyncMock, patch + +import pytest +from aiohttp import web + +from gql import Client, gql +from gql.graphql_request import GraphQLRequest +from gql.transport.exceptions import ( + TransportAlreadyConnected, + TransportClosed, + TransportConnectionFailed, + TransportProtocolError, + TransportServerError, +) + +subscription_str = """ + subscription { + book { + title + author + } + } +""" + +book1 = {"title": "Book 1", "author": "Author 1"} +book2 = {"title": "Book 2", "author": "Author 2"} +book3 = {"title": "Book 3", "author": "Author 3"} + + +def create_multipart_response(books, *, separator="\r\n", include_heartbeat=False): + """Helper to create parts for a streamed response body.""" + parts = [] + + for idx, book in enumerate(books): + data = {"data": {"book": book}} + payload = {"payload": data} + + parts.append(( + f"--graphql{separator}" + f"Content-Type: application/json{separator}" + f"{separator}" + f"{json.dumps(payload)}{separator}" + )) # fmt: skip + + # Add heartbeat after first item if requested + if include_heartbeat and idx == 0: + parts.append(( + f"--graphql{separator}" + f"Content-Type: application/json{separator}" + f"{separator}" + f"{{}}{separator}" + )) # fmt: skip + + # Add end boundary + parts.append(f"--graphql--{separator}") + + return parts + + +@pytest.fixture +@pytest.mark.aiohttp +def multipart_server(aiohttp_server): + async def create_server( + parts, + *, + content_type=( + "multipart/mixed;boundary=graphql;subscriptionSpec=1.0,application/json" + ), + request_handler=lambda *args: None, + ): + async def handler(request): + request_handler(request) + response = web.StreamResponse() + response.headers["Content-Type"] = content_type + response.enable_chunked_encoding() + await response.prepare(request) + for part in parts: + await response.write(part.encode()) + await asyncio.sleep(0) # force the chunk to be written + await response.write_eof() + return response + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + return server + + return create_server + + +@pytest.mark.asyncio +async def test_http_multipart_subscription(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + def assert_response_headers(request): + # Verify the Accept header follows the spec + accept_header = request.headers["accept"] + assert "multipart/mixed" in accept_header + assert "boundary=graphql" in accept_header + assert "subscriptionSpec=1.0" in accept_header + assert "application/json" in accept_header + + parts = create_multipart_response([book1, book2]) + server = await multipart_server(parts, request_handler=assert_response_headers) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + 
async for result in session.subscribe(query): + results.append(result) + + # Heartbeats should be filtered out + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.asyncio +async def test_http_multipart_subscription_with_heartbeat(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1, book2], include_heartbeat=True) + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Heartbeats should be filtered out + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.aiohttp +@pytest.mark.asyncio +async def test_http_multipart_unsupported_content_type(aiohttp_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + # Return text/html instead of application/json + return web.Response(text="
<html>hello</html>
", content_type="text/html") + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + with pytest.raises(TransportProtocolError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "Unexpected content-type" in str(exc_info.value) + + +@pytest.mark.aiohttp +@pytest.mark.asyncio +async def test_http_multipart_server_error(aiohttp_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + async def handler(request): + return web.Response(text="Internal Server Error", status=500) + + app = web.Application() + app.router.add_route("POST", "/", handler) + server = await aiohttp_server(app) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + with pytest.raises(TransportServerError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "500: Internal Server Error" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_transport_level_error(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Transport error has null payload with errors at top level + error_response = { + "payload": None, + "errors": [{"message": "Transport connection failed"}], + } + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + f"{json.dumps(error_response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportServerError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "Transport connection failed" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_graphql_errors(multipart_server): + from gql.transport.exceptions import TransportQueryError + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # GraphQL errors come inside the payload + response = { + "payload": { + "data": {"book": {**book1, "author": None}}, + "errors": [ + {"message": "could not fetch author", "path": ["book", "author"]} + ], + } + } + parts = [ + ( + f"--graphql\r\n" + f"Content-Type: application/json\r\n" + f"\r\n" + f"{json.dumps(response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + # Client raises TransportQueryError when there are errors in the result + with pytest.raises(TransportQueryError) as exc_info: + async for result in session.subscribe(query): + pass + + # Verify error details + assert "could not fetch author" in str(exc_info.value).lower() + assert exc_info.value.data is not None + assert exc_info.value.data["book"]["author"] is None + # Verify we can still get data for the non-error fields + assert exc_info.value.data["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_execute_method(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1, 
book2]) + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + # execute() should raise NotImplementedError + with pytest.raises(NotImplementedError) as exc_info: + await session.execute(query) + + assert "does not support queries or mutations" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_transport_already_connected(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([]) + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + await transport.connect() + + with pytest.raises(TransportAlreadyConnected): + await transport.connect() + + await transport.close() + + +@pytest.mark.asyncio +async def test_http_multipart_transport_not_connected(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1]) + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + request = GraphQLRequest(query) + + with pytest.raises(TransportClosed): + async for result in transport.subscribe(request): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_execute_empty_response(multipart_server): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Return empty multipart response (no data parts) + parts = ["--graphql--\r\n"] + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + # execute() should raise NotImplementedError + with pytest.raises(NotImplementedError) as exc_info: + await session.execute(query) + + assert "does not support queries or mutations" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_newline_separator(multipart_server): + """Test that LF-only separators are rejected (spec requires CRLF).""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # The GraphQL over HTTP spec requires CRLF line endings in multipart responses + # https://github.com/graphql/graphql-over-http/blob/main/rfcs/IncrementalDelivery.md + parts = create_multipart_response([book1], separator="\n") + server = await multipart_server(parts) + transport = HTTPMultipartTransport(url=server.make_url("/")) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + # Non-compliant multipart format (LF instead of CRLF) should fail + with pytest.raises(TransportConnectionFailed): + async for result in session.subscribe(query): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_transport_connection_failed_error(): + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Use an invalid URL that will fail to connect + transport = HTTPMultipartTransport(url="http://invalid.local:-1/graphql", timeout=1) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + with pytest.raises(TransportConnectionFailed): + async for result in session.subscribe(query): + pass + + +@pytest.mark.asyncio +async def test_http_multipart_connector_owner_false(multipart_server): + """Test closing transport with connector_owner=False.""" + from 
gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1]) + server = await multipart_server(parts) + url = server.make_url("/") + + transport = HTTPMultipartTransport( + url=url, client_session_args={"connector_owner": False} + ) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + + +@pytest.mark.asyncio +async def test_http_multipart_ssl_close_timeout(multipart_server): + """Test SSL close timeout during transport close.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1], separator="\n") + server = await multipart_server(parts) + url = server.make_url("/") + + transport = HTTPMultipartTransport(url=url, ssl_close_timeout=0.001) + + await transport.connect() + + # Mock the closed event to timeout + with patch( + "gql.transport.http_multipart_transport.create_aiohttp_closed_event" + ) as mock_event: + mock_wait = AsyncMock() + mock_wait.side_effect = asyncio.TimeoutError() + mock_event.return_value.wait = mock_wait + + # Should handle timeout gracefully + await transport.close() + + +@pytest.mark.asyncio +async def test_http_multipart_malformed_json(multipart_server): + """Test handling of malformed JSON in multipart response.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + "{invalid json }\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip malformed parts + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_payload_null_no_errors(multipart_server): + """Test handling of null payload without errors.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Null payload but no errors + response = {"payload": None} + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + f"{json.dumps(response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Null payload without errors should return nothing + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_invalid_utf8(multipart_server): + """Test handling of invalid UTF-8 in multipart response.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + "\xff\xfe\r\n" # Contains invalid UTF-8 + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip invalid part + assert len(results) == 0 + 
+ +@pytest.mark.asyncio +async def test_http_multipart_chunked_boundary_split(multipart_server): + """Test parsing when boundary is split across chunks.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = [ + "--gra", + ( + "phql\r\nContent-Type: application/json\r\n\r\n" + '{"payload": {"data": {"book": {"title": "Bo' + ), + 'ok 1"}}}}\r\n--graphql--\r\n', + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + assert len(results) == 1 + assert results[0]["book"]["title"] == "Book 1" + + +@pytest.mark.asyncio +async def test_http_multipart_wrong_part_content_type(multipart_server): + """Test that parts with wrong content-type raise an error.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Part with text/html instead of application/json + parts = [ + ("--graphql\r\n" "Content-Type: text/html\r\n" "\r\n" "
<html>hello</html>
\r\n"), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + + with pytest.raises(TransportProtocolError) as exc_info: + async for result in session.subscribe(query): + pass + + assert "Unexpected part content-type" in str(exc_info.value) + assert "text/html" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_http_multipart_response_headers(multipart_server): + """Test that response headers are captured in the transport.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + parts = create_multipart_response([book1]) + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + query = gql(subscription_str) + + async with Client(transport=transport) as session: + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Verify response headers are captured + assert transport.response_headers is not None + assert "Content-Type" in transport.response_headers + assert "multipart/mixed" in transport.response_headers["Content-Type"] + + +@pytest.mark.asyncio +async def test_http_multipart_empty_body(multipart_server): + """Test part with empty body after stripping.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Part with only whitespace body + parts = [ + "--graphql\r\nContent-Type: application/json\r\n\r\n \r\n", + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_missing_payload_field(multipart_server): + """Test handling of response missing required 'payload' field.""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + response = {"foo": "bar"} # No payload field! 
+ parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json\r\n" + "\r\n" + f"{json.dumps(response)}\r\n" + ), + "--graphql--\r\n", + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should skip invalid response and return no results + assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_http_multipart_with_content_length_headers(multipart_server): + """Test multipart response with Content-Length headers (like real servers send).""" + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Simulate real server behavior: each part has Content-Length header + book1_payload = json.dumps({"payload": {"data": {"book": book1}}}) + book2_payload = json.dumps({"payload": {"data": {"book": book2}}}) + heartbeat_payload = "{}" + + parts = [ + ( + "--graphql\r\n" + "Content-Type: application/json; charset=utf-8\r\n" + f"Content-Length: {len(heartbeat_payload)}\r\n" + "\r\n" + f"{heartbeat_payload}\r\n" + ), + ( + "--graphql\r\n" + "Content-Type: application/json; charset=utf-8\r\n" + f"Content-Length: {len(book1_payload)}\r\n" + "\r\n" + f"{book1_payload}\r\n" + ), + ( + "--graphql\r\n" + "Content-Type: application/json; charset=utf-8\r\n" + f"Content-Length: {len(book2_payload)}\r\n" + "\r\n" + f"{book2_payload}\r\n" + ), + "--graphql\r\n", # Extra empty part like real servers + "--graphql--\r\n", # Final boundary + ] + + server = await multipart_server(parts) + url = server.make_url("/") + transport = HTTPMultipartTransport(url=url) + + async with Client(transport=transport) as session: + query = gql(subscription_str) + results = [] + async for result in session.subscribe(query): + results.append(result) + + # Should get 2 books (heartbeat and empty part filtered) + assert len(results) == 2 + assert results[0]["book"]["title"] == "Book 1" + assert results[1]["book"]["title"] == "Book 2" diff --git a/tox.ini b/tox.ini index f6d4b48e..21129e3c 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,7 @@ [tox] envlist = black,flake8,import-order,mypy,manifest, - py{39,310,311,312,313,py3} + py{39,310,311,312,313,314,py3} [gh-actions] python = @@ -10,6 +10,7 @@ python = 3.11: py311 3.12: py312 3.13: py313 + 3.14: py314 pypy-3: pypy3 [testenv] @@ -28,7 +29,7 @@ deps = -e.[test] commands = pip install -U setuptools ; run "tox -- tests -s" to show output for debugging - py{39,310,311,312,313,py3}: pytest {posargs:tests} + py{39,310,311,312,313,314,py3}: pytest {posargs:tests} py{312}: pytest {posargs:tests --cov-report=term-missing --cov=gql} [testenv:black] From 7d92518fc200a6de42ccd731736e33322be0c528 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 19 Dec 2025 23:16:19 +0000 Subject: [PATCH 3/3] Fix nested multipart to raise error instead of silently skipping Co-authored-by: magicmark <1590756+magicmark@users.noreply.github.com> --- gql/transport/http_multipart_transport.py | 6 ++- tests/test_http_multipart_transport.py | 47 +++++++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/gql/transport/http_multipart_transport.py b/gql/transport/http_multipart_transport.py index a869d36f..56bd5758 100644 --- a/gql/transport/http_multipart_transport.py +++ b/gql/transport/http_multipart_transport.py @@ -228,9 +228,11 @@ async def 
_parse_multipart_response( # No more parts break - # Skip nested multipart (not expected in GraphQL subscriptions) + # Nested multipart is not expected in GraphQL subscriptions if isinstance(part, aiohttp.MultipartReader): - continue + raise TransportProtocolError( + "Received unexpected nested multipart part in GraphQL subscription response." + ) result = await self._parse_multipart_part(part) if result: diff --git a/tests/test_http_multipart_transport.py b/tests/test_http_multipart_transport.py index 0784f08f..745b2e37 100644 --- a/tests/test_http_multipart_transport.py +++ b/tests/test_http_multipart_transport.py @@ -2,6 +2,7 @@ import json from unittest.mock import AsyncMock, patch +import aiohttp import pytest from aiohttp import web @@ -689,3 +690,49 @@ async def test_http_multipart_with_content_length_headers(multipart_server): assert len(results) == 2 assert results[0]["book"]["title"] == "Book 1" assert results[1]["book"]["title"] == "Book 2" + + +@pytest.mark.asyncio +async def test_http_multipart_nested_multipart_error(): + """Test that nested multipart parts raise TransportProtocolError.""" + from unittest.mock import AsyncMock, MagicMock, patch + + from gql.transport.http_multipart_transport import HTTPMultipartTransport + + # Create a mock response + mock_response = MagicMock() + mock_response.status = 200 + mock_response.headers = { + "Content-Type": ( + "multipart/mixed;boundary=graphql;subscriptionSpec=1.0,application/json" + ) + } + + # Create a mock multipart reader that returns a nested MultipartReader + nested_reader = MagicMock(spec=aiohttp.MultipartReader) + mock_reader = MagicMock() + mock_reader.next = AsyncMock(side_effect=[nested_reader, None]) + mock_reader.at_eof = MagicMock(return_value=False) + + transport = HTTPMultipartTransport(url="http://test.local/graphql") + + await transport.connect() + + query = gql(subscription_str) + + try: + with patch("aiohttp.MultipartReader.from_response", return_value=mock_reader): + with patch.object( + transport.session, "post" + ) as mock_post: + mock_post.return_value.__aenter__.return_value = mock_response + + with pytest.raises(TransportProtocolError) as exc_info: + async for result in transport.subscribe( + GraphQLRequest(query) + ): + pass + + assert "nested multipart" in str(exc_info.value).lower() + finally: + await transport.close()
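To show how the stricter behaviour from PATCH 3/3 surfaces to callers, here is a hedged consumer-side sketch. It is not part of the patch: the URL is a placeholder, the subscription mirrors the test fixtures, and the server is assumed to misbehave by nesting multipart parts.

.. code-block:: python

    import asyncio

    from gql import Client, gql
    from gql.transport.exceptions import TransportProtocolError
    from gql.transport.http_multipart_transport import HTTPMultipartTransport


    async def consume(url):
        transport = HTTPMultipartTransport(url=url)
        subscription = gql("subscription { book { title author } }")

        async with Client(transport=transport) as session:
            try:
                async for result in session.subscribe(subscription):
                    print(result)
            except TransportProtocolError as exc:
                # Since PATCH 3/3, a nested multipart part aborts the
                # subscription instead of being silently skipped.
                print(f"Protocol violation: {exc}")


    asyncio.run(consume("https://SERVER_URL/graphql"))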