Skip to content

Commit 8839e8a

Browse files
authored
Refactor queue management to a service class. (#94)
1 parent e10f95e commit 8839e8a

File tree

3 files changed

+65
-36
lines changed

3 files changed

+65
-36
lines changed

api/src/models/Job.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,31 @@ import Config from "./Config";
66
import { execSync } from "child_process";
77
import JobMetadata from "./JobMetadata";
88
import ImageFile from "./ImageFile";
9-
import { Queue } from "bullmq";
9+
import QueueManager from "../services/QueueManager";
1010

1111
class Job {
1212
dir: string;
1313
name: string;
1414
_metadata: JobMetadata = null;
1515
config: Config;
16+
queue: QueueManager;
1617

17-
constructor(dir: string, config: Config) {
18+
constructor(dir: string, config: Config, queue: QueueManager) {
1819
this.dir = dir;
1920
this.name = path.basename(dir);
2021
this.config = config;
22+
this.queue = queue;
2123
}
2224

2325
public static build(dir: string): Job {
24-
return new Job(dir, Config.getInstance());
26+
return new Job(dir, Config.getInstance(), QueueManager.getInstance());
2527
}
2628

2729
async ingest(): Promise<void> {
2830
const lockfile = this.metadata.ingestLockfile;
2931
if (this.metadata.published && !fileExists(lockfile)) {
3032
closeSync(openSync(lockfile, "w")); // touch
31-
const q = new Queue("vudl");
32-
await q.add("ingest", { dir: this.dir });
33-
q.close();
33+
this.queue.ingestJob(this.dir);
3434
}
3535
}
3636

@@ -48,9 +48,7 @@ class Job {
4848

4949
if (status.expected > status.processed && !fileExists(lockfile)) {
5050
closeSync(openSync(lockfile, "w")); // touch
51-
const q = new Queue("vudl");
52-
await q.add("derivatives", { dir: this.dir });
53-
q.close();
51+
this.queue.buildDerivatives(this.dir);
5452
}
5553
}
5654

api/src/routes/messenger.ts

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@ import SolrIndexer from "../services/SolrIndexer";
22
import bodyParser = require("body-parser");
33
import express = require("express");
44
import Config from "../models/Config";
5-
import { Queue } from "bullmq";
5+
import QueueManager from "../services/QueueManager";
66
import { requireToken } from "./auth";
77
import { pidSanitizer } from "./sanitize";
88
const messenger = express.Router();
99

1010
messenger.post("/pdfgenerator/:pid", pidSanitizer, requireToken, async function (req, res) {
11-
const q = new Queue("vudl");
12-
await q.add("generatepdf", { pid: req.params.pid });
13-
q.close();
11+
QueueManager.getInstance().generatePdf(req.params.pid);
1412
res.status(200).send("ok");
1513
});
1614

@@ -38,27 +36,6 @@ messenger.post("/solrindex/:pid", pidSanitizer, requireToken, async function (re
3836
}
3937
});
4038

41-
async function queueIndexOperation(pid, action): Promise<void> {
42-
// Fedora often fires many change events about the same object in rapid succession;
43-
// we don't want to index more times than we have to, so let's not re-queue anything
44-
// that is already awaiting indexing.
45-
const q = new Queue("vudl");
46-
const jobs = await q.getJobs("wait");
47-
let lastPidAction = null;
48-
for (let i = 0; i < jobs.length; i++) {
49-
if (jobs[i].name === "index" && jobs[i].data.pid === pid) {
50-
lastPidAction = jobs[i].data.action;
51-
break;
52-
}
53-
}
54-
if (action === lastPidAction) {
55-
console.log("Skipping queue; " + pid + " is already awaiting " + action + ".");
56-
} else {
57-
await q.add("index", { pid: pid, action: action });
58-
}
59-
q.close();
60-
}
61-
6239
messenger.post("/camel", bodyParser.json(), async function (req, res) {
6340
const fedoraBase = Config.getInstance().restBaseUrl;
6441
const idParts = req?.body?.id.replace(fedoraBase, "").split("/");
@@ -84,10 +61,10 @@ messenger.post("/camel", bodyParser.json(), async function (req, res) {
8461
switch (action) {
8562
case "Create":
8663
case "Update":
87-
await queueIndexOperation(pid, "index");
64+
await QueueManager.getInstance().performIndexOperation(pid, "index");
8865
break;
8966
case "Delete":
90-
await queueIndexOperation(pid, "delete");
67+
await QueueManager.getInstance().performIndexOperation(pid, "delete");
9168
break;
9269
default: {
9370
const msg = "Unexpected action: " + action + " (on PID: " + pid + ")";

api/src/services/QueueManager.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { Queue } from "bullmq";
2+
3+
class QueueManager {
4+
private static instance: QueueManager;
5+
protected defaultQueueName = "vudl";
6+
7+
public static getInstance(): QueueManager {
8+
if (!QueueManager.instance) {
9+
QueueManager.instance = new QueueManager();
10+
}
11+
return QueueManager.instance;
12+
}
13+
14+
protected async addToQueue(jobName: string, data: Record<string, string>, queueName: string = null): Promise<void> {
15+
const q = new Queue(queueName ?? this.defaultQueueName);
16+
await q.add(jobName, data);
17+
q.close();
18+
}
19+
20+
public async buildDerivatives(dir: string): Promise<void> {
21+
return await this.addToQueue("derivatives", { dir });
22+
}
23+
24+
public async generatePdf(pid: string): Promise<void> {
25+
return await this.addToQueue("generatepdf", { pid });
26+
}
27+
28+
public async ingestJob(dir: string): Promise<void> {
29+
return await this.addToQueue("ingest", { dir });
30+
}
31+
32+
public async performIndexOperation(pid: string, action: string): Promise<void> {
33+
// Fedora often fires many change events about the same object in rapid succession;
34+
// we don't want to index more times than we have to, so let's not re-queue anything
35+
// that is already awaiting indexing.
36+
const q = new Queue(this.defaultQueueName);
37+
const jobs = await q.getJobs("wait");
38+
let lastPidAction = null;
39+
for (let i = 0; i < jobs.length; i++) {
40+
if (jobs[i].name === "index" && jobs[i].data.pid === pid) {
41+
lastPidAction = jobs[i].data.action;
42+
break;
43+
}
44+
}
45+
if (action === lastPidAction) {
46+
console.log("Skipping queue; " + pid + " is already awaiting " + action + ".");
47+
} else {
48+
await q.add("index", { pid, action });
49+
}
50+
q.close();
51+
}
52+
}
53+
54+
export default QueueManager;

0 commit comments

Comments
 (0)