
Commit 7f7d648

Improve pipeline jitter (#1335)
* Rework of pipeline backoff
* Cargo fmt
* Fic backoff calculation
* Fix backoff calculation
* Fix lint
* Add event tests
* Improve event tests
* Update event API
* Improve event tests
* Precommit
* Fix event wait_timeout and wait_deadline impls
* Add event_deadline tests
* Pre-commit
* Update batching config
* Fix typos
* Address review comments
1 parent 6dea3bf commit 7f7d648

16 files changed (+803 −152 lines changed)

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -90,6 +90,7 @@ console-subscriber = "0.3.0"
 const_format = "0.2.30"
 crc = "3.0.1"
 criterion = "0.5"
+crossbeam-utils = "0.8.2"
 derive_more = "0.99.17"
 derive-new = "0.6.0"
 tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }

DEFAULT_CONFIG.json5

Lines changed: 10 additions & 5 deletions
@@ -353,8 +353,6 @@
     /// Therefore, the maximum batch size is 2^16-1 (i.e. 65535).
     /// The default batch size value is the maximum batch size: 65535.
     batch_size: 65535,
-    /// Perform batching of messages if they are smaller of the batch_size
-    batching: true,
     /// Each zenoh link has a transmission queue that can be configured
     queue: {
       /// The size of each priority queue indicates the number of batches a given queue can contain.
@@ -380,9 +378,16 @@
         /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
         wait_before_drop: 1000,
       },
-      /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
-      /// Higher values lead to a more aggressive batching but it will introduce additional latency.
-      backoff: 100,
+      /// Perform batching of messages if they are smaller of the batch_size
+      batching: {
+        /// Perform adaptive batching of messages if they are smaller of the batch_size.
+        /// When the network is detected to not be fast enough to transmit every message individually, many small messages may be
+        /// batched together and sent all at once on the wire reducing the overall network overhead. This is typically of a high-throughput
+        /// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure.
+        enabled: true,
+        /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
+        time_limit: 1,
+      }
     },
   },
   /// Configure the zenoh RX parameters of a link
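To try the new knob from an application rather than a config file, something like the following should work. This is only a minimal sketch: it assumes the option sits under transport/link/tx/queue/batching, as suggested by the diff above, and that Config::insert_json5 is available as in other zenoh configuration examples; the key paths and values here are illustrative, not part of this commit.

use zenoh::config::Config;

fn main() {
    // Start from the built-in defaults (the equivalent of DEFAULT_CONFIG.json5).
    let mut config = Config::default();

    // Assumed key path, inferred from the diff above: queue.batching.{enabled, time_limit}.
    // Disable adaptive batching entirely:
    config
        .insert_json5("transport/link/tx/queue/batching/enabled", "false")
        .unwrap();
    // ...or keep it enabled and allow messages to be retained up to 5 ms under back-pressure:
    config
        .insert_json5("transport/link/tx/queue/batching/time_limit", "5")
        .unwrap();
}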

commons/zenoh-config/src/defaults.rs

Lines changed: 9 additions & 11 deletions
@@ -191,17 +191,6 @@ impl Default for LinkTxConf {
             batch_size: BatchSize::MAX,
             queue: QueueConf::default(),
             threads: num,
-            batching: true,
-        }
-    }
-}
-
-impl Default for QueueConf {
-    fn default() -> Self {
-        Self {
-            size: QueueSizeConf::default(),
-            congestion_control: CongestionControlConf::default(),
-            backoff: 100,
         }
     }
 }
@@ -234,6 +223,15 @@ impl Default for CongestionControlConf {
     }
 }
 
+impl Default for BatchingConf {
+    fn default() -> Self {
+        BatchingConf {
+            enabled: true,
+            time_limit: 1,
+        }
+    }
+}
+
 impl Default for LinkRxConf {
     fn default() -> Self {
         Self {
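If one wanted to pin these defaults down, a small in-crate test could assert them. This is only a sketch, assuming BatchingConf and its fields are reachable from zenoh-config's own unit tests; the commit itself does not add such a test.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn batching_conf_defaults() {
        // Mirrors the Default impl added above: batching enabled, 1 ms retention limit.
        let conf = BatchingConf::default();
        assert!(conf.enabled);
        assert_eq!(conf.time_limit, 1);
    }
}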

commons/zenoh-config/src/lib.rs

Lines changed: 11 additions & 6 deletions
@@ -407,9 +407,8 @@ validated_struct::validator! {
             keep_alive: usize,
             /// Zenoh's MTU equivalent (default: 2^16-1) (max: 2^16-1)
             batch_size: BatchSize,
-            /// Perform batching of messages if they are smaller of the batch_size
-            batching: bool,
-            pub queue: QueueConf {
+            pub queue: #[derive(Default)]
+            QueueConf {
                 /// The size of each priority queue indicates the number of batches a given queue can contain.
                 /// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE.
                 /// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE,
@@ -432,9 +431,15 @@ validated_struct::validator! {
                     /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
                     pub wait_before_drop: u64,
                 },
-                /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
-                /// Higher values lead to a more aggressive batching but it will introduce additional latency.
-                backoff: u64,
+                pub batching: BatchingConf {
+                    /// Perform adaptive batching of messages if they are smaller of the batch_size.
+                    /// When the network is detected to not be fast enough to transmit every message individually, many small messages may be
+                    /// batched together and sent all at once on the wire reducing the overall network overhead. This is typically of a high-throughput
+                    /// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure.
+                    enabled: bool,
+                    /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
+                    time_limit: u64,
+                },
             },
             // Number of threads used for TX
             threads: usize,
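The doc comments above describe the behaviour these fields drive: under network back-pressure, small messages may be retained for up to time_limit milliseconds so they can be coalesced into one batch. A minimal sketch of that decision logic follows; it is purely illustrative, the function names are invented, and it is not the pipeline code touched by this commit.

use std::time::{Duration, Instant};

/// Illustrative only: under back-pressure a small message is held for batching
/// until the batch fills up or this deadline (time_limit, in ms) expires.
fn batching_deadline(enqueued_at: Instant, time_limit_ms: u64) -> Instant {
    enqueued_at + Duration::from_millis(time_limit_ms)
}

/// Illustrative only: decide whether the current batch should be flushed now.
fn should_flush(now: Instant, deadline: Instant, batch_full: bool, batching_enabled: bool) -> bool {
    // With batching disabled, every message goes out immediately.
    !batching_enabled || batch_full || now >= deadline
}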

commons/zenoh-shm/src/header/storage.rs

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ use super::{
     segment::HeaderSegment,
 };
 
-#[dynamic(lazy,drop)]
+#[dynamic(lazy, drop)]
 pub static mut GLOBAL_HEADER_STORAGE: HeaderStorage = HeaderStorage::new(32768usize).unwrap();
 
 pub struct HeaderStorage {

commons/zenoh-shm/src/header/subscription.rs

Lines changed: 2 additions & 3 deletions
@@ -24,9 +24,8 @@ use super::{
     segment::HeaderSegment,
 };
 
-#[dynamic(lazy,drop)]
-pub static mut GLOBAL_HEADER_SUBSCRIPTION: Subscription = Subscription::new();
-
+#[dynamic(lazy, drop)]
+pub static mut GLOBAL_HEADER_SUBSCRIPTION: Subscription = Subscription::new();
 
 pub struct Subscription {
     linked_table: Mutex<BTreeMap<HeaderSegmentID, Arc<HeaderSegment>>>,
commons/zenoh-shm/src/watchdog/confirmator.rs

Lines changed: 2 additions & 3 deletions
@@ -27,10 +27,9 @@ use super::{
     segment::Segment,
 };
 
-#[dynamic(lazy,drop)]
+#[dynamic(lazy, drop)]
 pub static mut GLOBAL_CONFIRMATOR: WatchdogConfirmator =
-    WatchdogConfirmator::new(Duration::from_millis(50));
-
+    WatchdogConfirmator::new(Duration::from_millis(50));
 
 pub struct ConfirmedDescriptor {
     pub owned: OwnedDescriptor,
commons/zenoh-shm/src/watchdog/storage.rs

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ use zenoh_result::{zerror, ZResult};
 
 use super::{allocated_watchdog::AllocatedWatchdog, descriptor::OwnedDescriptor, segment::Segment};
 
-#[dynamic(lazy,drop)]
+#[dynamic(lazy, drop)]
 pub static mut GLOBAL_STORAGE: WatchdogStorage = WatchdogStorage::new(32768usize).unwrap();
 
 pub struct WatchdogStorage {
