Skip to content

Commit 17a353f

Browse files
committed
Refactored the triggers
Triggers are now stateful, allowing for correct operation of the combining triggers. They are also JSON serializable now.
1 parent 367068f commit 17a353f

29 files changed

+1642
-1901
lines changed

apscheduler/abc.py

+256
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
from abc import ABCMeta, abstractmethod
2+
from base64 import b64encode, b64decode
3+
from dataclasses import dataclass, field
4+
from datetime import datetime, timedelta
5+
from typing import (
6+
Callable, Iterable, Iterator, Mapping, Any, NoReturn, Optional, Union, AsyncIterable, Dict,
7+
FrozenSet, List, Set)
8+
9+
from .events import Event
10+
11+
12+
class Trigger(Iterator[datetime], metaclass=ABCMeta):
13+
"""Abstract base class that defines the interface that every trigger must implement."""
14+
15+
__slots__ = ()
16+
17+
@abstractmethod
18+
def next(self) -> Optional[datetime]:
19+
"""
20+
Return the next datetime to fire on.
21+
22+
If no such datetime can be calculated, ``None`` is returned.
23+
:raises apscheduler.exceptions.MaxIterationsReached:
24+
"""
25+
26+
@abstractmethod
27+
def __getstate__(self):
28+
"""Return the (JSON compatible) serializable state of the trigger."""
29+
30+
@abstractmethod
31+
def __setstate__(self, state):
32+
"""Initialize an empty instance from an existing state."""
33+
34+
def __iter__(self):
35+
return self
36+
37+
def __next__(self) -> datetime:
38+
dateval = self.next()
39+
if dateval is None:
40+
raise StopIteration
41+
else:
42+
return dateval
43+
44+
45+
@dataclass
46+
class Task:
47+
id: str
48+
func: Callable
49+
max_instances: Optional[int] = None
50+
metadata_arg: Optional[str] = None
51+
stateful: bool = False
52+
misfire_grace_time: Optional[timedelta] = None
53+
54+
55+
@dataclass
56+
class Schedule:
57+
id: str
58+
task_id: str
59+
trigger: Trigger
60+
args: tuple = ()
61+
kwargs: Dict[str, Any] = field(default_factory=dict)
62+
coalesce: bool = True
63+
misfire_grace_time: Optional[timedelta] = None
64+
tags: Optional[FrozenSet[str]] = frozenset()
65+
last_fire_time: Optional[datetime] = field(init=False, default=None)
66+
next_fire_time: Optional[datetime] = field(init=False, default=None)
67+
68+
69+
@dataclass(frozen=True)
70+
class Job:
71+
func_ref: str
72+
args: Optional[tuple] = None
73+
kwargs: Optional[Dict[str, Any]] = None
74+
schedule_id: Optional[str] = None
75+
scheduled_start_time: Optional[datetime] = None
76+
start_deadline: Optional[datetime] = None
77+
tags: Optional[FrozenSet[str]] = frozenset()
78+
79+
80+
class Serializer(metaclass=ABCMeta):
81+
__slots__ = ()
82+
83+
@abstractmethod
84+
def serialize(self, obj) -> bytes:
85+
pass
86+
87+
def serialize_to_unicode(self, obj) -> str:
88+
return b64encode(self.serialize(obj)).decode('ascii')
89+
90+
@abstractmethod
91+
def deserialize(self, serialized: bytes):
92+
pass
93+
94+
def deserialize_from_unicode(self, serialized: str):
95+
return self.deserialize(b64decode(serialized))
96+
97+
98+
class DataStore(metaclass=ABCMeta):
99+
__slots__ = ()
100+
101+
async def __aenter__(self):
102+
await self.start()
103+
return self
104+
105+
async def __aexit__(self):
106+
await self.stop()
107+
108+
async def start(self) -> None:
109+
pass
110+
111+
async def stop(self) -> None:
112+
pass
113+
114+
@abstractmethod
115+
async def add_or_update_schedule(self, schedule: Schedule) -> None:
116+
"""Add or update the given schedule in the store."""
117+
118+
@abstractmethod
119+
async def remove_schedule(self, schedule_id: str) -> None:
120+
"""Remove the designated schedule from the store."""
121+
122+
@abstractmethod
123+
async def remove_all_schedules(self) -> None:
124+
"""Remove all schedules from the store."""
125+
126+
@abstractmethod
127+
async def get_all_schedules(self) -> List[Schedule]:
128+
"""Get a list of all schedules, sorted on the "id" attribute."""
129+
130+
@abstractmethod
131+
async def get_next_fire_time(self) -> Optional[datetime]:
132+
"""
133+
Return the earliest fire time among all unclaimed schedules.
134+
135+
If no running, unclaimed schedules exist, ``None`` is returned.
136+
"""
137+
138+
@abstractmethod
139+
async def acquire_due_schedules(self, scheduler_id: str) -> List[Schedule]:
140+
"""
141+
Acquire an undefined amount of due schedules not claimed by any other scheduler.
142+
143+
This method claims due schedules for the given scheduler and returns them.
144+
When the scheduler has updated the objects, it calls :meth:`release_due_schedules` to
145+
release the claim on them.
146+
"""
147+
148+
@abstractmethod
149+
async def release_due_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None:
150+
"""
151+
Update the given schedules and release the claim on them held by this scheduler.
152+
153+
This method should do the following:
154+
155+
#. Remove any of the schedules in the store that have no next fire time
156+
#. Update the schedules that do have a next fire time
157+
#. Release any locks held on the schedules by this scheduler
158+
159+
:param scheduler_id: identifier of the scheduler
160+
:param schedules: schedules previously acquired using :meth:`acquire_due_schedules`
161+
"""
162+
163+
@abstractmethod
164+
async def acquire_job(self, worker_id: str, tags: Set[str]) -> Job:
165+
"""
166+
Claim and return the next matching job from the queue.
167+
168+
:return: the acquired job
169+
"""
170+
171+
@abstractmethod
172+
async def release_job(self, job: Job) -> None:
173+
"""Remove the given job from the queue."""
174+
175+
176+
class EventHub(metaclass=ABCMeta):
177+
__slots__ = ()
178+
179+
async def start(self) -> None:
180+
pass
181+
182+
async def stop(self) -> None:
183+
pass
184+
185+
@abstractmethod
186+
async def publish(self, event: Event) -> None:
187+
"""Publish an event."""
188+
189+
@abstractmethod
190+
async def subscribe(self) -> AsyncIterable[Event]:
191+
"""Return an asynchronous iterable yielding newly received events."""
192+
193+
194+
class AsyncScheduler(metaclass=ABCMeta):
195+
__slots__ = ()
196+
197+
@abstractmethod
198+
def define_task(self, func: Callable, task_id: Optional[str] = None, *,
199+
max_instances: Optional[int],
200+
misfire_grace_time: Union[float, timedelta]) -> str:
201+
if not task_id:
202+
task_id = f'{func.__module__}.{func.__qualname__}'
203+
if isinstance(misfire_grace_time, float):
204+
misfire_grace_time = timedelta(misfire_grace_time)
205+
206+
task = Task(id=task_id, func=func, max_instances=max_instances,
207+
misfire_grace_time=misfire_grace_time)
208+
209+
return task_id
210+
211+
@abstractmethod
212+
async def add_schedule(self, task: Union[str, Callable], trigger: Trigger, *, args: Iterable,
213+
kwargs: Mapping[str, Any]) -> str:
214+
"""
215+
216+
217+
218+
:param task: callable or ID of a predefined task
219+
:param trigger: trigger to define the run times of the schedule
220+
:param args: positional arguments to pass to the task callable
221+
:param kwargs: keyword arguments to pass to the task callable
222+
:return: identifier of the created schedule
223+
"""
224+
225+
@abstractmethod
226+
async def remove_schedule(self, schedule_id: str) -> None:
227+
"""Removes the designated schedule."""
228+
229+
@abstractmethod
230+
async def run(self) -> NoReturn:
231+
"""
232+
Runs the scheduler loop.
233+
234+
This method does not return.
235+
"""
236+
237+
238+
class SyncScheduler(metaclass=ABCMeta):
239+
__slots__ = ()
240+
241+
@abstractmethod
242+
def add_schedule(self, task: Callable, trigger: Trigger, *, args: Iterable,
243+
kwargs: Mapping[str, Any]) -> str:
244+
pass
245+
246+
@abstractmethod
247+
def remove_schedule(self, schedule_id: str) -> None:
248+
pass
249+
250+
@abstractmethod
251+
def run(self) -> NoReturn:
252+
pass
253+
254+
add_schedule.__doc__ = AsyncScheduler.add_schedule.__doc__
255+
remove_schedule.__doc__ = AsyncScheduler.remove_schedule.__doc__
256+
run.__doc__ = AsyncScheduler.run.__doc__

