Skip to content

Commit d71913b

Browse files
Fix TCP discovery server locators translation (#5410)
--------- Signed-off-by: cferreiragonz <[email protected]>
1 parent 997c511 commit d71913b

File tree

6 files changed

+99
-10
lines changed

6 files changed

+99
-10
lines changed

src/cpp/rtps/security/SecurityManager.cpp

-3
Original file line numberDiff line numberDiff line change
@@ -1173,7 +1173,6 @@ bool SecurityManager::create_participant_stateless_message_reader()
11731173
ratt.endpoint.multicastLocatorList = pattr.builtin.metatrafficMulticastLocatorList;
11741174
}
11751175
ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
1176-
ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
11771176
ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
11781177
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
11791178
ratt.matched_writers_allocation = pattr.allocation.participants;
@@ -1273,7 +1272,6 @@ bool SecurityManager::create_participant_volatile_message_secure_writer()
12731272
watt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
12741273
watt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
12751274
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
1276-
watt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
12771275
watt.endpoint.security_attributes().is_submessage_protected = true;
12781276
watt.endpoint.security_attributes().plugin_endpoint_attributes =
12791277
PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED;
@@ -1326,7 +1324,6 @@ bool SecurityManager::create_participant_volatile_message_secure_reader()
13261324
ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
13271325
ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
13281326
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
1329-
ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
13301327
ratt.endpoint.security_attributes().is_submessage_protected = true;
13311328
ratt.endpoint.security_attributes().plugin_endpoint_attributes =
13321329
PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED;

src/cpp/rtps/transport/TCPTransportInterface.cpp

+74-4
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ void TCPTransportInterface::clean()
191191

