You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I'm implementing a job flow using BullMQ (latest version) with NestJS, and I've encountered an issue with error handling in flow jobs.
The setup
I have a parent flow job and multiple child jobs. The flow is set up like this:
constchildren=buildingImports.map((jobData)=>({name: buildingImportQueueName,queueName: buildingImportQueueName,data: {data: jobData,
title,},opts: {failParentOnFailure: false,// I've tried both true and false},}));constflow=awaitflowProducer.add({name: `${FLOW_JOB_PREFIX}-${uuidv4()}`,queueName: buildingImportFlowQueueName,// Separate queue for flow jobsdata: {
title,
description,
targetOrgId,},
children,opts: {jobId: asyncJob.id.toString(),
...jobCleanupConfig,},});
I've created separate processors for the flow job and child jobs:
@Processor(buildingImportFlowQueueName)exportclassBuildingImportFlowProcessorextendsWorkerProcessorBase<BuildingImportFlowJobData>{// Process method works fineasyncprocess({ id, data }: Job<AsyncJobData<BuildingImportFlowJobData>>){logger.info('Processing parent flow job',{flowId: id,flowData: data});return;}// This works when all children complete successfully
@OnWorkerEvent('completed')asynconCompleted(job: Job<AsyncJobData<BuildingImportFlowJobData>>){// This handler works correctly}// This is never called when child jobs fail
@OnWorkerEvent('failed')asynconFailed(job: Job<AsyncJobData<BuildingImportFlowJobData>>,error: Error){logger.error('Building import flow failed event handler triggered',{flowId: job.id||'unknown',error: error.message,});// Additional handling...}}
The problem
When all child jobs complete successfully, the parent job's process method is called and the completed event handler works as expected.
However, when any child job fails, neither the parent job's process method nor its failed event handler is called. There's no logging or indication that the parent job is aware of the child failure.
What I've tried
Using separate queues for flow jobs and child jobs
Setting failParentOnFailure: true and failParentOnFailure: false
Adding extensive logging in both child and parent job handlers
Added event handlers in the child processor to confirm child failures are happening
Checked that child jobs have correct parent keys
According to the BullMQ documentation on Flows, parent jobs should be notified when children fail, but this isn't happening in my case.
Question
How can I ensure that when a child job fails, the parent flow job's failed event handler is triggered? Is there a specific configuration I'm missing for failure propagation in BullMQ flows?
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
-
I'm implementing a job flow using BullMQ (latest version) with NestJS, and I've encountered an issue with error handling in flow jobs.
The setup
I have a parent flow job and multiple child jobs. The flow is set up like this:
I've created separate processors for the flow job and child jobs:
The problem
When all child jobs complete successfully, the parent job's
process
method is called and thecompleted
event handler works as expected.However, when any child job fails, neither the parent job's
process
method nor itsfailed
event handler is called. There's no logging or indication that the parent job is aware of the child failure.What I've tried
failParentOnFailure: true
andfailParentOnFailure: false
According to the BullMQ documentation on Flows, parent jobs should be notified when children fail, but this isn't happening in my case.
Question
How can I ensure that when a child job fails, the parent flow job's
failed
event handler is triggered? Is there a specific configuration I'm missing for failure propagation in BullMQ flows?Beta Was this translation helpful? Give feedback.
All reactions