@@ -54,12 +54,12 @@ class ModelEntry(ScheduleEntry):
54
54
"""Scheduler entry taken from database row."""
55
55
56
56
model_schedules = (
57
- (schedules .crontab , CrontabSchedule , " crontab" ),
58
- (schedules .schedule , IntervalSchedule , " interval" ),
59
- (schedules .solar , SolarSchedule , " solar" ),
60
- (clocked , ClockedSchedule , " clocked" ),
57
+ (schedules .crontab , CrontabSchedule , ' crontab' ),
58
+ (schedules .schedule , IntervalSchedule , ' interval' ),
59
+ (schedules .solar , SolarSchedule , ' solar' ),
60
+ (clocked , ClockedSchedule , ' clocked' )
61
61
)
62
- save_fields = [" last_run_at" , " total_run_count" , " no_changes" ]
62
+ save_fields = [' last_run_at' , ' total_run_count' , ' no_changes' ]
63
63
64
64
def __init__ (self , model , app = None ):
65
65
"""Initialize the model entry."""
@@ -70,34 +70,33 @@ def __init__(self, model, app=None):
70
70
self .schedule = model .schedule
71
71
except model .DoesNotExist :
72
72
logger .error (
73
- " Disabling schedule %s that was removed from database" ,
73
+ ' Disabling schedule %s that was removed from database' ,
74
74
self .name ,
75
75
)
76
76
self ._disable (model )
77
77
try :
78
- self .args = loads (model .args or "[]" )
79
- self .kwargs = loads (model .kwargs or "{}" )
78
+ self .args = loads (model .args or '[]' )
79
+ self .kwargs = loads (model .kwargs or '{}' )
80
80
except ValueError as exc :
81
81
logger .exception (
82
- "Removing schedule %s for argument deseralization error: %r" ,
83
- self .name ,
84
- exc ,
82
+ 'Removing schedule %s for argument deseralization error: %r' ,
83
+ self .name , exc ,
85
84
)
86
85
self ._disable (model )
87
86
88
87
self .options = {}
89
- for option in [" queue" , " exchange" , " routing_key" , " priority" ]:
88
+ for option in [' queue' , ' exchange' , ' routing_key' , ' priority' ]:
90
89
value = getattr (model , option )
91
90
if value is None :
92
91
continue
93
92
self .options [option ] = value
94
93
95
- if getattr (model , " expires_" , None ):
96
- self .options [" expires" ] = getattr (model , " expires_" )
94
+ if getattr (model , ' expires_' , None ):
95
+ self .options [' expires' ] = getattr (model , ' expires_' )
97
96
98
- headers = loads (model .headers or "{}" )
99
- headers [" periodic_task_name" ] = model .name
100
- self .options [" headers" ] = headers
97
+ headers = loads (model .headers or '{}' )
98
+ headers [' periodic_task_name' ] = model .name
99
+ self .options [' headers' ] = headers
101
100
102
101
self .total_run_count = model .total_run_count
103
102
self .model = model
@@ -109,9 +108,8 @@ def __init__(self, model, app=None):
109
108
# This will trigger the job to run at start_time
110
109
# and avoid the heap block.
111
110
if self .model .start_time :
112
- model .last_run_at = model .last_run_at - datetime .timedelta (
113
- days = 365 * 30
114
- )
111
+ model .last_run_at = model .last_run_at \
112
+ - datetime .timedelta (days = 365 * 30 )
115
113
116
114
self .last_run_at = model .last_run_at
117
115
@@ -128,7 +126,7 @@ def is_due(self):
128
126
# START DATE: only run after the `start_time`, if one exists.
129
127
if self .model .start_time is not None :
130
128
now = self ._default_now ()
131
- if getattr (settings , " DJANGO_CELERY_BEAT_TZ_AWARE" , True ):
129
+ if getattr (settings , ' DJANGO_CELERY_BEAT_TZ_AWARE' , True ):
132
130
now = maybe_make_aware (self ._default_now ())
133
131
if now < self .model .start_time :
134
132
# The datetime is before the start date - don't run.
@@ -149,7 +147,8 @@ def is_due(self):
149
147
return schedules .schedstate (False , NEVER_CHECK_TIMEOUT )
150
148
151
149
# ONE OFF TASK: Disable one off tasks after they've ran once
152
- if self .model .one_off and self .model .enabled and self .model .total_run_count > 0 :
150
+ if self .model .one_off and self .model .enabled \
151
+ and self .model .total_run_count > 0 :
153
152
self .model .enabled = False
154
153
self .model .total_run_count = 0 # Reset
155
154
self .model .no_changes = False # Mark the model entry as changed
@@ -164,7 +163,7 @@ def is_due(self):
164
163
return self .schedule .is_due (last_run_at_in_tz )
165
164
166
165
def _default_now (self ):
167
- if getattr (settings , " DJANGO_CELERY_BEAT_TZ_AWARE" , True ):
166
+ if getattr (settings , ' DJANGO_CELERY_BEAT_TZ_AWARE' , True ):
168
167
now = datetime .datetime .now (self .app .timezone )
169
168
else :
170
169
# this ends up getting passed to maybe_make_aware, which expects
@@ -177,7 +176,6 @@ def __next__(self):
177
176
self .model .total_run_count += 1
178
177
self .model .no_changes = True
179
178
return self .__class__ (self .model )
180
-
181
179
next = __next__ # for 2to3
182
180
183
181
def save (self ):
@@ -197,20 +195,20 @@ def to_model_schedule(cls, schedule):
197
195
model_schedule = model_type .from_schedule (schedule )
198
196
model_schedule .save ()
199
197
return model_schedule , model_field
200
- raise ValueError (f"Cannot convert schedule type { schedule !r} to model" )
198
+ raise ValueError (
199
+ f'Cannot convert schedule type { schedule !r} to model' )
201
200
202
201
@classmethod
203
202
def from_entry (cls , name , app = None , ** entry ):
204
203
obj , created = PeriodicTask ._default_manager .update_or_create (
205
- name = name ,
206
- defaults = cls ._unpack_fields (** entry ),
204
+ name = name , defaults = cls ._unpack_fields (** entry ),
207
205
)
208
206
return cls (obj , app = app )
209
207
210
208
@classmethod
211
- def _unpack_fields (
212
- cls , schedule , args = None , kwargs = None , relative = None , options = None , ** entry
213
- ):
209
+ def _unpack_fields (cls , schedule ,
210
+ args = None , kwargs = None , relative = None , options = None ,
211
+ ** entry ):
214
212
entry_schedules = {
215
213
model_field : None for _ , _ , model_field in cls .model_schedules
216
214
}
@@ -220,37 +218,27 @@ def _unpack_fields(
220
218
entry_schedules ,
221
219
args = dumps (args or []),
222
220
kwargs = dumps (kwargs or {}),
223
- ** cls ._unpack_options (** options or {}),
221
+ ** cls ._unpack_options (** options or {})
224
222
)
225
223
return entry
226
224
227
225
@classmethod
228
- def _unpack_options (
229
- cls ,
230
- queue = None ,
231
- exchange = None ,
232
- routing_key = None ,
233
- priority = None ,
234
- headers = None ,
235
- expire_seconds = None ,
236
- ** kwargs ,
237
- ):
226
+ def _unpack_options (cls , queue = None , exchange = None , routing_key = None ,
227
+ priority = None , headers = None , expire_seconds = None ,
228
+ ** kwargs ):
238
229
return {
239
- " queue" : queue ,
240
- " exchange" : exchange ,
241
- " routing_key" : routing_key ,
242
- " priority" : priority ,
243
- " headers" : dumps (headers or {}),
244
- " expire_seconds" : expire_seconds ,
230
+ ' queue' : queue ,
231
+ ' exchange' : exchange ,
232
+ ' routing_key' : routing_key ,
233
+ ' priority' : priority ,
234
+ ' headers' : dumps (headers or {}),
235
+ ' expire_seconds' : expire_seconds ,
245
236
}
246
237
247
238
def __repr__ (self ):
248
- return "<ModelEntry: {} {}(*{}, **{}) {}>" .format (
249
- safe_str (self .name ),
250
- self .task ,
251
- safe_repr (self .args ),
252
- safe_repr (self .kwargs ),
253
- self .schedule ,
239
+ return '<ModelEntry: {} {}(*{}, **{}) {}>' .format (
240
+ safe_str (self .name ), self .task , safe_repr (self .args ),
241
+ safe_repr (self .kwargs ), self .schedule ,
254
242
)
255
243
256
244
@@ -273,17 +261,16 @@ def __init__(self, *args, **kwargs):
273
261
Scheduler .__init__ (self , * args , ** kwargs )
274
262
self ._finalize = Finalize (self , self .sync , exitpriority = 5 )
275
263
self .max_interval = (
276
- kwargs .get (" max_interval" )
264
+ kwargs .get (' max_interval' )
277
265
or self .app .conf .beat_max_loop_interval
278
- or DEFAULT_MAX_INTERVAL
279
- )
266
+ or DEFAULT_MAX_INTERVAL )
280
267
281
268
def setup_schedule (self ):
282
269
self .install_default_entries (self .schedule )
283
270
self .update_from_dict (self .app .conf .beat_schedule )
284
271
285
272
def all_as_schedule (self ):
286
- debug (" DatabaseScheduler: Fetching database schedule" )
273
+ debug (' DatabaseScheduler: Fetching database schedule' )
287
274
s = {}
288
275
for model in self .enabled_models ():
289
276
try :
@@ -305,7 +292,8 @@ def enabled_models_qs(self):
305
292
seconds = SCHEDULE_SYNC_MAX_INTERVAL
306
293
)
307
294
exclude_clock_tasks_query = Q (
308
- clocked__isnull = False , clocked__clocked_time__gt = next_schedule_sync
295
+ clocked__isnull = False ,
296
+ clocked__clocked_time__gt = next_schedule_sync
309
297
)
310
298
311
299
exclude_cron_tasks_query = self ._get_crontab_exclude_query ()
@@ -330,7 +318,9 @@ def _get_crontab_exclude_query(self):
330
318
server_hour = server_time .hour
331
319
332
320
# Window of +/- 2 hours around the current hour in server tz.
333
- hours_to_include = [(server_hour + offset ) % 24 for offset in range (- 2 , 3 )]
321
+ hours_to_include = [
322
+ (server_hour + offset ) % 24 for offset in range (- 2 , 3 )
323
+ ]
334
324
hours_to_include += [4 ] # celery's default cleanup task
335
325
336
326
# Get all tasks with a simple numeric hour value
@@ -342,30 +332,30 @@ def _get_crontab_exclude_query(self):
342
332
# Annotate these tasks with their server-hour equivalent
343
333
annotated_tasks = numeric_hour_tasks .annotate (
344
334
# Cast hour string to integer
345
- hour_int = Cast ("hour" , IntegerField ()),
335
+ hour_int = Cast ('hour' , IntegerField ()),
336
+
346
337
# Calculate server-hour based on timezone offset
347
338
server_hour = Case (
348
339
# Handle each timezone specifically
349
340
* [
350
341
When (
351
342
timezone = timezone_name ,
352
343
then = (
353
- F (" hour_int" )
344
+ F (' hour_int' )
354
345
+ self ._get_timezone_offset (timezone_name )
355
346
+ 24
356
- )
357
- % 24 ,
347
+ ) % 24
358
348
)
359
349
for timezone_name in self ._get_unique_timezone_names ()
360
350
],
361
351
# Default case - use hour as is
362
- default = F (" hour_int" ),
363
- ),
352
+ default = F (' hour_int' )
353
+ )
364
354
)
365
355
366
356
excluded_hour_task_ids = annotated_tasks .exclude (
367
357
server_hour__in = hours_to_include
368
- ).values_list ("id" , flat = True )
358
+ ).values_list ('id' , flat = True )
369
359
370
360
# Build the final exclude query:
371
361
# Exclude crontab tasks that are not in our include list
@@ -380,11 +370,15 @@ def _get_valid_hour_formats(self):
380
370
Return a list of all valid hour values (0-23).
381
371
Both zero-padded ("00"–"09") and non-padded ("0"–"23")
382
372
"""
383
- return [str (hour ) for hour in range (24 )] + [f"{ hour :02d} " for hour in range (10 )]
373
+ return [str (hour ) for hour in range (24 )] + [
374
+ f"{ hour :02d} " for hour in range (10 )
375
+ ]
384
376
385
377
def _get_unique_timezone_names (self ):
386
378
"""Get a list of all unique timezone names used in CrontabSchedule"""
387
- return CrontabSchedule .objects .values_list ("timezone" , flat = True ).distinct ()
379
+ return CrontabSchedule .objects .values_list (
380
+ 'timezone' , flat = True
381
+ ).distinct ()
388
382
389
383
def _get_timezone_offset (self , timezone_name ):
390
384
"""
@@ -437,12 +431,12 @@ def schedule_changed(self):
437
431
438
432
last , ts = self ._last_timestamp , self .Changes .last_change ()
439
433
except DatabaseError as exc :
440
- logger .exception (" Database gave error: %r" , exc )
434
+ logger .exception (' Database gave error: %r' , exc )
441
435
return False
442
436
except InterfaceError :
443
437
warning (
444
- " DatabaseScheduler: InterfaceError in schedule_changed(), "
445
- " waiting to retry in next call..."
438
+ ' DatabaseScheduler: InterfaceError in schedule_changed(), '
439
+ ' waiting to retry in next call...'
446
440
)
447
441
return False
448
442
@@ -462,7 +456,7 @@ def reserve(self, entry):
462
456
463
457
def sync (self ):
464
458
if logger .isEnabledFor (logging .DEBUG ):
465
- debug (" Writing entries..." )
459
+ debug (' Writing entries...' )
466
460
_tried = set ()
467
461
_failed = set ()
468
462
try :
@@ -476,11 +470,11 @@ def sync(self):
476
470
except (KeyError , TypeError , ObjectDoesNotExist ):
477
471
_failed .add (name )
478
472
except DatabaseError as exc :
479
- logger .exception (" Database error while sync: %r" , exc )
473
+ logger .exception (' Database error while sync: %r' , exc )
480
474
except InterfaceError :
481
475
warning (
482
- " DatabaseScheduler: InterfaceError in sync(), "
483
- " waiting to retry in next call..."
476
+ ' DatabaseScheduler: InterfaceError in sync(), '
477
+ ' waiting to retry in next call...'
484
478
)
485
479
finally :
486
480
# retry later, only for the failed ones
@@ -490,7 +484,9 @@ def update_from_dict(self, mapping):
490
484
s = {}
491
485
for name , entry_fields in mapping .items ():
492
486
try :
493
- entry = self .Entry .from_entry (name , app = self .app , ** entry_fields )
487
+ entry = self .Entry .from_entry (name ,
488
+ app = self .app ,
489
+ ** entry_fields )
494
490
if entry .model .enabled :
495
491
s [name ] = entry
496
492
@@ -502,11 +498,10 @@ def install_default_entries(self, data):
502
498
entries = {}
503
499
if self .app .conf .result_expires :
504
500
entries .setdefault (
505
- "celery.backend_cleanup" ,
506
- {
507
- "task" : "celery.backend_cleanup" ,
508
- "schedule" : schedules .crontab ("0" , "4" , "*" ),
509
- "options" : {"expire_seconds" : 12 * 3600 },
501
+ 'celery.backend_cleanup' , {
502
+ 'task' : 'celery.backend_cleanup' ,
503
+ 'schedule' : schedules .crontab ('0' , '4' , '*' ),
504
+ 'options' : {'expire_seconds' : 12 * 3600 },
510
505
},
511
506
)
512
507
self .update_from_dict (entries )
@@ -523,20 +518,26 @@ def schedule(self):
523
518
current_time = datetime .datetime .now ()
524
519
525
520
if self ._initial_read :
526
- debug (" DatabaseScheduler: initial read" )
521
+ debug (' DatabaseScheduler: initial read' )
527
522
initial = update = True
528
523
self ._initial_read = False
529
524
self ._last_full_sync = current_time
530
525
elif self .schedule_changed ():
531
- info (" DatabaseScheduler: Schedule changed." )
526
+ info (' DatabaseScheduler: Schedule changed.' )
532
527
update = True
533
528
self ._last_full_sync = current_time
534
529
535
530
# Force update the schedule if it's been more than 5 minutes
536
531
if not update :
537
- time_since_last_sync = (current_time - self ._last_full_sync ).total_seconds ()
538
- if time_since_last_sync >= SCHEDULE_SYNC_MAX_INTERVAL :
539
- debug ("DatabaseScheduler: Forcing full sync after 5 minutes" )
532
+ time_since_last_sync = (
533
+ current_time - self ._last_full_sync
534
+ ).total_seconds ()
535
+ if (
536
+ time_since_last_sync >= SCHEDULE_SYNC_MAX_INTERVAL
537
+ ):
538
+ debug (
539
+ 'DatabaseScheduler: Forcing full sync after 5 minutes'
540
+ )
540
541
update = True
541
542
self ._last_full_sync = current_time
542
543
@@ -548,8 +549,7 @@ def schedule(self):
548
549
self ._heap = []
549
550
self ._heap_invalidated = True
550
551
if logger .isEnabledFor (logging .DEBUG ):
551
- debug (
552
- "Current schedule:\n %s" ,
553
- "\n " .join (repr (entry ) for entry in self ._schedule .values ()),
552
+ debug ('Current schedule:\n %s' , '\n ' .join (
553
+ repr (entry ) for entry in self ._schedule .values ()),
554
554
)
555
555
return self ._schedule
0 commit comments