Skip to content

Commit 4225c6e

Browse files
committed
[#429] Use Vec instead of Queue for to_be_removed_connections
1 parent 97dd7e5 commit 4225c6e

File tree

4 files changed

+40
-17
lines changed

4 files changed

+40
-17
lines changed

iceoryx2/src/port/client.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use alloc::sync::Arc;
4040
use core::{
4141
cell::UnsafeCell, fmt::Debug, marker::PhantomData, mem::MaybeUninit, sync::atomic::Ordering,
4242
};
43-
use iceoryx2_bb_container::queue::Queue;
43+
use iceoryx2_bb_container::{queue::Queue, vec::Vec};
4444

4545
use iceoryx2_bb_elementary::{cyclic_tagger::CyclicTagger, CallbackProgression};
4646
use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState};
@@ -371,14 +371,12 @@ impl<
371371
number_of_channels: 1,
372372
},
373373
response_receiver: Receiver {
374-
connections: (0..server_list.capacity())
375-
.map(|_| UnsafeCell::new(None))
376-
.collect(),
374+
connections: Vec::from_fn(server_list.capacity(), |_| UnsafeCell::new(None)),
377375
receiver_port_id: client_port_id.value(),
378376
service_state: service.__internal_state().clone(),
379377
buffer_size: static_config.max_response_buffer_size,
380378
tagger: CyclicTagger::new(),
381-
to_be_removed_connections: Some(UnsafeCell::new(Queue::new(
379+
to_be_removed_connections: Some(UnsafeCell::new(Vec::new(
382380
service
383381
.__internal_state()
384382
.shared_node

iceoryx2/src/port/details/receiver.rs

+32-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::service::static_config::message_type_details::MessageTypeDetails;
2424
use crate::service::ServiceState;
2525
use crate::service::{self, config_scheme::connection_config, naming_scheme::connection_name};
2626
use alloc::sync::Arc;
27-
use iceoryx2_bb_container::queue::Queue;
27+
use iceoryx2_bb_container::vec::Vec;
2828
use iceoryx2_bb_elementary::cyclic_tagger::*;
2929
use iceoryx2_bb_log::{fail, warn};
3030
use iceoryx2_cal::named_concept::NamedConceptBuilder;
@@ -111,7 +111,7 @@ pub(crate) struct Receiver<Service: service::Service> {
111111
pub(crate) service_state: Arc<ServiceState<Service>>,
112112
pub(crate) buffer_size: usize,
113113
pub(crate) tagger: CyclicTagger,
114-
pub(crate) to_be_removed_connections: Option<UnsafeCell<Queue<Arc<Connection<Service>>>>>,
114+
pub(crate) to_be_removed_connections: Option<UnsafeCell<Vec<Arc<Connection<Service>>>>>,
115115
pub(crate) degradation_callback: Option<DegradationCallback<'static>>,
116116
pub(crate) message_type_details: MessageTypeDetails,
117117
pub(crate) receiver_max_borrowed_samples: usize,
@@ -278,22 +278,45 @@ impl<Service: service::Service> Receiver<Service> {
278278
&self,
279279
channel_id: ChannelId,
280280
) -> Result<Option<(ChunkDetails<Service>, Chunk)>, ReceiveError> {
281+
let msg = "Unable to receive data";
281282
if let Some(to_be_removed_connections) = &self.to_be_removed_connections {
282283
let to_be_removed_connections = unsafe { &mut *to_be_removed_connections.get() };
283284

284-
if let Some(connection) = to_be_removed_connections.peek() {
285+
let mut clean_connections = Vec::new(to_be_removed_connections.capacity());
286+
for (n, connection) in to_be_removed_connections.iter_mut().enumerate() {
287+
if connection.receiver.borrow_count(channel_id)
288+
== connection.receiver.max_borrowed_samples()
289+
{
290+
continue;
291+
}
292+
285293
if let Some((details, absolute_address)) =
286294
self.receive_from_connection(connection, channel_id)?
287295
{
288296
return Ok(Some((details, absolute_address)));
289297
} else {
290-
to_be_removed_connections.pop();
298+
clean_connections.push(n);
291299
}
292300
}
301+
302+
for idx in clean_connections.iter().rev() {
303+
to_be_removed_connections.remove(*idx);
304+
}
293305
}
294306

307+
let mut active_channel_count = 0;
308+
let mut all_channels_exceed_max_borrows = true;
295309
for id in 0..self.len() {
296310
if let Some(ref mut connection) = &mut self.get_mut(id) {
311+
active_channel_count += 0;
312+
if connection.receiver.borrow_count(channel_id)
313+
>= connection.receiver.max_borrowed_samples()
314+
{
315+
continue;
316+
} else {
317+
all_channels_exceed_max_borrows = false;
318+
}
319+
297320
if let Some((details, absolute_address)) =
298321
self.receive_from_connection(connection, channel_id)?
299322
{
@@ -302,6 +325,11 @@ impl<Service: service::Service> Receiver<Service> {
302325
}
303326
}
304327

328+
if all_channels_exceed_max_borrows && active_channel_count != 0 {
329+
fail!(from self, with ReceiveError::ExceedsMaxBorrows,
330+
"{msg} since every channel exceeds the max number of borrows.");
331+
}
332+
305333
Ok(None)
306334
}
307335

iceoryx2/src/port/server.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ extern crate alloc;
3737
use alloc::sync::Arc;
3838
use core::{cell::UnsafeCell, sync::atomic::Ordering};
3939
use core::{fmt::Debug, marker::PhantomData};
40+
use iceoryx2_bb_container::vec::Vec;
4041
use iceoryx2_cal::zero_copy_connection::ChannelId;
4142
use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicBool, IoxAtomicUsize};
4243

@@ -236,9 +237,7 @@ impl<
236237
.clients;
237238

238239
let request_receiver = Receiver {
239-
connections: (0..client_list.capacity())
240-
.map(|_| UnsafeCell::new(None))
241-
.collect(),
240+
connections: Vec::from_fn(client_list.capacity(), |_| UnsafeCell::new(None)),
242241
receiver_port_id: server_port_id.value(),
243242
service_state: service.__internal_state().clone(),
244243
message_type_details: static_config.request_message_type_details.clone(),

iceoryx2/src/port/subscriber.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use core::sync::atomic::Ordering;
3939

4040
extern crate alloc;
4141

42-
use iceoryx2_bb_container::queue::Queue;
42+
use iceoryx2_bb_container::vec::Vec;
4343
use iceoryx2_bb_elementary::cyclic_tagger::CyclicTagger;
4444
use iceoryx2_bb_elementary::CallbackProgression;
4545
use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState};
@@ -146,17 +146,15 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
146146
};
147147

148148
let receiver = Receiver {
149-
connections: (0..publisher_list.capacity())
150-
.map(|_| UnsafeCell::new(None))
151-
.collect(),
149+
connections: Vec::from_fn(publisher_list.capacity(), |_| UnsafeCell::new(None)),
152150
receiver_port_id: subscriber_id.value(),
153151
service_state: service.__internal_state().clone(),
154152
message_type_details: static_config.message_type_details.clone(),
155153
receiver_max_borrowed_samples: static_config.subscriber_max_borrowed_samples,
156154
enable_safe_overflow: static_config.enable_safe_overflow,
157155
buffer_size,
158156
tagger: CyclicTagger::new(),
159-
to_be_removed_connections: Some(UnsafeCell::new(Queue::new(
157+
to_be_removed_connections: Some(UnsafeCell::new(Vec::new(
160158
service
161159
.__internal_state()
162160
.shared_node

0 commit comments

Comments
 (0)