Skip to content

[#139] fix event concurrency issues #186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,5 @@ jobs:
uses: codecov/codecov-action@v4
with:
file: target/debug/coverage/lcov.info
fail_ci_if_error: true
fail_ci_if_error: false
token: ${{ secrets.CODECOV_TOKEN }}
1 change: 1 addition & 0 deletions doc/release-notes/iceoryx2-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* Fix failing reacquire of delivered samples in the zero copy receive channel [#130](https://github.com/eclipse-iceoryx/iceoryx2/issues/130)
* Fix receiving of invalid samples when subscriber is connected [#131](https://github.com/eclipse-iceoryx/iceoryx2/issues/131)
* Fix problem where sample is released to the wrong publisher [#133](https://github.com/eclipse-iceoryx/iceoryx2/issues/133)
* Fix event notifier deadlock with reconnecting listeners [#139](https://github.com/eclipse-iceoryx/iceoryx2/issues/139)
* Fixes for FreeBSD 14.0 [#140](https://github.com/eclipse-iceoryx/iceoryx2/issues/140)
* Fix segfault in `iceoryx2-pal-posix;:shm_list()` caused by `sysctl`
* Adjust test to handle unordered event notifications
Expand Down
2 changes: 2 additions & 0 deletions iceoryx2-bb/posix/src/unix_datagram_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ enum_gen! {
entry:
MessageTooLarge,
ConnectionReset,
ConnectionRefused,
Interrupt,
IOerror,
InsufficientPermissions,
Expand Down Expand Up @@ -694,6 +695,7 @@ impl UnixDatagramSender {
handle_errno!(UnixDatagramSendError, from self,
success Errno::EAGAIN => false,
Errno::ECONNRESET => (ConnectionReset, "{} since the connection was reset by peer.", msg),
Errno::ECONNREFUSED => (ConnectionRefused, "{} since the connection was refused by peer.", msg),
Errno::EINTR => (Interrupt, "{} since an interrupt signal was received.", msg),
Errno::EMSGSIZE => (MessageTooLarge, "{} since the message size of {} bytes is too large to be send in one package.", msg, data.len()),
Errno::EIO => (IOerror, "{} since an I/O error occurred while writing to the file system.", msg),
Expand Down
98 changes: 89 additions & 9 deletions iceoryx2-cal/src/event/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@

#[doc(hidden)]
pub mod details {
use std::{fmt::Debug, marker::PhantomData, time::Duration};
use std::{
fmt::Debug,
marker::PhantomData,
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
time::Duration,
};

use iceoryx2_bb_log::{debug, fail};
use iceoryx2_bb_memory::bump_allocator::BumpAllocator;
Expand All @@ -35,9 +40,12 @@ pub mod details {
const TRIGGER_ID_DEFAULT_MAX: TriggerId = TriggerId::new(u16::MAX as _);

#[derive(Debug)]
#[repr(C)]
pub struct Management<Tracker: IdTracker, WaitMechanism: SignalMechanism> {
id_tracker: Tracker,
signal_mechanism: WaitMechanism,
reference_counter: AtomicUsize,
has_listener: AtomicBool,
}

#[derive(Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -232,6 +240,25 @@ pub mod details {
_wait_mechanism: PhantomData<WaitMechanism>,
}

impl<
Tracker: IdTracker,
WaitMechanism: SignalMechanism,
Storage: DynamicStorage<Management<Tracker, WaitMechanism>>,
> Drop for Notifier<Tracker, WaitMechanism, Storage>
{
fn drop(&mut self) {
if self
.storage
.get()
.reference_counter
.fetch_sub(1, Ordering::Relaxed)
== 1
{
self.storage.acquire_ownership();
}
}
}

impl<
Tracker: IdTracker,
WaitMechanism: SignalMechanism,
Expand All @@ -254,10 +281,16 @@ pub mod details {
}

fn notify(&self, id: crate::event::TriggerId) -> Result<(), NotifierNotifyError> {
let msg = "Failed to notify listener";
if !self.storage.get().has_listener.load(Ordering::Relaxed) {
fail!(from self, with NotifierNotifyError::Disconnected,
"{} since the listener is no longer connected.", msg);
}

if self.storage.get().id_tracker.trigger_id_max() < id {
fail!(from self, with NotifierNotifyError::TriggerIdOutOfBounds,
"Failed to notify since the TriggerId {:?} is greater than the max supported TriggerId {:?}.",
id, self.storage.get().id_tracker.trigger_id_max());
"{} since the TriggerId {:?} is greater than the max supported TriggerId {:?}.",
msg, id, self.storage.get().id_tracker.trigger_id_max());
}

unsafe { self.storage.get().id_tracker.add(id)? };
Expand Down Expand Up @@ -326,11 +359,32 @@ pub mod details {
.timeout(self.creation_timeout)
.open()
{
Ok(storage) => Ok(Notifier {
storage,
_tracker: PhantomData,
_wait_mechanism: PhantomData,
}),
Ok(storage) => {
let mut ref_count = storage.get().reference_counter.load(Ordering::Relaxed);

loop {
if !storage.get().has_listener.load(Ordering::Relaxed) || ref_count == 0 {
fail!(from self, with NotifierCreateError::DoesNotExist,
"{} since it has no listener and will no longer exist.", msg);
}

match storage.get().reference_counter.compare_exchange(
ref_count,
ref_count + 1,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(v) => ref_count = v,
};
}

Ok(Notifier {
storage,
_tracker: PhantomData,
_wait_mechanism: PhantomData,
})
}
Err(DynamicStorageOpenError::DoesNotExist) => {
fail!(from self, with NotifierCreateError::DoesNotExist,
"{} since it does not exist.", msg);
Expand Down Expand Up @@ -363,6 +417,30 @@ pub mod details {
_wait_mechanism: PhantomData<WaitMechanism>,
}

impl<
Tracker: IdTracker,
WaitMechanism: SignalMechanism,
Storage: DynamicStorage<Management<Tracker, WaitMechanism>>,
> Drop for Listener<Tracker, WaitMechanism, Storage>
{
fn drop(&mut self) {
self.storage
.get()
.has_listener
.store(false, Ordering::Relaxed);

if self
.storage
.get()
.reference_counter
.fetch_sub(1, Ordering::Relaxed)
== 1
{
self.storage.acquire_ownership();
}
}
}

impl<
Tracker: IdTracker,
WaitMechanism: SignalMechanism,
Expand Down Expand Up @@ -540,10 +618,12 @@ pub mod details {
.config(&self.config.convert())
.supplementary_size(Tracker::memory_size(id_tracker_capacity))
.initializer(Self::init)
.has_ownership(true)
.has_ownership(false)
.create(Management {
id_tracker: unsafe { Tracker::new_uninit(id_tracker_capacity) },
signal_mechanism: WaitMechanism::new(),
reference_counter: AtomicUsize::new(1),
has_listener: AtomicBool::new(true),
}) {
Ok(storage) => Ok(Listener {
storage,
Expand Down
1 change: 1 addition & 0 deletions iceoryx2-cal/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use iceoryx2_bb_system_types::path::Path;
pub enum NotifierNotifyError {
FailedToDeliverSignal,
TriggerIdOutOfBounds,
Disconnected,
InternalFailure,
}

Expand Down
23 changes: 19 additions & 4 deletions iceoryx2-cal/src/event/process_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const DEFAULT_CAPACITY: usize = 2048;
#[self_referencing]
#[derive(Debug)]
struct Management {
has_listener: AtomicBool,
mtx_handle: MutexHandle<ConditionVariableData<FixedSizeQueue<TriggerId, DEFAULT_CAPACITY>>>,
#[borrows(mtx_handle)]
#[covariant]
Expand Down Expand Up @@ -112,7 +113,7 @@ impl NamedConceptConfiguration for Configuration {
pub struct Duplex {
name: FileName,
management: Arc<Management>,
has_ownership: bool,
is_listener: bool,
config: Configuration,
}

Expand All @@ -125,6 +126,15 @@ impl NamedConcept for Duplex {
impl Notifier for Duplex {
fn notify(&self, id: TriggerId) -> Result<(), NotifierNotifyError> {
let msg = "Unable to notify event::process_local::Listener";
if !self
.management
.borrow_has_listener()
.load(Ordering::Relaxed)
{
fail!(from self, with NotifierNotifyError::Disconnected,
"{} since the listener is no longer connected.", msg);
}

let push_successful = AtomicBool::new(false);

if self
Expand All @@ -148,7 +158,11 @@ impl Notifier for Duplex {

impl Drop for Duplex {
fn drop(&mut self) {
if self.has_ownership {
if self.is_listener {
self.management
.as_ref()
.borrow_has_listener()
.store(false, Ordering::Relaxed);
fatal_panic!(from self, when unsafe { EventImpl::remove_cfg(&self.name, &self.config) },
"This should never happen! Unable to remove resources.");
}
Expand Down Expand Up @@ -286,7 +300,7 @@ impl NotifierBuilder<EventImpl> for Builder {
.clone()
.downcast::<Management>()
.unwrap(),
has_ownership: false,
is_listener: false,
config: self.config,
})
}
Expand Down Expand Up @@ -314,6 +328,7 @@ impl ListenerBuilder<EventImpl> for Builder {

let storage_details = Arc::new(
ManagementBuilder {
has_listener: AtomicBool::new(true),
mtx_handle: MutexHandle::new(),
cvar_builder: |mtx_handle: &MutexHandle<
ConditionVariableData<FixedSizeQueue<TriggerId, DEFAULT_CAPACITY>>,
Expand Down Expand Up @@ -356,7 +371,7 @@ impl ListenerBuilder<EventImpl> for Builder {
.clone()
.downcast::<Management>()
.unwrap(),
has_ownership: true,
is_listener: true,
config: self.config,
})
}
Expand Down
8 changes: 7 additions & 1 deletion iceoryx2-cal/src/event/unix_datagram_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,13 @@ impl crate::event::Notifier for Notifier {
Ok(true) => Ok(()),
Ok(false) | Err(UnixDatagramSendError::MessagePartiallySend(_)) => {
fail!(from self, with NotifierNotifyError::FailedToDeliverSignal,
"{} since the signal could not be delivered", msg);
"{} since the signal could not be delivered.", msg);
}
Err(
UnixDatagramSendError::ConnectionReset | UnixDatagramSendError::ConnectionRefused,
) => {
fail!(from self, with NotifierNotifyError::Disconnected,
"{} since the notifier is no longer connected to the listener.", msg);
}
Err(v) => {
fail!(from self, with NotifierNotifyError::InternalFailure,
Expand Down
3 changes: 3 additions & 0 deletions iceoryx2-cal/tests/communication_channel_trait_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod communication_channel {
use iceoryx2_bb_posix::unique_system_id::UniqueSystemId;
use iceoryx2_bb_system_types::file_name::FileName;
use iceoryx2_bb_testing::assert_that;
use iceoryx2_bb_testing::watchdog::Watchdog;
use iceoryx2_cal::communication_channel;
use iceoryx2_cal::communication_channel::*;
use iceoryx2_cal::named_concept::*;
Expand Down Expand Up @@ -366,6 +367,8 @@ mod communication_channel {

#[test]
fn custom_suffix_keeps_channels_separated<Sut: CommunicationChannel<usize>>() {
let _watch_dog = Watchdog::new();

let config_1 = <Sut as NamedConceptMgmt>::Configuration::default()
.suffix(unsafe { FileName::new_unchecked(b".s1") });
let config_2 = <Sut as NamedConceptMgmt>::Configuration::default()
Expand Down
14 changes: 14 additions & 0 deletions iceoryx2-cal/tests/event_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,20 @@ mod event {
});
}

#[test]
fn out_of_scope_listener_shall_not_corrupt_notifier<Sut: Event>() {
let name = generate_name();

let sut_listener = Sut::ListenerBuilder::new(&name).create().unwrap();
let sut_notifier = Sut::NotifierBuilder::new(&name).open().unwrap();

drop(sut_listener);

let result = sut_notifier.notify(TriggerId::new(0));
assert_that!(result, is_err);
assert_that!(result.err().unwrap(), eq NotifierNotifyError::Disconnected);
}

#[instantiate_tests(<iceoryx2_cal::event::unix_datagram_socket::EventImpl>)]
mod unix_datagram {}

Expand Down
8 changes: 4 additions & 4 deletions iceoryx2/src/port/details/publisher_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

use std::{cell::UnsafeCell, rc::Rc};
use std::{cell::UnsafeCell, sync::Arc};

use crate::{
config,
Expand Down Expand Up @@ -80,21 +80,21 @@ impl<Service: service::Service> Connection<Service> {
pub(crate) struct PublisherConnections<Service: service::Service> {
connections: Vec<UnsafeCell<Option<Connection<Service>>>>,
subscriber_id: UniqueSubscriberId,
config: Rc<config::Config>,
config: Arc<config::Config>,
static_config: StaticConfig,
}

impl<Service: service::Service> PublisherConnections<Service> {
pub(crate) fn new(
capacity: usize,
subscriber_id: UniqueSubscriberId,
config: &Rc<config::Config>,
config: &Arc<config::Config>,
static_config: &StaticConfig,
) -> Self {
Self {
connections: (0..capacity).map(|_| UnsafeCell::new(None)).collect(),
subscriber_id,
config: Rc::clone(config),
config: Arc::clone(config),
static_config: static_config.clone(),
}
}
Expand Down
8 changes: 4 additions & 4 deletions iceoryx2/src/port/details/subscriber_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// SPDX-License-Identifier: Apache-2.0 OR MIT

use std::cell::UnsafeCell;
use std::rc::Rc;
use std::sync::Arc;

use iceoryx2_bb_log::fail;
use iceoryx2_cal::named_concept::NamedConceptBuilder;
Expand Down Expand Up @@ -61,22 +61,22 @@ impl<Service: service::Service> Connection<Service> {
pub(crate) struct SubscriberConnections<Service: service::Service> {
connections: Vec<UnsafeCell<Option<Connection<Service>>>>,
port_id: UniquePublisherId,
config: Rc<config::Config>,
config: Arc<config::Config>,
static_config: StaticConfig,
number_of_samples: usize,
}

impl<Service: service::Service> SubscriberConnections<Service> {
pub(crate) fn new(
capacity: usize,
config: &Rc<config::Config>,
config: &Arc<config::Config>,
port_id: UniquePublisherId,
static_config: &StaticConfig,
number_of_samples: usize,
) -> Self {
Self {
connections: (0..capacity).map(|_| UnsafeCell::new(None)).collect(),
config: Rc::clone(config),
config: Arc::clone(config),
port_id,
static_config: static_config.clone(),
number_of_samples,
Expand Down
Loading
Loading