Skip to content

Commit 6887591

Browse files
committed
_AsyncConcurrentMapIterable: safely retrive and store current event loop in new event_loop attribute for performance and 3.14 compliance
see python/cpython#126353
1 parent 63baa47 commit 6887591

File tree

2 files changed

+19
-10
lines changed

2 files changed

+19
-10
lines changed

streamable/iterators.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,12 @@ def __init__(
608608
) -> None:
609609
super().__init__(iterator, buffersize, ordered)
610610
self.transformation = wrap_error(transformation, StopIteration)
611+
self.event_loop: asyncio.AbstractEventLoop
612+
try:
613+
self.event_loop = asyncio.get_event_loop()
614+
except RuntimeError:
615+
self.event_loop = asyncio.new_event_loop()
616+
asyncio.set_event_loop(self.event_loop)
611617

612618
async def _safe_transformation(
613619
self, elem: T
@@ -627,16 +633,16 @@ def _launch_task(
627633
) -> "Future[Union[U, _RaisingIterator.ExceptionContainer]]":
628634
return cast(
629635
"Future[Union[U, _RaisingIterator.ExceptionContainer]]",
630-
asyncio.get_event_loop().create_task(self._safe_transformation(elem)),
636+
self.event_loop.create_task(self._safe_transformation(elem)),
631637
)
632638

633639
def _future_result_collection(
634640
self,
635641
) -> FutureResultCollection[Union[U, _RaisingIterator.ExceptionContainer]]:
636642
if self.ordered:
637-
return FIFOAsyncFutureResultCollection()
643+
return FIFOAsyncFutureResultCollection(self.event_loop)
638644
else:
639-
return FDFOAsyncFutureResultCollection()
645+
return FDFOAsyncFutureResultCollection(self.event_loop)
640646

641647

642648
class AsyncConcurrentMapIterator(_RaisingIterator[U]):

streamable/util/futuretools.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -75,26 +75,29 @@ class FIFOAsyncFutureResultCollection(DequeFutureResultCollection[T]):
7575
First In First Out
7676
"""
7777

78+
def __init__(self, event_loop: asyncio.AbstractEventLoop) -> None:
79+
super().__init__()
80+
self.event_loop = event_loop
81+
7882
def __next__(self) -> T:
79-
return asyncio.get_event_loop().run_until_complete(self._futures.popleft()) # type: ignore
83+
return self.event_loop.run_until_complete(self._futures.popleft()) # type: ignore
8084

8185

8286
class FDFOAsyncFutureResultCollection(CallbackFutureResultCollection[T]):
8387
"""
8488
First Done First Out
8589
"""
8690

87-
def __init__(self) -> None:
91+
def __init__(self, event_loop: asyncio.AbstractEventLoop) -> None:
8892
super().__init__()
89-
self._waiter: asyncio.futures.Future[T] = (
90-
asyncio.get_event_loop().create_future()
91-
)
93+
self.event_loop = event_loop
94+
self._waiter: asyncio.futures.Future[T] = self.event_loop.create_future()
9295

9396
def _done_callback(self, future: "Future[T]") -> None:
9497
self._waiter.set_result(future.result())
9598

9699
def __next__(self) -> T:
97-
result = asyncio.get_event_loop().run_until_complete(self._waiter)
100+
result = self.event_loop.run_until_complete(self._waiter)
98101
self._n_futures -= 1
99-
self._waiter = asyncio.get_event_loop().create_future()
102+
self._waiter = self.event_loop.create_future()
100103
return result

0 commit comments

Comments
 (0)