Skip to content

Commit 62b6496

Browse files
committed
Add exit on exception for deamon threads
1 parent f82c6e3 commit 62b6496

File tree

4 files changed

+43
-1
lines changed

4 files changed

+43
-1
lines changed

resonate/bridge.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from resonate.models.task import Task
3636
from resonate.options import Options
3737
from resonate.scheduler import Done, Info, More, Scheduler
38+
from resonate.utils import exit_on_exception
3839

3940
if TYPE_CHECKING:
4041
from collections.abc import Callable
@@ -176,6 +177,7 @@ def stop(self) -> None:
176177
if self._heartbeat_thread.is_alive():
177178
self._heartbeat_thread.join()
178179

180+
@exit_on_exception
179181
def _process_cq(self) -> None:
180182
while True:
181183
item = self._cq.get()
@@ -242,6 +244,7 @@ def _process_cq(self) -> None:
242244
if task is not None:
243245
self._store.tasks.complete(id=task.id, counter=task.counter)
244246

247+
@exit_on_exception
245248
def _process_msgs(self) -> None:
246249
def _invoke(root: DurablePromise) -> Invoke:
247250
assert "func" in root.param.data
@@ -301,6 +304,7 @@ def _invoke(root: DurablePromise) -> Invoke:
301304
durable_promise = DurablePromise.from_dict(self._store, _promise)
302305
self._cq.put_nowait(Notify(durable_promise.id, durable_promise))
303306

307+
@exit_on_exception
304308
def _process_delayed_events(self) -> None:
305309
while not self._shutdown.is_set():
306310
with self._delay_condition:
@@ -337,6 +341,7 @@ def _process_delayed_events(self) -> None:
337341
def start_heartbeat(self) -> None:
338342
self._heartbeat_active.set()
339343

344+
@exit_on_exception
340345
def _heartbeat(self) -> None:
341346
while not self._shutdown.is_set():
342347
# If this timeout don't execute the heartbeat

resonate/message_sources/local.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import threading
44
from typing import TYPE_CHECKING
55

6+
from resonate.utils import exit_on_exception
7+
68
if TYPE_CHECKING:
79
from resonate.models.message import Mesg
810
from resonate.models.message_source import MessageQ
@@ -45,6 +47,7 @@ def stop(self) -> None:
4547
self._thread = None
4648
self._stop_event.clear()
4749

50+
@exit_on_exception
4851
def _loop(self, mq: MessageQ) -> None:
4952
while not self._stop_event.is_set():
5053
for msg in self.step():

resonate/message_sources/poller.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from resonate.encoders import JsonEncoder
1111
from resonate.logging import logger
1212
from resonate.models.message import InvokeMesg, NotifyMesg, ResumeMesg
13+
from resonate.utils import exit_on_exception
1314

1415
if TYPE_CHECKING:
1516
from collections.abc import Generator
@@ -69,6 +70,7 @@ def stop(self) -> None:
6970
# value, which is not the default. This is still useful for tests.
7071
self._shutdown = True
7172

73+
@exit_on_exception
7274
def loop(self, mq: MessageQ) -> None:
7375
while True:
7476
if self._shutdown:
@@ -91,7 +93,7 @@ def loop(self, mq: MessageQ) -> None:
9193
continue
9294
except Exception as e:
9395
logger.warning("Unexpected error in poller loop for group %s: %s", self._group, e)
94-
break
96+
raise
9597

9698
def step(self) -> list[Mesg]:
9799
with requests.get(self.url, stream=True, timeout=self._timeout) as res:

resonate/utils.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from __future__ import annotations
2+
3+
import os
4+
import traceback
5+
import urllib.parse
6+
from functools import wraps
7+
from importlib.metadata import version
8+
from typing import TYPE_CHECKING, ParamSpec, TypeVar
9+
10+
from resonate.logging import logger
11+
12+
if TYPE_CHECKING:
13+
from collections.abc import Callable
14+
15+
P = ParamSpec("P")
16+
R = TypeVar("R")
17+
18+
19+
def exit_on_exception(func: Callable[P, R]) -> Callable[P, R]:
20+
@wraps(func)
21+
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
22+
try:
23+
return func(*args, **kwargs)
24+
except Exception:
25+
v = version("resonate-sdk")
26+
logger.exception(
27+
"An unexpected error happened.\n\nPlease report this issue so we can fix it as fast a possible:\n - https://github.com/resonatehq/resonate-sdk-py/issues/new?body=%s\n\n",
28+
urllib.parse.quote(f"Resonate (version {v}) process exited with error:\n```bash\n{traceback.format_exc()}```"),
29+
)
30+
os._exit(1)
31+
32+
return wrapper

0 commit comments

Comments
 (0)