9
9
import type express from 'express' ;
10
10
import { Container } from 'typedi' ;
11
11
import get from 'lodash/get' ;
12
- import { pipeline } from 'stream/promises' ;
12
+ import { finished } from 'stream/promises' ;
13
13
import formidable from 'formidable' ;
14
14
15
15
import { BinaryDataService , NodeExecuteFunctions } from 'n8n-core' ;
@@ -30,6 +30,7 @@ import type {
30
30
IWebhookResponseData ,
31
31
IWorkflowDataProxyAdditionalKeys ,
32
32
IWorkflowExecuteAdditionalData ,
33
+ WebhookResponseMode ,
33
34
Workflow ,
34
35
WorkflowExecuteMode ,
35
36
} from 'n8n-workflow' ;
@@ -272,7 +273,7 @@ export async function executeWebhook(
272
273
additionalKeys ,
273
274
undefined ,
274
275
'onReceived' ,
275
- ) ;
276
+ ) as WebhookResponseMode ;
276
277
const responseCode = workflow . expression . getSimpleParameterValue (
277
278
workflowStartNode ,
278
279
webhookData . webhookDescription . responseCode as string ,
@@ -291,7 +292,7 @@ export async function executeWebhook(
291
292
'firstEntryJson' ,
292
293
) ;
293
294
294
- if ( ! [ 'onReceived' , 'lastNode' , 'responseNode' ] . includes ( responseMode as string ) ) {
295
+ if ( ! [ 'onReceived' , 'lastNode' , 'responseNode' ] . includes ( responseMode ) ) {
295
296
// If the mode is not known we error. Is probably best like that instead of using
296
297
// the default that people know as early as possible (probably already testing phase)
297
298
// that something does not resolve properly.
@@ -562,7 +563,8 @@ export async function executeWebhook(
562
563
if ( binaryData ?. id ) {
563
564
res . header ( response . headers ) ;
564
565
const stream = await Container . get ( BinaryDataService ) . getAsStream ( binaryData . id ) ;
565
- await pipeline ( stream , res ) ;
566
+ stream . pipe ( res , { end : false } ) ;
567
+ await finished ( stream ) ;
566
568
responseCallback ( null , { noWebhookResponse : true } ) ;
567
569
} else if ( Buffer . isBuffer ( response . body ) ) {
568
570
res . header ( response . headers ) ;
@@ -595,6 +597,7 @@ export async function executeWebhook(
595
597
} ) ;
596
598
}
597
599
600
+ process . nextTick ( ( ) => res . end ( ) ) ;
598
601
didSendResponse = true ;
599
602
} )
600
603
. catch ( async ( error ) => {
@@ -659,17 +662,9 @@ export async function executeWebhook(
659
662
return data ;
660
663
}
661
664
662
- if ( responseMode === 'responseNode' ) {
663
- if ( ! didSendResponse ) {
664
- // Return an error if no Webhook-Response node did send any data
665
- responseCallback ( null , {
666
- data : {
667
- message : 'Workflow executed successfully' ,
668
- } ,
669
- responseCode,
670
- } ) ;
671
- didSendResponse = true ;
672
- }
665
+ // in `responseNode` mode `responseCallback` is called by `responsePromise`
666
+ if ( responseMode === 'responseNode' && responsePromise ) {
667
+ await Promise . allSettled ( [ responsePromise . promise ( ) ] ) ;
673
668
return undefined ;
674
669
}
675
670
@@ -795,14 +790,16 @@ export async function executeWebhook(
795
790
res . setHeader ( 'Content-Type' , binaryData . mimeType ) ;
796
791
if ( binaryData . id ) {
797
792
const stream = await Container . get ( BinaryDataService ) . getAsStream ( binaryData . id ) ;
798
- await pipeline ( stream , res ) ;
793
+ stream . pipe ( res , { end : false } ) ;
794
+ await finished ( stream ) ;
799
795
} else {
800
- res . end ( Buffer . from ( binaryData . data , BINARY_ENCODING ) ) ;
796
+ res . write ( Buffer . from ( binaryData . data , BINARY_ENCODING ) ) ;
801
797
}
802
798
803
799
responseCallback ( null , {
804
800
noWebhookResponse : true ,
805
801
} ) ;
802
+ process . nextTick ( ( ) => res . end ( ) ) ;
806
803
}
807
804
} else if ( responseData === 'noData' ) {
808
805
// Return without data
0 commit comments