Skip to content

Commit 5fcfc55

Browse files
juanlofer-eprosimamergify-bot
authored and
mergify-bot
committed
Fix TCP infinite loop, client shutdown and reconnection (#2470)
* Fix infinite loop in TCP header read Signed-off-by: Juan López Fernández <[email protected]> * Add test for channel disconnection during TCP segment reception Signed-off-by: Juan López Fernández <[email protected]> * Apply suggestions Signed-off-by: Juan López Fernández <[email protected]> * Add issue reference Signed-off-by: Juan López Fernández <[email protected]> * Refs 13540: Unlock TCP client on shutdown Signed-off-by: Eduardo Ponz <[email protected]> * Refs 13540: Update TCP sender resource channel on openning output channel when reusing sender resource Signed-off-by: Eduardo Ponz <[email protected]> * Add TCP client stuck test Signed-off-by: Juan López Fernández <[email protected]> * Add TCP client reconnection test Signed-off-by: Juan López Fernández <[email protected]> * Link tests with issues Signed-off-by: Juan López Fernández <[email protected]> * Uncrustify Signed-off-by: Juan López Fernández <[email protected]> * Rename TCPChannelResource mock class Signed-off-by: Juan López Fernández <[email protected]> Co-authored-by: Eduardo Ponz <[email protected]> (cherry picked from commit 6639a84) # Conflicts: # test/blackbox/common/BlackboxTestsTransportTCP.cpp
1 parent 3ed3d6a commit 5fcfc55

19 files changed

+477
-20
lines changed

examples/C++/DDS/HelloWorldExampleTCP/HelloWorldSubscriber.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ void HelloWorldSubscriber::run(
196196
uint32_t number)
197197
{
198198
std::cout << "[RTCP] Subscriber running until " << number << "samples have been received" << std::endl;
199-
while (number < this->listener_.samples_)
199+
while (number > this->listener_.samples_)
200200
{
201201
std::this_thread::sleep_for(std::chrono::milliseconds(500));
202202
}

src/cpp/rtps/transport/TCPChannelResourceBasic.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,13 @@ void TCPChannelResourceBasic::disconnect()
107107
{
108108
auto socket = socket_;
109109

110+
std::error_code ec;
111+
socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
112+
110113
service_.post([&, socket]()
111114
{
112115
try
113116
{
114-
std::error_code ec;
115-
socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
116117
socket->cancel();
117118

118119
// This method was added on the version 1.12.0
@@ -125,7 +126,6 @@ void TCPChannelResourceBasic::disconnect()
125126
{
126127
}
127128
});
128-
129129
}
130130
}
131131

src/cpp/rtps/transport/TCPTransportInterface.cpp

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ bool TCPTransportInterface::CloseInputChannel(
583583
void TCPTransportInterface::close_tcp_socket(
584584
std::shared_ptr<TCPChannelResource>& channel)
585585
{
586-
channel->disable();
586+
channel->disconnect();
587587
// channel.reset(); lead to race conditions because TransportInterface functions used in the callbacks doesn't check validity.
588588
}
589589

@@ -613,6 +613,20 @@ bool TCPTransportInterface::OpenOutputChannel(
613613
//TODO Review with wan ip.
614614
if (tcp_sender_resource && physical_locator == tcp_sender_resource->channel()->locator())
615615
{
616+
// Look for an existing channel that matches this physical locator
617+
auto existing_channel = channel_resources_.find(physical_locator);
618+
// If the channel exists, check if the channel reference in the sender resource needs to be updated with
619+
// the found channel
620+
if (existing_channel != channel_resources_.end() &&
621+
existing_channel->second != tcp_sender_resource->channel())
622+
{
623+
// Disconnect the old channel
624+
tcp_sender_resource->channel()->disconnect();
625+
tcp_sender_resource->channel()->clear();
626+
// Update sender resource with new channel
627+
tcp_sender_resource->channel() = existing_channel->second;
628+
}
629+
// Add logical port to channel if it's not there yet
616630
if (!tcp_sender_resource->channel()->is_logical_port_added(logical_port))
617631
{
618632
tcp_sender_resource->channel()->add_logical_port(logical_port, rtcp_message_manager_.get());
@@ -910,6 +924,10 @@ bool receive_header(
910924
{
911925
return false;
912926
}
927+
else if (!channel->connection_status())
928+
{
929+
return false;
930+
}
913931
}
914932

915933
bytes_needed = TCPHeader::size() - 4;
@@ -925,6 +943,10 @@ bool receive_header(
925943
{
926944
return false;
927945
}
946+
else if (!channel->connection_status())
947+
{
948+
return false;
949+
}
928950
}
929951

930952
return true;
@@ -960,17 +982,22 @@ bool TCPTransportInterface::Receive(
960982
do
961983
{
962984
header_found = receive_header(channel, tcp_header, ec);
963-
} while (!header_found && !ec);
985+
} while (!header_found && !ec && channel->connection_status());
964986

965987
if (ec)
966988
{
967989
if (ec != asio::error::eof)
968990
{
969-
logWarning(DEBUG, "Error reading TCP header: " << ec.message());
991+
logWarning(DEBUG, "Failed to read TCP header: " << ec.message());
970992
}
971993
close_tcp_socket(channel);
972994
success = false;
973995
}
996+
else if (!channel->connection_status())
997+
{
998+
logWarning(DEBUG, "Failed to read TCP header: channel disconnected while reading.");
999+
success = false;
1000+
}
9741001
else
9751002
{
9761003
size_t body_size = tcp_header.length - static_cast<uint32_t>(TCPHeader::size());
@@ -1350,7 +1377,7 @@ void TCPTransportInterface::SocketConnected(
13501377
}
13511378
else
13521379
{
1353-
channel->disable();
1380+
channel->disconnect();
13541381
}
13551382
}
13561383
}

test/blackbox/common/BlackboxTestsTransportTCP.cpp

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,3 +508,89 @@ TEST(BlackBox, TCPv6_copy)
508508
TCPv6TransportDescriptor tcpv6_transport_copy = tcpv6_transport;
509509
EXPECT_EQ(tcpv6_transport_copy, tcpv6_transport);
510510
}
511+
<<<<<<< HEAD
512+
=======
513+
514+
// Test connection is successfully restablished after dropping and relaunching a TCP client (requester)
515+
// Issue -> https://github.com/eProsima/Fast-DDS/issues/2409
516+
TEST(TransportTCP, Client_reconnection)
517+
{
518+
TCPReqRepHelloWorldReplier* replier;
519+
TCPReqRepHelloWorldRequester* requester;
520+
const uint16_t nmsgs = 5;
521+
522+
replier = new TCPReqRepHelloWorldReplier;
523+
replier->init(1, 0, global_port);
524+
525+
ASSERT_TRUE(replier->isInitialized());
526+
527+
requester = new TCPReqRepHelloWorldRequester;
528+
requester->init(0, 0, global_port);
529+
530+
ASSERT_TRUE(requester->isInitialized());
531+
532+
// Wait for discovery.
533+
replier->wait_discovery();
534+
requester->wait_discovery();
535+
536+
ASSERT_TRUE(replier->is_matched());
537+
ASSERT_TRUE(requester->is_matched());
538+
539+
for (uint16_t count = 0; count < nmsgs; ++count)
540+
{
541+
requester->send(count);
542+
requester->block();
543+
}
544+
545+
// Release TCP client resources.
546+
delete requester;
547+
548+
// Wait until unmatched.
549+
replier->wait_unmatched();
550+
ASSERT_FALSE(replier->is_matched());
551+
552+
// Create new TCP client instance.
553+
requester = new TCPReqRepHelloWorldRequester;
554+
requester->init(0, 0, global_port);
555+
556+
ASSERT_TRUE(requester->isInitialized());
557+
558+
// Wait for discovery.
559+
replier->wait_discovery();
560+
requester->wait_discovery();
561+
562+
ASSERT_TRUE(replier->is_matched());
563+
ASSERT_TRUE(requester->is_matched());
564+
565+
for (uint16_t count = 0; count < nmsgs; ++count)
566+
{
567+
requester->send(count);
568+
requester->block();
569+
}
570+
571+
delete replier;
572+
delete requester;
573+
}
574+
575+
#ifdef INSTANTIATE_TEST_SUITE_P
576+
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
577+
#else
578+
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_CASE_P(x, y, z, w)
579+
#endif // ifdef INSTANTIATE_TEST_SUITE_P
580+
581+
GTEST_INSTANTIATE_TEST_MACRO(TransportTCP,
582+
TransportTCP,
583+
testing::Combine(testing::Values(TRANSPORT), testing::Values(false, true)),
584+
[](const testing::TestParamInfo<TransportTCP::ParamType>& info)
585+
{
586+
bool ipv6 = std::get<1>(info.param);
587+
std::string suffix = ipv6 ? "TCPv6" : "TCPv4";
588+
switch (std::get<0>(info.param))
589+
{
590+
case TRANSPORT:
591+
default:
592+
return "Transport" + suffix;
593+
}
594+
595+
});
596+
>>>>>>> 6639a84b7 (Fix TCP infinite loop, client shutdown and reconnection (#2470))

test/blackbox/common/TCPReqRepHelloWorldReplier.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,31 @@ void TCPReqRepHelloWorldReplier::wait_discovery(
166166
std::cout << "Replier discovery phase finished" << std::endl;
167167
}
168168

169+
void TCPReqRepHelloWorldReplier::wait_unmatched(
170+
std::chrono::seconds timeout)
171+
{
172+
std::unique_lock<std::mutex> lock(mutexDiscovery_);
173+
174+
std::cout << "Replier waiting until being unmatched..." << std::endl;
175+
176+
if (timeout == std::chrono::seconds::zero())
177+
{
178+
cvDiscovery_.wait(lock, [&]()
179+
{
180+
return !is_matched();
181+
});
182+
}
183+
else
184+
{
185+
cvDiscovery_.wait_for(lock, timeout, [&]()
186+
{
187+
return !is_matched();
188+
});
189+
}
190+
191+
std::cout << "Replier unmatched" << std::endl;
192+
}
193+
169194
void TCPReqRepHelloWorldReplier::matched()
170195
{
171196
std::unique_lock<std::mutex> lock(mutexDiscovery_);
@@ -176,6 +201,16 @@ void TCPReqRepHelloWorldReplier::matched()
176201
}
177202
}
178203

204+
void TCPReqRepHelloWorldReplier::unmatched()
205+
{
206+
std::unique_lock<std::mutex> lock(mutexDiscovery_);
207+
--matched_;
208+
if (!is_matched())
209+
{
210+
cvDiscovery_.notify_one();
211+
}
212+
}
213+
179214
bool TCPReqRepHelloWorldReplier::is_matched()
180215
{
181216
return matched_ > 1;

test/blackbox/common/TCPReqRepHelloWorldReplier.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ class TCPReqRepHelloWorldReplier
6969
{
7070
replier_.matched();
7171
}
72+
else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING)
73+
{
74+
replier_.unmatched();
75+
}
7276
}
7377

7478
private:
@@ -132,7 +136,10 @@ class TCPReqRepHelloWorldReplier
132136
uint16_t number);
133137
void wait_discovery(
134138
std::chrono::seconds timeout = std::chrono::seconds::zero());
139+
void wait_unmatched(
140+
std::chrono::seconds timeout = std::chrono::seconds::zero());
135141
void matched();
142+
void unmatched();
136143
bool is_matched();
137144

138145
virtual void configSubscriber(

test/blackbox/common/TCPReqRepHelloWorldRequester.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,12 @@ void TCPReqRepHelloWorldRequester::matched()
202202
}
203203
}
204204

205+
void TCPReqRepHelloWorldRequester::unmatched()
206+
{
207+
std::unique_lock<std::mutex> lock(mutexDiscovery_);
208+
--matched_;
209+
}
210+
205211
bool TCPReqRepHelloWorldRequester::is_matched()
206212
{
207213
return matched_ > 1;

test/blackbox/common/TCPReqRepHelloWorldRequester.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ class TCPReqRepHelloWorldRequester
7070
{
7171
requester_.matched();
7272
}
73+
else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING)
74+
{
75+
requester_.unmatched();
76+
}
7377
}
7478

7579
private:
@@ -136,6 +140,7 @@ class TCPReqRepHelloWorldRequester
136140
void wait_discovery(
137141
std::chrono::seconds timeout = std::chrono::seconds::zero());
138142
void matched();
143+
void unmatched();
139144
void send(
140145
const uint16_t number);
141146
bool is_matched();

test/dds/communication/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ list(APPEND TEST_DEFINITIONS
110110
zero_copy_pub_communication
111111
zero_copy_sub_communication
112112
mix_zero_copy_communication
113+
close_TCP_client
113114
)
114115

115116

@@ -118,6 +119,8 @@ list(APPEND XML_CONFIGURATION_FILES
118119
simple_besteffort.xml
119120
simple_reliable_zerocopy.xml
120121
simple_besteffort_zerocopy.xml
122+
TCP_server.xml
123+
TCP_client.xml
121124
)
122125
# Python file
123126
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test_build.py

test/dds/communication/PublisherModule.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,8 @@ bool PublisherModule::init(
6060
{
6161
std::cout << "Initializing Publisher" << std::endl;
6262

63-
DomainParticipantQos participant_qos;
6463
participant_ =
65-
DomainParticipantFactory::get_instance()->create_participant(seed % 230, participant_qos, this);
64+
DomainParticipantFactory::get_instance()->create_participant(seed % 230, PARTICIPANT_QOS_DEFAULT, this);
6665

6766
if (participant_ == nullptr)
6867
{

test/dds/communication/SubscriberModule.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ bool SubscriberModule::init(
6666
<< StatusMask::data_available()
6767
<< StatusMask::liveliness_changed();
6868

69-
DomainParticipantQos participant_qos;
7069
participant_ =
71-
DomainParticipantFactory::get_instance()->create_participant(seed % 230, participant_qos, this, mask);
70+
DomainParticipantFactory::get_instance()->create_participant(seed % 230, PARTICIPANT_QOS_DEFAULT, this,
71+
mask);
7272

7373
if (participant_ == nullptr)
7474
{

test/dds/communication/TCP_client.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<dds xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
3+
<profiles>
4+
<transport_descriptors>
5+
<transport_descriptor>
6+
<transport_id>tcp_transport_client</transport_id>
7+
<type>TCPv4</type>
8+
</transport_descriptor>
9+
</transport_descriptors>
10+
11+
<participant profile_name="TCPClient" is_default_profile="true">
12+
<rtps>
13+
<userTransports>
14+
<transport_id>tcp_transport_client</transport_id>
15+
</userTransports>
16+
<useBuiltinTransports>false</useBuiltinTransports>
17+
<builtin>
18+
<initialPeersList>
19+
<locator>
20+
<tcpv4>
21+
<address>127.0.0.1</address>
22+
<physical_port>5100</physical_port>
23+
</tcpv4>
24+
</locator>
25+
</initialPeersList>
26+
</builtin>
27+
</rtps>
28+
</participant>
29+
</profiles>
30+
</dds>

0 commit comments

Comments
 (0)