9
9
10
10
{.used.}
11
11
12
- import stew/ byteutils
13
12
import utils
14
13
import chronicles
15
14
import ../../ libp2p/ protocols/ pubsub/ [gossipsub, mcache, peertable]
16
- import ../ helpers, ../ utils / [futures]
15
+ import ../ helpers
17
16
18
17
suite " GossipSub Mesh Management" :
19
18
teardown:
20
19
checkTrackers()
21
20
22
- asyncTest " topic params" :
23
- let params = TopicParams.init()
24
- params.validateParameters().tryGet()
25
-
26
21
asyncTest " subscribe/unsubscribeAll" :
27
22
let topic = " foobar"
28
23
let (gossipSub, conns, peers) =
@@ -176,7 +171,34 @@ suite "GossipSub Mesh Management":
176
171
# ensure we give priority and keep at least dOut outbound peers
177
172
check outbound >= gossipSub.parameters.dOut
178
173
179
- asyncTest " dont prune peers if mesh len is less than d_high" :
174
+ asyncTest " rebalanceMesh Degree Hi - dScore controls number of peers to retain by score when pruning" :
175
+ # Given GossipSub node starting with 13 peers in mesh
176
+ let
177
+ topic = " foobar"
178
+ totalPeers = 13
179
+
180
+ let (gossipSub, conns, peers) = setupGossipSubWithPeers(
181
+ totalPeers, topic, populateGossipsub = true , populateMesh = true
182
+ )
183
+ defer :
184
+ await teardownGossipSub(gossipSub, conns)
185
+
186
+ # And mesh is larger than dHigh
187
+ gossipSub.parameters.dLow = 4
188
+ gossipSub.parameters.d = 6
189
+ gossipSub.parameters.dHigh = 8
190
+ gossipSub.parameters.dOut = 3
191
+ gossipSub.parameters.dScore = 13
192
+
193
+ check gossipSub.mesh[topic].len == totalPeers
194
+
195
+ # When mesh is rebalanced
196
+ gossipSub.rebalanceMesh(topic)
197
+
198
+ # Then prunning is not triggered when mesh is not larger than dScore
199
+ check gossipSub.mesh[topic].len == totalPeers
200
+
201
+ asyncTest " Nodes graft peers according to DValues - numberOfNodes < dHigh" :
180
202
let
181
203
numberOfNodes = 5
182
204
topic = " foobar"
@@ -195,7 +217,7 @@ suite "GossipSub Mesh Management":
195
217
node.mesh.getOrDefault(topic).len == expectedNumberOfPeers
196
218
node.fanout.len == 0
197
219
198
- asyncTest " prune peers if mesh len is higher than d_high " :
220
+ asyncTest " Nodes graft peers according to DValues - numberOfNodes > dHigh " :
199
221
let
200
222
numberOfNodes = 15
201
223
topic = " foobar"
@@ -219,86 +241,6 @@ suite "GossipSub Mesh Management":
219
241
node.mesh.getOrDefault(topic).len <= dHigh
220
242
node.fanout.len == 0
221
243
222
- asyncTest " GossipSub unsub - resub faster than backoff" :
223
- # For this test to work we'd require a way to disable fanout.
224
- # There's not a way to toggle it, and mocking it didn't work as there's not a reliable mock available.
225
- skip()
226
- return
227
-
228
- # Instantiate handlers and validators
229
- var handlerFut0 = newFuture[bool ]()
230
- proc handler0(topic: string , data: seq [byte ]) {.async.} =
231
- check topic == " foobar"
232
- handlerFut0.complete(true )
233
-
234
- var handlerFut1 = newFuture[bool ]()
235
- proc handler1(topic: string , data: seq [byte ]) {.async.} =
236
- check topic == " foobar"
237
- handlerFut1.complete(true )
238
-
239
- var validatorFut = newFuture[bool ]()
240
- proc validator(
241
- topic: string , message: Message
242
- ): Future[ValidationResult] {.async.} =
243
- check topic == " foobar"
244
- validatorFut.complete(true )
245
- result = ValidationResult.Accept
246
-
247
- # Setup nodes and start switches
248
- let
249
- nodes = generateNodes(2 , gossip = true , unsubscribeBackoff = 5 .seconds)
250
- topic = " foobar"
251
-
252
- # Connect nodes
253
- startNodesAndDeferStop(nodes)
254
- await connectNodesStar(nodes)
255
-
256
- # Subscribe both nodes to the topic and node1 (receiver) to the validator
257
- nodes[0 ].subscribe(topic, handler0)
258
- nodes[1 ].subscribe(topic, handler1)
259
- nodes[1 ].addValidator(" foobar" , validator)
260
- await sleepAsync(DURATION_TIMEOUT)
261
-
262
- # Wait for both nodes to verify others' subscription
263
- var subs: seq [Future[void ]]
264
- subs &= waitSub(nodes[1 ], nodes[0 ], topic)
265
- subs &= waitSub(nodes[0 ], nodes[1 ], topic)
266
- await allFuturesThrowing(subs)
267
-
268
- # When unsubscribing and resubscribing in a short time frame, the backoff period should be triggered
269
- nodes[1 ].unsubscribe(topic, handler1)
270
- await sleepAsync(DURATION_TIMEOUT)
271
- nodes[1 ].subscribe(topic, handler1)
272
- await sleepAsync(DURATION_TIMEOUT)
273
-
274
- # Backoff is set to 5 seconds, and the amount of sleeping time since the unsubsribe until now is 3-4s~
275
- # Meaning, the subscription shouldn't have been processed yet because it's still in backoff period
276
- # When publishing under this condition
277
- discard await nodes[0 ].publish(" foobar" , " Hello!" .toBytes())
278
- await sleepAsync(DURATION_TIMEOUT)
279
-
280
- # Then the message should not be received:
281
- check:
282
- validatorFut.toState().isPending()
283
- handlerFut1.toState().isPending()
284
- handlerFut0.toState().isPending()
285
-
286
- validatorFut.reset()
287
- handlerFut0.reset()
288
- handlerFut1.reset()
289
-
290
- # If we wait backoff period to end, around 1-2s
291
- await waitForMesh(nodes[0 ], nodes[1 ], topic, 3 .seconds)
292
-
293
- discard await nodes[0 ].publish(" foobar" , " Hello!" .toBytes())
294
- await sleepAsync(DURATION_TIMEOUT)
295
-
296
- # Then the message should be received
297
- check:
298
- validatorFut.toState().isCompleted()
299
- handlerFut1.toState().isCompleted()
300
- handlerFut0.toState().isPending()
301
-
302
244
asyncTest " e2e - GossipSub should add remote peer topic subscriptions" :
303
245
proc handler(topic: string , data: seq [byte ]) {.async.} =
304
246
discard
@@ -481,3 +423,108 @@ suite "GossipSub Mesh Management":
481
423
topic notin node.topics
482
424
topic notin node.mesh
483
425
topic notin node.gossipsub
426
+
427
+ asyncTest " Unsubscribe backoff" :
428
+ const
429
+ numberOfNodes = 3
430
+ topic = " foobar"
431
+ unsubscribeBackoff = 1 .seconds # 1s is the minimum
432
+ let nodes = generateNodes(
433
+ numberOfNodes, gossip = true , unsubscribeBackoff = unsubscribeBackoff
434
+ )
435
+ .toGossipSub()
436
+
437
+ startNodesAndDeferStop(nodes)
438
+
439
+ # Nodes are connected to Node0
440
+ for i in 1 ..< numberOfNodes:
441
+ await connectNodes(nodes[0 ], nodes[i])
442
+ subscribeAllNodes(nodes, topic, voidTopicHandler)
443
+ await waitForHeartbeat()
444
+
445
+ check:
446
+ nodes[0 ].mesh[topic].len == numberOfNodes - 1
447
+
448
+ # When Node0 unsubscribes from the topic
449
+ nodes[0 ].unsubscribe(topic, voidTopicHandler)
450
+
451
+ # And subscribes back straight away
452
+ nodes[0 ].subscribe(topic, voidTopicHandler)
453
+
454
+ # Then its mesh is pruned and peers have applied unsubscribeBackoff
455
+ # Waiting more than one heartbeat (60ms) and less than unsubscribeBackoff (1s)
456
+ await sleepAsync(unsubscribeBackoff.div (2 ))
457
+ check:
458
+ not nodes[0 ].mesh.hasKey(topic)
459
+
460
+ # When unsubscribeBackoff period is done
461
+ await sleepAsync(unsubscribeBackoff)
462
+
463
+ # Then on the next heartbeat mesh is rebalanced and peers are regrafted
464
+ check:
465
+ nodes[0 ].mesh[topic].len == numberOfNodes - 1
466
+
467
+ asyncTest " Prune backoff" :
468
+ const
469
+ numberOfNodes = 9
470
+ topic = " foobar"
471
+ pruneBackoff = 1 .seconds # 1s is the minimum
472
+ dValues = some(
473
+ DValues(
474
+ dLow: some(6 ),
475
+ dHigh: some(8 ),
476
+ d: some(6 ),
477
+ dLazy: some(6 ),
478
+ dScore: some(4 ),
479
+ dOut: some(2 ),
480
+ )
481
+ )
482
+ let
483
+ nodes = generateNodes(
484
+ numberOfNodes, gossip = true , dValues = dValues, pruneBackoff = pruneBackoff
485
+ )
486
+ .toGossipSub()
487
+ node0 = nodes[0 ]
488
+
489
+ startNodesAndDeferStop(nodes)
490
+
491
+ # Nodes are connected to Node0
492
+ for i in 1 ..< numberOfNodes:
493
+ await connectNodes(node0, nodes[i])
494
+ subscribeAllNodes(nodes, topic, voidTopicHandler)
495
+
496
+ checkUntilTimeout:
497
+ node0.mesh.getOrDefault(topic).len == numberOfNodes - 1
498
+
499
+ # When DValues of Node0 are updated to lower than initial dValues
500
+ const newDValues = some(
501
+ DValues(
502
+ dLow: some(2 ),
503
+ dHigh: some(4 ),
504
+ d: some(3 ),
505
+ dLazy: some(3 ),
506
+ dScore: some(2 ),
507
+ dOut: some(2 ),
508
+ )
509
+ )
510
+ node0.parameters.applyDValues(newDValues)
511
+
512
+ # Then Node0 mesh is pruned to newDValues.dHigh length
513
+ # And pruned peers have applied pruneBackoff
514
+ checkUntilTimeout:
515
+ node0.mesh.getOrDefault(topic).len == newDValues.get.dHigh.get
516
+
517
+ # When DValues of Node0 are updated back to the initial dValues
518
+ node0.parameters.applyDValues(dValues)
519
+
520
+ # Waiting more than one heartbeat (60ms) and less than pruneBackoff (1s)
521
+ await sleepAsync(pruneBackoff.div (2 ))
522
+ check:
523
+ node0.mesh.getOrDefault(topic).len == newDValues.get.dHigh.get
524
+
525
+ # When pruneBackoff period is done
526
+ await sleepAsync(pruneBackoff)
527
+
528
+ # Then on the next heartbeat mesh is rebalanced and peers are regrafted to the initial d value
529
+ check:
530
+ node0.mesh.getOrDefault(topic).len == dValues.get.d.get
0 commit comments