Skip to content

Commit 335b2e2

Browse files
committed
add remove on age and count to bullmq
1 parent a075461 commit 335b2e2

File tree

6 files changed

+39
-8
lines changed

6 files changed

+39
-8
lines changed

docker/.env.example

+2
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ BLOB_STORAGE_PATH=/root/.flowise/storage
8686
# QUEUE_NAME=flowise-queue
8787
# QUEUE_REDIS_EVENT_STREAM_MAX_LEN=100000
8888
# WORKER_CONCURRENCY=100000
89+
# REMOVE_ON_AGE=86400
90+
# REMOVE_ON_COUNT=10000
8991
# REDIS_URL=
9092
# REDIS_HOST=localhost
9193
# REDIS_PORT=6379

docker/docker-compose.yml

+7-5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ services:
3838
- WORKER_CONCURRENCY=${WORKER_CONCURRENCY}
3939
- QUEUE_NAME=${QUEUE_NAME}
4040
- QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN}
41+
- REMOVE_ON_AGE=${REMOVE_ON_AGE}
42+
- REMOVE_ON_COUNT=${REMOVE_ON_COUNT}
4143
- REDIS_URL=${REDIS_URL}
4244
- REDIS_HOST=${REDIS_HOST}
4345
- REDIS_PORT=${REDIS_PORT}
@@ -50,11 +52,11 @@ services:
5052
ports:
5153
- '${PORT}:${PORT}'
5254
healthcheck:
53-
test: ["CMD", "curl", "-f", "http://localhost:${PORT}/api/v1/ping"]
54-
interval: 10s
55-
timeout: 5s
56-
retries: 5
57-
start_period: 30s
55+
test: ['CMD', 'curl', '-f', 'http://localhost:${PORT}/api/v1/ping']
56+
interval: 10s
57+
timeout: 5s
58+
retries: 5
59+
start_period: 30s
5860
volumes:
5961
- ~/.flowise:/root/.flowise
6062
entrypoint: /bin/sh -c "sleep 3; flowise start"

docker/worker/docker-compose.yml

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ services:
3838
- WORKER_CONCURRENCY=${WORKER_CONCURRENCY}
3939
- QUEUE_NAME=${QUEUE_NAME}
4040
- QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN}
41+
- REMOVE_ON_AGE=${REMOVE_ON_AGE}
42+
- REMOVE_ON_COUNT=${REMOVE_ON_COUNT}
4143
- REDIS_URL=${REDIS_URL}
4244
- REDIS_HOST=${REDIS_HOST}
4345
- REDIS_PORT=${REDIS_PORT}

packages/server/.env.example

+2
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ PORT=3000
8383
# QUEUE_NAME=flowise-queue
8484
# QUEUE_REDIS_EVENT_STREAM_MAX_LEN=100000
8585
# WORKER_CONCURRENCY=100000
86+
# REMOVE_ON_AGE=86400
87+
# REMOVE_ON_COUNT=10000
8688
# REDIS_URL=
8789
# REDIS_HOST=localhost
8890
# REDIS_PORT=6379

packages/server/src/commands/base.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ export abstract class BaseCommand extends Command {
6161
WORKER_CONCURRENCY: Flags.string(),
6262
QUEUE_NAME: Flags.string(),
6363
QUEUE_REDIS_EVENT_STREAM_MAX_LEN: Flags.string(),
64+
REMOVE_ON_AGE: Flags.string(),
65+
REMOVE_ON_COUNT: Flags.string(),
6466
REDIS_URL: Flags.string(),
6567
REDIS_HOST: Flags.string(),
6668
REDIS_PORT: Flags.string(),
@@ -196,6 +198,8 @@ export abstract class BaseCommand extends Command {
196198
if (flags.REDIS_CA) process.env.REDIS_CA = flags.REDIS_CA
197199
if (flags.WORKER_CONCURRENCY) process.env.WORKER_CONCURRENCY = flags.WORKER_CONCURRENCY
198200
if (flags.QUEUE_NAME) process.env.QUEUE_NAME = flags.QUEUE_NAME
199-
if (flags.QUEUE_REDIS_EVENT_STREAM_MAX_LEN) process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN = flags.QUEUE_REDIS_EVENT_STREAM
201+
if (flags.QUEUE_REDIS_EVENT_STREAM_MAX_LEN) process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN = flags.QUEUE_REDIS_EVENT_STREAM_MAX_LEN
202+
if (flags.REMOVE_ON_AGE) process.env.REMOVE_ON_AGE = flags.REMOVE_ON_AGE
203+
if (flags.REMOVE_ON_COUNT) process.env.REMOVE_ON_COUNT = flags.REMOVE_ON_COUNT
200204
}
201205
}

packages/server/src/queue/BaseQueue.ts

+21-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
import { Queue, Worker, Job, QueueEvents, RedisOptions } from 'bullmq'
1+
import { Queue, Worker, Job, QueueEvents, RedisOptions, KeepJobs } from 'bullmq'
22
import { v4 as uuidv4 } from 'uuid'
33
import logger from '../utils/logger'
44

55
const QUEUE_REDIS_EVENT_STREAM_MAX_LEN = process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN
66
? parseInt(process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN)
77
: 10000
88
const WORKER_CONCURRENCY = process.env.WORKER_CONCURRENCY ? parseInt(process.env.WORKER_CONCURRENCY) : 100000
9+
const REMOVE_ON_AGE = process.env.REMOVE_ON_AGE ? parseInt(process.env.REMOVE_ON_AGE) : -1
10+
const REMOVE_ON_COUNT = process.env.REMOVE_ON_COUNT ? parseInt(process.env.REMOVE_ON_COUNT) : -1
911

1012
export abstract class BaseQueue {
1113
protected queue: Queue
@@ -34,7 +36,24 @@ export abstract class BaseQueue {
3436

3537
public async addJob(jobData: any): Promise<Job> {
3638
const jobId = jobData.id || uuidv4()
37-
return await this.queue.add(jobId, jobData, { removeOnFail: true })
39+
40+
let removeOnFail: number | boolean | KeepJobs | undefined = true
41+
let removeOnComplete: number | boolean | KeepJobs | undefined = undefined
42+
43+
// Only override removal options if age or count is specified
44+
if (REMOVE_ON_AGE !== -1 || REMOVE_ON_COUNT !== -1) {
45+
const keepJobObj: KeepJobs = {}
46+
if (REMOVE_ON_AGE !== -1) {
47+
keepJobObj.age = REMOVE_ON_AGE
48+
}
49+
if (REMOVE_ON_COUNT !== -1) {
50+
keepJobObj.count = REMOVE_ON_COUNT
51+
}
52+
removeOnFail = keepJobObj
53+
removeOnComplete = keepJobObj
54+
}
55+
56+
return await this.queue.add(jobId, jobData, { removeOnFail, removeOnComplete })
3857
}
3958

4059
public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker {

0 commit comments

Comments
 (0)