Skip to content

[22931] Fix assertion on OutputTrafficManager #5704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1992,6 +1992,28 @@ void RTPSParticipantImpl::createSenderResources(
m_network_Factory.build_send_resources(send_resource_list_, locator_selector_entry);
}

void RTPSParticipantImpl::createSenderResources(
const RemoteLocatorList& locator_list,
const EndpointAttributes& param)
{
using network::external_locators::filter_remote_locators;

LocatorSelectorEntry entry(locator_list.unicast.size(), locator_list.multicast.size());
entry.multicast = locator_list.multicast;
entry.unicast = locator_list.unicast;
filter_remote_locators(entry, param.external_unicast_locators, param.ignore_non_matching_locators);

std::lock_guard<std::timed_mutex> lock(m_send_resources_mutex_);
for (const Locator_t& locator : entry.unicast)
{
m_network_Factory.build_send_resources(send_resource_list_, locator);
}
for (const Locator_t& locator : entry.multicast)
{
m_network_Factory.build_send_resources(send_resource_list_, locator);
}
}

bool RTPSParticipantImpl::deleteUserEndpoint(
const GUID_t& endpoint)
{
Expand Down
5 changes: 5 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ namespace rtps {

struct PublicationBuiltinTopicData;
struct TopicDescription;
struct RemoteLocatorList;
class RTPSParticipant;
class RTPSParticipantListener;
class BuiltinProtocols;
Expand Down Expand Up @@ -1045,6 +1046,10 @@ class RTPSParticipantImpl
void createSenderResources(
const Locator_t& locator);

void createSenderResources(
const RemoteLocatorList& locator_list,
const EndpointAttributes& param);

/**
* Creates sender resources for the given locator selector entry by calling the NetworkFactory's
* build_send_resources method.
Expand Down
16 changes: 13 additions & 3 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ bool StatelessReader::matched_writer_add_edp(
std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);
listener = listener_;

bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid);
bool is_datasharing = is_datasharing_compatible_with(wdata);

for (RemoteWriterInfo_t& writer : matched_writers_)
{
if (writer.guid == wdata.guid)
Expand All @@ -120,6 +123,11 @@ bool StatelessReader::matched_writer_add_edp(
}
writer.ownership_strength = wdata.ownership_strength.value;

if (!is_same_process && !is_datasharing)
{
mp_RTPSParticipant->createSenderResources(wdata.remote_locators, m_att);
}

if (nullptr != listener)
{
// call the listener without the lock taken
Expand All @@ -141,9 +149,6 @@ bool StatelessReader::matched_writer_add_edp(
}
}

bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid);
bool is_datasharing = is_datasharing_compatible_with(wdata);

RemoteWriterInfo_t info;
info.guid = wdata.guid;
info.persistence_guid = wdata.persistence_guid;
Expand Down Expand Up @@ -192,6 +197,11 @@ bool StatelessReader::matched_writer_add_edp(
// this has to be done after the writer is added to the matched_writers or the processing may fail
datasharing_listener_->notify(false);
}

if (!is_same_process && !is_datasharing)
{
mp_RTPSParticipant->createSenderResources(wdata.remote_locators, m_att);
}
}

if (liveliness_lease_duration_ < dds::c_TimeInfinite)
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,7 @@ bool StatefulWriter::matched_reader_add_edp(
m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
filter_remote_locators(*reader->async_locator_selector_entry(),
m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
mp_RTPSParticipant->createSenderResources(rdata.remote_locators, m_att);
update_reader_info(locator_selector_general_, true);
update_reader_info(locator_selector_async_, true);
}
Expand Down Expand Up @@ -1071,6 +1072,7 @@ bool StatefulWriter::matched_reader_add_edp(
}
}

mp_RTPSParticipant->createSenderResources(rdata.remote_locators, m_att);
update_reader_info(locator_selector_general_, true);
update_reader_info(locator_selector_async_, true);

Expand Down
10 changes: 2 additions & 8 deletions src/cpp/rtps/writer/StatelessWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ bool StatelessWriter::matched_reader_add_edp(
{
filter_remote_locators(*reader.general_locator_selector_entry(),
m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
mp_RTPSParticipant->createSenderResources(data.remote_locators, m_att);
update_reader_info(true);
}
return true;
Expand Down Expand Up @@ -561,14 +562,7 @@ bool StatelessWriter::matched_reader_add_edp(
}

// Create sender resources for the case when we send to a single reader
locator_selector_.locator_selector.reset(false);
locator_selector_.locator_selector.enable(data.guid);
mp_RTPSParticipant->network_factory().select_locators(locator_selector_.locator_selector);
RTPSParticipantImpl* part = mp_RTPSParticipant;
locator_selector_.locator_selector.for_each([part](const Locator_t& loc)
{
part->createSenderResources(loc);
});
mp_RTPSParticipant->createSenderResources(data.remote_locators, m_att);

// Create sender resources for the case when we send to all readers
update_reader_info(true);
Expand Down
8 changes: 7 additions & 1 deletion src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <utility>

#include <fastdds/config.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/common/Locator.hpp>

#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>
Expand Down Expand Up @@ -83,7 +84,12 @@ class OutputTrafficManager
return locator == entry.first;
};
auto it = std::find_if(collection_.begin(), collection_.end(), search);
assert(it != collection_.end());
if (it == collection_.end())
{
EPROSIMA_LOG_ERROR(RTPS_OUT,
"Locator '" << locator << "' not found in collection. Adding entry.");
it = collection_.insert(it, entry_type(locator, value_type{}));
}
set_statistics_submessage_from_transport(locator, send_buffer, total_bytes, it->second);
#endif // FASTDDS_STATISTICS
}
Expand Down
51 changes: 51 additions & 0 deletions test/blackbox/common/BlackboxTestsTransportTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,57 @@ TEST_P(TransportTCP, tcp_unique_network_flows_communication)
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30)));
}

/**
* This verifies that a best effort reader is capable of creating resources when a new locator
* is received along a Data(W) in order to start communication. This will ensure the creation a new connect channel.
* The reader must have the lowest listening port to force the participant to create the channel.
*/
TEST_P(TransportTCP, best_effort_reader_tcp_resources_creation)
{
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

// Large data setup is reused to enable UDP for multicast and TCP for data.
// However, the metatraffic unicast needs to be replaced for UDP to ensure that the TCP
// locator is not announced in the Data(P) (In large data the metatraffic unicast is TCP).
LocatorList metatraffic_unicast;
eprosima::fastdds::rtps::Locator_t udp_locator;
udp_locator.kind = LOCATOR_KIND_UDPv4;
eprosima::fastdds::rtps::IPLocator::setIPv4(udp_locator, "127.0.0.1");
metatraffic_unicast.push_back(udp_locator);

// Writer with highest listening port will wait for connection
writer.setup_large_data_tcp(use_ipv6, global_port + 1)
.metatraffic_unicast_locator_list(metatraffic_unicast)
.init();

// Reader with lowest listening port to force the connection channel creation
reader.setup_large_data_tcp(use_ipv6, global_port)
.reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS)
.metatraffic_unicast_locator_list(metatraffic_unicast)
.init();

ASSERT_TRUE(writer.isInitialized());
ASSERT_TRUE(reader.isInitialized());

writer.wait_discovery(std::chrono::seconds(5));
reader.wait_discovery(std::chrono::seconds(5));

ASSERT_EQ(writer.get_matched(), 1u);
ASSERT_EQ(reader.get_matched(), 1u);

// Although participants have matched, the TCP connection might not be established yet.
// This active wait ensures the connection had time to be established before sending non-reliable samples.
std::this_thread::sleep_for(std::chrono::seconds(3));

auto data = default_helloworld_data_generator();
reader.startReception(data);
writer.send(data);
ASSERT_TRUE(data.empty());

reader.block_for_all();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
Loading