Open
Description
Given the standard code of subscribing to pubsub with otel support
def foo(self):
with tracer.start_as_current_span("foo") as span:
logging.info("foo")
span.set_attribute("foo.attribute", "foo")
time.sleep(0.01)
subscriber = SubscriberClient(
subscriber_options=SubscriberOptions(enable_open_telemetry_tracing=True)
)
subscription_path = subscriber.subscription_path(self._project_id, self._subscription_id)
self._streaming_pull_future = self._subscriber.subscribe(
subscription_path,
callback=foo
)
I would like to see the foo span as child of the process span but i don't see it.
I researched the issue and found this.
Looking at this line:
def _wrap_callback_errors(
callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
on_callback_error: Callable[[BaseException], Any],
message: "google.cloud.pubsub_v1.subscriber.message.Message",
):
"""Wraps a user callback so that if an exception occurs the message is
nacked.
Args:
callback: The user callback.
message: The Pub/Sub message.
"""
try:
if message.opentelemetry_data:
message.opentelemetry_data.end_subscribe_concurrency_control_span()
message.opentelemetry_data.start_process_span()
callback(message)
except BaseException as exc:
# Note: the likelihood of this failing is extremely low. This just adds
# a message to a queue, so if this doesn't work the world is in an
# unrecoverable state and this thread should just bail.
_LOGGER.exception(
"Top-level exception occurred in callback while processing a message"
)
message.nack()
on_callback_error(exc)
the callback function is not in the scope of start_process_span and therefore does not get the context from this function. Although the start_process_span uses end_on_exit=False, this does not affect the context which is outside the with context but just the ability to close the span.
def start_process_span(self) -> None:
assert self._subscribe_span is not None
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
publish_create_span_link: Optional[trace.Link] = None
if self._publisher_create_span_context:
publish_create_span: trace.Span = trace.get_current_span(
self._publisher_create_span_context
)
span_context: Optional[
trace.SpanContext
] = publish_create_span.get_span_context()
publish_create_span_link = (
trace.Link(span_context) if span_context else None
)
with tracer.start_as_current_span(
name=f"{self._subscription_id} process",
attributes={
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
},
kind=trace.SpanKind.INTERNAL,
context=set_span_in_context(self._subscribe_span),
links=[publish_create_span_link] if publish_create_span_link else None,
end_on_exit=False,
) as process_span:
self._process_span = process_span
This means that it is not associated to the span tree of this trace.
I would think that it is desired to see all the spans of the context as child spans of process. Unless you intended it otherwise but what was the reason to do so?
Thanks in advance.