Skip to content

Commit affffc7

Browse files
miss-islingtongraingertZeroIntensity
authored
[3.12] gh-124309: fix staggered race on eager tasks (GH-124847) (#125340)
gh-124309: fix staggered race on eager tasks (GH-124847) This patch is entirely by Thomas and Peter (cherry picked from commit 979c0df) Co-authored-by: Thomas Grainger <[email protected]> Co-authored-by: Peter Bierma <[email protected]>
1 parent 2264c09 commit affffc7

File tree

4 files changed

+88
-3
lines changed

4 files changed

+88
-3
lines changed

Lib/asyncio/staggered.py

+14-3
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ async def staggered_race(coro_fns, delay, *, loop=None):
6969
exceptions = []
7070
running_tasks = []
7171

72-
async def run_one_coro(previous_failed) -> None:
72+
async def run_one_coro(ok_to_start, previous_failed) -> None:
73+
# in eager tasks this waits for the calling task to append this task
74+
# to running_tasks, in regular tasks this wait is a no-op that does
75+
# not yield a future. See gh-124309.
76+
await ok_to_start.wait()
7377
# Wait for the previous task to finish, or for delay seconds
7478
if previous_failed is not None:
7579
with contextlib.suppress(exceptions_mod.TimeoutError):
@@ -85,8 +89,12 @@ async def run_one_coro(previous_failed) -> None:
8589
return
8690
# Start task that will run the next coroutine
8791
this_failed = locks.Event()
88-
next_task = loop.create_task(run_one_coro(this_failed))
92+
next_ok_to_start = locks.Event()
93+
next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
8994
running_tasks.append(next_task)
95+
# next_task has been appended to running_tasks so next_task is ok to
96+
# start.
97+
next_ok_to_start.set()
9098
assert len(running_tasks) == this_index + 2
9199
# Prepare place to put this coroutine's exceptions if not won
92100
exceptions.append(None)
@@ -116,8 +124,11 @@ async def run_one_coro(previous_failed) -> None:
116124
if i != this_index:
117125
t.cancel()
118126

119-
first_task = loop.create_task(run_one_coro(None))
127+
ok_to_start = locks.Event()
128+
first_task = loop.create_task(run_one_coro(ok_to_start, None))
120129
running_tasks.append(first_task)
130+
# first_task has been appended to running_tasks so first_task is ok to start.
131+
ok_to_start.set()
121132
try:
122133
# Wait for a growing list of tasks to all finish: poor man's version of
123134
# curio's TaskGroup or trio's nursery

Lib/test/test_asyncio/test_eager_task_factory.py

+46
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,52 @@ async def run():
218218

219219
self.run_coro(run())
220220

221+
def test_staggered_race_with_eager_tasks(self):
222+
# See https://github.com/python/cpython/issues/124309
223+
224+
async def fail():
225+
await asyncio.sleep(0)
226+
raise ValueError("no good")
227+
228+
async def run():
229+
winner, index, excs = await asyncio.staggered.staggered_race(
230+
[
231+
lambda: asyncio.sleep(2, result="sleep2"),
232+
lambda: asyncio.sleep(1, result="sleep1"),
233+
lambda: fail()
234+
],
235+
delay=0.25
236+
)
237+
self.assertEqual(winner, 'sleep1')
238+
self.assertEqual(index, 1)
239+
self.assertIsNone(excs[index])
240+
self.assertIsInstance(excs[0], asyncio.CancelledError)
241+
self.assertIsInstance(excs[2], ValueError)
242+
243+
self.run_coro(run())
244+
245+
def test_staggered_race_with_eager_tasks_no_delay(self):
246+
# See https://github.com/python/cpython/issues/124309
247+
async def fail():
248+
raise ValueError("no good")
249+
250+
async def run():
251+
winner, index, excs = await asyncio.staggered.staggered_race(
252+
[
253+
lambda: fail(),
254+
lambda: asyncio.sleep(1, result="sleep1"),
255+
lambda: asyncio.sleep(0, result="sleep0"),
256+
],
257+
delay=None
258+
)
259+
self.assertEqual(winner, 'sleep1')
260+
self.assertEqual(index, 1)
261+
self.assertIsNone(excs[index])
262+
self.assertIsInstance(excs[0], ValueError)
263+
self.assertEqual(len(excs), 2)
264+
265+
self.run_coro(run())
266+
221267

222268
class PyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase):
223269
Task = tasks._PyTask

Lib/test/test_asyncio/test_staggered.py

+27
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,30 @@ async def coro(index):
9595
self.assertEqual(len(excs), 2)
9696
self.assertIsInstance(excs[0], ValueError)
9797
self.assertIsInstance(excs[1], ValueError)
98+
99+
100+
async def test_multiple_winners(self):
101+
event = asyncio.Event()
102+
103+
async def coro(index):
104+
await event.wait()
105+
return index
106+
107+
async def do_set():
108+
event.set()
109+
await asyncio.Event().wait()
110+
111+
winner, index, excs = await staggered_race(
112+
[
113+
lambda: coro(0),
114+
lambda: coro(1),
115+
do_set,
116+
],
117+
delay=0.1,
118+
)
119+
self.assertIs(winner, 0)
120+
self.assertIs(index, 0)
121+
self.assertEqual(len(excs), 3)
122+
self.assertIsNone(excs[0], None)
123+
self.assertIsInstance(excs[1], asyncio.CancelledError)
124+
self.assertIsInstance(excs[2], asyncio.CancelledError)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed :exc:`AssertionError` when using :func:`!asyncio.staggered.staggered_race` with :attr:`asyncio.eager_task_factory`.

0 commit comments

Comments
 (0)