Skip to content

Commit 4d7d392

Browse files
Make handler span surround handler pipeline stage (#7319) (#7326)
Newline Pragma Fix tests Verify that the handler span can be accessed in the handler pipeline Newline
1 parent 096fdf2 commit 4d7d392

File tree

11 files changed

+56
-53
lines changed

11 files changed

+56
-53
lines changed

src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Traces/When_processing_message_with_multiple_handlers.cs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Traces;
22

3+
using System;
34
using System.Collections.Generic;
45
using System.Diagnostics;
56
using System.Threading.Tasks;
67
using NServiceBus.AcceptanceTesting;
8+
using NServiceBus.Pipeline;
79
using NUnit.Framework;
810

911
public class When_processing_message_with_multiple_handlers : OpenTelemetryAcceptanceTest
1012
{
1113
[Test]
1214
public async Task Should_create_message_handler_spans()
1315
{
14-
var context = await Scenario.Define<Context>()
16+
await Scenario.Define<Context>()
1517
.WithEndpoint<ReceivingEndpoint>(b =>
1618
b.When(session => session.SendLocal(new SomeMessage()))
1719
)
@@ -39,6 +41,8 @@ public async Task Should_create_message_handler_spans()
3941
Assert.That(invokedHandlerActivity.ParentId, Is.EqualTo(receivePipelineActivities[0].Id));
4042
Assert.That(invokedHandlerActivity.Status, Is.EqualTo(ActivityStatusCode.Ok));
4143
});
44+
45+
Assert.That(invokedHandlerActivity.GetTagItem("custom_handler_tag"), Is.Not.Null, "Custom tag should be set");
4246
}
4347

4448
Assert.That(recordedHandlerTypes, Does.Contain(typeof(ReceivingEndpoint.HandlerOne).FullName), "invocation of handler one should be traced");
@@ -53,27 +57,28 @@ class Context : ScenarioContext
5357

5458
class ReceivingEndpoint : EndpointConfigurationBuilder
5559
{
56-
public ReceivingEndpoint() => EndpointSetup<OpenTelemetryEnabledEndpoint>();
60+
public ReceivingEndpoint() => EndpointSetup<OpenTelemetryEnabledEndpoint>(c => c.Pipeline.Register(typeof(AddTagToHandlerSpanBehavior), "Adds a custom tag to the handler span"));
5761

58-
public class HandlerOne : IHandleMessages<SomeMessage>
62+
class AddTagToHandlerSpanBehavior : Behavior<IInvokeHandlerContext>
5963
{
60-
Context testContext;
61-
62-
public HandlerOne(Context context) => testContext = context;
64+
public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
65+
{
66+
Activity.Current?.AddTag("custom_handler_tag", "some value");
67+
await next();
68+
}
69+
}
6370

71+
public class HandlerOne(Context testContext) : IHandleMessages<SomeMessage>
72+
{
6473
public Task Handle(SomeMessage message, IMessageHandlerContext context)
6574
{
6675
testContext.FirstHandlerRun = true;
6776
return Task.CompletedTask;
6877
}
6978
}
7079

71-
public class HandlerTwo : IHandleMessages<SomeMessage>
80+
public class HandlerTwo(Context testContext) : IHandleMessages<SomeMessage>
7281
{
73-
Context testContext;
74-
75-
public HandlerTwo(Context context) => testContext = context;
76-
7782
public Task Handle(SomeMessage message, IMessageHandlerContext context)
7883
{
7984
testContext.SecondHandlerRun = true;

src/NServiceBus.Core.Tests/OpenTelemetry/ActivityFactoryTests.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ class StartHandlerActivity : ActivityFactoryTests
249249
public void Should_not_start_activity_when_no_parent_activity_exists()
250250
{
251251
Type handlerType = typeof(StartHandlerActivity);
252-
var activity = activityFactory.StartHandlerActivity(new MessageHandler((_, _, _) => Task.CompletedTask, handlerType), null);
252+
var activity = activityFactory.StartHandlerActivity(new MessageHandler((_, _, _) => Task.CompletedTask, handlerType));
253253

254254
Assert.That(activity, Is.Null, "should not start handler activity when no parent activity exists");
255255
}
@@ -262,7 +262,7 @@ public void Should_set_handler_type_as_tag()
262262
using var ambientActivity = new Activity("ambient activity");
263263
ambientActivity.Start();
264264

265-
var activity = activityFactory.StartHandlerActivity(new MessageHandler((_, _, _) => Task.CompletedTask, handlerType), null);
265+
var activity = activityFactory.StartHandlerActivity(new MessageHandler((_, _, _) => Task.CompletedTask, handlerType));
266266

267267
Assert.That(activity, Is.Not.Null);
268268
var tags = activity.Tags.ToImmutableDictionary();
@@ -277,12 +277,9 @@ public void Should_set_saga_id_when_saga()
277277
using var ambientActivity = new Activity("ambient activity");
278278
ambientActivity.Start();
279279

280-
var activity = activityFactory.StartHandlerActivity(new MessageHandler((_, _, _) => Task.CompletedTask, typeof(StartHandlerActivity)), sagaInstance);
280+
var activity = activityFactory.StartHandlerActivity(new MessageHandler((_, _, _) => Task.CompletedTask, typeof(StartHandlerActivity)));
281281

282282
Assert.That(activity, Is.Not.Null);
283-
var tags = activity.Tags.ToImmutableDictionary();
284-
285-
Assert.That(tags[ActivityTags.HandlerSagaId], Is.EqualTo(sagaInstance.SagaId));
286283
}
287284
}
288285
}