apscheduler/events.py

+21
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED', 'EVENT_JOB_SUBMITTED', 'EVENT_JOB_MAX_INSTANCES',
66
'SchedulerEvent', 'JobEvent', 'JobExecutionEvent', 'JobSubmissionEvent')
77

8+
from datetime import datetime
9+
from typing import Optional
10+
11+
from dataclasses import dataclass
812

913
EVENT_SCHEDULER_STARTED = EVENT_SCHEDULER_START = 2 ** 0
1014
EVENT_SCHEDULER_SHUTDOWN = 2 ** 1
@@ -30,6 +34,23 @@
3034
EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_SUBMITTED | EVENT_JOB_MAX_INSTANCES)
3135

3236

37+
@dataclass
38+
class Event:
39+
type: int
40+
scheduler_id: str
41+
42+
43+
@dataclass
44+
class ScheduleEvent(Event):
45+
schedule_id: str
46+
next_fire_time: Optional[datetime]
47+
48+
49+
@dataclass
50+
class JobEent(Event):
51+
job_id: str
52+
53+
3354
class SchedulerEvent:
3455
"""
3556
An event that concerns the scheduler itself.

apscheduler/exceptions.py

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
2+
3+
class JobLookupError(KeyError):
4+
"""Raised when the job store cannot find a job for update or removal."""
5+
6+
def __init__(self, job_id):
7+
super().__init__(u'No job by the id of %s was found' % job_id)
8+
9+
10+
class ConflictingIdError(KeyError):
11+
"""Raised when the uniqueness of job IDs is being violated."""
12+
13+
def __init__(self, job_id):
14+
super().__init__(
15+
u'Job identifier (%s) conflicts with an existing job' % job_id)
16+
17+
18+
class TransientJobError(ValueError):
19+
"""
20+
Raised when an attempt to add transient (with no func_ref) job to a persistent job store is
21+
detected.
22+
"""
23+
24+
def __init__(self, job_id):
25+
super().__init__(
26+
f'Job ({job_id}) cannot be added to this job store because a reference to the '
27+
f'callable could not be determined.')
28+
29+
30+
class SerializationError(Exception):
31+
"""Raised when a serializer fails to serialize the given object."""
32+
33+
34+
class DeserializationError(Exception):
35+
"""Raised when a serializer fails to deserialize the given object."""
36+
37+
38+
class MaxIterationsReached(Exception):
39+
"""
40+
Raised when a trigger has reached its maximum number of allowed computation iterations when
41+
trying to calculate the next fire time.
42+
"""
File renamed without changes.

apscheduler/serializers/cbor.py

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from dataclasses import dataclass, field
2+
from typing import Dict, Any
3+
4+
from cbor2 import dumps, loads, CBOREncodeTypeError, CBORTag
5+
6+
from ..abc import Serializer
7+
from ..util import marshal_object, unmarshal_object
8+
9+
10+
@dataclass
11+
class CBORSerializer(Serializer):
12+
type_tag: int = 4664
13+
dump_options: Dict[str, Any] = field(default_factory=dict)
14+
load_options: Dict[str, Any] = field(default_factory=dict)
15+
16+
def __post_init__(self):
17+
self.dump_options.setdefault('default', self._default_hook)
18+
self.load_options.setdefault('tag_hook', self._tag_hook)
19+
20+
def _default_hook(self, encoder, value):
21+
if hasattr(value, '__getstate__'):
22+
marshalled = marshal_object(value)
23+
encoder.encode(CBORTag(self.type_tag, marshalled))
24+
else:
25+
raise CBOREncodeTypeError(f'cannot serialize type {value.__class__.__name__}')
26+
27+
def _tag_hook(self, decoder, tag: CBORTag, shareable_index: int = None):
28+
if tag.tag == self.type_tag:
29+
cls_ref, state = tag.value
30+
return unmarshal_object(cls_ref, state)
31+
32+
def serialize(self, obj) -> bytes:
33+
return dumps(obj, **self.dump_options)
34+
35+
def deserialize(self, serialized: bytes):
36+
return loads(serialized, **self.load_options)

0 commit comments

Comments
 (0)