@@ -592,13 +592,19 @@ where
592
592
// Transform the data before building a raw_message.
593
593
let transformed_data = self
594
594
. data_transform
595
- . outbound_transform ( & topic, data. clone ( ) ) ?;
595
+ . outbound_transform ( & topic. clone ( ) , data. clone ( ) ) ?;
596
+
597
+ let max_transmit_size_for_topic = self
598
+ . config
599
+ . protocol_config ( )
600
+ . max_transmit_size_for_topic ( & topic) ;
596
601
597
602
// check that the size doesn't exceed the max transmission size.
598
- if transformed_data. len ( ) > self . config . max_transmit_size ( ) {
603
+ if transformed_data. len ( ) > max_transmit_size_for_topic {
599
604
return Err ( PublishError :: MessageTooLarge ) ;
600
605
}
601
606
607
+ let mesh_n = self . config . mesh_n_for_topic ( & topic) ;
602
608
let raw_message = self . build_raw_message ( topic, transformed_data) ?;
603
609
604
610
// calculate the message id from the un-transformed data
@@ -648,7 +654,7 @@ where
648
654
Some ( mesh_peers) => {
649
655
// We have a mesh set. We want to make sure to publish to at least `mesh_n`
650
656
// peers (if possible).
651
- let needed_extra_peers = self . config . mesh_n ( ) . saturating_sub ( mesh_peers. len ( ) ) ;
657
+ let needed_extra_peers = mesh_n. saturating_sub ( mesh_peers. len ( ) ) ;
652
658
653
659
if needed_extra_peers > 0 {
654
660
// We don't have `mesh_n` peers in our mesh, we will randomly select extras
@@ -687,7 +693,6 @@ where
687
693
}
688
694
} else {
689
695
// We have no fanout peers, select mesh_n of them and add them to the fanout
690
- let mesh_n = self . config . mesh_n ( ) ;
691
696
let new_peers =
692
697
get_random_peers ( & self . connected_peers , & topic_hash, mesh_n, {
693
698
|p| {
@@ -971,7 +976,7 @@ where
971
976
}
972
977
973
978
let mut added_peers = HashSet :: new ( ) ;
974
-
979
+ let mesh_n = self . config . mesh_n_for_topic ( topic_hash ) ;
975
980
if let Some ( m) = self . metrics . as_mut ( ) {
976
981
m. joined ( topic_hash)
977
982
}
@@ -993,7 +998,7 @@ where
993
998
994
999
// Add up to mesh_n of them to the mesh
995
1000
// NOTE: These aren't randomly added, currently FIFO
996
- let add_peers = std:: cmp:: min ( peers. len ( ) , self . config . mesh_n ( ) ) ;
1001
+ let add_peers = std:: cmp:: min ( peers. len ( ) , mesh_n) ;
997
1002
tracing:: debug!(
998
1003
topic=%topic_hash,
999
1004
"JOIN: Adding {:?} peers from the fanout for topic" ,
@@ -1016,19 +1021,20 @@ where
1016
1021
}
1017
1022
1018
1023
// check if we need to get more peers, which we randomly select
1019
- if added_peers. len ( ) < self . config . mesh_n ( ) {
1024
+ if added_peers. len ( ) < mesh_n {
1020
1025
// get the peers
1021
1026
let new_peers = get_random_peers (
1022
1027
& self . connected_peers ,
1023
1028
topic_hash,
1024
- self . config . mesh_n ( ) - added_peers. len ( ) ,
1029
+ mesh_n - added_peers. len ( ) ,
1025
1030
|peer| {
1026
1031
!added_peers. contains ( peer)
1027
1032
&& !self . explicit_peers . contains ( peer)
1028
1033
&& !self . score_below_threshold ( peer, |_| 0.0 ) . 0
1029
1034
&& !self . backoffs . is_backoff_with_slack ( topic_hash, peer)
1030
1035
} ,
1031
1036
) ;
1037
+
1032
1038
added_peers. extend ( new_peers. clone ( ) ) ;
1033
1039
// add them to the mesh
1034
1040
tracing:: debug!(
@@ -1468,9 +1474,9 @@ where
1468
1474
1469
1475
// check mesh upper bound and only allow graft if the upper bound is not reached
1470
1476
// or if it is an outbound peer
1471
- if peers . len ( ) > = self . config . mesh_n_high ( )
1472
- && ! self . outbound_peers . contains ( peer_id )
1473
- {
1477
+ let mesh_n_high = self . config . mesh_n_high_for_topic ( & topic_hash ) ;
1478
+
1479
+ if peers . len ( ) >= mesh_n_high && ! self . outbound_peers . contains ( peer_id ) {
1474
1480
to_prune_topics. insert ( topic_hash. clone ( ) ) ;
1475
1481
continue ;
1476
1482
}
@@ -1946,9 +1952,9 @@ where
1946
1952
. is_backoff_with_slack ( topic_hash, propagation_source)
1947
1953
{
1948
1954
if let Some ( peers) = self . mesh . get_mut ( topic_hash) {
1949
- if peers . len ( ) < self . config . mesh_n_low ( )
1950
- && peers . insert ( * propagation_source )
1951
- {
1955
+ let mesh_n_low = self . config . mesh_n_low_for_topic ( topic_hash ) ;
1956
+
1957
+ if peers . len ( ) < mesh_n_low && peers . insert ( * propagation_source ) {
1952
1958
tracing:: debug!(
1953
1959
peer=%propagation_source,
1954
1960
topic=%topic_hash,
@@ -2102,6 +2108,11 @@ where
2102
2108
let backoffs = & self . backoffs ;
2103
2109
let outbound_peers = & self . outbound_peers ;
2104
2110
2111
+ let mesh_n = self . config . mesh_n_for_topic ( topic_hash) ;
2112
+ let mesh_n_low = self . config . mesh_n_low_for_topic ( topic_hash) ;
2113
+ let mesh_n_high = self . config . mesh_n_high_for_topic ( topic_hash) ;
2114
+ let mesh_outbound_min = self . config . mesh_outbound_min_for_topic ( topic_hash) ;
2115
+
2105
2116
// drop all peers with negative score, without PX
2106
2117
// if there is at some point a stable retain method for BTreeSet the following can be
2107
2118
// written more efficiently with retain.
@@ -2138,15 +2149,15 @@ where
2138
2149
}
2139
2150
2140
2151
// too little peers - add some
2141
- if peers. len ( ) < self . config . mesh_n_low ( ) {
2152
+ if peers. len ( ) < mesh_n_low {
2142
2153
tracing:: debug!(
2143
2154
topic=%topic_hash,
2144
2155
"HEARTBEAT: Mesh low. Topic contains: {} needs: {}" ,
2145
2156
peers. len( ) ,
2146
- self . config . mesh_n_low( )
2157
+ mesh_n_low
2147
2158
) ;
2148
2159
// not enough peers - get mesh_n - current_length more
2149
- let desired_peers = self . config . mesh_n ( ) - peers. len ( ) ;
2160
+ let desired_peers = mesh_n - peers. len ( ) ;
2150
2161
let peer_list =
2151
2162
get_random_peers ( & self . connected_peers , topic_hash, desired_peers, |peer| {
2152
2163
!peers. contains ( peer)
@@ -2167,14 +2178,14 @@ where
2167
2178
}
2168
2179
2169
2180
// too many peers - remove some
2170
- if peers. len ( ) > self . config . mesh_n_high ( ) {
2181
+ if peers. len ( ) > mesh_n_high {
2171
2182
tracing:: debug!(
2172
2183
topic=%topic_hash,
2173
2184
"HEARTBEAT: Mesh high. Topic contains: {} needs: {}" ,
2174
2185
peers. len( ) ,
2175
- self . config . mesh_n_high( )
2186
+ mesh_n_high
2176
2187
) ;
2177
- let excess_peer_no = peers. len ( ) - self . config . mesh_n ( ) ;
2188
+ let excess_peer_no = peers. len ( ) - mesh_n;
2178
2189
2179
2190
// shuffle the peers and then sort by score ascending beginning with the worst
2180
2191
let mut rng = thread_rng ( ) ;
@@ -2206,7 +2217,7 @@ where
2206
2217
break ;
2207
2218
}
2208
2219
if self . outbound_peers . contains ( & peer) {
2209
- if outbound <= self . config . mesh_outbound_min ( ) {
2220
+ if outbound <= mesh_outbound_min {
2210
2221
// do not remove anymore outbound peers
2211
2222
continue ;
2212
2223
}
@@ -2227,13 +2238,13 @@ where
2227
2238
}
2228
2239
2229
2240
// do we have enough outbound peers?
2230
- if peers. len ( ) >= self . config . mesh_n_low ( ) {
2241
+ if peers. len ( ) >= mesh_n_low {
2231
2242
// count number of outbound peers we have
2232
2243
let outbound = { peers. iter ( ) . filter ( |p| outbound_peers. contains ( * p) ) . count ( ) } ;
2233
2244
2234
2245
// if we have not enough outbound peers, graft to some new outbound peers
2235
- if outbound < self . config . mesh_outbound_min ( ) {
2236
- let needed = self . config . mesh_outbound_min ( ) - outbound;
2246
+ if outbound < mesh_outbound_min {
2247
+ let needed = mesh_outbound_min - outbound;
2237
2248
let peer_list =
2238
2249
get_random_peers ( & self . connected_peers , topic_hash, needed, |peer| {
2239
2250
!peers. contains ( peer)
@@ -2242,6 +2253,7 @@ where
2242
2253
&& * scores. get ( peer) . unwrap_or ( & 0.0 ) >= 0.0
2243
2254
&& outbound_peers. contains ( peer)
2244
2255
} ) ;
2256
+
2245
2257
for peer in & peer_list {
2246
2258
let current_topic = to_graft. entry ( * peer) . or_insert_with ( Vec :: new) ;
2247
2259
current_topic. push ( topic_hash. clone ( ) ) ;
@@ -2356,6 +2368,8 @@ where
2356
2368
Some ( ( _, thresholds, _) ) => thresholds. publish_threshold ,
2357
2369
_ => 0.0 ,
2358
2370
} ;
2371
+ let mesh_n = self . config . mesh_n_for_topic ( topic_hash) ;
2372
+
2359
2373
for peer_id in peers. iter ( ) {
2360
2374
// is the peer still subscribed to the topic?
2361
2375
let peer_score = * scores. get ( peer_id) . unwrap_or ( & 0.0 ) ;
@@ -2380,13 +2394,13 @@ where
2380
2394
}
2381
2395
2382
2396
// not enough peers
2383
- if peers. len ( ) < self . config . mesh_n ( ) {
2397
+ if peers. len ( ) < mesh_n {
2384
2398
tracing:: debug!(
2385
2399
"HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}" ,
2386
2400
peers. len( ) ,
2387
- self . config . mesh_n( )
2401
+ mesh_n
2388
2402
) ;
2389
- let needed_peers = self . config . mesh_n ( ) - peers. len ( ) ;
2403
+ let needed_peers = mesh_n - peers. len ( ) ;
2390
2404
let explicit_peers = & self . explicit_peers ;
2391
2405
let new_peers =
2392
2406
get_random_peers ( & self . connected_peers , topic_hash, needed_peers, |peer_id| {
0 commit comments