Skip to content

Emit pipeline and message operation events for diagnostics purposes #5465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/NServiceBus.Core/NServiceBus.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<RootNamespace>NServiceBus</RootNamespace>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>..\NServiceBus.snk</AssemblyOriginatorKeyFile>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<PropertyGroup Condition="'$(TargetFramework)' == 'net452'">
Expand Down
9 changes: 9 additions & 0 deletions src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ public async Task Invoke(MessageContext messageContext)

var transportReceiveContext = new TransportReceiveContext(message, messageContext.TransportTransaction, messageContext.ReceiveCancellationTokenSource, rootContext);

var isFaulted = false;
var pipelineEventSource = PipelineEventSource.Log;
try
{
pipelineEventSource.MainStart(messageContext.MessageId);

await receivePipeline.Invoke(transportReceiveContext).ConfigureAwait(false);
}
catch (Exception e)
{
isFaulted = true;
e.Data["Message ID"] = message.MessageId;
if (message.NativeMessageId != message.MessageId)
{
Expand All @@ -44,6 +49,10 @@ public async Task Invoke(MessageContext messageContext)

throw;
}
finally
{
pipelineEventSource.MainStop(messageContext.MessageId, isFaulted);
}

await receivePipelineNotification.Raise(new ReceivePipelineCompleted(message, pipelineStartedAt, DateTime.UtcNow)).ConfigureAwait(false);
}
Expand Down
23 changes: 22 additions & 1 deletion src/NServiceBus.Core/Pipeline/Pipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,32 @@ public Pipeline(IBuilder builder, PipelineModifications pipelineModifications)

public Task Invoke(TContext context)
{
return pipeline(context);
return !PipelineEventSource.Log.IsEnabled() ? pipeline.Invoke(context) : InvokePipelineAndEmitEvents(context);
}

async Task InvokePipelineAndEmitEvents(TContext context)
{
var isFaulted = false;
var pipelineEventSource = PipelineEventSource.Log;
pipelineEventSource.InvokeStart(pipelineName);
try
{
await pipeline.Invoke(context).ConfigureAwait(false);
}
catch
{
isFaulted = true;
throw;
}
finally
{
pipelineEventSource.InvokeStop(pipelineName, isFaulted);
}
}

// ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
IBehavior[] behaviors;
Func<TContext, Task> pipeline;
string pipelineName = typeof(TContext).FullName;
}
}
118 changes: 118 additions & 0 deletions src/NServiceBus.Core/Pipeline/PipelineEventSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
namespace NServiceBus
{
using System;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Tracing;
using System.Runtime.CompilerServices;

/// <summary>
/// Do not rename the methods or parameters on this class nor change the event declarations because it directly affects
/// the events that are generated.
/// </summary>
[EventSource(Name = EventSourceName)]
[SuppressMessage("ReSharper", "ParameterHidesMember")]
sealed class PipelineEventSource : EventSource
{
PipelineEventSource()
{
}

[MethodImpl(MethodImplOptions.NoInlining)]
[Event(MainPipelineStartEventId, Message = "Main Pipeline for MessageId '{0}' started.", Level = EventLevel.Informational)]
public void MainStart(string MessageId) // used within already existing state machine. Enabled check done as part of method implementation
{
if (IsEnabled())
{
WriteEvent(MainPipelineStartEventId, MessageId);
}
}

[MethodImpl(MethodImplOptions.NoInlining)]
[Event(MainPipelineStopEventId, Message = "Main Pipeline for MessageId '{0}' stopped.", Level = EventLevel.Informational)]
public unsafe void MainStop(string MessageId, bool IsFaulted) // used within already existing state machine. Enabled check done as part of method implementation
{
if (IsEnabled())
{
fixed(char* messageIdPtr = MessageId)
{
var eventPayload = stackalloc EventData[2];

eventPayload[0].Size = (MessageId.Length + 1) * 2;
eventPayload[0].DataPointer = (IntPtr)messageIdPtr;

eventPayload[1].Size = sizeof(bool);
eventPayload[1].DataPointer = (IntPtr)(&IsFaulted);
WriteEventCore(MainPipelineStopEventId, 2, eventPayload);
}
}
}

[MethodImpl(MethodImplOptions.NoInlining)]
[Event(SatellitePipelineStartEventId, Message = "Satellite Pipeline '{0}' for MessageId '{1}' started.", Level = EventLevel.Informational)]
public void SatelliteStart(string Name, string MessageId) // used on on hot path where async state machine was optimized away. Enable check done as part of caller
{
WriteEvent(SatellitePipelineStartEventId, Name, MessageId);
}

[MethodImpl(MethodImplOptions.NoInlining)]
[Event(SatellitePipelineStopEventId, Message = "Satellite Pipeline '{0}' for MessageId '{1}' stopped.", Level = EventLevel.Informational)]
public unsafe void SatelliteStop(string Name, string MessageId, bool IsFaulted) // used on on hot path where async state machine was optimized away. Enable check done as part of caller
{
fixed(char* namePtr = Name)
fixed(char* messageIdPtr = MessageId)
{
var eventPayload = stackalloc EventData[3];

eventPayload[0].Size = (Name.Length + 1) * 2;
eventPayload[0].DataPointer = (IntPtr)namePtr;

eventPayload[1].Size = (MessageId.Length + 1) * 2;
eventPayload[1].DataPointer = (IntPtr)messageIdPtr;

eventPayload[2].Size = sizeof(bool);
eventPayload[2].DataPointer = (IntPtr)(&IsFaulted);
WriteEventCore(SatellitePipelineStopEventId, 3, eventPayload);
}
}


[MethodImpl(MethodImplOptions.NoInlining)]
[Event(PipelineStartEventId, Message = "Pipeline '{0}' started.'", Level = EventLevel.Verbose
#if NETSTANDARD
, ActivityOptions = EventActivityOptions.Recursive // Pipelines are started within pipeline, to avoid auto-stop we need recursive on the platform it is available
#endif
)]
public void InvokeStart(string Name) // used on on hot path where async state machine was optimized away. Enable check done as part of caller
{
WriteEvent(PipelineStartEventId, Name);
}


[MethodImpl(MethodImplOptions.NoInlining)]
[Event(PipelineStopEventId, Message = "Pipeline '{0}' stopped.'", Level = EventLevel.Verbose)]
public unsafe void InvokeStop(string Name, bool IsFaulted) // used on on hot path where async state machine was optimized away. Enable check done as part of caller
{
fixed(char* namePtr = Name)
{
var eventPayload = stackalloc EventData[2];

eventPayload[0].Size = (Name.Length + 1) * 2;
eventPayload[0].DataPointer = (IntPtr)namePtr;

eventPayload[1].Size = sizeof(bool);
eventPayload[1].DataPointer = (IntPtr)(&IsFaulted);
WriteEventCore(PipelineStopEventId, 2, eventPayload);
}
}

const string EventSourceName = "NServiceBus.Pipeline";
const int MainPipelineStartEventId = 1;
const int MainPipelineStopEventId = 2;
const int PipelineStartEventId = 3;
const int PipelineStopEventId = 4;
const int SatellitePipelineStartEventId = 5;
const int SatellitePipelineStopEventId = 6;

internal static readonly PipelineEventSource Log = new PipelineEventSource();
}
}
23 changes: 22 additions & 1 deletion src/NServiceBus.Core/Pipeline/SatellitePipelineExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,30 @@ public Task Invoke(MessageContext messageContext)
{
messageContext.Extensions.Set(messageContext.TransportTransaction);

return satelliteDefinition.OnMessage(builder, messageContext);
return !PipelineEventSource.Log.IsEnabled() ? satelliteDefinition.OnMessage(builder, messageContext) : InvokePipelineAndEmitEvents(messageContext);
}

async Task InvokePipelineAndEmitEvents(MessageContext messageContext)
{
var isFaulted = false;
var pipelineEventSource = PipelineEventSource.Log;
try
{
pipelineEventSource.SatelliteStart(satelliteDefinition.Name, messageContext.MessageId);
await satelliteDefinition.OnMessage(builder, messageContext).ConfigureAwait(false);
}
catch
{
isFaulted = true;
throw;
}
finally
{
pipelineEventSource.SatelliteStop(satelliteDefinition.Name, messageContext.MessageId, isFaulted);
}
}


SatelliteDefinition satelliteDefinition;
IBuilder builder;
}
Expand Down
63 changes: 60 additions & 3 deletions src/NServiceBus.Core/Unicast/MessageOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,26 @@ Task Publish(IBehaviorContext context, Type messageType, object message, Publish
options.Context,
context);

return publishPipeline.Invoke(publishContext);
return !MessageOperationsEventSource.Log.IsEnabled() ? publishPipeline.Invoke(publishContext) : InvokePipelineAndEmitEvents(publishContext);
}

async Task InvokePipelineAndEmitEvents(IOutgoingPublishContext outgoingContext)
{
var isFaulted = false;
var messageOperationsEventSource = MessageOperationsEventSource.Log;
try
{
messageOperationsEventSource.PublishStart(outgoingContext.MessageId);
await publishPipeline.Invoke(outgoingContext).ConfigureAwait(false);
}
catch
{
isFaulted = true;
}
finally
{
messageOperationsEventSource.PublishStop(outgoingContext.MessageId, isFaulted);
}
}

public Task Subscribe(IBehaviorContext context, Type eventType, SubscribeOptions options)
Expand Down Expand Up @@ -116,7 +135,26 @@ Task SendMessage(IBehaviorContext context, Type messageType, object message, Sen
outgoingContext.AddDeliveryConstraint(options.DelayedDeliveryConstraint);
}

