Replies: 3 comments 1 reply
-
Is it possible that a feature like this is the one you need? https://docs.bullmq.io/bullmq-pro/batches
-
Sort of, except I want to handle timeouts differently. I want to send users notifications when a document is edited, but I don't want to send fifty different notifications for fifty different edits on the same document that happened within the same half hour. I'd like to send those as a single batch: "Hey, within the last half hour, someone made all of these changes." But if someone made just one tiny change and didn't touch the document again for, say, five minutes, I don't want to have to wait 25 more minutes to send the notification. So as long as I'm accumulating edits, I'd like to take a little while to send the notifications. But if it was just one edit and then silence, I'd like to wait less. I just realised that
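The timing rule described here can be sketched as a small pure function: wait a short quiet period after each edit, but never hold a batch longer than a fixed cap measured from its first edit. The names `nextDelayMs`, `QUIET_MS`, and `MAX_WAIT_MS` are hypothetical, and the 5-minute and 30-minute values only mirror the figures mentioned in this comment.

```typescript
// Hypothetical constants mirroring the figures in the comment above.
const QUIET_MS = 5 * 60 * 1000; // send after 5 minutes of silence
const MAX_WAIT_MS = 30 * 60 * 1000; // never hold a batch longer than 30 minutes

// Returns how long to wait, counted from `now`, before sending the batch.
// `firstEditAt` and `now` are epoch timestamps in milliseconds.
function nextDelayMs(
  firstEditAt: number,
  now: number,
  quietMs: number = QUIET_MS,
  maxWaitMs: number = MAX_WAIT_MS,
): number {
  const elapsed = now - firstEditAt;
  // Time left before the batch reaches its maximum age.
  const remaining = Math.max(maxWaitMs - elapsed, 0);
  // Each new edit restarts the quiet period, but never past the cap.
  return Math.min(quietMs, remaining);
}
```

With these values, a lone edit is sent after five minutes, while an edit arriving 28 minutes into a batch only postpones sending by the two minutes that remain.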
-
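What about this idea? One criticism I've heard is that it does too many Redis round trips. Does it have races?

import { Queue, QueueEvents } from 'bullmq';
import { v4 as uuidv4 } from 'uuid';

interface DocEditJobData {
  documentId: string;
  userId: string;
  changes: string[];
}

// Assumed setup, not shown in the original post; connection and delay values are placeholders.
const connection = { host: 'localhost', port: 6379 };
const docEditsQueue = new Queue('doc-edits', { connection });
const docEditsQueueEvents = new QueueEvents('doc-edits', { connection });
const firstDelay = 5 * 60 * 1000; // quiet period after the latest edit
const maxDelay = 30 * 60 * 1000; // cap on total wait since the first edit

async function addDocEditJob(data: DocEditJobData): Promise<void> {
  const dedupId = `${data.documentId}:${data.userId}`;
  const jobId = uuidv4();
  console.log(`adding job with id ${jobId}`);
  const queuedJob = await docEditsQueue.add('doc-edits', data, {
    jobId,
    delay: firstDelay,
    deduplication: { id: dedupId },
  });
  // A different id coming back is taken to mean the add was deduplicated
  // against an existing job (the assumption this approach rests on).
  if (queuedJob.id !== jobId) {
    // Check if the job is still delayed, as we may have lost a race
    if (await queuedJob.isDelayed()) {
      console.log(`got a duplicate job: ${jobId}`);
      // Re-fetch to read the existing job's stored data and timestamp.
      const existing = await docEditsQueue.getJob(queuedJob.id!);
      if (!existing) return addDocEditJob(data); // gone in the meantime; start over
      // Append all incoming changes, not only the first.
      existing.data.changes.push(...data.changes);
      await existing.updateData(existing.data);
      const elapsed = Date.now() - existing.timestamp;
      const upperBound = Math.max(maxDelay - elapsed, 0);
      await existing.changeDelay(Math.min(upperBound, firstDelay));
    } else {
      console.log('lost a race, waiting to resubmit');
      // The job is already running. Too late to change it. Wait for
      // it to finish and then resubmit
      await queuedJob.waitUntilFinished(docEditsQueueEvents);
      await addDocEditJob(data);
    }
  }
}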
-
When I try to add a job that has the same deduplication id, the deduplicated job isn't added to the queue. In the queue.on('deduplicated', ...) event handler, how can I get the details of the job that was deduplicated? All that's passed into the callback is the deduplicated job ID, but I want the details of the job. I can't do something like queue.getJob(deduplicatedJobId) because the job was never added to the queue.
I'd like to modify the existing deduplicated job and batch together some of the details of the attempted duplicate job, so that we can run just one job with some updated details.
It's similar to this question. Should I not be using deduplication at all if I want to batch together several similar jobs as a single job?
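Whatever mechanism ends up locating the existing job, the batching step itself is a plain merge of the duplicate's payload into the pending one. A minimal sketch, assuming a payload shaped like the DocEditJobData interface posted in the replies; `mergeEdits` is a hypothetical helper, not part of BullMQ.

```typescript
// Payload shape assumed from the DocEditJobData interface in the replies.
interface DocEditJobData {
  documentId: string;
  userId: string;
  changes: string[];
}

// Hypothetical helper: folds the attempted duplicate's changes into the
// pending job's data and returns a new object, leaving both inputs untouched.
function mergeEdits(
  pending: DocEditJobData,
  incoming: DocEditJobData,
): DocEditJobData {
  if (
    pending.documentId !== incoming.documentId ||
    pending.userId !== incoming.userId
  ) {
    throw new Error('cannot merge edits for different documents or users');
  }
  return { ...pending, changes: [...pending.changes, ...incoming.changes] };
}
```

The merged object could then be written back to the pending job, for example with the job's updateData method. Note that the merge itself needs the duplicate's payload in hand, which is exactly what the event callback described above does not provide.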