Skip to content

Commit 0249582

Browse files
authored
fix: Defensively handle null source address in MassTransit instrumentation. (#2060)
* Defensively handle null source address * Also handle sourceAddress with no underscores Added unit tests * Refactor MassTransitHelpers and tests to extensions project
1 parent 70dfcf9 commit 0249582

File tree

9 files changed

+171
-160
lines changed

9 files changed

+171
-160
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright 2020 New Relic, Inc. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System;
5+
using NewRelic.Agent.Extensions.Providers.Wrapper;
6+
7+
8+
namespace NewRelic.Agent.Extensions.Helpers
9+
{
10+
public class MassTransitQueueData
11+
{
12+
public string QueueName { get; set; } = "Unknown";
13+
public MessageBrokerDestinationType DestinationType { get; set; } = MessageBrokerDestinationType.Queue;
14+
}
15+
16+
public class MassTransitHelpers
17+
{
18+
public static MassTransitQueueData GetQueueData(Uri sourceAddress)
19+
{
20+
var data = new MassTransitQueueData();
21+
22+
if (sourceAddress != null)
23+
{
24+
// rabbitmq://localhost/SomeHostname_MassTransitTest_bus_iyeoyyge44oc7yijbdp5i1opfd?temporary=true
25+
var items = sourceAddress.AbsoluteUri.Split('_');
26+
if (items.Length > 1)
27+
{
28+
var queueData = items[items.Length - 1].Split('?');
29+
data.QueueName = queueData[0];
30+
if (queueData.Length == 2 && queueData[1] == "temporary=true")
31+
{
32+
data.DestinationType = MessageBrokerDestinationType.TempQueue;
33+
}
34+
}
35+
}
36+
return data;
37+
}
38+
}
39+
}

src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/MassTransit/MassTransitHelpers.cs

-47
This file was deleted.

src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/MassTransit/NewRelicFilter.cs

+23-12
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
using System.Collections.Generic;
66
using System.Threading.Tasks;
77
using MassTransit;
8+
using NewRelic.Agent.Api;
9+
using NewRelic.Agent.Extensions.Helpers;
810
using NewRelic.Agent.Extensions.Providers.Wrapper;
911
using MethodCall = NewRelic.Agent.Extensions.Providers.Wrapper.MethodCall;
1012

@@ -38,19 +40,19 @@ public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
3840

3941
var mc = new MethodCall(_consumeMethod, context, default(string[]), true);
4042

41-
var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
43+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
4244

4345
var transaction = _agent.CreateTransaction(
44-
destinationType: MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress),
46+
destinationType: queueData.DestinationType,
4547
brokerVendorName: MessageBrokerVendorName,
46-
destination: destName);
48+
destination: queueData.QueueName);
4749

4850
transaction.AttachToAsync();
4951
transaction.DetachFromPrimary();
5052

5153
transaction.AcceptDistributedTraceHeaders(context.Headers, GetHeaderValue, TransportType.AMQP);
5254

53-
var segment = transaction.StartMessageBrokerSegment(mc, MessageBrokerDestinationType.Queue, MessageBrokerAction.Consume, MessageBrokerVendorName, destName);
55+
var segment = transaction.StartMessageBrokerSegment(mc, MessageBrokerDestinationType.Queue, MessageBrokerAction.Consume, MessageBrokerVendorName, queueData.QueueName);
5456

5557
await next.Send(context);
5658
segment.End();
@@ -84,12 +86,11 @@ public async Task Send(PublishContext context, IPipe<PublishContext> next)
8486

8587
var mc = new MethodCall(_publishMethod, context, default(string[]), true);
8688

87-
var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
88-
var destType = MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress);
89+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
8990

9091
var transaction = _agent.CurrentTransaction;
91-
MassTransitHelpers.InsertDistributedTraceHeaders(context.Headers, transaction);
92-
var segment = transaction.StartMessageBrokerSegment(mc, destType, MessageBrokerAction.Produce, MessageBrokerVendorName, destName);
92+
InsertDistributedTraceHeaders(context.Headers, transaction);
93+
var segment = transaction.StartMessageBrokerSegment(mc, queueData.DestinationType, MessageBrokerAction.Produce, MessageBrokerVendorName, queueData.QueueName);
9394

