diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index c978010fb..c518b1c48 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -252,4 +252,5 @@ jobs: uses: codecov/codecov-action@v4 with: file: target/debug/coverage/lcov.info - fail_ci_if_error: true + fail_ci_if_error: false + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/doc/release-notes/iceoryx2-unreleased.md b/doc/release-notes/iceoryx2-unreleased.md index d9928fa70..c4bfd9492 100644 --- a/doc/release-notes/iceoryx2-unreleased.md +++ b/doc/release-notes/iceoryx2-unreleased.md @@ -44,6 +44,7 @@ * Fix failing reacquire of delivered samples in the zero copy receive channel [#130](https://github.com/eclipse-iceoryx/iceoryx2/issues/130) * Fix receiving of invalid samples when subscriber is connected [#131](https://github.com/eclipse-iceoryx/iceoryx2/issues/131) * Fix problem where sample is released to the wrong publisher [#133](https://github.com/eclipse-iceoryx/iceoryx2/issues/133) +* Fix event notifier deadlock with reconnecting listeners [#139](https://github.com/eclipse-iceoryx/iceoryx2/issues/139) * Fixes for FreeBSD 14.0 [#140](https://github.com/eclipse-iceoryx/iceoryx2/issues/140) * Fix segfault in `iceoryx2-pal-posix;:shm_list()` caused by `sysctl` * Adjust test to handle unordered event notifications diff --git a/iceoryx2-bb/lock-free/tests/bitset_tests.rs b/iceoryx2-bb/lock-free/tests/bitset_tests.rs index 90a582039..e42112483 100644 --- a/iceoryx2-bb/lock-free/tests/bitset_tests.rs +++ b/iceoryx2-bb/lock-free/tests/bitset_tests.rs @@ -16,6 +16,7 @@ use std::{ atomic::{AtomicBool, Ordering}, Barrier, }, + time::Duration, }; use iceoryx2_bb_lock_free::mpmc::bit_set::*; @@ -147,7 +148,7 @@ fn bit_set_reset_next_is_fair() { #[test] fn bit_set_concurrent_set_and_reset_works() { - let _watchdog = Watchdog::new(); + let _watchdog = Watchdog::new_with_timeout(Duration::from_secs(60)); let number_of_set_threads = (SystemInfo::NumberOfCpuCores.value() / 2).clamp(2, usize::MAX); let number_of_reset_threads = (SystemInfo::NumberOfCpuCores.value() / 2).clamp(2, usize::MAX); diff --git a/iceoryx2-bb/posix/src/unix_datagram_socket.rs b/iceoryx2-bb/posix/src/unix_datagram_socket.rs index 59d7cb3c3..8b6823039 100644 --- a/iceoryx2-bb/posix/src/unix_datagram_socket.rs +++ b/iceoryx2-bb/posix/src/unix_datagram_socket.rs @@ -189,6 +189,7 @@ enum_gen! { entry: MessageTooLarge, ConnectionReset, + ConnectionRefused, Interrupt, IOerror, InsufficientPermissions, @@ -694,6 +695,7 @@ impl UnixDatagramSender { handle_errno!(UnixDatagramSendError, from self, success Errno::EAGAIN => false, Errno::ECONNRESET => (ConnectionReset, "{} since the connection was reset by peer.", msg), + Errno::ECONNREFUSED => (ConnectionRefused, "{} since the connection was refused by peer.", msg), Errno::EINTR => (Interrupt, "{} since an interrupt signal was received.", msg), Errno::EMSGSIZE => (MessageTooLarge, "{} since the message size of {} bytes is too large to be send in one package.", msg, data.len()), Errno::EIO => (IOerror, "{} since an I/O error occurred while writing to the file system.", msg), diff --git a/iceoryx2-cal/src/event/common.rs b/iceoryx2-cal/src/event/common.rs index 68aad8df9..dc56b1fa0 100644 --- a/iceoryx2-cal/src/event/common.rs +++ b/iceoryx2-cal/src/event/common.rs @@ -12,7 +12,12 @@ #[doc(hidden)] pub mod details { - use std::{fmt::Debug, marker::PhantomData, time::Duration}; + use std::{ + fmt::Debug, + marker::PhantomData, + sync::atomic::{AtomicBool, AtomicUsize, Ordering}, + time::Duration, + }; use iceoryx2_bb_log::{debug, fail}; use iceoryx2_bb_memory::bump_allocator::BumpAllocator; @@ -35,9 +40,12 @@ pub mod details { const TRIGGER_ID_DEFAULT_MAX: TriggerId = TriggerId::new(u16::MAX as _); #[derive(Debug)] + #[repr(C)] pub struct Management { id_tracker: Tracker, signal_mechanism: WaitMechanism, + reference_counter: AtomicUsize, + has_listener: AtomicBool, } #[derive(Copy, PartialEq, Eq)] @@ -232,6 +240,25 @@ pub mod details { _wait_mechanism: PhantomData, } + impl< + Tracker: IdTracker, + WaitMechanism: SignalMechanism, + Storage: DynamicStorage>, + > Drop for Notifier + { + fn drop(&mut self) { + if self + .storage + .get() + .reference_counter + .fetch_sub(1, Ordering::Relaxed) + == 1 + { + self.storage.acquire_ownership(); + } + } + } + impl< Tracker: IdTracker, WaitMechanism: SignalMechanism, @@ -254,10 +281,16 @@ pub mod details { } fn notify(&self, id: crate::event::TriggerId) -> Result<(), NotifierNotifyError> { + let msg = "Failed to notify listener"; + if !self.storage.get().has_listener.load(Ordering::Relaxed) { + fail!(from self, with NotifierNotifyError::Disconnected, + "{} since the listener is no longer connected.", msg); + } + if self.storage.get().id_tracker.trigger_id_max() < id { fail!(from self, with NotifierNotifyError::TriggerIdOutOfBounds, - "Failed to notify since the TriggerId {:?} is greater than the max supported TriggerId {:?}.", - id, self.storage.get().id_tracker.trigger_id_max()); + "{} since the TriggerId {:?} is greater than the max supported TriggerId {:?}.", + msg, id, self.storage.get().id_tracker.trigger_id_max()); } unsafe { self.storage.get().id_tracker.add(id)? }; @@ -326,11 +359,32 @@ pub mod details { .timeout(self.creation_timeout) .open() { - Ok(storage) => Ok(Notifier { - storage, - _tracker: PhantomData, - _wait_mechanism: PhantomData, - }), + Ok(storage) => { + let mut ref_count = storage.get().reference_counter.load(Ordering::Relaxed); + + loop { + if !storage.get().has_listener.load(Ordering::Relaxed) || ref_count == 0 { + fail!(from self, with NotifierCreateError::DoesNotExist, + "{} since it has no listener and will no longer exist.", msg); + } + + match storage.get().reference_counter.compare_exchange( + ref_count, + ref_count + 1, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(v) => ref_count = v, + }; + } + + Ok(Notifier { + storage, + _tracker: PhantomData, + _wait_mechanism: PhantomData, + }) + } Err(DynamicStorageOpenError::DoesNotExist) => { fail!(from self, with NotifierCreateError::DoesNotExist, "{} since it does not exist.", msg); @@ -363,6 +417,30 @@ pub mod details { _wait_mechanism: PhantomData, } + impl< + Tracker: IdTracker, + WaitMechanism: SignalMechanism, + Storage: DynamicStorage>, + > Drop for Listener + { + fn drop(&mut self) { + self.storage + .get() + .has_listener + .store(false, Ordering::Relaxed); + + if self + .storage + .get() + .reference_counter + .fetch_sub(1, Ordering::Relaxed) + == 1 + { + self.storage.acquire_ownership(); + } + } + } + impl< Tracker: IdTracker, WaitMechanism: SignalMechanism, @@ -540,10 +618,12 @@ pub mod details { .config(&self.config.convert()) .supplementary_size(Tracker::memory_size(id_tracker_capacity)) .initializer(Self::init) - .has_ownership(true) + .has_ownership(false) .create(Management { id_tracker: unsafe { Tracker::new_uninit(id_tracker_capacity) }, signal_mechanism: WaitMechanism::new(), + reference_counter: AtomicUsize::new(1), + has_listener: AtomicBool::new(true), }) { Ok(storage) => Ok(Listener { storage, diff --git a/iceoryx2-cal/src/event/mod.rs b/iceoryx2-cal/src/event/mod.rs index a69e280f3..7cfd568e3 100644 --- a/iceoryx2-cal/src/event/mod.rs +++ b/iceoryx2-cal/src/event/mod.rs @@ -28,6 +28,7 @@ pub use iceoryx2_bb_system_types::path::Path; pub enum NotifierNotifyError { FailedToDeliverSignal, TriggerIdOutOfBounds, + Disconnected, InternalFailure, } diff --git a/iceoryx2-cal/src/event/process_local.rs b/iceoryx2-cal/src/event/process_local.rs index 72c0c18b8..8b515b04a 100644 --- a/iceoryx2-cal/src/event/process_local.rs +++ b/iceoryx2-cal/src/event/process_local.rs @@ -37,6 +37,7 @@ const DEFAULT_CAPACITY: usize = 2048; #[self_referencing] #[derive(Debug)] struct Management { + has_listener: AtomicBool, mtx_handle: MutexHandle>>, #[borrows(mtx_handle)] #[covariant] @@ -112,7 +113,7 @@ impl NamedConceptConfiguration for Configuration { pub struct Duplex { name: FileName, management: Arc, - has_ownership: bool, + is_listener: bool, config: Configuration, } @@ -125,6 +126,15 @@ impl NamedConcept for Duplex { impl Notifier for Duplex { fn notify(&self, id: TriggerId) -> Result<(), NotifierNotifyError> { let msg = "Unable to notify event::process_local::Listener"; + if !self + .management + .borrow_has_listener() + .load(Ordering::Relaxed) + { + fail!(from self, with NotifierNotifyError::Disconnected, + "{} since the listener is no longer connected.", msg); + } + let push_successful = AtomicBool::new(false); if self @@ -148,7 +158,11 @@ impl Notifier for Duplex { impl Drop for Duplex { fn drop(&mut self) { - if self.has_ownership { + if self.is_listener { + self.management + .as_ref() + .borrow_has_listener() + .store(false, Ordering::Relaxed); fatal_panic!(from self, when unsafe { EventImpl::remove_cfg(&self.name, &self.config) }, "This should never happen! Unable to remove resources."); } @@ -286,7 +300,7 @@ impl NotifierBuilder for Builder { .clone() .downcast::() .unwrap(), - has_ownership: false, + is_listener: false, config: self.config, }) } @@ -314,6 +328,7 @@ impl ListenerBuilder for Builder { let storage_details = Arc::new( ManagementBuilder { + has_listener: AtomicBool::new(true), mtx_handle: MutexHandle::new(), cvar_builder: |mtx_handle: &MutexHandle< ConditionVariableData>, @@ -356,7 +371,7 @@ impl ListenerBuilder for Builder { .clone() .downcast::() .unwrap(), - has_ownership: true, + is_listener: true, config: self.config, }) } diff --git a/iceoryx2-cal/src/event/unix_datagram_socket.rs b/iceoryx2-cal/src/event/unix_datagram_socket.rs index 211484a35..49405d229 100644 --- a/iceoryx2-cal/src/event/unix_datagram_socket.rs +++ b/iceoryx2-cal/src/event/unix_datagram_socket.rs @@ -139,7 +139,13 @@ impl crate::event::Notifier for Notifier { Ok(true) => Ok(()), Ok(false) | Err(UnixDatagramSendError::MessagePartiallySend(_)) => { fail!(from self, with NotifierNotifyError::FailedToDeliverSignal, - "{} since the signal could not be delivered", msg); + "{} since the signal could not be delivered.", msg); + } + Err( + UnixDatagramSendError::ConnectionReset | UnixDatagramSendError::ConnectionRefused, + ) => { + fail!(from self, with NotifierNotifyError::Disconnected, + "{} since the notifier is no longer connected to the listener.", msg); } Err(v) => { fail!(from self, with NotifierNotifyError::InternalFailure, diff --git a/iceoryx2-cal/tests/communication_channel_trait_tests.rs b/iceoryx2-cal/tests/communication_channel_trait_tests.rs index 9c04c5e92..894e60d6f 100644 --- a/iceoryx2-cal/tests/communication_channel_trait_tests.rs +++ b/iceoryx2-cal/tests/communication_channel_trait_tests.rs @@ -17,6 +17,7 @@ mod communication_channel { use iceoryx2_bb_posix::unique_system_id::UniqueSystemId; use iceoryx2_bb_system_types::file_name::FileName; use iceoryx2_bb_testing::assert_that; + use iceoryx2_bb_testing::watchdog::Watchdog; use iceoryx2_cal::communication_channel; use iceoryx2_cal::communication_channel::*; use iceoryx2_cal::named_concept::*; @@ -366,6 +367,8 @@ mod communication_channel { #[test] fn custom_suffix_keeps_channels_separated>() { + let _watch_dog = Watchdog::new(); + let config_1 = ::Configuration::default() .suffix(unsafe { FileName::new_unchecked(b".s1") }); let config_2 = ::Configuration::default() diff --git a/iceoryx2-cal/tests/event_tests.rs b/iceoryx2-cal/tests/event_tests.rs index 6f0c36a28..862bf6aed 100644 --- a/iceoryx2-cal/tests/event_tests.rs +++ b/iceoryx2-cal/tests/event_tests.rs @@ -612,6 +612,23 @@ mod event { }); } + #[test] + fn out_of_scope_listener_shall_not_corrupt_notifier() { + let name = generate_name(); + + let sut_listener = Sut::ListenerBuilder::new(&name).create().unwrap(); + let sut_notifier = Sut::NotifierBuilder::new(&name).open().unwrap(); + + drop(sut_listener); + + let result = sut_notifier.notify(TriggerId::new(0)); + // either present a disconnect error when available or continue sending without counterpart, for + // instance when the event is network socket based + if result.is_err() { + assert_that!(result.err().unwrap(), eq NotifierNotifyError::Disconnected); + } + } + #[instantiate_tests()] mod unix_datagram {} diff --git a/iceoryx2-pal/posix/src/windows/errno.rs b/iceoryx2-pal/posix/src/windows/errno.rs index c887d25fc..9c5413369 100644 --- a/iceoryx2-pal/posix/src/windows/errno.rs +++ b/iceoryx2-pal/posix/src/windows/errno.rs @@ -14,11 +14,7 @@ #![allow(unused_variables)] use crate::posix::types::*; -use std::{ - ffi::CStr, - fmt::Display, - sync::atomic::{AtomicU32, Ordering}, -}; +use std::{cell::Cell, ffi::CStr, fmt::Display}; macro_rules! ErrnoEnumGenerator { (assign $($entry:ident = $value:expr),*; map $($map_entry:ident),*) => { @@ -196,19 +192,21 @@ ErrnoEnumGenerator!( // EHWPOISON, ); -static GLOBAL_ERRNO_VALUE: AtomicU32 = AtomicU32::new(Errno::ESUCCES as _); +thread_local! { + pub static GLOBAL_ERRNO_VALUE: Cell = Cell::new(Errno::ESUCCES as _); +} impl Errno { pub fn get() -> Errno { - GLOBAL_ERRNO_VALUE.load(Ordering::Relaxed).into() + GLOBAL_ERRNO_VALUE.get().into() } pub fn reset() { - GLOBAL_ERRNO_VALUE.store(Errno::ESUCCES as _, Ordering::Relaxed); + Errno::set(Errno::ESUCCES); } pub(crate) fn set(value: Errno) { - GLOBAL_ERRNO_VALUE.store(value as _, Ordering::Relaxed); + GLOBAL_ERRNO_VALUE.set(value as _); } } diff --git a/iceoryx2-pal/posix/src/windows/socket.rs b/iceoryx2-pal/posix/src/windows/socket.rs index 8c61fc3fc..da75e1510 100644 --- a/iceoryx2-pal/posix/src/windows/socket.rs +++ b/iceoryx2-pal/posix/src/windows/socket.rs @@ -15,6 +15,7 @@ #![allow(unused_variables)] use std::cell::OnceCell; +use std::sync::atomic::{AtomicU8, Ordering}; use std::time::{Duration, Instant}; use windows_sys::Win32::Networking::WinSock::WSAEWOULDBLOCK; @@ -42,12 +43,20 @@ impl Struct for WSADATA {} impl GlobalWsaInitializer { unsafe fn init() { static mut WSA_INSTANCE: OnceCell = OnceCell::new(); - - WSA_INSTANCE.get_or_init(||{ - let mut _wsa_data = WSADATA::new(); - win32call! {winsock windows_sys::Win32::Networking::WinSock::WSAStartup(2, &mut _wsa_data)}; - GlobalWsaInitializer { _wsa_data } - }); + static mut INITIALIZATION_STATE: AtomicU8 = AtomicU8::new(0); + + match INITIALIZATION_STATE.compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed) { + Ok(_) => { + WSA_INSTANCE.get_or_init(||{ + let mut _wsa_data = WSADATA::new(); + win32call! {winsock windows_sys::Win32::Networking::WinSock::WSAStartup(2, &mut _wsa_data)}; + GlobalWsaInitializer { _wsa_data } + }); + INITIALIZATION_STATE.store(2, Ordering::Relaxed); + } + Err(1) => while INITIALIZATION_STATE.load(Ordering::Relaxed) == 1 {}, + Err(_) => (), + } } } diff --git a/iceoryx2/src/port/details/publisher_connections.rs b/iceoryx2/src/port/details/publisher_connections.rs index c5f32aad0..814d90cac 100644 --- a/iceoryx2/src/port/details/publisher_connections.rs +++ b/iceoryx2/src/port/details/publisher_connections.rs @@ -10,7 +10,7 @@ // // SPDX-License-Identifier: Apache-2.0 OR MIT -use std::{cell::UnsafeCell, rc::Rc}; +use std::{cell::UnsafeCell, sync::Arc}; use crate::{ config, @@ -80,7 +80,7 @@ impl Connection { pub(crate) struct PublisherConnections { connections: Vec>>>, subscriber_id: UniqueSubscriberId, - config: Rc, + config: Arc, static_config: StaticConfig, } @@ -88,13 +88,13 @@ impl PublisherConnections { pub(crate) fn new( capacity: usize, subscriber_id: UniqueSubscriberId, - config: &Rc, + config: &Arc, static_config: &StaticConfig, ) -> Self { Self { connections: (0..capacity).map(|_| UnsafeCell::new(None)).collect(), subscriber_id, - config: Rc::clone(config), + config: Arc::clone(config), static_config: static_config.clone(), } } diff --git a/iceoryx2/src/port/details/subscriber_connections.rs b/iceoryx2/src/port/details/subscriber_connections.rs index a0556a49b..f7e9a7dcc 100644 --- a/iceoryx2/src/port/details/subscriber_connections.rs +++ b/iceoryx2/src/port/details/subscriber_connections.rs @@ -11,7 +11,7 @@ // SPDX-License-Identifier: Apache-2.0 OR MIT use std::cell::UnsafeCell; -use std::rc::Rc; +use std::sync::Arc; use iceoryx2_bb_log::fail; use iceoryx2_cal::named_concept::NamedConceptBuilder; @@ -61,7 +61,7 @@ impl Connection { pub(crate) struct SubscriberConnections { connections: Vec>>>, port_id: UniquePublisherId, - config: Rc, + config: Arc, static_config: StaticConfig, number_of_samples: usize, } @@ -69,14 +69,14 @@ pub(crate) struct SubscriberConnections { impl SubscriberConnections { pub(crate) fn new( capacity: usize, - config: &Rc, + config: &Arc, port_id: UniquePublisherId, static_config: &StaticConfig, number_of_samples: usize, ) -> Self { Self { connections: (0..capacity).map(|_| UnsafeCell::new(None)).collect(), - config: Rc::clone(config), + config: Arc::clone(config), port_id, static_config: static_config.clone(), number_of_samples, diff --git a/iceoryx2/src/port/listener.rs b/iceoryx2/src/port/listener.rs index e62e3656a..e705c954a 100644 --- a/iceoryx2/src/port/listener.rs +++ b/iceoryx2/src/port/listener.rs @@ -66,8 +66,8 @@ use iceoryx2_cal::named_concept::NamedConceptBuilder; use crate::service::naming_scheme::event_concept_name; use crate::{port::port_identifiers::UniqueListenerId, service}; -use std::rc::Rc; use std::sync::atomic::Ordering; +use std::sync::Arc; use std::time::Duration; use super::event_id::EventId; @@ -93,7 +93,7 @@ impl std::error::Error for ListenerCreateError {} pub struct Listener { dynamic_listener_handle: Option, listener: ::Listener, - dynamic_storage: Rc, + dynamic_storage: Arc, port_id: UniqueListenerId, } @@ -115,7 +115,7 @@ impl Listener { let port_id = UniqueListenerId::new(); let event_name = event_concept_name(&port_id); - let dynamic_storage = Rc::clone(&service.state().dynamic_storage); + let dynamic_storage = Arc::clone(&service.state().dynamic_storage); let listener = fail!(from origin, when ::ListenerBuilder::new(&event_name) diff --git a/iceoryx2/src/port/notifier.rs b/iceoryx2/src/port/notifier.rs index e15c51d9a..96627b7ec 100644 --- a/iceoryx2/src/port/notifier.rs +++ b/iceoryx2/src/port/notifier.rs @@ -41,10 +41,13 @@ use crate::{ service::{self, naming_scheme::event_concept_name}, }; use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState}; -use iceoryx2_bb_log::{fail, warn}; -use iceoryx2_cal::named_concept::NamedConceptBuilder; +use iceoryx2_bb_log::{debug, fail, warn}; use iceoryx2_cal::{dynamic_storage::DynamicStorage, event::NotifierBuilder}; -use std::{cell::UnsafeCell, rc::Rc, sync::atomic::Ordering}; +use iceoryx2_cal::{event::Event, named_concept::NamedConceptBuilder}; +use std::{ + cell::UnsafeCell, + sync::{atomic::Ordering, Arc}, +}; /// Failures that can occur when a new [`Notifier`] is created with the /// [`crate::service::port_factory::notifier::PortFactoryNotifier`]. @@ -64,7 +67,6 @@ impl std::error::Error for NotifierCreateError {} /// Defines the failures that can occur while a [`Notifier::notify()`] call. #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub enum NotifierNotifyError { - OnlyPartialUpdate, EventIdOutOfBounds, } @@ -76,10 +78,16 @@ impl std::fmt::Display for NotifierNotifyError { impl std::error::Error for NotifierNotifyError {} +#[derive(Debug)] +struct Connection { + notifier: ::Notifier, + listener_id: UniqueListenerId, +} + #[derive(Debug, Default)] struct ListenerConnections { #[allow(clippy::type_complexity)] - connections: Vec::Notifier>>>, + connections: Vec>>>, } impl ListenerConnections { @@ -96,30 +104,44 @@ impl ListenerConnections { new_self } - fn create(&self, index: usize, listener_id: UniqueListenerId) -> Result<(), ()> { + fn create(&self, index: usize, listener_id: UniqueListenerId) { + let msg = "Unable to establish connection to listener"; let event_name = event_concept_name(&listener_id); if self.get(index).is_none() { - let notifier = fail!(from self, when ::NotifierBuilder::new(&event_name).open(), - with (), - "Unable to establish a connection to Listener port {:?}.", listener_id); - *self.get_mut(index) = Some(notifier); + match ::NotifierBuilder::new(&event_name) + .open() + { + Ok(notifier) => { + *self.get_mut(index) = Some(Connection { + notifier, + listener_id, + }); + } + Err( + iceoryx2_cal::event::NotifierCreateError::DoesNotExist + | iceoryx2_cal::event::NotifierCreateError::InitializationNotYetFinalized, + ) => (), + Err(iceoryx2_cal::event::NotifierCreateError::VersionMismatch) => { + warn!(from self, + "{} since a version mismatch was detected! All entities must use the same iceoryx2 version!", + msg); + } + Err(iceoryx2_cal::event::NotifierCreateError::InsufficientPermissions) => { + warn!(from self, "{} since the permissions do not match. The service or the participants are maybe misconfigured.", msg); + } + Err(iceoryx2_cal::event::NotifierCreateError::InternalFailure) => { + debug!(from self, "{} due to an internal failure.", msg); + } + } } - - Ok(()) } - fn get( - &self, - index: usize, - ) -> &Option<::Notifier> { + fn get(&self, index: usize) -> &Option> { unsafe { &(*self.connections[index].get()) } } #[allow(clippy::mut_from_ref)] - fn get_mut( - &self, - index: usize, - ) -> &mut Option<::Notifier> { + fn get_mut(&self, index: usize) -> &mut Option> { unsafe { &mut (*self.connections[index].get()) } } @@ -139,7 +161,7 @@ pub struct Notifier { listener_list_state: UnsafeCell>, default_event_id: EventId, event_id_max_value: usize, - dynamic_storage: Rc, + dynamic_storage: Arc, dynamic_notifier_handle: Option, port_id: UniqueNotifierId, } @@ -165,7 +187,7 @@ impl Notifier { let port_id = UniqueNotifierId::new(); let listener_list = &service.state().dynamic_storage.get().event().listeners; - let dynamic_storage = Rc::clone(&service.state().dynamic_storage); + let dynamic_storage = Arc::clone(&service.state().dynamic_storage); let mut new_self = Self { listener_connections: ListenerConnections::new(listener_list.capacity()), @@ -177,9 +199,7 @@ impl Notifier { port_id, }; - if let Err(e) = new_self.populate_listener_channels() { - warn!(from new_self, "The new Notifier port is unable to connect to every Listener port, caused by {:?}.", e); - } + new_self.populate_listener_channels(); std::sync::atomic::compiler_fence(Ordering::SeqCst); @@ -203,7 +223,7 @@ impl Notifier { Ok(new_self) } - fn update_connections(&self) -> Result<(), NotifierNotifyError> { + fn update_connections(&self) { if unsafe { self.dynamic_storage .get() @@ -211,15 +231,11 @@ impl Notifier { .listeners .update_state(&mut *self.listener_list_state.get()) } { - fail!(from self, when self.populate_listener_channels(), - with NotifierNotifyError::OnlyPartialUpdate, - "Connections were updated only partially since at least one connection to a Listener port failed."); + self.populate_listener_channels(); } - - Ok(()) } - fn populate_listener_channels(&self) -> Result<(), ()> { + fn populate_listener_channels(&self) { let mut visited_indices = vec![]; visited_indices.resize(self.listener_connections.len(), None); @@ -231,18 +247,25 @@ impl Notifier { for (i, index) in visited_indices.iter().enumerate() { match index { - Some(listener_id) => match self.listener_connections.create(i, *listener_id) { - Ok(()) => (), - Err(()) => { - fail!(from self, with (), - "Unable to establish connection to Listener port {:?}.", *listener_id); + Some(listener_id) => { + let create_connection = match self.listener_connections.get(i) { + None => true, + Some(connection) => { + let is_connected = connection.listener_id != *listener_id; + if is_connected { + self.listener_connections.remove(i); + } + is_connected + } + }; + + if create_connection { + self.listener_connections.create(i, *listener_id); } - }, + } None => self.listener_connections.remove(i), } } - - Ok(()) } /// Returns the [`UniqueNotifierId`] of the [`Notifier`] @@ -269,9 +292,7 @@ impl Notifier { value: EventId, ) -> Result { let msg = "Unable to notify event"; - fail!(from self, when self.update_connections(), - "{} with id {:?} since the underlying connections could not be updated.", - msg, value); + self.update_connections(); use iceoryx2_cal::event::Notifier; let mut number_of_triggered_listeners = 0; @@ -284,7 +305,10 @@ impl Notifier { for i in 0..self.listener_connections.len() { match self.listener_connections.get(i) { - Some(ref connection) => match connection.notify(value) { + Some(ref connection) => match connection.notifier.notify(value) { + Err(iceoryx2_cal::event::NotifierNotifyError::Disconnected) => { + self.listener_connections.remove(i); + } Err(e) => { warn!(from self, "Unable to send notification via connection {:?} due to {:?}.", connection, e) diff --git a/iceoryx2/src/port/publisher.rs b/iceoryx2/src/port/publisher.rs index 5fb4fdaca..3f6db0d60 100644 --- a/iceoryx2/src/port/publisher.rs +++ b/iceoryx2/src/port/publisher.rs @@ -55,8 +55,8 @@ use std::cell::UnsafeCell; use std::fmt::Debug; -use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::Arc; use std::{alloc::Layout, marker::PhantomData, mem::MaybeUninit}; use super::port_identifiers::{UniquePublisherId, UniqueSubscriberId}; @@ -149,7 +149,7 @@ pub(crate) struct DataSegment { message_type_layout: Layout, port_id: UniquePublisherId, config: LocalPublisherConfig, - dynamic_storage: Rc, + dynamic_storage: Arc, subscriber_connections: SubscriberConnections, subscriber_list_state: UnsafeCell>, @@ -428,7 +428,7 @@ impl DataSegment { /// Sending endpoint of a publish-subscriber based communication. #[derive(Debug)] pub struct Publisher { - pub(crate) data_segment: Rc>, + pub(crate) data_segment: Arc>, dynamic_publisher_handle: Option, _phantom_message_type: PhantomData, } @@ -461,7 +461,7 @@ impl Publisher Publisher>(), diff --git a/iceoryx2/src/port/subscriber.rs b/iceoryx2/src/port/subscriber.rs index 02832098a..c2f25b455 100644 --- a/iceoryx2/src/port/subscriber.rs +++ b/iceoryx2/src/port/subscriber.rs @@ -34,8 +34,8 @@ use std::cell::UnsafeCell; use std::fmt::Debug; use std::marker::PhantomData; -use std::rc::Rc; use std::sync::atomic::Ordering; +use std::sync::Arc; use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState}; use iceoryx2_bb_log::{fail, warn}; @@ -97,8 +97,8 @@ pub(crate) mod internal { #[derive(Debug)] pub struct Subscriber { dynamic_subscriber_handle: Option, - publisher_connections: Rc>, - dynamic_storage: Rc, + publisher_connections: Arc>, + dynamic_storage: Arc, static_config: crate::service::static_config::StaticConfig, config: SubscriberConfig, @@ -134,9 +134,9 @@ impl Subscriber Subscriber { - pub(crate) publisher_connections: Rc>, + pub(crate) publisher_connections: Arc>, pub(crate) ptr: RawSample, pub(crate) channel_id: usize, pub(crate) offset: PointerOffset, diff --git a/iceoryx2/src/sample_mut.rs b/iceoryx2/src/sample_mut.rs index ee3eadfdc..fdb5560b9 100644 --- a/iceoryx2/src/sample_mut.rs +++ b/iceoryx2/src/sample_mut.rs @@ -41,7 +41,7 @@ use crate::{ service::header::publish_subscribe::Header, }; use iceoryx2_cal::shared_memory::*; -use std::{fmt::Debug, mem::MaybeUninit, rc::Rc}; +use std::{fmt::Debug, mem::MaybeUninit, sync::Arc}; /// Acquired by a [`crate::port::publisher::Publisher`] via /// [`crate::port::publisher::Publisher::loan()`] or @@ -58,7 +58,7 @@ use std::{fmt::Debug, mem::MaybeUninit, rc::Rc}; /// which API is used to obtain the sample. #[derive(Debug)] pub struct SampleMut { - data_segment: Rc>, + data_segment: Arc>, ptr: RawSampleMut, pub(crate) offset_to_chunk: PointerOffset, } @@ -75,12 +75,12 @@ impl SampleMut, Service> { pub(crate) fn new( - data_segment: &Rc>, + data_segment: &Arc>, ptr: RawSampleMut>, offset_to_chunk: PointerOffset, ) -> Self { Self { - data_segment: Rc::clone(data_segment), + data_segment: Arc::clone(data_segment), ptr, offset_to_chunk, } diff --git a/iceoryx2/src/service/builder/event.rs b/iceoryx2/src/service/builder/event.rs index 4c658075d..351bfaf78 100644 --- a/iceoryx2/src/service/builder/event.rs +++ b/iceoryx2/src/service/builder/event.rs @@ -199,7 +199,7 @@ impl Builder { Ok(Some((static_config, static_storage))) => { let static_config = self.verify_service_properties(&static_config)?; - let dynamic_config = Rc::new( + let dynamic_config = Arc::new( fail!(from self, when self.base.open_dynamic_config_storage(), with EventOpenError::UnableToOpenDynamicServiceInformation, "{} since the dynamic service informations could not be opened.", msg), @@ -280,7 +280,7 @@ impl Builder { ), dynamic_config::event::DynamicConfig::memory_size(&dynamic_config_setting), ) { - Ok(c) => Rc::new(c), + Ok(c) => Arc::new(c), Err(DynamicStorageCreateError::AlreadyExists) => { fail!(from self, with EventCreateError::OldConnectionsStillActive, "{} since there are still active Listeners or Notifiers.", msg); diff --git a/iceoryx2/src/service/builder/mod.rs b/iceoryx2/src/service/builder/mod.rs index 2fea6b579..9d4980f0a 100644 --- a/iceoryx2/src/service/builder/mod.rs +++ b/iceoryx2/src/service/builder/mod.rs @@ -39,7 +39,7 @@ use iceoryx2_cal::named_concept::NamedConceptMgmt; use iceoryx2_cal::serialize::Serialize; use iceoryx2_cal::static_storage::*; use std::marker::PhantomData; -use std::rc::Rc; +use std::sync::Arc; use super::config_scheme::dynamic_config_storage_config; use super::config_scheme::static_config_storage_config; @@ -123,7 +123,7 @@ impl Builder { ) -> publish_subscribe::Builder { BuilderWithServiceType::new( StaticConfig::new_publish_subscribe::(&self.name, config), - Rc::new(config.clone()), + Arc::new(config.clone()), ) .publish_subscribe() } @@ -140,7 +140,7 @@ impl Builder { pub fn event_with_custom_config(self, config: &config::Config) -> event::Builder { BuilderWithServiceType::new( StaticConfig::new_event::(&self.name, config), - Rc::new(config.clone()), + Arc::new(config.clone()), ) .event() } @@ -150,12 +150,12 @@ impl Builder { #[derive(Debug)] pub struct BuilderWithServiceType { service_config: StaticConfig, - global_config: Rc, + global_config: Arc, _phantom_data: PhantomData, } impl BuilderWithServiceType { - fn new(service_config: StaticConfig, global_config: Rc) -> Self { + fn new(service_config: StaticConfig, global_config: Arc) -> Self { Self { service_config, global_config, diff --git a/iceoryx2/src/service/builder/publish_subscribe.rs b/iceoryx2/src/service/builder/publish_subscribe.rs index ad7a97193..53213df48 100644 --- a/iceoryx2/src/service/builder/publish_subscribe.rs +++ b/iceoryx2/src/service/builder/publish_subscribe.rs @@ -302,7 +302,7 @@ impl Builder { Ok(Some((static_config, static_storage))) => { let static_config = self.verify_service_properties(&static_config)?; - let dynamic_config = Rc::new( + let dynamic_config = Arc::new( fail!(from self, when self.base.open_dynamic_config_storage(), with PublishSubscribeOpenError::UnableToOpenDynamicServiceInformation, "{} since the dynamic service information could not be opened.", msg), @@ -409,7 +409,7 @@ impl Builder { &dynamic_config_setting, ), ) { - Ok(c) => Rc::new(c), + Ok(c) => Arc::new(c), Err(DynamicStorageCreateError::AlreadyExists) => { fail!(from self, with PublishSubscribeCreateError::OldConnectionsStillActive, "{} since there are still Publishers, Subscribers or active Samples.", msg); diff --git a/iceoryx2/src/service/mod.rs b/iceoryx2/src/service/mod.rs index 6249ec33f..5483b7460 100644 --- a/iceoryx2/src/service/mod.rs +++ b/iceoryx2/src/service/mod.rs @@ -139,7 +139,7 @@ pub(crate) mod config_scheme; pub(crate) mod naming_scheme; use std::fmt::Debug; -use std::rc::Rc; +use std::sync::Arc; use crate::config; use crate::service::dynamic_config::DynamicConfig; @@ -197,16 +197,16 @@ impl std::error::Error for ServiceListError {} #[derive(Debug)] pub struct ServiceState> { pub(crate) static_config: StaticConfig, - pub(crate) global_config: Rc, - pub(crate) dynamic_storage: Rc, + pub(crate) global_config: Arc, + pub(crate) dynamic_storage: Arc, pub(crate) static_storage: Static, } impl> ServiceState { pub(crate) fn new( static_config: StaticConfig, - global_config: Rc, - dynamic_storage: Rc, + global_config: Arc, + dynamic_storage: Arc, static_storage: Static, ) -> Self { let new_self = Self { diff --git a/iceoryx2/tests/service_event_tests.rs b/iceoryx2/tests/service_event_tests.rs index 43c7a636c..3105563e8 100644 --- a/iceoryx2/tests/service_event_tests.rs +++ b/iceoryx2/tests/service_event_tests.rs @@ -22,6 +22,7 @@ mod service_event { use iceoryx2::port::notifier::NotifierNotifyError; use iceoryx2::prelude::*; use iceoryx2::service::builder::event::{EventCreateError, EventOpenError}; + use iceoryx2_bb_log::{set_log_level, LogLevel}; use iceoryx2_bb_posix::unique_system_id::UniqueSystemId; use iceoryx2_bb_testing::assert_that; use iceoryx2_bb_testing::watchdog::Watchdog; @@ -431,30 +432,28 @@ mod service_event { } #[test] - // TODO iox2-139, enable when bitset is integrated into events - #[ignore] fn concurrent_reconnecting_notifier_can_trigger_waiting_listener() { let _watch_dog = Watchdog::new(); - const NUMBER_OF_LISTENER_THREADS: usize = 2; - const NUMBER_OF_NOTIFIER_THREADS: usize = 2; - const NUMBER_OF_ITERATIONS: usize = 50; - const EVENT_ID: EventId = EventId::new(558); + let number_of_listener_threads = 2; + let number_of_notifier_threads = 2; + const NUMBER_OF_ITERATIONS: usize = 100; + const EVENT_ID: EventId = EventId::new(8); let keep_running = AtomicBool::new(true); let service_name = generate_name(); - let barrier = Barrier::new(NUMBER_OF_LISTENER_THREADS + NUMBER_OF_NOTIFIER_THREADS); + let barrier = Barrier::new(number_of_notifier_threads + number_of_listener_threads); let sut = Sut::new(&service_name) .event() - .max_listeners(NUMBER_OF_LISTENER_THREADS) - .max_notifiers(NUMBER_OF_NOTIFIER_THREADS) + .max_listeners(number_of_listener_threads) + .max_notifiers(number_of_notifier_threads) .create() .unwrap(); std::thread::scope(|s| { let mut listener_threads = vec![]; - for _ in 0..NUMBER_OF_LISTENER_THREADS { + for _ in 0..number_of_listener_threads { listener_threads.push(s.spawn(|| { let listener = sut.listener().create().unwrap(); barrier.wait(); @@ -470,7 +469,7 @@ mod service_event { })); } - for _ in 0..NUMBER_OF_NOTIFIER_THREADS { + for _ in 0..number_of_notifier_threads { s.spawn(|| { barrier.wait(); @@ -490,30 +489,29 @@ mod service_event { } #[test] - // TODO iox2-139, enable when bitset is integrated into events - #[ignore] fn concurrent_reconnecting_listener_can_wait_for_triggering_notifiers() { + set_log_level(LogLevel::Fatal); let _watch_dog = Watchdog::new(); - const NUMBER_OF_LISTENER_THREADS: usize = 2; - const NUMBER_OF_NOTIFIER_THREADS: usize = 2; - const NUMBER_OF_ITERATIONS: usize = 50; - const EVENT_ID: EventId = EventId::new(558); + let number_of_listener_threads = 2; + let number_of_notifier_threads = 2; + const NUMBER_OF_ITERATIONS: usize = 100; + const EVENT_ID: EventId = EventId::new(8); let keep_running = AtomicBool::new(true); let service_name = generate_name(); - let barrier = Barrier::new(NUMBER_OF_LISTENER_THREADS + NUMBER_OF_NOTIFIER_THREADS); + let barrier = Barrier::new(number_of_listener_threads + number_of_notifier_threads); let sut = Sut::new(&service_name) .event() - .max_listeners(NUMBER_OF_LISTENER_THREADS * 2) - .max_notifiers(NUMBER_OF_NOTIFIER_THREADS) + .max_listeners(number_of_listener_threads * 2) + .max_notifiers(number_of_notifier_threads) .create() .unwrap(); std::thread::scope(|s| { let mut listener_threads = vec![]; - for _ in 0..NUMBER_OF_LISTENER_THREADS { + for _ in 0..number_of_listener_threads { listener_threads.push(s.spawn(|| { barrier.wait(); @@ -530,7 +528,7 @@ mod service_event { })); } - for _ in 0..NUMBER_OF_NOTIFIER_THREADS { + for _ in 0..number_of_notifier_threads { s.spawn(|| { let notifier = sut.notifier().create().unwrap(); barrier.wait();