Skip to content

Add resonate:root and resonate:parent tags to promises #260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion resonate/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
class Bridge:
def __init__(
self,
ctx: Callable[[str, Info], Context],
ctx: Callable[[str, str, Info], Context],
store: Store,
message_source: MessageSource,
registry: Registry,
Expand Down
4 changes: 3 additions & 1 deletion resonate/conventions/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
@dataclass
class Local:
id: str
r_id: str
p_id: str
opts: Options = field(default_factory=Options, repr=False)

@property
Expand All @@ -32,7 +34,7 @@ def timeout(self) -> float:

@property
def tags(self) -> dict[str, str]:
return {**self.opts.tags, "resonate:scope": "local"}
return {**self.opts.tags, "resonate:root": self.r_id, "resonate:parent": self.p_id, "resonate:scope": "local"}

def options(
self,
Expand Down
4 changes: 3 additions & 1 deletion resonate/conventions/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
@dataclass
class Remote:
id: str
r_id: str
p_id: str
name: str
args: tuple[Any, ...] = field(default_factory=tuple)
kwargs: dict[str, Any] = field(default_factory=dict)
Expand All @@ -35,7 +37,7 @@ def timeout(self) -> float:

@property
def tags(self) -> dict[str, str]:
return {**self.opts.tags, "resonate:scope": "global", "resonate:invoke": self.opts.target}
return {**self.opts.tags, "resonate:root": self.r_id, "resonate:parent": self.p_id, "resonate:scope": "global", "resonate:invoke": self.opts.target}

def options(
self,
Expand Down
38 changes: 19 additions & 19 deletions resonate/resonate.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(
self._message_source = self._store.message_source(self._group, self._pid)

self._bridge = Bridge(
ctx=lambda id, info: Context(id, info, self._registry, self._dependencies),
ctx=lambda id, cid, info: Context(id, cid, info, self._registry, self._dependencies),
pid=self._pid,
ttl=ttl,
anycast=self._message_source.anycast,
Expand Down Expand Up @@ -211,7 +211,7 @@ def run[**P, R](
name, func, version = self._registry.get(func, self._opts.version)
opts = self._opts.merge(version=version)

self._bridge.run(Remote(id, name, args, kwargs, opts), func, args, kwargs, opts, future)
self._bridge.run(Remote(id, id, id, name, args, kwargs, opts), func, args, kwargs, opts, future)
return Handle(future)

@overload
Expand Down Expand Up @@ -246,7 +246,7 @@ def rpc[**P, R](
else:
name, _, version = self._registry.get(func, self._opts.version)

self._bridge.rpc(Remote(id, name, args, kwargs, self._opts.merge(version=version)), future)
self._bridge.rpc(Remote(id, id, id, name, args, kwargs, self._opts.merge(version=version)), future)
return Handle(future)

def get(self, id: str) -> Handle[Any]:
Expand All @@ -261,8 +261,9 @@ def set_dependency(self, name: str, obj: Any) -> None:


class Context:
def __init__(self, id: str, info: Info, registry: Registry, dependencies: Dependencies) -> None:
def __init__(self, id: str, cid: str, info: Info, registry: Registry, dependencies: Dependencies) -> None:
self._id = id
self._cid = cid
self._info = info
self._registry = registry
self._dependencies = dependencies
Expand All @@ -271,7 +272,7 @@ def __init__(self, id: str, info: Info, registry: Registry, dependencies: Depend
self._counter = 0

def __repr__(self) -> str:
return f"Context(id={self._id}, info={self._info})"
return f"Context(id={self._id}, cid={self._cid}, info={self._info})"

@property
def id(self) -> str:
Expand All @@ -298,19 +299,17 @@ def lfi[**P, R](
*args: P.args,
**kwargs: P.kwargs,
) -> LFI[R]:
self._counter += 1
opts = Options(version=self._registry.latest(func))
return LFI(Local(f"{self.id}.{self._counter}", opts), func, args, kwargs, opts)
return LFI(Local(self._next(), self._cid, self._id, opts), func, args, kwargs, opts)

def lfc[**P, R](
self,
func: Callable[Concatenate[Context, P], Generator[Any, Any, R] | R],
*args: P.args,
**kwargs: P.kwargs,
) -> LFC[R]:
self._counter += 1
opts = Options(version=self._registry.latest(func))
return LFC(Local(f"{self.id}.{self._counter}", opts), func, args, kwargs, opts)
return LFC(Local(self._next(), self._cid, self._id, opts), func, args, kwargs, opts)

@overload
def rfi[**P, R](
Expand All @@ -332,9 +331,8 @@ def rfi(
*args: Any,
**kwargs: Any,
) -> RFI:
self._counter += 1
name, _, version = (func, None, self._registry.latest(func)) if isinstance(func, str) else self._registry.get(func)
return RFI(Remote(f"{self.id}.{self._counter}", name, args, kwargs, Options(version=version)))
return RFI(Remote(self._next(), self._cid, self._id, name, args, kwargs, Options(version=version)))

@overload
def rfc[**P, R](
Expand All @@ -356,9 +354,8 @@ def rfc(
*args: Any,
**kwargs: Any,
) -> RFC:
self._counter += 1
name, _, version = (func, None, self._registry.latest(func)) if isinstance(func, str) else self._registry.get(func)
return RFC(Remote(f"{self.id}.{self._counter}", name, args, kwargs, Options(version=version)))
return RFC(Remote(self._next(), self._cid, self._id, name, args, kwargs, Options(version=version)))

@overload
def detached[**P, R](
Expand All @@ -380,9 +377,8 @@ def detached(
*args: Any,
**kwargs: Any,
) -> RFI:
self._counter += 1
name, _, version = (func, None, self._registry.latest(func)) if isinstance(func, str) else self._registry.get(func)
return RFI(Remote(f"{self.id}.{self._counter}", name, args, kwargs, Options(version=version)), mode="detached")
return RFI(Remote(self._next(), self._cid, self._id, name, args, kwargs, Options(version=version)), mode="detached")

@overload
def typesafe[T](self, cmd: LFI[T] | RFI[T]) -> Generator[LFI[T] | RFI[T], Promise[T], Promise[T]]: ...
Expand All @@ -392,8 +388,7 @@ def typesafe(self, cmd: LFI | RFI | LFC | RFC | Promise) -> Generator[LFI | RFI
return (yield cmd)

def sleep(self, secs: float) -> RFC[None]:
self._counter += 1
return RFC(Sleep(f"{self.id}.{self._counter}", secs))
return RFC(Sleep(self._next(), secs))

def promise(
self,
Expand All @@ -405,8 +400,9 @@ def promise(
data: Any = None,
tags: dict[str, str] | None = None,
) -> RFI:
self._counter += 1
id = id or f"{self.id}.{self._counter}"
default_id = self._next()
id = id or default_id

return RFI(
Base(
id,
Expand All @@ -418,6 +414,10 @@ def promise(
),
)

def _next(self) -> str:
self._counter += 1
return f"{self._id}.{self._counter}"


class Time:
def __init__(self, ctx: Context) -> None:
Expand Down
12 changes: 6 additions & 6 deletions resonate/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def format(self, record: logging.LogRecord) -> str:
class Scheduler:
def __init__(
self,
ctx: Callable[[str, Info], Context],
ctx: Callable[[str, str, Info], Context],
pid: str | None = None,
unicast: str | None = None,
anycast: str | None = None,
Expand Down Expand Up @@ -313,7 +313,7 @@ class Lfnc:
args: tuple[Any, ...]
kwargs: dict[str, Any]
opts: Options
ccls: Callable[[str, Info], Context]
ccls: Callable[[str, str, Info], Context]

attempt: int = field(default=1, init=False)
ctx: Context = field(init=False)
Expand All @@ -323,7 +323,7 @@ class Lfnc:
suspends: list[Node[State]] = field(default_factory=list, init=False)

def __post_init__(self) -> None:
self.ctx = self.ccls(self.id, Info(self))
self.ctx = self.ccls(self.id, self.cid, Info(self))
self.retry_policy = self.opts.retry_policy(self.func) if callable(self.opts.retry_policy) else self.opts.retry_policy

def map(
Expand Down Expand Up @@ -397,7 +397,7 @@ class Coro:
args: tuple[Any, ...]
kwargs: dict[str, Any]
opts: Options
ccls: Callable[[str, Info], Context]
ccls: Callable[[str, str, Info], Context]

coro: Coroutine = field(init=False)
next: None | AWT | Result = field(default=None, init=False)
Expand All @@ -409,7 +409,7 @@ class Coro:
suspends: list[Node[State]] = field(default_factory=list, init=False)

def __post_init__(self) -> None:
self.ctx = self.ccls(self.id, Info(self))
self.ctx = self.ccls(self.id, self.cid, Info(self))
self.coro = Coroutine(self.id, self.cid, self.func(self.ctx, *self.args, **self.kwargs))
self.retry_policy = self.opts.retry_policy(self.func) if callable(self.opts.retry_policy) else self.opts.retry_policy

Expand Down Expand Up @@ -467,7 +467,7 @@ class Done:


class Computation:
def __init__(self, id: str, ctx: Callable[[str, Info], Context], pid: str, unicast: str, anycast: str) -> None:
def __init__(self, id: str, ctx: Callable[[str, str, Info], Context], pid: str, unicast: str, anycast: str) -> None:
self.id = id
self.ctx = ctx
self.pid = pid
Expand Down
2 changes: 1 addition & 1 deletion resonate/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def __init__(self, r: Random, uni: str, any: str, registry: Registry, dependenci
self.last_heartbeat = 0.0

self.scheduler = Scheduler(
lambda id, info: Context(id, info, self.registry, self.dependencies),
lambda id, cid, info: Context(id, cid, info, self.registry, self.dependencies),
pid=self.uni,
unicast=f"sim://uni@{self.uni}",
anycast=f"sim://any@{self.uni}", # this looks silly, but this is right
Expand Down
Loading
Loading