diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index d9cfc4938fa..31fb6960f90 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -1992,6 +1992,28 @@ void RTPSParticipantImpl::createSenderResources( m_network_Factory.build_send_resources(send_resource_list_, locator_selector_entry); } +void RTPSParticipantImpl::createSenderResources( + const RemoteLocatorList& locator_list, + const EndpointAttributes& param) +{ + using network::external_locators::filter_remote_locators; + + LocatorSelectorEntry entry(locator_list.unicast.size(), locator_list.multicast.size()); + entry.multicast = locator_list.multicast; + entry.unicast = locator_list.unicast; + filter_remote_locators(entry, param.external_unicast_locators, param.ignore_non_matching_locators); + + std::lock_guard lock(m_send_resources_mutex_); + for (const Locator_t& locator : entry.unicast) + { + m_network_Factory.build_send_resources(send_resource_list_, locator); + } + for (const Locator_t& locator : entry.multicast) + { + m_network_Factory.build_send_resources(send_resource_list_, locator); + } +} + bool RTPSParticipantImpl::deleteUserEndpoint( const GUID_t& endpoint) { diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.hpp b/src/cpp/rtps/participant/RTPSParticipantImpl.hpp index 60eb417a4f8..74bc47bdd9b 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.hpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.hpp @@ -115,6 +115,7 @@ namespace rtps { struct PublicationBuiltinTopicData; struct TopicDescription; +struct RemoteLocatorList; class RTPSParticipant; class RTPSParticipantListener; class BuiltinProtocols; @@ -1045,6 +1046,10 @@ class RTPSParticipantImpl void createSenderResources( const Locator_t& locator); + void createSenderResources( + const RemoteLocatorList& locator_list, + const EndpointAttributes& param); + /** * Creates sender resources for the given locator selector entry by calling the NetworkFactory's * build_send_resources method. diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index 07f40efa972..594f46de3b6 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -106,6 +106,9 @@ bool StatelessReader::matched_writer_add_edp( std::unique_lock guard(mp_mutex); listener = listener_; + bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid); + bool is_datasharing = is_datasharing_compatible_with(wdata); + for (RemoteWriterInfo_t& writer : matched_writers_) { if (writer.guid == wdata.guid) @@ -120,6 +123,11 @@ bool StatelessReader::matched_writer_add_edp( } writer.ownership_strength = wdata.ownership_strength.value; + if (!is_same_process && !is_datasharing) + { + mp_RTPSParticipant->createSenderResources(wdata.remote_locators, m_att); + } + if (nullptr != listener) { // call the listener without the lock taken @@ -141,9 +149,6 @@ bool StatelessReader::matched_writer_add_edp( } } - bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid); - bool is_datasharing = is_datasharing_compatible_with(wdata); - RemoteWriterInfo_t info; info.guid = wdata.guid; info.persistence_guid = wdata.persistence_guid; @@ -192,6 +197,11 @@ bool StatelessReader::matched_writer_add_edp( // this has to be done after the writer is added to the matched_writers or the processing may fail datasharing_listener_->notify(false); } + + if (!is_same_process && !is_datasharing) + { + mp_RTPSParticipant->createSenderResources(wdata.remote_locators, m_att); + } } if (liveliness_lease_duration_ < dds::c_TimeInfinite) diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 45ca17f284b..3133c98c407 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -987,6 +987,7 @@ bool StatefulWriter::matched_reader_add_edp( m_att.external_unicast_locators, m_att.ignore_non_matching_locators); filter_remote_locators(*reader->async_locator_selector_entry(), m_att.external_unicast_locators, m_att.ignore_non_matching_locators); + mp_RTPSParticipant->createSenderResources(rdata.remote_locators, m_att); update_reader_info(locator_selector_general_, true); update_reader_info(locator_selector_async_, true); } @@ -1071,6 +1072,7 @@ bool StatefulWriter::matched_reader_add_edp( } } + mp_RTPSParticipant->createSenderResources(rdata.remote_locators, m_att); update_reader_info(locator_selector_general_, true); update_reader_info(locator_selector_async_, true); diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 9ef6b2f9e86..76cf8ac0bc3 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -475,6 +475,7 @@ bool StatelessWriter::matched_reader_add_edp( { filter_remote_locators(*reader.general_locator_selector_entry(), m_att.external_unicast_locators, m_att.ignore_non_matching_locators); + mp_RTPSParticipant->createSenderResources(data.remote_locators, m_att); update_reader_info(true); } return true; @@ -561,14 +562,7 @@ bool StatelessWriter::matched_reader_add_edp( } // Create sender resources for the case when we send to a single reader - locator_selector_.locator_selector.reset(false); - locator_selector_.locator_selector.enable(data.guid); - mp_RTPSParticipant->network_factory().select_locators(locator_selector_.locator_selector); - RTPSParticipantImpl* part = mp_RTPSParticipant; - locator_selector_.locator_selector.for_each([part](const Locator_t& loc) - { - part->createSenderResources(loc); - }); + mp_RTPSParticipant->createSenderResources(data.remote_locators, m_att); // Create sender resources for the case when we send to all readers update_reader_info(true); diff --git a/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp b/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp index 0c16b9406a9..d00b319932b 100644 --- a/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp +++ b/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp @@ -25,6 +25,7 @@ #include #include +#include #include #include @@ -83,7 +84,12 @@ class OutputTrafficManager return locator == entry.first; }; auto it = std::find_if(collection_.begin(), collection_.end(), search); - assert(it != collection_.end()); + if (it == collection_.end()) + { + EPROSIMA_LOG_ERROR(RTPS_OUT, + "Locator '" << locator << "' not found in collection. Adding entry."); + it = collection_.insert(it, entry_type(locator, value_type{})); + } set_statistics_submessage_from_transport(locator, send_buffer, total_bytes, it->second); #endif // FASTDDS_STATISTICS } diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index 2bf6315e0a4..70ffb5852d8 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -1506,6 +1506,57 @@ TEST_P(TransportTCP, tcp_unique_network_flows_communication) EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30))); } +/** + * This verifies that a best effort reader is capable of creating resources when a new locator + * is received along a Data(W) in order to start communication. This will ensure the creation a new connect channel. + * The reader must have the lowest listening port to force the participant to create the channel. + */ +TEST_P(TransportTCP, best_effort_reader_tcp_resources_creation) +{ + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + // Large data setup is reused to enable UDP for multicast and TCP for data. + // However, the metatraffic unicast needs to be replaced for UDP to ensure that the TCP + // locator is not announced in the Data(P) (In large data the metatraffic unicast is TCP). + LocatorList metatraffic_unicast; + eprosima::fastdds::rtps::Locator_t udp_locator; + udp_locator.kind = LOCATOR_KIND_UDPv4; + eprosima::fastdds::rtps::IPLocator::setIPv4(udp_locator, "127.0.0.1"); + metatraffic_unicast.push_back(udp_locator); + + // Writer with highest listening port will wait for connection + writer.setup_large_data_tcp(use_ipv6, global_port + 1) + .metatraffic_unicast_locator_list(metatraffic_unicast) + .init(); + + // Reader with lowest listening port to force the connection channel creation + reader.setup_large_data_tcp(use_ipv6, global_port) + .reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS) + .metatraffic_unicast_locator_list(metatraffic_unicast) + .init(); + + ASSERT_TRUE(writer.isInitialized()); + ASSERT_TRUE(reader.isInitialized()); + + writer.wait_discovery(std::chrono::seconds(5)); + reader.wait_discovery(std::chrono::seconds(5)); + + ASSERT_EQ(writer.get_matched(), 1u); + ASSERT_EQ(reader.get_matched(), 1u); + + // Although participants have matched, the TCP connection might not be established yet. + // This active wait ensures the connection had time to be established before sending non-reliable samples. + std::this_thread::sleep_for(std::chrono::seconds(3)); + + auto data = default_helloworld_data_generator(); + reader.startReception(data); + writer.send(data); + ASSERT_TRUE(data.empty()); + + reader.block_for_all(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else