
[#139] events with bitset #167


Merged: 27 commits, Apr 9, 2024

Changes from 7 commits

Commits (27)
bbeac8a
[#139] Add and test bitset reset_next
elfenpiff Mar 19, 2024
f405333
[#139] Introduce signal and id_tracker traits
elfenpiff Mar 19, 2024
88886bf
[#139] Remove TriggerId trait and replace it with struct
elfenpiff Mar 19, 2024
998916b
[#139] Add bitset as id tracker
elfenpiff Mar 19, 2024
ed3da85
[#139] Provide id tracker impl
elfenpiff Mar 19, 2024
61876fb
[#139] Implement semaphore as signal mechanism
elfenpiff Mar 19, 2024
6bd832c
[#139] IdTracker is based on a RelocatableContainer
elfenpiff Mar 19, 2024
61669e0
[#139] Add tests for signal mechanism
elfenpiff Mar 20, 2024
bf7e68e
[#139] Finalize signal mechanism tests; rename wait into blocking_wait
elfenpiff Mar 25, 2024
4987d65
[#139] Add and test events based on new event concept
elfenpiff Mar 25, 2024
41ab76e
[#139] Add trigger id max test
elfenpiff Mar 25, 2024
e3698e0
[#139] Use new shm semaphore bitset event
elfenpiff Mar 25, 2024
4b70a15
[#139] Track active bits for performance
elfenpiff Mar 25, 2024
e6c5133
[#139] Add test to verify trigger id max settings
elfenpiff Mar 25, 2024
79ea3cb
[#139] Merge current main
elfenpiff Apr 4, 2024
e29d53a
[#139] Add documentation
elfenpiff Apr 4, 2024
66d1d78
[#139] Improve bitset code
elfenpiff Apr 5, 2024
668321d
[#139] Add bitset benchmark
elfenpiff Apr 8, 2024
93778cd
[#139] Adjust benchmark; add small optimization
elfenpiff Apr 8, 2024
998bfe9
[#139] Remove premature optimization trap active_bits to massively in…
elfenpiff Apr 8, 2024
b4f3687
[#139] Refactor benchmark so that it is a event service benchmark
elfenpiff Apr 8, 2024
9db4b1e
[#139] Add event benchmark to readme
elfenpiff Apr 8, 2024
99d056a
[#139] Fix doc examples
elfenpiff Apr 8, 2024
021142e
[#139] Reduce repetitions in event tests since uds have smaller buffe…
elfenpiff Apr 8, 2024
6f7ab35
[#139] Disable semaphore approach for mac os since atomic wait seems …
elfenpiff Apr 8, 2024
bd6a1de
[#139] Add detailed explanation and reworded doc
elfenpiff Apr 8, 2024
9ca036a
[#139] Do not use sem bitset in shm for windows
elfenpiff Apr 8, 2024
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -20,7 +20,8 @@ members = [

"examples",

"benchmarks/publish-subscribe"
"benchmarks/publish-subscribe",
"benchmarks/event"
]

[workspace.package]
29 changes: 24 additions & 5 deletions benchmarks/README.md
@@ -2,19 +2,38 @@

## Publish-Subscribe

The benchmark quantifies the latency between a publisher sending a message and
a subscriber receiving it. In the setup, a bidirectional connection is
The benchmark quantifies the latency between a `Publisher` sending a message and
a `Subscriber` receiving it. In the setup, a bidirectional connection is
established from process `a` to `b` (service name `a2b`) and back
(service name `b2a`). Subscribers employ multithreaded busy waiting and promptly
(service name `b2a`). `Subscriber`s employ multithreaded busy waiting and promptly
respond upon message reception. This process repeats `n` times, and the average
latency is subsequently computed.

```sh
cargo run --release benchmark-publish-subscribe -- --bench-all
cargo run --bin benchmark-publish-subscribe --release -- --bench-all
```

For more benchmark configuration details, see

```sh
cargo run --release benchmark-publish-subscribe -- --help
cargo run --bin benchmark-publish-subscribe --release -- --help
```

## Event

The event benchmark quantifies the latency between a `Notifier` sending a notification and
a `Listener` waking up and responding to it. In the setup, a bidirectional connection is
established from process `a` to `b` (service name `a2b`) and back
(service name `b2a`). The `Listener` employs a blocking wait and wakes up on signal
reception to promptly respond with a return signal notification. This process repeats `n`
times, and the average latency is subsequently computed.

```sh
cargo run --bin benchmark-event --release -- -i 10000
```

For more benchmark configuration details, see

```sh
cargo run --bin benchmark-event --release -- --help
```
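Editor's aside: the ping-pong described above maps onto the Notifier/Listener API used by the benchmark added in this PR. The sketch below mirrors the calls from `benchmarks/event/src/main.rs` further down; the service name and error handling are illustrative only, and it is not part of the README diff.

```rust
// Minimal sketch of one notify/wait round trip, mirroring the benchmark code
// added in this PR. Service name and use of zero_copy::Service are illustrative.
use iceoryx2::prelude::*;

fn main() {
    let service = zero_copy::Service::new(&ServiceName::new("sketch").unwrap())
        .event()
        .max_notifiers(1)
        .max_listeners(1)
        .create()
        .unwrap();

    let notifier = service.notifier().create().unwrap();
    let mut listener = service.listener().create().unwrap();

    // Send a notification and block until at least one trigger id is received.
    notifier.notify().expect("failed to notify");
    while listener.blocking_wait().unwrap().is_empty() {}
    println!("notification received");
}
```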
17 changes: 17 additions & 0 deletions benchmarks/event/Cargo.toml
@@ -0,0 +1,17 @@
[package]
name = "benchmark-event"
description = "iceoryx2: [internal] benchmark for the iceoryx2 event services"
categories = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
rust-version = { workspace = true }
version = { workspace = true }

[dependencies]
iceoryx2 = { workspace = true }
iceoryx2-bb-log = { workspace = true }

clap = { workspace = true }
102 changes: 102 additions & 0 deletions benchmarks/event/src/main.rs
@@ -0,0 +1,102 @@
// Copyright (c) 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

use std::{sync::Barrier, time::Instant};

use clap::Parser;
use iceoryx2::prelude::*;
use iceoryx2_bb_log::set_log_level;

fn perform_benchmark<T: Service>(args: &Args) {
let service_name_a2b = ServiceName::new("a2b").unwrap();
let service_name_b2a = ServiceName::new("b2a").unwrap();

let service_a2b = T::new(&service_name_a2b)
.event()
.max_notifiers(1)
.max_listeners(1)
.create()
.unwrap();

let service_b2a = T::new(&service_name_b2a)
.event()
.max_notifiers(1)
.max_listeners(1)
.create()
.unwrap();

let barrier = Barrier::new(3);

std::thread::scope(|s| {
let t1 = s.spawn(|| {
let notifier_a2b = service_a2b.notifier().create().unwrap();
let mut listener_b2a = service_b2a.listener().create().unwrap();

barrier.wait();
notifier_a2b.notify().expect("failed to notify");

for _ in 0..args.iterations {
while listener_b2a.blocking_wait().unwrap().is_empty() {}
notifier_a2b.notify().expect("failed to notify");
}
});

let t2 = s.spawn(|| {
let notifier_b2a = service_b2a.notifier().create().unwrap();
let mut listener_a2b = service_a2b.listener().create().unwrap();

barrier.wait();
for _ in 0..args.iterations {
while listener_a2b.blocking_wait().unwrap().is_empty() {}
notifier_b2a.notify().expect("failed to notify");
}
});

std::thread::sleep(std::time::Duration::from_millis(100));
let start = Instant::now();
barrier.wait();
Comment on lines +64 to +66

Member:

Suggested change: replace

    std::thread::sleep(std::time::Duration::from_millis(100));
    let start = Instant::now();
    barrier.wait();

with

    barrier.wait();
    let start = Instant::now();

Why not just this?

elfenpiff (Contributor, Author), Apr 8, 2024:

It is possible that the OS does not schedule the measuring thread again right after the wait call; then the measurement starts too late and the numbers are skewed in our favor. So I wanted to be a bit more pessimistic here.

I do the same thing in the publish-subscribe benchmark.

Member:

In this case two barriers can be used to get rid of the sleep and make it more deterministic ... just a suggestion, no action required.


t1.join().expect("thread failure");
t2.join().expect("thread failure");

let stop = start.elapsed();
println!(
"{} ::: Iterations: {}, Time: {}, Latency: {} ns",
std::any::type_name::<T>(),
args.iterations,
stop.as_secs_f64(),
stop.as_nanos() / (args.iterations as u128 * 2)
);
});
}

const ITERATIONS: usize = 1000000;
const CAPACITY: usize = 128;

#[derive(Parser, Debug)]
#[clap(version, about, long_about = None)]
struct Args {
/// Number of iterations the A --> B --> A communication is repeated
#[clap(short, long, default_value_t = ITERATIONS)]
iterations: usize,
/// Capacity of the bitset
#[clap(short, long, default_value_t = CAPACITY)]
capacity: usize,
}

fn main() {
let args = Args::parse();
set_log_level(iceoryx2_bb_log::LogLevel::Error);

perform_benchmark::<zero_copy::Service>(&args);
perform_benchmark::<process_local::Service>(&args);
}
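On the reviewer's point above about replacing the 100 ms sleep: a two-barrier start line could look roughly like the sketch below. This is an editor's sketch with plain `std` types, not code from this PR; the start timestamp is taken between the two rendezvous points, so it still errs on the pessimistic side while removing the sleep.

```rust
// Sketch of the two-barrier start suggested in the review thread above.
// `ready` guarantees both workers finished their setup; the timestamp is taken
// before `go` releases them, so the measurement can only start early, never late.
use std::sync::Barrier;
use std::time::Instant;

fn main() {
    let ready = Barrier::new(3); // two worker threads + the measuring thread
    let go = Barrier::new(3);

    std::thread::scope(|s| {
        for _ in 0..2 {
            s.spawn(|| {
                // ... create notifier/listener here ...
                ready.wait(); // announce: setup is complete
                go.wait();    // park at the start line
                // ... ping-pong loop here ...
            });
        }

        ready.wait();               // both workers are set up
        let start = Instant::now(); // pessimistic: taken before the release
        go.wait();                  // release the workers
        // ... join the workers, then evaluate start.elapsed() ...
        println!("measurement started {:?} ago", start.elapsed());
    });
}
```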
20 changes: 2 additions & 18 deletions iceoryx2-bb/lock-free/src/mpmc/bit_set.rs
@@ -31,15 +31,15 @@
//! bitset.set(5);
//!
//! // resets the bitset and calls the callback for every bit that was set
//! bitset.reset(|id| {
//! bitset.reset_all(|id| {
//! println!("bit {} was set", id );
//! });
//! ```

use std::{
alloc::Layout,
fmt::Debug,
sync::atomic::{AtomicBool, AtomicU8, Ordering},
sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
};

use iceoryx2_bb_elementary::{
@@ -59,7 +59,6 @@ pub type RelocatableBitSet = details::BitSet<RelocatablePointer<details::BitsetE

#[doc(hidden)]
pub mod details {
use std::sync::atomic::AtomicUsize;

use super::*;

@@ -87,7 +86,6 @@
capacity: usize,
array_capacity: usize,
reset_position: AtomicUsize,
active_bits: AtomicUsize,
is_memory_initialized: AtomicBool,
}

@@ -115,7 +113,6 @@
array_capacity,
is_memory_initialized: AtomicBool::new(true),
reset_position: AtomicUsize::new(0),
active_bits: AtomicUsize::new(0),
}
}
}
@@ -128,7 +125,6 @@
array_capacity: Self::array_capacity(capacity),
is_memory_initialized: AtomicBool::new(false),
reset_position: AtomicUsize::new(0),
active_bits: AtomicUsize::new(0),
}
}

@@ -176,7 +172,6 @@
array_capacity: Self::array_capacity(capacity),
is_memory_initialized: AtomicBool::new(true),
reset_position: AtomicUsize::new(0),
active_bits: AtomicUsize::new(0),
}
}
}
@@ -224,7 +219,6 @@
Ordering::Relaxed,
) {
Ok(_) => {
self.active_bits.fetch_add(1, Ordering::Relaxed);
return true;
}
Err(v) => current = v,
@@ -251,7 +245,6 @@
Ordering::Relaxed,
) {
Ok(_) => {
self.active_bits.fetch_sub(1, Ordering::Relaxed);
return true;
}
Err(v) => current = v,
@@ -277,9 +270,6 @@
/// [`None`].
pub fn reset_next(&self) -> Option<usize> {
self.verify_init("reset_next");
if self.active_bits.load(Ordering::Relaxed) == 0 {
return None;
}

let current_position = self.reset_position.load(Ordering::Relaxed);
for pos in (current_position..self.capacity).chain(0..current_position) {
@@ -299,18 +289,12 @@

for i in 0..self.array_capacity {
let value = unsafe { (*self.data_ptr.as_ptr().add(i)).swap(0, Ordering::Relaxed) };
let mut counter = 0;
let main_index = i * BITSET_ELEMENT_BITSIZE;
for b in 0..BITSET_ELEMENT_BITSIZE {
if value & (1 << b) != 0 {
callback(main_index + b);
counter += 1;
}
}

if self.active_bits.fetch_sub(counter, Ordering::Relaxed) == counter {
return;
}
}
}
}
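For readers skimming the diff: `reset_next()` above walks the bit set starting at the remembered `reset_position` and wraps around the capacity, clearing and returning one set bit per call. Below is a single-threaded toy version of that scan as an editor's sketch; the real implementation in this file is lock-free, works on atomic words, and its exact `reset_position` update is not visible in this hunk.

```rust
// Toy, single-threaded sketch of the round-robin reset scan used by reset_next():
// start at the remembered position, wrap around, clear and return the first set bit.
struct ToyBitSet {
    bits: Vec<bool>,
    reset_position: usize,
}

impl ToyBitSet {
    fn new(capacity: usize) -> Self {
        Self { bits: vec![false; capacity], reset_position: 0 }
    }

    fn set(&mut self, id: usize) {
        self.bits[id] = true;
    }

    fn reset_next(&mut self) -> Option<usize> {
        let capacity = self.bits.len();
        let current = self.reset_position;
        // scan current..capacity first, then wrap around to 0..current
        for pos in (current..capacity).chain(0..current) {
            if self.bits[pos] {
                self.bits[pos] = false;
                // assumption for this toy: resume the next scan after the handled bit
                self.reset_position = pos + 1;
                return Some(pos);
            }
        }
        None
    }
}

fn main() {
    let mut set = ToyBitSet::new(8);
    set.set(5);
    set.set(2);
    assert_eq!(set.reset_next(), Some(2));
    assert_eq!(set.reset_next(), Some(5));
    assert_eq!(set.reset_next(), None);
    println!("round-robin reset scan behaves as expected");
}
```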
4 changes: 2 additions & 2 deletions iceoryx2-bb/posix/src/semaphore.rs
@@ -378,7 +378,7 @@ impl NamedSemaphoreCreationBuilder {
/// .expect("failed to open semaphore");
///
/// loop {
/// semaphore.wait().expect("failed to wait on semaphore");
/// semaphore.blocking_wait().expect("failed to wait on semaphore");
/// println!("process 1 has triggered me");
/// }
/// ```
@@ -712,7 +712,7 @@ impl Drop for UnnamedSemaphoreHandle {
/// thread::scope(|s| {
/// s.spawn(|| {
/// loop {
/// semaphore.wait().expect("failed to wait on semaphore");
/// semaphore.blocking_wait().expect("failed to wait on semaphore");
/// println!("the thread was triggered");
/// }
/// });
23 changes: 15 additions & 8 deletions iceoryx2-cal/src/event/common.rs
@@ -14,7 +14,7 @@
pub mod details {
use std::{fmt::Debug, marker::PhantomData, time::Duration};

use iceoryx2_bb_log::fail;
use iceoryx2_bb_log::{debug, fail};
use iceoryx2_bb_memory::bump_allocator::BumpAllocator;
use iceoryx2_bb_system_types::{file_name::FileName, path::Path};

@@ -32,7 +32,7 @@
},
};

const TRIGGER_ID_DEFAULT_MAX: TriggerId = TriggerId::new(u16::MAX as u64);
const TRIGGER_ID_DEFAULT_MAX: TriggerId = TriggerId::new(u16::MAX as _);

#[derive(Debug)]
pub struct Management<Tracker: IdTracker, WaitMechanism: SignalMechanism> {
@@ -383,6 +383,7 @@
fn try_wait(
&self,
) -> Result<Option<crate::event::TriggerId>, crate::event::ListenerWaitError> {
// collect all notifications until no more are available
while unsafe { self.storage.get().signal_mechanism.try_wait()? } {}
Ok(unsafe { self.storage.get().id_tracker.acquire() })
}
@@ -395,11 +396,14 @@
return Ok(Some(id));
}

if unsafe { self.storage.get().signal_mechanism.timed_wait(timeout)? } {
return Ok(unsafe { self.storage.get().id_tracker.acquire() });
}

Ok(None)
Ok(unsafe {
self.storage
.get()
.signal_mechanism
.timed_wait(timeout)?
.then_some(self.storage.get().id_tracker.acquire())
.flatten()
})
}

fn blocking_wait(
@@ -459,10 +463,13 @@
mgmt: &mut Management<Tracker, WaitMechanism>,
allocator: &mut BumpAllocator,
) -> bool {
let origin = "init()";
if unsafe { mgmt.id_tracker.init(allocator).is_err() } {
debug!(from origin, "Unable to initialize IdTracker.");
return false;
}
if unsafe { mgmt.signal_mechanism.init().is_err() } {
debug!(from origin, "Unable to initialize SignalMechanism.");
return false;
}

@@ -489,7 +496,7 @@
ListenerCreateError,
> {
let msg = "Failed to create Listener";
let id_tracker_capacity = self.trigger_id_max.as_u64() as usize;
let id_tracker_capacity = self.trigger_id_max.as_u64() as usize + 1;
Member:

Why the +1 here and the -1 in trigger_id_max?

elfenpiff (Contributor, Author):

I had assumed that trigger_id_max is the greatest trigger id I can trigger, but it was actually the smallest trigger id I can no longer trigger. That was counter-intuitive, and I used it wrong right from the beginning.

To support all ids up to and including trigger_id_max, the bitset requires a capacity of trigger_id_max + 1, and trigger_id_max in turn is capacity - 1.

Member:

So it acts kind of like an INVALID_TRIGGER_ID?


match Storage::Builder::new(&self.name)
.config(&self.config.convert())
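The reworked `timed_wait` above folds the signal mechanism's "woken up" flag and the id tracker's optional result into a single expression. Below is a stand-alone illustration of that `then_some(..).flatten()` pattern with plain `std` types; it is an editor's sketch, and `woken` and `acquired` are stand-ins for the signal mechanism and id tracker results, not the crate's API.

```rust
// Illustration of the then_some(..).flatten() pattern used in timed_wait above.
fn combine(woken: bool, acquired: Option<u64>) -> Option<u64> {
    // equivalent to: if woken { acquired } else { None }
    woken.then_some(acquired).flatten()
}

fn main() {
    assert_eq!(combine(true, Some(3)), Some(3)); // woken up and an id was tracked
    assert_eq!(combine(true, None), None);       // woken up, but nothing to acquire
    assert_eq!(combine(false, Some(3)), None);   // timeout: no id is reported
    println!("then_some(..).flatten() folds both results into one Option");
}
```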
4 changes: 2 additions & 2 deletions iceoryx2-cal/src/event/id_tracker/bit_set.rs
@@ -18,11 +18,11 @@ use crate::event::{NotifierNotifyError, TriggerId};

impl IdTracker for RelocatableBitSet {
fn trigger_id_max(&self) -> TriggerId {
TriggerId::new(self.capacity() as u64)
TriggerId::new(self.capacity() as u64 - 1)
}

unsafe fn add(&self, id: TriggerId) -> Result<(), NotifierNotifyError> {
if self.trigger_id_max() <= id {
if self.trigger_id_max() < id {
fail!(from self, with NotifierNotifyError::TriggerIdOutOfBounds,
"Unable to set bit {:?} since it is out of bounds (max = {:?}).",
id, self.trigger_id_max());
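The review discussion above boils down to a simple invariant: with the corrected semantics, `trigger_id_max` is the greatest id that can still be triggered, so the id tracker needs `trigger_id_max + 1` slots and rejects only ids strictly greater than the maximum. Below is an editor's toy illustration of that bound check with stand-in names, not the crate's types.

```rust
// Toy version of the off-by-one fix: capacity = trigger_id_max + 1,
// and only ids strictly greater than trigger_id_max are rejected.
fn trigger_id_max(capacity: u64) -> u64 {
    capacity - 1
}

fn add(capacity: u64, id: u64) -> Result<(), String> {
    if trigger_id_max(capacity) < id {
        return Err(format!(
            "id {} is out of bounds (max = {})",
            id,
            trigger_id_max(capacity)
        ));
    }
    Ok(())
}

fn main() {
    let capacity = 8;
    assert_eq!(trigger_id_max(capacity), 7);
    assert!(add(capacity, 7).is_ok());  // the maximum id itself is allowed
    assert!(add(capacity, 8).is_err()); // one past the maximum is rejected
    println!("capacity = trigger_id_max + 1 holds");
}
```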