Skip to content

Commit ed08a29

Browse files
committed
[FEAT] [CORE] added the ability to publish by providing a Msg argument. This is useful downstream to some applications.
1 parent e3ccac2 commit ed08a29

File tree

3 files changed

+80
-0
lines changed

3 files changed

+80
-0
lines changed

nats-base-client/core.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,13 @@ export interface NatsConnection {
549549
options?: PublishOptions,
550550
): void;
551551

552+
/**
553+
* Publishes using the subject of the specified message, specifying the
554+
* data, headers and reply found in the message if any.
555+
* @param msg
556+
*/
557+
publishMessage(msg: Msg): void;
558+
552559
/**
553560
* Subscribe expresses interest in the specified subject. The subject may
554561
* have wildcards. Messages are delivered to the {@link SubOpts#callback |SubscriptionOptions callback}

nats-base-client/nats.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,13 @@ export class NatsConnectionImpl implements NatsConnection {
126126
this.protocol.publish(subject, data, options);
127127
}
128128

129+
publishMessage(msg: Msg) {
130+
return this.publish(msg.subject, msg.data, {
131+
reply: msg.reply,
132+
headers: msg.headers,
133+
});
134+
}
135+
129136
subscribe(
130137
subject: string,
131138
opts: SubscriptionOptions = {},

tests/basics_test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,17 @@ import {
5151
headers,
5252
isIP,
5353
NatsConnectionImpl,
54+
Payload,
55+
PublishOptions,
5456
RequestStrategy,
5557
SubscriptionImpl,
5658
} from "../nats-base-client/internal_mod.ts";
5759
import { Feature } from "../nats-base-client/semver.ts";
5860
import { syncIterator } from "../nats-base-client/core.ts";
61+
import {
62+
MsgHdrs,
63+
Publisher,
64+
} from "https://deno.land/x/[email protected]/nats-base-client/core.ts";
5965

6066
Deno.test("basics - connect port", async () => {
6167
const ns = await NatsServer.start();
@@ -1374,3 +1380,63 @@ Deno.test("basics - sync subscription", async () => {
13741380

13751381
await cleanup(ns, nc);
13761382
});
1383+
1384+
1385+
1386+
Deno.test("basics - respond message", async () => {
1387+
const { ns, nc } = await setup();
1388+
const sub = nc.subscribe("q");
1389+
1390+
const nis = new MM(nc);
1391+
nis.data = new TextEncoder().encode("not in service");
1392+
1393+
(async () => {
1394+
for await (const m of sub) {
1395+
if (m.reply) {
1396+
nis.subject = m.reply;
1397+
nc.publishMessage(nis);
1398+
}
1399+
}
1400+
})().then();
1401+
1402+
const r = await nc.request("q");
1403+
assertEquals(r.string(), "not in service");
1404+
1405+
await cleanup(ns, nc);
1406+
});
1407+
1408+
1409+
class MM implements Msg {
1410+
data!: Uint8Array;
1411+
sid: number;
1412+
subject!: string;
1413+
reply?: string;
1414+
headers?: MsgHdrs;
1415+
publisher: Publisher;
1416+
1417+
constructor(p: Publisher) {
1418+
this.publisher = p;
1419+
this.sid = -1;
1420+
}
1421+
1422+
json<T>(): T {
1423+
throw new Error("not implemented");
1424+
}
1425+
1426+
respond(payload?: Payload, opts?: PublishOptions): boolean {
1427+
if (!this.reply) {
1428+
return false;
1429+
}
1430+
payload = payload || Empty;
1431+
this.publisher.publish(this.reply, payload, opts);
1432+
return true;
1433+
}
1434+
1435+
respondMessage(m: Msg): boolean {
1436+
return this.respond(m.data, { headers: m.headers, reply: m.reply });
1437+
}
1438+
1439+
string(): string {
1440+
return "";
1441+
}
1442+
}

0 commit comments

Comments
 (0)