Closed
Description
I am integrating my FastStream app with Sentry. To get end-to-end tracing across multiple FastStream apps, I need to read the trace_id from the headers of each Kafka message. This works perfectly with batch=False:
import sentry_sdk
from faststream.kafka import KafkaBroker, KafkaMessage

from mymodels import Event

broker = KafkaBroker('127.0.0.1:9092')


@broker.subscriber('topic', group_id='my_group', batch=False)
async def unify_format_grafana(message: Event, kafka_message: KafkaMessage):
    # Read the tracing headers attached to this single message.
    sentry_trace_id = kafka_message.headers.get('sentry_trace_id')
    baggage = kafka_message.headers.get('baggage')

    # ... do something and push message into next topic ...

    # Forward the current trace context to the next topic.
    await broker.publish(
        updated_message,
        'next_topic',
        key=some_key,
        headers={
            'sentry-trace': sentry_sdk.get_traceparent(),
            'baggage': sentry_sdk.get_baggage(),
        },
    )
However, I couldn't find a good way to access the headers of each individual message in batch=True mode.
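
To illustrate what I am after, here is a plain-Python sketch that makes no FastStream calls. The names `pair_with_trace` and `headers_per_message` are hypothetical, and the sketch assumes the batch handler could somehow receive one headers dict per message, in the same order as the bodies, which is exactly the part I can't find:

```python
from typing import Any, Dict, List, Optional, Tuple

Headers = Dict[str, str]


def pair_with_trace(
    bodies: List[Any],
    headers_per_message: List[Headers],  # hypothetical: one dict per message
    trace_key: str = "sentry_trace_id",
    baggage_key: str = "baggage",
) -> List[Tuple[Any, Optional[str], Optional[str]]]:
    """Pair each body in a batch with the trace headers of its own message."""
    if len(bodies) != len(headers_per_message):
        raise ValueError("bodies and headers must have the same length")
    return [
        (body, headers.get(trace_key), headers.get(baggage_key))
        for body, headers in zip(bodies, headers_per_message)
    ]
```

With something like this, each message in the batch could continue its own trace instead of all of them sharing one.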