Skip to content

Commit 2532f9d

Browse files
mergify[bot]juanlofer-eprosimaEduPonzJLBuenoLopez
authored
Fix TCP infinite loop, client shutdown and reconnection (#2478)
* Fix TCP infinite loop, client shutdown and reconnection (#2470) * Fix infinite loop in TCP header read Signed-off-by: Juan López Fernández <[email protected]> * Add test for channel disconnection during TCP segment reception Signed-off-by: Juan López Fernández <[email protected]> * Apply suggestions Signed-off-by: Juan López Fernández <[email protected]> * Add issue reference Signed-off-by: Juan López Fernández <[email protected]> * Refs 13540: Unlock TCP client on shutdown Signed-off-by: Eduardo Ponz <[email protected]> * Refs 13540: Update TCP sender resource channel on openning output channel when reusing sender resource Signed-off-by: Eduardo Ponz <[email protected]> * Add TCP client stuck test Signed-off-by: Juan López Fernández <[email protected]> * Add TCP client reconnection test Signed-off-by: Juan López Fernández <[email protected]> * Link tests with issues Signed-off-by: Juan López Fernández <[email protected]> * Uncrustify Signed-off-by: Juan López Fernández <[email protected]> * Rename TCPChannelResource mock class Signed-off-by: Juan López Fernández <[email protected]> Co-authored-by: Eduardo Ponz <[email protected]> (cherry picked from commit 6639a84) # Conflicts: # test/blackbox/common/BlackboxTestsTransportTCP.cpp Signed-off-by: JLBuenoLopez-eProsima <[email protected]> * Refs #13540: fix conflicts Signed-off-by: JLBuenoLopez-eProsima <[email protected]> * Refs #13540: linters Signed-off-by: JLBuenoLopez-eProsima <[email protected]> * Refs #13540: fix Windows warning Signed-off-by: JLBuenoLopez-eProsima <[email protected]> Co-authored-by: juanlofer-eprosima <[email protected]> Co-authored-by: Eduardo Ponz <[email protected]> Co-authored-by: JLBuenoLopez-eProsima <[email protected]>
1 parent f472699 commit 2532f9d

19 files changed

+456
-24
lines changed

examples/C++/DDS/HelloWorldExampleTCP/HelloWorldSubscriber.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ void HelloWorldSubscriber::run(
196196
uint32_t number)
197197
{
198198
std::cout << "[RTCP] Subscriber running until " << number << "samples have been received" << std::endl;
199-
while (number < this->listener_.samples_)
199+
while (number > this->listener_.samples_)
200200
{
201201
std::this_thread::sleep_for(std::chrono::milliseconds(500));
202202
}

src/cpp/rtps/transport/TCPChannelResourceBasic.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,13 @@ void TCPChannelResourceBasic::disconnect()
107107
{
108108
auto socket = socket_;
109109

110+
std::error_code ec;
111+
socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
112+
110113
service_.post([&, socket]()
111114
{
112115
try
113116
{
114-
std::error_code ec;
115-
socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
116117
socket->cancel();
117118

118119
// This method was added on the version 1.12.0
@@ -125,7 +126,6 @@ void TCPChannelResourceBasic::disconnect()
125126
{
126127
}
127128
});
128-
129129
}
130130
}
131131

src/cpp/rtps/transport/TCPTransportInterface.cpp

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ bool TCPTransportInterface::CloseInputChannel(
583583
void TCPTransportInterface::close_tcp_socket(
584584
std::shared_ptr<TCPChannelResource>& channel)
585585
{
586-
channel->disable();
586+
channel->disconnect();
587587
// channel.reset(); lead to race conditions because TransportInterface functions used in the callbacks doesn't check validity.
588588
}
589589

@@ -613,6 +613,20 @@ bool TCPTransportInterface::OpenOutputChannel(
613613
//TODO Review with wan ip.
614614
if (tcp_sender_resource && physical_locator == tcp_sender_resource->channel()->locator())
615615
{
616+
// Look for an existing channel that matches this physical locator
617+
auto existing_channel = channel_resources_.find(physical_locator);
618+
// If the channel exists, check if the channel reference in the sender resource needs to be updated with
619+
// the found channel
620+
if (existing_channel != channel_resources_.end() &&
621+
existing_channel->second != tcp_sender_resource->channel())
622+
{
623+
// Disconnect the old channel
624+
tcp_sender_resource->channel()->disconnect();
625+
tcp_sender_resource->channel()->clear();
626+
// Update sender resource with new channel
627+
tcp_sender_resource->channel() = existing_channel->second;
628+
}
629+
// Add logical port to channel if it's not there yet
616630
if (!tcp_sender_resource->channel()->is_logical_port_added(logical_port))
617631
{
618632
tcp_sender_resource->channel()->add_logical_port(logical_port, rtcp_message_manager_.get());
@@ -910,6 +924,10 @@ bool receive_header(
910924
{
911925
return false;
912926
}
927+
else if (!channel->connection_status())
928+
{
929+
return false;
930+
}
913931
}
914932

915933
bytes_needed = TCPHeader::size() - 4;
@@ -925,6 +943,10 @@ bool receive_header(
925943
{
926944
return false;
927945
}
946+
else if (!channel->connection_status())
947+
{
948+
return false;
949+
}
928950
}
929951

930952
return true;
@@ -960,17 +982,22 @@ bool TCPTransportInterface::Receive(
960982
do
961983
{
962984
header_found = receive_header(channel, tcp_header, ec);
963-
} while (!header_found && !ec);
985+
} while (!header_found && !ec && channel->connection_status());
964986

965987
if (ec)
966988
{
967989
if (ec != asio::error::eof)
968990
{
969-
logWarning(DEBUG, "Error reading TCP header: " << ec.message());
991+
logWarning(DEBUG, "Failed to read TCP header: " << ec.message());
970992
}
971993
close_tcp_socket(channel);
972994
success = false;
973995
}
996+
else if (!channel->connection_status())
997+
{
998+
logWarning(DEBUG, "Failed to read TCP header: channel disconnected while reading.");
999+
success = false;
1000+
}
9741001
else
9751002
{
9761003
size_t body_size = tcp_header.length - static_cast<uint32_t>(TCPHeader::size());
@@ -1350,7 +1377,7 @@ void TCPTransportInterface::SocketConnected(
13501377
}
13511378
else
13521379
{
1353-
channel->disable();
1380+
channel->disconnect();
13541381
}
13551382
}
13561383
}
@@ -1522,7 +1549,7 @@ bool TCPTransportInterface::apply_tls_config()
15221549
{
15231550
ssl_context_.load_verify_file(config->verify_file);
15241551
}
1525-
catch(const std::exception& e)
1552+
catch (const std::exception& e)
15261553
{
15271554
logError(TLS, "Error configuring TLS trusted CA certificate: " << e.what());
15281555
return false; // TODO check wether this should skip the rest of the configuration
@@ -1535,7 +1562,7 @@ bool TCPTransportInterface::apply_tls_config()
15351562
{
15361563
ssl_context_.use_certificate_chain_file(config->cert_chain_file);
15371564
}
1538-
catch(const std::exception& e)
1565+
catch (const std::exception& e)
15391566
{
15401567
logError(TLS, "Error configuring TLS certificate: " << e.what());
15411568
return false; // TODO check wether this should skip the rest of the configuration
@@ -1548,7 +1575,7 @@ bool TCPTransportInterface::apply_tls_config()
15481575
{
15491576
ssl_context_.use_private_key_file(config->private_key_file, ssl::context::pem);
15501577
}
1551-
catch(const std::exception& e)
1578+
catch (const std::exception& e)
15521579
{
15531580
logError(TLS, "Error configuring TLS private key: " << e.what());
15541581
return false; // TODO check wether this should skip the rest of the configuration
@@ -1561,7 +1588,7 @@ bool TCPTransportInterface::apply_tls_config()
15611588
{
15621589
ssl_context_.use_tmp_dh_file(config->tmp_dh_file);
15631590
}
1564-
catch(const std::exception& e)
1591+
catch (const std::exception& e)
15651592
{
15661593
logError(TLS, "Error configuring TLS dh params: " << e.what());
15671594
return false; // TODO check wether this should skip the rest of the configuration

test/blackbox/common/BlackboxTestsTransportTCP.cpp

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,3 +508,64 @@ TEST(BlackBox, TCPv6_copy)
508508
TCPv6TransportDescriptor tcpv6_transport_copy = tcpv6_transport;
509509
EXPECT_EQ(tcpv6_transport_copy, tcpv6_transport);
510510
}
511+
512+
// Test connection is successfully restablished after dropping and relaunching a TCP client (requester)
513+
// Issue -> https://github.com/eProsima/Fast-DDS/issues/2409
514+
TEST(TransportTCP, Client_reconnection)
515+
{
516+
TCPReqRepHelloWorldReplier* replier;
517+
TCPReqRepHelloWorldRequester* requester;
518+
const uint16_t nmsgs = 5;
519+
520+
replier = new TCPReqRepHelloWorldReplier;
521+
replier->init(1, 0, global_port);
522+
523+
ASSERT_TRUE(replier->isInitialized());
524+
525+
requester = new TCPReqRepHelloWorldRequester;
526+
requester->init(0, 0, global_port);
527+
528+
ASSERT_TRUE(requester->isInitialized());
529+
530+
// Wait for discovery.
531+
replier->wait_discovery();
532+
requester->wait_discovery();
533+
534+
ASSERT_TRUE(replier->is_matched());
535+
ASSERT_TRUE(requester->is_matched());
536+
537+
for (uint16_t count = 0; count < nmsgs; ++count)
538+
{
539+
requester->send(count);
540+
requester->block();
541+
}
542+
543+
// Release TCP client resources.
544+
delete requester;
545+
546+
// Wait until unmatched.
547+
replier->wait_unmatched();
548+
ASSERT_FALSE(replier->is_matched());
549+
550+
// Create new TCP client instance.
551+
requester = new TCPReqRepHelloWorldRequester;
552+
requester->init(0, 0, global_port);
553+
554+
ASSERT_TRUE(requester->isInitialized());
555+
556+
// Wait for discovery.
557+
replier->wait_discovery();
558+
requester->wait_discovery();
559+
560+
ASSERT_TRUE(replier->is_matched());
561+
ASSERT_TRUE(requester->is_matched());
562+
563+
for (uint16_t count = 0; count < nmsgs; ++count)
564+
{
565+
requester->send(count);
566+
requester->block();
567+
}
568+
569+
delete replier;
570+
delete requester;
571+
}

test/blackbox/common/TCPReqRepHelloWorldReplier.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,31 @@ void TCPReqRepHelloWorldReplier::wait_discovery(
166166
std::cout << "Replier discovery phase finished" << std::endl;
167167
}
168168

169+
void TCPReqRepHelloWorldReplier::wait_unmatched(
170+
std::chrono::seconds timeout)
171+
{
172+
std::unique_lock<std::mutex> lock(mutexDiscovery_);
173+
174+
std::cout << "Replier waiting until being unmatched..." << std::endl;
175+
176+
if (timeout == std::chrono::seconds::zero())
177+
{
178+
cvDiscovery_.wait(lock, [&]()
179+
{
180+
return !is_matched();
181+
});
182+
}
183+
else
184+
{
185+
cvDiscovery_.wait_for(lock, timeout, [&]()
186+
{
187+
return !is_matched();
188+
});
189+
}
190+
191+
std::cout << "Replier unmatched" << std::endl;
192+
}
193+
169194
void TCPReqRepHelloWorldReplier::matched()
170195
{
171196
std::unique_lock<std::mutex> lock(mutexDiscovery_);
@@ -176,6 +201,16 @@ void TCPReqRepHelloWorldReplier::matched()
176201
}
177202
}
178203

204+
void TCPReqRepHelloWorldReplier::unmatched()
205+
{
206+
std::unique_lock<std::mutex> lock(mutexDiscovery_);
207+
--matched_;
208+
if (!is_matched())
209+
{
210+
cvDiscovery_.notify_one();
211+
}
212+
}
213+
179214
bool TCPReqRepHelloWorldReplier::is_matched()
180215
{
181216
return matched_ > 1;

test/blackbox/common/TCPReqRepHelloWorldReplier.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ class TCPReqRepHelloWorldReplier
6969
{
7070
replier_.matched();
7171
}
72+
else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING)
73+
{
74+
replier_.unmatched();
75+
}
7276
}
7377

7478
private:
@@ -132,7 +136,10 @@ class TCPReqRepHelloWorldReplier
132136
uint16_t number);
133137
void wait_discovery(
134138
std::chrono::seconds timeout = std::chrono::seconds::zero());
139+
void wait_unmatched(
140+
std::chrono::seconds timeout = std::chrono::seconds::zero());
135141
void matched();
142+
void unmatched();
136143
bool is_matched();
137144

138145
virtual void configSubscriber(

test/blackbox/common/TCPReqRepHelloWorldRequester.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,12 @@ void TCPReqRepHelloWorldRequester::matched()
202202
}
203203
}
204204

205+
void TCPReqRepHelloWorldRequester::unmatched()
206+
{
207+
std::unique_lock<std::mutex> lock(mutexDiscovery_);
208+
--matched_;
209+
}
210+
205211
bool TCPReqRepHelloWorldRequester::is_matched()
206212
{
207213
return matched_ > 1;

test/blackbox/common/TCPReqRepHelloWorldRequester.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ class TCPReqRepHelloWorldRequester
7070
{
7171
requester_.matched();
7272
}
73+
else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING)
74+
{
75+
requester_.unmatched();
76+
}
7377
}
7478

7579
private:
@@ -136,6 +140,7 @@ class TCPReqRepHelloWorldRequester
136140
void wait_discovery(
137141
std::chrono::seconds timeout = std::chrono::seconds::zero());
138142
void matched();
143+
void unmatched();
139144
void send(
140145
const uint16_t number);
141146
bool is_matched();

test/dds/communication/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ list(APPEND TEST_DEFINITIONS
110110
zero_copy_pub_communication
111111
zero_copy_sub_communication
112112
mix_zero_copy_communication
113+
close_TCP_client
113114
)
114115

115116

@@ -118,6 +119,8 @@ list(APPEND XML_CONFIGURATION_FILES
118119
simple_besteffort.xml
119120
simple_reliable_zerocopy.xml
120121
simple_besteffort_zerocopy.xml
122+
TCP_server.xml
123+
TCP_client.xml
121124
)
122125
# Python file
123126
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test_build.py

test/dds/communication/PublisherModule.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,8 @@ bool PublisherModule::init(
6060
{
6161
std::cout << "Initializing Publisher" << std::endl;
6262

63-
DomainParticipantQos participant_qos;
6463
participant_ =
65-
DomainParticipantFactory::get_instance()->create_participant(seed % 230, participant_qos, this);
64+
DomainParticipantFactory::get_instance()->create_participant(seed % 230, PARTICIPANT_QOS_DEFAULT, this);
6665

6766
if (participant_ == nullptr)
6867
{

test/dds/communication/SubscriberModule.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ bool SubscriberModule::init(
6666
<< StatusMask::data_available()
6767
<< StatusMask::liveliness_changed();
6868

69-
DomainParticipantQos participant_qos;
7069
participant_ =
71-
DomainParticipantFactory::get_instance()->create_participant(seed % 230, participant_qos, this, mask);
70+
DomainParticipantFactory::get_instance()->create_participant(seed % 230, PARTICIPANT_QOS_DEFAULT, this,
71+
mask);
7272

7373
if (participant_ == nullptr)
7474
{

test/dds/communication/TCP_client.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<dds xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
3+
<profiles>
4+
<transport_descriptors>
5+
<transport_descriptor>
6+
<transport_id>tcp_transport_client</transport_id>
7+
<type>TCPv4</type>
8+
</transport_descriptor>
9+
</transport_descriptors>
10+
11+
<participant profile_name="TCPClient" is_default_profile="true">
12+
<rtps>
13+
<userTransports>
14+
<transport_id>tcp_transport_client</transport_id>
15+
</userTransports>
16+
<useBuiltinTransports>false</useBuiltinTransports>
17+
<builtin>
18+
<initialPeersList>
19+
<locator>
20+
<tcpv4>
21+
<address>127.0.0.1</address>
22+
<physical_port>5100</physical_port>
23+
</tcpv4>
24+
</locator>
25+
</initialPeersList>
26+
</builtin>
27+
</rtps>
28+
</participant>
29+
</profiles>
30+
</dds>

0 commit comments

Comments
 (0)