Fix api event drops #6556
Conversation
🪼 branch checks and previews
The demo notebooks don't match the run.py files. Please run this command from the root of the repo and then commit the changes:
pip install nbformat && cd demo && python generate_notebooks.py
Install Gradio from this PR:
pip install https://gradio-builds.s3.amazonaws.com/655c720f09fe4fe74272393402f8c4c9edd4d81e/gradio-4.8.0-py3-none-any.whl
Install Gradio Python Client from this PR:
pip install "gradio-client @ git+https://github.com/gradio-app/gradio@655c720f09fe4fe74272393402f8c4c9edd4d81e#subdirectory=client/python"

🦄 change detected
This Pull Request includes changes to the following packages, with the following changelog entry.
Maintainers or the PR author can modify the PR title to modify this entry.
client/python/gradio_client/utils.py
Outdated
event_id = resp["event_id"]

async with client.stream(
    "GET", sse_url, params={"event_id": event_id}, cookies=cookies
Needs to include headers, as done here: #6602, or it won't work with private Spaces.
Started testing this and it does seem like the issue goes away. But there might actually be a much simpler way to solve this issue, by using the "max Keep-Alive requests" header as described here: https://www.geeksforgeeks.org/what-are-max-parallel-http-connections-in-a-browser/
Can we try that approach first, so that we don't potentially end up with other issues arising from this refactor? (A sketch of what that header could look like follows below.)
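For reference, here is a minimal sketch of what setting such a header could look like, assuming a FastAPI-style middleware; the middleware and the values are illustrative, not part of this PR:

from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def add_keep_alive_header(request: Request, call_next):
    # Illustrative only: advertise a persistent connection that can carry many
    # requests, e.g. "Keep-Alive: timeout=5, max=1000".
    response = await call_next(request)
    response.headers["Connection"] = "keep-alive"
    response.headers["Keep-Alive"] = "timeout=5, max=1000"
    return response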
This PR is overall a better approach, as the queue will never be paused because it is waiting for data from a client. It is also needed to move in the direction of one-event-stream-per-browser-session instead of one-event-stream-per-event-trigger.
Tested this with a lot of different demos. Looks good to me, and I can confirm that the issue in the repro you provided is fixed.
We should add some backend unit tests that ensure that this works nicely with Spaces. Could you create 2 Spaces -- one private and one public -- that have this PR installed, and make sure that the gr.Client is able to work successfully with both? (A sketch of that check follows below.)
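For reference, a minimal sketch of the kind of check being asked for, assuming two hypothetical Spaces (user/public-space and user/private-space) that have this PR's wheel installed:

from gradio_client import Client

# Public Space: no token needed.
public_client = Client("user/public-space")  # hypothetical Space name
print(public_client.predict("hello", api_name="/predict"))

# Private Space: pass a Hugging Face token so the client sends auth headers.
private_client = Client("user/private-space", hf_token="hf_...")  # hypothetical
print(private_client.predict("hello", api_name="/predict"))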
Actually I'm gonna rework this PR to instead use a single event source for a whole browser session. Should scale better.
if resp["msg"] == "heartbeat": | ||
continue | ||
elif resp["msg"] == "server_stopped": | ||
print("Server stopped!!!", self.src) |
remove print statement
self.client.sse_url,
self.client.sse_data_url,
self.client.headers,
self.client.cookies,
Just wondering, why did we remove the kwargs?
No particular reason; it seemed arbitrary that some were kwargs and some were args.
helper,
self.client.cookies,
self.client.pending_messages_per_event,
event_id,
you'll need to send headers here, otherwise it won't work with private spaces
This function doesn't make any backend calls other than check_for_cancel, which didn't use headers previously either. Will add headers to check_for_cancel though. (A rough sketch of what that could look like follows below.)
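For illustration, a rough sketch of what passing headers into the cancel check could look like, assuming an httpx-based helper; the function signature and URL are hypothetical, not the actual PR code:

import httpx

async def check_for_cancel(cancel_url: str, event_id: str, headers: dict, cookies: dict) -> bool:
    # Hypothetical sketch: include the client's headers (e.g. auth for private
    # Spaces) on the cancellation request instead of sending it unauthenticated.
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            cancel_url,
            json={"event_id": event_id},
            headers=headers,
            cookies=cookies,
        )
    return resp.status_code == 200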
stream_sse_v1(
    helper,
    pending_messages_per_event,
    event_id,
also here I believe you'll need to send headers
gradio/routes.py
Outdated
blocks._queue.attach_data(body)
from datetime import datetime

print("recieved data @", datetime.now().strftime("%H:%M:%S.%f")[:-3])
remove print statement
Awesome @aliabid94! So far in my tests of various demos, everything is working well. I'll just repeat my request from the previous review of this PR:
Nice @aliabid94! Confirmed it fixes the issue and the deployed demos still look good. Left some comments.
@@ -100,6 +113,7 @@ class Status(Enum):
    PROGRESS = "PROGRESS"
    FINISHED = "FINISHED"
    CANCELLED = "CANCELLED"
    LOG = "LOG"
Log messages are gr.Info and gr.Warning, right? I don't think they should be included in the status updates.
Yes, that's what log messages are. Previously we ignored all log messages in the Python client and I wanted them stored somewhere. Could we separate log messages into a separate data structure in a separate PR?
def open_stream():
    return utils.synchronize_async(self.stream_messages)

if self.streaming_future is None or self.streaming_future.done():
I think right now we're trying to only open one stream for all pending predictions, like we do in the JS client. I understand we're doing that in the front-end because of the browser limitations, but do we have to do that in the Python client? I don't think we do, because we can just connect to queue/join and listen for events with the right id. I think it would be simpler if every prediction opened its own stream (a rough sketch follows below). It would make this easier to maintain and also make it easier to add new clients in the future for other languages.
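For context, a rough sketch of the per-prediction alternative being suggested here, assuming a hypothetical SSE endpoint that can be filtered by event_id (as the reply below notes, this is not how the backend currently works):

import json
import httpx

async def listen_for_event(base_url: str, event_id: str, headers: dict):
    # Hypothetical sketch of one-stream-per-prediction: open an SSE connection
    # and only act on messages whose event_id matches this prediction.
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET",
            f"{base_url}/queue/join",
            params={"event_id": event_id},
            headers=headers,
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data:"):
                    continue
                msg = json.loads(line[len("data:"):])
                if msg.get("event_id") != event_id:
                    continue
                yield msg
                if msg.get("msg") == "process_completed":
                    break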
The backend currently only supports a stream listening to a session_id. This makes it impossible to have a separate stream per event from the client as you suggest, unless every client call had a separate session_id. We probably don't want that behaviour because of session state.
I like the idea of a stream per event, but that would mean creating a separate endpoint just for that in the backend. It would be complex to support both endpoints at the same time, something we could consider in another PR.
Got it! We talked about exploring "api-only" usage next year. Would be good to revisit this then, because the API is getting complex (for valid reasons) but it limits the ability to use Gradio from another language.
gradio/routes.py
Outdated
@@ -583,61 +580,36 @@ async def predict(

@app.get("/queue/join", dependencies=[Depends(login_check)])
The route names are a bit confusing now. queue/data joins the queue and queue/join gives you the data for the prediction lol. Can we switch?
if blocks._queue.server_app is None:
    blocks._queue.set_server_app(app)

event_id = await blocks._queue.push(body, request, username)
We talked about this briefly on slack but is it possible for a client to connect after the job finishes running? What happens in that case?
What do you mean by "connect after a job finishes"? Specifically, if you try to connect to the SSE stream when there are no pending events, it will throw an error. But the client doesn't let the user arbitrarily connect to the stream endpoint.
if self.streaming_future is None or self.streaming_future.done():
    self.streaming_future = self.executor.submit(open_stream)
    self.streaming_future.add_done_callback(
Let's remove this print statement
Nice work @aliabid94!
I believe this should fix the issue of random "Event ID not found" errors.
Previously, two HTTP requests were fired for each prediction: first the SSE connection, which establishes a continuous connection for the duration of the request, and then, later, when the session is popped off the queue in the backend, the backend requests data from the frontend over that SSE connection, and the frontend fires off a POST request to upload the data. However, the backend only waits 5 seconds for the data to be uploaded; after that it assumes the connection has dropped. So if the frontend's ability to send that POST request gets blocked and it takes longer than 5 seconds to upload the data, we see the error being reported (event id not found when uploading data, because the event was killed).
I'm not sure if 100% of these issues can be explained by this, but what I think was happening was that the follow-up POST request was being blocked because of browsers' limit of 6 pending HTTP connections to the same domain. If there were multiple long SSE requests pending, the follow-up POST request would get queued up and sent past the 5-second timeout.
My solution for now is to send the POST request with the data first, and then immediately afterwards establish an SSE connection. This way, we will never have the problem of the backend requesting data when the queue is free but no data is available. This means the backend will hold on to data for sessions before the session gets popped off the queue. It also means that for connections that get broken between the POST request and the SSE request (even though these requests come right after each other), the backend would hold on to the data indefinitely. To prevent this from leading to an ever-expanding data structure in memory, I store the data from the POST request in an LRU cache with a maximum size of 1000 items, which I think will be more than enough.
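As an illustration of that last point, a minimal sketch of an LRU-style store for the POSTed event data, using Python's OrderedDict; the class and method names are illustrative, not the exact PR implementation:

from collections import OrderedDict
from typing import Optional

class EventDataStore:
    """Holds POSTed event data until the corresponding SSE stream consumes it."""

    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self._data: "OrderedDict[str, dict]" = OrderedDict()

    def attach(self, event_id: str, body: dict) -> None:
        # Evict the least recently used entry once we exceed max_size, so a
        # client that never opens its SSE connection can't grow memory forever.
        self._data[event_id] = body
        self._data.move_to_end(event_id)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)

    def pop(self, event_id: str) -> Optional[dict]:
        return self._data.pop(event_id, None)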
Because the API has changed, I updated the clients to support this new API.
Fixes: #6366 #6274
To test this:
Run a simple blocking demo (the demo snippet isn't reproduced here; an illustrative sketch follows below).
Then run this demo across 6+ tabs and submit simultaneously. Old Gradio would raise an event_id not found error; this is now resolved.
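As a stand-in for the missing snippet, here is a minimal sketch of the kind of slow, blocking demo that could reproduce the original behaviour (the exact demo used in the PR may differ):

import time
import gradio as gr

def slow_echo(text):
    # Block long enough that several tabs have SSE connections open at once,
    # which previously exhausted the browser's per-domain connection limit.
    time.sleep(10)
    return text

demo = gr.Interface(fn=slow_echo, inputs="text", outputs="text")

if __name__ == "__main__":
    demo.queue().launch()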