Skip to content

Commit 1751cf7

Browse files
authored
Fix rfi from string in same node (#139)
* Fix rfi from string in same node * Remove unused script
1 parent e3a0341 commit 1751cf7

File tree

3 files changed

+26
-21
lines changed

3 files changed

+26
-21
lines changed

src/resonate/record.py

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,24 @@
2121
T = TypeVar("T")
2222

2323

24+
def _set_retry_policy(invocation: LFI | RFI) -> retry_policy.RetryPolicy | None:
25+
if not isinstance(invocation.unit, Invocation):
26+
retry_policy = None
27+
elif isinstance(invocation.unit.fn, str):
28+
retry_policy = invocation.opts.retry_policy or None
29+
elif isgeneratorfunction(invocation.unit.fn):
30+
retry_policy = invocation.opts.retry_policy or never()
31+
else:
32+
assert iscoroutinefunction(invocation.unit.fn) or isfunction(invocation.unit.fn)
33+
retry_policy = invocation.opts.retry_policy or exponential(
34+
base_delay=1,
35+
factor=2,
36+
max_retries=-1,
37+
max_delay=30,
38+
)
39+
return retry_policy
40+
41+
2442
@final
2543
class Record(Generic[T]):
2644
def __init__(
@@ -38,24 +56,7 @@ def __init__(
3856
self._result: Result[T, Exception] | None = None
3957
self.children: list[Record[Any]] = []
4058
self.invocation: LFI | RFI = invocation
41-
self.retry_policy: retry_policy.RetryPolicy | None
42-
if not isinstance(invocation.unit, Invocation):
43-
self.retry_policy = None
44-
elif isinstance(invocation.unit.fn, str):
45-
self.retry_policy = invocation.opts.retry_policy or None
46-
elif isgeneratorfunction(invocation.unit.fn):
47-
self.retry_policy = invocation.opts.retry_policy or never()
48-
else:
49-
assert iscoroutinefunction(invocation.unit.fn) or isfunction(
50-
invocation.unit.fn
51-
)
52-
self.retry_policy = invocation.opts.retry_policy or exponential(
53-
base_delay=1,
54-
factor=2,
55-
max_retries=-1,
56-
max_delay=30,
57-
)
58-
59+
self.retry_policy = _set_retry_policy(invocation=invocation)
5960
self._attempt: int = 1
6061
self.durable_promise: DurablePromiseRecord | None = None
6162
self._task: TaskRecord | None = None
@@ -70,6 +71,10 @@ def __init__(
7071
self.parent.id if self.parent else None,
7172
)
7273

74+
def overwrite_invocation(self, invocation: RFI) -> None:
75+
self.invocation = invocation
76+
self.retry_policy = _set_retry_policy(invocation)
77+
7378
def get_coro(self) -> ResonateCoro[T]:
7479
assert self._coro
7580
return self._coro

src/resonate/scheduler/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ def _process_invoke_msg(
422422
assert record.is_root
423423
assert isinstance(record.invocation, RFI)
424424
assert record.durable_promise is not None
425-
record.invocation = rfi
425+
record.overwrite_invocation(rfi)
426426
else:
427427
record = Record[Any](
428428
id=invoke_msg.root_durable_promise.id,

tests/test_functionality.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ def test_golden_device_rfi() -> None:
246246
group = "test-golden-device-rfi"
247247

248248
def foo_golden_device_rfi(ctx: Context, n: str) -> Generator[Yieldable, Any, str]:
249-
p: Promise[str] = yield ctx.rfi(bar_golden_device_rfi, n).options(
249+
p: Promise[str] = yield ctx.rfi("bar_golden_device_rfi", n).options(
250250
id="bar", send_to=poll(group)
251251
)
252252
assert isinstance(p, Promise)
@@ -281,7 +281,7 @@ def exec_id(n: int) -> str:
281281
def factorial_rfi(ctx: Context, n: int) -> Generator[Yieldable, Any, int]:
282282
if n == 0:
283283
return 1
284-
p = yield ctx.rfi(factorial_rfi, n - 1).options(
284+
p = yield ctx.rfi("factorial_rfi", n - 1).options(
285285
id=exec_id(n - 1), send_to=poll(group)
286286
)
287287
return n * (yield p)

0 commit comments

Comments
 (0)