Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 87d324a

Browse files
Mathieu Veltenhughns
authored andcommitted
Task scheduler: add replication notify for new task to launch ASAP (#16184)
1 parent a5138ea commit 87d324a

File tree

5 files changed

+114
-67
lines changed

5 files changed

+114
-67
lines changed

changelog.d/16184.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Task scheduler: add replication notify for new task to launch ASAP.

synapse/replication/tcp/commands.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,17 @@ def to_line(self) -> str:
452452
return json_encoder.encode([self.instance_name, self.lock_name, self.lock_key])
453453

454454

455+
class NewActiveTaskCommand(_SimpleCommand):
456+
"""Sent to inform instance handling background tasks that a new active task is available to run.
457+
458+
Format::
459+
460+
NEW_ACTIVE_TASK "<task_id>"
461+
"""
462+
463+
NAME = "NEW_ACTIVE_TASK"
464+
465+
455466
_COMMANDS: Tuple[Type[Command], ...] = (
456467
ServerCommand,
457468
RdataCommand,
@@ -466,6 +477,7 @@ def to_line(self) -> str:
466477
RemoteServerUpCommand,
467478
ClearUserSyncsCommand,
468479
LockReleasedCommand,
480+
NewActiveTaskCommand,
469481
)
470482

471483
# Map of command name to command type.

synapse/replication/tcp/handler.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
Command,
4141
FederationAckCommand,
4242
LockReleasedCommand,
43+
NewActiveTaskCommand,
4344
PositionCommand,
4445
RdataCommand,
4546
RemoteServerUpCommand,
@@ -238,6 +239,10 @@ def __init__(self, hs: "HomeServer"):
238239
if self._is_master:
239240
self._server_notices_sender = hs.get_server_notices_sender()
240241

242+
self._task_scheduler = None
243+
if hs.config.worker.run_background_tasks:
244+
self._task_scheduler = hs.get_task_scheduler()
245+
241246
if hs.config.redis.redis_enabled:
242247
# If we're using Redis, it's the background worker that should
243248
# receive USER_IP commands and store the relevant client IPs.
@@ -663,6 +668,15 @@ def on_LOCK_RELEASED(
663668
cmd.instance_name, cmd.lock_name, cmd.lock_key
664669
)
665670

671+
async def on_NEW_ACTIVE_TASK(
672+
self, conn: IReplicationConnection, cmd: NewActiveTaskCommand
673+
) -> None:
674+
"""Called when get a new NEW_ACTIVE_TASK command."""
675+
if self._task_scheduler:
676+
task = await self._task_scheduler.get_task(cmd.data)
677+
if task:
678+
await self._task_scheduler._launch_task(task)
679+
666680
def new_connection(self, connection: IReplicationConnection) -> None:
667681
"""Called when we have a new connection."""
668682
self._connections.append(connection)
@@ -776,6 +790,10 @@ def on_lock_released(
776790
if instance_name == self._instance_name:
777791
self.send_command(LockReleasedCommand(instance_name, lock_name, lock_key))
778792

793+
def send_new_active_task(self, task_id: str) -> None:
794+
"""Called when a new task has been scheduled for immediate launch and is ACTIVE."""
795+
self.send_command(NewActiveTaskCommand(task_id))
796+
779797

780798
UpdateToken = TypeVar("UpdateToken")
781799
UpdateRow = TypeVar("UpdateRow")

synapse/util/task_scheduler.py

Lines changed: 43 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,13 @@ class TaskScheduler:
5757
the code launching the task.
5858
You can also specify the `result` (and/or an `error`) when returning from the function.
5959
60-
The reconciliation loop runs every 5 mns, so this is not a precise scheduler. When wanting
61-
to launch now, the launch will still not happen before the next loop run.
62-
63-
Tasks will be run on the worker specified with `run_background_tasks_on` config,
64-
or the main one by default.
60+
The reconciliation loop runs every minute, so this is not a precise scheduler.
6561
There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already
6662
full. In this regard, please take great care that scheduled tasks can actually finished.
6763
For now there is no mechanism to stop a running task if it is stuck.
64+
65+
Tasks will be run on the worker specified with `run_background_tasks_on` config,
66+
or the main one by default.
6867
"""
6968

7069
# Precision of the scheduler, evaluation of tasks to run will only happen
@@ -85,7 +84,7 @@ def __init__(self, hs: "HomeServer"):
8584
self._actions: Dict[
8685
str,
8786
Callable[
88-
[ScheduledTask, bool],
87+
[ScheduledTask],
8988
Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
9089
],
9190
] = {}
@@ -98,11 +97,13 @@ def __init__(self, hs: "HomeServer"):
9897
"handle_scheduled_tasks",
9998
self._handle_scheduled_tasks,
10099
)
100+
else:
101+
self.replication_client = hs.get_replication_command_handler()
101102

102103
def register_action(
103104
self,
104105
function: Callable[
105-
[ScheduledTask, bool],
106+
[ScheduledTask],
106107
Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
107108
],
108109
action_name: str,
@@ -115,10 +116,9 @@ def register_action(
115116
calling `schedule_task` but rather in an `__init__` method.
116117
117118
Args:
118-
function: The function to be executed for this action. The parameters
119-
passed to the function when launched are the `ScheduledTask` being run,
120-
and a `first_launch` boolean to signal if it's a resumed task or the first
121-
launch of it. The function should return a tuple of new `status`, `result`
119+
function: The function to be executed for this action. The parameter
120+
passed to the function when launched is the `ScheduledTask` being run.
121+
The function should return a tuple of new `status`, `result`
122122
and `error` as specified in `ScheduledTask`.
123123
action_name: The name of the action to be associated with the function
124124
"""
@@ -171,6 +171,12 @@ async def schedule_task(
171171
)
172172
await self._store.insert_scheduled_task(task)
173173

174+
if status == TaskStatus.ACTIVE:
175+
if self._run_background_tasks:
176+
await self._launch_task(task)
177+
else:
178+
self.replication_client.send_new_active_task(task.id)
179+
174180
return task.id
175181

176182
async def update_task(
@@ -265,21 +271,13 @@ async def delete_task(self, id: str) -> None:
265271
Args:
266272
id: id of the task to delete
267273
"""
268-
if self.task_is_running(id):
269-
raise Exception(f"Task {id} is currently running and can't be deleted")
274+
task = await self.get_task(id)
275+
if task is None:
276+
raise Exception(f"Task {id} does not exist")
277+
if task.status == TaskStatus.ACTIVE:
278+
raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
270279
await self._store.delete_scheduled_task(id)
271280

272-
def task_is_running(self, id: str) -> bool:
273-
"""Check if a task is currently running.
274-
275-
Can only be called from the worker handling the task scheduling.
276-
277-
Args:
278-
id: id of the task to check
279-
"""
280-
assert self._run_background_tasks
281-
return id in self._running_tasks
282-
283281
async def _handle_scheduled_tasks(self) -> None:
284282
"""Main loop taking care of launching tasks and cleaning up old ones."""
285283
await self._launch_scheduled_tasks()
@@ -288,29 +286,11 @@ async def _handle_scheduled_tasks(self) -> None:
288286
async def _launch_scheduled_tasks(self) -> None:
289287
"""Retrieve and launch scheduled tasks that should be running at that time."""
290288
for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]):
291-
if not self.task_is_running(task.id):
292-
if (
293-
len(self._running_tasks)
294-
< TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS
295-
):
296-
await self._launch_task(task, first_launch=False)
297-
else:
298-
if (
299-
self._clock.time_msec()
300-
> task.timestamp + TaskScheduler.LAST_UPDATE_BEFORE_WARNING_MS
301-
):
302-
logger.warn(
303-
f"Task {task.id} (action {task.action}) has seen no update for more than 24h and may be stuck"
304-
)
289+
await self._launch_task(task)
305290
for task in await self.get_tasks(
306291
statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec()
307292
):
308-
if (
309-
not self.task_is_running(task.id)
310-
and len(self._running_tasks)
311-
< TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS
312-
):
313-
await self._launch_task(task, first_launch=True)
293+
await self._launch_task(task)
314294

315295
running_tasks_gauge.set(len(self._running_tasks))
316296

@@ -320,27 +300,27 @@ async def _clean_scheduled_tasks(self) -> None:
320300
statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE]
321301
):
322302
# FAILED and COMPLETE tasks should never be running
323-
assert not self.task_is_running(task.id)
303+
assert task.id not in self._running_tasks
324304
if (
325305
self._clock.time_msec()
326306
> task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS
327307
):
328308
await self._store.delete_scheduled_task(task.id)
329309

330-
async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None:
310+
async def _launch_task(self, task: ScheduledTask) -> None:
331311
"""Launch a scheduled task now.
332312
333313
Args:
334314
task: the task to launch
335-
first_launch: `True` if it's the first time is launched, `False` otherwise
336315
"""
337-
assert task.action in self._actions
316+
assert self._run_background_tasks
338317

318+
assert task.action in self._actions
339319
function = self._actions[task.action]
340320

341321
async def wrapper() -> None:
342322
try:
343-
(status, result, error) = await function(task, first_launch)
323+
(status, result, error) = await function(task)
344324
except Exception:
345325
f = Failure()
346326
logger.error(
@@ -360,6 +340,20 @@ async def wrapper() -> None:
360340
)
361341
self._running_tasks.remove(task.id)
362342

343+
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
344+
return
345+
346+
if (
347+
self._clock.time_msec()
348+
> task.timestamp + TaskScheduler.LAST_UPDATE_BEFORE_WARNING_MS
349+
):
350+
logger.warn(
351+
f"Task {task.id} (action {task.action}) has seen no update for more than 24h and may be stuck"
352+
)
353+
354+
if task.id in self._running_tasks:
355+
return
356+
363357
self._running_tasks.add(task.id)
364358
await self.update_task(task.id, status=TaskStatus.ACTIVE)
365359
description = f"{task.id}-{task.action}"

tests/util/test_task_scheduler.py

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
from synapse.util import Clock
2323
from synapse.util.task_scheduler import TaskScheduler
2424

25-
from tests import unittest
25+
from tests.replication._base import BaseMultiWorkerStreamTestCase
26+
from tests.unittest import HomeserverTestCase, override_config
2627

2728

28-
class TestTaskScheduler(unittest.HomeserverTestCase):
29+
class TestTaskScheduler(HomeserverTestCase):
2930
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
3031
self.task_scheduler = hs.get_task_scheduler()
3132
self.task_scheduler.register_action(self._test_task, "_test_task")
@@ -34,7 +35,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
3435
self.task_scheduler.register_action(self._resumable_task, "_resumable_task")
3536

3637
async def _test_task(
37-
self, task: ScheduledTask, first_launch: bool
38+
self, task: ScheduledTask
3839
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
3940
# This test task will copy the parameters to the result
4041
result = None
@@ -77,32 +78,26 @@ def test_schedule_task(self) -> None:
7778
self.assertIsNone(task)
7879

7980
async def _sleeping_task(
80-
self, task: ScheduledTask, first_launch: bool
81+
self, task: ScheduledTask
8182
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
8283
# Sleep for a second
8384
await deferLater(self.reactor, 1, lambda: None)
8485
return TaskStatus.COMPLETE, None, None
8586

8687
def test_schedule_lot_of_tasks(self) -> None:
8788
"""Schedule more than `TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS` tasks and check the behavior."""
88-
timestamp = self.clock.time_msec() + 30 * 1000
8989
task_ids = []
9090
for i in range(TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS + 1):
9191
task_ids.append(
9292
self.get_success(
9393
self.task_scheduler.schedule_task(
9494
"_sleeping_task",
95-
timestamp=timestamp,
9695
params={"val": i},
9796
)
9897
)
9998
)
10099

101-
# The timestamp being 30s after now the task should been executed
102-
# after the first scheduling loop is run
103-
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
104-
105-
# This is to give the time to the sleeping tasks to finish
100+
# This is to give the time to the active tasks to finish
106101
self.reactor.advance(1)
107102

108103
# Check that only MAX_CONCURRENT_RUNNING_TASKS tasks has run and that one
@@ -120,10 +115,11 @@ def test_schedule_lot_of_tasks(self) -> None:
120115
)
121116

122117
scheduled_tasks = [
123-
t for t in tasks if t is not None and t.status == TaskStatus.SCHEDULED
118+
t for t in tasks if t is not None and t.status == TaskStatus.ACTIVE
124119
]
125120
self.assertEquals(len(scheduled_tasks), 1)
126121

122+
# We need to wait for the next run of the scheduler loop
127123
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
128124
self.reactor.advance(1)
129125

@@ -138,23 +134,21 @@ def test_schedule_lot_of_tasks(self) -> None:
138134
)
139135

140136
async def _raising_task(
141-
self, task: ScheduledTask, first_launch: bool
137+
self, task: ScheduledTask
142138
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
143139
raise Exception("raising")
144140

145141
def test_schedule_raising_task(self) -> None:
146142
"""Schedule a task raising an exception and check it runs to failure and report exception content."""
147143
task_id = self.get_success(self.task_scheduler.schedule_task("_raising_task"))
148144

149-
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
150-
151145
task = self.get_success(self.task_scheduler.get_task(task_id))
152146
assert task is not None
153147
self.assertEqual(task.status, TaskStatus.FAILED)
154148
self.assertEqual(task.error, "raising")
155149

156150
async def _resumable_task(
157-
self, task: ScheduledTask, first_launch: bool
151+
self, task: ScheduledTask
158152
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
159153
if task.result and "in_progress" in task.result:
160154
return TaskStatus.COMPLETE, {"success": True}, None
@@ -169,8 +163,6 @@ def test_schedule_resumable_task(self) -> None:
169163
"""Schedule a resumable task and check that it gets properly resumed and complete after simulating a synapse restart."""
170164
task_id = self.get_success(self.task_scheduler.schedule_task("_resumable_task"))
171165

172-
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
173-
174166
task = self.get_success(self.task_scheduler.get_task(task_id))
175167
assert task is not None
176168
self.assertEqual(task.status, TaskStatus.ACTIVE)
@@ -184,3 +176,33 @@ def test_schedule_resumable_task(self) -> None:
184176
self.assertEqual(task.status, TaskStatus.COMPLETE)
185177
assert task.result is not None
186178
self.assertTrue(task.result.get("success"))
179+
180+
181+
class TestTaskSchedulerWithBackgroundWorker(BaseMultiWorkerStreamTestCase):
182+
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
183+
self.task_scheduler = hs.get_task_scheduler()
184+
self.task_scheduler.register_action(self._test_task, "_test_task")
185+
186+
async def _test_task(
187+
self, task: ScheduledTask
188+
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
189+
return (TaskStatus.COMPLETE, None, None)
190+
191+
@override_config({"run_background_tasks_on": "worker1"})
192+
def test_schedule_task(self) -> None:
193+
"""Check that a task scheduled to run now is launch right away on the background worker."""
194+
bg_worker_hs = self.make_worker_hs(
195+
"synapse.app.generic_worker",
196+
extra_config={"worker_name": "worker1"},
197+
)
198+
bg_worker_hs.get_task_scheduler().register_action(self._test_task, "_test_task")
199+
200+
task_id = self.get_success(
201+
self.task_scheduler.schedule_task(
202+
"_test_task",
203+
)
204+
)
205+
206+
task = self.get_success(self.task_scheduler.get_task(task_id))
207+
assert task is not None
208+
self.assertEqual(task.status, TaskStatus.COMPLETE)

0 commit comments

Comments
 (0)