Skip to content

Commit db9ed5e

Browse files
authored
Merge pull request #424 from elfenpiff/iox2-390-waitset
[#390] waitset
2 parents 6f05b7b + 5904a48 commit db9ed5e

File tree

78 files changed

+2354
-575
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+2354
-575
lines changed

README.md

+5-5
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
123123

124124
let publisher = service.publisher_builder().create()?;
125125

126-
while let NodeEvent::Tick = node.wait(CYCLE_TIME) {
126+
while node.wait(CYCLE_TIME) != WaitEvent::TerminationRequest {
127127
let sample = publisher.loan_uninit()?;
128128
let sample = sample.write_payload(1234);
129129
sample.send()?;
@@ -150,7 +150,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
150150

151151
let subscriber = service.subscriber_builder().create()?;
152152

153-
while let NodeEvent::Tick = node.wait(CYCLE_TIME) {
153+
while node.wait(CYCLE_TIME) != WaitEvent::TerminationRequest {
154154
while let Some(sample) = subscriber.receive()? {
155155
println!("received: {:?}", *sample);
156156
}
@@ -200,7 +200,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
200200
let notifier = event.notifier_builder().create()?;
201201

202202
let id = EventId::new(12);
203-
while let NodeEvent::Tick = node.wait(CYCLE_TIME) {
203+
while node.wait(CYCLE_TIME) != WaitEvent::TerminationRequest {
204204
notifier.notify_with_custom_event_id(id)?;
205205

206206
println!("Trigger event with id {:?} ...", id);
@@ -227,7 +227,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
227227

228228
let listener = event.listener_builder().create()?;
229229

230-
while let NodeEvent::Tick = node.wait(Duration::ZERO) {
230+
while node.wait(Duration::ZERO) != WaitEvent::TerminationRequest {
231231
if let Ok(Some(event_id)) = listener.timed_wait_one(CYCLE_TIME) {
232232
println!("event was triggered with id: {:?}", event_id);
233233
}
@@ -254,7 +254,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
254254

255255
let listener = event.listener_builder().create()?;
256256

257-
while let NodeEvent::Tick = node.wait(Duration::ZERO) {
257+
while node.wait(Duration::ZERO) != WaitEvent::TerminationRequest {
258258
listener.timed_wait_all(
259259
|event_id| {
260260
println!("event was triggered with id: {:?}", event_id);

doc/release-notes/iceoryx2-unreleased.md

+9-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
conflicts when merging.
1212
-->
1313

14+
* Add Event-Multiplexer `WaitSet` [#390](https://github.com/eclipse-iceoryx/iceoryx2/issues/390)
1415
* Add `PeriodicTimer` into POSIX building blocks [#425](https://github.com/eclipse-iceoryx/iceoryx2/issues/425)
1516

1617
### Bugfixes
@@ -29,7 +30,7 @@
2930
conflicts when merging.
3031
-->
3132

32-
* Example text [#1](https://github.com/eclipse-iceoryx/iceoryx2/issues/1)
33+
* Rename `NodeEvent` into `WaitEvent` [#390](https://github.com/eclipse-iceoryx/iceoryx2/issues/390)
3334

3435
### Workflow
3536

@@ -51,12 +52,16 @@
5152

5253
### API Breaking Changes
5354

54-
1. Example
55+
1. Renamed `NodeEvent` into `WaitEvent`
5556

5657
```rust
5758
// old
58-
let fuu = hello().is_it_me_you_re_looking_for()
59+
while node.wait(CYCLE_TIME) != NodeEvent::TerminationRequest {
60+
// ...
61+
}
5962

6063
// new
61-
let fuu = hypnotoad().all_glory_to_the_hypnotoad()
64+
while node.wait(CYCLE_TIME) != WaitEvent::TerminationRequest {
65+
// ...
66+
}
6267
```

examples/Cargo.toml

+10
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ path = "rust/event/listener.rs"
4343
name = "event_notifier"
4444
path = "rust/event/notifier.rs"
4545

46+
# event multiplexing
47+
48+
[[example]]
49+
name = "event_multiplexing_notifier"
50+
path = "rust/event_multiplexing/notifier.rs"
51+
52+
[[example]]
53+
name = "event_multiplexing_wait"
54+
path = "rust/event_multiplexing/wait.rs"
55+
4656
# publish_subscribe
4757

4858
[[example]]

examples/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ and exchange data.
4949
| docker | [all](rust/docker) | Communicate between different docker containers and the host. |
5050
| domains | [C](c/domains) [C++](cxx/domains) [Rust](rust/domains) | Establish separate domains that operate independently from one another. |
5151
| event | [C](c/event) [C++](cxx/event) [Rust](rust/event) | Push notifications - send event signals to wakeup processes that are waiting for them. |
52+
| event multiplexing | [Rust](rust/event_multiplexing) | Wait on multiple listeners or sockets with a single call. The WaitSet demultiplexes incoming events and notifies the user. |
5253
| publish subscribe | [C](c/publish_subscribe) [C++](cxx/publish_subscribe) [Rust](rust/publish_subscribe) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern). |
5354
| publish subscribe dynamic data | [Rust](rust/publish_subscribe_dynamic_data) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern) and payload data that has a dynamic size. |
5455
| publish subscribe with user header | [C](c/publish_subscribe_with_user_header) [C++](cxx/publish_subscribe_with_user_header) [Rust](rust/publish_subscribe_with_user_header) | Add a user header to the payload (samples) to transfer additional information. |

examples/c/domains/src/publisher.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ int main(int argc, char** argv) {
9494
}
9595

9696
int32_t counter = 0;
97-
while (iox2_node_wait(&node_handle, 1, 0) == iox2_node_event_e_TICK) {
97+
while (iox2_node_wait(&node_handle, 1, 0) == IOX2_OK) {
9898
counter += 1;
9999

100100
// loan sample

examples/c/domains/src/subscriber.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ int main(int argc, char** argv) {
9696

9797
uint64_t counter = 0;
9898
printf("subscribed to: [domain: \"%s\", service: \"%s\"]\n", argv[1], argv[2]);
99-
while (iox2_node_wait(&node_handle, 1, 0) == iox2_node_event_e_TICK) {
99+
while (iox2_node_wait(&node_handle, 1, 0) == IOX2_OK) {
100100
counter += 1;
101101

102102
// receive sample

examples/c/event/src/listener.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ int main(void) {
5252
}
5353
iox2_event_id_t event_id;
5454

55-
while (iox2_node_wait(&node_handle, 0, 0) == iox2_node_event_e_TICK) {
55+
while (iox2_node_wait(&node_handle, 0, 0) == IOX2_OK) {
5656
bool has_received_one = false;
5757
if (iox2_listener_timed_wait_one(&listener, &event_id, &has_received_one, 1, 0) != IOX2_OK) {
5858
printf("Unable to wait for notification!\n");

examples/c/event/src/notifier.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ int main(void) {
5959
}
6060

6161
uint64_t counter = 0;
62-
while (iox2_node_wait(&node_handle, 0, 0) == iox2_node_event_e_TICK) {
62+
while (iox2_node_wait(&node_handle, 0, 0) == IOX2_OK) {
6363
counter += 1;
6464
iox2_event_id_t event_id = { .value = counter % 12 }; // NOLINT
6565
if (iox2_notifier_notify_with_custom_event_id(&notifier, &event_id, NULL) != IOX2_OK) {

examples/c/publish_subscribe/src/publisher.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ int main(void) {
7474
}
7575

7676
int32_t counter = 0;
77-
while (iox2_node_wait(&node_handle, 1, 0) == iox2_node_event_e_TICK) {
77+
while (iox2_node_wait(&node_handle, 1, 0) == IOX2_OK) {
7878
counter += 1;
7979

8080
// loan sample

examples/c/publish_subscribe/src/subscriber.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ int main(void) {
7474
}
7575

7676
uint64_t counter = 0;
77-
while (iox2_node_wait(&node_handle, 1, 0) == iox2_node_event_e_TICK) {
77+
while (iox2_node_wait(&node_handle, 1, 0) == IOX2_OK) {
7878
counter += 1;
7979

8080
// receive sample

examples/c/publish_subscribe_with_user_header/src/publisher.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ int main(void) {
8787
}
8888

8989
int32_t counter = 0;
90-
while (iox2_node_wait(&node_handle, 1, 0) == iox2_node_event_e_TICK) {
90+
while (iox2_node_wait(&node_handle, 1, 0) == IOX2_OK) {
9191
counter += 1;
9292

9393
// loan sample

examples/c/publish_subscribe_with_user_header/src/subscriber.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ int main(void) {
8787
}
8888

8989
uint64_t counter = 0;
90-
while (iox2_node_wait(&node_handle, 1, 0) == iox2_node_event_e_TICK) {
90+
while (iox2_node_wait(&node_handle, 1, 0) == IOX2_OK) {
9191
counter += 1;
9292

9393
// receive sample

examples/cxx/complex_data_types/src/complex_data_types.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ auto main() -> int {
5151
auto subscriber = service.subscriber_builder().create().expect("successful subscriber creation");
5252

5353
auto counter = 0;
54-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
54+
while (node.wait(CYCLE_TIME).has_value()) {
5555
counter += 1;
5656
auto sample = publisher.loan_uninit().expect("acquire sample");
5757
new (&sample.payload_mut()) ComplexDataType {};

examples/cxx/domains/src/publisher.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ auto main(int argc, char** argv) -> int {
6161
auto publisher = service.publisher_builder().create().expect("successful publisher creation");
6262

6363
auto counter = 0;
64-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
64+
while (node.wait(CYCLE_TIME).has_value()) {
6565
counter += 1;
6666

6767
auto sample = publisher.loan_uninit().expect("acquire sample");

examples/cxx/domains/src/subscriber.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ auto main(int argc, char** argv) -> int {
5858

5959
std::cout << "subscribed to: [domain: \"" << args.domain() << "\", service: \"" << args.service() << "\"]"
6060
<< std::endl;
61-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
61+
while (node.wait(CYCLE_TIME).has_value()) {
6262
auto sample = subscriber.receive().expect("receive succeeds");
6363
while (sample.has_value()) {
6464
std::cout << "received: " << sample->payload() << std::endl;

examples/cxx/event/src/listener.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ auto main() -> int {
3030

3131
auto listener = service.listener_builder().create().expect("successful listener creation");
3232

33-
while (node.wait(iox::units::Duration::zero()) == NodeEvent::Tick) {
33+
while (node.wait(iox::units::Duration::zero()).has_value()) {
3434
listener.timed_wait_one(CYCLE_TIME).and_then([](auto maybe_event_id) {
3535
maybe_event_id.and_then(
3636
[](auto event_id) { std::cout << "event was triggered with id: " << event_id << std::endl; });

examples/cxx/event/src/notifier.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ auto main() -> int {
3333
auto notifier = service.notifier_builder().create().expect("successful notifier creation");
3434

3535
auto counter = 0;
36-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
36+
while (node.wait(CYCLE_TIME).has_value()) {
3737
counter += 1;
3838
const auto event_id = EventId(counter % max_event_id);
3939
notifier.notify_with_custom_event_id(event_id).expect("notification");

examples/cxx/publish_subscribe/src/publisher.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ auto main() -> int {
3434
auto publisher = service.publisher_builder().create().expect("successful publisher creation");
3535

3636
auto counter = 0;
37-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
37+
while (node.wait(CYCLE_TIME).has_value()) {
3838
counter += 1;
3939

4040
auto sample = publisher.loan_uninit().expect("acquire sample");

examples/cxx/publish_subscribe/src/subscriber.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ auto main() -> int {
3131

3232
auto subscriber = service.subscriber_builder().create().expect("successful subscriber creation");
3333

34-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
34+
while (node.wait(CYCLE_TIME).has_value()) {
3535
auto sample = subscriber.receive().expect("receive succeeds");
3636
while (sample.has_value()) {
3737
std::cout << "received: " << sample->payload() << std::endl;

examples/cxx/publish_subscribe_dynamic_data/src/publisher.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ auto main() -> int {
4040

4141
auto counter = 1;
4242

43-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
43+
while (node.wait(CYCLE_TIME).has_value()) {
4444
counter += 1;
4545

4646
auto required_memory_size = (8 + counter) % 16; // NOLINT

examples/cxx/publish_subscribe_dynamic_data/src/subscriber.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ auto main() -> int {
3232

3333
auto subscriber = service.subscriber_builder().create().expect("successful subscriber creation");
3434

35-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
35+
while (node.wait(CYCLE_TIME).has_value()) {
3636
auto sample = subscriber.receive().expect("receive succeeds");
3737
while (sample.has_value()) {
3838
std::cout << "received " << sample->payload().size() << " bytes: ";

examples/cxx/publish_subscribe_with_user_header/src/publisher.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ auto main() -> int {
3838
auto publisher = service.publisher_builder().create().expect("successful publisher creation");
3939

4040
auto counter = 0;
41-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
41+
while (node.wait(CYCLE_TIME).has_value()) {
4242
counter += 1;
4343
auto sample = publisher.loan_uninit().expect("acquire sample");
4444

examples/cxx/publish_subscribe_with_user_header/src/subscriber.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ auto main() -> int {
3333

3434
auto subscriber = service.subscriber_builder().create().expect("successful subscriber creation");
3535

36-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
36+
while (node.wait(CYCLE_TIME).has_value()) {
3737
auto sample = subscriber.receive().expect("receive succeeds");
3838
while (sample.has_value()) {
3939
std::cout << "received: " << sample->payload() << ", user_header: " << sample->user_header() << std::endl;

examples/cxx/service_attributes/src/creator.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ auto main() -> int {
4343

4444
std::cout << "defined service attributes: " << service.attributes() << std::endl;
4545

46-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
46+
while (node.wait(CYCLE_TIME).has_value()) {
4747
auto sample = publisher.loan().expect("acquire sample");
4848
sample.payload_mut() = 0;
4949
send(std::move(sample)).expect("send successful");

examples/cxx/service_attributes/src/opener.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ auto main() -> int {
3636

3737
std::cout << "defined service attributes: " << service.attributes() << std::endl;
3838

39-
while (node.wait(CYCLE_TIME) == NodeEvent::Tick) {
39+
while (node.wait(CYCLE_TIME).has_value()) {
4040
auto sample = subscriber.receive().expect("receive succeeds");
4141
while (sample.has_value()) {
4242
std::cout << "received: " << sample->payload() << std::endl;

examples/rust/complex_data_types/complex_data_types.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
5454
let subscriber = service.subscriber_builder().create()?;
5555
let mut counter = 0;
5656

57-
while let NodeEvent::Tick = node.wait(CYCLE_TIME) {
57+
while node.wait(CYCLE_TIME).is_ok() {
5858
// ComplexDataType as a size of over 30MB, we need to perform a placement new
5959
// otherwise we will encounter a stack overflow in debug builds.
6060
// Therefore, we acquire an uninitialized sample, use the PlacementDefault

examples/rust/domains/publisher.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
4747

4848
let mut counter: u64 = 0;
4949

50-
while let NodeEvent::Tick = node.wait(CYCLE_TIME) {
50+
while node.wait(CYCLE_TIME).is_ok() {
5151
counter += 1;
5252
let sample = publisher.loan_uninit()?;
5353

examples/rust/domains/subscriber.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
4646
"subscribed to: [domain: \"{}\", service: \"{}\"]",
4747
args.domain, args.service
4848
);
49-
while let NodeEvent::Tick = node.wait(CYCLE_TIME) {
49+
while node.wait(CYCLE_TIME).is_ok() {
5050
while let Some(sample) = subscriber.receive()? {
5151
println!("received: {:?}", *sample);
5252
}

examples/rust/event/listener.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
2525

2626
let listener = event.listener_builder().create()?;
2727

28-
while let NodeEvent::Tick = node.wait(Duration::ZERO) {
28+
while node.wait(Duration::ZERO).is_ok() {
2929
if let Ok(Some(event_id)) = listener.timed_wait_one(CYCLE_TIME) {
3030
println!("event was triggered with id: {:?}", event_id);
3131
}

examples/rust/event/notifier.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
2727
let notifier = event.notifier_builder().create()?;
2828

2929
let mut counter: usize = 0;
30-
while let NodeEvent::Tick = node.wait(CYCLE_TIME) {
30+
while node.wait(CYCLE_TIME).is_ok() {
3131
counter += 1;
3232
notifier.notify_with_custom_event_id(EventId::new(counter % max_event_id))?;
3333

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Event Multiplexing
2+
3+
## Running The Example
4+
5+
This example demonstrates iceoryx2's event multiplexing mechanism,
6+
called the `WaitSet`. It allows waiting, with a single call, on
7+
multiple `Listener` ports as well as external file descriptor-based
8+
events such as `sockets`.
9+
10+
In this setup, the `wait` process monitors an arbitrary number of
11+
services, which the user can specify via the command line option `-s`.
12+
The `notifier` can define the service to which it will send event
13+
notifications using the `-s` option and specify the event ID with
14+
the `-e` option.
15+
16+
In the example below, we are waiting for events on the services `fuu` and
17+
`bar`. Service `fuu` is notified with event ID `123`, and service `bar` is
18+
notified with event ID `456`.
19+
20+
### Terminal 1
21+
22+
```sh
23+
cargo run --example event_multiplexing_wait -- -s "fuu" -s "bar"
24+
```
25+
26+
### Terminal 2
27+
28+
```sh
29+
cargo run --example event_multiplexing_notifier -- -s "fuu" -e 123
30+
```
31+
32+
### Terminal 3
33+
34+
```sh
35+
cargo run --example event_multiplexing_notifier -- -s "bar" -e 456
36+
```
37+
38+
Feel free to instantiate multiple notifiers for the same service with the same
39+
or different event id's. Or to for different services.
40+
41+
## Technical Details
42+
43+
The `WaitSet` utilizes `epoll`, `select`, or other event-multiplexing
44+
mechanisms. Before the `WaitSet` can monitor a specific event, it must first be
45+
attached using `WaitSet::attach()`, which returns a RAII `Guard`. This `Guard`
46+
automatically detaches the attachment when it goes out of scope.
47+
48+
The `WaitSet::**_wait()` calls require a closure that is invoked for each
49+
triggered attachment and provides the `AttachmentId`. The user can either use
50+
`AttachmentId::originates_from($ATTACHED_OBJECT$)` to identify the object
51+
associated with the `AttachmentId`, or set up a
52+
`HashMap::<AttachmentId, Listener<ipc::Service>>` to quickly access the
53+
corresponding object.

0 commit comments

Comments
 (0)