Skip to content

Commit 7b52a67

Browse files
authored
feature: Add AWS SDK instrumentation for Kinesis Data Streams and Kinesis Delivery Streams (Firehose) (#3049)
* Initial code * Checkpoint * Checkpoint, working * Add handler for Firehose * No message broker operations for Firehose * Feature work: Kinesis POC cleanup (#3034) * Refactored common helper methods into helper class; added unit tests for helpers * More unit tests * Kinesis/Firehose integration tests (#3044) * Refactored common helper methods into helper class; added unit tests for helpers * More unit tests * Initial scaffolding for Kinesis integrations tests * Implement rest of Kinesis operations and get the expected metrics correct. Passes. * Attempting to assert on entity relationship attributes. Seeing some issues. Checkpointing. * Kinesis integration tests passing * Initial Firehose integration test scaffolding * Missed a couple of Firehose changes * All operations plumbed, tests passing (basic metrics) * Entity linking data assertions * Fix unit test failures * Address PR feedback Use visby and cache results to avoid reflection performance penalty * Add kinesis and firehose to dotty
1 parent c6424c2 commit 7b52a67

File tree

17 files changed

+1468
-2
lines changed

17 files changed

+1468
-2
lines changed

.github/workflows/scripts/nugetSlackNotifications/packageInfo.json

+6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@
3232
{
3333
"packageName": "awssdk.bedrockruntime"
3434
},
35+
{
36+
"packageName": "awssdk.kinesis"
37+
},
38+
{
39+
"packageName": "awssdk.kinesisfirehose"
40+
},
3541
{
3642
"packageName": "awssdk.sqs"
3743
},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright 2020 New Relic, Inc. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System;
5+
using System.Collections.Concurrent;
6+
using System.Linq;
7+
using System.Reflection;
8+
using NewRelic.Reflection;
9+
10+
namespace NewRelic.Agent.Extensions.AwsSdk
11+
{
12+
public static class KinesisHelper
13+
{
14+
15+
private static readonly ConcurrentDictionary<string, string> _streamNameCache = new();
16+
private static readonly ConcurrentDictionary<string, Func<object, string>> _propertyGetterCache = new();
17+
18+
public static string GetStreamNameFromRequest(object request)
19+
{
20+
var streamName = GetPropertyFromRequest(request, "StreamName");
21+
if (streamName != null)
22+
{
23+
return streamName;
24+
}
25+
// if StreamName is null/unavailable, StreamARN may exist
26+
var streamARN = GetStreamArnFromRequest(request);
27+
if (streamARN != null)
28+
{
29+
return GetStreamNameFromArn(streamARN);
30+
}
31+
return null;
32+
}
33+
34+
35+
public static string GetDeliveryStreamNameFromRequest(object request)
36+
{
37+
var streamName = GetPropertyFromRequest(request, "DeliveryStreamName");
38+
if (streamName != null)
39+
{
40+
return streamName;
41+
}
42+
// if StreamName is null/unavailable, StreamARN may exist
43+
var streamARN = GetDeliveryStreamArnFromRequest(request);
44+
if (streamARN != null)
45+
{
46+
return GetStreamNameFromArn(streamARN);
47+
}
48+
return null;
49+
}
50+
51+
public static string GetStreamArnFromRequest(object request)
52+
{
53+
return GetPropertyFromRequest(request, "StreamARN");
54+
}
55+
56+
public static string GetDeliveryStreamArnFromRequest(object request)
57+
{
58+
return GetPropertyFromRequest(request, "DeliveryStreamARN");
59+
}
60+
61+
public static string GetStreamNameFromArn(string streamARN)
62+
{
63+
// arn:aws:kinesis:us-west-2:111111111111:deliverystream/NameOfStream
64+
if (_streamNameCache.ContainsKey(streamARN))
65+
{
66+
return _streamNameCache[streamARN];
67+
}
68+
else
69+
{
70+
var arnParts = streamARN.Split(':');
71+
if (arnParts.Length > 1)
72+
{
73+
var lastPart = arnParts[arnParts.Length - 1];
74+
if (lastPart.Contains('/'))
75+
{
76+
var streamName = lastPart.Split('/')[1];
77+
return _streamNameCache[streamARN] = streamName;
78+
}
79+
}
80+
return null;
81+
}
82+
}
83+
84+
private static string GetPropertyFromRequest(object request, string propertyName)
85+
{
86+
Type type = request.GetType();
87+
var key = type.Name + propertyName;
88+
var getter = _propertyGetterCache.GetOrAdd(key, GetPropertyAccessor(type, propertyName));
89+
return getter(request);
90+
}
91+
92+
private static Func<object, string> GetPropertyAccessor(Type type, string propertyName)
93+
{
94+
try
95+
{
96+
return VisibilityBypasser.Instance.GeneratePropertyAccessor<string>(Assembly.GetAssembly(type).FullName, type.FullName, propertyName);
97+
}
98+
catch
99+
{
100+
// if the attempt to generate the property accessor fails, that means that the requested property name does not exist for this particular
101+
// Kinesis/Firehose request type. Return a delegate that always returns null for any object input
102+
return (o) => null;
103+
}
104+
}
105+
106+
}
107+
}

src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AwsSdk/AwsSdkPipelineWrapper.cs

+10
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,16 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
120120
return DynamoDbRequestHandler.HandleDynamoDbRequest(instrumentedMethodCall, agent, transaction, request, isAsync, builder);
121121
}
122122

123+
if (requestType.StartsWith("Amazon.KinesisFirehose"))
124+
{
125+
return FirehoseRequestHandler.HandleFirehoseRequest(instrumentedMethodCall, agent, transaction, request, isAsync, builder);
126+
}
127+
128+
if (requestType.StartsWith("Amazon.Kinesis."))
129+
{
130+
return KinesisRequestHandler.HandleKinesisRequest(instrumentedMethodCall, agent, transaction, request, isAsync, builder);
131+
}
132+
123133
if (!_unsupportedRequestTypes.Contains(requestType)) // log once per unsupported request type
124134
{
125135
agent.Logger.Debug($"AwsSdkPipelineWrapper: Unsupported request type: {requestType}. Returning NoOp delegate.");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2020 New Relic, Inc. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System.Collections.Concurrent;
5+
using System.Threading.Tasks;
6+
using NewRelic.Agent.Api;
7+
using NewRelic.Agent.Extensions.AwsSdk;
8+
using NewRelic.Agent.Extensions.Parsing;
9+
using NewRelic.Agent.Extensions.Providers.Wrapper;
10+
11+
namespace NewRelic.Providers.Wrapper.AwsSdk.RequestHandlers
12+
{
13+
internal static class FirehoseRequestHandler
14+
{
15+
public const string VendorName = "Firehose";
16+
private static ConcurrentDictionary<string, string> _operationNameCache = new();
17+
18+
public static AfterWrappedMethodDelegate HandleFirehoseRequest(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction, object request, bool isAsync, ArnBuilder builder)
19+
{
20+
var requestType = request.GetType().Name as string;
21+
22+
// Unlike Kinesis Data Streams, all Firehose requests are instrumented with ordinary method segments
23+
24+
// Not all request types have a stream name or a stream ARN
25+
26+
var streamName = KinesisHelper.GetDeliveryStreamNameFromRequest(request);
27+
string arn = KinesisHelper.GetDeliveryStreamArnFromRequest(request);
28+
if (arn == null && streamName != null)
29+
{
30+
//arn:aws:firehose:us-west-2:111111111111:deliverystream/FirehoseStreamName
31+
arn = builder.Build("firehose", $"deliverystream/{streamName}");
32+
}
33+
34+
var operation = _operationNameCache.GetOrAdd(requestType, requestType.Replace("Request", string.Empty).ToSnakeCase());
35+
36+
ISegment segment;
37+
38+
var operationName = requestType.Replace("Request", "");
39+
40+
if (streamName != null)
41+
{
42+
operationName += "/" + streamName;
43+
}
44+
45+
segment = transaction.StartMethodSegment(instrumentedMethodCall.MethodCall, VendorName, operationName, isLeaf: false);
46+
47+
48+
if (arn != null)
49+
{
50+
segment.AddCloudSdkAttribute("cloud.resource_id", arn);
51+
}
52+
segment.AddCloudSdkAttribute("aws.operation", operation);
53+
segment.AddCloudSdkAttribute("aws.region", builder.Region);
54+
segment.AddCloudSdkAttribute("cloud.platform", "aws_kinesis_delivery_streams");
55+
56+
return isAsync ?
57+
Delegates.GetAsyncDelegateFor<Task>(agent, segment, true, ProcessResponse, TaskContinuationOptions.ExecuteSynchronously)
58+
:
59+
Delegates.GetDelegateFor(
60+
onComplete: segment.End,
61+
onSuccess: () =>
62+
{
63+
return;
64+
}
65+
);
66+
67+
void ProcessResponse(Task responseTask)
68+
{
69+
return;
70+
}
71+
}
72+
73+
74+
}
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright 2020 New Relic, Inc. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System.Collections.Concurrent;
5+
using System.Collections.Generic;
6+
using System.Threading.Tasks;
7+
using NewRelic.Agent.Api;
8+
using NewRelic.Agent.Api.Experimental;
9+
using NewRelic.Agent.Extensions.AwsSdk;
10+
using NewRelic.Agent.Extensions.Parsing;
11+
using NewRelic.Agent.Extensions.Providers.Wrapper;
12+
13+
namespace NewRelic.Providers.Wrapper.AwsSdk.RequestHandlers
14+
{
15+
internal static class KinesisRequestHandler
16+
{
17+
public const string VendorName = "Kinesis";
18+
public static readonly List<string> MessageBrokerRequestTypes = new List<string> { "GetRecordsRequest", "PutRecordsRequest", "PutRecordRequest" };
19+
private static ConcurrentDictionary<string, string> _operationNameCache = new();
20+
21+
22+
public static AfterWrappedMethodDelegate HandleKinesisRequest(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction, object request, bool isAsync, ArnBuilder builder)
23+
{
24+
var requestType = request.GetType().Name as string;
25+
26+
// Only some Kinesis requests are treated as message queue operations (put records, get records). The rest should be treated as ordinary spans.
27+
28+
// Not all request types have a stream name or a stream ARN
29+
30+
var streamName = KinesisHelper.GetStreamNameFromRequest(request);
31+
string arn = KinesisHelper.GetStreamArnFromRequest(request);
32+
if (arn == null && streamName != null)
33+
{
34+
arn = builder.Build("kinesis", $"stream/{streamName}");
35+
}
36+
37+
var operation = _operationNameCache.GetOrAdd(requestType, requestType.Replace("Request", string.Empty).ToSnakeCase());
38+
39+
ISegment segment;
40+
41+
if (MessageBrokerRequestTypes.Contains(requestType))
42+
{
43+
var action = requestType.StartsWith("Put") ? MessageBrokerAction.Produce : MessageBrokerAction.Consume;
44+
45+
segment = transaction.StartMessageBrokerSegment(instrumentedMethodCall.MethodCall, MessageBrokerDestinationType.Queue, action, VendorName, destinationName: streamName ?? "Unknown", messagingSystemName: "aws_kinesis_data_streams", cloudAccountId: builder.AccountId, cloudRegion: builder.Region);
46+
segment.GetExperimentalApi().MakeLeaf();
47+
}
48+
else
49+
{
50+
var operationName = requestType.Replace("Request", "");
51+
52+
if (streamName != null)
53+
{
54+
operationName += "/" + streamName;
55+
}
56+
57+
segment = transaction.StartMethodSegment(instrumentedMethodCall.MethodCall, "Kinesis", operationName, isLeaf: false);
58+
59+
}
60+
61+
if (arn != null)
62+
{
63+
segment.AddCloudSdkAttribute("cloud.resource_id", arn);
64+
}
65+
segment.AddCloudSdkAttribute("aws.operation", operation);
66+
segment.AddCloudSdkAttribute("aws.region", builder.Region);
67+
segment.AddCloudSdkAttribute("cloud.platform", "aws_kinesis_data_streams");
68+
69+
return isAsync ?
70+
Delegates.GetAsyncDelegateFor<Task>(agent, segment, true, ProcessResponse, TaskContinuationOptions.ExecuteSynchronously)
71+
:
72+
Delegates.GetDelegateFor(
73+
onComplete: segment.End,
74+
onSuccess: () =>
75+
{
76+
return;
77+
}
78+
);
79+
80+
void ProcessResponse(Task responseTask)
81+
{
82+
return;
83+
}
84+
}
85+
86+
}
87+
}
+6-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,7 @@
11
# NuGet packages
2-
packages
2+
packages
3+
4+
# Container integration tests
5+
6+
# LocalStack assets
7+
ContainerApplications/volume

0 commit comments

Comments
 (0)