@@ -48,6 +48,7 @@ use iceoryx2_bb_log::{fail, warn};
48
48
use iceoryx2_bb_posix:: unique_system_id:: UniqueSystemId ;
49
49
use iceoryx2_cal:: dynamic_storage:: DynamicStorage ;
50
50
51
+ use crate :: service:: builder:: publish_subscribe:: CustomPayloadMarker ;
51
52
use crate :: service:: naming_scheme:: data_segment_name;
52
53
use crate :: service:: port_factory:: server:: LocalServerConfig ;
53
54
use crate :: {
@@ -513,11 +514,11 @@ impl<
513
514
details : ChunkDetails < Service > ,
514
515
chunk : Chunk ,
515
516
connection_id : usize ,
517
+ number_of_elements : usize ,
516
518
) -> ActiveRequest < Service , [ RequestPayload ] , RequestHeader , ResponsePayload , ResponseHeader >
517
519
{
518
520
let header =
519
521
unsafe { & * ( chunk. header as * const service:: header:: request_response:: RequestHeader ) } ;
520
- let number_of_elements = ( * header) . number_of_elements ( ) ;
521
522
522
523
ActiveRequest {
523
524
details,
@@ -568,8 +569,68 @@ impl<
568
569
. response_sender
569
570
. get_connection_id_of ( header. client_port_id . value ( ) )
570
571
{
571
- let active_request =
572
- self . create_active_request ( details, chunk, connection_id) ;
572
+ let active_request = self . create_active_request (
573
+ details,
574
+ chunk,
575
+ connection_id,
576
+ header. number_of_elements ( ) as _ ,
577
+ ) ;
578
+
579
+ if !self . enable_fire_and_forget && !active_request. is_connected ( ) {
580
+ continue ;
581
+ }
582
+
583
+ return Ok ( Some ( active_request) ) ;
584
+ }
585
+ }
586
+ None => return Ok ( None ) ,
587
+ }
588
+ }
589
+ }
590
+ }
591
+
592
+ impl <
593
+ Service : service:: Service ,
594
+ RequestHeader : Debug + ZeroCopySend ,
595
+ ResponsePayload : Debug + ZeroCopySend + ?Sized ,
596
+ ResponseHeader : Debug + ZeroCopySend ,
597
+ > Server < Service , [ CustomPayloadMarker ] , RequestHeader , ResponsePayload , ResponseHeader >
598
+ {
599
+ pub unsafe fn receive_custom_payload (
600
+ & self ,
601
+ ) -> Result <
602
+ Option <
603
+ ActiveRequest <
604
+ Service ,
605
+ [ CustomPayloadMarker ] ,
606
+ RequestHeader ,
607
+ ResponsePayload ,
608
+ ResponseHeader ,
609
+ > ,
610
+ > ,
611
+ ReceiveError ,
612
+ > {
613
+ loop {
614
+ match self . receive_impl ( ) ? {
615
+ Some ( ( details, chunk) ) => {
616
+ let header = unsafe {
617
+ & * ( chunk. header as * const service:: header:: request_response:: RequestHeader )
618
+ } ;
619
+ let number_of_elements = ( * header) . number_of_elements ( ) ;
620
+ let number_of_bytes = number_of_elements as usize
621
+ * self . shared_state . request_receiver . payload_size ( ) ;
622
+
623
+ if let Some ( connection_id) = self
624
+ . shared_state
625
+ . response_sender
626
+ . get_connection_id_of ( header. client_port_id . value ( ) )
627
+ {
628
+ let active_request = self . create_active_request (
629
+ details,
630
+ chunk,
631
+ connection_id,
632
+ number_of_bytes,
633
+ ) ;
573
634
574
635
if !self . enable_fire_and_forget && !active_request. is_connected ( ) {
575
636
continue ;
0 commit comments