Skip to content

Commit d417e29

Browse files
authored
chore: Refactor SQS handler to a separate class (#2639)
1 parent 334937d commit d417e29

File tree

2 files changed

+135
-117
lines changed

2 files changed

+135
-117
lines changed
Lines changed: 1 addition & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,9 @@
11
// Copyright 2020 New Relic, Inc. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
using System.Collections.Concurrent;
5-
using System;
64
using System.Collections.Generic;
7-
using System.Threading.Tasks;
85
using NewRelic.Agent.Api;
9-
using NewRelic.Agent.Extensions.AwsSdk;
106
using NewRelic.Agent.Extensions.Providers.Wrapper;
11-
using NewRelic.Reflection;
12-
using System.Linq;
137

148
namespace NewRelic.Providers.Wrapper.AwsSdk
159
{
@@ -18,9 +12,7 @@ public class AwsSdkPipelineWrapper : IWrapper
1812
public bool IsTransactionRequired => true;
1913

2014
private const string WrapperName = "AwsSdkPipelineWrapper";
21-
private static readonly ConcurrentDictionary<Type, Func<object, object>> _getRequestResponseFromGeneric = new();
2215
private static HashSet<string> _unsupportedRequestTypes = new();
23-
private static HashSet<string> _unsupportedSQSRequestTypes = new();
2416

2517
public CanWrapResponse CanWrap(InstrumentedMethodInfo methodInfo)
2618
{
@@ -60,121 +52,13 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
6052

6153
if (requestType.StartsWith("Amazon.SQS"))
6254
{
63-
return HandleSQSRequest(instrumentedMethodCall, agent, transaction, request, isAsync, executionContext);
55+
return SQSRequestHandler.HandleSQSRequest(instrumentedMethodCall, agent, transaction, request, isAsync, executionContext);
6456
}
6557

6658
if (_unsupportedRequestTypes.Add(requestType)) // log once per unsupported request type
6759
agent.Logger.Debug($"AwsSdkPipelineWrapper: Unsupported request type: {requestType}. Returning NoOp delegate.");
6860

6961
return Delegates.NoOp;
7062
}
71-
72-
private static AfterWrappedMethodDelegate HandleSQSRequest(InstrumentedMethodCall instrumentedMethodCall, IAgent agent,
73-
ITransaction transaction, dynamic request, bool isAsync, dynamic executionContext)
74-
{
75-
var requestType = request.GetType().Name;
76-
77-
MessageBrokerAction action;
78-
switch (requestType)
79-
{
80-
case "SendMessageRequest":
81-
case "SendMessageBatchRequest":
82-
action = MessageBrokerAction.Produce;
83-
break;
84-
case "ReceiveMessageRequest":
85-
action = MessageBrokerAction.Consume;
86-
break;
87-
case "PurgeQueueRequest":
88-
action = MessageBrokerAction.Purge;
89-
break;
90-
default:
91-
if (_unsupportedSQSRequestTypes.Add(requestType)) // log once per unsupported request type
92-
agent.Logger.Debug($"AwsSdkPipelineWrapper: SQS Request type {requestType} is not supported. Returning NoOp delegate.");
93-
94-
return Delegates.NoOp;
95-
}
96-
97-
var dtHeaders = agent.GetConfiguredDTHeaders();
98-
99-
string requestQueueUrl = request.QueueUrl;
100-
ISegment segment = SqsHelper.GenerateSegment(transaction, instrumentedMethodCall.MethodCall, requestQueueUrl, action);
101-
if (action == MessageBrokerAction.Produce)
102-
{
103-
if (requestType == "SendMessageRequest")
104-
{
105-
if (request.MessageAttributes == null)
106-
{
107-
agent.Logger.Debug("AwsSdkPipelineWrapper: requestContext.OriginalRequest.MessageAttributes is null, unable to insert distributed trace headers.");
108-
}
109-
else
110-
{
111-
SqsHelper.InsertDistributedTraceHeaders(transaction, request, dtHeaders.Count);
112-
}
113-
}
114-
else if (requestType == "SendMessageBatchRequest")
115-
{
116-
// loop through each message in the batch and insert distributed trace headers
117-
foreach (var message in request.Entries)
118-
{
119-
if (message.MessageAttributes == null)
120-
{
121-
agent.Logger.Debug("AwsSdkPipelineWrapper: requestContext.OriginalRequest.Entries.MessageAttributes is null, unable to insert distributed trace headers.");
122-
}
123-
else
124-
{
125-
SqsHelper.InsertDistributedTraceHeaders(transaction, message, dtHeaders.Count);
126-
}
127-
}
128-
}
129-
}
130-
131-
// modify the request to ask for DT headers in the response message attributes.
132-
if (action == MessageBrokerAction.Consume)
133-
{
134-
if (request.MessageAttributeNames == null)
135-
request.MessageAttributeNames = new List<string>();
136-
137-
foreach (var header in dtHeaders)
138-
request.MessageAttributeNames.Add(header);
139-
}
140-
141-
if (isAsync)
142-
{
143-
return Delegates.GetAsyncDelegateFor<Task>(agent, segment, true, ProcessResponse, TaskContinuationOptions.ExecuteSynchronously);
144-
145-
void ProcessResponse(Task responseTask)
146-
{
147-
if (!ValidTaskResponse(responseTask) || (segment == null) || action != MessageBrokerAction.Consume)
148-
return;
149-
150-
// taskResult is a ReceiveMessageResponse
151-
var taskResultGetter = _getRequestResponseFromGeneric.GetOrAdd(responseTask.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(t, "Result"));
152-
dynamic receiveMessageResponse = taskResultGetter(responseTask);
153-
154-
// accept distributed trace headers from the first message in the response
155-
SqsHelper.AcceptDistributedTraceHeaders(transaction, receiveMessageResponse.Messages[0].MessageAttributes);
156-
}
157-
}
158-
159-
return Delegates.GetDelegateFor(
160-
onComplete: segment.End,
161-
onSuccess: () =>
162-
{
163-
if (action != MessageBrokerAction.Consume)
164-
return;
165-
166-
var ec = executionContext;
167-
var response = ec.ResponseContext.Response; // response is a ReceiveMessageResponse
168-
169-
// accept distributed trace headers from the first message in the response
170-
SqsHelper.AcceptDistributedTraceHeaders(transaction, response.Messages[0].MessageAttributes);
171-
}
172-
);
173-
}
174-
175-
private static bool ValidTaskResponse(Task response)
176-
{
177-
return response?.Status == TaskStatus.RanToCompletion;
178-
}
17963
}
18064
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright 2020 New Relic, Inc. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System;
5+
using System.Collections.Concurrent;
6+
using System.Collections.Generic;
7+
using System.Threading.Tasks;
8+
using NewRelic.Agent.Api;
9+
using NewRelic.Agent.Extensions.AwsSdk;
10+
using NewRelic.Agent.Extensions.Providers.Wrapper;
11+
using NewRelic.Reflection;
12+
13+
namespace NewRelic.Providers.Wrapper.AwsSdk
14+
{
15+
internal static class SQSRequestHandler
16+
{
17+
private static readonly ConcurrentDictionary<Type, Func<object, object>> _getRequestResponseFromGeneric = new();
18+
private static readonly HashSet<string> _unsupportedSQSRequestTypes = [];
19+
20+
public static AfterWrappedMethodDelegate HandleSQSRequest(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction, dynamic request, bool isAsync, dynamic executionContext)
21+
{
22+
var requestType = request.GetType().Name;
23+
24+
MessageBrokerAction action;
25+
switch (requestType)
26+
{
27+
case "SendMessageRequest":
28+
case "SendMessageBatchRequest":
29+
action = MessageBrokerAction.Produce;
30+
break;
31+
case "ReceiveMessageRequest":
32+
action = MessageBrokerAction.Consume;
33+
break;
34+
case "PurgeQueueRequest":
35+
action = MessageBrokerAction.Purge;
36+
break;
37+
default:
38+
if (_unsupportedSQSRequestTypes.Add(requestType)) // log once per unsupported request type
39+
agent.Logger.Debug($"AwsSdkPipelineWrapper: SQS Request type {requestType} is not supported. Returning NoOp delegate.");
40+
41+
return Delegates.NoOp;
42+
}
43+
44+
var dtHeaders = agent.GetConfiguredDTHeaders();
45+
46+
string requestQueueUrl = request.QueueUrl;
47+
ISegment segment = SqsHelper.GenerateSegment(transaction, instrumentedMethodCall.MethodCall, requestQueueUrl, action);
48+
switch (action)
49+
{
50+
case MessageBrokerAction.Produce when requestType == "SendMessageRequest":
51+
{
52+
if (request.MessageAttributes == null)
53+
{
54+
agent.Logger.Debug("AwsSdkPipelineWrapper: requestContext.OriginalRequest.MessageAttributes is null, unable to insert distributed trace headers.");
55+
}
56+
else
57+
{
58+
SqsHelper.InsertDistributedTraceHeaders(transaction, request, dtHeaders.Count);
59+
}
60+
61+
break;
62+
}
63+
case MessageBrokerAction.Produce:
64+
{
65+
if (requestType == "SendMessageBatchRequest")
66+
{
67+
// loop through each message in the batch and insert distributed trace headers
68+
foreach (var message in request.Entries)
69+
{
70+
if (message.MessageAttributes == null)
71+
{
72+
agent.Logger.Debug("AwsSdkPipelineWrapper: requestContext.OriginalRequest.Entries.MessageAttributes is null, unable to insert distributed trace headers.");
73+
}
74+
else
75+
{
76+
SqsHelper.InsertDistributedTraceHeaders(transaction, message, dtHeaders.Count);
77+
}
78+
}
79+
}
80+
81+
break;
82+
}
83+
84+
// modify the request to ask for DT headers in the response message attributes.
85+
case MessageBrokerAction.Consume:
86+
{
87+
if (request.MessageAttributeNames == null)
88+
request.MessageAttributeNames = new List<string>();
89+
90+
foreach (var header in dtHeaders)
91+
request.MessageAttributeNames.Add(header);
92+
break;
93+
}
94+
}
95+
96+
return isAsync ?
97+
Delegates.GetAsyncDelegateFor<Task>(agent, segment, true, ProcessResponse, TaskContinuationOptions.ExecuteSynchronously)
98+
:
99+
Delegates.GetDelegateFor(
100+
onComplete: segment.End,
101+
onSuccess: () =>
102+
{
103+
if (action != MessageBrokerAction.Consume)
104+
return;
105+
106+
var ec = executionContext;
107+
var response = ec.ResponseContext.Response; // response is a ReceiveMessageResponse
108+
109+
// accept distributed trace headers from the first message in the response
110+
SqsHelper.AcceptDistributedTraceHeaders(transaction, response.Messages[0].MessageAttributes);
111+
}
112+
);
113+
114+
void ProcessResponse(Task responseTask)
115+
{
116+
if (!ValidTaskResponse(responseTask) || (segment == null) || action != MessageBrokerAction.Consume)
117+
return;
118+
119+
// taskResult is a ReceiveMessageResponse
120+
var taskResultGetter = _getRequestResponseFromGeneric.GetOrAdd(responseTask.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(t, "Result"));
121+
dynamic receiveMessageResponse = taskResultGetter(responseTask);
122+
123+
// accept distributed trace headers from the first message in the response
124+
SqsHelper.AcceptDistributedTraceHeaders(transaction, receiveMessageResponse.Messages[0].MessageAttributes);
125+
}
126+
}
127+
128+
private static bool ValidTaskResponse(Task response)
129+
{
130+
return response?.Status == TaskStatus.RanToCompletion;
131+
}
132+
133+
}
134+
}

0 commit comments

Comments
 (0)