Fix api event drops #6556

Merged · 44 commits · Dec 12, 2023

Conversation

aliabid94 (Collaborator) commented Nov 23, 2023

I believe this should fix the issue of random "Event ID not found".

Previously, two HTTP requests were fired for each prediction: first the SSE request, which establishes a continuous connection for the duration of the prediction, and then, once the session is popped off the queue in the backend, the backend requests the data from the frontend, and the frontend fires off a POST request to upload it. However, the backend only waits 5 seconds for the data to be uploaded; after that, it assumes the connection has dropped. So if the frontend's POST request gets blocked and takes longer than 5 seconds, we see the error we've been seeing (event id not found when uploading data, because the event was already killed).
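
(For illustration, the old backend behavior roughly amounted to the following -- a simplified sketch, not the actual gradio code; the names here are made up:)

import asyncio

async def wait_for_event_data(data_received: asyncio.Event) -> bool:
    # Old flow (simplified): after popping the session off the queue,
    # the backend asks the frontend for its data and waits at most
    # 5 seconds for the corresponding POST to arrive.
    try:
        await asyncio.wait_for(data_received.wait(), timeout=5)
        return True
    except asyncio.TimeoutError:
        # The backend assumes the connection dropped and kills the event;
        # a late POST for this event then fails with "Event ID not found".
        return False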

I'm not sure this explains 100% of these issues, but what I think was happening is that the follow-up POST request was being blocked by the browser limit of 6 pending HTTP connections to the same domain. If there were multiple long-lived SSE requests pending, the follow-up POST request would get queued and sent past the 5 second timeout.

My solution for now is to send the POST request with the data first, and then immediately establish the SSE connection. This way, we never hit the case of the backend requesting data when the queue is free but no data is available. It does mean the backend holds on to a session's data before that session gets popped off the queue, and for connections that break between the POST request and the SSE request (even though these requests are sent back to back), the backend would hold on to the data indefinitely. To prevent this from becoming an ever-expanding data structure in memory, I store the data from the POST request in an LRUCache with a maximum size of 1000 items, which should be more than enough.
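
(For illustration, a minimal LRU cache along these lines -- a sketch under my own naming, not the actual implementation in this PR:)

from collections import OrderedDict
from typing import Optional

class LRUCache:
    """Holds pending event data; evicts the least-recently-used entry once
    the cache exceeds max_size, so memory stays bounded even if some SSE
    connections never show up to claim their data."""

    def __init__(self, max_size: int = 1000):
        self.cache = OrderedDict()  # event_id -> uploaded data
        self.max_size = max_size

    def put(self, event_id: str, data: dict) -> None:
        self.cache[event_id] = data
        self.cache.move_to_end(event_id)  # mark as most recently used
        if len(self.cache) > self.max_size:
            self.cache.popitem(last=False)  # drop the oldest entry

    def pop(self, event_id: str) -> Optional[dict]:
        return self.cache.pop(event_id, None)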

Because the API has changed, I updated the clients to support this new API.

Fixes: #6366 #6274

To test this:

Run this simple demo:

import time

import gradio as gr


def greet(name):
    time.sleep(20)  # simulate a slow prediction
    return "Hello " + name + "!"


with gr.Blocks() as demo:
    name = gr.Textbox(label="Name")
    output = gr.Textbox(label="Output Box")
    greet_btn = gr.Button("Greet")
    greet_btn.click(fn=greet, inputs=name, outputs=output, api_name="greet")

if __name__ == "__main__":
    demo.launch()

Then open this demo in 6+ tabs and submit simultaneously. Old gradio would throw the "event_id not found" error; with this PR, it's resolved.
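
(You can also stress the new flow from the Python client -- this won't hit the browser's per-domain connection limit, but it exercises the POST-then-SSE protocol under concurrency. A sketch assuming the demo above is running on the default port:)

from concurrent.futures import ThreadPoolExecutor

from gradio_client import Client

client = Client("http://127.0.0.1:7860/")

def call(i):
    # Each call uploads its data first, then listens on the SSE
    # stream for the result.
    return client.predict(f"tab {i}", api_name="/greet")

with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(call, range(8)))

print(results)  # eight "Hello tab N!" responses, no "Event ID not found"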

gradio-pr-bot (Collaborator) commented Nov 23, 2023

🪼 Branch checks and previews

  • Spaces: ready! (Spaces preview)
  • Website: ready! (Website preview)
  • Storybook: ready! (Storybook preview)
  • Visual tests: all good! (Build review)
  • 🦄 Changes detected! (Details)
  • 📓 Notebooks not matching! (Details)

The demo notebooks don't match the run.py files. Please run this command from the root of the repo and then commit the changes:

pip install nbformat && cd demo && python generate_notebooks.py

Install Gradio from this PR

pip install https://gradio-builds.s3.amazonaws.com/655c720f09fe4fe74272393402f8c4c9edd4d81e/gradio-4.8.0-py3-none-any.whl

Install Gradio Python Client from this PR

pip install "gradio-client @ git+https://github.com/gradio-app/gradio@655c720f09fe4fe74272393402f8c4c9edd4d81e#subdirectory=client/python"

gradio-pr-bot (Collaborator) commented Nov 23, 2023

🦄 Change detected

This Pull Request includes changes to the following packages.

  • @gradio/client: patch
  • gradio: patch
  • gradio_client: patch
  • Maintainers can select this checkbox to manually select packages to update.

With the following changelog entry.

Fix api event drops

Maintainers or the PR author can modify the PR title to modify this entry.

Something isn't right?

  • Maintainers can change the version label to modify the version bump.
  • If the bot has failed to detect any changes, or if this pull request needs to update multiple packages to different versions or requires a more comprehensive changelog entry, maintainers can update the changelog file directly.

event_id = resp["event_id"]

async with client.stream(
    "GET", sse_url, params={"event_id": event_id}, cookies=cookies
abidlabs (Member) commented Nov 30, 2023

Needs to include headers as done here: #6602, or won't work with private Spaces

abidlabs (Member) commented

Started testing this and it does seem like the issue goes away. But there might actually be a much simpler way to solve this issue by using the “max Keep-Alive requests” header as described here: https://www.geeksforgeeks.org/what-are-max-parallel-http-connections-in-a-browser/

For example, if the “Keep-Alive” header specifies a “max Keep-Alive requests” value of 100, it means that the browser can initiate up to 100 requests over the persistent connection before it has to re-establish a new connection.

Can we try this approach first so that we don't potentially end up with other issues arising from this refactor?
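
(For reference, a minimal sketch of what setting such a header could look like with a FastAPI middleware -- hypothetical, not something this PR does, and browser support for the max parameter varies:)

from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def add_keep_alive_header(request: Request, call_next):
    response = await call_next(request)
    # Advertise a persistent connection that allows up to 100 requests
    # before the browser has to open a new one (HTTP/1.1 only).
    response.headers["Connection"] = "keep-alive"
    response.headers["Keep-Alive"] = "timeout=5, max=100"
    return response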

aliabid94 (Collaborator, Author) commented

Can we try this approach first so that we don't potentially end up with other issues arising from this refactor?

This PR is overall a better approach, as the queue will never be paused because it is waiting for data from a client. It is also needed to move in the direction of one-event-stream-per-browser-session instead of one-event-stream-per-event-trigger.

abidlabs (Member) left a comment

Tested this with a lot of different demos. Looks good to me, and can confirm that the issue in the repro you provided is fixed.

We should add some backend unit tests that ensure this works nicely with Spaces. Could you create 2 Spaces -- one private and one public -- that have this PR installed, and make sure that the gr.Client is able to work successfully with both?

aliabid94 (Collaborator, Author) commented

Actually I'm gonna rework this PR to instead use a single event source for a whole browser session. Should scale better.

if resp["msg"] == "heartbeat":
    continue
elif resp["msg"] == "server_stopped":
    print("Server stopped!!!", self.src)
abidlabs (Member):

remove print statement

Comment on lines +1168 to +1171
self.client.sse_url,
self.client.sse_data_url,
self.client.headers,
self.client.cookies,
abidlabs (Member):

jw why did we remove the kwargs?

aliabid94 (Collaborator, Author):

no particular reason, it just seemed arbitrary that some were kwargs and some were args

helper,
self.client.cookies,
self.client.pending_messages_per_event,
event_id,
abidlabs (Member):

you'll need to send headers here, otherwise it won't work with private spaces

aliabid94 (Collaborator, Author):

this function doesn't make any backend calls other than check_for_cancel, which didn't use headers previously either. Will add them to check_for_cancel though
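
(For illustration, adding headers to a cancel check might look roughly like this -- a hedged sketch, with the URL and signature simplified rather than taken from the PR:)

import httpx

async def check_for_cancel(cancel_url: str, event_id: str,
                           headers: dict, cookies: dict) -> None:
    # Forward the client's headers (e.g. an Authorization token for a
    # private Space) so this backend call is authenticated like the rest.
    async with httpx.AsyncClient(headers=headers, cookies=cookies) as client:
        await client.post(cancel_url, json={"event_id": event_id})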

stream_sse_v1(
    helper,
    pending_messages_per_event,
    event_id,
abidlabs (Member):

also here I believe you'll need to send headers

gradio/routes.py Outdated
blocks._queue.attach_data(body)
from datetime import datetime

print("recieved data @", datetime.now().strftime("%H:%M:%S.%f")[:-3])
abidlabs (Member):

remove print statement

abidlabs (Member) commented Dec 7, 2023

Awesome @aliabid94! So far in my tests of various demos, everything is working well.

I'll just repeat my request from the previous review of this PR:

  • We should add some backend unit tests that ensure this works nicely with the Client & Spaces. Could you create 2 Spaces -- one private and one public -- that have this PR installed, and make sure that the gradio Client is able to work with both?

freddyaboulton (Collaborator) left a comment

Nice @aliabid94 ! Confirmed it fixes the issue and the deployed demos still look good. Left some comments.

@@ -100,6 +113,7 @@ class Status(Enum):
PROGRESS = "PROGRESS"
FINISHED = "FINISHED"
CANCELLED = "CANCELLED"
LOG = "LOG"
freddyaboulton (Collaborator):

log messages are gr.Info and gr.Warning right? I don't think they should be included in the status updates.

aliabid94 (Collaborator, Author):

yes, that's what log messages are. previously we ignored all log messages in the python client and I wanted them stored somewhere. could move log messages into a separate data structure in a separate PR?

def open_stream():
    return utils.synchronize_async(self.stream_messages)

if self.streaming_future is None or self.streaming_future.done():
freddyaboulton (Collaborator):

I think right now we're trying to only open one stream for all pending predictions, like we do in the js client. I understand we're doing that in the frontend because of the browser limitations, but do we have to do that in the python client? I don't think we do, because we can just connect to queue/join and listen for events with the right id. I think it would be simpler if every prediction opened its own stream. It would make this easier to maintain and also make it easier to add new clients for other languages in the future.

aliabid94 (Collaborator, Author):

The backend currently only supports a stream listening to a session_id. This makes it impossible to have a separate stream per event from the client as you suggest, unless every client call had a separate session_id. We probably don't want that behaviour because of session state.
I like the idea of a stream per event, but that would mean creating a separate endpoint just for that in the backend. It would be complex to support both endpoints at the same time, something we could consider in another PR.
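
(To illustrate the session-level stream: one SSE connection per session_hash, with messages fanned out to per-event queues. A simplified sketch with made-up names and endpoint path, not the exact client code:)

import asyncio
import json

import httpx

async def stream_messages(base_url: str, session_hash: str,
                          pending_messages_per_event: dict):
    # One stream per client session: every event in this session arrives
    # on the same connection, tagged with its event_id.
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET", f"{base_url}/queue/data",
            params={"session_hash": session_hash},
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data:"):
                    continue  # skip SSE comments / blank keep-alives
                msg = json.loads(line[len("data:"):])
                # Route the message to whichever event is waiting on it.
                queue = pending_messages_per_event.get(msg["event_id"])
                if queue is not None:
                    await queue.put(msg)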

freddyaboulton (Collaborator):

Got it! We talked about exploring "api-only" usage next year. Would be good to revisit this then because the api is getting complex (for valid reasons) but it limits the ability to use gradio from another language

gradio/routes.py Outdated
@@ -583,61 +580,36 @@ async def predict(

@app.get("/queue/join", dependencies=[Depends(login_check)])
freddyaboulton (Collaborator):

The route names are a bit confusing now. queue/data joins the queue and queue/join gives you the data for the prediction lol. Can we switch?

if blocks._queue.server_app is None:
    blocks._queue.set_server_app(app)

event_id = await blocks._queue.push(body, request, username)
freddyaboulton (Collaborator):

We talked about this briefly on slack but is it possible for a client to connect after the job finishes running? What happens in that case?

aliabid94 (Collaborator, Author):

wdym by "connect after a job finishes"? Specifically if you try to connect to the SSE stream when there are no pending events, it will throw an error. But the client doesn't let the user arbitrarily connect to the stream endpoint.


if self.streaming_future is None or self.streaming_future.done():
    self.streaming_future = self.executor.submit(open_stream)
    self.streaming_future.add_done_callback(
freddyaboulton (Collaborator):

Let's remove this print statement

freddyaboulton (Collaborator) left a comment

Nice work @aliabid94 !

Successfully merging this pull request may close these issues:

  • ValueError("Event not found", event_id)