Skip to content

Commit 3f55012

Browse files
committed
Emit pipeline and message operation events for diagnostics purposes
1 parent 27f2f00 commit 3f55012

File tree

8 files changed

+333
-5
lines changed

8 files changed

+333
-5
lines changed

src/NServiceBus.Core/NServiceBus.Core.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
<RootNamespace>NServiceBus</RootNamespace>
66
<SignAssembly>true</SignAssembly>
77
<AssemblyOriginatorKeyFile>..\NServiceBus.snk</AssemblyOriginatorKeyFile>
8+
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
89
</PropertyGroup>
910

1011
<PropertyGroup Condition="'$(TargetFramework)' == 'net452'">

src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs

+8
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,12 @@ public async Task Invoke(MessageContext messageContext)
2525
var rootContext = pipelineComponent.CreateRootContext(childBuilder, messageContext.Extensions);
2626
var transportReceiveContext = new TransportReceiveContext(message, messageContext.TransportTransaction, messageContext.ReceiveCancellationTokenSource, rootContext);
2727

28+
var isFaulted = false;
29+
var pipelineEventSource = PipelineEventSource.Log;
2830
try
2931
{
32+
pipelineEventSource.MainStart(messageContext.MessageId);
33+
3034
await transportReceiveContext.InvokePipeline<ITransportReceiveContext>().ConfigureAwait(false);
3135
}
3236
catch (Exception e)
@@ -39,6 +43,10 @@ public async Task Invoke(MessageContext messageContext)
3943

4044
throw;
4145
}
46+
finally
47+
{
48+
pipelineEventSource.MainStop(messageContext.MessageId, isFaulted);
49+
}
4250

4351
await transportReceiveContext.RaiseNotification(new ReceivePipelineCompleted(message, pipelineStartedAt, DateTime.UtcNow)).ConfigureAwait(false);
4452
}

src/NServiceBus.Core/Pipeline/Pipeline.cs

+22-1
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,32 @@ public Pipeline(IBuilder builder, PipelineModifications pipelineModifications)
2929

3030
public Task Invoke(TContext context)
3131
{
32-
return pipeline(context);
32+
return !PipelineEventSource.Log.IsEnabled() ? pipeline.Invoke(context) : InvokePipelineAndEmitEvents(context);
33+
}
34+
35+
async Task InvokePipelineAndEmitEvents(TContext context)
36+
{
37+
var isFaulted = false;
38+
var pipelineEventSource = PipelineEventSource.Log;
39+
pipelineEventSource.InvokeStart(pipelineName);
40+
try
41+
{
42+
await pipeline.Invoke(context).ConfigureAwait(false);
43+
}
44+
catch
45+
{
46+
isFaulted = true;
47+
throw;
48+
}
49+
finally
50+
{
51+
pipelineEventSource.InvokeStop(pipelineName, isFaulted);
52+
}
3353
}
3454

