Skip to content

Commit 0c7ca92

Browse files
committed
[#390] Use temporarily uds in process local; add waitset tests
1 parent c86cc87 commit 0c7ca92

File tree

5 files changed

+233
-112
lines changed

5 files changed

+233
-112
lines changed

iceoryx2/src/node/mod.rs

+28-6
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ pub mod node_name;
121121
pub mod testing;
122122

123123
use crate::node::node_name::NodeName;
124-
use crate::prelude::WaitEvent;
125124
use crate::service::builder::{Builder, OpenDynamicStorageFailure};
126125
use crate::service::config_scheme::{
127126
node_details_path, node_monitoring_config, service_tag_config,
@@ -196,6 +195,23 @@ impl std::fmt::Display for NodeCreationFailure {
196195

197196
impl std::error::Error for NodeCreationFailure {}
198197

198+
/// The failures that can occur when a list of [`NodeState`]s is created with [`Node::list()`].
199+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
200+
pub enum NodeWaitFailure {
201+
/// The process received an interrupt signal while acquiring the list of all [`Node`]s.
202+
Interrupt,
203+
/// A termination signal `SIGTERM` was received.
204+
TerminationRequest,
205+
}
206+
207+
impl std::fmt::Display for NodeWaitFailure {
208+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209+
std::write!(f, "NodeWaitFailure::{:?}", self)
210+
}
211+
}
212+
213+
impl std::error::Error for NodeWaitFailure {}
214+
199215
/// The failures that can occur when a list of [`NodeState`]s is created with [`Node::list()`].
200216
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
201217
pub enum NodeListFailure {
@@ -791,20 +807,26 @@ impl<Service: service::Service> Node<Service> {
791807

792808
/// Waits until an event was received. It returns
793809
/// [`WaitEvent::Tick`] when the `cycle_time` has passed, otherwise event that occurred.
794-
pub fn wait(&self, cycle_time: Duration) -> WaitEvent {
810+
pub fn wait(&self, cycle_time: Duration) -> Result<(), NodeWaitFailure> {
811+
let msg = "Unable to wait on node";
795812
if SignalHandler::termination_requested() {
796-
return WaitEvent::TerminationRequest;
813+
fail!(from self, with NodeWaitFailure::TerminationRequest,
814+
"{msg} since a termination request was received.");
797815
}
798816

799817
match nanosleep(cycle_time) {
800818
Ok(()) => {
801819
if SignalHandler::termination_requested() {
802-
WaitEvent::TerminationRequest
820+
fail!(from self, with NodeWaitFailure::TerminationRequest,
821+
"{msg} since a termination request was received.");
803822
} else {
804-
WaitEvent::Tick
823+
Ok(())
805824
}
806825
}
807-
Err(NanosleepError::InterruptedBySignal(_)) => WaitEvent::Interrupt,
826+
Err(NanosleepError::InterruptedBySignal(_)) => {
827+
fail!(from self, with NodeWaitFailure::Interrupt,
828+
"{msg} since a interrupt signal was received.");
829+
}
808830
Err(v) => {
809831
fatal_panic!(from self,
810832
"Failed to wait with cycle time {:?} in main event look, caused by ({:?}).",

iceoryx2/src/port/waitset.rs

+63-75
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
//! }
6565
//! };
6666
//!
67-
//! while waitset.run(event_handler) != Ok(WaitEvent::TerminationRequest) {}
67+
//! while waitset.run(event_handler).is_ok() {}
6868
//!
6969
//! # Ok(())
7070
//! # }
@@ -97,7 +97,7 @@
9797
//! }
9898
//! };
9999
//!
100-
//! while waitset.run(event_handler) != Ok(WaitEvent::TerminationRequest) {}
100+
//! while waitset.run(event_handler).is_ok() {}
101101
//!
102102
//! # Ok(())
103103
//! # }
@@ -132,7 +132,7 @@
132132
//! }
133133
//! };
134134
//!
135-
//! while waitset.run(event_handler) != Ok(WaitEvent::TerminationRequest) {}
135+
//! while waitset.run(event_handler).is_ok() {}
136136
//!
137137
//! # Ok(())
138138
//! # }
@@ -172,7 +172,7 @@
172172
//! println!("received notification {:?}", event_id);
173173
//! }
174174
//! }
175-
//! }) != Ok(WaitEvent::TerminationRequest) {}
175+
//! }).is_ok() {}
176176
//!
177177
//! # Ok(())
178178
//! # }
@@ -193,20 +193,6 @@ use iceoryx2_bb_posix::{
193193
use iceoryx2_cal::reactor::*;
194194
use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicUsize;
195195

196-
/// Defines the type of that triggered [`WaitSet::try_wait()`], [`WaitSet::timed_wait()`] or
197-
/// [`WaitSet::blocking_wait()`].
198-
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
199-
pub enum WaitEvent {
200-
/// A termination signal `SIGTERM` was received.
201-
TerminationRequest,
202-
/// An interrupt signal `SIGINT` was received.
203-
Interrupt,
204-
/// No event was triggered.
205-
Tick,
206-
/// One or more event notifications were received.
207-
Notification,
208-
}
209-
210196
/// Defines the failures that can occur when attaching something with [`WaitSet::attach()`].
211197
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
212198
pub enum WaitSetAttachmentError {
@@ -226,27 +212,28 @@ impl std::fmt::Display for WaitSetAttachmentError {
226212

227213
impl std::error::Error for WaitSetAttachmentError {}
228214

229-
/// Defines the failures that can occur when calling
230-
/// * [`WaitSet::try_wait()`]
231-
/// * [`WaitSet::timed_wait()`]
232-
/// * [`WaitSet::blocking_wait()`]
215+
/// Defines the failures that can occur when calling [`WaitSet::run()`].
233216
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
234-
pub enum WaitSetWaitError {
217+
pub enum WaitSetRunError {
235218
/// The process has not sufficient permissions to wait on the attachments.
236219
InsufficientPermissions,
237220
/// An internal error has occurred.
238221
InternalError,
239222
/// Waiting on an empty [`WaitSet`] would lead to a deadlock therefore it causes an error.
240223
NoAttachments,
224+
/// A termination signal `SIGTERM` was received.
225+
TerminationRequest,
226+
/// An interrupt signal `SIGINT` was received.
227+
Interrupt,
241228
}
242229

243-
impl std::fmt::Display for WaitSetWaitError {
230+
impl std::fmt::Display for WaitSetRunError {
244231
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245-
std::write!(f, "WaitSetWaitError::{:?}", self)
232+
std::write!(f, "WaitSetRunError::{:?}", self)
246233
}
247234
}
248235

249-
impl std::error::Error for WaitSetWaitError {}
236+
impl std::error::Error for WaitSetRunError {}
250237

251238
/// Defines the failures that can occur when calling [`WaitSetBuilder::create()`].
252239
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
@@ -327,12 +314,20 @@ impl<Service: crate::service::Service> AttachmentId<Service> {
327314
}
328315
}
329316

330-
/// Returns true if an event was emitted from the attachment corresponding to [`Guard`].
317+
/// Returns true if an event was emitted from a notification or deadline attachment
318+
/// corresponding to [`Guard`].
331319
pub fn event_from(&self, other: &Guard<Service>) -> bool {
332-
if let AttachmentIdType::Deadline(..) = self.attachment_type {
333-
false
320+
let other_attachment = other.to_attachment_id();
321+
if let AttachmentIdType::Deadline(other_waitset, other_reactor_idx, _) =
322+
other_attachment.attachment_type
323+
{
324+
if let AttachmentIdType::Notification(waitset, reactor_idx) = self.attachment_type {
325+
waitset == other_waitset && reactor_idx == other_reactor_idx
326+
} else {
327+
false
328+
}
334329
} else {
335-
self.attachment_type == other.to_attachment_id().attachment_type
330+
self.attachment_type == other_attachment.attachment_type
336331
}
337332
}
338333

@@ -481,19 +476,15 @@ impl<Service: crate::service::Service> WaitSet<Service> {
481476
.remove(&deadline_queue_idx);
482477
}
483478

484-
fn contains_deadlines(&self) -> bool {
485-
!self.attachment_to_deadline.borrow().is_empty()
486-
}
487-
488479
fn reset_deadline(
489480
&self,
490481
reactor_idx: i32,
491-
) -> Result<Option<DeadlineQueueIndex>, WaitSetWaitError> {
482+
) -> Result<Option<DeadlineQueueIndex>, WaitSetRunError> {
492483
let msg = "Unable to reset deadline";
493484
if let Some(deadline_queue_idx) = self.attachment_to_deadline.borrow().get(&reactor_idx) {
494485
fail!(from self,
495486
when self.deadline_queue.reset(*deadline_queue_idx),
496-
with WaitSetWaitError::InternalError,
487+
with WaitSetRunError::InternalError,
497488
"{msg} since the deadline_queue guard could not be reset for the attachment {reactor_idx}. Continuing operations will lead to invalid deadline failures.");
498489
Ok(Some(*deadline_queue_idx))
499490
} else {
@@ -505,7 +496,7 @@ impl<Service: crate::service::Service> WaitSet<Service> {
505496
&self,
506497
fn_call: &mut F,
507498
error_msg: &str,
508-
) -> Result<(), WaitSetWaitError> {
499+
) -> Result<(), WaitSetRunError> {
509500
let deadline_to_attachment = self.deadline_to_attachment.borrow();
510501
let call = |idx: DeadlineQueueIndex| {
511502
if let Some(reactor_idx) = deadline_to_attachment.get(&idx) {
@@ -515,12 +506,10 @@ impl<Service: crate::service::Service> WaitSet<Service> {
515506
}
516507
};
517508

518-
if self.contains_deadlines() {
519-
fail!(from self,
509+
fail!(from self,
520510
when self.deadline_queue.missed_deadlines(call),
521-
with WaitSetWaitError::InternalError,
511+
with WaitSetRunError::InternalError,
522512
"{error_msg} since the missed deadlines could not be acquired.");
523-
}
524513

525514
Ok(())
526515
}
@@ -530,32 +519,22 @@ impl<Service: crate::service::Service> WaitSet<Service> {
530519
triggered_file_descriptors: &Vec<i32>,
531520
fn_call: &mut F,
532521
error_msg: &str,
533-
) -> Result<(), WaitSetWaitError> {
522+
) -> Result<(), WaitSetRunError> {
534523
// we need to reset the deadlines first, otherwise a long fn_call may extend the
535524
// deadline unintentionally
536-
if self.contains_deadlines() {
537-
let mut fd_and_deadline_queue_idx = Vec::new();
538-
fd_and_deadline_queue_idx.reserve(triggered_file_descriptors.len());
525+
let mut fd_and_deadline_queue_idx = Vec::new();
526+
fd_and_deadline_queue_idx.reserve(triggered_file_descriptors.len());
539527

540-
for fd in triggered_file_descriptors {
541-
fd_and_deadline_queue_idx.push((fd, self.reset_deadline(*fd)?));
542-
}
528+
for fd in triggered_file_descriptors {
529+
fd_and_deadline_queue_idx.push((fd, self.reset_deadline(*fd)?));
530+
}
543531

544-
// must be called after the deadlines have been reset, in the case that the
545-
// event has been received shortly before the deadline ended.
546-
self.handle_deadlines(fn_call, error_msg)?;
532+
// must be called after the deadlines have been reset, in the case that the
533+
// event has been received shortly before the deadline ended.
534+
self.handle_deadlines(fn_call, error_msg)?;
547535

548-
for (fd, deadline_queue_idx) in fd_and_deadline_queue_idx {
549-
if let Some(deadline_queue_idx) = deadline_queue_idx {
550-
fn_call(AttachmentId::deadline(self, *fd, deadline_queue_idx));
551-
} else {
552-
fn_call(AttachmentId::notification(self, *fd));
553-
}
554-
}
555-
} else {
556-
for fd in triggered_file_descriptors {
557-
fn_call(AttachmentId::notification(self, *fd));
558-
}
536+
for fd in triggered_file_descriptors {
537+
fn_call(AttachmentId::notification(self, *fd));
559538
}
560539

561540
Ok(())
@@ -591,10 +570,15 @@ impl<Service: crate::service::Service> WaitSet<Service> {
591570
let reactor_guard = self.attach_to_reactor(attachment)?;
592571
let deadline_queue_guard = self.attach_to_deadline_queue(deadline)?;
593572

594-
self.attachment_to_deadline.borrow_mut().insert(
595-
unsafe { reactor_guard.file_descriptor().native_handle() },
596-
deadline_queue_guard.index(),
597-
);
573+
let reactor_idx = unsafe { reactor_guard.file_descriptor().native_handle() };
574+
let deadline_idx = deadline_queue_guard.index();
575+
576+
self.attachment_to_deadline
577+
.borrow_mut()
578+
.insert(reactor_idx, deadline_idx);
579+
self.deadline_to_attachment
580+
.borrow_mut()
581+
.insert(deadline_idx, reactor_idx);
598582
self.attach()?;
599583

600584
Ok(Guard {
@@ -625,21 +609,22 @@ impl<Service: crate::service::Service> WaitSet<Service> {
625609
pub fn run<F: FnMut(AttachmentId<Service>)>(
626610
&self,
627611
mut fn_call: F,
628-
) -> Result<WaitEvent, WaitSetWaitError> {
612+
) -> Result<(), WaitSetRunError> {
629613
let msg = "Unable to call WaitSet::run()";
630614

631615
if SignalHandler::termination_requested() {
632-
return Ok(WaitEvent::TerminationRequest);
616+
fail!(from self, with WaitSetRunError::TerminationRequest,
617+
"{msg} since a termination request was received.");
633618
}
634619

635620
if self.is_empty() {
636-
fail!(from self, with WaitSetWaitError::NoAttachments,
621+
fail!(from self, with WaitSetRunError::NoAttachments,
637622
"{msg} since the WaitSet has no attachments, therefore the call would end up in a deadlock.");
638623
}
639624

640625
let next_timeout = fail!(from self,
641626
when self.deadline_queue.duration_until_next_deadline(),
642-
with WaitSetWaitError::InternalError,
627+
with WaitSetRunError::InternalError,
643628
"{msg} since the next timeout could not be acquired.");
644629

645630
let mut triggered_file_descriptors = vec![];
@@ -655,19 +640,22 @@ impl<Service: crate::service::Service> WaitSet<Service> {
655640
) {
656641
Ok(0) => {
657642
self.handle_deadlines(&mut fn_call, msg)?;
658-
Ok(WaitEvent::Tick)
643+
Ok(())
659644
}
660645
Ok(_) => {
661646
self.handle_all_attachments(&triggered_file_descriptors, &mut fn_call, msg)?;
662-
Ok(WaitEvent::Notification)
647+
Ok(())
648+
}
649+
Err(ReactorWaitError::Interrupt) => {
650+
fail!(from self, with WaitSetRunError::Interrupt,
651+
"{msg} since an interrupt signal was received.");
663652
}
664-
Err(ReactorWaitError::Interrupt) => Ok(WaitEvent::Interrupt),
665653
Err(ReactorWaitError::InsufficientPermissions) => {
666-
fail!(from self, with WaitSetWaitError::InsufficientPermissions,
654+
fail!(from self, with WaitSetRunError::InsufficientPermissions,
667655
"{msg} due to insufficient permissions.");
668656
}
669657
Err(ReactorWaitError::UnknownError) => {
670-
fail!(from self, with WaitSetWaitError::InternalError,
658+
fail!(from self, with WaitSetRunError::InternalError,
671659
"{msg} due to an internal error.");
672660
}
673661
}

iceoryx2/src/prelude.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
pub use crate::config::Config;
1414
pub use crate::node::{node_name::NodeName, Node, NodeBuilder, NodeState};
1515
pub use crate::port::event_id::EventId;
16-
pub use crate::port::waitset::{AttachmentId, WaitEvent, WaitSet, WaitSetBuilder};
16+
pub use crate::port::waitset::{AttachmentId, WaitSet, WaitSetBuilder};
1717
pub use crate::service::messaging_pattern::MessagingPattern;
1818
pub use crate::service::{
1919
attribute::AttributeSet, attribute::AttributeSpecifier, attribute::AttributeVerifier, ipc,

iceoryx2/src/service/local.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ impl crate::service::Service for Service {
5353
type ServiceNameHasher = hash::sha1::Sha1;
5454
type SharedMemory = shared_memory::process_local::Memory<PoolAllocator>;
5555
type Connection = zero_copy_connection::process_local::Connection;
56-
type Event = event::process_local::EventImpl;
56+
//type Event = event::process_local::EventImpl;
57+
type Event = event::unix_datagram_socket::EventImpl;
5758
type Monitoring = monitoring::process_local::ProcessLocalMonitoring;
5859
type Reactor = reactor::posix_select::Reactor;
5960
}

0 commit comments

Comments
 (0)