192192
{
193193
std::vector<std::shared_ptr<TCPChannelResource>> channels;
194+
std::vector<eprosima::fastdds::rtps::Locator> delete_channels;
194195

195196
{
196197
std::unique_lock<std::mutex> scopedLock(sockets_map_mutex_);
@@ -200,10 +201,22 @@ void TCPTransportInterface::clean()
200201

201202
for (auto& channel : channel_resources_)
202203
{
203-
channels.push_back(channel.second);
204+
if (std::find(channels.begin(), channels.end(), channel.second) == channels.end())
205+
{
206+
channels.push_back(channel.second);
207+
}
208+
else
209+
{
210+
delete_channels.push_back(channel.first);
211+
}
204212
}
205213
}
206214

215+
for (auto& delete_channel : delete_channels)
216+
{
217+
channel_resources_.erase(delete_channel);
218+
}
219+
207220
for (auto& channel : channels)
208221
{
209222
if (channel->connection_established())
@@ -279,7 +292,7 @@ Locator TCPTransportInterface::local_endpoint_to_locator(
279292
return locator;
280293
}
281294

282-
void TCPTransportInterface::bind_socket(
295+
ResponseCode TCPTransportInterface::bind_socket(
283296
std::shared_ptr<TCPChannelResource>& channel)
284297
{
285298
std::unique_lock<std::mutex> scopedLock(sockets_map_mutex_);
@@ -288,7 +301,29 @@ void TCPTransportInterface::bind_socket(
288301
auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel);
289302
assert(it_remove != unbound_channel_resources_.end());
290303
unbound_channel_resources_.erase(it_remove);
291-
channel_resources_[channel->locator()] = channel;
304+
305+
ResponseCode ret = RETCODE_OK;
306+
const auto insert_ret = channel_resources_.insert(
307+
decltype(channel_resources_)::value_type{channel->locator(), channel});
308+
if (false == insert_ret.second)
309+
{
310+
// There is an existing channel that can be used. Force the Client to close unnecessary socket
311+
ret = RETCODE_SERVER_ERROR;
312+
}
313+
314+
std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
315+
// Check if the locator is from an owned interface to link all local interfaces to the channel
316+
is_own_interface(channel->locator(), local_interfaces);
317+
if (!local_interfaces.empty())
318+
{
319+
Locator local_locator(channel->locator());
320+
for (auto& interface_it : local_interfaces)
321+
{
322+
IPLocator::setIPv4(local_locator, interface_it.locator);
323+
channel_resources_.insert(decltype(channel_resources_)::value_type{local_locator, channel});
324+
}
325+
}
326+
return ret;
292327
}
293328

294329
bool TCPTransportInterface::check_crc(
@@ -923,7 +958,7 @@ bool TCPTransportInterface::CreateInitialConnect(
923958
std::lock_guard<std::mutex> socketsLock(sockets_map_mutex_);
924959

925960
// We try to find a SenderResource that has this locator.
926-
// Note: This is done in this level because if we do in NetworkFactory level, we have to mantain what transport
961+
// Note: This is done in this level because if we do it at NetworkFactory level, we have to mantain what transport
927962
// already reuses a SenderResource.
928963
for (auto& sender_resource : send_resource_list)
929964
{
@@ -987,6 +1022,19 @@ bool TCPTransportInterface::CreateInitialConnect(
9871022
send_resource_list.emplace_back(
9881023
static_cast<SenderResource*>(new TCPSenderResource(*this, physical_locator)));
9891024

1025+
std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
1026+
// Check if the locator is from an owned interface to link all local interfaces to the channel
1027+
is_own_interface(physical_locator, local_interfaces);
1028+
if (!local_interfaces.empty())
1029+
{
1030+
Locator local_locator(physical_locator);
1031+
for (auto& interface_it : local_interfaces)
1032+
{
1033+
IPLocator::setIPv4(local_locator, interface_it.locator);
1034+
channel_resources_[local_locator] = channel;
1035+
}
1036+
}
1037+
9901038
return true;
9911039
}
9921040

@@ -2079,6 +2127,28 @@ void TCPTransportInterface::send_channel_pending_logical_ports(
20792127
}
20802128
}
20812129

2130+
void TCPTransportInterface::is_own_interface(
2131+
const Locator& locator,
2132+
std::vector<fastdds::rtps::IPFinder::info_IP>& locNames) const
2133+
{
2134+
std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
2135+
get_ips(local_interfaces, true, false);
2136+
for (const auto& interface_it : local_interfaces)
2137+
{
2138+
if (IPLocator::compareAddress(locator, interface_it.locator) && is_interface_allowed(interface_it.name))
2139+
{
2140+
locNames = local_interfaces;
2141+
// Remove interface of original locator from the list
2142+
locNames.erase(std::remove_if(locNames.begin(), locNames.end(),
2143+
[&interface_it](const fastdds::rtps::IPFinder::info_IP& locInterface)
2144+
{
2145+
return locInterface.locator == interface_it.locator;
2146+
}), locNames.end());
2147+
break;
2148+
}
2149+
}
2150+
}
2151+
20822152
} // namespace rtps
20832153
} // namespace fastdds
20842154
} // namespace eprosima

src/cpp/rtps/transport/TCPTransportInterface.h

+11-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ class TCPTransportInterface : public TransportInterface
239239
virtual ~TCPTransportInterface();
240240

241241
//! Stores the binding between the given locator and the given TCP socket. Server side.
242-
void bind_socket(
242+
ResponseCode bind_socket(
243243
std::shared_ptr<TCPChannelResource>&);
244244

245245
//! Removes the listening socket for the specified port.
@@ -525,6 +525,16 @@ class TCPTransportInterface : public TransportInterface
525525
*/
526526
void send_channel_pending_logical_ports(
527527
std::shared_ptr<TCPChannelResource>& channel);
528+
529+
/**
530+
* Method to check if a locator contains an interface that belongs to the same host.
531+
* If it occurs, @c locNames will be updated with the list of interfaces of the host.
532+
* @param [in] locator Locator to check.
533+
* @param [in,out] locNames Vector of interfaces to be updated.
534+
*/
535+
void is_own_interface(
536+
const Locator& locator,
537+
std::vector<fastdds::rtps::IPFinder::info_IP>& locNames) const;
528538
};
529539

530540
} // namespace rtps

src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ ResponseCode RTCPMessageManager::processBindConnectionRequest(
458458

459459
if (RETCODE_OK == code)
460460
{
461-
mTransport->bind_socket(channel);
461+
code = mTransport->bind_socket(channel);
462462
}
463463

464464
sendData(channel, BIND_CONNECTION_RESPONSE, transaction_id, &payload, code);

test/unittest/transport/TCPv4Tests.cpp

+9-1
Original file line numberDiff line numberDiff line change
@@ -2001,6 +2001,9 @@ TEST_F(TCPv4Tests, autofill_port)
20012001
// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators.
20022002
// Clients typically communicated its PID as its locator port. When having several clients in the same
20032003
// process this lead to overwriting server's channel resources map elements.
2004+
// In order to ensure communication in TCP Discovery Server, a different entry is created in the server's
2005+
// channel resources map for each IP interface found, all of them using the same TCP channel. Thus, two
2006+
// clients will generate at least two entries in the server's channel resources map.
20042007
TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
20052008
{
20062009
TCPv4TransportDescriptor recvDescriptor;
@@ -2032,7 +2035,12 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
20322035

20332036
std::this_thread::sleep_for(std::chrono::milliseconds(100));
20342037

2035-
ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2u);
2038+
std::set<std::shared_ptr<TCPChannelResource>> channels_created;
2039+
for (const auto& channel_resource : receiveTransportUnderTest.get_channel_resources())
2040+
{
2041+
channels_created.insert(channel_resource.second);
2042+
}
2043+
ASSERT_EQ(channels_created.size(), 2u);
20362044
}
20372045

20382046
#ifndef _WIN32

tools/fds/server.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,8 @@ int fastdds_discovery_server(
304304
// Retrieve first TCP port
305305
option::Option* pO_tcp_port = options[TCP_PORT];
306306

307+
bool udp_server_initialized = (pOp != nullptr) || (pO_port != nullptr);
308+
307309
/**
308310
* A locator has been initialized previously in [0.0.0.0] address using either the DEFAULT_ROS2_SERVER_PORT or the
309311
* port number set in the CLI. This locator must be used:
@@ -318,6 +320,7 @@ int fastdds_discovery_server(
318320
// Add default locator in cases a) and b)
319321
participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.clear();
320322
participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.push_back(locator4);
323+
udp_server_initialized = true;
321324
}
322325
else if (nullptr == pOp && nullptr != pO_port)
323326
{
@@ -569,6 +572,7 @@ int fastdds_discovery_server(
569572
}
570573

571574
fastdds::rtps::GuidPrefix_t guid_prefix = participantQos.wire_protocol().prefix;
575+
participantQos.transport().use_builtin_transports = udp_server_initialized || options[XML_FILE] != nullptr;
572576

573577
// Create the server
574578
int return_value = 0;

0 commit comments

Comments
 (0)