
No Built-in Way to Identify Tasks Scheduled by Celery Beat #854

Open
@Bauerna

Description

Celery Beat currently does not add any metadata to the task.request object indicating that a task was scheduled by Beat. As a result, there is no way to tell a manually triggered task (.delay() or .apply_async()) apart from a Beat-scheduled one without passing a custom keyword argument from the schedule.
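
For comparison, the custom-kwargs workaround mentioned above looks roughly like the sketch below. It assumes a hypothetical triggered_by_beat keyword argument that only the Beat entry sets. The @celery.task decorator is left out so the fragment runs without a broker, and a plain 60-second interval stands in for crontab(minute="*"). Every task that needs the distinction has to accept this extra argument, which is the coupling this issue asks to avoid.

```python
import logging

logger = logging.getLogger(__name__)

# Same entry as in the repro, plus a hypothetical flag passed as a keyword
# argument. 60.0 (seconds) replaces crontab(minute="*") only to avoid the
# celery import; unlike crontab it is not aligned to the start of a minute.
beat_schedule = {
    "add_every_minute": {
        "task": "tasks.add",
        "schedule": 60.0,
        "args": (2, 3),
        "kwargs": {"triggered_by_beat": True},
    }
}


# In the real app this would be decorated with @celery.task(name="tasks.add").
def add(x: int, y: int, triggered_by_beat: bool = False) -> int:
    # Manual .delay(2, 3) calls never pass the flag, so they log "manual".
    source = "beat" if triggered_by_beat else "manual"
    logger.info("tasks.add triggered by %s", source)
    return x + y
```

A task_prerun handler could read the same flag from the kwargs argument the signal provides, but only for tasks whose schedule entries set it.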

Steps to Reproduce

Set up a simple task for your worker to handle.

# tasks/simple_tasks.py
from app.tasks.celery_app import celery


@celery.task(name="tasks.add")
def add(x: int, y: int):
    return x + y

Set up Celery Beat (the producer) with a schedule that triggers the task. Also, connect a task_prerun signal handler so the request logged for each source can be compared.

# tasks/celery_app.py
import logging
from celery import Celery
from celery.signals import task_prerun
from celery.schedules import crontab

logger = logging.getLogger(__name__)

celery = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

celery.conf.beat_schedule = {
    "add_every_minute": {
        "task": "tasks.add",
        "schedule": crontab(minute="*"),  # Runs every minute
        "args": (2, 3),
    }
}

celery.conf.timezone = "UTC"

@task_prerun.connect
def log_scheduled_task(task_id, task, **kwargs):
    # Log the full request context of every task the worker is about to run.
    logger.info("Request %s", task.request)

In a Python console, trigger the task manually:

from app.tasks.celery_app import celery
from app.tasks.simple_tasks import add
task = add.delay(2, 3)

Finally, wait for Beat to trigger the task at the start of the next minute.

Capture the worker log entries for both runs and compare them.

Expected Behavior

Celery Beat should attach some metadata (e.g., a celerybeat: true header or a dedicated delivery_info field) that distinguishes Beat-scheduled tasks from manually triggered ones.

Actual Behavior

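Apart from the values that are unique to every message (id, root_id, correlation_id, reply_to, delivery_tag), the request objects are identical. The only other difference is argsrepr ('(2, 3)' vs '[2, 3]'), which reflects whether the arguments arrived as a tuple or a list, not who sent the task. headers is None in both cases: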

Request object from manual triggering:

[2025-03-16 12:44:47,780: INFO/ForkPoolWorker-8] Request <Context: {'lang': 'py', 'task': 'tasks.add', 'id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'parent_id': None, 'argsrepr': '(2, 3)', 'kwargsrepr': '{}', 'origin': 'XXX@XXX', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'reply_to': 'ba8db540-47a4-3eb9-bdd5-520017cccb2d', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '79f94ba2-d732-40c6-87ad-64446ee9c8e9'}, 'reply_to': 'ba8db540-47a4-3eb9-bdd5-520017cccb2d', 'correlation_id': 'cb3ba2c6-652e-4742-856a-ad0b532d9154', 'hostname': 'XXX@XXX', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': False}, 'args': [2, 3], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None, 'called_directly': False, 'headers': None}>

Request object from beat triggering:

[2025-03-16 12:45:00,004: INFO/ForkPoolWorker-8] Request <Context: {'lang': 'py', 'task': 'tasks.add', 'id': 'f456e265-a921-453d-9b92-82cff94335bc', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'f456e265-a921-453d-9b92-82cff94335bc', 'parent_id': None, 'argsrepr': '[2, 3]', 'kwargsrepr': '{}', 'origin': 'XXX@XXX', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': 'f456e265-a921-453d-9b92-82cff94335bc', 'reply_to': '15316467-2a05-31ec-bfe5-ed1d11a6b3d1', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '7b77e502-0fc2-4df0-974b-3d71a230cf8d'}, 'reply_to': '15316467-2a05-31ec-bfe5-ed1d11a6b3d1', 'correlation_id': 'f456e265-a921-453d-9b92-82cff94335bc', 'hostname': 'XXX@XXX', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': False}, 'args': [2, 3], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None, 'called_directly': False, 'headers': None}>
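
The comparison can also be done mechanically. The sketch below diffs two request contexts while skipping fields that change with every message (the IDs, reply_to, correlation_id, and the nested properties, which holds the delivery tag). The dictionaries are trimmed copies of the two logged contexts above; PER_MESSAGE_KEYS and diff_requests are illustrative names, not Celery APIs.

```python
from typing import Any

# Keys whose values differ on every message regardless of who sent it.
PER_MESSAGE_KEYS = {"id", "root_id", "correlation_id", "reply_to", "properties"}

# Trimmed copies of the two Context dicts from the worker log.
manual = {
    "task": "tasks.add",
    "id": "cb3ba2c6-652e-4742-856a-ad0b532d9154",
    "root_id": "cb3ba2c6-652e-4742-856a-ad0b532d9154",
    "argsrepr": "(2, 3)",
    "origin": "XXX@XXX",
    "delivery_info": {"exchange": "", "routing_key": "celery", "priority": 0, "redelivered": False},
    "args": [2, 3],
    "headers": None,
}
beat = {
    "task": "tasks.add",
    "id": "f456e265-a921-453d-9b92-82cff94335bc",
    "root_id": "f456e265-a921-453d-9b92-82cff94335bc",
    "argsrepr": "[2, 3]",
    "origin": "XXX@XXX",
    "delivery_info": {"exchange": "", "routing_key": "celery", "priority": 0, "redelivered": False},
    "args": [2, 3],
    "headers": None,
}


def diff_requests(a: dict[str, Any], b: dict[str, Any]) -> dict[str, tuple[Any, Any]]:
    """Return {key: (a_value, b_value)} for every non-per-message key that differs."""
    keys = (a.keys() | b.keys()) - PER_MESSAGE_KEYS
    return {k: (a.get(k), b.get(k)) for k in sorted(keys) if a.get(k) != b.get(k)}


print(diff_requests(manual, beat))
# {'argsrepr': ('(2, 3)', '[2, 3]')}
```

Once per-message values are excluded, argsrepr is the only field left, and it describes the argument container rather than the sender.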

Proposed Solution

  • Add a custom header (celerybeat: true) when Celery Beat schedules a task.
  • Modify delivery_info to indicate that the task was scheduled by Beat.
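
If either proposal were implemented, a task or signal handler could check for it along these lines. This is a hypothetical sketch: neither a celerybeat request attribute nor a celerybeat key in delivery_info exists in Celery today. It also assumes a custom message header would surface as an attribute on task.request. types.SimpleNamespace stands in for task.request so the example runs on its own.

```python
from types import SimpleNamespace
from typing import Any


def is_beat_request(request: Any) -> bool:
    """Return True if the request carries the (hypothetical) Beat marker.

    Checks both proposed locations: a custom ``celerybeat`` header, assumed
    to be exposed as a request attribute, and a ``celerybeat`` key in
    ``delivery_info``. Neither exists in current Celery releases.
    """
    if getattr(request, "celerybeat", False):
        return True
    delivery_info = getattr(request, "delivery_info", None) or {}
    return bool(delivery_info.get("celerybeat", False))


# Usage with stand-in request objects:
manual = SimpleNamespace(delivery_info={"exchange": "", "routing_key": "celery"})
scheduled = SimpleNamespace(celerybeat=True, delivery_info={"exchange": "", "routing_key": "celery"})
print(is_beat_request(manual), is_beat_request(scheduled))  # False True
```

Checking both locations keeps the helper agnostic about which of the two proposals is adopted.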

Environment

Celery v5.4.0
Django Celery Beat v2.7.0
Broker: Redis
Backend: Redis