9495
await next.Send(context);
9596
segment.End();
@@ -102,15 +103,25 @@ public async Task Send(SendContext context, IPipe<SendContext> next)
102103

103104
var mc = new MethodCall(_sendMethod, context, default(string[]), true);
104105

105-
var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
106-
var destType = MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress);
106+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
107107

108108
var transaction = _agent.CurrentTransaction;
109-
MassTransitHelpers.InsertDistributedTraceHeaders(context.Headers, transaction);
110-
var segment = transaction.StartMessageBrokerSegment(mc, destType, MessageBrokerAction.Produce, MessageBrokerVendorName, destName);
109+
InsertDistributedTraceHeaders(context.Headers, transaction);
110+
var segment = transaction.StartMessageBrokerSegment(mc, queueData.DestinationType, MessageBrokerAction.Produce, MessageBrokerVendorName, queueData.QueueName);
111111

112112
await next.Send(context);
113113
segment.End();
114114
}
115+
116+
public static void InsertDistributedTraceHeaders(SendHeaders headers, ITransaction transaction)
117+
{
118+
var setHeaders = new Action<SendHeaders, string, string>((carrier, key, value) =>
119+
{
120+
carrier.Set(key, value);
121+
});
122+
123+
transaction.InsertDistributedTraceHeaders(headers, setHeaders);
124+
}
125+
115126
}
116127
}

src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/MassTransitLegacy/MassTransitHelpers.cs

-47
This file was deleted.

src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/MassTransitLegacy/NewRelicFilter.cs

+23-12
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
using System.Threading.Tasks;
77
using GreenPipes;
88
using MassTransit;
9+
using NewRelic.Agent.Api;
10+
using NewRelic.Agent.Extensions.Helpers;
911
using NewRelic.Agent.Extensions.Providers.Wrapper;
1012
using MethodCall = NewRelic.Agent.Extensions.Providers.Wrapper.MethodCall;
1113

@@ -39,19 +41,19 @@ public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
3941

4042
var mc = new MethodCall(_consumeMethod, context, default(string[]), true);
4143

42-
var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
44+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
4345

4446
var transaction = _agent.CreateTransaction(
45-
destinationType: MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress),
47+
destinationType: queueData.DestinationType,
4648
brokerVendorName: MessageBrokerVendorName,
47-
destination: destName);
49+
destination: queueData.QueueName);
4850

4951
transaction.AttachToAsync();
5052
transaction.DetachFromPrimary();
5153

5254
transaction.AcceptDistributedTraceHeaders(context.Headers, GetHeaderValue, TransportType.AMQP);
5355

54-
var segment = transaction.StartMessageBrokerSegment(mc, MessageBrokerDestinationType.Queue, MessageBrokerAction.Consume, MessageBrokerVendorName, destName);
56+
var segment = transaction.StartMessageBrokerSegment(mc, MessageBrokerDestinationType.Queue, MessageBrokerAction.Consume, MessageBrokerVendorName, queueData.QueueName);
5557

5658
await next.Send(context);
5759
segment.End();
@@ -85,12 +87,11 @@ public async Task Send(PublishContext context, IPipe<PublishContext> next)
8587

8688
var mc = new MethodCall(_publishMethod, context, default(string[]), true);
8789

88-
var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
89-
var destType = MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress);
90+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
9091

9192
var transaction = _agent.CurrentTransaction;
92-
MassTransitHelpers.InsertDistributedTraceHeaders(context.Headers, transaction);
93-
var segment = transaction.StartMessageBrokerSegment(mc, destType, MessageBrokerAction.Produce, MessageBrokerVendorName, destName);
93+
InsertDistributedTraceHeaders(context.Headers, transaction);
94+
var segment = transaction.StartMessageBrokerSegment(mc, queueData.DestinationType, MessageBrokerAction.Produce, MessageBrokerVendorName, queueData.QueueName);
9495

