Skip to content

Commit e6e918f

Browse files
Decouple transport receivers creation using unique network flows (#5583)
* Refs #22519. Add regression test Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #22519. Decouple transport receivers creation using unique network flows Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #22519. Add comment for future developers Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #22519. Apply suggestion Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #22519. Reuse unique ports for locators of same kind in a reader Signed-off-by: Juan Lopez Fernandez <[email protected]> --------- Signed-off-by: Juan Lopez Fernandez <[email protected]>
1 parent c0e7929 commit e6e918f

File tree

3 files changed

+147
-17
lines changed

3 files changed

+147
-17
lines changed

src/cpp/rtps/participant/RTPSParticipantImpl.cpp

+45-17
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <algorithm>
2323
#include <functional>
24+
#include <map>
2425
#include <memory>
2526
#include <mutex>
2627
#include <sstream>
@@ -1728,42 +1729,69 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint(
17281729
if (unique_flows)
17291730
{
17301731
attributes.multicastLocatorList.clear();
1731-
attributes.unicastLocatorList = m_att.defaultUnicastLocatorList;
1732+
attributes.unicastLocatorList.clear();
17321733
attributes.external_unicast_locators.clear();
17331734

1734-
uint16_t port = initial_unique_port;
1735-
while (port < final_unique_port)
1735+
// Register created resources to distinguish the case where a receiver was created in this same function call
1736+
// (and can be reused for other locators of the same kind in this reader), and that in which it was already
1737+
// created before for other reader in this same participant.
1738+
std::map<int32_t, int16_t> created_resources;
1739+
1740+
// Create unique flows for unicast locators
1741+
LocatorList_t input_locator_list = m_att.defaultUnicastLocatorList;
1742+
for (Locator_t& loc : input_locator_list)
17361743
{
1737-
// Set port on unicast locators
1738-
for (Locator_t& loc : attributes.unicastLocatorList)
1744+
uint16_t port = created_resources.count(loc.kind) ? created_resources[loc.kind] : initial_unique_port;
1745+
while (port < final_unique_port)
17391746
{
17401747
// Set logical port only TCP locators
17411748
if (LOCATOR_KIND_TCPv4 == loc.kind || LOCATOR_KIND_TCPv6 == loc.kind)
17421749
{
1750+
// Due to current implementation limitations only one physical port (actual socket receiver)
1751+
// is allowed when using TCP tranport. All we can do for now is to create a unique "logical" flow.
1752+
// TODO(juanlofer): create a unique dedicated TCP communication channel once this limitation is removed.
17431753
IPLocator::setLogicalPort(loc, port);
17441754
}
17451755
else
17461756
{
17471757
loc.port = port;
17481758
}
1759+
1760+
// Try creating receiver for this locator
1761+
LocatorList_t aux_locator_list;
1762+
aux_locator_list.push_back(loc);
1763+
if (createReceiverResources(aux_locator_list, false, true, false))
1764+
{
1765+
created_resources[loc.kind] = port;
1766+
}
1767+
1768+
// Locator will be present in the list if receiver was created, or was already created
1769+
// Continue if receiver not created for this reader (might exist but created for other reader in this same participant)
1770+
if (!aux_locator_list.empty() &&
1771+
created_resources.count(loc.kind) && (created_resources[loc.kind] == port))
1772+
{
1773+
break;
1774+
}
1775+
1776+
// Try with next port
1777+
++port;
17491778
}
17501779

1751-
// Try creating receiver resources
1752-
LocatorList_t aux_locator_list = attributes.unicastLocatorList;
1753-
if (createReceiverResources(aux_locator_list, false, true, false))
1780+
// Fail when unique ports are exhausted
1781+
if (port >= final_unique_port)
17541782
{
1755-
break;
1783+
EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: "
1784+
<< initial_unique_port << "-" << final_unique_port << ". Discarding locator: " << loc);
1785+
}
1786+
else
1787+
{
1788+
attributes.unicastLocatorList.push_back(loc);
17561789
}
1757-
1758-
// Try with next port
1759-
++port;
17601790
}
17611791

1762-
// Fail when unique ports are exhausted
1763-
if (port >= final_unique_port)
1792+
if (attributes.unicastLocatorList.empty())
17641793
{
1765-
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: "
1766-
<< initial_unique_port << "-" << final_unique_port);
1794+
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "No unicast locators to create unique flows");
17671795
return false;
17681796
}
17691797
}
@@ -1874,7 +1902,7 @@ bool RTPSParticipantImpl::createReceiverResources(
18741902
bool ret_val = input_list.empty();
18751903

18761904
#if HAVE_SECURITY
1877-
// An auxilary buffer is needed in the ReceiverResource to to decrypt the message,
1905+
// An auxilary buffer is needed in the ReceiverResource to decrypt the message,
18781906
// that imposes a limit in the received messages size even if the transport allows (uint32_t) messages size.
18791907
uint32_t max_receiver_buffer_size =
18801908
is_secure() ? std::numeric_limits<uint16_t>::max() : (std::numeric_limits<uint32_t>::max)();

src/cpp/rtps/participant/RTPSParticipantImpl.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,7 @@ class RTPSParticipantImpl
10311031
* @param ApplyMutation - True if we want to create a Resource with a "similar" locator if the one we provide is unavailable
10321032
* @param RegisterReceiver - True if we want the receiver to be registered. Useful for receivers created after participant is enabled.
10331033
* @param log_when_creation_fails - True if a log warning shall be issued for each locator when a receiver resource cannot be created.
1034+
* @return True if a receiver resource was created for at least a locator in the list, false otherwise.
10341035
*/
10351036
bool createReceiverResources(
10361037
LocatorList_t& Locator_list,

test/blackbox/common/BlackboxTestsNetworkConf.cpp

+101
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include <gtest/gtest.h>
1919
#include <fastdds/rtps/common/Locator.hpp>
20+
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.hpp>
2021
#include <fastdds/utils/IPFinder.hpp>
2122

2223
#include "BlackboxTests.hpp"
@@ -171,6 +172,106 @@ TEST_P(NetworkConfig, sub_unique_network_flows)
171172
}
172173
}
173174