src/NServiceBus.Core.Tests/Pipeline/Incoming/InvokeHandlerTerminatorTest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
[TestFixture]
1212
public class InvokeHandlerTerminatorTest
1313
{
14-
InvokeHandlerTerminator terminator = new(new NoOpActivityFactory(), new IncomingPipelineMetrics(new TestMeterFactory(), "queue", "disc"));
14+
InvokeHandlerTerminator terminator = new(new IncomingPipelineMetrics(new TestMeterFactory(), "queue", "disc"));
1515

1616
[Test]
1717
public async Task When_saga_found_and_handler_is_saga_should_invoke_handler()

src/NServiceBus.Core.Tests/Unicast/LoadHandlersConnectorTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public class LoadHandlersConnectorTests
1616
[Test]
1717
public void Should_throw_when_there_are_no_registered_message_handlers()
1818
{
19-
var behavior = new LoadHandlersConnector(new MessageHandlerRegistry());
19+
var behavior = new LoadHandlersConnector(new MessageHandlerRegistry(), new NoOpActivityFactory());
2020

2121
var context = new TestableIncomingLogicalMessageContext();
2222

@@ -29,7 +29,7 @@ public void Should_throw_when_there_are_no_registered_message_handlers()
2929
[Test]
3030
public void Should_throw_if_ambient_transaction_is_different_from_scope_used_by_transport()
3131
{
32-
var behavior = new LoadHandlersConnector(new MessageHandlerRegistry());
32+
var behavior = new LoadHandlersConnector(new MessageHandlerRegistry(), new NoOpActivityFactory());
3333

3434
var context = new TestableIncomingLogicalMessageContext();
3535

@@ -49,7 +49,7 @@ public void Should_throw_if_ambient_transaction_is_different_from_scope_used_by_
4949
[Test]
5050
public void Should_throw_if_ambient_transaction_suppressed_when_transport_uses_a_scope()
5151
{
52-
var behavior = new LoadHandlersConnector(new MessageHandlerRegistry());
52+
var behavior = new LoadHandlersConnector(new MessageHandlerRegistry(), new NoOpActivityFactory());
5353

5454
var context = new TestableIncomingLogicalMessageContext();
5555

@@ -77,7 +77,7 @@ public void Should_not_throw_if_ambient_scope_is_same_as_transport_scope()
7777
context.Services.AddSingleton<FakeHandler>();
7878
context.Extensions.Set<IOutboxTransaction>(new NoOpOutboxTransaction());
7979

80-
var behavior = new LoadHandlersConnector(messageHandlerRegistry);
80+
var behavior = new LoadHandlersConnector(messageHandlerRegistry, new NoOpActivityFactory());
8181

8282
using (new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
8383
{

src/NServiceBus.Core/OpenTelemetry/Tracing/ActivityFactory.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
using System.Diagnostics;
44
using Pipeline;
5-
using Sagas;
65
using Transport;
76

87
class ActivityFactory : IActivityFactory
@@ -84,7 +83,7 @@ public Activity StartOutgoingPipelineActivity(string activityName, string displa
8483
return activity;
8584
}
8685

87-
public Activity StartHandlerActivity(MessageHandler messageHandler, ActiveSagaInstance saga)
86+
public Activity StartHandlerActivity(MessageHandler messageHandler)
8887
{
8988
if (Activity.Current == null)
9089
{
@@ -98,11 +97,6 @@ public Activity StartHandlerActivity(MessageHandler messageHandler, ActiveSagaIn
9897
{
9998
activity.DisplayName = messageHandler.HandlerType.Name;
10099
activity.AddTag(ActivityTags.HandlerType, messageHandler.HandlerType.FullName);
101-
102-
if (saga != null)
103-
{
104-
activity.AddTag(ActivityTags.HandlerSagaId, saga.SagaId);
105-
}
106100
}
107101

108102
return activity;

src/NServiceBus.Core/OpenTelemetry/Tracing/IActivityFactory.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22

33
using System.Diagnostics;
44
using Pipeline;
5-
using Sagas;
65
using Transport;
76

87
interface IActivityFactory
98
{
109
Activity StartIncomingPipelineActivity(MessageContext context);
1110
Activity StartOutgoingPipelineActivity(string activityName, string displayName, IBehaviorContext outgoingContext);
12-
Activity StartHandlerActivity(MessageHandler messageHandler, ActiveSagaInstance saga);
11+
Activity StartHandlerActivity(MessageHandler messageHandler);
1312
}

src/NServiceBus.Core/OpenTelemetry/Tracing/NoOpActivityFactory.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
using System.Diagnostics;
44
using Pipeline;
5-
using Sagas;
65
using Transport;
76

87
class NoOpActivityFactory : IActivityFactory
@@ -11,5 +10,5 @@ class NoOpActivityFactory : IActivityFactory
1110

1211
public Activity StartOutgoingPipelineActivity(string activityName, string displayName, IBehaviorContext outgoingContext) => null;
1312

14-
public Activity StartHandlerActivity(MessageHandler messageHandler, ActiveSagaInstance saga) => null;
13+
public Activity StartHandlerActivity(MessageHandler messageHandler) => null;
1514
}

src/NServiceBus.Core/Pipeline/Incoming/InvokeHandlerTerminator.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
namespace NServiceBus;
22

33
using System;
4-
using System.Diagnostics;
54
using System.Threading.Tasks;
65
using Pipeline;
76
using Sagas;
87

9-
class InvokeHandlerTerminator(IActivityFactory activityFactory, IncomingPipelineMetrics messagingMetricsMeters) : PipelineTerminator<IInvokeHandlerContext>
8+
class InvokeHandlerTerminator(IncomingPipelineMetrics messagingMetricsMeters) : PipelineTerminator<IInvokeHandlerContext>
109
{
1110
protected override async Task Terminate(IInvokeHandlerContext context)
1211
{
@@ -15,8 +14,6 @@ protected override async Task Terminate(IInvokeHandlerContext context)
1514
return;
1615
}
1716

18-
using var activity = activityFactory.StartHandlerActivity(context.MessageHandler, saga);
19-
2017
var messageHandler = context.MessageHandler;
2118

2219
// Might as well abort before invoking the handler if we're shutting down
@@ -29,7 +26,6 @@ await messageHandler
2926
.ThrowIfNull()
3027
.ConfigureAwait(false);
3128

32-
activity?.SetStatus(ActivityStatusCode.Ok);
3329
messagingMetricsMeters.RecordSuccessfulMessageHandlerTime(context, DateTimeOffset.UtcNow - startTime);
3430
}
3531
#pragma warning disable PS0019 // Do not catch Exception without considering OperationCanceledException - enriching and rethrowing
@@ -42,10 +38,8 @@ await messageHandler
4238
ex.Data["Handler failure time"] = DateTimeOffsetHelper.ToWireFormattedString(DateTimeOffset.UtcNow);
4339
ex.Data["Handler canceled"] = context.CancellationToken.IsCancellationRequested;
4440

45-
activity?.SetErrorStatus(ex);
4641
messagingMetricsMeters.RecordFailedMessageHandlerTime(context, DateTimeOffset.UtcNow - startTime, ex);
4742
throw;
4843
}
4944
}
50-
5145
}

src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,20 @@
22

33
using System;
44
using System.Collections.Generic;
5+
using System.Diagnostics;
56
using System.Linq;
67
using System.Text;
78
using System.Threading.Tasks;
89
using System.Transactions;
910
using Logging;
1011
using Microsoft.Extensions.DependencyInjection;
11-
using NServiceBus.Transport;
12+
using Transport;
1213
using Persistence;
1314
using Pipeline;
1415
using Unicast;
1516

16-
class LoadHandlersConnector : StageConnector<IIncomingLogicalMessageContext, IInvokeHandlerContext>
17+
class LoadHandlersConnector(MessageHandlerRegistry messageHandlerRegistry, IActivityFactory activityFactory) : StageConnector<IIncomingLogicalMessageContext, IInvokeHandlerContext>
1718
{
18-
public LoadHandlersConnector(MessageHandlerRegistry messageHandlerRegistry) => this.messageHandlerRegistry = messageHandlerRegistry;
19-
2019
public override async Task Invoke(IIncomingLogicalMessageContext context, Func<IInvokeHandlerContext, Task> stage)
2120
{
2221
ValidateTransactionMode(context);
@@ -46,14 +45,31 @@ public override async Task Invoke(IIncomingLogicalMessageContext context, Func<I
4645
messageHandler.Instance = context.Builder.GetRequiredService(messageHandler.HandlerType);
4746

4847
var handlingContext = this.CreateInvokeHandlerContext(messageHandler, storageSession, context);
49-
await stage(handlingContext).ConfigureAwait(false);
48+
49+
using (var activity = activityFactory.StartHandlerActivity(messageHandler))
50+
{
51+
try
52+
{
53+
await stage(handlingContext).ConfigureAwait(false);
54+
55+
activity?.SetStatus(ActivityStatusCode.Ok);
56+
}
57+
#pragma warning disable PS0019
58+
catch (Exception ex)
59+
#pragma warning restore PS0019
60+
{
61+
activity?.SetErrorStatus(ex);
62+
throw;
63+
}
64+
}
5065

5166
if (handlingContext.HandlerInvocationAborted)
5267
{
5368
//if the chain was aborted skip the other handlers
5469
break;
5570
}
5671
}
72+
5773
context.MessageHandled = true;
5874
await storageSession.CompleteAsync(context.CancellationToken).ConfigureAwait(false);
5975
}
@@ -95,8 +111,6 @@ static void LogHandlersInvocation(IIncomingLogicalMessageContext context, List<M
95111
logger.Debug(builder.ToString());
96112
}
97113

98-
readonly MessageHandlerRegistry messageHandlerRegistry;
99-
100114
static readonly ILog logger = LogManager.GetLogger<LoadHandlersConnector>();
101115
static readonly bool isDebugIsEnabled = logger.IsDebugEnabled;
102116
static readonly string scopeInconsistencyMessage =

src/NServiceBus.Core/Receiving/ReceiveComponent.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,9 @@ public static ReceiveComponent Configure(
6666
return new TransportReceiveToPhysicalMessageConnector(storage, b.GetRequiredService<IncomingPipelineMetrics>());
6767
}, "Allows to abort processing the message");
6868

69-
pipelineSettings.Register("LoadHandlersConnector", b =>
70-
{
71-
return new LoadHandlersConnector(b.GetRequiredService<MessageHandlerRegistry>());
72-
}, "Gets all the handlers to invoke from the MessageHandler registry based on the message type.");
69+
pipelineSettings.Register("LoadHandlersConnector", b => new LoadHandlersConnector(b.GetRequiredService<MessageHandlerRegistry>(), hostingConfiguration.ActivityFactory), "Gets all the handlers to invoke from the MessageHandler registry based on the message type.");
7370

74-
pipelineSettings.Register("InvokeHandlers", sp => new InvokeHandlerTerminator(hostingConfiguration.ActivityFactory, sp.GetService<IncomingPipelineMetrics>()), "Calls the IHandleMessages<T>.Handle(T)");
71+
pipelineSettings.Register("InvokeHandlers", sp => new InvokeHandlerTerminator(sp.GetService<IncomingPipelineMetrics>()), "Calls the IHandleMessages<T>.Handle(T)");
7572

7673
var handlerDiagnostics = new Dictionary<string, List<string>>();
7774

src/NServiceBus.Core/Sagas/SagaPersistenceBehavior.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.ComponentModel;
6+
using System.Diagnostics;
67
using System.Linq;
78
using System.Threading.Tasks;
89
using Logging;
@@ -111,6 +112,8 @@ public async Task Invoke(IInvokeHandlerContext context, Func<IInvokeHandlerConte
111112
sagaInstanceState.AttachExistingEntity(loadedEntity);
112113
}
113114

115+
Activity.Current?.AddTag(ActivityTags.HandlerSagaId, sagaInstanceState.SagaId);
116+
114117
await next(context).ConfigureAwait(false);
115118

116119
if (sagaInstanceState.NotFound)
@@ -265,6 +268,7 @@ static SagaFinderDefinition GetSagaFinder(SagaMetadata metadata, IInvokeHandlerC
265268
return finderDefinition;
266269
}
267270
}
271+
268272
return null;
269273
}
270274

@@ -314,4 +318,4 @@ IContainSagaData CreateNewSagaEntity(SagaMetadata metadata, IInvokeHandlerContex
314318

315319
static readonly Task<IContainSagaData> DefaultSagaDataCompletedTask = Task.FromResult(default(IContainSagaData));
316320
static readonly ILog logger = LogManager.GetLogger<SagaPersistenceBehavior>();
317-
}
321+
}

0 commit comments

Comments
 (0)