Skip to content

Commit 44d54fb

Browse files
committed
use callback and shutdown queue
1 parent c7eea97 commit 44d54fb

File tree

2 files changed

+20
-20
lines changed

2 files changed

+20
-20
lines changed

resonate/bridge.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
ResolvePromiseRes,
3232
Resume,
3333
Retry,
34+
Return,
3435
)
3536
from resonate.models.durable_promise import DurablePromise
3637
from resonate.models.result import Ko, Ok
@@ -84,7 +85,7 @@ def __init__(
8485
self._unicast,
8586
self._anycast,
8687
)
87-
self._processor = Processor(self._cq)
88+
self._processor = Processor()
8889

8990
self._bridge_thread = threading.Thread(target=self._process_cq, name="bridge", daemon=True)
9091
self._message_source_thread = threading.Thread(target=self._process_msgs, name="message-source", daemon=True)
@@ -222,7 +223,7 @@ def _process_cq(self) -> None:
222223
return
223224

224225
case Function(id, cid, func):
225-
self._processor.enqueue(id, cid, func)
226+
self._processor.enqueue(func, lambda r, id=id, cid=cid: self._cq.put_nowait(Return(id, cid, r)))
226227
case Delayed() as item:
227228
self._handle_delay(item)
228229

@@ -345,7 +346,7 @@ def _process_delayed_events(self) -> None:
345346
for item in events:
346347
match item:
347348
case Function(id, cid, func):
348-
self._processor.enqueue(id, cid, func)
349+
self._processor.enqueue(func, lambda r, id=id, cid=cid: self._cq.put_nowait(Return(id, cid, r)))
349350
case retry:
350351
self._cq.put_nowait(retry)
351352

resonate/processor.py

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,47 +4,46 @@
44
import queue
55
from collections.abc import Callable
66
from threading import Thread
7-
from typing import TYPE_CHECKING, Any
7+
from typing import Any
88

9-
from resonate.models.commands import Command, Return
10-
from resonate.models.result import Ko, Ok
9+
from resonate.models.result import Ko, Ok, Result
1110
from resonate.utils import exit_on_exception
1211

13-
if TYPE_CHECKING:
14-
from concurrent.futures import Future
15-
1612

1713
class Processor:
18-
def __init__(self, cq: queue.Queue[Command | tuple[Command, Future] | None]) -> None:
14+
def __init__(self) -> None:
1915
self.threads = set[Thread]()
2016
for _ in range(min(32, (os.cpu_count() or 1))):
2117
self.threads.add(Thread(target=self._run, daemon=True))
2218

23-
self.sq = queue.Queue[tuple[str, str, Callable[[], Any]] | None]()
24-
self.cq = cq
19+
self.sq = queue.Queue[tuple[Callable[[], Any], Callable[[Result[Any]], None]]]()
2520

2621
@exit_on_exception
2722
def _run(self) -> None:
28-
while sqe := self.sq.get():
29-
id, cid, func = sqe
23+
while True:
24+
try:
25+
func, callback = self.sq.get()
26+
except queue.ShutDown:
27+
break
28+
3029
try:
3130
r = Ok(func())
3231
except Exception as e:
3332
r = Ko(e)
34-
self.cq.put_nowait(Return(id, cid, r))
35-
self.sq.task_done()
3633

37-
self.sq.put(None)
34+
callback(r)
35+
self.sq.task_done()
3836

39-
def enqueue(self, id: str, cid: str, func: Callable[[], Any]) -> None:
40-
self.sq.put((id, cid, func))
37+
def enqueue(self, func: Callable[[], Any], callback: Callable[[Result[Any]], None]) -> None:
38+
self.sq.put((func, callback))
4139

4240
def start(self) -> None:
4341
for t in self.threads:
4442
if not t.is_alive():
4543
t.start()
4644

4745
def stop(self) -> None:
48-
self.sq.put(None)
46+
self.sq.shutdown()
4947
for t in self.threads:
5048
t.join()
49+
self.sq.join()

0 commit comments

Comments
 (0)