
otel context in subscriber callback is not propagated from process span #1428

Open
@david-gang


Given the standard code for subscribing to Pub/Sub with OpenTelemetry tracing enabled:

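```python
import logging
import time

from google.cloud.pubsub_v1 import SubscriberClient
from google.cloud.pubsub_v1.types import SubscriberOptions
from opentelemetry import trace

tracer = trace.get_tracer(__name__)


def foo(message):
    # User callback: starts its own span while handling the message.
    with tracer.start_as_current_span("foo") as span:
        logging.info("foo")
        span.set_attribute("foo.attribute", "foo")
        time.sleep(0.01)
    message.ack()


subscriber = SubscriberClient(
    subscriber_options=SubscriberOptions(enable_open_telemetry_tracing=True)
)
# project_id and subscription_id are placeholders for your own values.
subscription_path = subscriber.subscription_path(project_id, subscription_id)
streaming_pull_future = subscriber.subscribe(subscription_path, callback=foo)
```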

I would expect the `foo` span to appear as a child of the process span, but it does not.

I looked into it, and the cause seems to be in `_wrap_callback_errors`:

```python
def _wrap_callback_errors(
    callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
    on_callback_error: Callable[[BaseException], Any],
    message: "google.cloud.pubsub_v1.subscriber.message.Message",
):
    """Wraps a user callback so that if an exception occurs the message is
    nacked.

    Args:
        callback: The user callback.
        message: The Pub/Sub message.
    """
    try:
        if message.opentelemetry_data:
            message.opentelemetry_data.end_subscribe_concurrency_control_span()
            message.opentelemetry_data.start_process_span()
        callback(message)
    except BaseException as exc:
        # Note: the likelihood of this failing is extremely low. This just adds
        # a message to a queue, so if this doesn't work the world is in an
        # unrecoverable state and this thread should just bail.
        _LOGGER.exception(
            "Top-level exception occurred in callback while processing a message"
        )
        message.nack()
        on_callback_error(exc)
```

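The callback is not called inside the `with` block of `start_process_span`, so it does not inherit the context that block sets up. `start_process_span` does pass `end_on_exit=False`, but that only prevents the span from being ended when the `with` block exits; it does not keep the span as the current span. As soon as the `with` block exits, the previous context is restored, so by the time `callback(message)` runs, the process span is no longer current.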

```python
def start_process_span(self) -> None:
    assert self._subscribe_span is not None
    tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
    publish_create_span_link: Optional[trace.Link] = None
    if self._publisher_create_span_context:
        publish_create_span: trace.Span = trace.get_current_span(
            self._publisher_create_span_context
        )
        span_context: Optional[
            trace.SpanContext
        ] = publish_create_span.get_span_context()
        publish_create_span_link = (
            trace.Link(span_context) if span_context else None
        )

    with tracer.start_as_current_span(
        name=f"{self._subscription_id} process",
        attributes={
            "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
        },
        kind=trace.SpanKind.INTERNAL,
        context=set_span_in_context(self._subscribe_span),
        links=[publish_create_span_link] if publish_create_span_link else None,
        end_on_exit=False,
    ) as process_span:
        self._process_span = process_span
```

As a result, spans created in the callback (such as `foo`) are not attached to the process span in the trace tree.

I would expect all spans created within the callback to appear as children of the process span. If the current behavior is intentional, what is the reason for it?

Thanks in advance.

Labels: api: pubsub (issues related to the googleapis/python-pubsub API)