Skip to content

Commit ab438a0

Browse files
feat(instrumentation-aws-sdk): add gen ai conventions for converse stream span (#2769)
Co-authored-by: Marc Pichler <[email protected]>
1 parent 60612f2 commit ab438a0

File tree

7 files changed

+287
-16
lines changed

7 files changed

+287
-16
lines changed

plugins/node/opentelemetry-instrumentation-aws-sdk/src/aws-sdk.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,13 +402,17 @@ export class AwsInstrumentation extends InstrumentationBase<AwsSdkInstrumentatio
402402
request: normalizedRequest,
403403
requestId: requestId,
404404
};
405-
self.servicesExtensions.responseHook(
405+
const override = self.servicesExtensions.responseHook(
406406
normalizedResponse,
407407
span,
408408
self.tracer,
409409
self.getConfig(),
410410
startTime
411411
);
412+
if (override) {
413+
response.output = override;
414+
normalizedResponse.data = override;
415+
}
412416
self._callUserResponseHook(span, normalizedResponse);
413417
return response;
414418
})
@@ -442,7 +446,9 @@ export class AwsInstrumentation extends InstrumentationBase<AwsSdkInstrumentatio
442446
throw err;
443447
})
444448
.finally(() => {
445-
span.end();
449+
if (!requestMetadata.isStream) {
450+
span.end();
451+
}
446452
});
447453
promiseWithResponseLogic
448454
.then(res => {

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServiceExtension.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ import {
3131
export interface RequestMetadata {
3232
// isIncoming - if true, then the operation callback / promise should be bind with the operation's span
3333
isIncoming: boolean;
34+
// isStream - if true, then the response is a stream so the span should not be ended by the middleware.
35+
// the ServiceExtension must end the span itself, generally by wrapping the stream and ending after it is
36+
// consumed.
37+
isStream?: boolean;
3438
spanAttributes?: SpanAttributes;
3539
spanKind?: SpanKind;
3640
spanName?: string;
@@ -47,13 +51,14 @@ export interface ServiceExtension {
4751
// called before request is sent, and after span is started
4852
requestPostSpanHook?: (request: NormalizedRequest) => void;
4953

54+
// called after response is received. If value is returned, it replaces the response output.
5055
responseHook?: (
5156
response: NormalizedResponse,
5257
span: Span,
5358
tracer: Tracer,
5459
config: AwsSdkInstrumentationConfig,
5560
startTime: HrTime
56-
) => void;
61+
) => any | undefined;
5762

5863
updateMetricInstruments?: (meter: Meter) => void;
5964
}

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,14 @@ export class ServicesExtensions implements ServiceExtension {
6868
startTime: HrTime
6969
) {
7070
const serviceExtension = this.services.get(response.request.serviceName);
71-
serviceExtension?.responseHook?.(response, span, tracer, config, startTime);
71+
72+
return serviceExtension?.responseHook?.(
73+
response,
74+
span,
75+
tracer,
76+
config,
77+
startTime
78+
);
7279
}
7380

7481
updateMetricInstruments(meter: Meter) {

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts

Lines changed: 76 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ import {
4646
NormalizedRequest,
4747
NormalizedResponse,
4848
} from '../types';
49+
import type {
50+
ConverseStreamOutput,
51+
TokenUsage,
52+
} from '@aws-sdk/client-bedrock-runtime';
4953
import {
5054
hrTime,
5155
hrTimeDuration,
@@ -93,7 +97,9 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
9397
): RequestMetadata {
9498
switch (request.commandName) {
9599
case 'Converse':
96-
return this.requestPreSpanHookConverse(request, config, diag);
100+
return this.requestPreSpanHookConverse(request, config, diag, false);
101+
case 'ConverseStream':
102+
return this.requestPreSpanHookConverse(request, config, diag, true);
97103
case 'InvokeModel':
98104
return this.requestPreSpanHookInvokeModel(request, config, diag);
99105
}
@@ -106,7 +112,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
106112
private requestPreSpanHookConverse(
107113
request: NormalizedRequest,
108114
config: AwsSdkInstrumentationConfig,
109-
diag: DiagLogger
115+
diag: DiagLogger,
116+
isStream: boolean
110117
): RequestMetadata {
111118
let spanName = GEN_AI_OPERATION_NAME_VALUE_CHAT;
112119
const spanAttributes: Attributes = {
@@ -142,6 +149,7 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
142149
return {
143150
spanName,
144151
isIncoming: false,
152+
isStream,
145153
spanAttributes,
146154
};
147155
}
@@ -328,6 +336,14 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
328336
config,
329337
startTime
330338
);
339+
case 'ConverseStream':
340+
return this.responseHookConverseStream(
341+
response,
342+
span,
343+
tracer,
344+
config,
345+
startTime
346+
);
331347
case 'InvokeModel':
332348
return this.responseHookInvokeModel(response, span, tracer, config);
333349
}
@@ -342,6 +358,64 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
342358
) {
343359
const { stopReason, usage } = response.data;
344360

361+
BedrockRuntimeServiceExtension.setStopReason(span, stopReason);
362+
this.setUsage(response, span, usage, startTime);
363+
}
364+
365+
private responseHookConverseStream(
366+
response: NormalizedResponse,
367+
span: Span,
368+
tracer: Tracer,
369+
config: AwsSdkInstrumentationConfig,
370+
startTime: HrTime
371+
) {
372+
return {
373+
...response.data,
374+
// Wrap and replace the response stream to allow processing events to telemetry
375+
// before yielding to the user.
376+
stream: this.wrapConverseStreamResponse(
377+
response,
378+
response.data.stream,
379+
span,
380+
startTime
381+
),
382+
};
383+
}
384+
385+
private async *wrapConverseStreamResponse(
386+
response: NormalizedResponse,
387+
stream: AsyncIterable<ConverseStreamOutput>,
388+
span: Span,
389+
startTime: HrTime
390+
) {
391+
try {
392+
let usage: TokenUsage | undefined;
393+
for await (const item of stream) {
394+
BedrockRuntimeServiceExtension.setStopReason(
395+
span,
396+
item.messageStop?.stopReason
397+
);
398+
usage = item.metadata?.usage;
399+
yield item;
400+
}
401+
this.setUsage(response, span, usage, startTime);
402+
} finally {
403+
span.end();
404+
}
405+
}
406+
407+
private static setStopReason(span: Span, stopReason: string | undefined) {
408+
if (stopReason !== undefined) {
409+
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]);
410+
}
411+
}
412+
413+
private setUsage(
414+
response: NormalizedResponse,
415+
span: Span,
416+
usage: TokenUsage | undefined,
417+
startTime: HrTime
418+
) {
345419
const sharedMetricAttrs: Attributes = {
346420
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
347421
[ATTR_GEN_AI_OPERATION_NAME]: GEN_AI_OPERATION_NAME_VALUE_CHAT,
@@ -371,10 +445,6 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
371445
});
372446
}
373447
}
374-
375-
if (stopReason !== undefined) {
376-
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]);
377-
}
378448
}
379449

