Add streaming backpressure #152

Open
pgjones opened this issue Jul 7, 2022 · 4 comments · May be fixed by #427

Comments

@pgjones
Member

pgjones commented Jul 7, 2022

Some more context on when this is relevant.

flowchart LR
    httpx-->|slow network| server
      subgraph one[Host 1]
        client-->|localhost\nconnection| quart
        subgraph quart[Quart app]
          httpx[httpx async\nclient]
        end
      end
      subgraph two[Host 2]
        server
      end

In this configuration, the Quart app acts as a proxy. As the httpx client is slowly consuming data because of the network speed, Quart buffers incoming data which is being uploaded at a very high speed because of a localhost connection. For huge payloads, this results in either memory allocation errors, or the out-of-memory killer killing the app.

^ From @andrewsh
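For illustration, a minimal sketch of the proxy shape described above (the route name and upstream URL are hypothetical, not the original reporter's code), assuming Quart and httpx:

from quart import Quart, request
import httpx

app = Quart(__name__)

@app.route("/upload", methods=["POST"])
async def proxy_upload():
    async def body():
        # Chunks arrive as fast as the localhost client can send them; if the
        # upstream write is slow, Quart currently buffers them all in memory.
        async for chunk in request.body:
            yield chunk

    async with httpx.AsyncClient() as client:
        upstream = await client.post("http://host-2.example/upload", content=body())
    return upstream.text, upstream.status_code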

@laggardkernel
Contributor

laggardkernel commented Aug 3, 2022

Doesn't look like a problem for Quart, the ASGI app.

ASGI separates the ASGI server from the ASGI app. All the ASGI app gets is scope, receive(), and send(). An app reads request data and returns a response. The ASGI server handles the connection and the reading and sending of data.

In my understanding, "adding streaming backpressure" means calling transp.pause_reading() in the ASGI app. This seems to contradict the ASGI abstraction above.

From what I've read:

In asyncio, the transport defines pause_reading() and resume_reading(), which are called by the protocol.
The protocol defines pause_writing() and resume_writing(), which are called by the transport.
This is because, from the view of an ASGI app, the app reads request data from the transport, and response data is sent from the app.

ASGI servers like uvicorn and hypercorn add more layers on top of the protocol,
e.g. RequestResponseCycle (wrapping await app()) in uvicorn, and HTTPStream and Context.spawn_app() in hypercorn.
Access to transp.pause_reading() and resume_reading() should be extended into these new layers in ASGI servers. Likewise, pause_writing() and resume_writing() should be decided by the new innermost layer rather than the protocol.

uvicorn introduces a FlowControl layer to share read and write availability between the protocol and RequestResponseCycle. The protocol pauses reading if the read buffer RequestResponseCycle.body exceeds HIGH_WATER_LIMIT; once receive() is called by the ASGI app, it resumes reading.
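Roughly that pattern, as a simplified sketch (not uvicorn's actual code; the names and threshold are illustrative):

import asyncio

HIGH_WATER_LIMIT = 64 * 1024  # illustrative threshold

class FlowControl:
    # Shares read availability between the protocol and the request cycle.
    def __init__(self, transport: asyncio.Transport) -> None:
        self._transport = transport
        self.read_paused = False

    def pause_reading(self) -> None:
        if not self.read_paused:
            self.read_paused = True
            self._transport.pause_reading()

    def resume_reading(self) -> None:
        if self.read_paused:
            self.read_paused = False
            self._transport.resume_reading()

# In the protocol: after buffering incoming data, pause once the buffer grows:
#     if len(cycle.body) > HIGH_WATER_LIMIT:
#         flow.pause_reading()
# In the cycle's receive(): once the app has taken the buffered body:
#     flow.resume_reading()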

The backpressure thing sounds like an ASGI server thing.

@pgjones
Member Author

pgjones commented Aug 3, 2022

It is both. At the moment there is no way for the app to place backpressure on the server. In the ASGI setup this could be done, for example, by not awaiting receive whilst the app catches up.
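For illustration, a rough sketch of a pull-based receive on the server side (hypothetical helper; a real server also has to handle HTTP framing):

import asyncio

def make_receive(reader: asyncio.StreamReader, chunk_size: int = 64 * 1024):
    # Hypothetical: the server reads from the socket only when the app awaits
    # receive(), so an app that delays awaiting receive() naturally stops the
    # server from pulling more data off the connection.
    async def receive() -> dict:
        data = await reader.read(chunk_size)
        return {"type": "http.request", "body": data, "more_body": bool(data)}

    return receive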

@lordmauve

Is this about streaming uploads?

I'm seeing 413 errors due to hitting MAX_CONTENT_LENGTH, even though I'm streaming the body to disk. The docs say

This allows larger bodies to be received and consumed if desired. The key being that the data is consumed and not otherwise stored in memory. An example is,

async def route():
    async for data in request.body:
        # Do something with the data
        ...

it is advisable to add a timeout within each chunk if streaming the request.

But this is not working: it's clear that Body.append() is sync, simply appends to the buffer, and then fails with HTTP 413 once MAX_CONTENT_LENGTH is exceeded. Because it is sync it cannot block when the buffer is "full", so it doesn't transmit backpressure from the application code.

The backpressure would need to be applied here at the ASGI layer such that if the request body buffer is not draining then we don't accept a new ASGI message on this connection.
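One way to picture that (a hypothetical bounded buffer, not Quart's actual Body class):

import asyncio

class BoundedBody:
    # Hypothetical: append() is a coroutine that blocks once the queue is full,
    # so the ASGI layer stops awaiting the next receive() until the handler has
    # consumed some chunks.
    def __init__(self, max_chunks: int = 16) -> None:
        self._chunks: asyncio.Queue = asyncio.Queue(maxsize=max_chunks)

    async def append(self, data: bytes) -> None:
        await self._chunks.put(data)  # blocks when max_chunks chunks are buffered

    async def __aiter__(self):
        while True:
            chunk = await self._chunks.get()
            if not chunk:  # empty bytes used as an end-of-body marker
                return
            yield chunk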

@Wyzard256

I've encountered this problem too. The root of it is that receiver_task runs concurrently with handler_task, consuming input from the client and appending it to a bytearray regardless of whether the app's handler is ready to use it. In an application that handles large file uploads, I've had my async for chunk in request.body loop receive chunks more than 5 gigabytes in size, just because that's how much the receiver_task accumulated into the bytearray while the handler_task was working on the previous (also very large) chunk.

Ideally, receiver_task and the handle_messages function shouldn't exist at all. Instead, the ASGIReceiveCallable and the functionality of handle_messages should be embedded into the request wrapper, so that in an async for chunk in request.body loop, the request body performs one await receive() on the ASGI callable per iteration and yields the bytes to the loop instead of appending them to a bytearray.

(This would involve the ASGIHTTPConnection._create_request_from_scope method taking a receive parameter in addition to the send parameter. It would also simplify the implementation of ASGIHTTPConnection.__call__, since there'd be only one task to await: no need for asyncio.wait(...) or cancel_task, and probably no need for asyncio.ensure_future either.)
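A sketch of that shape (hypothetical class; it assumes the ASGI receive callable is handed to the request wrapper):

class StreamingBody:
    # Hypothetical: each iteration performs exactly one await receive() and
    # yields the chunk directly, so no background task accumulates data the
    # handler has not asked for yet.
    def __init__(self, receive) -> None:
        self._receive = receive

    async def __aiter__(self):
        more_body = True
        while more_body:
            message = await self._receive()
            more_body = message.get("more_body", False)
            body = message.get("body", b"")
            if body:
                yield body

Usage inside a handler would then look the same as it does today: async for chunk in request.body, with each chunk coming straight from a single receive() call.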
