Additional metrics exported from Celery workers #3463

Open · wants to merge 4 commits into `main`
CHANGELOG.md (15 additions, 0 deletions)

@@ -11,6 +11,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)

## Unreleased

### Added

- `opentelemetry-instrumentation-celery` Add three worker metrics: the number of active tasks, the number of prefetched tasks, and the prefetch duration
([#3463](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3463))

### Fixed

- `opentelemetry-instrumentation-celery` Fix a memory leak where references to task identifiers were kept indefinitely
([#3463](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3463))

### Breaking changes

- `opentelemetry-instrumentation-celery` Rename the `flower.task.runtime.seconds` metric to `messaging.process.duration` to follow the messaging semantic conventions
([#3463](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3463))

## Version 1.34.0/0.55b0 (2025-06-04)

### Fixed
opentelemetry/instrumentation/celery/__init__.py

@@ -60,7 +60,7 @@ def add(x, y):
"""

import logging
-from timeit import default_timer
import time
from typing import Collection, Iterable

from billiard import VERSION
@@ -76,6 +76,7 @@ def add(x, y):
from opentelemetry.metrics import get_meter
from opentelemetry.propagate import extract, inject
from opentelemetry.propagators.textmap import Getter
from opentelemetry.semconv._incubating.metrics import messaging_metrics
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode

@@ -96,6 +97,12 @@ def add(x, y):
_TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal"
_TASK_NAME_KEY = "celery.task_name"

# Metric names
_TASK_COUNT_ACTIVE = "messaging.client.active_tasks"
_TASK_COUNT_PREFETCHED = "messaging.client.prefetched_tasks"
_TASK_PROCESSING_TIME = messaging_metrics.MESSAGING_PROCESS_DURATION
_TASK_PREFETCH_TIME = "messaging.prefetch.duration"
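# Note (added for clarity): messaging_metrics.MESSAGING_PROCESS_DURATION
# resolves to "messaging.process.duration", the semconv name the CHANGELOG
# rename refers to.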


class CeleryGetter(Getter):
def get(self, carrier, key):
@@ -113,10 +120,36 @@ def keys(self, carrier):
celery_getter = CeleryGetter()


-class CeleryInstrumentor(BaseInstrumentor):
-    metrics = None
-    task_id_to_start_time = {}
class TaskDurationTracker:
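    """Track per-task start times and record elapsed-time histograms,
    popping entries once recorded so task identifiers are not kept
    indefinitely (the memory-leak fix noted in the CHANGELOG)."""
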
def __init__(self, metrics):
self.metrics = metrics
self.tracker = {}

def record_start(self, key, step):
self.tracker.setdefault(key, {})[step] = time.perf_counter()

def record_finish(self, key, metric_name, attributes):
try:
time_elapsed = self._time_elapsed(key, metric_name)
self.metrics[metric_name].record(
max(0, time_elapsed), attributes=attributes
)
except KeyError:
logger.warning("Failed to record %s for task %s", metric_name, key)

def _time_elapsed(self, key, step):
end_time = time.perf_counter()
try:
            start_time = self.tracker.get(key, {}).pop(step)
            return end_time - start_time
finally:
            # Drop the per-task entry once its last pending timer has been
            # popped, so task ids are not retained indefinitely
if key in self.tracker and not self.tracker.get(key):
self.tracker.pop(key)
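For illustration, a minimal sketch (not part of the PR) of how `TaskDurationTracker` pairs a `record_start` with a `record_finish`; the task id and attributes here are placeholders:

```python
from opentelemetry.metrics import get_meter

meter = get_meter(__name__)
metrics = {
    "messaging.process.duration": meter.create_histogram(
        name="messaging.process.duration", unit="s"
    )
}
tracker = TaskDurationTracker(metrics)

tracker.record_start("task-id-1", "messaging.process.duration")
# ... the task body runs ...
tracker.record_finish(
    "task-id-1",
    "messaging.process.duration",
    {"messaging.operation.name": "tasks.add"},
)
# The entry for "task-id-1" has been popped, so nothing leaks.
```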


class CeleryInstrumentor(BaseInstrumentor):
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

@@ -139,8 +172,10 @@ def _instrument(self, **kwargs):
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

-        self.create_celery_metrics(meter)
self.metrics = _create_celery_worker_metrics(meter)
self.time_tracker = TaskDurationTracker(self.metrics)

signals.task_received.connect(self._trace_received, weak=False)
signals.task_prerun.connect(self._trace_prerun, weak=False)
signals.task_postrun.connect(self._trace_postrun, weak=False)
signals.before_task_publish.connect(
@@ -153,27 +188,52 @@ def _instrument(self, **kwargs):
signals.task_retry.connect(self._trace_retry, weak=False)

def _uninstrument(self, **kwargs):
signals.task_received.disconnect(self._trace_received)
signals.task_prerun.disconnect(self._trace_prerun)
signals.task_postrun.disconnect(self._trace_postrun)
signals.before_task_publish.disconnect(self._trace_before_publish)
signals.after_task_publish.disconnect(self._trace_after_publish)
signals.task_failure.disconnect(self._trace_failure)
signals.task_retry.disconnect(self._trace_retry)

def _trace_received(self, *args, **kwargs):
"""
        On the task_received signal, the task has been prefetched and the
        prefetch timer starts.
"""

        request = utils.retrieve_request(kwargs)
        if request is None:
            return

metrics_attributes = utils.get_metrics_attributes_from_request(request)
self.metrics[_TASK_COUNT_PREFETCHED].add(
1, attributes=metrics_attributes
)
self.time_tracker.record_start(request.task_id, _TASK_PREFETCH_TIME)

def _trace_prerun(self, *args, **kwargs):
"""
        On the task_prerun signal, the task is no longer prefetched; the
        execution timer starts along with the task span.
"""

task = utils.retrieve_task(kwargs)
task_id = utils.retrieve_task_id(kwargs)

if task is None or task_id is None:
return

-        self.update_task_duration_time(task_id)
metrics_attributes = utils.get_metrics_attributes_from_task(task)
self.metrics[_TASK_COUNT_PREFETCHED].add(
-1, attributes=metrics_attributes
)
self.time_tracker.record_finish(
task_id, _TASK_PREFETCH_TIME, metrics_attributes
)
self.time_tracker.record_start(task_id, _TASK_PROCESSING_TIME)

request = task.request
tracectx = extract(request, getter=celery_getter) or None
token = context_api.attach(tracectx) if tracectx is not None else None

logger.debug("prerun signal start task_id=%s", task_id)

operation_name = f"{_TASK_RUN}/{task.name}"
span = self._tracer.start_span(
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
@@ -183,14 +243,24 @@ def _trace_prerun(self, *args, **kwargs):
activation.__enter__() # pylint: disable=E1101
utils.attach_context(task, task_id, span, activation, token)

self.metrics[_TASK_COUNT_ACTIVE].add(1, attributes=metrics_attributes)

def _trace_postrun(self, *args, **kwargs):
"""
        On the task_postrun signal, the task is no longer being executed.
"""

task = utils.retrieve_task(kwargs)
task_id = utils.retrieve_task_id(kwargs)

if task is None or task_id is None:
return

logger.debug("postrun signal task_id=%s", task_id)
metrics_attributes = utils.get_metrics_attributes_from_task(task)
self.metrics[_TASK_COUNT_ACTIVE].add(-1, attributes=metrics_attributes)
self.time_tracker.record_finish(
task_id, _TASK_PROCESSING_TIME, metrics_attributes
)

# retrieve and finish the Span
ctx = utils.retrieve_context(task, task_id)
@@ -210,10 +280,8 @@ def _trace_postrun(self, *args, **kwargs):

activation.__exit__(None, None, None)
utils.detach_context(task, task_id)
-        self.update_task_duration_time(task_id)
-        labels = {"task": task.name, "worker": task.request.hostname}
-        self._record_histograms(task_id, labels)
-        # if the process sending the task is not instrumented

# If the process sending the task is not instrumented,
# there's no incoming context and no token to detach
if token is not None:
context_api.detach(token)
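Taken together, the three handlers above give each new metric a well-defined lifecycle (summary added for clarity, not part of the diff):

- `task_received`: `messaging.client.prefetched_tasks` += 1, prefetch timer starts
- `task_prerun`: `messaging.client.prefetched_tasks` -= 1, `messaging.prefetch.duration` recorded, `messaging.client.active_tasks` += 1, process timer starts
- `task_postrun`: `messaging.client.active_tasks` -= 1, `messaging.process.duration` recorded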
@@ -345,29 +413,29 @@ def _trace_retry(*args, **kwargs):
# something that isn't an `Exception`
span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason))

-    def update_task_duration_time(self, task_id):
-        cur_time = default_timer()
-        task_duration_time_until_now = (
-            cur_time - self.task_id_to_start_time[task_id]
-            if task_id in self.task_id_to_start_time
-            else cur_time
-        )
-        self.task_id_to_start_time[task_id] = task_duration_time_until_now
-
-    def _record_histograms(self, task_id, metric_attributes):
-        if task_id is None:
-            return
-
-        self.metrics["flower.task.runtime.seconds"].record(
-            self.task_id_to_start_time.get(task_id),
-            attributes=metric_attributes,
-        )
-
-    def create_celery_metrics(self, meter) -> None:
-        self.metrics = {
-            "flower.task.runtime.seconds": meter.create_histogram(
-                name="flower.task.runtime.seconds",
-                unit="seconds",
-                description="The time it took to run the task.",
-            )
-        }
def _create_celery_worker_metrics(meter) -> dict:
metrics = {
_TASK_COUNT_ACTIVE: meter.create_up_down_counter(
name=_TASK_COUNT_ACTIVE,
unit="{message}",
description="Number of tasks currently being executed by the worker",
),
_TASK_COUNT_PREFETCHED: meter.create_up_down_counter(
name=_TASK_COUNT_PREFETCHED,
unit="{message}",
description="Number of tasks prefetched by the worker",
),
_TASK_PREFETCH_TIME: meter.create_histogram(
name=_TASK_PREFETCH_TIME,
unit="s",
description="The time the task spent in prefetch mode",
),
_TASK_PROCESSING_TIME: meter.create_histogram(
name=_TASK_PROCESSING_TIME,
unit="s",
description="The time it took to run the task.",
),
}

return metrics
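As an end-to-end sketch (not part of the PR, and assuming the SDK's in-memory reader), the new instruments can be inspected after running instrumented tasks:

```python
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

reader = InMemoryMetricReader()
set_meter_provider(MeterProvider(metric_readers=[reader]))

CeleryInstrumentor().instrument()
# ... start a worker and run some tasks ...

data = reader.get_metrics_data()
for resource_metrics in data.resource_metrics:
    for scope_metrics in resource_metrics.scope_metrics:
        for metric in scope_metrics.metrics:
            # Expect messaging.client.active_tasks,
            # messaging.client.prefetched_tasks,
            # messaging.prefetch.duration and messaging.process.duration
            print(metric.name)
```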
opentelemetry/instrumentation/celery/utils.py

@@ -20,6 +20,10 @@
from celery import registry # pylint: disable=no-name-in-module
from celery.app.task import Task

from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
MESSAGING_CLIENT_ID,
MESSAGING_OPERATION_NAME,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span

@@ -217,6 +221,14 @@ def retrieve_task_id(kwargs):
return task_id


def retrieve_request(kwargs):
request = kwargs.get("request")
if request is None:
logger.debug("Unable to retrieve the request from signal arguments")

return request


def retrieve_task_id_from_request(kwargs):
# retry signal does not include task_id as argument so use request argument
request = kwargs.get("request")
@@ -250,3 +262,17 @@ def retrieve_reason(kwargs):
if not reason:
logger.debug("Unable to retrieve the retry reason")
return reason


def get_metrics_attributes_from_request(request):
return {
MESSAGING_OPERATION_NAME: request.task.name,
MESSAGING_CLIENT_ID: request.hostname,
}


def get_metrics_attributes_from_task(task):
return {
MESSAGING_OPERATION_NAME: task.name,
MESSAGING_CLIENT_ID: task.request.hostname,
}
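Illustrative only, with hypothetical task and worker names (`SimpleNamespace` stands in for the real Celery objects):

```python
from types import SimpleNamespace

# Hypothetical stand-ins for a Celery task and its request
task = SimpleNamespace(
    name="tasks.add", request=SimpleNamespace(hostname="celery@host")
)
print(get_metrics_attributes_from_task(task))
# {"messaging.operation.name": "tasks.add", "messaging.client.id": "celery@host"}
```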
(test fixtures: Celery app and tasks)

@@ -12,14 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time

from celery import Celery

from opentelemetry import baggage


class Config:
result_backend = "rpc"
broker_backend = "memory"
result_backend = "rpc://"
without_gossip = True
without_heartbeat = True
without_mingle = True


app = Celery(broker="memory:///")
@@ -31,8 +35,14 @@ class CustomError(Exception):


@app.task
-def task_add(num_a, num_b):
-    return num_a + num_b
def task_add(x=1, y=2):
return x + y


@app.task
def task_sleep(sleep_time):
time.sleep(sleep_time)
return 1
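A quick eager-mode check of the updated fixtures (a sketch: eager mode is an assumption here, the real tests run a worker):

```python
app.conf.task_always_eager = True  # run tasks in-process for a quick check

assert task_add.delay().get() == 3      # defaults x=1, y=2
assert task_add.delay(2, 3).get() == 5
assert task_sleep.delay(0.01).get() == 1
```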


@app.task
# … (remaining task definitions collapsed in the diff view)
(test: duplicate instrumentation)

@@ -26,5 +26,3 @@ def test_duplicate_instrumentaion(self):
CeleryInstrumentor().uninstrument()
self.assertIsNotNone(first.metrics)
self.assertIsNotNone(second.metrics)
-        self.assertEqual(first.task_id_to_start_time, {})
-        self.assertEqual(second.task_id_to_start_time, {})