Skip to content

Commit eaf3b93

Browse files
Ensure Sync generators get closed if they are cancelled (#11396)
1 parent 7d98414 commit eaf3b93

File tree

3 files changed

+37
-0
lines changed

3 files changed

+37
-0
lines changed

.changeset/thick-hands-bet.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"gradio": patch
3+
---
4+
5+
fix:Ensure Sync generators get closed if they are cancelled

gradio/queueing.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
LRUCache,
3535
error_payload,
3636
run_coro_in_background,
37+
safe_aclose_iterator,
3738
safe_get_lock,
3839
set_task_name,
3940
)
@@ -808,6 +809,10 @@ async def reset_iterators(self, event_id: str):
808809
# Failure, but don't raise an error
809810
return
810811
async with app.lock:
812+
try:
813+
await safe_aclose_iterator(app.iterators[event_id])
814+
except Exception:
815+
pass
811816
del app.iterators[event_id]
812817
app.iterators_to_reset.add(event_id)
813818
return

gradio/utils.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,9 @@ async def __anext__(self):
744744
run_sync_iterator_async, self.iterator, limiter=self.limiter
745745
)
746746

747+
def aclose(self):
748+
self.iterator.close()
749+
747750

748751
async def async_iteration(iterator):
749752
return await anext(iterator)
@@ -1778,3 +1781,27 @@ def get_function_description(fn: Callable) -> tuple[str, dict[str, str], list[st
17781781
pass
17791782

17801783
return description, parameters, returns
1784+
1785+
1786+
async def safe_aclose_iterator(iterator, timeout=60.0, retry_interval=0.05):
1787+
"""
1788+
Safely close generators by calling the aclose method.
1789+
Sync generators are tricky because if you call `aclose` while the loop is running
1790+
then you get a ValueError and the generator will not shut down gracefully.
1791+
So the solution is to retry calling the aclose method until we succeed (with timeout).
1792+
"""
1793+
start = time.monotonic()
1794+
if isinstance(iterator, SyncToAsyncIterator):
1795+
while True:
1796+
try:
1797+
iterator.aclose()
1798+
break
1799+
except ValueError as e:
1800+
if "already executing" in str(e):
1801+
if time.monotonic() - start > timeout:
1802+
raise
1803+
await asyncio.sleep(retry_interval)
1804+
else:
1805+
raise
1806+
else:
1807+
iterator.aclose()

0 commit comments

Comments
 (0)