Skip to content

Commit 6a85c03

Browse files
authored
fix: Fix duplicate key error when using Kafka with DT enabled (#2433)
1 parent 49c65bf commit 6a85c03

File tree

3 files changed

+17
-10
lines changed

3 files changed

+17
-10
lines changed

src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs

+8-5
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
7373
if (messageAsObject is MessageMetadata messageMetaData)
7474
{
7575
headersSize = GetHeadersSize(messageMetaData.Headers);
76-
77-
transaction.InsertDistributedTraceHeaders(messageMetaData.Headers, DistributedTraceHeadersSetter);
76+
transaction.InsertDistributedTraceHeaders(messageMetaData, DistributedTraceHeadersSetter);
7877
}
7978

8079
ReportSizeMetrics(agent, transaction, topic, headersSize, messageAsObject);
@@ -134,10 +133,14 @@ private static Func<object, object> GetKeyAccessorFunc(Type t) =>
134133
private static Func<object, object> GetValueAccessorFunc(Type t) =>
135134
VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(t, "Value");
136135

137-
private static void DistributedTraceHeadersSetter(Headers carrier, string key, string value)
136+
private static void DistributedTraceHeadersSetter(MessageMetadata carrier, string key, string value)
138137
{
139-
carrier ??= new Headers();
140-
carrier.Add(key, Encoding.ASCII.GetBytes(value));
138+
carrier.Headers ??= new Headers();
139+
if (!string.IsNullOrEmpty(key))
140+
{
141+
carrier.Headers.Remove(key);
142+
carrier.Headers.Add(key, Encoding.ASCII.GetBytes(value));
143+
}
141144
}
142145

143146
private static long TryGetSize(object obj)

src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaProducerWrapper.cs

+8-4
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,19 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
2929

3030
var segment = transaction.StartMessageBrokerSegment(instrumentedMethodCall.MethodCall, MessageBrokerDestinationType.Topic, MessageBrokerAction.Produce, BrokerVendorName, topicPartition.Topic);
3131

32-
transaction.InsertDistributedTraceHeaders(messageMetadata.Headers, DistributedTraceHeadersSetter);
32+
transaction.InsertDistributedTraceHeaders(messageMetadata, DistributedTraceHeadersSetter);
3333

3434
return instrumentedMethodCall.MethodCall.Method.MethodName == "Produce" ? Delegates.GetDelegateFor(segment) : Delegates.GetAsyncDelegateFor<Task>(agent, segment);
3535
}
3636

37-
private static void DistributedTraceHeadersSetter(Headers carrier, string key, string value)
37+
private static void DistributedTraceHeadersSetter(MessageMetadata carrier, string key, string value)
3838
{
39-
carrier ??= new Headers();
40-
carrier.Add(key, Encoding.ASCII.GetBytes(value));
39+
carrier.Headers ??= new Headers();
40+
if (!string.IsNullOrEmpty(key))
41+
{
42+
carrier.Headers.Remove(key);
43+
carrier.Headers.Add(key, Encoding.ASCII.GetBytes(value));
44+
}
4145
}
4246

4347
}

tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void Test()
9191
() => Assert.True(produceSpan.IntrinsicAttributes.ContainsKey("parentId")),
9292
() => Assert.NotNull(consumeTxnSpan),
9393
() => Assert.True(consumeTxnSpan.UserAttributes.ContainsKey("kafka.consume.byteCount")),
94-
() => Assert.InRange((long)consumeTxnSpan.UserAttributes["kafka.consume.byteCount"], 20, 30), // usually is 24 - 26
94+
() => Assert.InRange((long)consumeTxnSpan.UserAttributes["kafka.consume.byteCount"], 460, 470), // includes headers
9595
() => Assert.True(consumeTxnSpan.IntrinsicAttributes.ContainsKey("traceId")),
9696
() => Assert.False(consumeTxnSpan.IntrinsicAttributes.ContainsKey("parentId"))
9797
);

0 commit comments

Comments
 (0)