
Sending a file-like data payload prevents connection reuse #10325

Open
Tjstretchalot opened this issue Jan 14, 2025 · 27 comments

@Tjstretchalot commented Jan 14, 2025

Describe the bug

If using aiohttp to send a 0-byte payload (e.g. via io.BytesIO(b'')) to get a very small response, the connection is incorrectly closed when it could be reused.

From adding prints directly to aiohttp to figure out why the connections were not being reused, I confirmed that write_bytes is still initialized, but _cleanup_writer is called by payload.on_eof during ClientResponse#start, which leads to the writer being cancelled.

When asyncio.CancelledError is raised in write_bytes, conn.close() is called, with a comment noting that the body hasn't been fully sent so the connection can't be reused. In this case that's incorrect: there was nothing to send in the first place!

The same sequence of events happens even if the payload is not empty, though there it's less self-evident that the comment is incorrect.
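
In the standalone sketch below (not aiohttp code: asyncio.sleep(0) stands in for the writer's await points, and the print stands in for conn.close()), a cancelled writer task always looks unfinished, even for a 0-byte body:

import asyncio


async def write_body(chunks):
    try:
        for chunk in chunks:
            await asyncio.sleep(0)  # stand-in for writer.write(chunk)
        await asyncio.sleep(0)  # stand-in for the trailing read()/close()
    except asyncio.CancelledError:
        # the handler assumes here that the body was not fully sent
        print("conn.close()  # wrongly, in this case")
        raise


async def main():
    task = asyncio.create_task(write_body([]))  # 0-byte payload
    await asyncio.sleep(0)  # let the writer start and suspend
    task.cancel()  # what payload.on_eof -> _cleanup_writer effectively does
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())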

To Reproduce

https://github.com/Tjstretchalot/aiohttp-reuse/blob/main/src/main.py

Expected behavior

I would expect aiohttp to reuse the connection whenever possible, regardless of whether it is passed a bytes object or a file-like object.

Logs/tracebacks

======== Running on http://0.0.0.0:3003 ========
(Press CTRL+C to quit)
Traceback (most recent call last):
  File "src/main.py", line 43, in <module>
    asyncio.run(main())
  File "Python311\Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "Python311\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "Python311\Lib\asyncio\base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "src\main.py", line 35, in main
    len(session._connector._conns) == 1  # fails here
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: defaultdict(<class 'collections.deque'>, {})

Python Version

$ python --version
Python 3.11.1

aiohttp Version

$ python -m pip show aiohttp
Version: 3.11.11

multidict Version

$ python -m pip show multidict
Version: 6.1.0

propcache Version

$ python -m pip show propcache
Version: 0.2.1

yarl Version

$ python -m pip show yarl
Version: 1.18.3

OS

Windows 10

Related component

Client

Additional context

No response

Code of Conduct

  • I agree to follow the aio-libs Code of Conduct
Tjstretchalot added a commit to Tjstretchalot/lonelypss that referenced this issue Jan 14, 2025

@Tjstretchalot (Author)

Actually, from follow-up testing I don't think this is restricted to 0-byte payloads, though that's where I definitely saw it the most. I still see connections not being reused due to a similar chain of events (a CancelledError in write_bytes) with larger payloads, just perhaps less often. Commenting out conn.close() completely eliminates the failures without any error, so I'm confident it's a false positive that the body hasn't finished writing.

@Tjstretchalot (Author) commented Jan 14, 2025

This example seems to reproduce it every time:

import asyncio
import contextlib
import io
import aiohttp
import aiohttp.web


async def hello(_request: aiohttp.web.Request) -> aiohttp.web.Response:
    return aiohttp.web.Response(body=b"")


async def main():
    app = aiohttp.web.Application()
    app.router.add_post("/hello", hello)
    # host the server in-process via the private _run_app helper
    server_task = asyncio.create_task(aiohttp.web._run_app(app, port=3003))

    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://127.0.0.1:3003/hello",
            data=b"",
            headers={"Content-Length": "0"},
        ) as response:
            response.raise_for_status()

        # bytes payload: the connection is returned to the pool
        assert len(session._connector._conns) == 1, session._connector._conns

    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://127.0.0.1:3003/hello",
            data=io.BytesIO(),
            headers={"Content-Length": "0"},
        ) as response:
            response.raise_for_status()

        # file-like payload: the connection was closed instead of pooled
        assert (
            len(session._connector._conns) == 1  # fails here
        ), session._connector._conns

    server_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await server_task


if __name__ == "__main__":
    asyncio.run(main())

@Tjstretchalot changed the title from "Sending an empty payload prevents connection reuse" to "Sending a file-like data payload prevents connection reuse" on Jan 14, 2025
@bdraco (Member) commented Jan 14, 2025

I tried to write a failing test for this in #10326

The behavior is a bit strange: either I've got something wrong, or it works on the first client and fails on the second client... but it could also be the mocking.

@bdraco (Member) commented Jan 14, 2025

Looks like there is a race somewhere, as #10326 fails on some runners and passes on others.

@bdraco (Member) commented Jan 14, 2025

Ran out of time to dig for the race. Will try again later.

@bdraco (Member) commented Jan 14, 2025

Also, it would be helpful to know if you can reproduce this on Python 3.12+.

@Tjstretchalot (Author) commented Jan 14, 2025

Reproduces on py3.9, py3.10, py3.11, and py3.12 on Linux, with very minor tweaking to pass linting and to avoid racing the server start.

I uploaded the standalone reproduction to https://github.com/Tjstretchalot/aiohttp-reuse/blob/main/src/main.py

View the output at https://github.com/Tjstretchalot/aiohttp-reuse/actions/runs/12776635905

EDIT: I also ran it again at https://github.com/Tjstretchalot/aiohttp-reuse/actions/runs/12776732560/job/35616013216 with continue-on-error: true to show that the job cancellation didn't affect anything.

@Dreamsorcerer (Member)

> as #10326 fails on some runners and passes on others

Even within one runner: https://github.com/aio-libs/aiohttp/actions/runs/12775358190/job/35611584770?pr=10326
The test run failed because the test actually passed, then the rerun of the failed test xfailed as expected. So, it passed and failed on the same CI job.

@Dreamsorcerer (Member)

> From adding prints directly to aiohttp to figure out why the connections were not being reused, I confirmed that write_bytes is still initialized, but _cleanup_writer is called by payload.on_eof during ClientResponse#start, which leads to the writer being cancelled.

I guess we could either check if the length is 0 and skip the write_bytes() task, or we could make the same check later so as not to close the connection.

Although, it seems a bit odd for a user to be explicitly sending empty bytes, so this doesn't seem like a normal case that users should be hitting. Normally, if you don't want to send a payload, you wouldn't pass one.
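
For concreteness, a hedged sketch of the first option (skip spawning the writer task when the length is known to be zero); the helper name and signature here are hypothetical, not aiohttp's actual internals:

import asyncio
from typing import Any, Callable, Coroutine, Optional


def maybe_start_writer(
    body_size: Optional[int],
    write_bytes: Callable[[], Coroutine[Any, Any, None]],
) -> Optional["asyncio.Task[None]"]:
    # A payload known to be empty never gets a writer task, so the later
    # payload.on_eof -> _cleanup_writer path has nothing to cancel and
    # the connection stays reusable.
    if body_size == 0:
        return None
    return asyncio.create_task(write_bytes())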

@Dreamsorcerer (Member) commented Jan 15, 2025

> as #10326 fails on some runners and passes on others

> Even within one runner: https://github.com/aio-libs/aiohttp/actions/runs/12775358190/job/35611584770?pr=10326 The test run failed because the test actually passed, then the rerun of the failed test xfailed as expected. So, it passed and failed on the same CI job.

Just thinking about the race condition: this is normal, because the connection gets closed when the payload from the server is complete but the client payload hasn't been sent. As the server is not waiting for anything, it sends the complete response immediately. The race is over whether the client manages to parse the response before or after it has sent its own payload (which in this case is empty).

@Dreamsorcerer (Member)

Overall, this feels like a non-issue to me and not worth spending any time on. If you don't want to send a payload, don't send one, instead of trying to send a 0-byte payload.

@Tjstretchalot (Author) commented Jan 15, 2025

@Dreamsorcerer To be clear, it still happens if you send a payload; it's just more reliable to reproduce without one, since more server implementations respond immediately in that situation. Servers don't have to wait for the end of the request body to determine their response, and in practice often don't. If the client doesn't reuse connections, this eventually exhausts all of the client's ports, which leads to really confusing error messages (on Windows you usually get "duplicate name exists on the network"). Originally I thought it was just the empty body, but hopefully this reproducer makes it clear it's worth fixing:

https://github.com/Tjstretchalot/aiohttp-reuse/blob/4a91130a3cee2d6161268ebb1e71d6403e2558b9/src/main.py

https://github.com/Tjstretchalot/aiohttp-reuse/actions/runs/12781432781

EDIT: common examples of not needing the request body to finish a response are failing if-match, if-none-match, and/or authorization headers; you can format an e.g. 412 Precondition Failed or 401 Unauthorized right away, with no need to waste time reading the request body, as sketched below.
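
For illustration, a minimal aiohttp.web handler in that style (the token value is made up); the response goes out without ever touching the request body:

from aiohttp import web


async def handler(request: web.Request) -> web.Response:
    # Fail the Authorization check up front; request.content is never
    # read, so the client may still be mid-way through sending the body.
    if request.headers.get("Authorization") != "Bearer expected-token":
        return web.Response(status=401)  # 401 Unauthorized, body unread
    body = await request.read()  # only consume the upload when authorized
    return web.Response(body=body)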

@Tjstretchalot (Author) commented Jan 15, 2025

Did a little more digging: the server is sending the response in the right part of the HTTP exchange (i.e., there's nothing special about these requests); the race condition just comes down to whether the server can respond fast enough.

I've got it reproducing reliably with all of the following:

  • There is data included in the request body.
  • The server reads that data and prints it out, to confirm it really read the entire request body before responding.
  • The Content-Length header is not included on the client (which means the length of the body is unknown to aiohttp) (aiohttp is using tell/seek to get the length for the BytesIO in this example).
  • I repeat the version that works (sending bytes instead of BytesIO) 5 times before and after to show that variant works, and the version that fails (sending BytesIO) 5 times to show it fails every single time.

reproducer: https://github.com/Tjstretchalot/aiohttp-reuse/blob/ca2d0e10c731f723afe60f232495774b2830c296/src/main.py
actions: https://github.com/Tjstretchalot/aiohttp-reuse/actions/runs/12789695518

I also verified that aiohttp behaves correctly if the server is slow enough to respond, by adding a 0.1 second sleep before it responds:

reproducer: https://github.com/Tjstretchalot/aiohttp-reuse/blob/3920f2c6f4dc7ceb6c3e88cc605bedd11f4e4c0d/src/main.py
actions: https://github.com/Tjstretchalot/aiohttp-reuse/actions/runs/12789788567

@Dreamsorcerer (Member)

> it still happens if you send a payload; it's just more reliable to reproduce without one, since more server implementations respond immediately in that situation. Servers don't have to wait for the end of the request body to determine their response

Yes, this is normal expected behaviour of an HTTP client. If the server isn't waiting for the payload (i.e. it doesn't need it to process the response), then the client may receive the completed response before it's sent the payload.

In this situation, we:
a) Don't want to waste time/bandwidth continuing to send a payload the server isn't going to read.
b) Can't reuse the connection without sending the payload.

Therefore, the connection gets closed early.

@Dreamsorcerer (Member) commented Jan 15, 2025

> • There is data included in the request body.
> • The server reads that data and prints it out, to confirm it really read the entire request body before responding.
> • The Content-Length header is not included on the client (which means the length of the body is unknown to aiohttp) (aiohttp is using tell/seek to get the length for the BytesIO in this example).
> • I repeat the version that works (sending bytes instead of BytesIO) 5 times before and after to show that variant works, and the version that fails (sending BytesIO) 5 times to show it fails every single time.

This sounds like a different issue from before. If the server is reading the data fully, then I don't think we should be losing the connection here. So, this variation may be a bug.

@Dreamsorcerer (Member)

> The Content-Length header is not included on the client (aiohttp is using tell/seek to get the length for the BytesIO in this example)

If the Content-Length header is not set automatically from the tell/seek code, then that also seems like a bug, probably separate from the issue of not managing to reuse the connection after a chunked request.
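
For reference, roughly how a length can be derived from a seekable file-like object via tell/seek (an illustrative sketch, not aiohttp's actual implementation):

import io
import os
from typing import Optional


def probe_size(fobj: io.IOBase) -> Optional[int]:
    # Seek to the end, take the difference from the current position,
    # then restore the position so the payload still reads from there.
    try:
        pos = fobj.tell()
        end = fobj.seek(0, os.SEEK_END)
        fobj.seek(pos)
        return end - pos
    except OSError:
        return None  # unseekable stream: length unknown, use chunked


assert probe_size(io.BytesIO(b"abcd")) == 4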

@Tjstretchalot (Author)

I didn't realize it special-cased BytesIO to tell/seek. So I did some more testing and confirmed that chunked transport seems to avoid the issue; it only occurs when using non-chunked transport:

  • Passing aiohttp.payload.BytesIOPayload wrapping the BytesIO makes no difference:

reproducer: https://github.com/Tjstretchalot/aiohttp-reuse/blob/4634c7e1bea90350d03b0b0ad0937249cc480f2c/src/main.py
actions: https://github.com/Tjstretchalot/aiohttp-reuse/actions/runs/12790143562

  • Passing aiohttp.payload.IOBasePayload instead of aiohttp.payload.BytesIOPayload does cause the test to pass, likely because it's using chunked transport, but only if the server waits for the request body. According to the comment you just wrote, this is the correct behavior:

reproducer: https://github.com/Tjstretchalot/aiohttp-reuse/blob/c8eb14e7a9d0c25d44f065095893951b6850b80b/src/main.py
actions: https://github.com/Tjstretchalot/aiohttp-reuse/actions/runs/12790241114

  • Passing a 512KB body instead of a 4-byte one makes no difference:

reproducer: https://github.com/Tjstretchalot/aiohttp-reuse/blob/24befe0bfd4e9b7871af79d233dc4dac7c5e8290/src/main.py
actions: https://github.com/Tjstretchalot/aiohttp-reuse/actions/runs/12790341189
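
To make the distinction above concrete, a small comparison of the two wrappers (the None expectation for IOBasePayload is my reading of the discussion, so treat it as an assumption):

import io

import aiohttp.payload

# BytesIOPayload probes its size via tell/seek, so the request carries a
# Content-Length and goes out non-chunked; IOBasePayload reports no size,
# so aiohttp falls back to chunked transfer-encoding. Only the sized,
# non-chunked variant exhibited the lost connection.
sized = aiohttp.payload.BytesIOPayload(io.BytesIO(b"abcd"))
unsized = aiohttp.payload.IOBasePayload(io.BytesIO(b"abcd"))
print(sized.size)  # 4
print(unsized.size)  # expected: None (unknown length)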

@Dreamsorcerer (Member)

I updated #10326 to test what should be working behaviour as above, and the test is failing. So it looks like we have a proper test we can look at now.

@Tjstretchalot (Author)

Here's a basic proof of concept for a fix. It isn't really sound (it eats the CancelledError, it uses a 0.1s timeout rather than detecting something meaningful, and it doesn't obviously fix all cases, since I think the other side mostly works by coincidence of taking fewer event loop iterations), but it does pass the reproducer I have:

diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py
index d3d9ced4..8e52778f 100644
--- a/aiohttp/client_reqrep.py
+++ b/aiohttp/client_reqrep.py
@@ -593,7 +593,11 @@ class ClientRequest:
         assert protocol is not None
         try:
             if isinstance(self.body, payload.Payload):
-                await self.body.write(writer)
+                write_task = asyncio.create_task(self.body.write(writer))
+                try:
+                    await asyncio.shield(write_task)
+                except asyncio.CancelledError:
+                    await asyncio.wait_for(write_task, timeout=0.1)
             else:
                 if isinstance(self.body, (bytes, bytearray)):
                     self.body = (self.body,)

In practice I've found asyncio cancellation just about impossible to use safely, except by wrapping it in something that detects cancellation and converts it to an asyncio.Event or threading.Event object as appropriate. So if I were to try a bigger fix, my first instinct would be converting write_bytes to that strategy. Here it should be pretty straightforward, since the cancel points can be found directly and converted to using an event. But if AbstractStreamWriter is part of the API, it would be pretty difficult to implement write_bytes safely, since I don't think asyncio cancellation alone can tell whether the write finished normally on another thread (and the coroutine is merely waiting for an event loop iteration to return) or the write is actually still pending.
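
A sketch of that event-based strategy (illustrative, not aiohttp code): the canceller sets an Event instead of cancelling the task, and the writer checks it only at well-defined points, so whether the body finished is always knowable:

import asyncio
from typing import Awaitable, Callable, Iterable


async def write_chunks(
    chunks: Iterable[bytes],
    write: Callable[[bytes], Awaitable[None]],
    stop: asyncio.Event,
) -> bool:
    # Returns True only if every chunk was written; a cancellation
    # request observed between chunks yields a definitive False instead
    # of an ambiguous CancelledError at an arbitrary await point.
    for chunk in chunks:
        if stop.is_set():
            return False
        await write(chunk)
    return True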

@Tjstretchalot (Author)

Actually, thinking about it more, the reason this side is failing is probably that after the write it tries to close the stream, which takes at least one event loop iteration since it's run in an executor; during that time it receives the response:

await loop.run_in_executor(None, self._value.close)

@Tjstretchalot (Author)

Expanding on the previous comment, I now believe it takes a minimum of two event loop iterations after the final chunk is written before write() returns, during which time the response can arrive.

Here is what happens if the payload is between 1 and 2^16 bytes, but the equivalent happens regardless:

  • The final chunk is read with non-zero bytes:

chunk = await loop.run_in_executor(None, self._value.read, 2**16)

  • The final chunk is written and succeeds normally. At this point, any cancellation will incorrectly assume the write hasn't finished, when the entire payload has actually already been sent:

await writer.write(chunk)

  • Another read is scheduled, taking up at least one event loop iteration (but often more, e.g., GC triggers, the executor needs to spin up a thread, whatever):

chunk = await loop.run_in_executor(None, self._value.read, 2**16)

  • That read is empty, so now it schedules closing, which takes up at least one more event loop iteration (but, again, often more):

await loop.run_in_executor(None, self._value.close)
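
The window can be seen in isolation with a few lines of asyncio (a standalone model, not aiohttp code): after the final non-empty read is written, the writer still awaits an empty executor read() and then close(), each costing at least one event loop iteration:

import asyncio
import io


async def main() -> None:
    loop = asyncio.get_running_loop()
    value = io.BytesIO(b"data")
    chunk = await loop.run_in_executor(None, value.read, 2**16)
    assert chunk == b"data"  # after writing this, the payload is fully sent
    chunk = await loop.run_in_executor(None, value.read, 2**16)
    assert chunk == b""  # extra read: at least one more loop iteration
    await loop.run_in_executor(None, value.close)  # and at least one more


asyncio.run(main())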

@Tjstretchalot (Author)

Confirming the above, the following proof-of-concept fix does pass my reproducer, though it's obviously very finicky to rely on counting event loop iterations, especially on an abstract object, and I doubt aiohttp wants to switch to synchronous file I/O:

monkey patch: https://github.com/Tjstretchalot/aiohttp-reuse/blob/ab8d6c04fb6a249b957b79fcd4fe362821041636/src/main.py
actions: https://github.com/Tjstretchalot/aiohttp-reuse/actions/runs/12791340459

@Dreamsorcerer (Member)

I guess if we're already closing, then we should consider the task complete. So the fix is probably just:

            with suppress(asyncio.CancelledError):
                await loop.run_in_executor(None, self._value.close)

Or similar.
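
In self-contained form (a sketch under the assumption that reaching close() means the body is already fully written):

import asyncio
import contextlib
import io


async def finish_close(value: io.IOBase) -> None:
    # Once we've started closing the underlying file, every byte is
    # already on the wire, so a cancellation landing here should not
    # mark the connection as unreusable.
    loop = asyncio.get_running_loop()
    with contextlib.suppress(asyncio.CancelledError):
        await loop.run_in_executor(None, value.close)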

@Tjstretchalot (Author)

It usually fails (though only about 60% of the time) when reduced to one event loop iteration with a synchronous close. The trickier iteration to remove would be the final read after the last of the data has been read and sent, especially if the payload happened to be an exact multiple of the chunk size. In this particular case it only affects non-chunked transport, meaning the amount of data expected to be read is known, so it would be OK to count bytes. However, I think this is just an artifact of the reproducer; the race condition would exist regardless, just be less likely to trigger.

@Dreamsorcerer (Member) commented Jan 15, 2025

> In this particular case it only affects non-chunked transport, meaning the amount of data expected to be read is known, so it would be OK to count bytes.

Yes, I think you're right. We could check the number of bytes remaining to be sent and skip the final read, then possibly suppress cancellation on the close.

> However, I think this is just an artifact of the reproducer; the race condition would exist regardless, just be less likely to trigger.

There would be no race condition for chunked messages, as the ending marker for the chunk wouldn't be sent until write_bytes() has returned (I think...). That's why you've never seen it on chunked messages.
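
A sketch of the byte-counting idea from above (illustrative): with a known Content-Length, stop reading once the declared size has been sent, which removes the trailing empty read() and the loop iteration it costs:

import asyncio
from typing import Awaitable, BinaryIO, Callable


async def write_sized(
    value: BinaryIO,
    write: Callable[[bytes], Awaitable[None]],
    total: int,
) -> None:
    loop = asyncio.get_running_loop()
    remaining = total
    while remaining > 0:
        chunk = await loop.run_in_executor(
            None, value.read, min(remaining, 2**16)
        )
        if not chunk:
            break  # file shorter than declared; caller must handle this
        remaining -= len(chunk)
        await write(chunk)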

@Dreamsorcerer (Member)

I think the most systemic solution that could solve all cases is to modify this code to check the writer:

try:
    if isinstance(self.body, payload.Payload):
        await self.body.write(writer)
    else:
        if isinstance(self.body, (bytes, bytearray)):
            self.body = (self.body,)
        for chunk in self.body:
            await writer.write(chunk)
except OSError as underlying_exc:
    reraised_exc = underlying_exc
    exc_is_not_timeout = underlying_exc.errno is not None or not isinstance(
        underlying_exc, asyncio.TimeoutError
    )
    if exc_is_not_timeout:
        reraised_exc = ClientOSError(
            underlying_exc.errno,
            f"Can not write request body for {self.url !s}",
        )
    set_exception(protocol, reraised_exc, underlying_exc)
except asyncio.CancelledError:
    # Body hasn't been fully sent, so connection can't be reused.
    conn.close()
    raise

It's probably not quite that simple, but it might be possible to just do something along the lines of:

if self.chunked or writer.output_size < self.content_length:
    conn.close()

In the cancellation handler.

@Tjstretchalot (Author)

For chunked transport you seem to be correct that it doesn't prevent connection reuse. It can race in a similar way, in the sense that CancelledError can be raised at

await writer.write_eof()

where the EOF has already been written at

self._writelines((chunk_len_pre, chunk, b"\r\n0\r\n\r\n"))

and it's waiting at

await self.drain()

but it doesn't end up mattering, because

await writer.write_eof()

is past the try/except, so it doesn't close the connection. I think it's possible that the EOF actually hasn't been written yet, in which case the connection wouldn't be safe to reuse, but this side errs on the more common case (it was written and got raced). So if it's possible to write a test to catch an edge case there, it would show the connection being improperly reused (the opposite of this issue).


As for checking writer.output_size: my reproducer passes with the diff below, though I had to capture the size earlier (and probably not in the right way; there's probably an isinstance check that works instead of getattr) to avoid using the stream after it was closed (reading .size after close attempts a seek, which errors):

diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py
index d3d9ced4..a8fdaed6 100644
--- a/aiohttp/client_reqrep.py
+++ b/aiohttp/client_reqrep.py
@@ -591,6 +591,8 @@ class ClientRequest:
 
         protocol = conn.protocol
         assert protocol is not None
+        
+        expected_write_size = getattr(self.body, 'size', None) 
         try:
             if isinstance(self.body, payload.Payload):
                 await self.body.write(writer)
@@ -615,7 +617,9 @@ class ClientRequest:
             set_exception(protocol, reraised_exc, underlying_exc)
         except asyncio.CancelledError:
             # Body hasn't been fully sent, so connection can't be reused.
-            conn.close()
+            if self.chunked or expected_write_size is None or writer.output_size < expected_write_size:
+                # Body hasn't been fully sent, so connection can't be reused.
+                conn.close()
             raise
         except Exception as underlying_exc:
             set_exception(