9596
await next.Send(context);
9697
segment.End();
@@ -103,15 +104,25 @@ public async Task Send(SendContext context, IPipe<SendContext> next)
103104

104105
var mc = new MethodCall(_sendMethod, context, default(string[]), true);
105106

106-
var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
107-
var destType = MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress);
107+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
108108

109109
var transaction = _agent.CurrentTransaction;
110-
MassTransitHelpers.InsertDistributedTraceHeaders(context.Headers, transaction);
111-
var segment = transaction.StartMessageBrokerSegment(mc, destType, MessageBrokerAction.Produce, MessageBrokerVendorName, destName);
110+
InsertDistributedTraceHeaders(context.Headers, transaction);
111+
var segment = transaction.StartMessageBrokerSegment(mc, queueData.DestinationType, MessageBrokerAction.Produce, MessageBrokerVendorName, queueData.QueueName);
112112

113113
await next.Send(context);
114114
segment.End();
115115
}
116+
117+
public static void InsertDistributedTraceHeaders(SendHeaders headers, ITransaction transaction)
118+
{
119+
var setHeaders = new Action<SendHeaders, string, string>((carrier, key, value) =>
120+
{
121+
carrier.Set(key, value);
122+
});
123+
124+
transaction.InsertDistributedTraceHeaders(headers, setHeaders);
125+
}
126+
116127
}
117128
}

tests/Agent/IntegrationTests/IntegrationTests/MassTransit/MassTransitTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2020 New Relic, Inc. All rights reserved.
1+
// Copyright 2020 New Relic, Inc. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

44
using System.Collections.Generic;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2020 New Relic, Inc. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System;
5+
using NUnit.Framework;
6+
using NewRelic.Agent.Extensions.Providers.Wrapper;
7+
using NewRelic.Agent.Extensions.Helpers;
8+
9+
namespace Agent.Extensions.Tests.Helpers
10+
{
11+
[TestFixture]
12+
public class MassTransitHelperTests
13+
{
14+
15+
[Test]
16+
[TestCase("rabbitmq://localhost/SomeHostname_MassTransitTest_bus_myqueuename?temporary=true", "myqueuename")]
17+
[TestCase("rabbitmq://localhost/SomeHostname_MassTransitTest_bus_myqueuename", "myqueuename")]
18+
[TestCase("rabbitmq://localhost/bogus", "Unknown")]
19+
[TestCase(null, "Unknown")]
20+
public void GetQueueName(Uri uri, string expectedQueueName)
21+
{
22+
// Act
23+
var queueName = MassTransitHelpers.GetQueueData(uri).QueueName;
24+
25+
// Assert
26+
Assert.AreEqual(expectedQueueName, queueName, "Did not get expected queue name");
27+
}
28+
29+
[Test]
30+
[TestCase("rabbitmq://localhost/NRHXPSQL3_MassTransitTest_bus_myqueuename?temporary=true", MessageBrokerDestinationType.TempQueue)]
31+
[TestCase("rabbitmq://localhost/NRHXPSQL3_MassTransitTest_bus_myqueuename?temporary=false", MessageBrokerDestinationType.Queue)]
32+
[TestCase("rabbitmq://localhost/NRHXPSQL3_MassTransitTest_bus_myqueuename", MessageBrokerDestinationType.Queue)]
33+
[TestCase(null, MessageBrokerDestinationType.Queue)]
34+
public void GetBrokerDestinationType(Uri uri, MessageBrokerDestinationType expectedDestType)
35+
{
36+
// Act
37+
var destType = MassTransitHelpers.GetQueueData(uri).DestinationType;
38+
39+
// Assert
40+
Assert.AreEqual(expectedDestType, destType, "Did not get expected queue type");
41+
}
42+
}
43+
44+
}

0 commit comments

Comments
 (0)