From 9c865e1d7edf324f82c9c4a349a8e34468911e82 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 12 Mar 2025 12:48:08 +0100 Subject: [PATCH 1/5] Refs #22930. Add entry when missing on `set_statistics_message_data`. Signed-off-by: Miguel Company --- src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp b/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp index 0c16b9406a9..99789e509de 100644 --- a/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp +++ b/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp @@ -25,6 +25,7 @@ #include #include +#include #include #include @@ -83,7 +84,12 @@ class OutputTrafficManager return locator == entry.first; }; auto it = std::find_if(collection_.begin(), collection_.end(), search); - assert(it != collection_.end()); + if(it == collection_.end()) + { + EPROSIMA_LOG_ERROR(RTPS_OUT, + "Locator '" << locator << "' not found in collection. Adding entry."); + it = collection_.insert(it, entry_type(locator, value_type{})); + } set_statistics_submessage_from_transport(locator, send_buffer, total_bytes, it->second); #endif // FASTDDS_STATISTICS } From 8d4718913fd932e48489f9ab75e782d28f4f224c Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 12 Mar 2025 15:28:26 +0100 Subject: [PATCH 2/5] Refs #22930. Create sender resources on stateless readers. Signed-off-by: Miguel Company --- .../rtps/participant/RTPSParticipantImpl.cpp | 22 +++++++++++++++++++ .../rtps/participant/RTPSParticipantImpl.hpp | 5 +++++ src/cpp/rtps/reader/StatelessReader.cpp | 16 +++++++++++--- 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index d9cfc4938fa..31fb6960f90 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -1992,6 +1992,28 @@ void RTPSParticipantImpl::createSenderResources( m_network_Factory.build_send_resources(send_resource_list_, locator_selector_entry); } +void RTPSParticipantImpl::createSenderResources( + const RemoteLocatorList& locator_list, + const EndpointAttributes& param) +{ + using network::external_locators::filter_remote_locators; + + LocatorSelectorEntry entry(locator_list.unicast.size(), locator_list.multicast.size()); + entry.multicast = locator_list.multicast; + entry.unicast = locator_list.unicast; + filter_remote_locators(entry, param.external_unicast_locators, param.ignore_non_matching_locators); + + std::lock_guard lock(m_send_resources_mutex_); + for (const Locator_t& locator : entry.unicast) + { + m_network_Factory.build_send_resources(send_resource_list_, locator); + } + for (const Locator_t& locator : entry.multicast) + { + m_network_Factory.build_send_resources(send_resource_list_, locator); + } +} + bool RTPSParticipantImpl::deleteUserEndpoint( const GUID_t& endpoint) { diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.hpp b/src/cpp/rtps/participant/RTPSParticipantImpl.hpp index 60eb417a4f8..74bc47bdd9b 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.hpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.hpp @@ -115,6 +115,7 @@ namespace rtps { struct PublicationBuiltinTopicData; struct TopicDescription; +struct RemoteLocatorList; class RTPSParticipant; class RTPSParticipantListener; class BuiltinProtocols; @@ -1045,6 +1046,10 @@ class RTPSParticipantImpl void createSenderResources( const Locator_t& locator); + void createSenderResources( + const RemoteLocatorList& locator_list, + const EndpointAttributes& param); + /** * Creates sender resources for the given locator selector entry by calling the NetworkFactory's * build_send_resources method. diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index 07f40efa972..594f46de3b6 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -106,6 +106,9 @@ bool StatelessReader::matched_writer_add_edp( std::unique_lock guard(mp_mutex); listener = listener_; + bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid); + bool is_datasharing = is_datasharing_compatible_with(wdata); + for (RemoteWriterInfo_t& writer : matched_writers_) { if (writer.guid == wdata.guid) @@ -120,6 +123,11 @@ bool StatelessReader::matched_writer_add_edp( } writer.ownership_strength = wdata.ownership_strength.value; + if (!is_same_process && !is_datasharing) + { + mp_RTPSParticipant->createSenderResources(wdata.remote_locators, m_att); + } + if (nullptr != listener) { // call the listener without the lock taken @@ -141,9 +149,6 @@ bool StatelessReader::matched_writer_add_edp( } } - bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid); - bool is_datasharing = is_datasharing_compatible_with(wdata); - RemoteWriterInfo_t info; info.guid = wdata.guid; info.persistence_guid = wdata.persistence_guid; @@ -192,6 +197,11 @@ bool StatelessReader::matched_writer_add_edp( // this has to be done after the writer is added to the matched_writers or the processing may fail datasharing_listener_->notify(false); } + + if (!is_same_process && !is_datasharing) + { + mp_RTPSParticipant->createSenderResources(wdata.remote_locators, m_att); + } } if (liveliness_lease_duration_ < dds::c_TimeInfinite) From c9f81d88d87c63e59ef980cc08fddbeb4e129760 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 12 Mar 2025 15:49:39 +0100 Subject: [PATCH 3/5] Refs #22930. Similar changes on writers. Signed-off-by: Miguel Company --- src/cpp/rtps/writer/StatefulWriter.cpp | 2 ++ src/cpp/rtps/writer/StatelessWriter.cpp | 10 ++-------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 45ca17f284b..3133c98c407 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -987,6 +987,7 @@ bool StatefulWriter::matched_reader_add_edp( m_att.external_unicast_locators, m_att.ignore_non_matching_locators); filter_remote_locators(*reader->async_locator_selector_entry(), m_att.external_unicast_locators, m_att.ignore_non_matching_locators); + mp_RTPSParticipant->createSenderResources(rdata.remote_locators, m_att); update_reader_info(locator_selector_general_, true); update_reader_info(locator_selector_async_, true); } @@ -1071,6 +1072,7 @@ bool StatefulWriter::matched_reader_add_edp( } } + mp_RTPSParticipant->createSenderResources(rdata.remote_locators, m_att); update_reader_info(locator_selector_general_, true); update_reader_info(locator_selector_async_, true); diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 9ef6b2f9e86..76cf8ac0bc3 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -475,6 +475,7 @@ bool StatelessWriter::matched_reader_add_edp( { filter_remote_locators(*reader.general_locator_selector_entry(), m_att.external_unicast_locators, m_att.ignore_non_matching_locators); + mp_RTPSParticipant->createSenderResources(data.remote_locators, m_att); update_reader_info(true); } return true; @@ -561,14 +562,7 @@ bool StatelessWriter::matched_reader_add_edp( } // Create sender resources for the case when we send to a single reader - locator_selector_.locator_selector.reset(false); - locator_selector_.locator_selector.enable(data.guid); - mp_RTPSParticipant->network_factory().select_locators(locator_selector_.locator_selector); - RTPSParticipantImpl* part = mp_RTPSParticipant; - locator_selector_.locator_selector.for_each([part](const Locator_t& loc) - { - part->createSenderResources(loc); - }); + mp_RTPSParticipant->createSenderResources(data.remote_locators, m_att); // Create sender resources for the case when we send to all readers update_reader_info(true); From a7590d5a6ee5d941593d0586b60cbda230037386 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Mon, 17 Mar 2025 12:34:55 +0100 Subject: [PATCH 4/5] Refs #22930: Test TCP with best effort reader Signed-off-by: cferreiragonz --- .../common/BlackboxTestsTransportTCP.cpp | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index 2bf6315e0a4..70ffb5852d8 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -1506,6 +1506,57 @@ TEST_P(TransportTCP, tcp_unique_network_flows_communication) EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30))); } +/** + * This verifies that a best effort reader is capable of creating resources when a new locator + * is received along a Data(W) in order to start communication. This will ensure the creation a new connect channel. + * The reader must have the lowest listening port to force the participant to create the channel. + */ +TEST_P(TransportTCP, best_effort_reader_tcp_resources_creation) +{ + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + // Large data setup is reused to enable UDP for multicast and TCP for data. + // However, the metatraffic unicast needs to be replaced for UDP to ensure that the TCP + // locator is not announced in the Data(P) (In large data the metatraffic unicast is TCP). + LocatorList metatraffic_unicast; + eprosima::fastdds::rtps::Locator_t udp_locator; + udp_locator.kind = LOCATOR_KIND_UDPv4; + eprosima::fastdds::rtps::IPLocator::setIPv4(udp_locator, "127.0.0.1"); + metatraffic_unicast.push_back(udp_locator); + + // Writer with highest listening port will wait for connection + writer.setup_large_data_tcp(use_ipv6, global_port + 1) + .metatraffic_unicast_locator_list(metatraffic_unicast) + .init(); + + // Reader with lowest listening port to force the connection channel creation + reader.setup_large_data_tcp(use_ipv6, global_port) + .reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS) + .metatraffic_unicast_locator_list(metatraffic_unicast) + .init(); + + ASSERT_TRUE(writer.isInitialized()); + ASSERT_TRUE(reader.isInitialized()); + + writer.wait_discovery(std::chrono::seconds(5)); + reader.wait_discovery(std::chrono::seconds(5)); + + ASSERT_EQ(writer.get_matched(), 1u); + ASSERT_EQ(reader.get_matched(), 1u); + + // Although participants have matched, the TCP connection might not be established yet. + // This active wait ensures the connection had time to be established before sending non-reliable samples. + std::this_thread::sleep_for(std::chrono::seconds(3)); + + auto data = default_helloworld_data_generator(); + reader.startReception(data); + writer.send(data); + ASSERT_TRUE(data.empty()); + + reader.block_for_all(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else From 144c3800ed6a963afdfa1e836945b5d80c98d839 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Mon, 17 Mar 2025 17:01:26 +0100 Subject: [PATCH 5/5] Refs #22930: Uncrustify Signed-off-by: cferreiragonz --- src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp b/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp index 99789e509de..d00b319932b 100644 --- a/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp +++ b/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp @@ -84,10 +84,10 @@ class OutputTrafficManager return locator == entry.first; }; auto it = std::find_if(collection_.begin(), collection_.end(), search); - if(it == collection_.end()) + if (it == collection_.end()) { EPROSIMA_LOG_ERROR(RTPS_OUT, - "Locator '" << locator << "' not found in collection. Adding entry."); + "Locator '" << locator << "' not found in collection. Adding entry."); it = collection_.insert(it, entry_type(locator, value_type{})); } set_statistics_submessage_from_transport(locator, send_buffer, total_bytes, it->second);