15
15
import logging
16
16
from typing import TYPE_CHECKING , Awaitable , Callable , Dict , List , Optional , Set , Tuple
17
17
18
- from prometheus_client import Gauge
19
-
20
18
from twisted .python .failure import Failure
21
19
22
20
from synapse .logging .context import nested_logging_context
23
- from synapse .metrics .background_process_metrics import run_as_background_process
21
+ from synapse .metrics import LaterGauge
22
+ from synapse .metrics .background_process_metrics import (
23
+ run_as_background_process ,
24
+ wrap_as_background_process ,
25
+ )
24
26
from synapse .types import JsonMapping , ScheduledTask , TaskStatus
25
27
from synapse .util .stringutils import random_string
26
28
30
32
logger = logging .getLogger (__name__ )
31
33
32
34
33
- running_tasks_gauge = Gauge (
34
- "synapse_scheduler_running_tasks" ,
35
- "The number of concurrent running tasks handled by the TaskScheduler" ,
36
- )
37
-
38
-
39
35
class TaskScheduler :
40
36
"""
41
37
This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background`
@@ -70,6 +66,8 @@ class TaskScheduler:
70
66
# Precision of the scheduler, evaluation of tasks to run will only happen
71
67
# every `SCHEDULE_INTERVAL_MS` ms
72
68
SCHEDULE_INTERVAL_MS = 1 * 60 * 1000 # 1mn
69
+ # How often to clean up old tasks.
70
+ CLEANUP_INTERVAL_MS = 30 * 60 * 1000
73
71
# Time before a complete or failed task is deleted from the DB
74
72
KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week
75
73
# Maximum number of tasks that can run at the same time
@@ -92,14 +90,26 @@ def __init__(self, hs: "HomeServer"):
92
90
] = {}
93
91
self ._run_background_tasks = hs .config .worker .run_background_tasks
94
92
93
+ # Flag to make sure we only try and launch new tasks once at a time.
94
+ self ._launching_new_tasks = False
95
+
95
96
if self ._run_background_tasks :
96
97
self ._clock .looping_call (
97
- run_as_background_process ,
98
+ self ._launch_scheduled_tasks ,
99
+ TaskScheduler .SCHEDULE_INTERVAL_MS ,
100
+ )
101
+ self ._clock .looping_call (
102
+ self ._clean_scheduled_tasks ,
98
103
TaskScheduler .SCHEDULE_INTERVAL_MS ,
99
- "handle_scheduled_tasks" ,
100
- self ._handle_scheduled_tasks ,
101
104
)
102
105
106
+ LaterGauge (
107
+ "synapse_scheduler_running_tasks" ,
108
+ "The number of concurrent running tasks handled by the TaskScheduler" ,
109
+ labels = None ,
110
+ caller = lambda : len (self ._running_tasks ),
111
+ )
112
+
103
113
def register_action (
104
114
self ,
105
115
function : Callable [
@@ -234,6 +244,7 @@ async def get_tasks(
234
244
resource_id : Optional [str ] = None ,
235
245
statuses : Optional [List [TaskStatus ]] = None ,
236
246
max_timestamp : Optional [int ] = None ,
247
+ limit : Optional [int ] = None ,
237
248
) -> List [ScheduledTask ]:
238
249
"""Get a list of tasks. Returns all the tasks if no args is provided.
239
250
@@ -247,6 +258,7 @@ async def get_tasks(
247
258
statuses: Limit the returned tasks to the specific statuses
248
259
max_timestamp: Limit the returned tasks to the ones that have
249
260
a timestamp inferior to the specified one
261
+ limit: Only return `limit` number of rows if set.
250
262
251
263
Returns
252
264
A list of `ScheduledTask`, ordered by increasing timestamps
@@ -256,6 +268,7 @@ async def get_tasks(
256
268
resource_id = resource_id ,
257
269
statuses = statuses ,
258
270
max_timestamp = max_timestamp ,
271
+ limit = limit ,
259
272
)
260
273
261
274
async def delete_task (self , id : str ) -> None :
@@ -273,34 +286,58 @@ async def delete_task(self, id: str) -> None:
273
286
raise Exception (f"Task { id } is currently ACTIVE and can't be deleted" )
274
287
await self ._store .delete_scheduled_task (id )
275
288
276
- async def _handle_scheduled_tasks (self ) -> None :
277
- """Main loop taking care of launching tasks and cleaning up old ones."""
278
- await self ._launch_scheduled_tasks ()
279
- await self ._clean_scheduled_tasks ()
289
+ def launch_task_by_id (self , id : str ) -> None :
290
+ """Try launching the task with the given ID."""
291
+ # Don't bother trying to launch new tasks if we're already at capacity.
292
+ if len (self ._running_tasks ) >= TaskScheduler .MAX_CONCURRENT_RUNNING_TASKS :
293
+ return
294
+
295
+ run_as_background_process ("launch_task_by_id" , self ._launch_task_by_id , id )
296
+
297
+ async def _launch_task_by_id (self , id : str ) -> None :
298
+ """Helper async function for `launch_task_by_id`."""
299
+ task = await self .get_task (id )
300
+ if task :
301
+ await self ._launch_task (task )
280
302
303
+ @wrap_as_background_process ("launch_scheduled_tasks" )
281
304
async def _launch_scheduled_tasks (self ) -> None :
282
305
"""Retrieve and launch scheduled tasks that should be running at that time."""
283
- for task in await self . get_tasks ( statuses = [ TaskStatus . ACTIVE ]):
284
- await self ._launch_task ( task )
285
- for task in await self . get_tasks (
286
- statuses = [ TaskStatus . SCHEDULED ], max_timestamp = self . _clock . time_msec ()
287
- ) :
288
- await self . _launch_task ( task )
306
+ # Don't bother trying to launch new tasks if we're already at capacity.
307
+ if len ( self ._running_tasks ) >= TaskScheduler . MAX_CONCURRENT_RUNNING_TASKS :
308
+ return
309
+
310
+ if self . _launching_new_tasks :
311
+ return
289
312
290
- running_tasks_gauge . set ( len ( self ._running_tasks ))
313
+ self ._launching_new_tasks = True
291
314
315
+ try :
316
+ for task in await self .get_tasks (
317
+ statuses = [TaskStatus .ACTIVE ], limit = self .MAX_CONCURRENT_RUNNING_TASKS
318
+ ):
319
+ await self ._launch_task (task )
320
+ for task in await self .get_tasks (
321
+ statuses = [TaskStatus .SCHEDULED ],
322
+ max_timestamp = self ._clock .time_msec (),
323
+ limit = self .MAX_CONCURRENT_RUNNING_TASKS ,
324
+ ):
325
+ await self ._launch_task (task )
326
+
327
+ finally :
328
+ self ._launching_new_tasks = False
329
+
330
+ @wrap_as_background_process ("clean_scheduled_tasks" )
292
331
async def _clean_scheduled_tasks (self ) -> None :
293
332
"""Clean old complete or failed jobs to avoid clutter the DB."""
333
+ now = self ._clock .time_msec ()
294
334
for task in await self ._store .get_scheduled_tasks (
295
- statuses = [TaskStatus .FAILED , TaskStatus .COMPLETE ]
335
+ statuses = [TaskStatus .FAILED , TaskStatus .COMPLETE ],
336
+ max_timestamp = now - TaskScheduler .KEEP_TASKS_FOR_MS ,
296
337
):
297
338
# FAILED and COMPLETE tasks should never be running
298
339
assert task .id not in self ._running_tasks
299
- if (
300
- self ._clock .time_msec ()
301
- > task .timestamp + TaskScheduler .KEEP_TASKS_FOR_MS
302
- ):
303
- await self ._store .delete_scheduled_task (task .id )
340
+ await self ._store .delete_scheduled_task (task .id )
304
341
305
342
async def _launch_task (self , task : ScheduledTask ) -> None :
306
343
"""Launch a scheduled task now.
@@ -339,6 +376,9 @@ async def wrapper() -> None:
339
376
)
340
377
self ._running_tasks .remove (task .id )
341
378
379
+ # Try launch a new task since we've finished with this one.
380
+ self ._clock .call_later (1 , self ._launch_scheduled_tasks )
381
+
342
382
if len (self ._running_tasks ) >= TaskScheduler .MAX_CONCURRENT_RUNNING_TASKS :
343
383
return
344
384
@@ -355,4 +395,4 @@ async def wrapper() -> None:
355
395
356
396
self ._running_tasks .add (task .id )
357
397
await self .update_task (task .id , status = TaskStatus .ACTIVE )
358
- run_as_background_process (task .action , wrapper )
398
+ run_as_background_process (f" task- { task .action } " , wrapper )
0 commit comments