diff --git a/iceoryx2-ffi/ffi/src/api/service_builder.rs b/iceoryx2-ffi/ffi/src/api/service_builder.rs index 6c693bc24..e4f4c5c52 100644 --- a/iceoryx2-ffi/ffi/src/api/service_builder.rs +++ b/iceoryx2-ffi/ffi/src/api/service_builder.rs @@ -16,11 +16,11 @@ use crate::api::{iox2_service_type_e, AssertNonNullHandle, HandleToType}; use crate::{iox2_service_builder_pub_sub_set_user_header_type_details, iox2_type_variant_e}; use iceoryx2::prelude::*; -use iceoryx2::service::builder::publish_subscribe::{CustomHeaderMarker, CustomPayloadMarker}; use iceoryx2::service::builder::{ event::Builder as ServiceBuilderEvent, publish_subscribe::Builder as ServiceBuilderPubSub, Builder as ServiceBuilderBase, }; +use iceoryx2::service::builder::{CustomHeaderMarker, CustomPayloadMarker}; use iceoryx2_bb_elementary::static_assert::*; use iceoryx2_ffi_macros::iceoryx2_ffi; diff --git a/iceoryx2/src/active_request.rs b/iceoryx2/src/active_request.rs index 0e8234e08..2bf48578f 100644 --- a/iceoryx2/src/active_request.rs +++ b/iceoryx2/src/active_request.rs @@ -42,13 +42,19 @@ extern crate alloc; use alloc::sync::Arc; -use core::{fmt::Debug, marker::PhantomData, mem::MaybeUninit, ops::Deref, sync::atomic::Ordering}; +use core::{ + any::TypeId, fmt::Debug, marker::PhantomData, mem::MaybeUninit, ops::Deref, + sync::atomic::Ordering, +}; use iceoryx2_bb_elementary::zero_copy_send::ZeroCopySend; use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicUsize; use iceoryx2_bb_log::{error, fail}; use iceoryx2_bb_posix::unique_system_id::UniqueSystemId; -use iceoryx2_cal::zero_copy_connection::{ChannelId, ZeroCopyReceiver, ZeroCopyReleaseError}; +use iceoryx2_cal::{ + shm_allocator::AllocationStrategy, + zero_copy_connection::{ChannelId, ZeroCopyReceiver, ZeroCopyReleaseError}, +}; use crate::{ port::{ @@ -60,7 +66,9 @@ use crate::{ raw_sample::{RawSample, RawSampleMut}, response_mut::ResponseMut, response_mut_uninit::ResponseMutUninit, - service, + service::{ + self, builder::CustomPayloadMarker, static_config::message_type_details::TypeVariant, + }, }; /// Represents a one-to-one connection to a [`Client`](crate::port::client::Client) @@ -73,9 +81,9 @@ use crate::{ /// [`Response`](crate::response::Response)s. pub struct ActiveRequest< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { pub(crate) ptr: RawSample< @@ -96,9 +104,9 @@ pub struct ActiveRequest< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Debug for ActiveRequest @@ -121,9 +129,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Deref for ActiveRequest @@ -136,9 +144,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Drop for ActiveRequest @@ -169,9 +177,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > ActiveRequest { @@ -194,6 +202,64 @@ impl< ) } + /// Returns a reference to the payload of the received + /// [`RequestMut`](crate::request_mut::RequestMut) + pub fn payload(&self) -> &RequestPayload { + self.ptr.as_payload_ref() + } + + /// Returns a reference to the user_header of the received + /// [`RequestMut`](crate::request_mut::RequestMut) + pub fn user_header(&self) -> &RequestHeader { + self.ptr.as_user_header_ref() + } + + /// Returns a reference to the + /// [`crate::service::header::request_response::RequestHeader`] of the received + /// [`RequestMut`](crate::request_mut::RequestMut) + pub fn header(&self) -> &crate::service::header::request_response::RequestHeader { + self.ptr.as_header_ref() + } + + /// Returns the [`UniqueClientId`] of the [`Client`](crate::port::client::Client) + pub fn origin(&self) -> UniqueClientId { + UniqueClientId(UniqueSystemId::from(self.details.origin)) + } + + fn increment_loan_counter(&self) -> Result<(), LoanError> { + let mut current_loan_count = self.shared_loan_counter.load(Ordering::Relaxed); + loop { + if self.max_loan_count <= current_loan_count { + fail!(from self, + with LoanError::ExceedsMaxLoans, + "Unable to loan memory for Response since it would exceed the maximum number of loans of {}.", + self.max_loan_count); + } + + match self.shared_loan_counter.compare_exchange( + current_loan_count, + current_loan_count + 1, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => return Ok(()), + Err(v) => current_loan_count = v, + } + } + } +} + +//////////////////////// +// BEGIN: typed API +//////////////////////// +impl< + Service: crate::service::Service, + RequestPayload: Debug + ZeroCopySend + ?Sized, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + Sized, + ResponseHeader: Debug + ZeroCopySend, + > ActiveRequest +{ /// Loans uninitialized memory for a [`ResponseMut`] where the user can write its payload to. /// /// # Example @@ -223,25 +289,7 @@ impl< &self, ) -> Result, ResponseHeader>, LoanError> { - let mut current_loan_count = self.shared_loan_counter.load(Ordering::Relaxed); - loop { - if self.max_loan_count <= current_loan_count { - fail!(from self, - with LoanError::ExceedsMaxLoans, - "Unable to loan memory for Response since it would exceed the maximum number of loans of {}.", - self.max_loan_count); - } - - match self.shared_loan_counter.compare_exchange( - current_loan_count, - current_loan_count + 1, - Ordering::Relaxed, - Ordering::Relaxed, - ) { - Ok(_) => break, - Err(v) => current_loan_count = v, - } - } + self.increment_loan_counter()?; let chunk = self .shared_state @@ -255,6 +303,7 @@ impl< self.shared_state.response_sender.sender_port_id, )), request_id: self.request_id, + number_of_elements: 1, }, ) }; @@ -321,37 +370,13 @@ impl< response.write_payload(value).send() } - - /// Returns a reference to the payload of the received - /// [`RequestMut`](crate::request_mut::RequestMut) - pub fn payload(&self) -> &RequestPayload { - self.ptr.as_payload_ref() - } - - /// Returns a reference to the user_header of the received - /// [`RequestMut`](crate::request_mut::RequestMut) - pub fn user_header(&self) -> &RequestHeader { - self.ptr.as_user_header_ref() - } - - /// Returns a reference to the - /// [`crate::service::header::request_response::RequestHeader`] of the received - /// [`RequestMut`](crate::request_mut::RequestMut) - pub fn header(&self) -> &crate::service::header::request_response::RequestHeader { - self.ptr.as_header_ref() - } - - /// Returns the [`UniqueClientId`] of the [`Client`](crate::port::client::Client) - pub fn origin(&self) -> UniqueClientId { - UniqueClientId(UniqueSystemId::from(self.details.origin)) - } } impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + Default + ZeroCopySend, + ResponsePayload: Debug + Default + ZeroCopySend + Sized, ResponseHeader: Debug + ZeroCopySend, > ActiveRequest { @@ -388,3 +413,215 @@ impl< .write_payload(ResponsePayload::default())) } } +//////////////////////// +// END: typed API +//////////////////////// + +//////////////////////// +// BEGIN: sliced API +//////////////////////// +impl< + Service: crate::service::Service, + RequestPayload: Debug + ZeroCopySend + ?Sized, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + Default + ZeroCopySend + 'static, + ResponseHeader: Debug + ZeroCopySend, + > ActiveRequest +{ + /// Loans/allocates a [`ResponseMut`] from the underlying data segment of the + /// [`Server`](crate::port::server::Server) + /// and initializes all slice elements with the default value. This can be a performance hit + /// and [`ActiveRequest::loan_slice_uninit()`] can be used to loan a slice of + /// [`core::mem::MaybeUninit`]. + /// + /// On failure it returns [`LoanError`] describing the failure. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// # let service = node.service_builder(&"Whatever6".try_into()?) + /// # .request_response::() + /// # .open_or_create()?; + /// # + /// # let client = service.client_builder().create()?; + /// let server = service.server_builder() + /// .initial_max_slice_len(32) + /// .create()?; + /// # let pending_response = client.send_copy(0)?; + /// let active_request = server.receive()?.unwrap(); + /// + /// let slice_length = 13; + /// let mut response = active_request.loan_slice(slice_length)?; + /// response.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn loan_slice( + &self, + slice_len: usize, + ) -> Result, LoanError> { + let response = self.loan_slice_uninit(slice_len)?; + Ok(response.write_from_fn(|_| ResponsePayload::default())) + } +} + +impl< + Service: crate::service::Service, + RequestPayload: Debug + ZeroCopySend + ?Sized, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + 'static, + ResponseHeader: Debug + ZeroCopySend, + > ActiveRequest +{ + /// Loans/allocates a [`ResponseMutUninit`] from the underlying data segment of the + /// [`Server`](crate::port::server::Server). + /// The user has to initialize the payload before it can be sent. + /// + /// On failure it returns [`LoanError`] describing the failure. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// # let service = node.service_builder(&"Whatever6".try_into()?) + /// # .request_response::() + /// # .open_or_create()?; + /// # + /// # let client = service.client_builder().create()?; + /// let server = service.server_builder() + /// .initial_max_slice_len(32) + /// .create()?; + /// # let pending_response = client.send_copy(0)?; + /// let active_request = server.receive()?.unwrap(); + /// + /// let slice_length = 13; + /// let mut response = active_request.loan_slice_uninit(slice_length)?; + /// for element in response.payload_mut() { + /// element.write(1234); + /// } + /// let response = unsafe { response.assume_init() }; + /// response.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn loan_slice_uninit( + &self, + slice_len: usize, + ) -> Result], ResponseHeader>, LoanError> + { + debug_assert!(TypeId::of::() != TypeId::of::()); + unsafe { self.loan_slice_uninit_impl(slice_len, slice_len) } + } + + unsafe fn loan_slice_uninit_impl( + &self, + slice_len: usize, + underlying_number_of_slice_elements: usize, + ) -> Result], ResponseHeader>, LoanError> + { + let max_slice_len = self.shared_state.config.initial_max_slice_len; + + if self.shared_state.config.allocation_strategy == AllocationStrategy::Static + && max_slice_len < slice_len + { + fail!(from self, with LoanError::ExceedsMaxLoanSize, + "Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.", + slice_len, max_slice_len); + } + + self.increment_loan_counter()?; + + let response_layout = self.shared_state.response_sender.sample_layout(slice_len); + let chunk = self + .shared_state + .response_sender + .allocate(response_layout)?; + + unsafe { + (chunk.header as *mut service::header::request_response::ResponseHeader).write( + service::header::request_response::ResponseHeader { + server_port_id: UniqueServerId(UniqueSystemId::from( + self.shared_state.response_sender.sender_port_id, + )), + request_id: self.request_id, + number_of_elements: slice_len as _, + }, + ) + }; + + let ptr = unsafe { + RawSampleMut::< + service::header::request_response::ResponseHeader, + ResponseHeader, + [MaybeUninit], + >::new_unchecked( + chunk.header.cast(), + chunk.user_header.cast(), + core::slice::from_raw_parts_mut( + chunk.payload.cast(), + underlying_number_of_slice_elements, + ), + ) + }; + + Ok(ResponseMutUninit { + response: ResponseMut { + ptr, + shared_loan_counter: self.shared_loan_counter.clone(), + shared_state: self.shared_state.clone(), + offset_to_chunk: chunk.offset, + channel_id: self.channel_id, + connection_id: self.connection_id, + sample_size: chunk.size, + _response_payload: PhantomData, + _response_header: PhantomData, + }, + }) + } +} + +impl< + Service: crate::service::Service, + RequestHeader: Debug + ZeroCopySend, + ResponseHeader: Debug + ZeroCopySend, + > + ActiveRequest< + Service, + [CustomPayloadMarker], + RequestHeader, + [CustomPayloadMarker], + ResponseHeader, + > +{ + #[doc(hidden)] + pub unsafe fn loan_custom_payload( + &self, + slice_len: usize, + ) -> Result< + ResponseMutUninit], ResponseHeader>, + LoanError, + > { + // TypeVariant::Dynamic == slice and only here it makes sense to loan more than one element + debug_assert!( + slice_len == 1 + || self.shared_state.response_sender.payload_type_variant() == TypeVariant::Dynamic + ); + + self.loan_slice_uninit_impl( + slice_len, + self.shared_state.response_sender.payload_size() * slice_len, + ) + } +} +//////////////////////// +// END: sliced API +//////////////////////// diff --git a/iceoryx2/src/pending_response.rs b/iceoryx2/src/pending_response.rs index 14fd852e4..97ea379f5 100644 --- a/iceoryx2/src/pending_response.rs +++ b/iceoryx2/src/pending_response.rs @@ -66,6 +66,7 @@ use crate::port::details::chunk::Chunk; use crate::port::details::chunk_details::ChunkDetails; use crate::port::update_connections::ConnectionFailure; use crate::raw_sample::RawSample; +use crate::service::builder::CustomPayloadMarker; use crate::{port::ReceiveError, request_mut::RequestMut, response::Response, service}; /// Represents an active connection to all [`Server`](crate::port::server::Server) @@ -77,9 +78,9 @@ use crate::{port::ReceiveError, request_mut::RequestMut, response::Response, ser /// [`Server`](crate::port::server::Server)s are informed. pub struct PendingResponse< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { pub(crate) request: @@ -92,9 +93,9 @@ pub struct PendingResponse< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Drop for PendingResponse @@ -110,9 +111,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Deref for PendingResponse @@ -125,9 +126,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Debug for PendingResponse @@ -148,9 +149,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > PendingResponse { @@ -221,7 +222,16 @@ impl< .response_receiver .receive(self.request.channel_id) } +} +impl< + Service: crate::service::Service, + RequestPayload: Debug + ZeroCopySend + ?Sized, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + Sized, + ResponseHeader: Debug + ZeroCopySend, + > PendingResponse +{ /// Receives a [`Response`] from one of the [`Server`](crate::port::server::Server)s that /// received the [`RequestMut`]. /// @@ -281,3 +291,138 @@ impl< } } } + +impl< + Service: crate::service::Service, + RequestPayload: Debug + ZeroCopySend + ?Sized, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend, + ResponseHeader: Debug + ZeroCopySend, + > PendingResponse +{ + /// Receives a [`Response`] from one of the [`Server`](crate::port::server::Server)s that + /// received the [`RequestMut`]. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// # let service = node + /// # .service_builder(&"My/Funk/ServiceName".try_into()?) + /// # .request_response::() + /// # .open_or_create()?; + /// # + /// # let client = service.client_builder().create()?; + /// + /// # let request = client.loan_uninit()?; + /// # let request = request.write_payload(0); + /// + /// let pending_response = request.send()?; + /// + /// if let Some(response) = pending_response.receive()? { + /// println!("received response: {:?}", response); + /// } + /// + /// # Ok(()) + /// # } + /// ``` + pub fn receive( + &self, + ) -> Result>, ReceiveError> { + loop { + match self.receive_impl()? { + None => return Ok(None), + Some((details, chunk)) => { + let header = unsafe { + &*(chunk.header as *const service::header::request_response::ResponseHeader) + }; + + let response = Response { + details, + channel_id: self.request.channel_id, + ptr: unsafe { + RawSample::new_slice_unchecked( + chunk.header.cast(), + chunk.user_header.cast(), + core::slice::from_raw_parts( + chunk.payload.cast::(), + header.number_of_elements() as _, + ), + ) + }, + }; + + if response.header().request_id != self.request.header().request_id { + continue; + } + + return Ok(Some(response)); + } + } + } + } +} + +impl< + Service: crate::service::Service, + RequestHeader: Debug + ZeroCopySend, + ResponseHeader: Debug + ZeroCopySend, + > + PendingResponse< + Service, + [CustomPayloadMarker], + RequestHeader, + [CustomPayloadMarker], + ResponseHeader, + > +{ + #[doc(hidden)] + pub unsafe fn receive_custom_payload( + &self, + ) -> Result>, ReceiveError> + { + loop { + match self.receive_impl()? { + None => return Ok(None), + Some((details, chunk)) => { + let header = unsafe { + &*(chunk.header as *const service::header::request_response::ResponseHeader) + }; + + let number_of_elements = (*header).number_of_elements(); + let number_of_bytes = number_of_elements as usize + * self + .request + .client_shared_state + .response_receiver + .payload_size(); + + let response = Response { + details, + channel_id: self.request.channel_id, + ptr: unsafe { + RawSample::new_slice_unchecked( + chunk.header.cast(), + chunk.user_header.cast(), + core::slice::from_raw_parts( + chunk.payload.cast::(), + number_of_bytes as _, + ), + ) + }, + }; + + if response.header().request_id != self.request.header().request_id { + continue; + } + + return Ok(Some(response)); + } + } + } + } +} diff --git a/iceoryx2/src/port/client.rs b/iceoryx2/src/port/client.rs index 4fa6149c4..6c9c0a358 100644 --- a/iceoryx2/src/port/client.rs +++ b/iceoryx2/src/port/client.rs @@ -38,7 +38,8 @@ extern crate alloc; use alloc::sync::Arc; use core::{ - cell::UnsafeCell, fmt::Debug, marker::PhantomData, mem::MaybeUninit, sync::atomic::Ordering, + any::TypeId, cell::UnsafeCell, fmt::Debug, marker::PhantomData, mem::MaybeUninit, + sync::atomic::Ordering, }; use iceoryx2_bb_container::{queue::Queue, vec::Vec}; @@ -48,7 +49,9 @@ use iceoryx2_bb_elementary::{ use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState}; use iceoryx2_bb_log::{fail, fatal_panic, warn}; use iceoryx2_cal::{ - dynamic_storage::DynamicStorage, shm_allocator::PointerOffset, zero_copy_connection::ChannelId, + dynamic_storage::DynamicStorage, + shm_allocator::{AllocationStrategy, PointerOffset}, + zero_copy_connection::ChannelId, }; use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicBool, IoxAtomicU64, IoxAtomicUsize}; @@ -61,9 +64,12 @@ use crate::{ request_mut_uninit::RequestMutUninit, service::{ self, + builder::CustomPayloadMarker, dynamic_config::request_response::{ClientDetails, ServerDetails}, + header, naming_scheme::data_segment_name, - port_factory::client::{ClientCreateError, PortFactoryClient}, + port_factory::client::{ClientCreateError, LocalClientConfig, PortFactoryClient}, + static_config::message_type_details::TypeVariant, }, }; @@ -118,6 +124,7 @@ impl core::error::Error for RequestSendError {} #[derive(Debug)] pub(crate) struct ClientSharedState { + pub(crate) config: LocalClientConfig, pub(crate) request_sender: Sender, pub(crate) response_receiver: Receiver, client_handle: UnsafeCell>, @@ -213,8 +220,8 @@ impl ClientSharedState { h.index() as usize, SenderDetails { port_id: port.server_port_id.value(), - max_number_of_segments: 1, - data_segment_type: DataSegmentType::Static, + max_number_of_segments: port.max_number_of_segments, + data_segment_type: port.data_segment_type, number_of_samples: port.number_of_responses, }, ); @@ -249,9 +256,9 @@ impl ClientSharedState { #[derive(Debug)] pub struct Client< Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { client_port_id: UniqueClientId, @@ -265,9 +272,9 @@ pub struct Client< impl< Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Client { @@ -302,15 +309,31 @@ impl< let global_config = service.__internal_state().shared_node.config(); let segment_name = data_segment_name(client_port_id.value()); - let data_segment_type = DataSegmentType::Static; + let data_segment_type = DataSegmentType::new_from_allocation_strategy( + client_factory.config.allocation_strategy, + ); let max_number_of_segments = DataSegment::::max_number_of_segments(data_segment_type); - let data_segment = DataSegment::::create_static_segment( - &segment_name, - static_config.request_message_type_details.sample_layout(1), - global_config, - number_of_requests, - ); + + let sample_layout = static_config + .request_message_type_details + .sample_layout(client_factory.config.initial_max_slice_len); + + let data_segment = match data_segment_type { + DataSegmentType::Static => DataSegment::::create_static_segment( + &segment_name, + sample_layout, + global_config, + number_of_requests, + ), + DataSegmentType::Dynamic => DataSegment::::create_dynamic_segment( + &segment_name, + sample_layout, + global_config, + number_of_requests, + client_factory.config.allocation_strategy, + ), + }; let data_segment = fail!(from origin, when data_segment, @@ -322,11 +345,21 @@ impl< node_id: *service.__internal_state().shared_node.id(), number_of_requests, response_buffer_size: static_config.max_response_buffer_size, + max_slice_len: client_factory.config.initial_max_slice_len, + data_segment_type, + max_number_of_segments, }; let request_sender = Sender { data_segment, - segment_states: vec![SegmentState::new(number_of_requests)], + segment_states: { + let mut v = + alloc::vec::Vec::::with_capacity(max_number_of_segments as usize); + for _ in 0..max_number_of_segments { + v.push(SegmentState::new(number_of_requests)) + } + v + }, sender_port_id: client_port_id.value(), shared_node: service.__internal_state().shared_node.clone(), connections: (0..server_list.capacity()) @@ -342,7 +375,7 @@ impl< tagger: CyclicTagger::new(), loan_counter: IoxAtomicUsize::new(0), sender_max_borrowed_samples: static_config.max_loaned_requests, - unable_to_deliver_strategy: client_factory.unable_to_deliver_strategy, + unable_to_deliver_strategy: client_factory.config.unable_to_deliver_strategy, message_type_details: static_config.request_message_type_details.clone(), // all requests are sent via one channel, only the responses require different // channels to guarantee that one response does not fill the buffer of another @@ -378,6 +411,7 @@ impl< let new_self = Self { request_id_counter: IoxAtomicU64::new(0), client_shared_state: Arc::new(ClientSharedState { + config: client_factory.config, client_handle: UnsafeCell::new(None), available_channel_ids: { let mut queue = Queue::new(number_of_requests); @@ -440,7 +474,19 @@ impl< .request_sender .unable_to_deliver_strategy } +} +//////////////////////// +// BEGIN: typed API +//////////////////////// +impl< + Service: service::Service, + RequestPayload: Debug + ZeroCopySend, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, + ResponseHeader: Debug + ZeroCopySend, + > Client +{ /// Acquires an [`RequestMutUninit`] to store payload. This API shall be used /// by default to avoid unnecessary copies. /// @@ -531,6 +577,7 @@ impl< client_port_id: self.id(), channel_id, request_id: self.request_id_counter.fetch_add(1, Ordering::Relaxed), + number_of_elements: 1, }, ) }; @@ -585,7 +632,7 @@ impl< Service: service::Service, RequestPayload: Debug + Default + ZeroCopySend, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Client { @@ -627,3 +674,251 @@ impl< Ok(self.loan_uninit()?.write_payload(RequestPayload::default())) } } + +//////////////////////// +// END: typed API +//////////////////////// + +//////////////////////// +// BEGIN: sliced API +//////////////////////// +impl< + Service: service::Service, + RequestPayload: Default + Debug + ZeroCopySend + 'static, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, + ResponseHeader: Debug + ZeroCopySend, + > Client +{ + /// Loans/allocates a [`RequestMut`] from the underlying data segment of the [`Client`] + /// and initializes all slice elements with the default value. This can be a performance hit + /// and [`Client::loan_slice_uninit()`] can be used to loan a slice of + /// [`core::mem::MaybeUninit`]. + /// + /// On failure it returns [`LoanError`] describing the failure. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// let service = node + /// .service_builder(&"My/Funk/ServiceName".try_into()?) + /// .request_response::<[u64], u64>() + /// .open_or_create()?; + /// + /// let client = service.client_builder() + /// .initial_max_slice_len(32) + /// .create()?; + /// + /// let slice_length = 13; + /// let mut request = client.loan_slice(slice_length)?; + /// for element in request.payload_mut() { + /// *element = 1234; + /// } + /// + /// let pending_response = request.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn loan_slice( + &self, + slice_len: usize, + ) -> Result< + RequestMut, + LoanError, + > { + let request = self.loan_slice_uninit(slice_len)?; + Ok(request.write_from_fn(|_| RequestPayload::default())) + } +} + +impl< + Service: service::Service, + RequestPayload: Debug + ZeroCopySend + 'static, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, + ResponseHeader: Debug + ZeroCopySend, + > Client +{ + /// Loans/allocates a [`RequestMutUninit`] from the underlying data segment of the [`Client`]. + /// The user has to initialize the payload before it can be sent. + /// + /// On failure it returns [`LoanError`] describing the failure. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// let service = node + /// .service_builder(&"My/Funk/ServiceName".try_into()?) + /// .request_response::<[u64], u64>() + /// .open_or_create()?; + /// + /// let client = service.client_builder() + /// .initial_max_slice_len(32) + /// .create()?; + /// + /// let slice_length = 13; + /// let mut request = client.loan_slice_uninit(slice_length)?; + /// for element in request.payload_mut() { + /// element.write(1234); + /// } + /// // we have written the payload, initialize the request + /// let request = unsafe { request.assume_init() }; + /// + /// let pending_response = request.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + #[allow(clippy::type_complexity)] // type alias would require 5 generic parameters which hardly reduces complexity + pub fn loan_slice_uninit( + &self, + slice_len: usize, + ) -> Result< + RequestMutUninit< + Service, + [MaybeUninit], + RequestHeader, + ResponsePayload, + ResponseHeader, + >, + LoanError, + > { + debug_assert!(TypeId::of::() != TypeId::of::()); + unsafe { self.loan_slice_uninit_impl(slice_len, slice_len) } + } + + #[allow(clippy::type_complexity)] // type alias would require 5 generic parameters which hardly reduces complexity + unsafe fn loan_slice_uninit_impl( + &self, + slice_len: usize, + underlying_number_of_slice_elements: usize, + ) -> Result< + RequestMutUninit< + Service, + [MaybeUninit], + RequestHeader, + ResponsePayload, + ResponseHeader, + >, + LoanError, + > { + let max_slice_len = self.client_shared_state.config.initial_max_slice_len; + + if self.client_shared_state.config.allocation_strategy == AllocationStrategy::Static + && max_slice_len < slice_len + { + fail!(from self, with LoanError::ExceedsMaxLoanSize, + "Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.", + slice_len, max_slice_len); + } + + let request_layout = self + .client_shared_state + .request_sender + .sample_layout(slice_len); + let chunk = self + .client_shared_state + .request_sender + .allocate(request_layout)?; + + let channel_id = + match unsafe { &mut *self.client_shared_state.available_channel_ids.get() }.pop() { + Some(channel_id) => channel_id, + None => { + fatal_panic!(from self, + "This should never happen! There are no more available response channels."); + } + }; + + let header_ptr = chunk.header as *mut header::request_response::RequestHeader; + unsafe { + header_ptr.write(header::request_response::RequestHeader { + client_port_id: self.id(), + channel_id, + request_id: self.request_id_counter.fetch_add(1, Ordering::Relaxed), + number_of_elements: slice_len as _, + }) + }; + + let ptr = unsafe { + RawSampleMut::< + service::header::request_response::RequestHeader, + RequestHeader, + [MaybeUninit], + >::new_unchecked( + chunk.header.cast(), + chunk.user_header.cast(), + core::slice::from_raw_parts_mut( + chunk.payload.cast(), + underlying_number_of_slice_elements, + ), + ) + }; + + Ok(RequestMutUninit { + request: RequestMut { + ptr, + sample_size: chunk.size, + channel_id, + offset_to_chunk: chunk.offset, + client_shared_state: self.client_shared_state.clone(), + _response_payload: PhantomData, + _response_header: PhantomData, + was_sample_sent: IoxAtomicBool::new(false), + }, + }) + } +} + +impl< + Service: service::Service, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, + ResponseHeader: Debug + ZeroCopySend, + > Client +{ + #[doc(hidden)] + #[allow(clippy::type_complexity)] // type alias would require 5 generic parameters which hardly reduces complexity + pub unsafe fn loan_custom_payload( + &self, + slice_len: usize, + ) -> Result< + RequestMutUninit< + Service, + [MaybeUninit], + RequestHeader, + ResponsePayload, + ResponseHeader, + >, + LoanError, + > { + // TypeVariant::Dynamic == slice and only here it makes sense to loan more than one element + debug_assert!( + slice_len == 1 + || self + .client_shared_state + .request_sender + .payload_type_variant() + == TypeVariant::Dynamic + ); + + self.loan_slice_uninit_impl( + slice_len, + self.client_shared_state.request_sender.payload_size() * slice_len, + ) + } +} +//////////////////////// +// END: sliced API +//////////////////////// diff --git a/iceoryx2/src/port/publisher.rs b/iceoryx2/src/port/publisher.rs index 10e51dfe9..e7fd4d955 100644 --- a/iceoryx2/src/port/publisher.rs +++ b/iceoryx2/src/port/publisher.rs @@ -110,7 +110,7 @@ use crate::port::update_connections::{ConnectionFailure, UpdateConnections}; use crate::prelude::UnableToDeliverStrategy; use crate::raw_sample::RawSampleMut; use crate::sample_mut_uninit::SampleMutUninit; -use crate::service::builder::publish_subscribe::CustomPayloadMarker; +use crate::service::builder::CustomPayloadMarker; use crate::service::config_scheme::{connection_config, data_segment_config}; use crate::service::dynamic_config::publish_subscribe::{PublisherDetails, SubscriberDetails}; use crate::service::header::publish_subscribe::Header; diff --git a/iceoryx2/src/port/server.rs b/iceoryx2/src/port/server.rs index f0d2df196..54d3ac720 100644 --- a/iceoryx2/src/port/server.rs +++ b/iceoryx2/src/port/server.rs @@ -48,7 +48,9 @@ use iceoryx2_bb_log::{fail, warn}; use iceoryx2_bb_posix::unique_system_id::UniqueSystemId; use iceoryx2_cal::dynamic_storage::DynamicStorage; +use crate::service::builder::CustomPayloadMarker; use crate::service::naming_scheme::data_segment_name; +use crate::service::port_factory::server::LocalServerConfig; use crate::{ active_request::ActiveRequest, prelude::PortFactory, @@ -80,6 +82,7 @@ const REQUEST_CHANNEL_ID: ChannelId = ChannelId::new(0); #[derive(Debug)] pub(crate) struct SharedServerState { + pub(crate) config: LocalServerConfig, pub(crate) response_sender: Sender, server_handle: UnsafeCell>, request_receiver: Receiver, @@ -131,8 +134,8 @@ impl SharedServerState { SenderDetails { port_id: details.client_port_id.value(), number_of_samples: details.number_of_requests, - max_number_of_segments: 1, - data_segment_type: DataSegmentType::Static, + max_number_of_segments: details.max_number_of_segments, + data_segment_type: details.data_segment_type, }, ); result = result.and(inner_result); @@ -168,9 +171,9 @@ impl SharedServerState { #[derive(Debug)] pub struct Server< Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { shared_state: Arc>, @@ -184,9 +187,9 @@ pub struct Server< impl< Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Server { @@ -247,16 +250,30 @@ impl< }; let global_config = service.__internal_state().shared_node.config(); - let data_segment_type = DataSegmentType::Static; + let data_segment_type = DataSegmentType::new_from_allocation_strategy( + server_factory.config.allocation_strategy, + ); let segment_name = data_segment_name(server_port_id.value()); let max_number_of_segments = DataSegment::::max_number_of_segments(data_segment_type); - let data_segment = DataSegment::::create_static_segment( - &segment_name, - static_config.response_message_type_details.sample_layout(1), - global_config, - number_of_responses, - ); + let sample_layout = static_config + .response_message_type_details + .sample_layout(server_factory.config.initial_max_slice_len); + let data_segment = match data_segment_type { + DataSegmentType::Static => DataSegment::::create_static_segment( + &segment_name, + sample_layout, + global_config, + number_of_responses, + ), + DataSegmentType::Dynamic => DataSegment::::create_dynamic_segment( + &segment_name, + sample_layout, + global_config, + number_of_responses, + server_factory.config.allocation_strategy, + ), + }; let data_segment = fail!(from origin, when data_segment, @@ -264,7 +281,14 @@ impl< "{} since the server data segment could not be created.", msg); let response_sender = Sender { - segment_states: vec![SegmentState::new(number_of_responses)], + segment_states: { + let mut v = + alloc::vec::Vec::::with_capacity(max_number_of_segments as usize); + for _ in 0..max_number_of_segments { + v.push(SegmentState::new(number_of_responses)) + } + v + }, data_segment, connections: (0..client_list.capacity()) .map(|_| UnsafeCell::new(None)) @@ -284,7 +308,7 @@ impl< service_state: service.__internal_state().clone(), tagger: CyclicTagger::new(), loan_counter: IoxAtomicUsize::new(0), - unable_to_deliver_strategy: server_factory.unable_to_deliver_strategy, + unable_to_deliver_strategy: server_factory.config.unable_to_deliver_strategy, message_type_details: static_config.response_message_type_details.clone(), number_of_channels: number_of_requests_per_client, }; @@ -297,6 +321,7 @@ impl< .request_response() .enable_fire_and_forget_requests, shared_state: Arc::new(SharedServerState { + config: server_factory.config, request_receiver, client_list_state: UnsafeCell::new(unsafe { client_list.get_state() }), server_handle: UnsafeCell::new(None), @@ -327,6 +352,9 @@ impl< server_port_id, request_buffer_size: static_config.max_active_requests_per_client, number_of_responses, + max_slice_len: server_factory.config.initial_max_slice_len, + data_segment_type, + max_number_of_segments, }) { Some(v) => Some(v), None => { @@ -368,6 +396,46 @@ impl< .request_receiver .receive(REQUEST_CHANNEL_ID) } +} + +impl< + Service: service::Service, + RequestPayload: Debug + ZeroCopySend, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, + ResponseHeader: Debug + ZeroCopySend, + > Server +{ + fn create_active_request( + &self, + details: ChunkDetails, + chunk: Chunk, + connection_id: usize, + ) -> ActiveRequest + { + let header = + unsafe { &*(chunk.header as *const service::header::request_response::RequestHeader) }; + + ActiveRequest { + details, + shared_loan_counter: Arc::new(IoxAtomicUsize::new(0)), + max_loan_count: self.max_loaned_responses_per_request, + request_id: header.request_id, + channel_id: header.channel_id, + connection_id, + shared_state: self.shared_state.clone(), + ptr: unsafe { + RawSample::new_unchecked( + chunk.header.cast(), + chunk.user_header.cast(), + chunk.payload.cast::(), + ) + }, + + _response_payload: PhantomData, + _response_header: PhantomData, + } + } /// Receives a [`RequestMut`](crate::request_mut::RequestMut) that was sent by a /// [`Client`](crate::port::client::Client) and returns an [`ActiveRequest`] which @@ -417,24 +485,182 @@ impl< .response_sender .get_connection_id_of(header.client_port_id.value()) { - let active_request = ActiveRequest { + let active_request = + self.create_active_request(details, chunk, connection_id); + + if !self.enable_fire_and_forget && !active_request.is_connected() { + continue; + } + + return Ok(Some(active_request)); + } + } + None => return Ok(None), + } + } + } +} + +impl< + Service: service::Service, + RequestPayload: Debug + ZeroCopySend, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, + ResponseHeader: Debug + ZeroCopySend, + > Server +{ + fn create_active_request( + &self, + details: ChunkDetails, + chunk: Chunk, + connection_id: usize, + number_of_elements: usize, + ) -> ActiveRequest + { + let header = + unsafe { &*(chunk.header as *const service::header::request_response::RequestHeader) }; + + ActiveRequest { + details, + shared_loan_counter: Arc::new(IoxAtomicUsize::new(0)), + max_loan_count: self.max_loaned_responses_per_request, + request_id: header.request_id, + channel_id: header.channel_id, + connection_id, + shared_state: self.shared_state.clone(), + ptr: unsafe { + RawSample::new_slice_unchecked( + chunk.header.cast(), + chunk.user_header.cast(), + core::slice::from_raw_parts( + chunk.payload.cast::(), + number_of_elements as _, + ), + ) + }, + _response_payload: PhantomData, + _response_header: PhantomData, + } + } + + /// Receives a [`RequestMut`](crate::request_mut::RequestMut) that was sent by a + /// [`Client`](crate::port::client::Client) and returns an [`ActiveRequest`] which + /// can be used to respond. + /// If no [`RequestMut`](crate::request_mut::RequestMut)s were received it + /// returns [`None`]. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// let service = node + /// .service_builder(&"My/Funk/ServiceName".try_into()?) + /// .request_response::<[u64], u64>() + /// .open_or_create()?; + /// + /// let server = service.server_builder().create()?; + /// + /// while let Some(active_request) = server.receive()? { + /// println!("received request: {:?}", active_request); + /// } + /// # Ok(()) + /// # } + /// ``` + #[allow(clippy::type_complexity)] // type alias would require 5 generic parameters which hardly reduces complexity + pub fn receive( + &self, + ) -> Result< + Option< + ActiveRequest< + Service, + [RequestPayload], + RequestHeader, + ResponsePayload, + ResponseHeader, + >, + >, + ReceiveError, + > { + loop { + match self.receive_impl()? { + Some((details, chunk)) => { + let header = unsafe { + &*(chunk.header as *const service::header::request_response::RequestHeader) + }; + + if let Some(connection_id) = self + .shared_state + .response_sender + .get_connection_id_of(header.client_port_id.value()) + { + let active_request = self.create_active_request( + details, + chunk, + connection_id, + header.number_of_elements() as _, + ); + + if !self.enable_fire_and_forget && !active_request.is_connected() { + continue; + } + + return Ok(Some(active_request)); + } + } + None => return Ok(None), + } + } + } +} + +impl< + Service: service::Service, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, + ResponseHeader: Debug + ZeroCopySend, + > Server +{ + #[doc(hidden)] + #[allow(clippy::type_complexity)] // type alias would require 5 generic parameters which hardly reduces complexity + pub unsafe fn receive_custom_payload( + &self, + ) -> Result< + Option< + ActiveRequest< + Service, + [CustomPayloadMarker], + RequestHeader, + ResponsePayload, + ResponseHeader, + >, + >, + ReceiveError, + > { + loop { + match self.receive_impl()? { + Some((details, chunk)) => { + let header = unsafe { + &*(chunk.header as *const service::header::request_response::RequestHeader) + }; + let number_of_elements = (*header).number_of_elements(); + let number_of_bytes = number_of_elements as usize + * self.shared_state.request_receiver.payload_size(); + + if let Some(connection_id) = self + .shared_state + .response_sender + .get_connection_id_of(header.client_port_id.value()) + { + let active_request = self.create_active_request( details, - shared_loan_counter: Arc::new(IoxAtomicUsize::new(0)), - max_loan_count: self.max_loaned_responses_per_request, - request_id: header.request_id, - channel_id: header.channel_id, + chunk, connection_id, - shared_state: self.shared_state.clone(), - ptr: unsafe { - RawSample::new_unchecked( - chunk.header.cast(), - chunk.user_header.cast(), - chunk.payload.cast::(), - ) - }, - _response_payload: PhantomData, - _response_header: PhantomData, - }; + number_of_bytes, + ); if !self.enable_fire_and_forget && !active_request.is_connected() { continue; diff --git a/iceoryx2/src/port/subscriber.rs b/iceoryx2/src/port/subscriber.rs index 5d2654831..40f560904 100644 --- a/iceoryx2/src/port/subscriber.rs +++ b/iceoryx2/src/port/subscriber.rs @@ -49,7 +49,7 @@ use iceoryx2_bb_posix::unique_system_id::UniqueSystemId; use iceoryx2_cal::dynamic_storage::DynamicStorage; use iceoryx2_cal::zero_copy_connection::ChannelId; -use crate::service::builder::publish_subscribe::CustomPayloadMarker; +use crate::service::builder::CustomPayloadMarker; use crate::service::dynamic_config::publish_subscribe::{PublisherDetails, SubscriberDetails}; use crate::service::header::publish_subscribe::Header; use crate::service::port_factory::subscriber::SubscriberConfig; diff --git a/iceoryx2/src/raw_sample.rs b/iceoryx2/src/raw_sample.rs index 5e22e5720..fb5b2aed7 100644 --- a/iceoryx2/src/raw_sample.rs +++ b/iceoryx2/src/raw_sample.rs @@ -108,7 +108,7 @@ impl Clone for RawSample Copy for RawSample {} -impl fmt::Debug for RawSample { +impl fmt::Debug for RawSample { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, @@ -202,7 +202,7 @@ impl Clone for RawSampleMut Copy for RawSampleMut {} -impl fmt::Debug for RawSampleMut { +impl fmt::Debug for RawSampleMut { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, diff --git a/iceoryx2/src/request_mut.rs b/iceoryx2/src/request_mut.rs index 60a59e9dd..ac567d078 100644 --- a/iceoryx2/src/request_mut.rs +++ b/iceoryx2/src/request_mut.rs @@ -62,9 +62,9 @@ use crate::{ /// [`Server`](crate::port::server::Server). pub struct RequestMut< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { pub(crate) ptr: RawSampleMut< @@ -83,9 +83,9 @@ pub struct RequestMut< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Drop for RequestMut { @@ -111,9 +111,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Debug for RequestMut @@ -138,9 +138,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Deref for RequestMut @@ -153,9 +153,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > DerefMut for RequestMut @@ -167,9 +167,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > RequestMut { diff --git a/iceoryx2/src/request_mut_uninit.rs b/iceoryx2/src/request_mut_uninit.rs index 660f5dcc1..9172260e6 100644 --- a/iceoryx2/src/request_mut_uninit.rs +++ b/iceoryx2/src/request_mut_uninit.rs @@ -48,9 +48,9 @@ use core::{fmt::Debug, mem::MaybeUninit}; #[repr(transparent)] pub struct RequestMutUninit< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { pub(crate) request: @@ -59,9 +59,9 @@ pub struct RequestMutUninit< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Debug for RequestMutUninit @@ -73,9 +73,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > RequestMutUninit { @@ -108,9 +108,9 @@ impl< impl< Service: crate::service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > RequestMutUninit< @@ -172,3 +172,159 @@ impl< core::mem::transmute(self.request) } } + +impl< + Service: crate::service::Service, + RequestPayload: Debug + ZeroCopySend, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, + ResponseHeader: Debug + ZeroCopySend, + > + RequestMutUninit< + Service, + [MaybeUninit], + RequestHeader, + ResponsePayload, + ResponseHeader, + > +{ + /// When the payload is manually populated by using + /// [`RequestMutUninit::payload_mut()`], then this function can be used + /// to convert it into the initialized [`RequestMut`] version. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// let service = node + /// .service_builder(&"My/Funk/ServiceName".try_into()?) + /// .request_response::<[u64], u64>() + /// .open_or_create()?; + /// + /// let client = service.client_builder() + /// .initial_max_slice_len(32) + /// .create()?; + /// + /// let slice_length = 13; + /// let mut request = client.loan_slice_uninit(slice_length)?; + /// for element in request.payload_mut() { + /// element.write(1234); + /// } + /// // we have written the payload, initialize the request + /// let request = unsafe { request.assume_init() }; + /// + /// let pending_response = request.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + /// # Safety + /// + /// The caller must ensure that [`core::mem::MaybeUninit`] really is initialized. + /// Sending the content when it is not fully initialized causes immediate undefined behavior. + pub unsafe fn assume_init( + self, + ) -> RequestMut { + // the transmute is not nice but safe since MaybeUninit is #[repr(transparent)] to the inner type + core::mem::transmute(self.request) + } + + /// Writes the payload to the [`RequestMutUninit`] and labels the [`RequestMutUninit`] as + /// initialized + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// let service = node + /// .service_builder(&"My/Funk/ServiceName".try_into()?) + /// .request_response::<[usize], u64>() + /// .open_or_create()?; + /// + /// let client = service.client_builder() + /// .initial_max_slice_len(32) + /// .create()?; + /// + /// let slice_length = 13; + /// let mut request = client.loan_slice_uninit(slice_length)?; + /// let request = request.write_from_fn(|index| index + 123); + /// + /// let pending_response = request.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn write_from_fn RequestPayload>( + mut self, + mut initializer: F, + ) -> RequestMut { + for (i, element) in self.payload_mut().iter_mut().enumerate() { + element.write(initializer(i)); + } + + // SAFETY: this is safe since the payload was initialized on the line above + unsafe { self.assume_init() } + } +} + +impl< + Service: crate::service::Service, + RequestPayload: Debug + Copy + ZeroCopySend, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, + ResponseHeader: Debug + ZeroCopySend, + > + RequestMutUninit< + Service, + [MaybeUninit], + RequestHeader, + ResponsePayload, + ResponseHeader, + > +{ + /// Writes the payload by mem copying the provided slice into the [`RequestMutUninit`]. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// let service = node + /// .service_builder(&"My/Funk/ServiceName".try_into()?) + /// .request_response::<[u64], u64>() + /// .open_or_create()?; + /// + /// let client = service.client_builder() + /// .initial_max_slice_len(32) + /// .create()?; + /// + /// let slice_length = 3; + /// let mut request = client.loan_slice_uninit(slice_length)?; + /// let request = request.write_from_slice(&vec![1,2,3]); + /// + /// let pending_response = request.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn write_from_slice( + mut self, + value: &[RequestPayload], + ) -> RequestMut { + self.payload_mut().copy_from_slice(unsafe { + core::mem::transmute::<&[RequestPayload], &[MaybeUninit]>(value) + }); + unsafe { self.assume_init() } + } +} diff --git a/iceoryx2/src/response.rs b/iceoryx2/src/response.rs index 9521b898e..7252bf8a1 100644 --- a/iceoryx2/src/response.rs +++ b/iceoryx2/src/response.rs @@ -57,7 +57,7 @@ use crate::service; /// [`Server`](crate::port::server::Server) via the [`Client`](crate::port::client::Client). pub struct Response< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { pub(crate) ptr: RawSample< @@ -71,7 +71,7 @@ pub struct Response< impl< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Drop for Response { @@ -99,7 +99,7 @@ impl< impl< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Debug for Response { @@ -117,7 +117,7 @@ impl< impl< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Deref for Response { @@ -129,7 +129,7 @@ impl< impl< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Response { diff --git a/iceoryx2/src/response_mut.rs b/iceoryx2/src/response_mut.rs index e49513b31..09704b0c1 100644 --- a/iceoryx2/src/response_mut.rs +++ b/iceoryx2/src/response_mut.rs @@ -71,7 +71,7 @@ use crate::{ /// scope. pub struct ResponseMut< Service: service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { pub(crate) ptr: RawSampleMut< @@ -91,7 +91,7 @@ pub struct ResponseMut< impl< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Debug for ResponseMut { @@ -112,7 +112,7 @@ impl< impl< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Drop for ResponseMut { @@ -126,7 +126,7 @@ impl< impl< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Deref for ResponseMut { @@ -138,7 +138,7 @@ impl< impl< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > DerefMut for ResponseMut { @@ -149,7 +149,7 @@ impl< impl< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > ResponseMut { diff --git a/iceoryx2/src/response_mut_uninit.rs b/iceoryx2/src/response_mut_uninit.rs index 2ab941423..013ed72d9 100644 --- a/iceoryx2/src/response_mut_uninit.rs +++ b/iceoryx2/src/response_mut_uninit.rs @@ -59,7 +59,7 @@ use core::{fmt::Debug, mem::MaybeUninit}; /// The generic parameter `Payload` is actually [`core::mem::MaybeUninit`]. pub struct ResponseMutUninit< Service: service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { pub(crate) response: ResponseMut, @@ -67,7 +67,7 @@ pub struct ResponseMutUninit< impl< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Debug for ResponseMutUninit { @@ -78,7 +78,7 @@ impl< impl< Service: crate::service::Service, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > ResponseMutUninit { @@ -292,3 +292,131 @@ impl< core::mem::transmute(self.response) } } + +impl< + Service: crate::service::Service, + ResponsePayload: Debug + ZeroCopySend, + ResponseHeader: Debug + ZeroCopySend, + > ResponseMutUninit], ResponseHeader> +{ + /// Converts the [`ResponseMutUninit`] into [`ResponseMut`]. This shall be done after the + /// payload was written into the [`ResponseMutUninit`]. + /// + /// # Safety + /// + /// * Must ensure that the payload was properly initialized. + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// # let service = node.service_builder(&"Whatever6".try_into()?) + /// # .request_response::() + /// # .open_or_create()?; + /// # + /// # let client = service.client_builder().create()?; + /// # let server = service.server_builder() + /// .initial_max_slice_len(32) + /// .create()?; + /// # let pending_response = client.send_copy(0)?; + /// # let active_request = server.receive()?.unwrap(); + /// + /// let slice_length = 13; + /// let mut response = active_request.loan_slice_uninit(slice_length)?; + /// for element in response.payload_mut() { + /// element.write(1234); + /// } + /// // this is fine since the payload was initialized to 789 + /// let response = unsafe { response.assume_init() }; + /// + /// # Ok(()) + /// # } + /// ``` + pub unsafe fn assume_init(self) -> ResponseMut { + // the transmute is not nice but safe since MaybeUninit is #[repr(transparent)] to the inner type + core::mem::transmute(self.response) + } + + /// Writes the payload to the [`ResponseMutUninit`] and labels the [`ResponseMutUninit`] as + /// initialized + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// # let service = node.service_builder(&"Whatever6".try_into()?) + /// # .request_response::() + /// # .open_or_create()?; + /// # + /// # let client = service.client_builder().create()?; + /// # let server = service.server_builder() + /// .initial_max_slice_len(32) + /// .create()?; + /// # let pending_response = client.send_copy(0)?; + /// # let active_request = server.receive()?.unwrap(); + /// + /// let slice_length = 13; + /// let mut response = active_request.loan_slice_uninit(slice_length)?; + /// let response = response.write_from_fn(|index| index * 2 + 3); + /// response.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn write_from_fn ResponsePayload>( + mut self, + mut initializer: F, + ) -> ResponseMut { + for (i, element) in self.payload_mut().iter_mut().enumerate() { + element.write(initializer(i)); + } + + // SAFETY: this is safe since the payload was initialized on the line above + unsafe { self.assume_init() } + } +} + +impl< + Service: crate::service::Service, + ResponsePayload: Debug + Copy + ZeroCopySend, + ResponseHeader: Debug + ZeroCopySend, + > ResponseMutUninit], ResponseHeader> +{ + /// Writes the payload by mem copying the provided slice into the [`ResponseMutUninit`]. + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # + /// # let service = node.service_builder(&"Whatever6".try_into()?) + /// # .request_response::() + /// # .open_or_create()?; + /// # + /// # let client = service.client_builder().create()?; + /// # let server = service.server_builder() + /// .initial_max_slice_len(32) + /// .create()?; + /// # let pending_response = client.send_copy(0)?; + /// # let active_request = server.receive()?.unwrap(); + /// + /// let slice_length = 4; + /// let mut response = active_request.loan_slice_uninit(slice_length)?; + /// let response = response.write_from_slice(&vec![1, 2, 3, 4]); + /// response.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn write_from_slice( + mut self, + value: &[ResponsePayload], + ) -> ResponseMut { + self.payload_mut().copy_from_slice(unsafe { + core::mem::transmute::<&[ResponsePayload], &[MaybeUninit]>(value) + }); + unsafe { self.assume_init() } + } +} diff --git a/iceoryx2/src/service/builder/mod.rs b/iceoryx2/src/service/builder/mod.rs index 202494549..39a21cb3d 100644 --- a/iceoryx2/src/service/builder/mod.rs +++ b/iceoryx2/src/service/builder/mod.rs @@ -30,6 +30,7 @@ use crate::service::dynamic_config::RegisterNodeResult; use crate::service::static_config::*; use core::fmt::Debug; use core::marker::PhantomData; +use iceoryx2_bb_derive_macros::ZeroCopySend; use iceoryx2_bb_elementary::enum_gen; use iceoryx2_bb_elementary::zero_copy_send::ZeroCopySend; use iceoryx2_bb_log::fail; @@ -63,6 +64,16 @@ enum ServiceState { Corrupted, } +#[repr(C)] +#[derive(Debug, ZeroCopySend)] +#[doc(hidden)] +pub struct CustomHeaderMarker {} + +#[repr(C)] +#[derive(Debug, ZeroCopySend)] +#[doc(hidden)] +pub struct CustomPayloadMarker(u8); + enum_gen! { #[doc(hidden)] OpenDynamicStorageFailure @@ -121,8 +132,8 @@ impl Builder { /// Create a new builder to create a /// [`MessagingPattern::RequestResponse`](crate::service::messaging_pattern::MessagingPattern::RequestResponse) [`Service`]. pub fn request_response< - RequestPayload: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, + ResponsePayload: Debug + ZeroCopySend + ?Sized, >( self, ) -> request_response::Builder { @@ -180,8 +191,8 @@ impl BuilderWithServiceType { } fn request_response< - RequestPayload: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, + ResponsePayload: Debug + ZeroCopySend + ?Sized, >( self, ) -> request_response::Builder { diff --git a/iceoryx2/src/service/builder/publish_subscribe.rs b/iceoryx2/src/service/builder/publish_subscribe.rs index 732f449f0..e385c5f98 100644 --- a/iceoryx2/src/service/builder/publish_subscribe.rs +++ b/iceoryx2/src/service/builder/publish_subscribe.rs @@ -23,7 +23,6 @@ use crate::service::port_factory::publish_subscribe; use crate::service::static_config::messaging_pattern::MessagingPattern; use crate::service::*; use builder::RETRY_LIMIT; -use iceoryx2_bb_derive_macros::ZeroCopySend; use iceoryx2_bb_elementary::{alignment::Alignment, zero_copy_send::ZeroCopySend}; use iceoryx2_bb_log::{fail, fatal_panic, warn}; use iceoryx2_cal::dynamic_storage::DynamicStorageCreateError; @@ -35,17 +34,7 @@ use self::{ message_type_details::{MessageTypeDetails, TypeDetail, TypeVariant}, }; -use super::{OpenDynamicStorageFailure, ServiceState}; - -#[repr(C)] -#[derive(Debug, ZeroCopySend)] -#[doc(hidden)] -pub struct CustomHeaderMarker {} - -#[repr(C)] -#[derive(Debug, ZeroCopySend)] -#[doc(hidden)] -pub struct CustomPayloadMarker(u8); +use super::{CustomHeaderMarker, CustomPayloadMarker, OpenDynamicStorageFailure, ServiceState}; /// Errors that can occur when an existing [`MessagingPattern::PublishSubscribe`] [`Service`] shall be opened. #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)] diff --git a/iceoryx2/src/service/builder/request_response.rs b/iceoryx2/src/service/builder/request_response.rs index 2b9477ff9..7f460eeff 100644 --- a/iceoryx2/src/service/builder/request_response.rs +++ b/iceoryx2/src/service/builder/request_response.rs @@ -17,6 +17,7 @@ use crate::prelude::{AttributeSpecifier, AttributeVerifier}; use crate::service::builder::OpenDynamicStorageFailure; use crate::service::dynamic_config::request_response::DynamicConfigSettings; use crate::service::port_factory::request_response; +use crate::service::static_config::message_type_details::TypeDetail; use crate::service::static_config::messaging_pattern::MessagingPattern; use crate::service::{self, header, static_config}; use crate::service::{builder, dynamic_config, Service}; @@ -27,7 +28,7 @@ use iceoryx2_cal::serialize::Serialize; use iceoryx2_cal::static_storage::{StaticStorage, StaticStorageCreateError, StaticStorageLocked}; use super::message_type_details::{MessageTypeDetails, TypeVariant}; -use super::{ServiceState, RETRY_LIMIT}; +use super::{CustomHeaderMarker, CustomPayloadMarker, ServiceState, RETRY_LIMIT}; /// Errors that can occur when an existing [`MessagingPattern::RequestResponse`] [`Service`] shall /// be opened. @@ -214,15 +215,19 @@ enum ServiceAvailabilityState { /// See [`crate::service`] #[derive(Debug)] pub struct Builder< - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, ServiceType: Service, > { base: builder::BuilderWithServiceType, override_request_alignment: Option, override_response_alignment: Option, + override_request_payload_type: Option, + override_response_payload_type: Option, + override_request_header_type: Option, + override_response_header_type: Option, verify_enable_safe_overflow_for_requests: bool, verify_enable_safe_overflow_for_responses: bool, verify_max_active_requests_per_client: bool, @@ -241,9 +246,9 @@ pub struct Builder< } impl< - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, ServiceType: Service, > Builder @@ -253,6 +258,10 @@ impl< base, override_request_alignment: None, override_response_alignment: None, + override_request_header_type: None, + override_request_payload_type: None, + override_response_header_type: None, + override_response_payload_type: None, verify_enable_safe_overflow_for_requests: false, verify_enable_safe_overflow_for_responses: false, verify_max_loaned_requests: false, @@ -861,18 +870,30 @@ impl< } } - fn prepare_message_type_details(&mut self) { - self.config_details_mut().request_message_type_details = MessageTypeDetails::from::< - header::request_response::RequestHeader, - RequestHeader, - RequestPayload, - >(TypeVariant::FixedSize); + fn prepare_message_type(&mut self) { + if let Some(details) = &self.override_request_payload_type { + self.config_details_mut() + .request_message_type_details + .payload = details.clone(); + } - self.config_details_mut().response_message_type_details = MessageTypeDetails::from::< - header::request_response::ResponseHeader, - ResponseHeader, - ResponsePayload, - >(TypeVariant::FixedSize); + if let Some(details) = &self.override_request_header_type { + self.config_details_mut() + .request_message_type_details + .user_header = details.clone(); + } + + if let Some(details) = &self.override_response_payload_type { + self.config_details_mut() + .response_message_type_details + .payload = details.clone(); + } + + if let Some(details) = &self.override_response_header_type { + self.config_details_mut() + .response_message_type_details + .user_header = details.clone(); + } if let Some(alignment) = self.override_request_alignment { self.config_details_mut() @@ -898,6 +919,31 @@ impl< .max(alignment); } } +} + +impl< + RequestPayload: Debug + ZeroCopySend, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend, + ResponseHeader: Debug + ZeroCopySend, + ServiceType: Service, + > Builder +{ + fn prepare_message_type_details(&mut self) { + self.config_details_mut().request_message_type_details = MessageTypeDetails::from::< + header::request_response::RequestHeader, + RequestHeader, + RequestPayload, + >(TypeVariant::FixedSize); + + self.config_details_mut().response_message_type_details = MessageTypeDetails::from::< + header::request_response::ResponseHeader, + ResponseHeader, + ResponsePayload, + >(TypeVariant::FixedSize); + + self.prepare_message_type(); + } /// If the [`Service`] exists, it will be opened otherwise a new [`Service`] will be /// created. @@ -1008,3 +1054,454 @@ impl< self.create_impl(attributes) } } + +impl< + RequestPayload: Debug + ZeroCopySend, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend, + ResponseHeader: Debug + ZeroCopySend, + ServiceType: Service, + > Builder<[RequestPayload], RequestHeader, ResponsePayload, ResponseHeader, ServiceType> +{ + fn prepare_message_type_details(&mut self) { + self.config_details_mut().request_message_type_details = MessageTypeDetails::from::< + header::request_response::RequestHeader, + RequestHeader, + RequestPayload, + >(TypeVariant::Dynamic); + + self.config_details_mut().response_message_type_details = MessageTypeDetails::from::< + header::request_response::ResponseHeader, + ResponseHeader, + ResponsePayload, + >(TypeVariant::FixedSize); + + self.prepare_message_type(); + } + + /// If the [`Service`] exists, it will be opened otherwise a new [`Service`] will be + /// created. + pub fn open_or_create( + self, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + ResponsePayload, + ResponseHeader, + >, + RequestResponseOpenOrCreateError, + > { + self.open_or_create_with_attributes(&AttributeVerifier::new()) + } + + /// If the [`Service`] exists, it will be opened otherwise a new [`Service`] will be + /// created. It defines a set of attributes. + /// + /// If the [`Service`] already exists all attribute requirements must be satisfied, + /// and service payload type must be the same, otherwise the open process will fail. + /// If the [`Service`] does not exist the required attributes will be defined in the [`Service`]. + pub fn open_or_create_with_attributes( + mut self, + required_attributes: &AttributeVerifier, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + ResponsePayload, + ResponseHeader, + >, + RequestResponseOpenOrCreateError, + > { + self.prepare_message_type_details(); + self.open_or_create_impl(required_attributes) + } + + /// Opens an existing [`Service`]. + pub fn open( + self, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + ResponsePayload, + ResponseHeader, + >, + RequestResponseOpenError, + > { + self.open_with_attributes(&AttributeVerifier::new()) + } + + /// Opens an existing [`Service`] with attribute requirements. If the defined attribute + /// requirements are not satisfied the open process will fail. + pub fn open_with_attributes( + mut self, + required_attributes: &AttributeVerifier, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + ResponsePayload, + ResponseHeader, + >, + RequestResponseOpenError, + > { + self.prepare_message_type_details(); + self.open_impl(required_attributes) + } + + /// Creates a new [`Service`]. + pub fn create( + self, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + ResponsePayload, + ResponseHeader, + >, + RequestResponseCreateError, + > { + self.create_with_attributes(&AttributeSpecifier::new()) + } + + /// Creates a new [`Service`] with a set of attributes. + pub fn create_with_attributes( + mut self, + attributes: &AttributeSpecifier, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + ResponsePayload, + ResponseHeader, + >, + RequestResponseCreateError, + > { + self.prepare_message_type_details(); + self.create_impl(attributes) + } +} + +impl< + RequestPayload: Debug + ZeroCopySend, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend, + ResponseHeader: Debug + ZeroCopySend, + ServiceType: Service, + > Builder<[RequestPayload], RequestHeader, [ResponsePayload], ResponseHeader, ServiceType> +{ + fn prepare_message_type_details(&mut self) { + self.config_details_mut().request_message_type_details = MessageTypeDetails::from::< + header::request_response::RequestHeader, + RequestHeader, + RequestPayload, + >(TypeVariant::Dynamic); + + self.config_details_mut().response_message_type_details = MessageTypeDetails::from::< + header::request_response::ResponseHeader, + ResponseHeader, + ResponsePayload, + >(TypeVariant::Dynamic); + + self.prepare_message_type(); + } + + /// If the [`Service`] exists, it will be opened otherwise a new [`Service`] will be + /// created. + #[allow(clippy::type_complexity)] // type alias would require 5 generic parameters which hardly reduces complexity + pub fn open_or_create( + self, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseOpenOrCreateError, + > { + self.open_or_create_with_attributes(&AttributeVerifier::new()) + } + + /// If the [`Service`] exists, it will be opened otherwise a new [`Service`] will be + /// created. It defines a set of attributes. + /// + /// If the [`Service`] already exists all attribute requirements must be satisfied, + /// and service payload type must be the same, otherwise the open process will fail. + /// If the [`Service`] does not exist the required attributes will be defined in the [`Service`]. + #[allow(clippy::type_complexity)] // type alias would require 5 generic parameters which hardly reduces complexity + pub fn open_or_create_with_attributes( + mut self, + required_attributes: &AttributeVerifier, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseOpenOrCreateError, + > { + self.prepare_message_type_details(); + self.open_or_create_impl(required_attributes) + } + + /// Opens an existing [`Service`]. + #[allow(clippy::type_complexity)] // type alias would require 5 generic parameters which hardly reduces complexity + pub fn open( + self, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseOpenError, + > { + self.open_with_attributes(&AttributeVerifier::new()) + } + + /// Opens an existing [`Service`] with attribute requirements. If the defined attribute + /// requirements are not satisfied the open process will fail. + #[allow(clippy::type_complexity)] // type alias would require 5 generic parameters which hardly reduces complexity + pub fn open_with_attributes( + mut self, + required_attributes: &AttributeVerifier, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseOpenError, + > { + self.prepare_message_type_details(); + self.open_impl(required_attributes) + } + + /// Creates a new [`Service`]. + #[allow(clippy::type_complexity)] // type alias would require 5 generic parameters which hardly reduces complexity + pub fn create( + self, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseCreateError, + > { + self.create_with_attributes(&AttributeSpecifier::new()) + } + + /// Creates a new [`Service`] with a set of attributes. + #[allow(clippy::type_complexity)] // type alias would require 5 generic parameters which hardly reduces complexity + pub fn create_with_attributes( + mut self, + attributes: &AttributeSpecifier, + ) -> Result< + request_response::PortFactory< + ServiceType, + [RequestPayload], + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseCreateError, + > { + self.prepare_message_type_details(); + self.create_impl(attributes) + } +} + +impl< + RequestPayload: Debug + ZeroCopySend, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend, + ResponseHeader: Debug + ZeroCopySend, + ServiceType: Service, + > Builder +{ + fn prepare_message_type_details(&mut self) { + self.config_details_mut().request_message_type_details = MessageTypeDetails::from::< + header::request_response::RequestHeader, + RequestHeader, + RequestPayload, + >(TypeVariant::FixedSize); + + self.config_details_mut().response_message_type_details = MessageTypeDetails::from::< + header::request_response::ResponseHeader, + ResponseHeader, + ResponsePayload, + >(TypeVariant::Dynamic); + + self.prepare_message_type(); + } + + /// If the [`Service`] exists, it will be opened otherwise a new [`Service`] will be + /// created. + pub fn open_or_create( + self, + ) -> Result< + request_response::PortFactory< + ServiceType, + RequestPayload, + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseOpenOrCreateError, + > { + self.open_or_create_with_attributes(&AttributeVerifier::new()) + } + + /// If the [`Service`] exists, it will be opened otherwise a new [`Service`] will be + /// created. It defines a set of attributes. + /// + /// If the [`Service`] already exists all attribute requirements must be satisfied, + /// and service payload type must be the same, otherwise the open process will fail. + /// If the [`Service`] does not exist the required attributes will be defined in the [`Service`]. + pub fn open_or_create_with_attributes( + mut self, + required_attributes: &AttributeVerifier, + ) -> Result< + request_response::PortFactory< + ServiceType, + RequestPayload, + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseOpenOrCreateError, + > { + self.prepare_message_type_details(); + self.open_or_create_impl(required_attributes) + } + + /// Opens an existing [`Service`]. + pub fn open( + self, + ) -> Result< + request_response::PortFactory< + ServiceType, + RequestPayload, + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseOpenError, + > { + self.open_with_attributes(&AttributeVerifier::new()) + } + + /// Opens an existing [`Service`] with attribute requirements. If the defined attribute + /// requirements are not satisfied the open process will fail. + pub fn open_with_attributes( + mut self, + required_attributes: &AttributeVerifier, + ) -> Result< + request_response::PortFactory< + ServiceType, + RequestPayload, + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseOpenError, + > { + self.prepare_message_type_details(); + self.open_impl(required_attributes) + } + + /// Creates a new [`Service`]. + pub fn create( + self, + ) -> Result< + request_response::PortFactory< + ServiceType, + RequestPayload, + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseCreateError, + > { + self.create_with_attributes(&AttributeSpecifier::new()) + } + + /// Creates a new [`Service`] with a set of attributes. + pub fn create_with_attributes( + mut self, + attributes: &AttributeSpecifier, + ) -> Result< + request_response::PortFactory< + ServiceType, + RequestPayload, + RequestHeader, + [ResponsePayload], + ResponseHeader, + >, + RequestResponseCreateError, + > { + self.prepare_message_type_details(); + self.create_impl(attributes) + } +} + +impl + Builder< + [CustomPayloadMarker], + CustomHeaderMarker, + [CustomPayloadMarker], + CustomHeaderMarker, + ServiceType, + > +{ + #[doc(hidden)] + pub unsafe fn __internal_set_request_payload_type_details( + mut self, + value: &TypeDetail, + ) -> Self { + self.override_request_payload_type = Some(value.clone()); + self + } + + #[doc(hidden)] + pub unsafe fn __internal_set_response_payload_type_details( + mut self, + value: &TypeDetail, + ) -> Self { + self.override_response_payload_type = Some(value.clone()); + self + } + + #[doc(hidden)] + pub unsafe fn __internal_set_request_header_type_details(mut self, value: &TypeDetail) -> Self { + self.override_request_header_type = Some(value.clone()); + self + } + + #[doc(hidden)] + pub unsafe fn __internal_set_response_header_type_details( + mut self, + value: &TypeDetail, + ) -> Self { + self.override_response_header_type = Some(value.clone()); + self + } +} diff --git a/iceoryx2/src/service/dynamic_config/request_response.rs b/iceoryx2/src/service/dynamic_config/request_response.rs index 7a38b030b..d3f527d4a 100644 --- a/iceoryx2/src/service/dynamic_config/request_response.rs +++ b/iceoryx2/src/service/dynamic_config/request_response.rs @@ -18,7 +18,10 @@ use iceoryx2_bb_memory::bump_allocator::BumpAllocator; use crate::{ node::NodeId, - port::port_identifiers::{UniqueClientId, UniquePortId, UniqueServerId}, + port::{ + details::data_segment::DataSegmentType, + port_identifiers::{UniqueClientId, UniquePortId, UniqueServerId}, + }, }; use super::PortCleanupAction; @@ -30,6 +33,9 @@ pub struct ServerDetails { pub server_port_id: UniqueServerId, pub request_buffer_size: usize, pub number_of_responses: usize, + pub max_slice_len: usize, + pub data_segment_type: DataSegmentType, + pub max_number_of_segments: u8, } #[doc(hidden)] @@ -40,6 +46,9 @@ pub struct ClientDetails { pub node_id: NodeId, pub number_of_requests: usize, pub response_buffer_size: usize, + pub max_slice_len: usize, + pub data_segment_type: DataSegmentType, + pub max_number_of_segments: u8, } #[repr(C)] diff --git a/iceoryx2/src/service/header/publish_subscribe.rs b/iceoryx2/src/service/header/publish_subscribe.rs index 872ab2ae3..ad6c385ed 100644 --- a/iceoryx2/src/service/header/publish_subscribe.rs +++ b/iceoryx2/src/service/header/publish_subscribe.rs @@ -60,7 +60,7 @@ impl Header { /// Returns how many elements are stored inside the sample's payload. /// /// # Details when using - /// [`CustomPayloadMarker`](crate::service::builder::publish_subscribe::CustomPayloadMarker) + /// [`CustomPayloadMarker`](crate::service::builder::CustomPayloadMarker) /// /// In this case the number of elements relates to the element defined in the /// [`MessageTypeDetails`](crate::service::static_config::message_type_details::MessageTypeDetails). diff --git a/iceoryx2/src/service/header/request_response.rs b/iceoryx2/src/service/header/request_response.rs index 9dc464556..165818972 100644 --- a/iceoryx2/src/service/header/request_response.rs +++ b/iceoryx2/src/service/header/request_response.rs @@ -23,6 +23,7 @@ pub struct RequestHeader { pub(crate) client_port_id: UniqueClientId, pub(crate) channel_id: ChannelId, pub(crate) request_id: u64, + pub(crate) number_of_elements: u64, } impl RequestHeader { @@ -31,6 +32,19 @@ impl RequestHeader { pub fn client_port_id(&self) -> UniqueClientId { self.client_port_id } + + /// Returns how many elements are stored inside the requests's payload. + /// + /// # Details when using + /// [`CustomPayloadMarker`](crate::service::builder::CustomPayloadMarker) + /// + /// In this case the number of elements relates to the element defined in the + /// [`MessageTypeDetails`](crate::service::static_config::message_type_details::MessageTypeDetails). + /// When the element has a `payload.size == 40` and the `RequestMut::payload().len() == 120` it + /// means that it contains 3 elements (3 * 40 == 120). + pub fn number_of_elements(&self) -> u64 { + self.number_of_elements + } } /// Response header used by @@ -40,6 +54,7 @@ impl RequestHeader { pub struct ResponseHeader { pub(crate) server_port_id: UniqueServerId, pub(crate) request_id: u64, + pub(crate) number_of_elements: u64, } impl ResponseHeader { @@ -48,4 +63,17 @@ impl ResponseHeader { pub fn server_port_id(&self) -> UniqueServerId { self.server_port_id } + + /// Returns how many elements are stored inside the response's payload. + /// + /// # Details when using + /// [`CustomPayloadMarker`](crate::service::builder::CustomPayloadMarker) + /// + /// In this case the number of elements relates to the element defined in the + /// [`MessageTypeDetails`](crate::service::static_config::message_type_details::MessageTypeDetails). + /// When the element has a `payload.size == 40` and the `ResponseMut::payload().len() == 120` it + /// means that it contains 3 elements (3 * 40 == 120). + pub fn number_of_elements(&self) -> u64 { + self.number_of_elements + } } diff --git a/iceoryx2/src/service/port_factory/client.rs b/iceoryx2/src/service/port_factory/client.rs index 79494e982..d207d88a3 100644 --- a/iceoryx2/src/service/port_factory/client.rs +++ b/iceoryx2/src/service/port_factory/client.rs @@ -38,6 +38,7 @@ use crate::{ use core::fmt::Debug; use iceoryx2_bb_elementary::zero_copy_send::ZeroCopySend; use iceoryx2_bb_log::fail; +use iceoryx2_cal::shm_allocator::AllocationStrategy; /// Defines a failure that can occur when a [`Client`] is created with /// [`crate::service::port_factory::client::PortFactoryClient`]. @@ -60,6 +61,13 @@ impl core::fmt::Display for ClientCreateError { impl core::error::Error for ClientCreateError {} +#[derive(Debug, Clone, Copy)] +pub(crate) struct LocalClientConfig { + pub(crate) unable_to_deliver_strategy: UnableToDeliverStrategy, + pub(crate) initial_max_slice_len: usize, + pub(crate) allocation_strategy: AllocationStrategy, +} + /// Factory to create a new [`Client`] port/endpoint for /// [`MessagingPattern::RequestResponse`](crate::service::messaging_pattern::MessagingPattern::RequestResponse) /// based communication. @@ -67,11 +75,14 @@ impl core::error::Error for ClientCreateError {} pub struct PortFactoryClient< 'factory, Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { + pub(crate) config: LocalClientConfig, + pub(crate) request_degradation_callback: Option>, + pub(crate) response_degradation_callback: Option>, pub(crate) factory: &'factory PortFactory< Service, RequestPayload, @@ -79,17 +90,14 @@ pub struct PortFactoryClient< ResponsePayload, ResponseHeader, >, - pub(crate) unable_to_deliver_strategy: UnableToDeliverStrategy, - pub(crate) request_degradation_callback: Option>, - pub(crate) response_degradation_callback: Option>, } impl< 'factory, Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > PortFactoryClient< @@ -119,10 +127,14 @@ impl< .request_response; Self { - factory, - unable_to_deliver_strategy: defs.client_unable_to_deliver_strategy, + config: LocalClientConfig { + unable_to_deliver_strategy: defs.client_unable_to_deliver_strategy, + initial_max_slice_len: 1, + allocation_strategy: AllocationStrategy::Static, + }, request_degradation_callback: None, response_degradation_callback: None, + factory, } } @@ -131,7 +143,7 @@ impl< /// [`RequestMut`](crate::request_mut::RequestMut) since /// its internal buffer is full. pub fn unable_to_deliver_strategy(mut self, value: UnableToDeliverStrategy) -> Self { - self.unable_to_deliver_strategy = value; + self.config.unable_to_deliver_strategy = value; self } @@ -184,3 +196,29 @@ impl< "Failed to create new Client port.")) } } + +impl< + Service: service::Service, + RequestPayload: Debug + ZeroCopySend, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, + ResponseHeader: Debug + ZeroCopySend, + > + PortFactoryClient<'_, Service, [RequestPayload], RequestHeader, ResponsePayload, ResponseHeader> +{ + /// Sets the maximum slice length that a user can allocate with + /// [`Client::loan_slice()`] or [`Client::loan_slice_uninit()`]. + pub fn initial_max_slice_len(mut self, value: usize) -> Self { + self.config.initial_max_slice_len = value; + self + } + + /// Defines the allocation strategy that is used when the provided + /// [`PortFactoryClient::initial_max_slice_len()`] is exhausted. This happens when the user + /// acquires a more than max slice len in [`Client::loan_slice()`] or + /// [`Client::loan_slice_uninit()`]. + pub fn allocation_strategy(mut self, value: AllocationStrategy) -> Self { + self.config.allocation_strategy = value; + self + } +} diff --git a/iceoryx2/src/service/port_factory/request_response.rs b/iceoryx2/src/service/port_factory/request_response.rs index 90f99985c..16fdbf181 100644 --- a/iceoryx2/src/service/port_factory/request_response.rs +++ b/iceoryx2/src/service/port_factory/request_response.rs @@ -61,9 +61,9 @@ use super::{client::PortFactoryClient, nodes, server::PortFactoryServer}; #[derive(Debug)] pub struct PortFactory< Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { pub(crate) service: Service, @@ -75,9 +75,9 @@ pub struct PortFactory< unsafe impl< Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > Send for PortFactory @@ -96,9 +96,9 @@ unsafe impl< impl< Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > crate::service::port_factory::PortFactory for PortFactory @@ -148,9 +148,9 @@ impl< impl< Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > PortFactory { diff --git a/iceoryx2/src/service/port_factory/server.rs b/iceoryx2/src/service/port_factory/server.rs index 4cc418bd4..789494fe8 100644 --- a/iceoryx2/src/service/port_factory/server.rs +++ b/iceoryx2/src/service/port_factory/server.rs @@ -38,6 +38,14 @@ use crate::{ use core::fmt::Debug; use iceoryx2_bb_elementary::zero_copy_send::ZeroCopySend; use iceoryx2_bb_log::{fail, warn}; +use iceoryx2_cal::shm_allocator::AllocationStrategy; + +#[derive(Debug, Clone, Copy)] +pub(crate) struct LocalServerConfig { + pub(crate) unable_to_deliver_strategy: UnableToDeliverStrategy, + pub(crate) initial_max_slice_len: usize, + pub(crate) allocation_strategy: AllocationStrategy, +} /// Defines a failure that can occur when a [`Server`] is created with /// [`crate::service::port_factory::server::PortFactoryServer`]. @@ -65,9 +73,9 @@ impl core::error::Error for ServerCreateError {} pub struct PortFactoryServer< 'factory, Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > { pub(crate) factory: &'factory PortFactory< @@ -78,8 +86,8 @@ pub struct PortFactoryServer< ResponseHeader, >, + pub(crate) config: LocalServerConfig, pub(crate) max_loaned_responses_per_request: usize, - pub(crate) unable_to_deliver_strategy: UnableToDeliverStrategy, pub(crate) request_degradation_callback: Option>, pub(crate) response_degradation_callback: Option>, } @@ -87,9 +95,9 @@ pub struct PortFactoryServer< impl< 'factory, Service: service::Service, - RequestPayload: Debug + ZeroCopySend, + RequestPayload: Debug + ZeroCopySend + ?Sized, RequestHeader: Debug + ZeroCopySend, - ResponsePayload: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend + ?Sized, ResponseHeader: Debug + ZeroCopySend, > PortFactoryServer< @@ -121,7 +129,11 @@ impl< Self { factory, max_loaned_responses_per_request: defs.server_max_loaned_responses_per_request, - unable_to_deliver_strategy: defs.server_unable_to_deliver_strategy, + config: LocalServerConfig { + unable_to_deliver_strategy: defs.server_unable_to_deliver_strategy, + initial_max_slice_len: 1, + allocation_strategy: AllocationStrategy::Static, + }, request_degradation_callback: None, response_degradation_callback: None, } @@ -132,7 +144,7 @@ impl< /// [`Response`](crate::response::Response) since /// its internal buffer is full. pub fn unable_to_deliver_strategy(mut self, value: UnableToDeliverStrategy) -> Self { - self.unable_to_deliver_strategy = value; + self.config.unable_to_deliver_strategy = value; self } @@ -198,3 +210,31 @@ impl< "Failed to create new Server port.")) } } + +impl< + Service: service::Service, + RequestPayload: Debug + ZeroCopySend + ?Sized, + RequestHeader: Debug + ZeroCopySend, + ResponsePayload: Debug + ZeroCopySend, + ResponseHeader: Debug + ZeroCopySend, + > + PortFactoryServer<'_, Service, RequestPayload, RequestHeader, [ResponsePayload], ResponseHeader> +{ + /// Sets the maximum slice length that a user can allocate with + /// [`ActiveRequest::loan_slice()`](crate::active_request::ActiveRequest::loan_slice()) or + /// [`ActiveRequest::loan_slice_uninit()`](crate::active_request::ActiveRequest::loan_slice_uninit()). + pub fn initial_max_slice_len(mut self, value: usize) -> Self { + self.config.initial_max_slice_len = value; + self + } + + /// Defines the allocation strategy that is used when the provided + /// [`PortFactoryServer::initial_max_slice_len()`] is exhausted. This happens when the user + /// acquires a more than max slice len in + /// [`ActiveRequest::loan_slice()`](crate::active_request::ActiveRequest::loan_slice()) or + /// [`ActiveRequest::loan_slice_uninit()`](crate::active_request::ActiveRequest::loan_slice_uninit()). + pub fn allocation_strategy(mut self, value: AllocationStrategy) -> Self { + self.config.allocation_strategy = value; + self + } +} diff --git a/iceoryx2/tests/client_tests.rs b/iceoryx2/tests/client_tests.rs index 4bfed026c..828fe327a 100644 --- a/iceoryx2/tests/client_tests.rs +++ b/iceoryx2/tests/client_tests.rs @@ -159,6 +159,8 @@ mod client { .create() .unwrap(); + assert_that!(sut.unable_to_deliver_strategy(), eq UnableToDeliverStrategy::Block); + let request = sut.send_copy(123); assert_that!(request, is_ok); drop(request); @@ -197,6 +199,8 @@ mod client { .create() .unwrap(); + assert_that!(sut.unable_to_deliver_strategy(), eq UnableToDeliverStrategy::DiscardSample); + let request = sut.send_copy(123); assert_that!(request, is_ok); let _request = sut.send_copy(456); diff --git a/iceoryx2/tests/publisher_tests.rs b/iceoryx2/tests/publisher_tests.rs index 81356e8e8..120acee9a 100644 --- a/iceoryx2/tests/publisher_tests.rs +++ b/iceoryx2/tests/publisher_tests.rs @@ -19,7 +19,7 @@ mod publisher { use iceoryx2::port::{publisher::PublisherCreateError, LoanError}; use iceoryx2::prelude::*; - use iceoryx2::service::builder::publish_subscribe::CustomPayloadMarker; + use iceoryx2::service::builder::CustomPayloadMarker; use iceoryx2::service::static_config::message_type_details::{TypeDetail, TypeVariant}; use iceoryx2::service::{service_name::ServiceName, Service}; use iceoryx2::testing::*; diff --git a/iceoryx2/tests/service_publish_subscribe_tests.rs b/iceoryx2/tests/service_publish_subscribe_tests.rs index 574b047e0..81677a41a 100644 --- a/iceoryx2/tests/service_publish_subscribe_tests.rs +++ b/iceoryx2/tests/service_publish_subscribe_tests.rs @@ -24,7 +24,7 @@ mod service_publish_subscribe { use iceoryx2::prelude::{AllocationStrategy, *}; use iceoryx2::service::builder::publish_subscribe::PublishSubscribeCreateError; use iceoryx2::service::builder::publish_subscribe::PublishSubscribeOpenError; - use iceoryx2::service::builder::publish_subscribe::{CustomHeaderMarker, CustomPayloadMarker}; + use iceoryx2::service::builder::{CustomHeaderMarker, CustomPayloadMarker}; use iceoryx2::service::messaging_pattern::MessagingPattern; use iceoryx2::service::static_config::message_type_details::{TypeDetail, TypeVariant}; use iceoryx2::service::{Service, ServiceDetails}; diff --git a/iceoryx2/tests/service_request_response_builder_tests.rs b/iceoryx2/tests/service_request_response_builder_tests.rs index 12b0fddbb..c05b9fd9a 100644 --- a/iceoryx2/tests/service_request_response_builder_tests.rs +++ b/iceoryx2/tests/service_request_response_builder_tests.rs @@ -1144,6 +1144,87 @@ mod service_request_response { assert_that!(sut.static_config().response_message_type_details().payload.alignment, eq core::mem::align_of::()); } + #[test] + fn create_service_with_request_slice_type_works() { + let config = generate_isolated_config(); + let service_name = generate_service_name(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let sut_create = node + .service_builder(&service_name) + .request_response::<[u64], u64>() + .create(); + assert_that!(sut_create, is_ok); + + let sut_open_fail = node + .service_builder(&service_name) + .request_response::() + .open(); + assert_that!(sut_open_fail.err(), eq Some(RequestResponseOpenError::IncompatibleRequestType)); + + let sut_open = node + .service_builder(&service_name) + .request_response::<[u64], u64>() + .open(); + assert_that!(sut_open, is_ok); + } + + #[test] + fn create_service_with_response_slice_type_works() { + let config = generate_isolated_config(); + let service_name = generate_service_name(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let sut_create = node + .service_builder(&service_name) + .request_response::() + .create(); + assert_that!(sut_create, is_ok); + + let sut_open_fail = node + .service_builder(&service_name) + .request_response::() + .open(); + assert_that!(sut_open_fail.err(), eq Some(RequestResponseOpenError::IncompatibleResponseType)); + + let sut_open = node + .service_builder(&service_name) + .request_response::() + .open(); + assert_that!(sut_open, is_ok); + } + + #[test] + fn create_service_with_request_and_response_slice_type_works() { + let config = generate_isolated_config(); + let service_name = generate_service_name(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let sut_create = node + .service_builder(&service_name) + .request_response::<[u64], [u64]>() + .create(); + assert_that!(sut_create, is_ok); + + let sut_open_fail = node + .service_builder(&service_name) + .request_response::<[u64], u64>() + .open(); + assert_that!(sut_open_fail.err(), eq Some(RequestResponseOpenError::IncompatibleResponseType)); + + let sut_open_fail = node + .service_builder(&service_name) + .request_response::() + .open(); + assert_that!(sut_open_fail.err(), eq Some(RequestResponseOpenError::IncompatibleRequestType)); + + let sut_open = node + .service_builder(&service_name) + .request_response::<[u64], [u64]>() + .open(); + assert_that!(sut_open, is_ok); + } + #[instantiate_tests()] mod ipc {} diff --git a/iceoryx2/tests/service_request_response_tests.rs b/iceoryx2/tests/service_request_response_tests.rs index aeb4c93c7..4d0653d7e 100644 --- a/iceoryx2/tests/service_request_response_tests.rs +++ b/iceoryx2/tests/service_request_response_tests.rs @@ -17,7 +17,10 @@ mod service_request_response { use iceoryx2::node::NodeBuilder; use iceoryx2::port::client::Client; use iceoryx2::port::server::Server; + use iceoryx2::port::LoanError; use iceoryx2::prelude::{PortFactory, *}; + use iceoryx2::service::builder::{CustomHeaderMarker, CustomPayloadMarker}; + use iceoryx2::service::static_config::message_type_details::{TypeDetail, TypeVariant}; use iceoryx2::testing::*; use iceoryx2_bb_testing::assert_that; @@ -889,6 +892,380 @@ mod service_request_response { } } + #[test] + fn sending_requests_with_custom_payload_works() { + const NUMBER_OF_ELEMENTS: usize = 1; + let service_name = generate_service_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + let mut type_details = TypeDetail::__internal_new::(TypeVariant::FixedSize); + type_details.size = 1024; + type_details.alignment = 1024; + + let service_1 = unsafe { + node.service_builder(&service_name) + .request_response::<[CustomPayloadMarker], [CustomPayloadMarker]>() + .request_user_header::() + .response_user_header::() + .__internal_set_request_payload_type_details(&type_details) + .create() + .unwrap() + }; + let service_2 = unsafe { + node.service_builder(&service_name) + .request_response::<[CustomPayloadMarker], [CustomPayloadMarker]>() + .request_user_header::() + .response_user_header::() + .__internal_set_request_payload_type_details(&type_details) + .open() + .unwrap() + }; + + let server = service_1.server_builder().create().unwrap(); + let client = service_2.client_builder().create().unwrap(); + + let mut request = unsafe { client.loan_custom_payload(NUMBER_OF_ELEMENTS).unwrap() }; + assert_that!(request.payload(), len type_details.size); + assert_that!((request.payload().as_ptr() as usize % type_details.alignment), eq 0); + assert_that!(request.header().number_of_elements(), eq NUMBER_OF_ELEMENTS as u64); + let payload_ptr = request.payload_mut().as_mut_ptr() as *mut u8; + for n in 0..type_details.size { + unsafe { payload_ptr.add(n).write((n % 255) as u8) }; + } + + let _pending_response = unsafe { request.assume_init().send().unwrap() }; + + let active_request = unsafe { server.receive_custom_payload().unwrap().unwrap() }; + assert_that!(active_request.payload(), len type_details.size); + assert_that!((active_request.payload().as_ptr() as usize % type_details.alignment), eq 0); + assert_that!(active_request.header().number_of_elements(), eq NUMBER_OF_ELEMENTS as u64); + let payload_ptr = active_request.payload().as_ptr() as *const u8; + for n in 0..type_details.size { + assert_that!(unsafe { *payload_ptr.add(n) }, eq(n % 255) as u8); + } + } + + #[test] + fn sending_response_with_custom_payload_works() { + const NUMBER_OF_ELEMENTS: usize = 1; + let service_name = generate_service_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + let mut type_details = TypeDetail::__internal_new::(TypeVariant::FixedSize); + type_details.size = 512; + type_details.alignment = 256; + + let service_1 = unsafe { + node.service_builder(&service_name) + .request_response::<[CustomPayloadMarker], [CustomPayloadMarker]>() + .request_user_header::() + .response_user_header::() + .__internal_set_response_payload_type_details(&type_details) + .create() + .unwrap() + }; + let service_2 = unsafe { + node.service_builder(&service_name) + .request_response::<[CustomPayloadMarker], [CustomPayloadMarker]>() + .request_user_header::() + .response_user_header::() + .__internal_set_response_payload_type_details(&type_details) + .open() + .unwrap() + }; + + let server = service_1.server_builder().create().unwrap(); + let client = service_2.client_builder().create().unwrap(); + + let request = unsafe { client.loan_custom_payload(NUMBER_OF_ELEMENTS).unwrap() }; + let pending_response = unsafe { request.assume_init().send().unwrap() }; + let active_request = unsafe { server.receive_custom_payload().unwrap().unwrap() }; + + let mut response = unsafe { + active_request + .loan_custom_payload(NUMBER_OF_ELEMENTS) + .unwrap() + }; + assert_that!(response.payload(), len type_details.size); + assert_that!((response.payload().as_ptr() as usize % type_details.alignment), eq 0); + assert_that!(response.header().number_of_elements(), eq NUMBER_OF_ELEMENTS as u64); + let payload_ptr = response.payload_mut().as_mut_ptr() as *mut u8; + for n in 0..type_details.size { + unsafe { payload_ptr.add(n).write((n % 89) as u8) }; + } + + unsafe { response.assume_init().send().unwrap() }; + + let response = unsafe { pending_response.receive_custom_payload().unwrap().unwrap() }; + assert_that!(response.payload(), len type_details.size); + assert_that!((response.payload().as_ptr() as usize % type_details.alignment), eq 0); + assert_that!(response.header().number_of_elements(), eq NUMBER_OF_ELEMENTS as u64); + let payload_ptr = response.payload().as_ptr() as *const u8; + for n in 0..type_details.size { + assert_that!(unsafe { *payload_ptr.add(n) }, eq(n % 89) as u8); + } + } + + #[test] + fn sending_requests_with_custom_header_works() { + let service_name = generate_service_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + let mut type_details = TypeDetail::__internal_new::(TypeVariant::FixedSize); + type_details.size = 2048; + type_details.alignment = 8; + + let service_1 = unsafe { + node.service_builder(&service_name) + .request_response::<[CustomPayloadMarker], [CustomPayloadMarker]>() + .request_user_header::() + .response_user_header::() + .__internal_set_request_header_type_details(&type_details) + .create() + .unwrap() + }; + let service_2 = unsafe { + node.service_builder(&service_name) + .request_response::<[CustomPayloadMarker], [CustomPayloadMarker]>() + .request_user_header::() + .response_user_header::() + .__internal_set_request_header_type_details(&type_details) + .open() + .unwrap() + }; + + let server = service_1.server_builder().create().unwrap(); + let client = service_2.client_builder().create().unwrap(); + + let mut request = unsafe { client.loan_custom_payload(1).unwrap() }; + let header_ptr = (request.user_header_mut() as *mut CustomHeaderMarker) as *mut u8; + for n in 0..type_details.size { + unsafe { header_ptr.add(n).write((n % 231) as u8) }; + } + let _pending_response = unsafe { request.assume_init().send().unwrap() }; + + let active_request = unsafe { server.receive_custom_payload().unwrap().unwrap() }; + let header_ptr = (active_request.user_header() as *const CustomHeaderMarker) as *const u8; + for n in 0..type_details.size { + assert_that!(unsafe { *header_ptr.add(n) }, eq(n % 231) as u8); + } + } + + #[test] + fn sending_response_with_custom_header_works() { + let service_name = generate_service_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + let mut type_details = TypeDetail::__internal_new::(TypeVariant::FixedSize); + type_details.size = 4096; + type_details.alignment = 32; + + let service_1 = unsafe { + node.service_builder(&service_name) + .request_response::<[CustomPayloadMarker], [CustomPayloadMarker]>() + .request_user_header::() + .response_user_header::() + .__internal_set_response_header_type_details(&type_details) + .create() + .unwrap() + }; + let service_2 = unsafe { + node.service_builder(&service_name) + .request_response::<[CustomPayloadMarker], [CustomPayloadMarker]>() + .request_user_header::() + .response_user_header::() + .__internal_set_response_header_type_details(&type_details) + .open() + .unwrap() + }; + + let server = service_1.server_builder().create().unwrap(); + let client = service_2.client_builder().create().unwrap(); + + let request = unsafe { client.loan_custom_payload(1).unwrap() }; + let pending_response = unsafe { request.assume_init().send().unwrap() }; + let active_request = unsafe { server.receive_custom_payload().unwrap().unwrap() }; + + let mut response = unsafe { active_request.loan_custom_payload(1).unwrap() }; + let header_ptr = (response.user_header_mut() as *mut CustomHeaderMarker) as *mut u8; + for n in 0..type_details.size { + unsafe { header_ptr.add(n).write((n % 229) as u8) }; + } + unsafe { response.assume_init().send().unwrap() }; + + let response = unsafe { pending_response.receive_custom_payload().unwrap().unwrap() }; + let header_ptr = (response.user_header() as *const CustomHeaderMarker) as *const u8; + for n in 0..type_details.size { + assert_that!(unsafe { *header_ptr.add(n) }, eq(n % 229) as u8); + } + } + + #[test] + fn send_increasing_requests_with_static_allocation_strategy_fails() { + const SLICE_SIZE: usize = 1024; + let service_name = generate_service_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let service = node + .service_builder(&service_name) + .request_response::<[u8], u64>() + .create() + .unwrap(); + + let client = service + .client_builder() + .initial_max_slice_len(SLICE_SIZE) + .allocation_strategy(AllocationStrategy::Static) + .create() + .unwrap(); + + let request = client.loan_slice(SLICE_SIZE - 1); + assert_that!(request, is_ok); + + let request = client.loan_slice(SLICE_SIZE); + assert_that!(request, is_ok); + + let request = client.loan_slice(SLICE_SIZE + 1); + assert_that!(request.err(), eq Some(LoanError::ExceedsMaxLoanSize)); + } + + #[test] + fn send_increasing_responses_with_static_allocation_strategy_fails() { + const SLICE_SIZE: usize = 1024; + let service_name = generate_service_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let service = node + .service_builder(&service_name) + .request_response::() + .create() + .unwrap(); + + let client = service.client_builder().create().unwrap(); + let server = service + .server_builder() + .initial_max_slice_len(SLICE_SIZE) + .allocation_strategy(AllocationStrategy::Static) + .create() + .unwrap(); + let _pending_response = client.send_copy(0).unwrap(); + let active_request = server.receive().unwrap().unwrap(); + + let response = active_request.loan_slice(SLICE_SIZE - 1); + assert_that!(response, is_ok); + + let response = active_request.loan_slice(SLICE_SIZE); + assert_that!(response, is_ok); + + let response = active_request.loan_slice(SLICE_SIZE + 1); + assert_that!(response.err(), eq Some(LoanError::ExceedsMaxLoanSize)); + } + + fn send_and_receive_increasing_requests_works( + allocation_strategy: AllocationStrategy, + ) { + const ITERATIONS: usize = 128; + let service_name = generate_service_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let service = node + .service_builder(&service_name) + .request_response::<[u8], u64>() + .create() + .unwrap(); + + let client = service + .client_builder() + .initial_max_slice_len(1) + .allocation_strategy(allocation_strategy) + .create() + .unwrap(); + let server = service.server_builder().create().unwrap(); + + for n in 0..ITERATIONS { + let request_size = (n + 1) * 32; + let mut request = client.loan_slice(request_size).unwrap(); + for byte in request.payload_mut() { + *byte = n as u8; + } + + let _pending_response = request.send().unwrap(); + + let active_request = server.receive().unwrap().unwrap(); + assert_that!(active_request.payload(), len request_size); + for byte in active_request.payload() { + assert_that!(*byte, eq n as u8); + } + } + } + + #[test] + fn send_and_receive_increasing_requests_with_best_fit_allocation_works() { + send_and_receive_increasing_requests_works::(AllocationStrategy::BestFit); + } + + #[test] + fn send_and_receive_increasing_requests_with_power_of_two_allocation_works() { + send_and_receive_increasing_requests_works::(AllocationStrategy::PowerOfTwo); + } + + fn send_and_receive_increasing_responses_works( + allocation_strategy: AllocationStrategy, + ) { + const ITERATIONS: usize = 128; + let service_name = generate_service_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let service = node + .service_builder(&service_name) + .request_response::() + .create() + .unwrap(); + + let client = service.client_builder().create().unwrap(); + let server = service + .server_builder() + .initial_max_slice_len(1) + .allocation_strategy(allocation_strategy) + .create() + .unwrap(); + let pending_response = client.send_copy(0).unwrap(); + let active_request = server.receive().unwrap().unwrap(); + + for n in 0..ITERATIONS { + let response_size = (n + 1) * 32; + let mut response = active_request.loan_slice(response_size).unwrap(); + for byte in response.payload_mut() { + *byte = n as u8 + 5; + } + response.send().unwrap(); + + let response = pending_response.receive().unwrap().unwrap(); + assert_that!(response.payload(), len response_size); + for byte in response.payload() { + assert_that!(*byte, eq n as u8 + 5); + } + } + } + + #[test] + fn send_and_receive_increasing_responses_with_best_fit_allocation_strategy_works< + Sut: Service, + >() { + send_and_receive_increasing_responses_works::(AllocationStrategy::BestFit); + } + + #[test] + fn send_and_receive_increasing_responses_with_power_of_two_allocation_strategy_works< + Sut: Service, + >() { + send_and_receive_increasing_responses_works::(AllocationStrategy::PowerOfTwo); + } + #[instantiate_tests()] mod ipc {} diff --git a/iceoryx2/tests/subscriber_tests.rs b/iceoryx2/tests/subscriber_tests.rs index 4db12c225..6cd42c2ea 100644 --- a/iceoryx2/tests/subscriber_tests.rs +++ b/iceoryx2/tests/subscriber_tests.rs @@ -13,7 +13,7 @@ #[generic_tests::define] mod subscriber { use iceoryx2::port::ReceiveError; - use iceoryx2::service::builder::publish_subscribe::CustomPayloadMarker; + use iceoryx2::service::builder::CustomPayloadMarker; use iceoryx2::service::static_config::message_type_details::{TypeDetail, TypeVariant}; use std::collections::HashSet;