3555
// ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
3656
IBehavior[] behaviors;
3757
Func<TContext, Task> pipeline;
58+
string pipelineName = typeof(TContext).FullName;
3859
}
3960
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
namespace NServiceBus
2+
{
3+
using System;
4+
using System.Diagnostics.CodeAnalysis;
5+
using System.Diagnostics.Tracing;
6+
using System.Runtime.CompilerServices;
7+
8+
/// <summary>
9+
/// Do not rename the methods or parameters on this class nor change the event declarations because it directly affects
10+
/// the events that are generated.
11+
/// </summary>
12+
[EventSource(Name = EventSourceName)]
13+
[SuppressMessage("ReSharper", "ParameterHidesMember")]
14+
sealed class PipelineEventSource : EventSource
15+
{
16+
PipelineEventSource()
17+
{
18+
}
19+
20+
[MethodImpl(MethodImplOptions.NoInlining)]
21+
[Event(MainPipelineStartEventId, Message = "Main Pipeline for MessageId '{0}' started.", Level = EventLevel.Informational)]
22+
public void MainStart(string MessageId) // used within already existing state machine. Enabled check done as part of method implementation
23+
{
24+
if (IsEnabled())
25+
{
26+
WriteEvent(MainPipelineStartEventId, MessageId);
27+
}
28+
}
29+
30+
[MethodImpl(MethodImplOptions.NoInlining)]
31+
[Event(MainPipelineStopEventId, Message = "Main Pipeline for MessageId '{0}' stopped.", Level = EventLevel.Informational)]
32+
public unsafe void MainStop(string MessageId, bool IsFaulted) // used within already existing state machine. Enabled check done as part of method implementation
33+
{
34+
if (IsEnabled())
35+
{
36+
fixed(char* messageIdPtr = MessageId)
37+
{
38+
var eventPayload = stackalloc EventData[2];
39+
40+
eventPayload[0].Size = (MessageId.Length + 1) * 2;
41+
eventPayload[0].DataPointer = (IntPtr)messageIdPtr;
42+
43+
eventPayload[1].Size = sizeof(bool);
44+
eventPayload[1].DataPointer = (IntPtr)(&IsFaulted);
45+
WriteEventCore(MainPipelineStopEventId, 2, eventPayload);
46+
}
47+
}
48+
}
49+
50+
[MethodImpl(MethodImplOptions.NoInlining)]
51+
[Event(SatellitePipelineStartEventId, Message = "Satellite Pipeline '{0}' for MessageId '{1}' started.", Level = EventLevel.Informational)]
52+
public void SatelliteStart(string Name, string MessageId) // used on on hot path where async state machine was optimized away. Enable check done as part of caller
53+
{
54+
WriteEvent(SatellitePipelineStartEventId, Name, MessageId);
55+
}
56+
57+
[MethodImpl(MethodImplOptions.NoInlining)]
58+
[Event(SatellitePipelineStopEventId, Message = "Satellite Pipeline '{0}' for MessageId '{1}' stopped.", Level = EventLevel.Informational)]
59+
public unsafe void SatelliteStop(string Name, string MessageId, bool IsFaulted) // used on on hot path where async state machine was optimized away. Enable check done as part of caller
60+
{
61+
fixed(char* namePtr = Name)
62+
fixed(char* messageIdPtr = MessageId)
63+
{
64+
var eventPayload = stackalloc EventData[3];
65+
66+
eventPayload[0].Size = (Name.Length + 1) * 2;
67+
eventPayload[0].DataPointer = (IntPtr)namePtr;
68+
69+
eventPayload[1].Size = (MessageId.Length + 1) * 2;
70+
eventPayload[1].DataPointer = (IntPtr)messageIdPtr;
71+
72+
eventPayload[2].Size = sizeof(bool);
73+
eventPayload[2].DataPointer = (IntPtr)(&IsFaulted);
74+
WriteEventCore(SatellitePipelineStopEventId, 3, eventPayload);
75+
}
76+
}
77+
78+
79+
[MethodImpl(MethodImplOptions.NoInlining)]
80+
[Event(PipelineStartEventId, Message = "Pipeline '{0}' started.'", Level = EventLevel.Verbose
81+
#if NETSTANDARD
82+
, ActivityOptions = EventActivityOptions.Recursive // Pipelines are started within pipeline, to avoid auto-stop we need recursive on the platform it is available
83+
#endif
84+
)]
85+
public void InvokeStart(string Name) // used on on hot path where async state machine was optimized away. Enable check done as part of caller
86+
{
87+
WriteEvent(PipelineStartEventId, Name);
88+
}
89+
90+
91+
[MethodImpl(MethodImplOptions.NoInlining)]
92+
[Event(PipelineStopEventId, Message = "Pipeline '{0}' stopped.'", Level = EventLevel.Verbose)]
93+
public unsafe void InvokeStop(string Name, bool IsFaulted) // used on on hot path where async state machine was optimized away. Enable check done as part of caller
94+
{
95+
fixed(char* namePtr = Name)
96+
{
97+
var eventPayload = stackalloc EventData[2];
98+
99+
eventPayload[0].Size = (Name.Length + 1) * 2;
100+
eventPayload[0].DataPointer = (IntPtr)namePtr;
101+
102+
eventPayload[1].Size = sizeof(bool);
103+
eventPayload[1].DataPointer = (IntPtr)(&IsFaulted);
104+
WriteEventCore(PipelineStopEventId, 2, eventPayload);
105+
}
106+
}
107+
108+
const string EventSourceName = "NServiceBus.Pipeline";
109+
const int MainPipelineStartEventId = 1;
110+
const int MainPipelineStopEventId = 2;
111+
const int PipelineStartEventId = 3;
112+
const int PipelineStopEventId = 4;
113+
const int SatellitePipelineStartEventId = 5;
114+
const int SatellitePipelineStopEventId = 6;
115+
116+
internal static readonly PipelineEventSource Log = new PipelineEventSource();
117+
}
118+
}

