Skip to content

Commit 9b47c9f

Browse files
authored
Reorganize retry match statement (#255)
1 parent f91d5d7 commit 9b47c9f

File tree

1 file changed

+19
-36
lines changed

1 file changed

+19
-36
lines changed

resonate/scheduler.py

Lines changed: 19 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -738,46 +738,37 @@ def _eval(self, node: Node[State]) -> list[Function | Delayed[Function | Retry]
738738
case Enabled(Running(Lfnc(id=id, func=func, args=args, kwargs=kwargs, opts=opts, attempt=attempt, ctx=ctx, result=result, suspends=suspends, timeout=timeout) as f)):
739739
assert id == node.id, "Id must match node id."
740740

741-
match result, f.retry_policy.next(attempt), opts.durable:
741+
match result, opts.durable, f.retry_policy.next(attempt):
742742
case None, _, _:
743743
node.transition(Blocked(Running(f)))
744744
return [
745745
Function(id, self.id, lambda: func(ctx, *args, **kwargs)),
746746
]
747-
case Ok(v), _, True:
747+
case Ok(v), True, _:
748748
node.transition(Blocked(Running(f)))
749749
return [
750750
Network(id, self.id, ResolvePromiseReq(id=id, ikey=id, data=v)),
751751
]
752-
case Ok(), _, False:
752+
case Ok(), False, _:
753753
node.transition(Enabled(Completed(f.map(result=result))))
754754
self._unblock(suspends, result)
755755
return []
756-
case Ko(e), delay, True if delay is None or time.time() + delay > timeout:
756+
case Ko(e), True, d if d is None or time.time() + d > timeout or type(e) in opts.non_retryable_exceptions:
757757
node.transition(Blocked(Running(f)))
758758
return [
759759
Network(id, self.id, RejectPromiseReq(id=id, ikey=id, data=e)),
760760
]
761-
case Ko(), delay, False if delay is None or time.time() + delay > timeout:
761+
case Ko(e), False, d if d is None or time.time() + d > timeout or type(e) in opts.non_retryable_exceptions:
762762
node.transition(Enabled(Completed(f.map(result=result))))
763763
self._unblock(suspends, result)
764764
return []
765-
case Ko(e), _, True if type(e) in opts.non_retryable_exceptions:
766-
node.transition(Blocked(Running(f)))
767-
return [
768-
Network(id, self.id, RejectPromiseReq(id=id, ikey=id, data=e)),
769-
]
770-
case Ko(e), _, False if type(e) in opts.non_retryable_exceptions:
771-
node.transition(Enabled(Completed(f.map(result=result))))
772-
self._unblock(suspends, result)
773-
return []
774-
case Ko(), delay, _ if delay is not None:
765+
case Ko(), _, d:
766+
assert d is not None, "Delay must be set."
767+
775768
node.transition(Blocked(Running(f.map(attempt=attempt + 1))))
776769
return [
777-
Delayed(Function(id, self.id, lambda: func(ctx, *args, **kwargs)), delay),
770+
Delayed(Function(id, self.id, lambda: func(ctx, *args, **kwargs)), d),
778771
]
779-
case _:
780-
raise NotImplementedError
781772

782773
case Enabled(Running(Coro(id=id, coro=coro, next=next, opts=opts, attempt=attempt, ctx=parent_ctx, timeout=timeout) as c)):
783774
cmd = coro.send(next)
@@ -852,41 +843,33 @@ def _eval(self, node: Node[State]) -> list[Function | Delayed[Function | Retry]
852843
case TRM(id, result), _:
853844
assert id == node.id, "Id must match node id."
854845

855-
match result, c.retry_policy.next(attempt), opts.durable:
856-
case Ok(v), _, True:
846+
match result, opts.durable, c.retry_policy.next(attempt):
847+
case Ok(v), True, _:
857848
node.transition(Blocked(Running(c)))
858849
return [
859850
Network(id, self.id, ResolvePromiseReq(id=id, ikey=id, data=v)),
860851
]
861-
case Ok(), _, False:
852+
case Ok(), False, _:
862853
node.transition(Enabled(Completed(c.map(result=result))))
863854
self._unblock(c.suspends, result)
864855
return []
865-
case Ko(e), delay, True if delay is None or time.time() + delay > timeout:
856+
case Ko(e), True, d if d is None or time.time() + d > timeout or type(e) in opts.non_retryable_exceptions:
866857
node.transition(Blocked(Running(c)))
867858
return [
868859
Network(id, self.id, RejectPromiseReq(id=id, ikey=id, data=e)),
869860
]
870-
case Ko(), delay, False if delay is None or time.time() + delay > timeout:
861+
case Ko(e), False, d if d is None or time.time() + d > timeout or type(e) in opts.non_retryable_exceptions:
871862
node.transition(Enabled(Completed(c.map(result=result))))
872863
self._unblock(c.suspends, result)
873864
return []
874-
case Ko(e), _, True if type(e) in opts.non_retryable_exceptions:
875-
node.transition(Blocked(Running(c)))
876-
return [
877-
Network(id, self.id, RejectPromiseReq(id=id, ikey=id, data=e)),
878-
]
879-
case Ko(e), _, False if type(e) in opts.non_retryable_exceptions:
880-
node.transition(Enabled(Completed(c.map(result=result))))
881-
self._unblock(c.suspends, result)
882-
return []
883-
case Ko(), delay, _ if delay is not None:
865+
case Ko(), _, d:
866+
assert d is not None, "Delay must be set."
867+
884868
node.transition(Blocked(Running(c.map(attempt=attempt + 1))))
885869
return [
886-
Delayed(Retry(id, self.id), delay),
870+
Delayed(Retry(id, self.id), d),
887871
]
888-
case _:
889-
raise NotImplementedError
872+
890873
case _:
891874
raise NotImplementedError
892875

0 commit comments

Comments
 (0)