Skip to content

Commit 82fdae3

Browse files
committed
feat: implementing addAtomic
Signed-off-by: Fawzi Abdulfattah <[email protected]>
1 parent 8e694a8 commit 82fdae3

File tree

5 files changed

+36
-18
lines changed

5 files changed

+36
-18
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
node_modules
33

44
.vscode
5-
package-lock.json
5+
package-lock.json
6+
testing.ts

core/connectors/ioredis.connector.ts

+12-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/* eslint-disable @typescript-eslint/no-unused-vars */
22
import { IClientConnector, ProducerConfigs } from "@core/types";
3+
import { GetPrefixedQueue } from "@core/helpers";
34
import { Redis } from "ioredis";
45

56
export class RedisIOClient implements IClientConnector<Redis> {
@@ -32,28 +33,29 @@ export class RedisIOClient implements IClientConnector<Redis> {
3233
return this.client;
3334
}
3435

35-
async add(queue: string, data: unknown): Promise<string> {
36+
async add(queue: string, data: string): Promise<string> {
3637
if (this.client.status === "wait") {
3738
await this.client.connect();
3839
}
39-
// it will never return null given NOMKSTREAM is not set.
40-
return (await this.client.xadd(
41-
queue,
42-
"*",
43-
"data",
44-
JSON.stringify(data)
45-
)) as unknown as string;
40+
const clientResponse = await this.client.xadd(queue, "*", "data", data);
41+
return clientResponse as string;
4642
}
4743

4844
async addAtomic(
4945
queue: string,
5046
additionalQueues: string[],
51-
data: unknown
47+
data: string
5248
): Promise<string[]> {
5349
if (this.client.status === "wait") {
5450
await this.client.connect();
5551
}
56-
throw new Error("Method not implemented.");
52+
const multi = this.client.multi();
53+
multi.xadd(queue, "*", "data", data);
54+
additionalQueues.forEach((additionalQueueName: string) => {
55+
multi.xadd(GetPrefixedQueue(additionalQueueName), "*", "data", data);
56+
});
57+
const clientMultiResponse = await multi.exec();
58+
return clientMultiResponse?.map((res) => res[1]) as string[];
5759
}
5860

5961
async bulk(queue: string, data: unknown[]): Promise<string[]> {

core/helpers/get.prefixed.queue.ts

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import { CONSTANTS } from "@core/constants";
2+
3+
export function GetPrefixedQueue(queue: string): string {
4+
return CONSTANTS.QUEUE_PREFIX + queue;
5+
}

core/helpers/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from "./get.prefixed.queue"

core/producer/producer.abstract.ts

+16-7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { CONSTANTS } from "@core/constants";
1+
import { GetPrefixedQueue } from "@core/helpers";
22
import { IClientConnector } from "@core/types";
33
/**
44
* AbstractProducer is an abstract class that provides basic functionalities for adding data to queues.
@@ -12,7 +12,7 @@ export default abstract class AbstractProducer<K> {
1212
* @param client - An instance of IClientConnector to interact with the queue.
1313
*/
1414
constructor(private name: string, private client: IClientConnector<K>) {
15-
this.queueName = CONSTANTS.QUEUE_PREFIX + this.name;
15+
this.queueName = GetPrefixedQueue(this.name);
1616
process.once("SIGINT", this.handleShutdown.bind(this));
1717
process.once("SIGTERM", this.handleShutdown.bind(this));
1818
}
@@ -21,14 +21,23 @@ export default abstract class AbstractProducer<K> {
2121
* Adds data to the main queue and optionally to additional queues atomically.
2222
* @param data - The data to be added to the queue.
2323
* @param additionalQueues - Optional. Additional queues to which the data should be added atomically.
24-
* @returns A promise that resolves when the data has been added.
24+
* @returns A promise that resolves to the job ID(s) when the data has been added.
25+
* If data is added to a single queue, it returns a string representing the job ID.
26+
* If data is added to multiple queues, it returns an array of strings representing the jobs IDs.
2527
*/
26-
public async add(data: unknown, additionalQueues?: string[]) {
28+
public async add(
29+
data: unknown,
30+
additionalQueues?: string[]
31+
): Promise<string | string[]> {
32+
const stringifiedData = JSON.stringify(data);
2733
if (additionalQueues && additionalQueues.length) {
28-
await this.client.addAtomic(this.queueName, additionalQueues, data);
29-
return;
34+
return await this.client.addAtomic(
35+
this.queueName,
36+
additionalQueues,
37+
stringifiedData
38+
);
3039
}
31-
await this.client.add(this.queueName, data);
40+
return await this.client.add(this.queueName, stringifiedData);
3241
}
3342

3443
/**

0 commit comments

Comments
 (0)