src/NServiceBus.Core/Pipeline/SatellitePipelineExecutor.cs

+22-1
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,30 @@ public Task Invoke(MessageContext messageContext)
1616
{
1717
messageContext.Extensions.Set(messageContext.TransportTransaction);
1818

19-
return satelliteDefinition.OnMessage(builder, messageContext);
19+
return !PipelineEventSource.Log.IsEnabled() ? satelliteDefinition.OnMessage(builder, messageContext) : InvokePipelineAndEmitEvents(messageContext);
2020
}
2121

22+
async Task InvokePipelineAndEmitEvents(MessageContext messageContext)
23+
{
24+
var isFaulted = false;
25+
var pipelineEventSource = PipelineEventSource.Log;
26+
try
27+
{
28+
pipelineEventSource.SatelliteStart(satelliteDefinition.Name, messageContext.MessageId);
29+
await satelliteDefinition.OnMessage(builder, messageContext).ConfigureAwait(false);
30+
}
31+
catch
32+
{
33+
isFaulted = true;
34+
throw;
35+
}
36+
finally
37+
{
38+
pipelineEventSource.SatelliteStop(satelliteDefinition.Name, messageContext.MessageId, isFaulted);
39+
}
40+
}
41+
42+
2243
SatelliteDefinition satelliteDefinition;
2344
IBuilder builder;
2445
}

src/NServiceBus.Core/Unicast/MessageOperations.cs

+60-3
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,26 @@ static Task Publish(IBehaviorContext context, Type messageType, object message,
3939
options.Context,
4040
context);
4141

42-
return publishContext.InvokePipeline<IOutgoingPublishContext>();
42+
return !MessageOperationsEventSource.Log.IsEnabled() ? publishContext.InvokePipeline<IOutgoingPublishContext>() : InvokePipelineAndEmitEvents(publishContext);
43+
}
44+
45+
static async Task InvokePipelineAndEmitEvents(IOutgoingPublishContext outgoingContext)
46+
{
47+
var isFaulted = false;
48+
var messageOperationsEventSource = MessageOperationsEventSource.Log;
49+
try
50+
{
51+
messageOperationsEventSource.PublishStart(outgoingContext.MessageId);
52+
await outgoingContext.InvokePipeline().ConfigureAwait(false);
53+
}
54+
catch
55+
{
56+
isFaulted = true;
57+
}
58+
finally
59+
{
60+
messageOperationsEventSource.PublishStop(outgoingContext.MessageId, isFaulted);
61+
}
4362
}
4463

4564
public static Task Subscribe(IBehaviorContext context, Type eventType, SubscribeOptions options)
@@ -99,7 +118,26 @@ static Task SendMessage(this IBehaviorContext context, Type messageType, object
99118
outgoingContext.AddDeliveryConstraint(options.DelayedDeliveryConstraint);
100119
}
101120

102-
return outgoingContext.InvokePipeline<IOutgoingSendContext>();
121+
return !MessageOperationsEventSource.Log.IsEnabled() ? outgoingContext.InvokePipeline<IOutgoingSendContext>() : InvokePipelineAndEmitEvents(outgoingContext);
122+
}
123+
124+
static async Task InvokePipelineAndEmitEvents(IOutgoingSendContext outgoingContext)
125+
{
126+
var isFaulted = false;
127+
var messageOperationsEventSource = MessageOperationsEventSource.Log;
128+
try
129+
{
130+
messageOperationsEventSource.SendStart(outgoingContext.MessageId);
131+
await outgoingContext.InvokePipeline().ConfigureAwait(false);
132+
}
133+
catch
134+
{
135+
isFaulted = true;
136+
}
137+
finally
138+
{
139+
messageOperationsEventSource.SendStop(outgoingContext.MessageId, isFaulted);
140+
}
103141
}
104142

105143
public static Task Reply(IBehaviorContext context, object message, ReplyOptions options)
@@ -132,7 +170,26 @@ static Task ReplyMessage(this IBehaviorContext context, Type messageType, object
132170
options.Context,
133171
context);
134172

