Skip to content

Commit 4367761

Browse files
committed
feat: adding abstract producer with clinet connector interface
Signed-off-by: Fawzi Abdulfattah <[email protected]>
1 parent feadee5 commit 4367761

File tree

6 files changed

+221
-5
lines changed

6 files changed

+221
-5
lines changed

core/constants/index.ts

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/**
2+
* A collection of constants used throughout the project.
3+
*/
4+
export const CONSTANTS = {
5+
/**
6+
* The prefix used for naming queues.
7+
* @type {string}
8+
*/
9+
QUEUE_PREFIX: "runmq:",
10+
};

core/producer/index.ts

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { IClientConnector } from "@core/types";
2+
3+
/**
4+
* AbstractProducer is an abstract class that provides basic functionalities for adding data to queues.
5+
* It ensures atomic operations if needed and handles graceful shutdown on process termination signals.
6+
*/
7+
export default abstract class AbstractProducer<K> {
8+
/**
9+
* Creates an instance of AbstractProducer.
10+
* @param name - The name of the queue.
11+
* @param client - An instance of IClientConnector to interact with the queue.
12+
*/
13+
constructor(
14+
private name: string,
15+
private client: IClientConnector<K>
16+
) {
17+
process.once("SIGINT", this.handleShutdown.bind(this));
18+
process.once("SIGTERM", this.handleShutdown.bind(this));
19+
}
20+
21+
/**
22+
* Adds data to the main queue and optionally to additional queues atomically.
23+
* @param data - The data to be added to the queue.
24+
* @param additionalQueues - Optional. Additional queues to which the data should be added atomically.
25+
* @returns A promise that resolves when the data has been added.
26+
*/
27+
public async add(data: any, additionalQueues?: string[]) {
28+
await this.client.create();
29+
if (additionalQueues && additionalQueues.length) {
30+
await this.client.addAtomic(this.name, additionalQueues, data);
31+
return;
32+
}
33+
await this.client.add(this.name, data);
34+
}
35+
36+
/**
37+
* Adds multiple data entries to the main queue and optionally to additional queues atomically.
38+
* @param data - An array of data entries to be added to the queue.
39+
* @param additionalQueues - Optional. Additional queues to which the data should be added atomically.
40+
* @returns A promise that resolves when the data has been added.
41+
*/
42+
public async bulk(data: any[], additionalQueues: string[] = []) {
43+
await this.client.create();
44+
if (additionalQueues.length) {
45+
await this.client.bulkAtomic(this.name, additionalQueues, data);
46+
return;
47+
}
48+
await this.client.bulk(this.name, data);
49+
}
50+
51+
/**
52+
* Disconnects the client from the queue.
53+
* @returns A promise that resolves when the client has been disconnected.
54+
*/
55+
public async disconnect() {
56+
await this.client.quit();
57+
}
58+
59+
/**
60+
* Handles graceful shutdown by disconnecting the client.
61+
* This method is bound to process termination signals (SIGINT, SIGTERM).
62+
* @private
63+
*/
64+
private async handleShutdown() {
65+
await this.disconnect();
66+
}
67+
}

core/types/client.connector.ts

