Skip to content

Commit 4f48fc3

Browse files
authored
refactor: Updated shim.recordConsume to use shim.record and added ability to invoke an after hook with callback args (#2207)
1 parent 330cc4b commit 4f48fc3

File tree

21 files changed

+292
-332
lines changed

21 files changed

+292
-332
lines changed

lib/instrumentation/amqplib/amqplib.js

+9-9
Original file line numberDiff line numberDiff line change
@@ -266,18 +266,20 @@ function wrapModel(shim, Model, promiseMode) {
266266
destinationName: shim.FIRST,
267267
callback: setCallback(shim, promiseMode),
268268
promise: promiseMode,
269-
messageHandler: function handleConsumedMessage(shim, fn, name, message) {
269+
after: function handleConsumedMessage({ shim, result, args, segment }) {
270+
if (!shim.agent.config.message_tracer.segment_parameters.enabled) {
271+
shim.logger.trace('Not capturing segment parameters')
272+
return
273+
}
274+
270275
// the message is the param when using the promised based model
271-
message = promiseMode ? message : message[1]
276+
const message = promiseMode ? result : args?.[1]
272277
if (!message) {
273278
shim.logger.trace('No results from consume.')
274279
return null
275280
}
276281
const parameters = getParametersFromMessage(message)
277-
278-
const headers = message?.properties?.headers
279-
280-
return { parameters, headers }
282+
shim.copySegmentParameters(segment, parameters)
281283
}
282284
})
283285
)
@@ -312,12 +314,10 @@ function wrapModel(shim, Model, promiseMode) {
312314
* Extracts the appropriate messageHandler parameters for the consume method.
313315
*
314316
* @param {Shim} shim instance of shim
315-
* @param {object} _consumer not used
316-
* @param {string} _name not used
317317
* @param {Array} args arguments passed to the consume method
318318
* @returns {object} message params
319319
*/
320-
function describeMessage(shim, _consumer, _name, args) {
320+
function describeMessage(shim, args) {
321321
const [message] = args
322322

323323
if (!message?.properties) {

lib/instrumentation/aws-sdk/v3/bedrock.js

+1-2
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,7 @@ function getBedrockSpec({ commandName }, shim, _original, _name, args) {
197197
return new RecorderSpec({
198198
promise: true,
199199
name: `Llm/${modelType}/Bedrock/${commandName}`,
200-
// eslint-disable-next-line max-params
201-
after: (shim, _fn, _fnName, err, response, segment) => {
200+
after: ({ shim, error: err, result: response, segment }) => {
202201
const passThroughParams = {
203202
shim,
204203
err,

lib/instrumentation/core/inspector.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ function initialize(agent, inspector, name, shim) {
1616
shim.wrap(sessionProto, 'post', function wrapPost(shim, fn) {
1717
return function wrappedPost() {
1818
const args = shim.argsToArray.apply(shim, arguments)
19-
shim.bindCallbackSegment(args, shim.LAST)
19+
shim.bindCallbackSegment(null, args, shim.LAST)
2020
return fn.apply(this, args)
2121
}
2222
})

lib/instrumentation/langchain/runnable.js

+2-4
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ function instrumentInvokeChain({ langchain, shim }) {
6161
return new RecorderSpec({
6262
name: `${LANGCHAIN.CHAIN}/${fnName}`,
6363
promise: true,
64-
// eslint-disable-next-line max-params
65-
after(_shim, _fn, _name, err, output, segment) {
64+
after({ error: err, result: output, segment }) {
6665
recordChatCompletionEvents({
6766
segment,
6867
messages: [output],
@@ -97,8 +96,7 @@ function instrumentStream({ langchain, shim }) {
9796
return new RecorderSpec({
9897
name: `${LANGCHAIN.CHAIN}/${fnName}`,
9998
promise: true,
100-
// eslint-disable-next-line max-params
101-
after(_shim, _fn, _name, err, output, segment) {
99+
after({ error: err, result: output, segment }) {
102100
// Input error occurred which means a stream was not created.
103101
// Skip instrumenting streaming and create Llm Events from
104102
// the data we have

lib/instrumentation/langchain/tools.js

+1-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ module.exports = function initialize(shim, tools) {
3131
return new RecorderSpec({
3232
name: `${LANGCHAIN.TOOL}/${name}`,
3333
promise: true,
34-
// eslint-disable-next-line max-params
35-
after(_shim, _fn, _name, err, output, segment) {
34+
after({ error: err, result: output, segment }) {
3635
const metadata = mergeMetadata(instanceMeta, paramsMeta)
3736
const tags = mergeTags(instanceTags, paramsTags)
3837
segment.end()

lib/instrumentation/langchain/vectorstore.js

+1-2
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,7 @@ module.exports = function initialize(shim, vectorstores) {
8989
return new RecorderSpec({
9090
name: `${LANGCHAIN.VECTORSTORE}/${fnName}`,
9191
promise: true,
92-
// eslint-disable-next-line max-params
93-
after(_shim, _fn, _name, err, output, segment) {
92+
after({ error: err, result: output, segment }) {
9493
if (!output) {
9594
// If we get an error, it is possible that `output = null`.
9695
// In that case, we define it to be an empty array.

lib/instrumentation/memcached.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ module.exports = function initialize(agent, memcached, moduleName, shim) {
7676
return new OperationSpec({
7777
name: metacall.type || 'Unknown',
7878
callback: function wrapCallback(shim, fn, fnName, opSegment) {
79-
shim.bindCallbackSegment(metacall, 'callback', opSegment)
79+
shim.bindCallbackSegment(null, metacall, 'callback', opSegment)
8080
},
8181
parameters
8282
})

lib/instrumentation/openai.js

+2-4
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,7 @@ module.exports = function initialize(agent, openai, moduleName, shim) {
262262
return new RecorderSpec({
263263
name: OPENAI.COMPLETION,
264264
promise: true,
265-
// eslint-disable-next-line max-params
266-
after(_shim, _fn, _name, err, response, segment) {
265+
after({ error: err, result: response, segment }) {
267266
if (request.stream) {
268267
instrumentStream({ agent, shim, request, response, segment })
269268
} else {
@@ -294,8 +293,7 @@ module.exports = function initialize(agent, openai, moduleName, shim) {
294293
return new RecorderSpec({
295294
name: OPENAI.EMBEDDING,
296295
promise: true,
297-
// eslint-disable-next-line max-params
298-
after(_shim, _fn, _name, err, response, segment) {
296+
after({ error: err, result: response, segment }) {
299297
addLlmMeta({ agent, segment })
300298

301299
if (!response) {

lib/instrumentation/redis.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ function registerInternalSendCommand(shim, proto) {
4949
parameters,
5050
callback: function bindCallback(shim, _f, _n, segment) {
5151
if (shim.isFunction(commandObject.callback)) {
52-
shim.bindCallbackSegment(commandObject, 'callback', segment)
52+
shim.bindCallbackSegment(null, commandObject, 'callback', segment)
5353
} else {
5454
const self = this
5555
commandObject.callback = shim.bindSegment(
@@ -87,9 +87,9 @@ function registerSendCommand(shim, proto) {
8787
callback: function bindCallback(shim, _f, _n, segment) {
8888
const last = args[args.length - 1]
8989
if (shim.isFunction(last)) {
90-
shim.bindCallbackSegment(args, shim.LAST, segment)
90+
shim.bindCallbackSegment(null, args, shim.LAST, segment)
9191
} else if (shim.isArray(last) && shim.isFunction(last[last.length - 1])) {
92-
shim.bindCallbackSegment(last, shim.LAST, segment)
92+
shim.bindCallbackSegment(null, last, shim.LAST, segment)
9393
}
9494
}
9595
})

lib/instrumentation/superagent.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ function wrapCallback(shim, callback) {
5050
return function wrappedCallback() {
5151
const segment = shim.getSegment(this)
5252
if (segment && segment.transaction.isActive()) {
53-
shim.bindCallbackSegment(this, '_callback', segment)
53+
shim.bindCallbackSegment(null, this, '_callback', segment)
5454
}
5555
return callback.apply(this, arguments)
5656
}

lib/shim/message-shim/consume.js

+9-119
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
*/
55

66
'use strict'
7-
const TraceSegment = require('../../transaction/trace/segment')
87
const genericRecorder = require('../../metrics/recorders/generic')
98
const { _nameMessageSegment } = require('./common')
109
const specs = require('../specs')
@@ -25,7 +24,7 @@ module.exports = createRecorder
2524
function updateSpecFromArgs({ shim, fn, fnName, args, spec }) {
2625
let msgDesc = null
2726
if (shim.isFunction(spec)) {
28-
msgDesc = spec.call(this, shim, fn, fnName, args)
27+
msgDesc = spec(shim, fn, fnName, args)
2928
} else {
3029
msgDesc = spec
3130
const destIdx = shim.normalizeIndex(args.length, spec.destinationName)
@@ -37,130 +36,21 @@ function updateSpecFromArgs({ shim, fn, fnName, args, spec }) {
3736
return msgDesc
3837
}
3938

40-
/**
41-
* Binds the consumer callback to the active segment.
42-
*
43-
* @private
44-
* @param {object} params to function
45-
* @param {MessageShim} params.shim instance of shim
46-
* @param {Array} params.args arguments passed to original consume function
47-
* @param {specs.MessageSpec} params.msgDesc spec for the wrapped consume function
48-
* @param {TraceSegment} params.segment active segment to bind callback
49-
* @param {boolean} params.getParams flag to copy message parameters to segment
50-
* @param {Function} params.resHandler function to handle response from callback to obtain the message parameters
51-
*/
52-
function bindCallback({ shim, args, msgDesc, segment, getParams, resHandler }) {
53-
const cbIdx = shim.normalizeIndex(args.length, msgDesc.callback)
54-
if (cbIdx !== null) {
55-
shim.bindCallbackSegment(args, cbIdx, segment)
56-
57-
// If we have a callback and a results handler, then wrap the callback so
58-
// we can call the results handler and get the message properties.
59-
if (resHandler) {
60-
shim.wrap(args, cbIdx, function wrapCb(shim, cb, cbName) {
61-
if (shim.isFunction(cb)) {
62-
return function cbWrapper() {
63-
const cbArgs = shim.argsToArray.apply(shim, arguments)
64-
const msgProps = resHandler.call(this, shim, cb, cbName, cbArgs)
65-
if (getParams && msgProps && msgProps.parameters) {
66-
shim.copySegmentParameters(segment, msgProps.parameters)
67-
}
68-
69-
return cb.apply(this, arguments)
70-
}
71-
}
72-
})
73-
}
74-
}
75-
}
76-
77-
/**
78-
* Binds the consumer function to the async context and checks return to possibly
79-
* bind the promise
80-
*
81-
* @private
82-
* @param {object} params to function
83-
* @param {MessageShim} params.shim instance of shim
84-
* @param {Function} params.fn consumer function
85-
* @param {string} params.fnName name of function
86-
* @param {Array} params.args arguments passed to original consume function
87-
* @param {specs.MessageSpec} params.msgDesc spec for the wrapped consume function
88-
* @param {TraceSegment} params.segment active segment to bind callback
89-
* @param {boolean} params.getParams flag to copy message parameters to segment
90-
* @param {Function} params.resHandler function to handle response from callback to obtain the message parameters
91-
* @returns {Promise|*} response from consume function
92-
*/
93-
function bindConsumer({ shim, fn, fnName, args, msgDesc, segment, getParams, resHandler }) {
94-
// Call the method in the context of our segment.
95-
let ret = shim.applySegment(fn, segment, true, this, args)
96-
97-
if (ret && msgDesc.promise && shim.isPromise(ret)) {
98-
ret = shim.bindPromise(ret, segment)
99-
100-
// Intercept the promise to handle the result.
101-
if (resHandler) {
102-
ret = ret.then(function interceptValue(res) {
103-
const msgProps = resHandler.call(this, shim, fn, fnName, res)
104-
if (getParams && msgProps && msgProps.parameters) {
105-
shim.copySegmentParameters(segment, msgProps.parameters)
106-
}
107-
return res
108-
})
109-
}
110-
}
111-
112-
return ret
113-
}
114-
11539
/**
11640
*
11741
* @private
11842
* @param {object} params to function
11943
* @param {MessageShim} params.shim instance of shim
12044
* @param {Function} params.fn function that is being wrapped
12145
* @param {string} params.fnName name of function
46+
* @param params.args
12247
* @param {specs.MessageSpec} params.spec spec for the wrapped consume function
123-
* @returns {Function} recorder for consume function
48+
* @returns {specs.MessageSpec} updated spec with logic to name segment and apply the genericRecorder
12449
*/
125-
function createRecorder({ shim, fn, fnName, spec }) {
126-
return function consumeRecorder() {
127-
const parent = shim.getSegment()
128-
if (!parent || !parent.transaction.isActive()) {
129-
shim.logger.trace('Not recording consume, no active transaction.')
130-
return fn.apply(this, arguments)
131-
}
132-
133-
// Process the message args.
134-
const args = shim.argsToArray.apply(shim, arguments)
135-
const msgDesc = updateSpecFromArgs.call(this, { shim, fn, fnName, args, spec })
136-
137-
// Make the segment if we can.
138-
if (!msgDesc) {
139-
shim.logger.trace('Not recording consume, no message descriptor.')
140-
return fn.apply(this, args)
141-
}
142-
143-
const name = _nameMessageSegment(shim, msgDesc, shim._metrics.CONSUME)
144-
145-
// Adds details needed by createSegment when used with a spec
146-
msgDesc.name = name
147-
msgDesc.recorder = genericRecorder
148-
msgDesc.parent = parent
149-
150-
const segment = shim.createSegment(msgDesc)
151-
const getParams = shim.agent.config.message_tracer.segment_parameters.enabled
152-
const resHandler = shim.isFunction(msgDesc.messageHandler) ? msgDesc.messageHandler : null
153-
154-
bindCallback({ shim, args, msgDesc, segment, getParams, resHandler })
155-
return bindConsumer.call(this, {
156-
shim,
157-
fn,
158-
fnName,
159-
args,
160-
msgDesc,
161-
segment,
162-
getParams,
163-
resHandler
164-
})
165-
}
50+
function createRecorder({ spec, shim, fn, fnName, args }) {
51+
const msgDesc = updateSpecFromArgs({ shim, fn, fnName, args, spec })
52+
// Adds details needed by createSegment when used with a spec
53+
msgDesc.name = _nameMessageSegment(shim, msgDesc, shim._metrics.CONSUME)
54+
msgDesc.recorder = genericRecorder
55+
return msgDesc
16656
}

lib/shim/message-shim/index.js

+2-11
Original file line numberDiff line numberDiff line change
@@ -287,17 +287,8 @@ function recordConsume(nodule, properties, spec) {
287287
properties = null
288288
}
289289

290-
// This is using wrap instead of record because the spec allows for a messageHandler
291-
// which is being used to handle the result of the callback or promise of the
292-
// original wrapped consume function.
293-
// TODO: https://github.com/newrelic/node-newrelic/issues/981
294-
return this.wrap(nodule, properties, function wrapConsume(shim, fn, fnName) {
295-
if (!shim.isFunction(fn)) {
296-
shim.logger.debug('Not wrapping %s (%s) as consume', fn, fnName)
297-
return fn
298-
}
299-
300-
return createRecorder({ shim, fn, fnName, spec })
290+
return this.record(nodule, properties, function wrapConsume(shim, fn, fnName, args) {
291+
return createRecorder({ spec, shim, fn, fnName, args })
301292
})
302293
}
303294

lib/shim/message-shim/subscribe-consume.js

+4-5
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,12 @@ function makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) {
8484
spec.queue = queue
8585
}
8686

87-
return function wrapConsumer(shim, consumer, cName) {
87+
return function wrapConsumer(shim, consumer) {
8888
if (!shim.isFunction(consumer)) {
8989
return consumer
9090
}
9191

92-
const consumerWrapper = createConsumerWrapper({ shim, consumer, cName, spec })
92+
const consumerWrapper = createConsumerWrapper({ shim, consumer, spec })
9393
return shim.bindCreateTransaction(
9494
consumerWrapper,
9595
new specs.TransactionSpec({
@@ -108,10 +108,9 @@ function makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) {
108108
* @param {MessageShim} params.shim instance of shim
109109
* @param {specs.MessageSubscribeSpec} params.spec spec for function
110110
* @param {Function} params.consumer function for consuming message
111-
* @param {string} params.cName name of consumer function
112111
* @returns {Function} handler for the transaction being created
113112
*/
114-
function createConsumerWrapper({ shim, spec, consumer, cName }) {
113+
function createConsumerWrapper({ shim, spec, consumer }) {
115114
return function createConsumeTrans() {
116115
// If there is no transaction or we're in a pre-existing transaction,
117116
// then don't do anything. Note that the latter should never happen.
@@ -123,7 +122,7 @@ function createConsumerWrapper({ shim, spec, consumer, cName }) {
123122
return consumer.apply(this, args)
124123
}
125124

126-
const msgDesc = spec.messageHandler.call(this, shim, consumer, cName, args)
125+
const msgDesc = spec.messageHandler.call(this, shim, args)
127126

128127
// If message could not be handled, immediately kill this transaction.
129128
if (!msgDesc) {

0 commit comments

Comments
 (0)