Skip to content

Commit 23a7326

Browse files
authored
fix: align registered Runtime & appended Worker timeout (#653)
* fix: align registered Runtime & appended Worker timeout Signed-off-by: Keming <[email protected]> * fix timeout condition Signed-off-by: Keming <[email protected]> * add fish to envd Signed-off-by: Keming <[email protected]> --------- Signed-off-by: Keming <[email protected]>
1 parent 8a9dab8 commit 23a7326

File tree

9 files changed

+120
-69
lines changed

9 files changed

+120
-69
lines changed

Cargo.lock

Lines changed: 35 additions & 35 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mosec"
3-
version = "0.9.4"
3+
version = "0.9.5"
44
authors = ["Keming <[email protected]>", "Zichen <[email protected]>"]
55
edition = "2024"
66
license = "Apache-2.0"
@@ -49,8 +49,8 @@ serde = "1.0"
4949
serde_json = "1.0"
5050
utoipa = "5.3"
5151
utoipa-swagger-ui = { version = "9", features = ["axum"] }
52-
tower = "0.5.1"
53-
tower-http = { version = "0.6.4", features = [
52+
tower = "0.5.2"
53+
tower-http = { version = "0.6.6", features = [
5454
"compression-zstd",
5555
"decompression-zstd",
5656
"compression-gzip",

build.envd

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ def jax():
2929

3030
def build():
3131
base(dev=True, image="ubuntu:22.04")
32-
install.conda()
33-
install.python()
32+
install.uv()
33+
shell("fish")
3434
rust()
3535
runtime.init(["make install"])
3636

mosec/coordinator.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ class State(enum.IntFlag):
5050

5151

5252
@contextmanager
53-
def set_mosec_timeout(duration: int):
53+
def set_mosec_timeout(duration: float):
5454
"""Context manager to set a timeout for a code block.
5555
5656
Args:
57-
duration (float): the duration in seconds before timing out
57+
duration: the duration in seconds before timing out
5858
5959
"""
6060

@@ -64,11 +64,11 @@ def handler(signum, frame):
6464
)
6565

6666
signal.signal(signal.SIGALRM, handler)
67-
signal.alarm(duration)
67+
signal.setitimer(signal.ITIMER_REAL, duration)
6868
try:
6969
yield
7070
finally:
71-
signal.alarm(0)
71+
signal.setitimer(signal.ITIMER_REAL, 0)
7272

7373

7474
class FakeSemaphore:
@@ -108,7 +108,7 @@ def __init__(
108108
socket_prefix: str,
109109
stage_name: str,
110110
worker_id: int,
111-
timeout: int,
111+
timeout: float,
112112
):
113113
"""Initialize the mosec coordinator.
114114

mosec/env.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ def validate_int_ge(number, name, threshold=1):
8585
assert number >= threshold, f"{name} must be no less than {threshold}"
8686

8787

88+
def validate_float_ge(number, name, threshold=0.0):
89+
"""Validate float number is greater than threshold."""
90+
assert isinstance(number, float), (
91+
f"{name} must be float but you give {type(number)}"
92+
)
93+
assert number >= threshold, f"{name} must be no less than {threshold}"
94+
95+
8896
def validate_str_dict(dictionary: Dict):
8997
"""Validate keys and values of the dictionary is string type."""
9098
for key, value in dictionary.items():

mosec/runtime.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from typing import Callable, Dict, Iterable, List, Optional, Type, Union, cast
2525

2626
from mosec.coordinator import Coordinator
27-
from mosec.env import env_var_context, validate_env, validate_int_ge
27+
from mosec.env import env_var_context, validate_env, validate_float_ge, validate_int_ge
2828
from mosec.log import get_internal_logger
2929
from mosec.utils import get_mosec_path
3030
from mosec.worker import Worker
@@ -48,23 +48,24 @@ def __init__(
4848
worker: Type[Worker],
4949
num: int = 1,
5050
max_batch_size: int = 1,
51-
max_wait_time: int = 10,
52-
timeout: int = 3,
51+
max_wait_time: int = 0,
52+
timeout: float = 0.0,
5353
start_method: str = "spawn",
5454
env: Union[None, List[Dict[str, str]]] = None,
5555
):
5656
"""Initialize the mosec coordinator.
5757
5858
Args:
59-
worker (Worker): subclass of `mosec.Worker` implemented by users.
60-
num (int): number of workers
59+
worker: subclass of `mosec.Worker` implemented by users.
60+
num: number of workers
6161
max_batch_size: the maximum batch size allowed (>=1), will enable the
6262
dynamic batching if it > 1
63-
max_wait_time (int): the maximum wait time (millisecond)
63+
max_wait_time: the maximum wait time (millisecond)
6464
for dynamic batching, needs to be used with `max_batch_size`
6565
to enable the feature. If not configure, will use the CLI
6666
argument `--wait` (default=10ms)
67-
timeout (int): timeout (second) for the `forward` function.
67+
timeout: timeout (second) for the `forward` function.
68+
If not set, will use the CLI argument `--timeout` (default=3s)
6869
start_method: the process starting method ("spawn" or "fork")
6970
env: the environment variables to set before starting the process
7071
@@ -73,7 +74,7 @@ def __init__(
7374
self.num = num
7475
self.max_batch_size = max_batch_size
7576
self.max_wait_time = max_wait_time
76-
self.timeout = timeout
77+
self.timeout_sec = float(timeout)
7778
self.start_method = start_method
7879
self.env = env
7980

@@ -84,6 +85,13 @@ def __init__(
8485

8586
self._validate()
8687

88+
def _align_timeout_wait(self, timeout_sec: float, max_wait_time: int):
89+
"""Align the ``timeout`` and ``max_wait_time`` arguments."""
90+
self.timeout_sec = self.timeout_sec if self.timeout_sec > 0 else timeout_sec
91+
self.max_wait_time = (
92+
self.max_wait_time if self.max_wait_time > 0 else max_wait_time
93+
)
94+
8795
@staticmethod
8896
def _process_healthy(process: Union[BaseProcess, None]) -> bool:
8997
"""Check if the child process is healthy.
@@ -118,7 +126,7 @@ def _start_process(
118126
work_path,
119127
self.name,
120128
worker_id + 1,
121-
self.timeout,
129+
self.timeout_sec,
122130
),
123131
daemon=True,
124132
)
@@ -173,7 +181,7 @@ def _validate(self):
173181
validate_int_ge(self.num, "worker number")
174182
validate_int_ge(self.max_batch_size, "maximum batch size")
175183
validate_int_ge(self.max_wait_time, "maximum wait time", 0)
176-
validate_int_ge(self.timeout, "forward timeout", 0)
184+
validate_float_ge(self.timeout_sec, "forward timeout", 0)
177185
assert self.start_method in NEW_PROCESS_METHOD, (
178186
f"start method must be one of {NEW_PROCESS_METHOD}"
179187
)

0 commit comments

Comments
 (0)