+130
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/**
2+
* IClientConnector is an interface that defines the methods required for interacting with a queueing system.
3+
*/
4+
export interface IClientConnector<K> {
5+
/**
6+
* Initializes the connection to the queueing system.
7+
* @returns A promise that resolves to the initialized connection.
8+
*/
9+
create(): Promise<K>;
10+
11+
/**
12+
* Checks if a consumer group exists in the specified queue.
13+
* @param queue - The name of the queue.
14+
* @param group - The name of the consumer group.
15+
* @returns A promise that resolves when the existence check is complete.
16+
*/
17+
groupExists(queue: string, group: string): Promise<void>;
18+
19+
/**
20+
* Retrieves the length of the specified queue.
21+
* @param queue - The name of the queue.
22+
* @returns A promise that resolves to the length of the queue.
23+
*/
24+
getStreamLength(queue: string): Promise<number>;
25+
26+
/**
27+
* Reads messages from a consumer group in the specified queue.
28+
* @param queue - The name of the queue.
29+
* @param group - The name of the consumer group.
30+
* @param consumerId - The ID of the consumer.
31+
* @param count - The maximum number of messages to read.
32+
* @param block - The maximum amount of time to block if no messages are available.
33+
* @returns A promise that resolves to the read messages.
34+
*/
35+
readGroup(
36+
queue: string,
37+
group: string,
38+
consumerId: string,
39+
count: number,
40+
block: number
41+
): Promise<any>;
42+
43+
/**
44+
* Acknowledges a message in a consumer group as processed.
45+
* @param queue - The name of the queue.
46+
* @param group - The name of the consumer group.
47+
* @param messageId - The ID of the message to acknowledge.
48+
* @returns A promise that resolves when the message is acknowledged.
49+
*/
50+
acknowledgeMessage(
51+
queue: string,
52+
group: string,
53+
messageId: string
54+
): Promise<void>;
55+
56+
/**
57+
* Deletes a consumer from a consumer group in the specified queue.
58+
* @param consumersSetId - The ID of the set of consumers.
59+
* @param queue - The name of the queue.
60+
* @param group - The name of the consumer group.
61+
* @param consumerId - The ID of the consumer to delete.
62+
* @returns A promise that resolves when the consumer is deleted.
63+
*/
64+
deleteConsumer(
65+
consumersSetId: string,
66+
queue: string,
67+
group: string,
68+
consumerId: string
69+
): Promise<void>;
70+
71+
/**
72+
* Adds data to the specified queue.
73+
* @param queue - The name of the queue.
74+
* @param data - The data to add to the queue.
75+
* @returns A promise that resolves when the data is added.
76+
*/
77+
add(queue: string, data: any): Promise<void>;
78+
79+
/**
80+
* Adds data atomically to the specified queue and additional queues.
81+
* @param queue - The name of the main queue.
82+
* @param additionalQueues - Additional queues to add the data to atomically.
83+
* @param data - The data to add to the queues.
84+
* @returns A promise that resolves when the data is added.
85+
*/
86+
addAtomic(queue: string, additionalQueues: string[], data: any): Promise<void>;
87+
88+
/**
89+
* Adds multiple data entries to the specified queue.
90+
* @param queue - The name of the queue.
91+
* @param data - An array of data entries to add to the queue.
92+
* @returns A promise that resolves when the data is added.
93+
*/
94+
bulk(queue: string, data: any[]): Promise<void>;
95+
96+
/**
97+
* Adds multiple data entries atomically to the specified queue and additional queues.
98+
* @param queue - The name of the main queue.
99+
* @param additionalQueues - Additional queues to add the data to atomically.
100+
* @param data - An array of data entries to add to the queues.
101+
* @returns A promise that resolves when the data is added.
102+
*/
103+
bulkAtomic(queue: string, additionalQueues: string[], data: any[]): Promise<void>;
104+
105+
/**
106+
* Sets a heartbeat for the specified worker.
107+
* @param workerId - The ID of the worker.
108+
* @returns A promise that resolves when the heartbeat is set.
109+
*/
110+
setHeartbeat(workerId: string): Promise<void>;
111+
112+
/**
113+
* Retrieves all heartbeats from the queueing system.
114+
* @returns A promise that resolves to an object containing all heartbeats, indexed by worker ID.
115+
*/
116+
getAllHeartbeats(): Promise<{ [key: string]: string }>;
117+
118+
/**
119+
* Deletes the heartbeat for the specified worker.
120+
* @param workerId - The ID of the worker.
121+
* @returns A promise that resolves when the heartbeat is deleted.
122+
*/
123+
deleteHeartbeat(workerId: string): Promise<void>;
124+
125+
/**
126+
* Disconnects the client from the queueing system.
127+
* @returns A promise that resolves when the client is disconnected.
128+
*/
129+
quit(): Promise<void>;
130+
}

core/types/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from "./client.connector"

package.json

+8-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
"scripts": {
77
"test": "echo \"Error: no test specified\" && exit 1"
88
},
9+
"_moduleAliases": {
10+
"@": "dist"
11+
},
912
"repository": {
1013
"type": "git",
1114
"url": "git+https://github.com/runmq/queue.git"
@@ -15,5 +18,8 @@
1518
"bugs": {
1619
"url": "https://github.com/runmq/queue/issues"
1720
},
18-
"homepage": "https://github.com/runmq/queue#readme"
19-
}
21+
"homepage": "https://github.com/runmq/queue#readme",
22+
"devDependencies": {
23+
"@types/node": "^20.12.13"
24+
}
25+
}

tsconfig.json

+5-3
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@
2727
/* Modules */
2828
"module": "ES2020", /* Specify what module code is generated. */
2929
// "rootDir": "./", /* Specify the root folder within your source files. */
30-
// "moduleResolution": "node", /* Specify how TypeScript looks up a file from a given module specifier. */
31-
// "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */
32-
// "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */
30+
"moduleResolution": "node", /* Specify how TypeScript looks up a file from a given module specifier. */
31+
"baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */
32+
"paths": {
33+
"@*": ["*"]
34+
}, /* Specify a set of entries that re-map imports to additional lookup locations. */
3335
// "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */
3436
// "typeRoots": [], /* Specify multiple folders that act like './node_modules/@types'. */
3537
// "types": [], /* Specify type package names to be included without being referenced in a source file. */

0 commit comments

Comments
 (0)