Skip to content

Commit 228659d

Browse files
authored
Add processor for concurrent execution of function (#283)
* Add processor for concurrent execution of function * use callback and shutdown queue * back to use none * Join queue * ignore rule * allow workers to be configurable * Add tests for the processor * back to bind variables * Add test for the processor and remove join on queue
1 parent d31ee53 commit 228659d

File tree

5 files changed

+233
-134
lines changed

5 files changed

+233
-134
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ dev = [
2121
"docutils>=0.21.2",
2222
"pydoctor>=24.11.2",
2323
"pyright>=1.1.396",
24-
"pytest>=8.3.5",
2524
"pytest-cov>=6.1.1",
25+
"pytest>=8.3.5",
2626
"ruff>=0.11.0",
2727
"tabulate>=0.9.0",
2828
"types-requests>=2.32.0.20250306",

resonate/bridge.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import threading
55
import time
66
from concurrent.futures import Future
7-
from typing import TYPE_CHECKING, Any
7+
from typing import TYPE_CHECKING
88

99
from resonate.conventions import Base
1010
from resonate.delay_q import DelayQ
@@ -34,9 +34,10 @@
3434
Return,
3535
)
3636
from resonate.models.durable_promise import DurablePromise
37-
from resonate.models.result import Ko, Ok, Result
37+
from resonate.models.result import Ko, Ok
3838
from resonate.models.task import Task
3939
from resonate.options import Options
40+
from resonate.processor import Processor
4041
from resonate.scheduler import Done, Info, More, Scheduler
4142
from resonate.utils import exit_on_exception
4243

@@ -84,6 +85,7 @@ def __init__(
8485
self._unicast,
8586
self._anycast,
8687
)
88+
self._processor = Processor()
8789

8890
self._bridge_thread = threading.Thread(target=self._process_cq, name="bridge", daemon=True)
8991
self._message_source_thread = threading.Thread(target=self._process_msgs, name="message-source", daemon=True)
@@ -166,6 +168,8 @@ def get(self, id: str, opts: Options, future: Future) -> DurablePromise:
166168
return promise
167169

168170
def start(self) -> None:
171+
self._processor.start()
172+
169173
if not self._message_source_thread.is_alive():
170174
self._message_source.start()
171175
self._message_source_thread.start()
@@ -191,6 +195,7 @@ def stop(self) -> None:
191195

192196
def _stop_no_join(self) -> None:
193197
"""Stop internal components and threads. Does not join the threads, to be able to call it from the bridge itself."""
198+
self._processor.stop()
194199
self._message_source.stop()
195200
self._cq.put_nowait(None)
196201
self._heartbeat_active.clear()
@@ -218,7 +223,7 @@ def _process_cq(self) -> None:
218223
return
219224

220225
case Function(id, cid, func):
221-
self._cq.put_nowait(Return(id, cid, self._handle_function(func)))
226+
self._processor.enqueue(func, lambda r, id=id, cid=cid: self._cq.put_nowait(Return(id, cid, r)))
222227
case Delayed() as item:
223228
self._handle_delay(item)
224229

@@ -341,7 +346,7 @@ def _process_delayed_events(self) -> None:
341346
for item in events:
342347
match item:
343348
case Function(id, cid, func):
344-
self._cq.put_nowait(Return(id, cid, self._handle_function(func)))
349+
self._processor.enqueue(func, lambda r, id=id, cid=cid: self._cq.put_nowait(Return(id, cid, r)))
345350
case retry:
346351
self._cq.put_nowait(retry)
347352

@@ -434,10 +439,3 @@ def _handle_network_request(self, cmd_id: str, cid: str, req: CreatePromiseReq |
434439

435440
case _:
436441
raise NotImplementedError
437-
438-
def _handle_function(self, func: Callable[[], Any]) -> Result:
439-
try:
440-
r = func()
441-
return Ok(r)
442-
except Exception as e:
443-
return Ko(e)

resonate/processor.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from __future__ import annotations
2+
3+
import os
4+
import queue
5+
from collections.abc import Callable
6+
from threading import Thread
7+
from typing import Any
8+
9+
from resonate.models.result import Ko, Ok, Result
10+
from resonate.utils import exit_on_exception
11+
12+
13+
class Processor:
14+
def __init__(self, workers: int | None = None) -> None:
15+
self.threads = set[Thread]()
16+
for _ in range(min(32, workers or (os.cpu_count() or 1))):
17+
self.threads.add(Thread(target=self._run, daemon=True))
18+
19+
self.sq = queue.Queue[tuple[Callable[[], Any], Callable[[Result[Any]], None]] | None]()
20+
21+
@exit_on_exception
22+
def _run(self) -> None:
23+
while sqe := self.sq.get():
24+
func, callback = sqe
25+
26+
try:
27+
r = Ok(func())
28+
except Exception as e:
29+
r = Ko(e)
30+
31+
callback(r)
32+
33+
def enqueue(self, func: Callable[[], Any], callback: Callable[[Result[Any]], None]) -> None:
34+
self.sq.put((func, callback))
35+
36+
def start(self) -> None:
37+
for t in self.threads:
38+
if not t.is_alive():
39+
t.start()
40+
41+
def stop(self) -> None:
42+
for _ in self.threads:
43+
self.sq.put(None)
44+
45+
for t in self.threads:
46+
# we might want to consider specifying a timeout
47+
# in the case the user has a long-running function blocking
48+
t.join()

tests/test_processor.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from __future__ import annotations
2+
3+
from queue import Queue
4+
5+
import pytest
6+
7+
from resonate.models.result import Ok, Result
8+
from resonate.processor import Processor
9+
10+
11+
def greet(name: str) -> str:
12+
return f"Hi {name}"
13+
14+
15+
def callback(q: Queue[tuple[str, str]], expected: str, result: Result[str]) -> None:
16+
assert isinstance(result, Ok)
17+
q.put((result.value, expected))
18+
19+
20+
@pytest.mark.parametrize("workers", [1, 2, 3])
21+
def test_processor(workers: int) -> None:
22+
q = Queue[tuple[str, str]]()
23+
24+
p = Processor(workers)
25+
assert len(p.threads) == workers
26+
27+
names = ["A", "B"]
28+
expected_greet = [greet("A"), greet("B")]
29+
p.start()
30+
for name, expected in zip(names, expected_greet, strict=False):
31+
p.enqueue(lambda name=name: greet(name), lambda r, expected=expected: callback(q, expected, r))
32+
33+
p.stop()
34+
assert q.qsize() == len(names)
35+
36+
for _ in range(q.qsize()):
37+
actual, expected = q.get()
38+
assert actual == expected
39+
q.task_done()
40+
41+
q.join()
42+
assert q.empty()

0 commit comments

Comments
 (0)