Skip to content

Commit bab8dd1

Browse files
committed
[eclipse-iceoryx#690] Implement slice API in active request
1 parent 76b1208 commit bab8dd1

File tree

5 files changed

+178
-46
lines changed

5 files changed

+178
-46
lines changed

iceoryx2/src/active_request.rs

+119-29
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,19 @@
4242
extern crate alloc;
4343

4444
use alloc::sync::Arc;
45-
use core::{fmt::Debug, marker::PhantomData, mem::MaybeUninit, ops::Deref, sync::atomic::Ordering};
45+
use core::{
46+
any::TypeId, fmt::Debug, marker::PhantomData, mem::MaybeUninit, ops::Deref,
47+
sync::atomic::Ordering,
48+
};
4649
use iceoryx2_bb_elementary::zero_copy_send::ZeroCopySend;
4750
use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicUsize;
4851

4952
use iceoryx2_bb_log::{error, fail};
5053
use iceoryx2_bb_posix::unique_system_id::UniqueSystemId;
51-
use iceoryx2_cal::zero_copy_connection::{ChannelId, ZeroCopyReceiver, ZeroCopyReleaseError};
54+
use iceoryx2_cal::{
55+
shm_allocator::AllocationStrategy,
56+
zero_copy_connection::{ChannelId, ZeroCopyReceiver, ZeroCopyReleaseError},
57+
};
5258

5359
use crate::{
5460
port::{
@@ -60,7 +66,10 @@ use crate::{
6066
raw_sample::{RawSample, RawSampleMut},
6167
response_mut::ResponseMut,
6268
response_mut_uninit::ResponseMutUninit,
63-
service::{self, builder::publish_subscribe::CustomPayloadMarker},
69+
service::{
70+
self, builder::publish_subscribe::CustomPayloadMarker,
71+
static_config::message_type_details::TypeVariant,
72+
},
6473
};
6574

6675
/// Represents a one-to-one connection to a [`Client`](crate::port::client::Client)
@@ -217,6 +226,28 @@ impl<
217226
pub fn origin(&self) -> UniqueClientId {
218227
UniqueClientId(UniqueSystemId::from(self.details.origin))
219228
}
229+
230+
fn increment_loan_counter(&self) -> Result<(), LoanError> {
231+
let mut current_loan_count = self.shared_loan_counter.load(Ordering::Relaxed);
232+
loop {
233+
if self.max_loan_count <= current_loan_count {
234+
fail!(from self,
235+
with LoanError::ExceedsMaxLoans,
236+
"Unable to loan memory for Response since it would exceed the maximum number of loans of {}.",
237+
self.max_loan_count);
238+
}
239+
240+
match self.shared_loan_counter.compare_exchange(
241+
current_loan_count,
242+
current_loan_count + 1,
243+
Ordering::Relaxed,
244+
Ordering::Relaxed,
245+
) {
246+
Ok(_) => return Ok(()),
247+
Err(v) => current_loan_count = v,
248+
}
249+
}
250+
}
220251
}
221252

222253
////////////////////////
@@ -259,25 +290,7 @@ impl<
259290
&self,
260291
) -> Result<ResponseMutUninit<Service, MaybeUninit<ResponsePayload>, ResponseHeader>, LoanError>
261292
{
262-
let mut current_loan_count = self.shared_loan_counter.load(Ordering::Relaxed);
263-
loop {
264-
if self.max_loan_count <= current_loan_count {
265-
fail!(from self,
266-
with LoanError::ExceedsMaxLoans,
267-
"Unable to loan memory for Response since it would exceed the maximum number of loans of {}.",
268-
self.max_loan_count);
269-
}
270-
271-
match self.shared_loan_counter.compare_exchange(
272-
current_loan_count,
273-
current_loan_count + 1,
274-
Ordering::Relaxed,
275-
Ordering::Relaxed,
276-
) {
277-
Ok(_) => break,
278-
Err(v) => current_loan_count = v,
279-
}
280-
}
293+
self.increment_loan_counter()?;
281294

282295
let chunk = self
283296
.shared_state
@@ -412,32 +425,100 @@ impl<
412425
Service: crate::service::Service,
413426
RequestPayload: Debug + ZeroCopySend + ?Sized,
414427
RequestHeader: Debug + ZeroCopySend,
415-
ResponsePayload: Debug + Default + ZeroCopySend,
428+
ResponsePayload: Debug + Default + ZeroCopySend + 'static,
416429
ResponseHeader: Debug + ZeroCopySend,
417430
> ActiveRequest<Service, RequestPayload, RequestHeader, [ResponsePayload], ResponseHeader>
418431
{
419432
pub fn loan_slice(
420433
&self,
421-
number_of_elements: usize,
434+
slice_len: usize,
422435
) -> Result<ResponseMut<Service, [ResponsePayload], ResponseHeader>, LoanError> {
423-
todo!()
436+
let response = self.loan_slice_uninit(slice_len)?;
437+
Ok(response.write_from_fn(|_| ResponsePayload::default()))
424438
}
425439
}
426440

427441
impl<
428442
Service: crate::service::Service,
429443
RequestPayload: Debug + ZeroCopySend + ?Sized,
430444
RequestHeader: Debug + ZeroCopySend,
431-
ResponsePayload: Debug + ZeroCopySend,
445+
ResponsePayload: Debug + ZeroCopySend + 'static,
432446
ResponseHeader: Debug + ZeroCopySend,
433447
> ActiveRequest<Service, RequestPayload, RequestHeader, [ResponsePayload], ResponseHeader>
434448
{
435449
pub fn loan_slice_uninit(
436450
&self,
437-
number_of_elements: usize,
451+
slice_len: usize,
438452
) -> Result<ResponseMutUninit<Service, [MaybeUninit<ResponsePayload>], ResponseHeader>, LoanError>
439453
{
440-
todo!()
454+
debug_assert!(TypeId::of::<ResponsePayload>() != TypeId::of::<CustomPayloadMarker>());
455+
unsafe { self.loan_slice_uninit_impl(slice_len, slice_len) }
456+
}
457+
458+
unsafe fn loan_slice_uninit_impl(
459+
&self,
460+
slice_len: usize,
461+
underlying_number_of_slice_elements: usize,
462+
) -> Result<ResponseMutUninit<Service, [MaybeUninit<ResponsePayload>], ResponseHeader>, LoanError>
463+
{
464+
let max_slice_len = self.shared_state.config.initial_max_slice_len;
465+
466+
if self.shared_state.config.allocation_strategy == AllocationStrategy::Static
467+
&& max_slice_len < slice_len
468+
{
469+
fail!(from self, with LoanError::ExceedsMaxLoanSize,
470+
"Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.",
471+
slice_len, max_slice_len);
472+
}
473+
474+
self.increment_loan_counter()?;
475+
476+
let response_layout = self.shared_state.response_sender.sample_layout(slice_len);
477+
let chunk = self
478+
.shared_state
479+
.response_sender
480+
.allocate(response_layout)?;
481+
482+
unsafe {
483+
(chunk.header as *mut service::header::request_response::ResponseHeader).write(
484+
service::header::request_response::ResponseHeader {
485+
server_port_id: UniqueServerId(UniqueSystemId::from(
486+
self.shared_state.response_sender.sender_port_id,
487+
)),
488+
request_id: self.request_id,
489+
number_of_elements: slice_len as _,
490+
},
491+
)
492+
};
493+
494+
let ptr = unsafe {
495+
RawSampleMut::<
496+
service::header::request_response::ResponseHeader,
497+
ResponseHeader,
498+
[MaybeUninit<ResponsePayload>],
499+
>::new_unchecked(
500+
chunk.header.cast(),
501+
chunk.user_header.cast(),
502+
core::slice::from_raw_parts_mut(
503+
chunk.payload.cast(),
504+
underlying_number_of_slice_elements,
505+
),
506+
)
507+
};
508+
509+
Ok(ResponseMutUninit {
510+
response: ResponseMut {
511+
ptr,
512+
shared_loan_counter: self.shared_loan_counter.clone(),
513+
shared_state: self.shared_state.clone(),
514+
offset_to_chunk: chunk.offset,
515+
channel_id: self.channel_id,
516+
connection_id: self.connection_id,
517+
sample_size: chunk.size,
518+
_response_payload: PhantomData,
519+
_response_header: PhantomData,
520+
},
521+
})
441522
}
442523
}
443524

@@ -462,7 +543,16 @@ impl<
462543
ResponseMutUninit<Service, [MaybeUninit<CustomPayloadMarker>], ResponseHeader>,
463544
LoanError,
464545
> {
465-
todo!()
546+
// TypeVariant::Dynamic == slice and only here it makes sense to loan more than one element
547+
debug_assert!(
548+
slice_len == 1
549+
|| self.shared_state.response_sender.payload_type_variant() == TypeVariant::Dynamic
550+
);
551+
552+
self.loan_slice_uninit_impl(
553+
slice_len,
554+
self.shared_state.response_sender.payload_size() * slice_len,
555+
)
466556
}
467557
}
468558
////////////////////////

iceoryx2/src/port/client.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -309,12 +309,11 @@ impl<
309309

310310
let global_config = service.__internal_state().shared_node.config();
311311
let segment_name = data_segment_name(client_port_id.value());
312-
let data_segment_type = DataSegmentType::Static;
313-
let max_number_of_segments =
314-
DataSegment::<Service>::max_number_of_segments(data_segment_type);
315312
let data_segment_type = DataSegmentType::new_from_allocation_strategy(
316313
client_factory.config.allocation_strategy,
317314
);
315+
let max_number_of_segments =
316+
DataSegment::<Service>::max_number_of_segments(data_segment_type);
318317

319318
let sample_layout = static_config
320319
.request_message_type_details

iceoryx2/src/port/server.rs

+36-9
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use iceoryx2_bb_posix::unique_system_id::UniqueSystemId;
4949
use iceoryx2_cal::dynamic_storage::DynamicStorage;
5050

5151
use crate::service::naming_scheme::data_segment_name;
52+
use crate::service::port_factory::server::LocalServerConfig;
5253
use crate::{
5354
active_request::ActiveRequest,
5455
prelude::PortFactory,
@@ -80,6 +81,7 @@ const REQUEST_CHANNEL_ID: ChannelId = ChannelId::new(0);
8081

8182
#[derive(Debug)]
8283
pub(crate) struct SharedServerState<Service: service::Service> {
84+
pub(crate) config: LocalServerConfig,
8385
pub(crate) response_sender: Sender<Service>,
8486
server_handle: UnsafeCell<Option<ContainerHandle>>,
8587
request_receiver: Receiver<Service>,
@@ -247,24 +249,45 @@ impl<
247249
};
248250

249251
let global_config = service.__internal_state().shared_node.config();
250-
let data_segment_type = DataSegmentType::Static;
252+
let data_segment_type = DataSegmentType::new_from_allocation_strategy(
253+
server_factory.config.allocation_strategy,
254+
);
251255
let segment_name = data_segment_name(server_port_id.value());
252256
let max_number_of_segments =
253257
DataSegment::<Service>::max_number_of_segments(data_segment_type);
254-
let data_segment = DataSegment::<Service>::create_static_segment(
255-
&segment_name,
256-
static_config.response_message_type_details.sample_layout(1),
257-
global_config,
258-
number_of_responses,
259-
);
258+
let sample_layout = static_config
259+
.response_message_type_details
260+
.sample_layout(server_factory.config.initial_max_slice_len);
261+
let data_segment = match data_segment_type {
262+
DataSegmentType::Static => DataSegment::<Service>::create_static_segment(
263+
&segment_name,
264+
static_config.response_message_type_details.sample_layout(1),
265+
global_config,
266+
number_of_responses,
267+
),
268+
DataSegmentType::Dynamic => DataSegment::<Service>::create_dynamic_segment(
269+
&segment_name,
270+
sample_layout,
271+
global_config,
272+
number_of_responses,
273+
server_factory.config.allocation_strategy,
274+
),
275+
};
260276

261277
let data_segment = fail!(from origin,
262278
when data_segment,
263279
with ServerCreateError::UnableToCreateDataSegment,
264280
"{} since the server data segment could not be created.", msg);
265281

266282
let response_sender = Sender {
267-
segment_states: vec![SegmentState::new(number_of_responses)],
283+
segment_states: {
284+
let mut v =
285+
std::vec::Vec::<SegmentState>::with_capacity(max_number_of_segments as usize);
286+
for _ in 0..max_number_of_segments {
287+
v.push(SegmentState::new(number_of_responses))
288+
}
289+
v
290+
},
268291
data_segment,
269292
connections: (0..client_list.capacity())
270293
.map(|_| UnsafeCell::new(None))
@@ -284,7 +307,7 @@ impl<
284307
service_state: service.__internal_state().clone(),
285308
tagger: CyclicTagger::new(),
286309
loan_counter: IoxAtomicUsize::new(0),
287-
unable_to_deliver_strategy: server_factory.unable_to_deliver_strategy,
310+
unable_to_deliver_strategy: server_factory.config.unable_to_deliver_strategy,
288311
message_type_details: static_config.response_message_type_details.clone(),
289312
number_of_channels: number_of_requests_per_client,
290313
};
@@ -297,6 +320,7 @@ impl<
297320
.request_response()
298321
.enable_fire_and_forget_requests,
299322
shared_state: Arc::new(SharedServerState {
323+
config: server_factory.config,
300324
request_receiver,
301325
client_list_state: UnsafeCell::new(unsafe { client_list.get_state() }),
302326
server_handle: UnsafeCell::new(None),
@@ -327,6 +351,9 @@ impl<
327351
server_port_id,
328352
request_buffer_size: static_config.max_active_requests_per_client,
329353
number_of_responses,
354+
max_slice_len: server_factory.config.initial_max_slice_len,
355+
data_segment_type,
356+
max_number_of_segments,
330357
}) {
331358
Some(v) => Some(v),
332359
None => {

iceoryx2/src/service/dynamic_config/request_response.rs

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ pub struct ServerDetails {
3333
pub server_port_id: UniqueServerId,
3434
pub request_buffer_size: usize,
3535
pub number_of_responses: usize,
36+
pub max_slice_len: usize,
37+
pub data_segment_type: DataSegmentType,
38+
pub max_number_of_segments: u8,
3639
}
3740

3841
#[doc(hidden)]

iceoryx2/src/service/port_factory/server.rs

+18-5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ use iceoryx2_bb_elementary::zero_copy_send::ZeroCopySend;
4040
use iceoryx2_bb_log::{fail, warn};
4141
use iceoryx2_cal::shm_allocator::AllocationStrategy;
4242

43+
#[derive(Debug, Clone, Copy)]
44+
pub(crate) struct LocalServerConfig {
45+
pub(crate) unable_to_deliver_strategy: UnableToDeliverStrategy,
46+
pub(crate) initial_max_slice_len: usize,
47+
pub(crate) allocation_strategy: AllocationStrategy,
48+
}
49+
4350
/// Defines a failure that can occur when a [`Server`] is created with
4451
/// [`crate::service::port_factory::server::PortFactoryServer`].
4552
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
@@ -79,8 +86,8 @@ pub struct PortFactoryServer<
7986
ResponseHeader,
8087
>,
8188

89+
pub(crate) config: LocalServerConfig,
8290
pub(crate) max_loaned_responses_per_request: usize,
83-
pub(crate) unable_to_deliver_strategy: UnableToDeliverStrategy,
8491
pub(crate) request_degradation_callback: Option<DegradationCallback<'static>>,
8592
pub(crate) response_degradation_callback: Option<DegradationCallback<'static>>,
8693
}
@@ -122,7 +129,11 @@ impl<
122129
Self {
123130
factory,
124131
max_loaned_responses_per_request: defs.server_max_loaned_responses_per_request,
125-
unable_to_deliver_strategy: defs.server_unable_to_deliver_strategy,
132+
config: LocalServerConfig {
133+
unable_to_deliver_strategy: defs.server_unable_to_deliver_strategy,
134+
initial_max_slice_len: 1,
135+
allocation_strategy: AllocationStrategy::Static,
136+
},
126137
request_degradation_callback: None,
127138
response_degradation_callback: None,
128139
}
@@ -133,7 +144,7 @@ impl<
133144
/// [`Response`](crate::response::Response) since
134145
/// its internal buffer is full.
135146
pub fn unable_to_deliver_strategy(mut self, value: UnableToDeliverStrategy) -> Self {
136-
self.unable_to_deliver_strategy = value;
147+
self.config.unable_to_deliver_strategy = value;
137148
self
138149
}
139150

@@ -218,10 +229,12 @@ impl<
218229
>
219230
{
220231
pub fn initial_max_slice_len(mut self, value: usize) -> Self {
221-
todo!()
232+
self.config.initial_max_slice_len = value;
233+
self
222234
}
223235

224236
pub fn allocation_strategy(mut self, value: AllocationStrategy) -> Self {
225-
todo!()
237+
self.config.allocation_strategy = value;
238+
self
226239
}
227240
}

0 commit comments

Comments
 (0)