@@ -29,6 +29,7 @@ import {
29
29
} from "../../src/mod.ts" ;
30
30
import { JsMsgImpl , parseInfo , toJsMsg } from "../jsmsg.ts" ;
31
31
import {
32
+ assertBetween ,
32
33
cleanup ,
33
34
jetstreamServerConf ,
34
35
setup ,
@@ -225,3 +226,97 @@ Deno.test("jsmsg - explicit consumer ackAck timeout", async () => {
225
226
226
227
await cleanup ( ns , nc ) ;
227
228
} ) ;
229
+
230
+ Deno . test ( "jsmsg - custom consumer ackAck timeout" , async ( ) => {
231
+ const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
232
+ const jsm = await nc . jetstreamManager ( ) as JetStreamManagerImpl ;
233
+ await jsm . streams . add ( {
234
+ name : "A" ,
235
+ subjects : [ "a.>" ] ,
236
+ storage : StorageType . Memory ,
237
+ allow_direct : true ,
238
+ } ) ;
239
+
240
+ const js = nc . jetstream ( ) ;
241
+ await js . publish ( "a.a" ) ;
242
+
243
+ await jsm . consumers . add ( "A" , { durable_name : "a" } ) ;
244
+ const c = await js . consumers . get ( "A" , "a" ) ;
245
+ const jm = await c . next ( ) ;
246
+ // change the subject
247
+ ( ( jm as JsMsgImpl ) . msg as MsgImpl ) . _reply = "xxxx" ;
248
+ nc . subscribe ( "xxxx" ) ;
249
+ const start = Date . now ( ) ;
250
+ await assertRejects (
251
+ ( ) : Promise < boolean > => {
252
+ return jm ! . ackAck ( { timeout : 1500 } ) ;
253
+ } ,
254
+ Error ,
255
+ "TIMEOUT" ,
256
+ ) ;
257
+ assertBetween ( Date . now ( ) - start , 1300 , 1700 ) ;
258
+ await cleanup ( ns , nc ) ;
259
+ } ) ;
260
+
261
+ Deno . test ( "jsmsg - custom consumer ackAck timeout in jsopts" , async ( ) => {
262
+ const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
263
+ const jsm = await nc . jetstreamManager ( ) as JetStreamManagerImpl ;
264
+ await jsm . streams . add ( {
265
+ name : "A" ,
266
+ subjects : [ "a.>" ] ,
267
+ storage : StorageType . Memory ,
268
+ allow_direct : true ,
269
+ } ) ;
270
+
271
+ const js = nc . jetstream ( { timeout : 2000 } ) ;
272
+ await js . publish ( "a.a" ) ;
273
+
274
+ await jsm . consumers . add ( "A" , { durable_name : "a" } ) ;
275
+ const c = await js . consumers . get ( "A" , "a" ) ;
276
+ const jm = await c . next ( ) ;
277
+ // change the subject
278
+ ( ( jm as JsMsgImpl ) . msg as MsgImpl ) . _reply = "xxxx" ;
279
+ nc . subscribe ( "xxxx" ) ;
280
+ const start = Date . now ( ) ;
281
+ await assertRejects (
282
+ ( ) : Promise < boolean > => {
283
+ return jm ! . ackAck ( ) ;
284
+ } ,
285
+ Error ,
286
+ "TIMEOUT" ,
287
+ ) ;
288
+ assertBetween ( Date . now ( ) - start , 1800 , 2200 ) ;
289
+
290
+ await cleanup ( ns , nc ) ;
291
+ } ) ;
292
+
293
+ Deno . test ( "jsmsg - ackAck() timeout legacy jsopts" , async ( ) => {
294
+ const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
295
+ const jsm = await nc . jetstreamManager ( ) as JetStreamManagerImpl ;
296
+ await jsm . streams . add ( {
297
+ name : "A" ,
298
+ subjects : [ "a.>" ] ,
299
+ storage : StorageType . Memory ,
300
+ allow_direct : true ,
301
+ } ) ;
302
+
303
+ const js = nc . jetstream ( { timeout : 1500 } ) ;
304
+ await js . publish ( "a.a" ) ;
305
+
306
+ await jsm . consumers . add ( "A" , { durable_name : "a" } ) ;
307
+ const jm = await js . pull ( "A" , "a" ) ;
308
+ // change the subject
309
+ ( ( jm as JsMsgImpl ) . msg as MsgImpl ) . _reply = "xxxx" ;
310
+ nc . subscribe ( "xxxx" ) ;
311
+ const start = Date . now ( ) ;
312
+ await assertRejects (
313
+ ( ) : Promise < boolean > => {
314
+ return jm ! . ackAck ( ) ;
315
+ } ,
316
+ Error ,
317
+ "TIMEOUT" ,
318
+ ) ;
319
+ assertBetween ( Date . now ( ) - start , 1300 , 1700 ) ;
320
+
321
+ await cleanup ( ns , nc ) ;
322
+ } ) ;
0 commit comments