135-
return outgoingContext.InvokePipeline<IOutgoingReplyContext>();
173+
return !MessageOperationsEventSource.Log.IsEnabled() ? outgoingContext.InvokePipeline<IOutgoingReplyContext>() : InvokePipelineAndEmitEvents(outgoingContext);
174+
}
175+
176+
static async Task InvokePipelineAndEmitEvents(IOutgoingReplyContext outgoingContext)
177+
{
178+
var isFaulted = false;
179+
var messageOperationsEventSource = MessageOperationsEventSource.Log;
180+
try
181+
{
182+
messageOperationsEventSource.ReplyStart(outgoingContext.MessageId);
183+
await outgoingContext.InvokePipeline().ConfigureAwait(false);
184+
}
185+
catch
186+
{
187+
isFaulted = true;
188+
}
189+
finally
190+
{
191+
messageOperationsEventSource.ReplyStop(outgoingContext.MessageId, isFaulted);
192+
}
136193
}
137194
}
138195
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
namespace NServiceBus
2+
{
3+
using System;
4+
using System.Diagnostics.Tracing;
5+
6+
/// <summary>
7+
/// Do not rename the methods or parameters on this class nor change the event declarations because it directly affects
8+
/// the events that are generated.
9+
/// </summary>
10+
/// <remarks>
11+
/// Operations here are used on on hot path where async state machine was optimized away. Enable check done as
12+
/// part of caller
13+
/// </remarks>
14+
[EventSource(Name = EventSourceName)]
15+
sealed class MessageOperationsEventSource : EventSource
16+
{
17+
MessageOperationsEventSource()
18+
{
19+
}
20+
21+
[Event(SendStartEventId, Message = "Sending message with MessageId '{0}' started.", Level = EventLevel.Informational)]
22+
public void SendStart(string MessageId) => WriteEvent(SendStartEventId, MessageId);
23+
24+
[Event(SendStopEventId, Message = "Sending message with MessageId '{0}' stopped.", Level = EventLevel.Informational)]
25+
public void SendStop(string MessageId, bool IsFaulted) => WriteEvent(SendStopEventId, MessageId, IsFaulted);
26+
27+
[Event(PublishStartEventId, Message = "Publishing message with MessageId '{0}' started.", Level = EventLevel.Informational)]
28+
public void PublishStart(string MessageId) => WriteEvent(PublishStartEventId, MessageId);
29+
30+
[Event(PublishStopEventId, Message = "Publishing message with MessageId '{0}' stopped.", Level = EventLevel.Informational)]
31+
public void PublishStop(string MessageId, bool IsFaulted) => WriteEvent(PublishStopEventId, MessageId, IsFaulted);
32+
33+
[Event(ReplyStartEventId, Message = "Replying message with MessageId '{0}' started.", Level = EventLevel.Informational)]
34+
public void ReplyStart(string MessageId) => WriteEvent(ReplyStartEventId, MessageId);
35+
36+
[Event(ReplyStopEventId, Message = "Replying message with MessageId '{0}' stopped.", Level = EventLevel.Informational)]
37+
public void ReplyStop(string MessageId, bool IsFaulted) => WriteEvent(ReplyStopEventId, MessageId, IsFaulted);
38+
39+
// optimized version for the common signature
40+
[NonEvent]
41+
unsafe void WriteEvent(int EventId, string MessageId, bool IsFaulted)
42+
{
43+
fixed(char* messageIdPtr = MessageId)
44+
{
45+
var eventPayload = stackalloc EventData[2];
46+
47+
eventPayload[0].Size = (MessageId.Length + 1) * 2;
48+
eventPayload[0].DataPointer = (IntPtr)messageIdPtr;
49+
50+
eventPayload[1].Size = sizeof(bool);
51+
eventPayload[1].DataPointer = (IntPtr)(&IsFaulted);
52+
WriteEventCore(EventId, 2, eventPayload);
53+
}
54+
}
55+
56+
const string EventSourceName = "NServiceBus.Messages";
57+
const int SendStartEventId = 1;
58+
const int SendStopEventId = 2;
59+
const int PublishStartEventId = 3;
60+
const int PublishStopEventId = 4;
61+
const int ReplyStartEventId = 5;
62+
const int ReplyStopEventId = 6;
63+
64+
internal static readonly MessageOperationsEventSource Log = new MessageOperationsEventSource();
65+
}
66+
}

0 commit comments

Comments
 (0)