Skip to content

Clean up #210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion resonate/conventions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from .base import Base
from .default import Default
from .sleep import Sleep

__all__ = ["Default", "Sleep"]
__all__ = ["Base", "Default", "Sleep"]
3 changes: 2 additions & 1 deletion resonate/message_sources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from .local import LocalMessageSource
from .poller import Poller

__all__ = ["Poller"]
__all__ = ["LocalMessageSource", "Poller"]
55 changes: 55 additions & 0 deletions resonate/message_sources/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from __future__ import annotations

import threading
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from resonate.models.message import Mesg
from resonate.models.message_source import MessageQ
from resonate.stores import LocalStore


class LocalMessageSource:
def __init__(self, group: str, id: str, store: LocalStore) -> None:
self._group = group
self._id = id
self._store = store
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()

@property
def unicast(self) -> str:
return f"poll://{self._group}/{self._id}"

@property
def anycast(self) -> str:
return f"poll://{self._group}/{self._id}"

def start(self, mq: MessageQ) -> None:
if self._thread is not None:
return

self._stop_event.clear()
self._thread = threading.Thread(
target=self._loop,
args=(mq,),
name="local_msg_source",
daemon=True,
)
self._thread.start()

def stop(self) -> None:
if self._thread is not None:
self._stop_event.set()
self._thread.join()
self._thread = None
self._stop_event.clear()

def _loop(self, mq: MessageQ) -> None:
while not self._stop_event.is_set():
for msg in self.step():
mq.enqueue(msg)
self._stop_event.wait(0.1)

def step(self) -> list[Mesg]:
return [m for _, m in self._store.step()]
2 changes: 1 addition & 1 deletion resonate/message_sources/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import requests

from resonate.encoders.json import JsonEncoder
from resonate.encoders import JsonEncoder
from resonate.logging import logger
from resonate.models.message import InvokeMesg, NotifyMesg, ResumeMesg

Expand Down
2 changes: 1 addition & 1 deletion resonate/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import TYPE_CHECKING, TypedDict

from resonate.errors import ResonateValidationError
from resonate.retry_policies.never import Never
from resonate.retry_policies import Never

if TYPE_CHECKING:
from resonate.models.retry_policy import RetryPolicy
Expand Down
13 changes: 5 additions & 8 deletions resonate/resonate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,17 @@
from typing import TYPE_CHECKING, Any, Concatenate, overload

from resonate.bridge import Bridge
from resonate.conventions.base import Base
from resonate.conventions.default import Default
from resonate.conventions.sleep import Sleep
from resonate.conventions import Base, Default, Sleep
from resonate.coroutine import LFC, LFI, RFC, RFI
from resonate.dependencies import Dependencies
from resonate.message_sources.poller import Poller
from resonate.message_sources import LocalMessageSource, Poller
from resonate.models.commands import Invoke, Listen
from resonate.models.durable_promise import DurablePromise
from resonate.models.handle import Handle
from resonate.options import Options
from resonate.registry import Registry
from resonate.retry_policies import Exponential, Never
from resonate.stores.local import LocalStore, _LocalMessageSource
from resonate.stores.remote import RemoteStore
from resonate.stores import LocalStore, RemoteStore

if TYPE_CHECKING:
from collections.abc import Generator
Expand All @@ -47,8 +44,8 @@ def __init__(
) -> None:
# enforce mutual inclusion/exclusion of store and message source
assert (store is None) == (message_source is None), "store and message source must both be set or both be unset"
assert not isinstance(store, LocalStore) or isinstance(message_source, _LocalMessageSource), "message source must be local message source"
assert not isinstance(store, RemoteStore) or not isinstance(message_source, _LocalMessageSource), "message source must not be local message source"
assert not isinstance(store, LocalStore) or isinstance(message_source, LocalMessageSource), "message source must be local message source"
assert not isinstance(store, RemoteStore) or not isinstance(message_source, LocalMessageSource), "message source must not be local message source"

self._started = False

Expand Down
60 changes: 6 additions & 54 deletions resonate/stores/local.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
from __future__ import annotations

import threading
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Literal, final

from resonate.clocks.wall import WallClock
from resonate.encoders.base64 import Base64Encoder
from resonate.encoders.chain import ChainEncoder
from resonate.encoders.json import JsonEncoder
from resonate.clocks import WallClock
from resonate.encoders import Base64Encoder, ChainEncoder, JsonEncoder
from resonate.errors import ResonateStoreError
from resonate.message_sources import LocalMessageSource
from resonate.models.callback import Callback
from resonate.models.durable_promise import DurablePromise
from resonate.models.message import InvokeMesg, Mesg, NotifyMesg, ResumeMesg, TaskMesg
from resonate.models.task import Task
from resonate.routers.tag import TagRouter
from resonate.routers import TagRouter

