@@ -46,6 +46,7 @@ var hideReplicatorLog bool
46
46
var showProgress bool
47
47
var in string
48
48
var out string
49
+ var pubFirst bool
49
50
50
51
func startReplicator (connections []conf.ConnectorConfig ) (* core.NATSReplicator , error ) {
51
52
config := conf .DefaultConfig ()
@@ -114,6 +115,7 @@ func main() {
114
115
flag .StringVar (& natsURL2 , "nats2" , "" , "nats url for the subscriber side, defaults to nats://localhost:4222" )
115
116
flag .StringVar (& stanClusterID2 , "stan2" , "" , "stan cluster id for the subscriber side, defaults to test-cluster" )
116
117
flag .BoolVar (& direct , "direct" , false , "skip the replicator and just" )
118
+ flag .BoolVar (& pubFirst , "pubFirst" , false , "pre-run the publiser, then start the replicator and/or subscriber" )
117
119
flag .BoolVar (& pubOnly , "pub" , false , "only publish, don't subscribe, useful for testing send times across a long connection" )
118
120
flag .BoolVar (& subOnly , "sub" , false , "only time the reads, useful for testing read times across a long connection, timer starts with first receive" )
119
121
flag .BoolVar (& showStats , "stats" , false , "print replicator stats, if not direct" )
@@ -124,13 +126,23 @@ func main() {
124
126
flag .Parse ()
125
127
126
128
var replicator * core.NATSReplicator
129
+ var startPub time.Time
130
+ var endPub time.Time
131
+ var startSub time.Time
132
+ var endSub time.Time
133
+ var startRep time.Time
134
+ var endRep time.Time
127
135
128
136
incoming := nuid .Next ()
129
137
outgoing := nuid .Next ()
130
138
msgString := strings .Repeat ("a" , messageSize )
131
139
msg := []byte (msgString )
132
140
msgLen := len (msg )
133
- wg := sync.WaitGroup {}
141
+ pubwg := sync.WaitGroup {}
142
+ repwg := sync.WaitGroup {}
143
+ subwg := sync.WaitGroup {}
144
+ interval := int (iterations / 10 )
145
+ repTimeout := make (chan bool , 1 )
134
146
135
147
if natsURL2 == "" {
136
148
natsURL2 = natsURL
@@ -153,35 +165,16 @@ func main() {
153
165
log .Printf ("Pub and sub only mode always run with direct mode, no replicator is used" )
154
166
}
155
167
156
- if ! direct {
157
- connect := []conf.ConnectorConfig {
158
- {
159
- Type : "StanToStan" ,
160
- IncomingConnection : "stan" ,
161
- OutgoingConnection : "stan2" ,
162
- IncomingChannel : incoming ,
163
- OutgoingChannel : outgoing ,
164
- },
165
- }
166
-
167
- var err error
168
- replicator , err = startReplicator (connect )
169
- if err != nil {
170
- log .Fatalf ("error starting replicator, %s" , err .Error ())
171
- }
172
- } else {
168
+ if direct {
173
169
log .Printf ("Direct mode uses the same nats url and stan cluster id for both connections" )
174
170
if in == "" && out == "" {
171
+ log .Printf ("Unless custom channels are set, the same channel is used for read/write in direct mode" )
175
172
outgoing = incoming
176
173
}
177
174
stanClusterID2 = stanClusterID
178
175
natsURL2 = natsURL
179
176
}
180
177
181
- done := make (chan bool , 1 )
182
- count := 0
183
- interval := int (iterations / 10 )
184
-
185
178
nc , err := nats .Connect (natsURL , nats .Timeout (time .Second * 5 ), nats .MaxReconnects (5 ), nats .ReconnectWait (time .Second * 5 ))
186
179
if err != nil {
187
180
log .Fatalf ("error connecting to nats, %s" , err .Error ())
@@ -206,78 +199,145 @@ func main() {
206
199
207
200
log .Printf ("Incoming/Replicated channel %s : Outgoing/Subscribed channel: %s" , incoming , outgoing )
208
201
209
- var start time.Time
202
+ if pubFirst || pubOnly {
203
+ log .Printf ("Sending %d messages of size %d bytes..." , iterations , messageSize )
204
+ pubwg .Add (iterations )
205
+ pubCount := 0
206
+ startPub = time .Now ()
207
+ for i := 0 ; i < iterations ; i ++ {
208
+ _ , err := sc .PublishAsync (incoming , msg , func (aguid string , err error ) {
209
+ pubCount ++
210
+ if err != nil {
211
+ log .Fatalf ("error in ack handler, %s" , err .Error ())
212
+ }
213
+ if (pubCount % interval == 0 || pubCount == iterations ) && showProgress {
214
+ log .Printf ("async send count = %d" , pubCount )
215
+ }
216
+ pubwg .Done ()
217
+ })
218
+
219
+ if err != nil {
220
+ log .Fatalf ("error publishing message, %s" , err .Error ())
221
+ }
222
+ }
223
+ pubwg .Wait ()
224
+ endPub = time .Now ()
225
+ }
210
226
211
227
if ! pubOnly {
212
- sc2 .Subscribe (outgoing , func (msg * stan.Msg ) {
213
- if subOnly && count == 0 {
214
- start = time .Now ()
228
+ subwg .Add (iterations )
229
+ subCount := 0
230
+ _ , err := sc2 .Subscribe (outgoing , func (msg * stan.Msg ) {
231
+ if subCount == 0 {
232
+ startSub = time .Now () // start timing on the first message
215
233
}
216
- count ++
217
- if count % interval == 0 && showProgress {
218
- log .Printf ("received count = %d" , count )
234
+ subCount ++
235
+ if ( subCount % interval == 0 || subCount == iterations ) && showProgress {
236
+ log .Printf ("received count = %d" , subCount )
219
237
}
220
238
221
239
if len (msg .Data ) != msgLen {
222
240
log .Fatalf ("received message that is the wrong size %d != %d" , len (msg .Data ), msgLen )
223
241
}
224
242
225
- if count = = iterations {
226
- done <- true
243
+ if subCount < = iterations {
244
+ subwg . Done ()
227
245
}
228
- })
229
- } else {
230
- done <- true
246
+ }, stan .DeliverAllAvailable ())
247
+
248
+ if err != nil {
249
+ log .Fatalf ("error subscribing to %s, %s" , outgoing , err .Error ())
250
+ }
231
251
}
232
252
233
- log .Printf ("Sending %d messages of size %d bytes..." , iterations , messageSize )
253
+ if ! direct {
254
+ connect := []conf.ConnectorConfig {
255
+ {
256
+ Type : "StanToStan" ,
257
+ IncomingConnection : "stan" ,
258
+ OutgoingConnection : "stan2" ,
259
+ IncomingChannel : incoming ,
260
+ OutgoingChannel : outgoing ,
261
+ IncomingStartAtSequence : 0 ,
262
+ },
263
+ }
234
264
235
- wg .Add (iterations )
265
+ var err error
266
+ replicator , err = startReplicator (connect )
267
+ if err != nil {
268
+ log .Fatalf ("error starting replicator, %s" , err .Error ())
269
+ }
236
270
237
- start = time .Now ()
238
- for i := 0 ; i < iterations ; i ++ {
239
- _ , err := sc .PublishAsync (incoming , msg , func (aguid string , err error ) {
240
- if err != nil {
241
- log .Fatalf ("error in ack handler, %s" , err .Error ())
271
+ // Start trying to capture the replicator ack activity
272
+ repwg .Add (1 )
273
+ go func () {
274
+ repTicker := time .NewTicker (100 * time .Millisecond )
275
+ loop:
276
+ for {
277
+ select {
278
+ case t := <- repTicker .C :
279
+ reqcount := replicator .SafeStats ().RequestCount
280
+ if reqcount >= 0 && startRep .IsZero () {
281
+ startRep = t
282
+ }
283
+ if reqcount >= int64 (iterations ) {
284
+ endRep = t
285
+ break loop
286
+ }
287
+ case <- repTimeout :
288
+ break loop
289
+ }
242
290
}
243
- wg .Done ()
244
- })
291
+ repwg .Done ()
292
+ repTicker .Stop ()
293
+ }()
294
+ }
245
295
246
- if err != nil {
247
- log .Fatalf ("error publishing message, %s" , err .Error ())
248
- }
296
+ if ! pubFirst && ! pubOnly {
297
+ log .Printf ("Sending %d messages of size %d bytes..." , iterations , messageSize )
298
+ pubwg .Add (iterations )
299
+ pubCount := 0
300
+ startPub = time .Now ()
301
+ for i := 0 ; i < iterations ; i ++ {
302
+ _ , err := sc .PublishAsync (incoming , msg , func (aguid string , err error ) {
303
+ pubCount ++
304
+ if err != nil {
305
+ log .Fatalf ("error in ack handler, %s" , err .Error ())
306
+ }
307
+ if (pubCount % interval == 0 || pubCount == iterations ) && showProgress {
308
+ log .Printf ("async send count = %d" , pubCount )
309
+ }
310
+ pubwg .Done ()
311
+ })
249
312
250
- if i % interval == 0 && i != 0 && showProgress {
251
- log .Printf ("async send count = %d" , i )
313
+ if err != nil {
314
+ log .Fatalf ("error publishing message, %s" , err .Error ())
315
+ }
252
316
}
317
+ pubwg .Wait ()
318
+ endPub = time .Now ()
253
319
}
254
- wg .Wait ()
255
- <- done
256
- end := time .Now ()
257
320
258
- if replicator != nil {
259
- log .Printf ("Trying to wait for acks to return to replicator before we shut it down" )
260
- timeout := time .Duration (5000 ) * time .Millisecond // 5 second timeout
261
- stop := time .Now ().Add (timeout )
262
- requestsOk := make (chan bool )
321
+ if ! pubOnly {
322
+ subwg .Wait ()
323
+ endSub = time .Now ()
324
+ }
263
325
264
- ticker := time .NewTicker (50 * time .Millisecond )
326
+ if ! direct {
327
+ log .Printf ("Waiting for acks to return to replicator before we shut it down" )
265
328
go func () {
329
+ stop := time .Now ().Add (10 * time .Second )
330
+ ticker := time .NewTicker (500 * time .Millisecond )
266
331
for t := range ticker .C {
267
332
if t .After (stop ) {
268
- requestsOk <- false
269
- break
270
- }
271
-
272
- if replicator .SafeStats ().RequestCount >= int64 (iterations ) {
273
- requestsOk <- true
333
+ repTimeout <- true
274
334
break
275
335
}
276
336
}
277
337
ticker .Stop ()
278
338
}()
279
339
280
- <- requestsOk
340
+ repwg . Wait ()
281
341
282
342
stats := replicator .SafeStats ()
283
343
statsJSON , _ := json .MarshalIndent (stats , "" , " " )
@@ -294,18 +354,53 @@ func main() {
294
354
nc2 .Close ()
295
355
nc .Close ()
296
356
297
- diff := end .Sub (start )
298
- rate := float64 (iterations ) / float64 (diff .Seconds ())
299
- sizeRate := float64 (messageSize ) * rate / (1024 * 1024 )
357
+ if ! direct && endRep .IsZero () {
358
+ log .Printf ("Test Failed, replicator did not receive all of the acks within the timeout" )
359
+ return
360
+ }
361
+
362
+ var totalDiff time.Duration
300
363
301
364
if pubOnly {
302
- log .Printf ("Sent %d messages to a streaming channel %s" , iterations , diff )
365
+ totalDiff = endPub .Sub (startPub )
366
+ log .Printf ("Sent %d messages to a streaming channel %s" , iterations , totalDiff )
303
367
} else if subOnly {
304
- log .Printf ("Read %d messages from a streaming channel in %s" , iterations , diff )
368
+ totalDiff = endSub .Sub (startSub )
369
+ log .Printf ("Read %d messages from a streaming channel in %s" , iterations , totalDiff )
305
370
} else if direct {
306
- log .Printf ("Sent %d messages through a streaming channel to a streaming subscriber in %s" , iterations , diff )
371
+ totalDiff = endSub .Sub (startPub )
372
+ log .Printf ("Sent %d messages through a streaming channel to a streaming subscriber in %s" , iterations , totalDiff )
373
+ log .Printf ("Total messages moved were %d" , 2 * iterations )
307
374
} else {
308
- log .Printf ("Sent %d messages through a channel to the replicator and read from another channel in %s" , iterations , diff )
375
+ totalDiff = endSub .Sub (startPub )
376
+ log .Printf ("Sent %d messages through a channel to the replicator and read from another channel in %s" , iterations , totalDiff )
377
+ log .Printf ("Total messages moved were %d" , 4 * iterations )
378
+ }
379
+
380
+ if pubFirst {
381
+ log .Printf ("Messages were pushed to streaming before a replicator or subscriber was created" )
382
+ }
383
+
384
+ totalRate := float64 (iterations ) / float64 (totalDiff .Seconds ())
385
+ totalSizeRate := float64 (messageSize ) * totalRate / (1024 * 1024 )
386
+ log .Printf ("Total stats - %.2f msgs/sec ~ %.2f MB/sec (using %d full paths)" , totalRate , totalSizeRate , iterations )
387
+
388
+ pubDiff := endPub .Sub (startPub )
389
+ pubRate := float64 (iterations ) / float64 (pubDiff .Seconds ())
390
+ pubSizeRate := float64 (messageSize ) * pubRate / (1024 * 1024 )
391
+ log .Printf (" Pub stats - %.2f msgs/sec ~ %.2f MB/sec" , pubRate , pubSizeRate )
392
+
393
+ if ! pubOnly {
394
+ subDiff := endSub .Sub (startSub )
395
+ subRate := float64 (iterations ) / float64 (subDiff .Seconds ())
396
+ subSizeRate := float64 (messageSize ) * subRate / (1024 * 1024 )
397
+ log .Printf (" Sub stats - %.2f msgs/sec ~ %.2f MB/sec" , subRate , subSizeRate )
398
+ }
399
+
400
+ if ! direct {
401
+ repDiff := endRep .Sub (startRep )
402
+ repRate := float64 (iterations ) / float64 (repDiff .Seconds ())
403
+ repSizeRate := float64 (messageSize ) * repRate / (1024 * 1024 )
404
+ log .Printf (" Rep stats - %.2f msgs/sec ~ %.2f MB/sec" , repRate , repSizeRate )
309
405
}
310
- log .Printf ("%.2f msgs/sec ~ %.2f MB/sec" , rate , sizeRate )
311
406
}
0 commit comments