Skip to content

feat: base RPC, logger #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/common/get-uniqId.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { randomUUID } from 'node:crypto';
export const getUniqId = (): string => {
return randomUUID();
};
3 changes: 3 additions & 0 deletions lib/common/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
export * from './meta-teg.discovery';
export * from './get-uniqId';
export * from './rmq-intercepter';
export * from './rmq-pipe';
29 changes: 29 additions & 0 deletions lib/common/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Logger, LoggerService } from '@nestjs/common';
import { blueBright, white, yellow } from 'chalk';

export class RQMColorLogger implements LoggerService {
logMessages: boolean;

constructor(logMessages: boolean) {
this.logMessages = logMessages ?? false;
}
log(message: any, context?: string): any {
Logger.log(message, context);
}
error(message: any, trace?: string, context?: string): any {
Logger.error(message, trace, context);
}
debug(message: any, context?: string): any {
if (!this.logMessages) {
return;
}
const msg = JSON.stringify(message);
const action = context.split(',')[0];
const topic = context.split(',')[1];
Logger.log(`${blueBright(action)} [${yellow(topic)}] ${white(msg)}`);
console.warn(`${blueBright(action)} [${yellow(topic)}] ${white(msg)}`);
}
warn(message: any, context?: string): any {
Logger.warn(message, context);
}
}
7 changes: 4 additions & 3 deletions lib/common/meta-teg.discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ import { Inject, Injectable } from '@nestjs/common';
import { ModulesContainer, Reflector } from '@nestjs/core';
import { MetadataScanner } from '@nestjs/core';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { MetaTegsMap } from '../interfaces';
import { TARGET_MODULE } from '../constants';
import { IMetaTegsMap } from '../interfaces';