return sendPipeline.Invoke(outgoingContext);
return !MessageOperationsEventSource.Log.IsEnabled() ? sendPipeline.Invoke(outgoingContext) : InvokePipelineAndEmitEvents(outgoingContext);
}

async Task InvokePipelineAndEmitEvents(IOutgoingSendContext outgoingContext)
{
var isFaulted = false;
var messageOperationsEventSource = MessageOperationsEventSource.Log;
try
{
messageOperationsEventSource.SendStart(outgoingContext.MessageId);
await sendPipeline.Invoke(outgoingContext).ConfigureAwait(false);
}
catch
{
isFaulted = true;
}
finally
{
messageOperationsEventSource.SendStop(outgoingContext.MessageId, isFaulted);
}
}

public Task Reply(IBehaviorContext context, object message, ReplyOptions options)
Expand Down Expand Up @@ -146,7 +184,26 @@ Task ReplyMessage(IBehaviorContext context, Type messageType, object message, Re
options.Context,
context);

return replyPipeline.Invoke(outgoingContext);
return !MessageOperationsEventSource.Log.IsEnabled() ? replyPipeline.Invoke(outgoingContext) : InvokePipelineAndEmitEvents(outgoingContext);
}

async Task InvokePipelineAndEmitEvents(IOutgoingReplyContext outgoingContext)
{
var isFaulted = false;
var messageOperationsEventSource = MessageOperationsEventSource.Log;
try
{
messageOperationsEventSource.ReplyStart(outgoingContext.MessageId);
await replyPipeline.Invoke(outgoingContext).ConfigureAwait(false);
}
catch
{
isFaulted = true;
}
finally
{
messageOperationsEventSource.ReplyStop(outgoingContext.MessageId, isFaulted);
}
}
}
}
66 changes: 66 additions & 0 deletions src/NServiceBus.Core/Unicast/MessageOperationsEventSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
namespace NServiceBus
{
using System;
using System.Diagnostics.Tracing;

/// <summary>
/// Do not rename the methods or parameters on this class nor change the event declarations because it directly affects
/// the events that are generated.
/// </summary>
/// <remarks>
/// Operations here are used on on hot path where async state machine was optimized away. Enable check done as
/// part of caller
/// </remarks>
[EventSource(Name = EventSourceName)]
sealed class MessageOperationsEventSource : EventSource
{
MessageOperationsEventSource()
{
}

[Event(SendStartEventId, Message = "Sending message with MessageId '{0}' started.", Level = EventLevel.Informational)]
public void SendStart(string MessageId) => WriteEvent(SendStartEventId, MessageId);

[Event(SendStopEventId, Message = "Sending message with MessageId '{0}' stopped.", Level = EventLevel.Informational)]
public void SendStop(string MessageId, bool IsFaulted) => WriteEvent(SendStopEventId, MessageId, IsFaulted);

[Event(PublishStartEventId, Message = "Publishing message with MessageId '{0}' started.", Level = EventLevel.Informational)]
public void PublishStart(string MessageId) => WriteEvent(PublishStartEventId, MessageId);

[Event(PublishStopEventId, Message = "Publishing message with MessageId '{0}' stopped.", Level = EventLevel.Informational)]
public void PublishStop(string MessageId, bool IsFaulted) => WriteEvent(PublishStopEventId, MessageId, IsFaulted);

[Event(ReplyStartEventId, Message = "Replying message with MessageId '{0}' started.", Level = EventLevel.Informational)]
public void ReplyStart(string MessageId) => WriteEvent(ReplyStartEventId, MessageId);

[Event(ReplyStopEventId, Message = "Replying message with MessageId '{0}' stopped.", Level = EventLevel.Informational)]
public void ReplyStop(string MessageId, bool IsFaulted) => WriteEvent(ReplyStopEventId, MessageId, IsFaulted);

// optimized version for the common signature
[NonEvent]
unsafe void WriteEvent(int EventId, string MessageId, bool IsFaulted)
{
fixed(char* messageIdPtr = MessageId)
{
var eventPayload = stackalloc EventData[2];

eventPayload[0].Size = (MessageId.Length + 1) * 2;
eventPayload[0].DataPointer = (IntPtr)messageIdPtr;

eventPayload[1].Size = sizeof(bool);
eventPayload[1].DataPointer = (IntPtr)(&IsFaulted);
WriteEventCore(EventId, 2, eventPayload);
}
}

const string EventSourceName = "NServiceBus.Messages";
const int SendStartEventId = 1;
const int SendStopEventId = 2;
const int PublishStartEventId = 3;
const int PublishStopEventId = 4;
const int ReplyStartEventId = 5;
const int ReplyStopEventId = 6;

internal static readonly MessageOperationsEventSource Log = new MessageOperationsEventSource();
}
}
Loading