Skip to content

Commit 9004a4a

Browse files
Merge pull request #11642 from ryoctrl/nack_when_no_matching_handler
fix(microservices): to nack when there is no matching handler
2 parents 4bb43f0 + 4f45a70 commit 9004a4a

File tree

3 files changed

+54
-4
lines changed

3 files changed

+54
-4
lines changed

packages/microservices/constants.ts

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ export const RQM_DEFAULT_QUEUE_OPTIONS = {};
4040
export const RQM_DEFAULT_NOACK = true;
4141
export const RQM_DEFAULT_PERSISTENT = false;
4242
export const RQM_DEFAULT_NO_ASSERT = false;
43+
export const RQM_NO_EVENT_HANDLER = (
44+
text: TemplateStringsArray,
45+
pattern: string,
46+
) =>
47+
`An unsupported event was received. It has been acknowledged, so it will not be re-delivered. Pattern: ${pattern}`;
4348
export const GRPC_DEFAULT_PROTO_LOADER = '@grpc/proto-loader';
4449

4550
export const NO_EVENT_HANDLER = (text: TemplateStringsArray, pattern: string) =>

packages/microservices/server/server-rmq.ts

+18-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
RQM_DEFAULT_QUEUE,
1818
RQM_DEFAULT_QUEUE_OPTIONS,
1919
RQM_DEFAULT_URL,
20+
RQM_NO_EVENT_HANDLER,
2021
} from '../constants';
2122
import { RmqContext } from '../ctx-host';
2223
import { Transport } from '../enums';
@@ -25,6 +26,7 @@ import { CustomTransportStrategy, RmqOptions } from '../interfaces';
2526
import {
2627
IncomingRequest,
2728
OutgoingResponse,
29+
ReadPacket,
2830
} from '../interfaces/packet.interface';
2931
import { RmqRecordSerializer } from '../serializers/rmq-record.serializer';
3032
import { Server } from './server';
@@ -42,6 +44,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
4244
protected readonly urls: string[] | RmqUrl[];
4345
protected readonly queue: string;
4446
protected readonly prefetchCount: number;
47+
protected readonly noAck: boolean;
4548
protected readonly queueOptions: any;
4649
protected readonly isGlobalPrefetchCount: boolean;
4750
protected readonly noAssert: boolean;
@@ -54,6 +57,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
5457
this.prefetchCount =
5558
this.getOptionsProp(this.options, 'prefetchCount') ||
5659
RQM_DEFAULT_PREFETCH_COUNT;
60+
this.noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
5761
this.isGlobalPrefetchCount =
5862
this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
5963
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
@@ -141,8 +145,6 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
141145
}
142146

143147
public async setupChannel(channel: any, callback: Function) {
144-
const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
145-
146148
if (!this.queueOptions.noAssert) {
147149
await channel.assertQueue(this.queue, this.queueOptions);
148150
}
@@ -151,7 +153,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
151153
this.queue,
152154
(msg: Record<string, any>) => this.handleMessage(msg, channel),
153155
{
154-
noAck,
156+
noAck: this.noAck,
155157
},
156158
);
157159
callback();
@@ -200,6 +202,19 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
200202
response$ && this.send(response$, publish);
201203
}
202204

205+
public async handleEvent(
206+
pattern: string,
207+
packet: ReadPacket,
208+
context: RmqContext,
209+
): Promise<any> {
210+
const handler = this.getHandlerByPattern(pattern);
211+
if (!handler && !this.noAck) {
212+
this.channel.nack(context.getMessage(), false, false);
213+
return this.logger.warn(RQM_NO_EVENT_HANDLER`${pattern}`);
214+
}
215+
return super.handleEvent(pattern, packet, context);
216+
}
217+
203218
public sendMessage<T = any>(
204219
message: T,
205220
replyTo: any,

packages/microservices/test/server/server-rmq.spec.ts

+31-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as sinon from 'sinon';
33
import { NO_MESSAGE_HANDLER } from '../../constants';
44
import { BaseRpcContext } from '../../ctx-host/base-rpc.context';
55
import { ServerRMQ } from '../../server/server-rmq';
6+
import { RmqContext } from '../../ctx-host';
67

78
describe('ServerRMQ', () => {
89
let server: ServerRMQ;
@@ -99,10 +100,15 @@ describe('ServerRMQ', () => {
99100
data: 'tests',
100101
id: '3',
101102
});
103+
const channel = {
104+
nack: sinon.spy(),
105+
};
106+
102107
let sendMessageStub: sinon.SinonStub;
103108

104109
beforeEach(() => {
105110
sendMessageStub = sinon.stub(server, 'sendMessage').callsFake(() => ({}));
111+
(server as any).channel = channel;
106112
});
107113
it('should call "handleEvent" if identifier is not present', async () => {
108114
const handleEventSpy = sinon.spy(server, 'handleEvent');
@@ -223,9 +229,33 @@ describe('ServerRMQ', () => {
223229
server.handleEvent(
224230
channel,
225231
{ pattern: '', data },
226-
new BaseRpcContext([]),
232+
new RmqContext([{}, {}, '']),
227233
);
228234
expect(handler.calledWith(data)).to.be.true;
229235
});
236+
237+
it('should negative acknowledge without retrying if key does not exists in handlers object and noAck option is false', () => {
238+
const nack = sinon.spy();
239+
const message = { pattern: 'no-exists', data };
240+
(server as any).channel = {
241+
nack,
242+
};
243+
(server as any).noAck = false;
244+
server.handleEvent(channel, message, new RmqContext([message, '', '']));
245+
246+
expect(nack.calledWith(message, false, false)).to.be.true;
247+
});
248+
249+
it('should not negative acknowledge if key does not exists in handlers object but noAck option is true', () => {
250+
const nack = sinon.spy();
251+
const message = { pattern: 'no-exists', data };
252+
(server as any).channel = {
253+
nack,
254+
};
255+
(server as any).noAck = true;
256+
server.handleEvent(channel, message, new RmqContext([message, '', '']));
257+
258+
expect(nack.calledWith(message, false, false)).not.to.be.true;
259+
});
230260
});
231261
});

0 commit comments

Comments
 (0)