@Injectable()
export class MetaTegsScannerService {
constructor(
private readonly metadataScanner: MetadataScanner,
private readonly reflector: Reflector,
private readonly modulesContainer: ModulesContainer,
@Inject('TARGET_MODULE') private readonly targetModuleName: string,
@Inject(TARGET_MODULE) private readonly targetModuleName: string,
) {}

public scan(metaTeg: string) {
Expand Down Expand Up @@ -44,7 +45,7 @@ export class MetaTegsScannerService {

private lookupMethods(
metaTeg: string,
rmqMessagesMap: MetaTegsMap,
rmqMessagesMap: IMetaTegsMap,
instance: object,
prototype: object,
methodName: string,
Expand Down
14 changes: 14 additions & 0 deletions lib/common/rmq-intercepter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Message } from 'amqplib';
import { LoggerService } from '@nestjs/common';

export class RMQIntercepterClass {
protected logger: LoggerService;

constructor(logger: LoggerService = console) {
this.logger = logger;
}

async intercept(res: any, msg: Message, error?: Error): Promise<any> {
return res;
}
}
14 changes: 14 additions & 0 deletions lib/common/rmq-pipe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Message } from 'amqplib';
import { LoggerService } from '@nestjs/common';

export class RMQPipeClass {
protected logger: LoggerService;

constructor(logger: LoggerService = console) {
this.logger = logger;
}

async transform(msg: Message): Promise<Message> {
return msg;
}
}
11 changes: 11 additions & 0 deletions lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,14 @@ export const RMQ_CONNECT_OPTIONS = 'RMQ_CONNECT_OPTIONS';
export const RMQ_BROKER_OPTIONS = 'RMQ_BROKER_OPTIONS';
export const RMQ_MESSAGE_META_TEG = 'RMQ_MESSAGE_META_TEG';
export const RMQ_ROUTES_TRANSFORM = 'RMQ_ROUTES_TRANSFORM';
export const RMQ_APP_OPTIONS = 'RMQ_APP_OPTIONS';
export const TARGET_MODULE = 'TARGET_MODULE';

export const INITIALIZATION_STEP_DELAY = 400;
export const DEFAULT_TIMEOUT = 40000;

export const INDICATE_ERROR = 'Please indicate `replyToQueue';
export const TIMEOUT_ERROR = 'Response timeout error';
export const RECIVED_MESSAGE_ERROR = 'Received a message but with an error';
export const ERROR_RMQ_SERVICE = 'Rmq service error';
export const INOF_NOT_FULL_OPTIONS = 'Queue will not be created if there is no bind';
2 changes: 1 addition & 1 deletion lib/decorators/rmq-message.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { RMQ_MESSAGE_META_TEG } from 'lib/constants';
import { RMQ_MESSAGE_META_TEG } from '../constants';

export function RMQMessage(event: string) {
return function (target: any, propertyKey: string | symbol, descriptor: any) {
Expand Down
2 changes: 1 addition & 1 deletion lib/decorators/transform.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { applyDecorators, SetMetadata } from '@nestjs/common';
import { RMQ_ROUTES_TRANSFORM } from 'lib/constants';
import { RMQ_ROUTES_TRANSFORM } from '../constants';

export const RMQTransform = (): MethodDecorator => {
return applyDecorators(SetMetadata(RMQ_ROUTES_TRANSFORM, true));
Expand Down
4 changes: 2 additions & 2 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export * from './rmq-nestjs.module';
export * from './rmq-nestjs.service';
export * from './rmq.module';
export * from './rmq.service';
11 changes: 11 additions & 0 deletions lib/interfaces/app-options.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { LoggerService } from '@nestjs/common';
import { RMQIntercepterClass, RMQPipeClass } from '../common';

export interface IAppOptions {
logger?: LoggerService;
globalMiddleware?: (typeof RMQPipeClass)[];
globalIntercepters?: (typeof RMQIntercepterClass)[];
errorHandler?: object;
serviceName?: string;
logMessages: boolean;
}
2 changes: 1 addition & 1 deletion lib/interfaces/metategs.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export type MetaTegsMap = Map<string | symbol, (...args: any[]) => any>;
export type IMetaTegsMap = Map<string | symbol, (...args: any[]) => any>;
23 changes: 21 additions & 2 deletions lib/interfaces/rmq-options.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,32 @@ export interface IRMQSRootAsyncOptions extends Pick<ModuleMetadata, 'imports'> {

export interface IMessageBroker {
exchange: IExchange;
queue?: IQueue;
replyTo: Options.AssertQueue;
queue?: IQueue;
messageTimeout?: number;
targetModuleName: string;
serviceName?: string;
}
export interface BindQueue {
export interface IBindQueue {
queue: string;
source: string;
pattern: string;
args?: Record<string, any>;
}

export interface ISendMessage {
exchange: string;
routingKey: string;
content: Record<string, any>;
options: Options.Publish;
}
export interface IPublishOptions extends Options.Publish {
timeout?: number;
}

export interface ISendToReplyQueueOptions {
replyTo: string;
content: Record<string, any>;
correlationId: string;
options?: Options.Publish;
}
6 changes: 5 additions & 1 deletion lib/interfaces/rmqService.ts
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
export interface ImqService {}
import { ConsumeMessage } from 'amqplib';

export interface ImqService {
readonly listenQueue: (msg: ConsumeMessage | null) => void;
}
68 changes: 64 additions & 4 deletions lib/rmq-nestjs-connect.service.ts → lib/rmq-connect.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ import {
IExchange,
IQueue,
TypeQueue,
BindQueue,
IBindQueue,
ISendMessage,
ISendToReplyQueueOptions,
} from './interfaces';
import { Channel, Connection, Replies, connect } from 'amqplib';
import { Channel, Connection, ConsumeMessage, Replies, connect } from 'amqplib';

@Injectable()
export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
private connection: Connection = null;
private baseChannel: Channel = null;
private replyToChannel: Channel = null;

private declared = false;

constructor(
@Inject(RMQ_CONNECT_OPTIONS) private readonly options: IRabbitMQConfig
) {}
Expand All @@ -47,6 +49,11 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
);
}
}
public ack(
...params: Parameters<Channel['ack']>
): ReturnType<Channel['ack']> {
return this.baseChannel.ack(...params);
}
public async assertQueue(
typeQueue: TypeQueue,
options: IQueue
Expand All @@ -68,7 +75,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
throw new Error(`Failed to assert ${typeQueue} queue: ${error.message}`);
}
}
async bindQueue(bindQueue: BindQueue): Promise<void> {
async bindQueue(bindQueue: IBindQueue): Promise<void> {
try {
await this.baseChannel.bindQueue(
bindQueue.queue,
Expand All @@ -82,6 +89,59 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
);
}
}
async sendToReplyQueue(sendToQueueOptions: ISendToReplyQueueOptions) {
try {
this.replyToChannel.sendToQueue(
sendToQueueOptions.replyTo,
Buffer.from(JSON.stringify(sendToQueueOptions.content)),
{
correlationId: sendToQueueOptions.correlationId,
}
);
} catch (error) {
throw new Error(`Failed to send Reply Queue`);
}
}
async listerReplyQueue(
queue: string,
listenQueue: (msg: ConsumeMessage | null) => void
) {
try {
await this.replyToChannel.consume(queue, listenQueue, {
noAck: false,
});
} catch (error) {
throw new Error(`Failed to send listen Reply Queue`);
}
}
async listenQueue(
queue: string,
listenQueue: (msg: ConsumeMessage | null) => void
): Promise<void> {
try {
await this.baseChannel.consume(queue, listenQueue, {
noAck: false,
});
} catch (error) {
throw new Error(`Failed to listen Queue`);
}
}

publish(sendMessage: ISendMessage): void {
try {
this.baseChannel.publish(
sendMessage.exchange,
sendMessage.routingKey,
Buffer.from(JSON.stringify(sendMessage.content)),
{
replyTo: sendMessage.options.replyTo,
correlationId: sendMessage.options.correlationId,
}
);
} catch (error) {
throw new Error(`Failed to send message ${error}`);
}
}
private async setUpConnect(options: IRabbitMQConfig) {
const { username, password, hostname, port, virtualHost } = options;
const url = `amqp://${username}:${password}@${hostname}:${port}/${virtualHost}`;
Expand Down
21 changes: 15 additions & 6 deletions lib/rmq-nestjs-core.module.ts → lib/rmq-core.module.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
import { DynamicModule, Module, Global } from '@nestjs/common';
import { RmqNestjsConnectService } from './rmq-nestjs-connect.service';
import { RMQ_CONNECT_OPTIONS } from './constants';
import { RMQ_APP_OPTIONS, RMQ_CONNECT_OPTIONS } from './constants';
import { IRMQSRootAsyncOptions, IRabbitMQConfig } from './interfaces';
import { RmqNestjsConnectService } from './rmq-connect.service';
import { IAppOptions } from './interfaces/app-options.interface';

@Global()
@Module({})
export class RmqNestjsCoreModule {
static forRoot(options: IRabbitMQConfig): DynamicModule {
static forRoot(
options: IRabbitMQConfig,
appOptions?: IAppOptions
): DynamicModule {
return {
module: RmqNestjsCoreModule,
providers: [
{ provide: RMQ_CONNECT_OPTIONS, useValue: options },
{ provide: RMQ_APP_OPTIONS, useValue: appOptions || {} },
RmqNestjsConnectService,
],
exports: [RmqNestjsConnectService],
exports: [RmqNestjsConnectService, RMQ_APP_OPTIONS],
};
}
static forRootAsync(options: IRMQSRootAsyncOptions): DynamicModule {
static forRootAsync(
options: IRMQSRootAsyncOptions,
appOptions?: IAppOptions
): DynamicModule {
return {
module: RmqNestjsCoreModule,
imports: options.imports,
Expand All @@ -29,9 +37,10 @@ export class RmqNestjsCoreModule {
},
inject: options.inject || [],
},
{ provide: RMQ_APP_OPTIONS, useValue: appOptions || {} },
RmqNestjsConnectService,
],
exports: [RmqNestjsConnectService],
exports: [RmqNestjsConnectService, RMQ_APP_OPTIONS],
};
}
}
40 changes: 0 additions & 40 deletions lib/rmq-nestjs.module.ts

This file was deleted.

Loading
Loading