380450
private responseHookInvokeModel(

plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@
2828
*/
2929

3030
import { getTestSpans } from '@opentelemetry/contrib-test-utils';
31-
import { metricReader } from './load-instrumentation';
31+
import { meterProvider, metricExporter } from './load-instrumentation';
3232

3333
import {
3434
BedrockRuntimeClient,
3535
ConverseCommand,
36+
ConverseStreamCommand,
3637
ConversationRole,
3738
InvokeModelCommand,
3839
} from '@aws-sdk/client-bedrock-runtime';
@@ -106,6 +107,9 @@ describe('Bedrock', () => {
106107

107108
afterEach(async function () {
108109
nockDone();
110+
111+
await meterProvider.forceFlush();
112+
metricExporter.reset();
109113
});
110114

111115
describe('Converse', () => {
@@ -154,7 +158,131 @@ describe('Bedrock', () => {
154158
[ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'],
155159
});
156160

157-
const { resourceMetrics } = await metricReader.collect();
161+
await meterProvider.forceFlush();
162+
const [resourceMetrics] = metricExporter.getMetrics();
163+
expect(resourceMetrics.scopeMetrics.length).toBe(1);
164+
const scopeMetrics = resourceMetrics.scopeMetrics[0];
165+
const tokenUsage = scopeMetrics.metrics.filter(
166+
m => m.descriptor.name === 'gen_ai.client.token.usage'
167+
);
168+
expect(tokenUsage.length).toBe(1);
169+
expect(tokenUsage[0].descriptor).toMatchObject({
170+
name: 'gen_ai.client.token.usage',
171+
type: 'HISTOGRAM',
172+
description: 'Measures number of input and output tokens used',
173+
unit: '{token}',
174+
});
175+
expect(tokenUsage[0].dataPoints.length).toBe(2);
176+
expect(tokenUsage[0].dataPoints).toEqual(
177+
expect.arrayContaining([
178+
expect.objectContaining({
179+
value: expect.objectContaining({
180+
sum: 8,
181+
}),
182+
attributes: {
183+
'gen_ai.system': 'aws.bedrock',
184+
'gen_ai.operation.name': 'chat',
185+
'gen_ai.request.model': 'amazon.titan-text-lite-v1',
186+
'gen_ai.token.type': 'input',
187+
},
188+
}),
189+
expect.objectContaining({
190+
value: expect.objectContaining({
191+
sum: 10,
192+
}),
193+
attributes: {
194+
'gen_ai.system': 'aws.bedrock',
195+
'gen_ai.operation.name': 'chat',
196+
'gen_ai.request.model': 'amazon.titan-text-lite-v1',
197+
'gen_ai.token.type': 'output',
198+
},
199+
}),
200+
])
201+
);
202+
203+
const operationDuration = scopeMetrics.metrics.filter(
204+
m => m.descriptor.name === 'gen_ai.client.operation.duration'
205+
);
206+
expect(operationDuration.length).toBe(1);
207+
expect(operationDuration[0].descriptor).toMatchObject({
208+
name: 'gen_ai.client.operation.duration',
209+
type: 'HISTOGRAM',
210+
description: 'GenAI operation duration',
211+
unit: 's',
212+
});
213+
expect(operationDuration[0].dataPoints.length).toBe(1);
214+
expect(operationDuration[0].dataPoints).toEqual([
215+
expect.objectContaining({
216+
value: expect.objectContaining({
217+
sum: expect.any(Number),
218+
}),
219+
attributes: {
220+
'gen_ai.system': 'aws.bedrock',
221+
'gen_ai.operation.name': 'chat',
222+
'gen_ai.request.model': 'amazon.titan-text-lite-v1',
223+
},
224+
}),
225+
]);
226+
expect(
227+
(operationDuration[0].dataPoints[0].value as any).sum
228+
).toBeGreaterThan(0);
229+
});
230+
});
231+
232+
describe('ConverseStream', () => {
233+
it('adds genai conventions', async () => {
234+
const modelId = 'amazon.titan-text-lite-v1';
235+
const messages = [
236+
{
237+
role: ConversationRole.USER,
238+
content: [{ text: 'Say this is a test' }],
239+
},
240+
];
241+
const inferenceConfig = {
242+
maxTokens: 10,
243+
temperature: 0.8,
244+
topP: 1,
245+
stopSequences: ['|'],
246+
};
247+
248+
const command = new ConverseStreamCommand({
249+
modelId,
250+
messages,
251+
inferenceConfig,
252+
});
253+
254+
const response = await client.send(command);
255+
const chunks: string[] = [];
256+
for await (const item of response.stream!) {
257+
const text = item.contentBlockDelta?.delta?.text;
258+
if (text) {
259+
chunks.push(text);
260+
}
261+
}
262+
expect(chunks.join('')).toBe('Hi! How are you? How');
263+
264+
const testSpans: ReadableSpan[] = getTestSpans();
265+
const converseSpans: ReadableSpan[] = testSpans.filter(
266+
(s: ReadableSpan) => {
267+
return s.name === 'chat amazon.titan-text-lite-v1';
268+
}
269+
);
270+
expect(converseSpans.length).toBe(1);
271+
expect(converseSpans[0].attributes).toMatchObject({
272+
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
273+
[ATTR_GEN_AI_OPERATION_NAME]: GEN_AI_OPERATION_NAME_VALUE_CHAT,
274+
[ATTR_GEN_AI_REQUEST_MODEL]: modelId,
275+
[ATTR_GEN_AI_REQUEST_MAX_TOKENS]: 10,
276+
[ATTR_GEN_AI_REQUEST_TEMPERATURE]: 0.8,
277+
[ATTR_GEN_AI_REQUEST_TOP_P]: 1,
278+
[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES]: ['|'],
279+
[ATTR_GEN_AI_USAGE_INPUT_TOKENS]: 8,
280+
[ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: 10,
281+
[ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'],
282+
});
283+
284+
await meterProvider.forceFlush();
285+
const [resourceMetrics] = metricExporter.getMetrics();
158286
expect(resourceMetrics.scopeMetrics.length).toBe(1);
159287
const scopeMetrics = resourceMetrics.scopeMetrics[0];
160288
const tokenUsage = scopeMetrics.metrics.filter(

plugins/node/opentelemetry-instrumentation-aws-sdk/test/load-instrumentation.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,25 @@
2121
* specific test. We instead instantiate a single instrumentation instance here to
2222
* use within all tests.
2323
*/
24+
import { registerInstrumentationTesting } from '@opentelemetry/contrib-test-utils';
2425
import {
25-
initMeterProvider,
26-
registerInstrumentationTesting,
27-
} from '@opentelemetry/contrib-test-utils';
26+
AggregationTemporality,
27+
InMemoryMetricExporter,
28+
MeterProvider,
29+
PeriodicExportingMetricReader,
30+
} from '@opentelemetry/sdk-metrics';
2831
import { AwsInstrumentation } from '../src';
2932

3033
export const instrumentation = new AwsInstrumentation();
31-
export const metricReader = initMeterProvider(instrumentation);
34+
export const metricExporter = new InMemoryMetricExporter(
35+
AggregationTemporality.DELTA
36+
);
37+
export const meterProvider = new MeterProvider({
38+
readers: [
39+
new PeriodicExportingMetricReader({
40+
exporter: metricExporter,
41+
}),
42+
],
43+
});
44+
instrumentation.setMeterProvider(meterProvider);
3245
registerInstrumentationTesting(instrumentation);

0 commit comments

Comments
 (0)