Skip to content

Commit f1bddb8

Browse files
committed
_AsyncConcurrentMapIterable: safely retrive and store current event loop in new event_loop attribute for performance and 3.14 compliance
see python/cpython#126353
1 parent 63baa47 commit f1bddb8

File tree

3 files changed

+28
-12
lines changed

3 files changed

+28
-12
lines changed

streamable/iterators.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,12 @@ def __init__(
608608
) -> None:
609609
super().__init__(iterator, buffersize, ordered)
610610
self.transformation = wrap_error(transformation, StopIteration)
611+
self.event_loop: asyncio.AbstractEventLoop
612+
try:
613+
self.event_loop = asyncio.get_event_loop()
614+
except RuntimeError:
615+
self.event_loop = asyncio.new_event_loop()
616+
asyncio.set_event_loop(self.event_loop)
611617

612618
async def _safe_transformation(
613619
self, elem: T
@@ -627,16 +633,16 @@ def _launch_task(
627633
) -> "Future[Union[U, _RaisingIterator.ExceptionContainer]]":
628634
return cast(
629635
"Future[Union[U, _RaisingIterator.ExceptionContainer]]",
630-
asyncio.get_event_loop().create_task(self._safe_transformation(elem)),
636+
self.event_loop.create_task(self._safe_transformation(elem)),
631637
)
632638

633639
def _future_result_collection(
634640
self,
635641
) -> FutureResultCollection[Union[U, _RaisingIterator.ExceptionContainer]]:
636642
if self.ordered:
637-
return FIFOAsyncFutureResultCollection()
643+
return FIFOAsyncFutureResultCollection(self.event_loop)
638644
else:
639-
return FDFOAsyncFutureResultCollection()
645+
return FDFOAsyncFutureResultCollection(self.event_loop)
640646

641647

642648
class AsyncConcurrentMapIterator(_RaisingIterator[U]):

streamable/util/futuretools.py

+13-8
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from collections import deque
44
from concurrent.futures import Future
55
from queue import Queue
6-
from typing import Deque, Iterator, Sized, Type, TypeVar
6+
from typing import Awaitable, Deque, Iterator, Sized, Type, TypeVar, cast
77

88
T = TypeVar("T")
99

@@ -75,26 +75,31 @@ class FIFOAsyncFutureResultCollection(DequeFutureResultCollection[T]):
7575
First In First Out
7676
"""
7777

78+
def __init__(self, event_loop: asyncio.AbstractEventLoop) -> None:
79+
super().__init__()
80+
self.event_loop = event_loop
81+
7882
def __next__(self) -> T:
79-
return asyncio.get_event_loop().run_until_complete(self._futures.popleft()) # type: ignore
83+
return self.event_loop.run_until_complete(
84+
cast(Awaitable[T], self._futures.popleft())
85+
)
8086

8187

8288
class FDFOAsyncFutureResultCollection(CallbackFutureResultCollection[T]):
8389
"""
8490
First Done First Out
8591
"""
8692

87-
def __init__(self) -> None:
93+
def __init__(self, event_loop: asyncio.AbstractEventLoop) -> None:
8894
super().__init__()
89-
self._waiter: asyncio.futures.Future[T] = (
90-
asyncio.get_event_loop().create_future()
91-
)
95+
self.event_loop = event_loop
96+
self._waiter: asyncio.futures.Future[T] = self.event_loop.create_future()
9297

9398
def _done_callback(self, future: "Future[T]") -> None:
9499
self._waiter.set_result(future.result())
95100

96101
def __next__(self) -> T:
97-
result = asyncio.get_event_loop().run_until_complete(self._waiter)
102+
result = self.event_loop.run_until_complete(self._waiter)
98103
self._n_futures -= 1
99-
self._waiter = asyncio.get_event_loop().create_future()
104+
self._waiter = self.event_loop.create_future()
100105
return result

tests/test_iterators.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
import asyncio
12
import unittest
23

3-
from streamable.iterators import ObserveIterator, _OSConcurrentMapIterable
4+
from streamable.iterators import (
5+
ObserveIterator,
6+
_AsyncConcurrentMapIterable,
7+
_OSConcurrentMapIterable,
8+
)
49

510

611
class TestIterators(unittest.TestCase):

0 commit comments

Comments
 (0)