175+
// Regression test for redmine issue #22519 to check that readers using unique network flows cannot share locators
176+
// with other readers. The mentioned issue referred to the case where TCP + builtin transports are present.
177+
// In that concrete scenario, the problem was that while the TCP (and UDP) transports rightly were able
178+
// to create a receiver in the dedicated "unique flow" port, shared memory failed for that same port as the other
179+
// process (or participant) is already listening on it. However this was not being handled properly, so once matched,
180+
// the publisher attempts to send data to the wrongfully announced shared memory locator.
181+
// Note that the underlying problem is that, when creating unique network flows, all transports are requested to
182+
// create a receiver for a specific port all together. This is, the creation of unique flow receivers is only
183+
// considered to fail when it fails for all transports, instead of decoupling them and keep trying for alternative
184+
// ports when the creation of a specific transport receiver fails.
185+
// In this test a similar scenario is presented, but using instead UDP and shared memory transports. In the first
186+
// participant, only shared memory is used (which should create a SHM receiver in the first "unique" port attempted).
187+
// In the second participant both UDP and shared memory are used (which should create a UDP receiver in the first
188+
// "unique" port attempted, and a shared memory receiver in the second "unique" port attempted, as the first one is
189+
// already being used by the first participant). As a result, the listening shared memory locators of each data
190+
// reader should be different. Finally, a third data reader is created in the second participant, and it is verified
191+
// that its listening locators are different from those of the other reader created in the same participant, as well as
192+
// from the (SHM) one of the reader created in the first participant.
193+
TEST_P(NetworkConfig, sub_unique_network_flows_multiple_locators)
194+
{
195+
// Enable unique network flows feature
196+
PropertyPolicy properties;
197+
properties.properties().emplace_back("fastdds.unique_network_flows", "");
198+
199+
// First participant
200+
PubSubParticipant<HelloWorldPubSubType> participant(0, 1, 0, 0);
201+
202+
participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties);
203+
204+
std::shared_ptr<SharedMemTransportDescriptor> shm_descriptor = std::make_shared<SharedMemTransportDescriptor>();
205+
// Use only SHM transport in the first participant
206+
participant.disable_builtin_transport().add_user_transport_to_pparams(shm_descriptor);
207+
208+
ASSERT_TRUE(participant.init_participant());
209+
ASSERT_TRUE(participant.init_subscriber(0));
210+
211+
LocatorList_t locators;
212+
213+
participant.get_native_reader(0).get_listening_locators(locators);
214+
ASSERT_EQ(locators.size(), 1u);
215+
ASSERT_EQ((*locators.begin()).kind, LOCATOR_KIND_SHM);
216+
217+
// Second participant
218+
PubSubParticipant<HelloWorldPubSubType> participant2(0, 2, 0, 0);
219+
220+
participant2.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties);
221+
222+
// Use both UDP and SHM in the second participant
223+
if (!use_udpv4)
224+
{
225+
participant2.disable_builtin_transport().add_user_transport_to_pparams(descriptor_).
226+
add_user_transport_to_pparams(shm_descriptor);
227+
}
228+
229+
ASSERT_TRUE(participant2.init_participant());
230+
ASSERT_TRUE(participant2.init_subscriber(0));
231+
232+
LocatorList_t locators2_1;
233+
234+
participant2.get_native_reader(0).get_listening_locators(locators2_1);
235+
ASSERT_TRUE(locators2_1.size() >= 2u); // There should be at least two locators, one for SHM and N(#interfaces) for UDP
236+
237+
// Check SHM locator is different from the one in the first participant
238+
for (const Locator_t& loc : locators2_1)
239+
{
240+
if (LOCATOR_KIND_SHM == loc.kind)
241+
{
242+
// Ports should be different (expected second and first values of the unique network flows port range)
243+
ASSERT_FALSE(loc == *locators.begin());
244+
}
245+
}
246+
247+
// Now create a second reader in the second participant
248+
ASSERT_TRUE(participant2.init_subscriber(1));
249+
250+
LocatorList_t locators2_2;
251+
252+
participant2.get_native_reader(1).get_listening_locators(locators2_2);
253+
ASSERT_TRUE(locators2_2.size() >= 2u); // There should be at least two locators, one for SHM and N(#interfaces) for UDP
254+
255+
// Check SHM locator is different from the one in the first participant
256+
for (const Locator_t& loc : locators2_2)
257+
{
258+
if (LOCATOR_KIND_SHM == loc.kind)
259+
{
260+
// Ports should be different (expected third and first values of the unique network flows port range)
261+
ASSERT_FALSE(loc == *locators.begin());
262+
}
263+
}
264+
265+
// Now check no locators are shared between the two readers in the second participant
266+
for (const Locator_t& loc_1 : locators2_1)
267+
{
268+
for (const Locator_t& loc_2 : locators2_2)
269+
{
270+
ASSERT_FALSE(loc_1 == loc_2);
271+
}
272+
}
273+
}
274+
174275
//Verify that outLocatorList is used to select the desired output channel
175276
TEST_P(NetworkConfig, PubSubOutLocatorSelection)
176277
{

0 commit comments

Comments
 (0)