Skip to content

Commit 8943672

Browse files
authored
chore: Added consumer attribute reconciliation (#2957)
1 parent 6278107 commit 8943672

File tree

5 files changed

+116
-20
lines changed

5 files changed

+116
-20
lines changed

lib/otel/segments/consumer.js

+7-11
Original file line numberDiff line numberDiff line change
@@ -14,30 +14,26 @@ module.exports = createConsumerSegment
1414

1515
const Transaction = require('../../transaction/')
1616
const recorder = require('../../metrics/recorders/message-transaction')
17-
const { DESTINATIONS, TYPES } = Transaction
17+
const { TYPES } = Transaction
1818

1919
const {
2020
ATTR_MESSAGING_DESTINATION,
2121
ATTR_MESSAGING_DESTINATION_KIND,
22+
ATTR_MESSAGING_DESTINATION_NAME,
2223
ATTR_MESSAGING_SYSTEM
2324
} = require('../constants')
2425

2526
function createConsumerSegment(agent, otelSpan) {
27+
const attrs = otelSpan.attributes
2628
const transaction = new Transaction(agent)
2729
transaction.type = TYPES.MESSAGE
2830

29-
const system = otelSpan.attributes[ATTR_MESSAGING_SYSTEM] ?? 'unknown'
30-
const destination = otelSpan.attributes[ATTR_MESSAGING_DESTINATION] ?? 'unknown'
31-
const destKind = otelSpan.attributes[ATTR_MESSAGING_DESTINATION_KIND] ?? 'unknown'
31+
const system = attrs[ATTR_MESSAGING_SYSTEM] ?? 'unknown'
32+
// _NAME is the current preferred attribute with semantic conventions >=1.3.0.
33+
const destination = attrs[ATTR_MESSAGING_DESTINATION_NAME] ?? attrs[ATTR_MESSAGING_DESTINATION] ?? 'unknown'
34+
const destKind = attrs[ATTR_MESSAGING_DESTINATION_KIND] ?? 'unknown'
3235
const segmentName = `${system}/${destKind}/Named/${destination}`
3336

34-
const txAttrs = transaction.trace.attributes
35-
txAttrs.addAttribute(DESTINATIONS.TRANS_SCOPE, 'message.queueName', destination)
36-
// txAttrs.addAttribute(
37-
// DESTINATIONS.TRANS_SCOPE,
38-
// 'host',
39-
//
40-
// )
4137
transaction.setPartialName(segmentName)
4238

4339
const segment = agent.tracer.createSegment({

lib/otel/span-processor.js

+64-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const {
1717
ATTR_HTTP_ROUTE,
1818
ATTR_HTTP_STATUS_CODE,
1919
ATTR_HTTP_STATUS_TEXT,
20+
ATTR_MESSAGING_DESTINATION,
21+
ATTR_MESSAGING_DESTINATION_NAME,
2022
ATTR_MESSAGING_MESSAGE_CONVERSATION_ID,
2123
ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY,
2224
ATTR_NET_PEER_NAME,
@@ -38,7 +40,7 @@ module.exports = class NrSpanProcessor {
3840

3941
/**
4042
* Synthesize segment at start of span and assign to a symbol
41-
* that will be removed in `onEnd` once the correspondig
43+
* that will be removed in `onEnd` once the corresponding
4244
* segment is read.
4345
* @param {object} span otel span getting tested
4446
*/
@@ -71,12 +73,72 @@ module.exports = class NrSpanProcessor {
7173
this.reconcileServerAttributes({ segment, span, transaction })
7274
} else if (span.kind === SpanKind.CLIENT && span.attributes[ATTR_DB_SYSTEM]) {
7375
this.reconcileDbAttributes({ segment, span })
76+
} else if (span.kind === SpanKind.CONSUMER) {
77+
this.reconcileConsumerAttributes({ segment, span, transaction })
7478
} else if (span.kind === SpanKind.PRODUCER) {
7579
this.reconcileProducerAttributes({ segment, span })
7680
}
7781
// TODO: add http external checks
7882
}
7983

84+
/**
85+
* Detect messaging consumer attributes in the OTEL span and add them
86+
* to the New Relic transaction. Note: this method ends the current
87+
* transaction.
88+
*
89+
* @param {object} params
90+
* @param {object} params.span The OTEL span entity that possibly contains
91+
* desired attributes.
92+
* @param {Transaction} params.transaction The NR transaction to attach
93+
* the found attributes to.
94+
*/
95+
reconcileConsumerAttributes({ span, transaction }) { // eslint-disable-line sonarjs/cognitive-complexity
96+
const baseSegment = transaction.baseSegment
97+
const trace = transaction.trace
98+
const isHighSecurity = this.agent.config.high_security ?? false
99+
100+
for (const [key, value] of Object.entries(span.attributes)) {
101+
switch (key) {
102+
case ATTR_SERVER_ADDRESS: {
103+
if (value) {
104+
let serverAddress = value
105+
if (urltils.isLocalhost(value)) {
106+
serverAddress = this.agent.config.getHostnameSafe(value)
107+
}
108+
baseSegment.addAttribute('host', serverAddress)
109+
}
110+
break
111+
}
112+
113+
case ATTR_SERVER_PORT: {
114+
baseSegment.addAttribute('port', value)
115+
break
116+
}
117+
118+
case ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: {
119+
if (isHighSecurity === true || !value) break
120+
trace.attributes.addAttribute(DESTINATIONS.TRANS_COMMON, 'message.routingKey', value)
121+
baseSegment.addAttribute('message.routingKey', value)
122+
break
123+
}
124+
125+
case ATTR_MESSAGING_DESTINATION_NAME:
126+
case ATTR_MESSAGING_DESTINATION: {
127+
if (isHighSecurity === true || !value) break
128+
trace.attributes.addAttribute(DESTINATIONS.TRANS_COMMON, 'message.queueName', value)
129+
baseSegment.addAttribute('message.queueName', value)
130+
break
131+
}
132+
133+
default: {
134+
baseSegment.addAttribute(key, value)
135+
}
136+
}
137+
}
138+
139+
transaction.end()
140+
}
141+
80142
reconcileServerAttributes({ segment, span, transaction }) {
81143
if (span.attributes[ATTR_RPC_SYSTEM]) {
82144
this.reconcileRpcAttributes({ segment, span, transaction })
@@ -86,7 +148,7 @@ module.exports = class NrSpanProcessor {
86148

87149
// End the corresponding transaction for the entry point server span.
88150
// We do then when the span ends to ensure all data has been processed
89-
// for the correspondig server span.
151+
// for the corresponding server span.
90152
transaction.end()
91153
}
92154

lib/spans/span-event.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ class HttpSpanEvent extends SpanEvent {
225225
* Span event class for datastore operations and queries.
226226
*
227227
* @private
228-
* @class.
228+
* @class
229229
*/
230230
class DatastoreSpanEvent extends SpanEvent {
231231
constructor(attributes, customAttributes) {

test/unit/lib/otel/consumer.test.js

-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ const {
1818
ATTR_MESSAGING_SYSTEM,
1919
} = require('#agentlib/otel/constants.js')
2020

21-
const { DESTINATIONS } = require('../../../../lib/transaction')
2221
const helper = require('../../../lib/agent_helper')
2322
const createSpan = require('./fixtures/span')
2423
const SegmentSynthesizer = require('../../../../lib/otel/segment-synthesis')
@@ -63,8 +62,4 @@ test('should create consumer segment from otel span', (t) => {
6362
assert.equal(transaction.name, expectedName)
6463
assert.equal(transaction.type, 'message')
6564
assert.equal(transaction.baseSegment, segment)
66-
assert.equal(
67-
transaction.trace.attributes.get(DESTINATIONS.TRANS_SCOPE)['message.queueName'],
68-
'dest1'
69-
)
7065
})

test/versioned/otel-bridge/span.test.js

+44-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const { hrTimeToMilliseconds } = require('@opentelemetry/core')
1313
const helper = require('../../lib/agent_helper')
1414
const { otelSynthesis } = require('../../../lib/symbols')
1515

16+
const { DESTINATIONS } = require('../../../lib/transaction')
1617
const {
1718
ATTR_DB_NAME,
1819
ATTR_DB_STATEMENT,
@@ -26,6 +27,7 @@ const {
2627
ATTR_HTTP_URL,
2728
ATTR_MESSAGING_DESTINATION,
2829
ATTR_MESSAGING_DESTINATION_KIND,
30+
ATTR_MESSAGING_DESTINATION_NAME,
2931
ATTR_MESSAGING_MESSAGE_CONVERSATION_ID,
3032
ATTR_MESSAGING_OPERATION,
3133
ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY,
@@ -414,12 +416,15 @@ test('Otel producer span test', (t, end) => {
414416

415417
test('messaging consumer metrics are bridged correctly', (t, end) => {
416418
const { agent, tracer } = t.nr
419+
const expectedHost = agent.config.getHostnameSafe('localhost')
417420
const attributes = {
418421
[ATTR_MESSAGING_SYSTEM]: 'kafka',
419422
[ATTR_MESSAGING_OPERATION]: 'getMessage',
420423
[ATTR_SERVER_ADDRESS]: '127.0.0.1',
424+
[ATTR_SERVER_PORT]: '1234',
421425
[ATTR_MESSAGING_DESTINATION]: 'work-queue',
422-
[ATTR_MESSAGING_DESTINATION_KIND]: 'queue'
426+
[ATTR_MESSAGING_DESTINATION_KIND]: 'queue',
427+
[ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'test-key'
423428
}
424429

425430
tracer.startActiveSpan('consumer-test', { kind: otel.SpanKind.CONSUMER, attributes }, (span) => {
@@ -444,6 +449,44 @@ test('messaging consumer metrics are bridged correctly', (t, end) => {
444449
assert.equal(unscopedMetrics[expectedMetric].callCount, 1, `${expectedMetric}.callCount`)
445450
}
446451

452+
// Verify that required reconciled attributes are present:
453+
let attrs = tx.baseSegment.getAttributes()
454+
assert.equal(attrs.host, expectedHost)
455+
assert.equal(attrs.port, '1234')
456+
attrs = tx.trace.attributes.get(DESTINATIONS.TRANS_COMMON)
457+
assert.equal(attrs['message.queueName'], 'work-queue')
458+
assert.equal(attrs['message.routingKey'], 'test-key')
459+
460+
end()
461+
})
462+
})
463+
464+
test('messaging consumer skips high security attributes', (t, end) => {
465+
const { agent, tracer } = t.nr
466+
const expectedHost = agent.config.getHostnameSafe('localhost')
467+
const attributes = {
468+
[ATTR_MESSAGING_SYSTEM]: 'kafka',
469+
[ATTR_MESSAGING_OPERATION]: 'getMessage',
470+
[ATTR_SERVER_ADDRESS]: '127.0.0.1',
471+
[ATTR_SERVER_PORT]: '1234',
472+
[ATTR_MESSAGING_DESTINATION_KIND]: 'queue',
473+
[ATTR_MESSAGING_DESTINATION_NAME]: 'test-queue',
474+
[ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'test-key'
475+
}
476+
agent.config.high_security = true
477+
478+
tracer.startActiveSpan('consumer-test', { kind: otel.SpanKind.CONSUMER, attributes }, (span) => {
479+
const tx = agent.getTransaction()
480+
span.end()
481+
482+
// Verify that required reconciled attributes are present:
483+
let attrs = tx.baseSegment.getAttributes()
484+
assert.equal(attrs.host, expectedHost)
485+
assert.equal(attrs.port, '1234')
486+
attrs = tx.trace.attributes.get(DESTINATIONS.TRANS_COMMON)
487+
assert.equal(attrs['message.queueName'], undefined)
488+
assert.equal(attrs['message.routingKey'], undefined)
489+
447490
end()
448491
})
449492
})

0 commit comments

Comments
 (0)