Skip to content

Commit 61dbbf9

Browse files
authored
feat: Added support for response streaming Lambda functions (#2981)
1 parent 79fb8e9 commit 61dbbf9

File tree

3 files changed

+547
-62
lines changed

3 files changed

+547
-62
lines changed

lib/serverless/aws-lambda.js

+119-37
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const EVENT_SOURCE_TYPE_KEY = `${EVENT_SOURCE_PREFIX}.eventType`
2424
const NAMES = require('../metrics/names')
2525

2626
const EVENT_SOURCE_INFO = require('./event-sources')
27+
const HANDLER_STREAMING = Symbol.for('aws.lambda.runtime.handler.streaming')
2728

2829
// A function with no references used to stub out closures
2930
function cleanClosure() {}
@@ -35,7 +36,7 @@ let transactionEnders = []
3536
// the invocation transaction.
3637
let uncaughtException = null
3738

38-
// Tracking the first time patchLambdaHandler is called for one off functionality
39+
// Tracking the first time patchLambdaHandler is called for one-off functionality
3940
let patchCalled = false
4041
let coldStartRecorded = false
4142

@@ -105,6 +106,45 @@ class AwsLambda {
105106
})
106107
}
107108

109+
/**
110+
* Response-streaming handlers are identified by symbol properties on the function.
111+
* We propagate any symbols if they're present, so that the handler keeps its signatue for any AWS features that rely on symbols
112+
* @param handler
113+
* @param nrHandler
114+
*/
115+
propagateSymbols(handler, nrHandler) {
116+
for (const symbol of Object.getOwnPropertySymbols(handler)) {
117+
logger.trace(`Setting symbol ${symbol.toString()} on handler`)
118+
nrHandler[symbol] = handler[symbol]
119+
}
120+
}
121+
122+
createSegment({ event, context, transaction, recorder }) {
123+
const shim = this.shim
124+
const functionName = context.functionName
125+
const group = NAMES.FUNCTION.PREFIX
126+
const transactionName = group + functionName
127+
128+
const activeSegment = shim.tracer.getSegment()
129+
130+
transaction.setPartialName(transactionName)
131+
const txnEnder = endTransaction.bind(null, transaction, transactionEnders.length)
132+
133+
transactionEnders.push(txnEnder)
134+
const segment = shim.createSegment(functionName, recorder, activeSegment)
135+
transaction.baseSegment = segment
136+
const awsAttributes = this._getAwsAgentAttributes(event, context)
137+
transaction.trace.attributes.addAttributes(ATTR_DEST.TRANS_COMMON, awsAttributes)
138+
139+
shim.agent.setLambdaArn(context.invokedFunctionArn)
140+
141+
shim.agent.setLambdaFunctionVersion(context.functionVersion)
142+
segment.addSpanAttributes(awsAttributes)
143+
144+
segment.start()
145+
return { segment, txnEnder }
146+
}
147+
108148
patchLambdaHandler(handler) {
109149
const awsLambda = this
110150
const shim = this.shim
@@ -114,6 +154,11 @@ class AwsLambda {
114154
return handler
115155
}
116156

157+
const isStreamHandler = handler[HANDLER_STREAMING] === 'response'
158+
if (isStreamHandler) {
159+
this.agent.recordSupportability('Nodejs/Serverless/Lambda/ResponseStreaming')
160+
}
161+
117162
if (!patchCalled) {
118163
// Only wrap emit on process the first time patch is called.
119164
patchCalled = true
@@ -122,52 +167,87 @@ class AwsLambda {
122167
this.wrapFatal()
123168
}
124169

125-
return shim.bindCreateTransaction(wrappedHandler, new specs.TransactionSpec({ type: shim.BG }))
170+
const wrapper = isStreamHandler ? wrappedStreamHandler : wrappedHandler
171+
const nrHandler = shim.bindCreateTransaction(wrapper, new specs.TransactionSpec({ type: shim.BG }))
172+
awsLambda.propagateSymbols(handler, nrHandler)
173+
174+
return nrHandler
175+
176+
/**
177+
* Wraps a response streaming lambda handler.
178+
*
179+
* Creates and applies segment based on function name, assigns attributes to transaction trace,
180+
* listen when stream errors(log error), ends(end transaction)
181+
*
182+
* **Note**: AWS doesn't support response streaming with API gateway invoked lambdas.
183+
* This means we do not handle that as it would require intercepting the stream to parse
184+
* the response code and headers.
185+
*/
186+
function wrappedStreamHandler() {
187+
const transaction = shim.tracer.getTransaction()
188+
if (!transaction) {
189+
logger.trace('No active transaction, not wrapping streaming handler')
190+
return handler.apply(this, arguments)
191+
}
126192

127-
function wrappedHandler() {
128193
const args = shim.argsToArray.apply(shim, arguments)
129-
130194
const event = args[0]
131-
const context = args[1]
195+
const context = args[2]
196+
logger.trace('In stream handler, lambda function name', context?.functionName)
197+
const { segment, txnEnder } = awsLambda.createSegment({ context, event, transaction, recorder: recordBackground })
198+
args[1] = awsLambda.wrapStreamAndCaptureError(
199+
transaction,
200+
txnEnder,
201+
args[1]
202+
)
132203

133-
const functionName = context.functionName
134-
const group = NAMES.FUNCTION.PREFIX
135-
const transactionName = group + functionName
204+
let res
205+
try {
206+
res = shim.applySegment(handler, segment, false, this, args)
207+
} catch (err) {
208+
uncaughtException = err
209+
txnEnder()
210+
throw err
211+
}
212+
213+
return res
214+
}
136215

216+
/**
217+
* Wraps a non response streaming lambda handler.
218+
*
219+
* Creates and applies segment based on function name, assigns attributes to transaction trace,
220+
* adds handlers if api gateway to wrap request/response
221+
* wraps the callback(if present), wraps the context `done`, `succeed`, `fail methods`, intercepts promise
222+
* and properly ends transaction
223+
*/
224+
function wrappedHandler() {
137225
const transaction = shim.tracer.getTransaction()
138226
if (!transaction) {
227+
logger.trace('No active transaction, not wrapping handler')
139228
return handler.apply(this, arguments)
140229
}
141-
const activeSegment = shim.tracer.getSegment()
142-
143-
transaction.setPartialName(transactionName)
230+
const args = shim.argsToArray.apply(shim, arguments)
144231

232+
const event = args[0]
233+
const context = args[1]
234+
logger.trace('Lambda function name', context?.functionName)
145235
const isApiGatewayLambdaProxy = apiGateway.isLambdaProxyEvent(event)
236+
logger.trace('Is this Lambda event an API Gateway or ALB web proxy?', isApiGatewayLambdaProxy)
237+
logger.trace('Lambda event keys', Object.keys(event))
146238
const segmentRecorder = isApiGatewayLambdaProxy ? recordWeb : recordBackground
147-
const segment = shim.createSegment(functionName, segmentRecorder, activeSegment)
148-
transaction.baseSegment = segment
239+
const { segment, txnEnder } = awsLambda.createSegment({ context, event, transaction, recorder: segmentRecorder })
240+
149241
// resultProcessor is used to execute additional logic based on the
150242
// payload supplied to the callback.
151243
let resultProcessor
152244

153-
logger.trace('Is this Lambda event an API Gateway or ALB web proxy?', isApiGatewayLambdaProxy)
154-
logger.trace('Lambda event keys', Object.keys(event))
155-
156245
if (isApiGatewayLambdaProxy) {
157246
const webRequest = new apiGateway.LambdaProxyWebRequest(event)
158247
setWebRequest(shim, transaction, webRequest)
159248
resultProcessor = getApiGatewayLambdaProxyResultProcessor(transaction)
160249
}
161-
162250
const cbIndex = args.length - 1
163-
164-
// Add transaction ending closure to the list of functions to be called on
165-
// beforeExit (i.e. in the case that context.{done,fail,succeed} or callback
166-
// were not called).
167-
const txnEnder = endTransaction.bind(null, transaction, transactionEnders.length)
168-
169-
transactionEnders.push(txnEnder)
170-
171251
args[cbIndex] = awsLambda.wrapCallbackAndCaptureError(
172252
transaction,
173253
txnEnder,
@@ -186,16 +266,6 @@ class AwsLambda {
186266
}
187267
})
188268

189-
const awsAttributes = awsLambda._getAwsAgentAttributes(event, context)
190-
transaction.trace.attributes.addAttributes(ATTR_DEST.TRANS_COMMON, awsAttributes)
191-
192-
shim.agent.setLambdaArn(context.invokedFunctionArn)
193-
194-
shim.agent.setLambdaFunctionVersion(context.functionVersion)
195-
segment.addSpanAttributes(awsAttributes)
196-
197-
segment.start()
198-
199269
let res
200270
try {
201271
res = shim.applySegment(handler, segment, false, this, args)
@@ -251,6 +321,18 @@ class AwsLambda {
251321
}
252322
}
253323

324+
wrapStreamAndCaptureError(transaction, txnEnder, stream) {
325+
const shim = this.shim
326+
stream.on('error', (error) => {
327+
shim.agent.errors.add(transaction, error)
328+
})
329+
330+
stream.on('close', () => {
331+
txnEnder()
332+
})
333+
return stream
334+
}
335+
254336
_getAwsAgentAttributes(event, context) {
255337
const attributes = {
256338
'aws.lambda.arn': context.invokedFunctionArn,
@@ -372,7 +454,7 @@ function lowercaseObjectKeys(original) {
372454
}
373455

374456
function endTransaction(transaction, enderIndex) {
375-
if (transactionEnders[enderIndex] === cleanClosure) {
457+
if (transactionEnders.length === 0 || transactionEnders[enderIndex] === cleanClosure) {
376458
// In the case where we have already been called, we return early. There may be a
377459
// case where this is called more than once, given the lambda is left in a dirty
378460
// state after thread suspension (e.g. timeouts)
@@ -411,7 +493,7 @@ function setWebResponse(transaction, response) {
411493

412494
// We are adding http.statusCode to base segment as
413495
// we found in testing async invoked lambdas, the
414-
// active segement is not available at this point.
496+
// active segment is not available at this point.
415497
const segment = transaction.baseSegment
416498

417499
segment.addSpanAttribute('http.statusCode', responseCode)

0 commit comments

Comments
 (0)