if TYPE_CHECKING:
from resonate.models.clock import Clock
from resonate.models.encoder import Encoder
from resonate.models.message_source import MessageQ, MessageSource
from resonate.models.message_source import MessageSource
from resonate.models.router import Router


Expand Down Expand Up @@ -60,7 +58,7 @@ def add_router(self, router: Router) -> None:
self._routers.append(router)

def message_source(self, group: str, id: str) -> MessageSource:
return _LocalMessageSource(group, id, self)
return LocalMessageSource(group, id, self)

def step(self) -> list[tuple[str, Mesg]]:
messages: list[tuple[str, Mesg]] = []
Expand Down Expand Up @@ -777,49 +775,3 @@ def to_dict(self) -> dict[str, Any]:

def ikey_match(left: str | None, right: str | None) -> bool:
return left is not None and right is not None and left == right


class _LocalMessageSource:
def __init__(self, group: str, id: str, store: LocalStore) -> None:
self._group = group
self._id = id
self._store = store
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()

@property
def unicast(self) -> str:
return f"poll://{self._group}/{self._id}"

@property
def anycast(self) -> str:
return f"poll://{self._group}/{self._id}"

def start(self, mq: MessageQ) -> None:
if self._thread is not None:
return

self._stop_event.clear()
self._thread = threading.Thread(
target=self._loop,
args=(mq,),
name="local_msg_source",
daemon=True,
)
self._thread.start()

def stop(self) -> None:
if self._thread is not None:
self._stop_event.set()
self._thread.join()
self._thread = None
self._stop_event.clear()

def _loop(self, mq: MessageQ) -> None:
while not self._stop_event.is_set():
for msg in self.step():
mq.enqueue(msg)
self._stop_event.wait(0.1)

def step(self) -> list[Mesg]:
return [m for _, m in self._store.step()]
4 changes: 1 addition & 3 deletions resonate/stores/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

from requests import PreparedRequest, Request, Response, Session

from resonate.encoders.base64 import Base64Encoder
from resonate.encoders.chain import ChainEncoder
from resonate.encoders.json import JsonEncoder
from resonate.encoders import Base64Encoder, ChainEncoder, JsonEncoder
from resonate.errors import ResonateStoreError
from resonate.models.callback import Callback
from resonate.models.durable_promise import DurablePromise
Expand Down
4 changes: 2 additions & 2 deletions sim/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import TYPE_CHECKING, Any

from resonate import Context
from resonate.clocks.step import StepClock
from resonate.clocks import StepClock
from resonate.dependencies import Dependencies
from resonate.errors import ResonateStoreError
from resonate.models.commands import (
Expand Down Expand Up @@ -43,7 +43,7 @@
from resonate.models.task import Task
from resonate.options import Options
from resonate.scheduler import Done, More, Scheduler
from resonate.stores.local import LocalStore
from resonate.stores import LocalStore

if TYPE_CHECKING:
from collections.abc import Callable
Expand Down
5 changes: 2 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@

import pytest

from resonate.message_sources.poller import Poller
from resonate.stores.local import LocalStore
from resonate.stores.remote import RemoteStore
from resonate.message_sources import Poller
from resonate.stores import LocalStore, RemoteStore

if TYPE_CHECKING:
from resonate.models.message_source import MessageSource
Expand Down
4 changes: 2 additions & 2 deletions tests/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from inspect import isgeneratorfunction
from typing import TYPE_CHECKING, Any, Protocol

from resonate.conventions.sleep import Sleep
from resonate.conventions import Sleep
from resonate.coroutine import LFC, LFI, RFC, RFI
from resonate.models.commands import (
CancelPromiseReq,
Expand All @@ -31,7 +31,7 @@
from resonate.registry import Registry
from resonate.resonate import Default
from resonate.scheduler import Scheduler
from resonate.stores.local import LocalStore
from resonate.stores import LocalStore

if TYPE_CHECKING:
from collections.abc import Callable
Expand Down
2 changes: 1 addition & 1 deletion tests/test_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest

from resonate.resonate import Resonate
from resonate.retry_policies.constant import Constant
from resonate.retry_policies import Constant

if TYPE_CHECKING:
from collections.abc import Generator
Expand Down
4 changes: 1 addition & 3 deletions tests/test_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

import pytest

from resonate.encoders.base64 import Base64Encoder
from resonate.encoders.chain import ChainEncoder
from resonate.encoders.json import JsonEncoder
from resonate.encoders import Base64Encoder, ChainEncoder, JsonEncoder


@pytest.mark.parametrize(
Expand Down
2 changes: 1 addition & 1 deletion tests/test_store_promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import pytest

from resonate.stores.local import ResonateStoreError
from resonate.errors import ResonateStoreError

if TYPE_CHECKING:
from resonate.models.store import Store
Expand Down
Loading