Description
Celery Beat currently does not provide any metadata in the task.request object to indicate that a task was scheduled by Beat. This makes it impossible to differentiate between a manually triggered task (.delay() or .apply_async()) and a Beat-scheduled task without adding a custom kwargs parameter.
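For reference, the custom-kwargs workaround mentioned above might look roughly like this. It is a minimal sketch: the `scheduled_by_beat` flag is a hypothetical name, the task is shown as a plain function so the snippet runs without Celery installed, and a 60-second interval stands in for a crontab schedule.

```python
# Sketch of the kwargs workaround; `scheduled_by_beat` is a hypothetical flag name.
def add(x: int, y: int, scheduled_by_beat: bool = False) -> int:
    # The task body has to accept the flag even though it is unrelated to the work.
    source = "beat" if scheduled_by_beat else "manual"
    print(f"add called from {source}")
    return x + y


# A beat_schedule entry that passes the flag through "kwargs".
beat_entry = {
    "task": "tasks.add",
    "schedule": 60.0,  # interval in seconds
    "args": (2, 3),
    "kwargs": {"scheduled_by_beat": True},
}

# What Beat would effectively call versus what a manual caller would call.
beat_result = add(*beat_entry["args"], **beat_entry["kwargs"])
manual_result = add(2, 3)
```

The drawback is that every scheduled task's signature has to carry the extra parameter, and any caller can set it.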
Steps to Reproduce
Set up a simple task for your worker to handle.
# tasks/simple_tasks.py
from app.tasks.celery_app import celery
@celery.task(name="tasks.add")
def add(x: int, y: int):
    return x + y
Set up Celery Beat (your producer) with a schedule to trigger the task. Also, connect a task_prerun handler so the requests logged from the two sources can be compared.
# tasks/celery_app.py
import logging
from celery import Celery
from celery.signals import task_prerun
from celery.schedules import crontab
logger = logging.getLogger(__name__)
celery = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)
celery.conf.beat_schedule = {
    "add_every_minute": {
        "task": "tasks.add",
        "schedule": crontab(minute="*"),  # Runs every minute
        "args": (2, 3),
    }
}
celery.conf.timezone = "UTC"
@task_prerun.connect
def log_scheduled_task(task_id, task, **kwargs):
    logger.info(f"Request {task.request}")
In a Python console, trigger the task manually.
from app.tasks.celery_app import celery
from app.tasks.simple_tasks import add
task = add.delay(2, 3)
Finally, wait for the producer to trigger the task.
Capture the logs of both and compare.
Expected Behavior
Celery Beat should provide some metadata (e.g., a celerybeat: true header or a specific delivery_info field) to distinguish Beat-scheduled tasks from manually triggered ones.
Actual Behavior
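Neither request object carries a field that identifies its source. Apart from per-message identifiers (id, root_id, correlation_id, reply_to, delivery_tag), the only difference is argsrepr ('(2, 3)' vs. '[2, 3]'), which is incidental and not a reliable marker: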
Request object from manual triggering:
[2025-03-16 12:44:47,780: INFO/ForkPoolWorker-8] Request <Context: {'lang': 'py', 'task': 'tasks.add', 'id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'parent_id': None, 'argsrepr': '(2, 3)', 'kwargsrepr': '{}', 'origin': 'XXX@XXX', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'reply_to': 'ba8db540-47a4-3eb9-bdd5-520017cccb2d', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '79f94ba2-d732-40c6-87ad-64446ee9c8e9'}, 'reply_to': 'ba8db540-47a4-3eb9-bdd5-520017cccb2d', 'correlation_id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'hostname': 'XXX@XXX', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': False}, 'args': [2, 3], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None, 'called_directly': False, 'headers': None}>
Request object from beat triggering:
[2025-03-16 12:45:00,004: INFO/ForkPoolWorker-8] Request <Context: {'lang': 'py', 'task': 'tasks.add', 'id': 'f456e265-a921-453d-9b92-82cff94335bc', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'f456e265-a921-453d-9b92-82cff94335bc', 'parent_id': None, 'argsrepr': '[2, 3]', 'kwargsrepr': '{}', 'origin': 'XXX@XXX', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': 'f456e265-a921-453d-9b92-82cff94335bc', 'reply_to': '15316467-2a05-31ec-bfe5-ed1d11a6b3d1', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '7b77e502-0fc2-4df0-974b-3d71a230cf8d'}, 'reply_to': '15316467-2a05-31ec-bfe5-ed1d11a6b3d1', 'correlation_id': 'f456e265-a921-453d-9b92-82cff94335bc', 'hostname': 'XXX@XXX', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': False}, 'args': [2, 3], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None, 'called_directly': False, 'headers': None}>
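To make the comparison less error-prone than reading the two log lines by eye, the request dictionaries can be diffed programmatically. The sketch below is illustrative only: `flatten` and `diff_requests` are hypothetical helpers, and the two dictionaries are abbreviated copies of the logged requests above.

```python
# Keys that differ on every message and say nothing about who sent it.
PER_MESSAGE_KEYS = {"id", "root_id", "correlation_id", "reply_to", "delivery_tag"}
MISSING = "<missing>"


def flatten(d, prefix=""):
    """Flatten nested dicts into {"a.b.c": value} form."""
    flat = {}
    for key, value in d.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{path}."))
        else:
            flat[path] = value
    return flat


def diff_requests(a, b):
    """Return {path: (value_in_a, value_in_b)} for fields that differ."""
    fa, fb = flatten(a), flatten(b)
    diffs = {}
    for path in sorted(fa.keys() | fb.keys()):
        # Skip identifiers that are unique per message.
        if path.rsplit(".", 1)[-1] in PER_MESSAGE_KEYS:
            continue
        left, right = fa.get(path, MISSING), fb.get(path, MISSING)
        if left != right:
            diffs[path] = (left, right)
    return diffs


# Abbreviated copies of the two logged requests.
manual = {
    "task": "tasks.add",
    "id": "cb3ba2c6-652e-4742-856a-ad0b532d9154",
    "argsrepr": "(2, 3)",
    "kwargsrepr": "{}",
    "properties": {
        "delivery_info": {"exchange": "", "routing_key": "celery"},
        "delivery_tag": "79f94ba2-d732-40c6-87ad-64446ee9c8e9",
    },
    "args": [2, 3],
    "kwargs": {},
    "headers": None,
}
beat = {
    "task": "tasks.add",
    "id": "f456e265-a921-453d-9b92-82cff94335bc",
    "argsrepr": "[2, 3]",
    "kwargsrepr": "{}",
    "properties": {
        "delivery_info": {"exchange": "", "routing_key": "celery"},
        "delivery_tag": "7b77e502-0fc2-4df0-974b-3d71a230cf8d",
    },
    "args": [2, 3],
    "kwargs": {},
    "headers": None,
}

differences = diff_requests(manual, beat)
print(differences)
```

On these abbreviated inputs the only reported difference is `argsrepr`, which reflects how the arguments were serialized rather than who scheduled the task.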
Proposed Solution
- Add a custom header (celerybeat: true) when Celery Beat schedules a task.
- Modify delivery_info to indicate that the task was scheduled by Beat.
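Until something like this exists in Beat itself, a similar effect can probably be approximated from user code, because each `beat_schedule` entry accepts an `options` dict that is forwarded to `apply_async()`, which in turn accepts `headers`. The sketch below is a possible workaround, not Celery's own mechanism: `tag_beat_schedule` and `is_from_beat` are hypothetical helpers, the `celerybeat` header name is taken from the proposal above, and where a custom header surfaces on `task.request` (as a top-level attribute or under `headers`) is an assumption that may vary by Celery version and task protocol, so the check looks in both places.

```python
from types import SimpleNamespace


def tag_beat_schedule(schedule, header_name="celerybeat"):
    """Return a copy of a beat_schedule dict whose entries send a marker header."""
    tagged = {}
    for name, entry in schedule.items():
        new_entry = dict(entry)
        options = dict(entry.get("options", {}))
        headers = dict(options.get("headers", {}))
        headers[header_name] = True
        options["headers"] = headers
        new_entry["options"] = options  # forwarded to apply_async() by Beat
        tagged[name] = new_entry
    return tagged


def is_from_beat(request, header_name="celerybeat"):
    """Best-effort check on the worker side (assumed header locations)."""
    # Assumption 1: the custom header is exposed as a top-level request attribute.
    if getattr(request, header_name, None):
        return True
    # Assumption 2: the custom header is exposed under request.headers.
    headers = getattr(request, "headers", None) or {}
    return bool(headers.get(header_name))


# A plain interval in seconds stands in for crontab(minute="*") so that
# this snippet runs without Celery installed.
schedule = {
    "add_every_minute": {
        "task": "tasks.add",
        "schedule": 60.0,
        "args": (2, 3),
    }
}
tagged = tag_beat_schedule(schedule)

# Stand-ins for task.request objects, used only to exercise the helper.
beat_like = SimpleNamespace(celerybeat=True, headers=None)
manual_like = SimpleNamespace(headers=None)
```

With the real app, this would be applied as `celery.conf.beat_schedule = tag_beat_schedule(celery.conf.beat_schedule)`, and `is_from_beat(task.request)` could be called inside the `task_prerun` handler. A manual caller can still send the same header, so this is a convention rather than a guarantee.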
Environment
Celery v5.4.0
Django Celery Beat v2.7.0
Broker: Redis
Backend: Redis