Skip to content

Commit 4a6b934

Browse files
Fix topic interference on liveliness_changed status (#4988) (#5032)
* Fix topic interference on `liveliness_changed` status (#4988) * Refs #21189. Basic infrastructure for ROS2 blackbox tests. Signed-off-by: Miguel Company <[email protected]> * Refs #21189. Added callback for liveliness_changed event. Signed-off-by: Miguel Company <[email protected]> * Refs #21189. Added ROS2 regression test. Signed-off-by: Miguel Company <[email protected]> * Refs #21189. Added blackbox regression test. Signed-off-by: Miguel Company <[email protected]> * Refs #21189. Fix StatefulReader. Signed-off-by: Miguel Company <[email protected]> * Refs #21189. Fix StatelessReader. Signed-off-by: Miguel Company <[email protected]> * Refs #21189. Uncrustify and doxygen. Signed-off-by: Miguel Company <[email protected]> * Refs #21245. Change liveliness announcement periods. Signed-off-by: Miguel Company <[email protected]> * Refs #21245. Create ROS 2 builtin endpoints. Signed-off-by: Miguel Company <[email protected]> * Refs #21245. Fix type and QoS on ros_discovery_info. Signed-off-by: Miguel Company <[email protected]> * Refs #21245. Avoid collision with true ROS 2 topics. Signed-off-by: Miguel Company <[email protected]> * Refs #21245. Uncrustify. Signed-off-by: Miguel Company <[email protected]> * Refs #21245. Fix warnings on MacOS. Signed-off-by: Miguel Company <[email protected]> * Refs #21245. Fix tsan reported deadlock. Signed-off-by: Miguel Company <[email protected]> * Refs #21189. Fix long-standing deadlock in WLP. Signed-off-by: Miguel Company <[email protected]> * Refs #21189. Fix build after rebase. Signed-off-by: Miguel Company <[email protected]> --------- Signed-off-by: Miguel Company <[email protected]> (cherry picked from commit 9243ead) # Conflicts: # src/cpp/rtps/reader/StatefulReader.cpp # src/cpp/rtps/reader/StatelessReader.cpp # test/blackbox/common/BlackboxTestsLivelinessQos.cpp * Fix conflicts Signed-off-by: Miguel Company <[email protected]> * Fix build after backport. Signed-off-by: Miguel Company <[email protected]> --------- Signed-off-by: Miguel Company <[email protected]> Co-authored-by: Miguel Company <[email protected]>
1 parent a78316b commit 4a6b934

13 files changed

+1343
-61
lines changed

src/cpp/rtps/builtin/liveliness/WLP.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -860,10 +860,11 @@ bool WLP::remove_local_reader(
860860

861861
bool WLP::automatic_liveliness_assertion()
862862
{
863-
std::lock_guard<std::recursive_mutex> guard(*mp_builtinProtocols->mp_PDP->getMutex());
863+
std::unique_lock<std::recursive_mutex> lock(*mp_builtinProtocols->mp_PDP->getMutex());
864864

865865
if (0 < automatic_writers_.size())
866866
{
867+
lock.unlock();
867868
return send_liveliness_message(automatic_instance_handle_);
868869
}
869870

src/cpp/rtps/reader/StatefulReader.cpp

+32-32
Original file line numberDiff line numberDiff line change
@@ -373,40 +373,11 @@ bool StatefulReader::matched_writer_remove(
373373
const GUID_t& writer_guid,
374374
bool removed_by_lease)
375375
{
376-
377-
if (is_alive_ && liveliness_lease_duration_ < c_TimeInfinite)
378-
{
379-
auto wlp = this->mp_RTPSParticipant->wlp();
380-
if ( wlp != nullptr)
381-
{
382-
LivelinessData::WriterStatus writer_liveliness_status;
383-
wlp->sub_liveliness_manager_->remove_writer(
384-
writer_guid,
385-
liveliness_kind_,
386-
liveliness_lease_duration_,
387-
writer_liveliness_status);
388-
389-
if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
390-
{
391-
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
392-
}
393-
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
394-
{
395-
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
396-
}
397-
398-
}
399-
else
400-
{
401-
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
402-
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
403-
}
404-
}
405-
406-
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
407376
WriterProxy* wproxy = nullptr;
408377
if (is_alive_)
409378
{
379+
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
380+
410381
//Remove cachechanges belonging to the unmatched writer
411382
mp_history->writer_unmatched(writer_guid, get_last_notified(writer_guid));
412383

@@ -468,7 +439,36 @@ bool StatefulReader::matched_writer_remove(
468439
}
469440
}
470441

471-
return (wproxy != nullptr);
442+
bool ret_val = (wproxy != nullptr);
443+
if (ret_val && liveliness_lease_duration_ < c_TimeInfinite)
444+
{
445+
auto wlp = this->mp_RTPSParticipant->wlp();
446+
if ( wlp != nullptr)
447+
{
448+
LivelinessData::WriterStatus writer_liveliness_status;
449+
wlp->sub_liveliness_manager_->remove_writer(
450+
writer_guid,
451+
liveliness_kind_,
452+
liveliness_lease_duration_,
453+
writer_liveliness_status);
454+
455+
if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
456+
{
457+
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
458+
}
459+
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
460+
{
461+
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
462+
}
463+
}
464+
else
465+
{
466+
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
467+
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
468+
}
469+
}
470+
471+
return ret_val;
472472
}
473473

474474
bool StatefulReader::matched_writer_is_matched(

src/cpp/rtps/reader/StatelessReader.cpp

+33-28
Original file line numberDiff line numberDiff line change
@@ -223,33 +223,8 @@ bool StatelessReader::matched_writer_remove(
223223
const GUID_t& writer_guid,
224224
bool removed_by_lease)
225225
{
226-
if (liveliness_lease_duration_ < c_TimeInfinite)
227-
{
228-
auto wlp = mp_RTPSParticipant->wlp();
229-
if ( wlp != nullptr)
230-
{
231-
LivelinessData::WriterStatus writer_liveliness_status;
232-
wlp->sub_liveliness_manager_->remove_writer(
233-
writer_guid,
234-
liveliness_kind_,
235-
liveliness_lease_duration_,
236-
writer_liveliness_status);
226+
bool ret_val = false;
237227

238-
if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
239-
{
240-
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
241-
}
242-
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
243-
{
244-
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
245-
}
246-
}
247-
else
248-
{
249-
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
250-
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
251-
}
252-
}
253228
{
254229
std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);
255230

@@ -289,11 +264,41 @@ bool StatelessReader::matched_writer_remove(
289264
}
290265
#endif //FASTDDS_STATISTICS
291266

292-
return true;
267+
ret_val = true;
268+
break;
293269
}
294270
}
295271
}
296-
return false;
272+
273+
if (ret_val && liveliness_lease_duration_ < c_TimeInfinite)
274+
{
275+
auto wlp = mp_RTPSParticipant->wlp();
276+
if ( wlp != nullptr)
277+
{
278+
LivelinessData::WriterStatus writer_liveliness_status;
279+
wlp->sub_liveliness_manager_->remove_writer(
280+
writer_guid,
281+
liveliness_kind_,
282+
liveliness_lease_duration_,
283+
writer_liveliness_status);
284+
285+
if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
286+
{
287+
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
288+
}
289+
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
290+
{
291+
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
292+
}
293+
}
294+
else
295+
{
296+
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
297+
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
298+
}
299+
}
300+
301+
return ret_val;
297302
}
298303

299304
bool StatelessReader::matched_writer_is_matched(

test/blackbox/common/BlackboxTestsLivelinessQos.cpp

+170
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include "BlackboxTests.hpp"
1616

17+
#include <string>
1718
#include <thread>
1819

1920
#include "PubSubReader.hpp"
@@ -2039,6 +2040,175 @@ TEST(LivelinessTests, correct_liveliness_state_one_writer_multiple_readers)
20392040
ASSERT_EQ(reader.sub_wait_liveliness_lost_for(2, std::chrono::seconds(4)), 2u);
20402041
}
20412042

2043+
/**
2044+
* This is a regression test for redmine issue #21189.
2045+
*
2046+
* The test ensures that liveliness changed status is not affected by writers on a topic different from
2047+
* the one of the reader.
2048+
*
2049+
* The test creates two readers and two writers, each reader and writer pair on a different topic.
2050+
* Writing a sample on one writer should not affect the liveliness changed status of the other reader.
2051+
* Destroying the writer should not affect the liveliness changed status of the other reader.
2052+
*/
2053+
static void test_liveliness_qos_independent_topics(
2054+
const std::string& topic_name,
2055+
eprosima::fastdds::dds::ReliabilityQosPolicyKind reliability_kind)
2056+
{
2057+
const auto lease_dutation_time = std::chrono::seconds(1);
2058+
const eprosima::fastrtps::Duration_t lease_duration(1, 0);
2059+
const eprosima::fastrtps::Duration_t announcement_period(0, 250000000);
2060+
2061+
PubSubReader<HelloWorldPubSubType> reader1(topic_name + "1");
2062+
PubSubReader<HelloWorldPubSubType> reader2(topic_name + "2");
2063+
2064+
PubSubWriter<HelloWorldPubSubType> writer1(topic_name + "1");
2065+
PubSubWriter<HelloWorldPubSubType> writer2(topic_name + "2");
2066+
2067+
// Configure and start the readers
2068+
reader1.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
2069+
.liveliness_lease_duration(lease_duration)
2070+
.reliability(reliability_kind)
2071+
.init();
2072+
reader2.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
2073+
.liveliness_lease_duration(lease_duration)
2074+
.reliability(reliability_kind)
2075+
.init();
2076+
2077+
// Configure and start the writers
2078+
writer1.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
2079+
.liveliness_lease_duration(lease_duration)
2080+
.liveliness_announcement_period(announcement_period)
2081+
.reliability(reliability_kind)
2082+
.init();
2083+
writer2.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
2084+
.liveliness_lease_duration(lease_duration)
2085+
.liveliness_announcement_period(announcement_period)
2086+
.reliability(reliability_kind)
2087+
.init();
2088+
2089+
// Wait for discovery
2090+
reader1.wait_discovery();
2091+
reader2.wait_discovery();
2092+
writer1.wait_discovery();
2093+
writer2.wait_discovery();
2094+
2095+
HelloWorldPubSubType::type data;
2096+
2097+
// Write a sample on writer1 and wait for reader1 to assert writer1's liveliness
2098+
writer1.send_sample(data);
2099+
reader1.wait_liveliness_recovered();
2100+
2101+
// Check liveliness changed status on both readers
2102+
{
2103+
auto liveliness = reader1.liveliness_changed_status();
2104+
EXPECT_EQ(liveliness.alive_count, 1);
2105+
EXPECT_EQ(liveliness.not_alive_count, 0);
2106+
}
2107+
2108+
{
2109+
auto liveliness = reader2.liveliness_changed_status();
2110+
EXPECT_EQ(liveliness.alive_count, 0);
2111+
EXPECT_EQ(liveliness.not_alive_count, 0);
2112+
}
2113+
2114+
// Write a sample on writer2 and wait for reader2 to assert writer2's liveliness
2115+
writer2.send_sample(data);
2116+
reader2.wait_liveliness_recovered();
2117+
2118+
// Check liveliness changed status on both readers
2119+
{
2120+
auto liveliness = reader1.liveliness_changed_status();
2121+
EXPECT_EQ(liveliness.alive_count, 1);
2122+
EXPECT_EQ(liveliness.not_alive_count, 0);
2123+
}
2124+
2125+
{
2126+
auto liveliness = reader2.liveliness_changed_status();
2127+
EXPECT_EQ(liveliness.alive_count, 1);
2128+
EXPECT_EQ(liveliness.not_alive_count, 0);
2129+
}
2130+
2131+
// Destroy writer2 and wait twice the lease duration time
2132+
writer2.destroy();
2133+
std::this_thread::sleep_for(lease_dutation_time * 2);
2134+
2135+
// Check liveliness changed status on both readers
2136+
{
2137+
auto liveliness = reader1.liveliness_changed_status();
2138+
EXPECT_EQ(liveliness.alive_count, 1);
2139+
EXPECT_EQ(liveliness.not_alive_count, 0);
2140+
}
2141+
2142+
{
2143+
auto liveliness = reader2.liveliness_changed_status();
2144+
EXPECT_EQ(liveliness.alive_count, 0);
2145+
EXPECT_EQ(liveliness.not_alive_count, 0);
2146+
}
2147+
2148+
// Start writer2 again and wait for reader2 to assert writer2's liveliness
2149+
writer2.init();
2150+
reader2.wait_discovery();
2151+
writer2.send_sample(data);
2152+
reader2.wait_liveliness_recovered(2);
2153+
2154+
// Check liveliness changed status on both readers
2155+
{
2156+
auto liveliness = reader1.liveliness_changed_status();
2157+
EXPECT_EQ(liveliness.alive_count, 1);
2158+
EXPECT_EQ(liveliness.not_alive_count, 0);
2159+
}
2160+
2161+
{
2162+
auto liveliness = reader2.liveliness_changed_status();
2163+
EXPECT_EQ(liveliness.alive_count, 1);
2164+
EXPECT_EQ(liveliness.not_alive_count, 0);
2165+
}
2166+
2167+
// Destroy writer1 and wait twice the lease duration time
2168+
writer1.destroy();
2169+
std::this_thread::sleep_for(lease_dutation_time * 2);
2170+
2171+
// Check liveliness changed status on both readers
2172+
{
2173+
auto liveliness = reader1.liveliness_changed_status();
2174+
EXPECT_EQ(liveliness.alive_count, 0);
2175+
EXPECT_EQ(liveliness.not_alive_count, 0);
2176+
}
2177+
2178+
{
2179+
auto liveliness = reader2.liveliness_changed_status();
2180+
EXPECT_EQ(liveliness.alive_count, 1);
2181+
EXPECT_EQ(liveliness.not_alive_count, 0);
2182+
}
2183+
2184+
// Destroy writer2 and wait twice the lease duration time
2185+
writer2.destroy();
2186+
std::this_thread::sleep_for(lease_dutation_time * 2);
2187+
2188+
// Check liveliness changed status on both readers
2189+
{
2190+
auto liveliness = reader1.liveliness_changed_status();
2191+
EXPECT_EQ(liveliness.alive_count, 0);
2192+
EXPECT_EQ(liveliness.not_alive_count, 0);
2193+
}
2194+
2195+
{
2196+
auto liveliness = reader2.liveliness_changed_status();
2197+
EXPECT_EQ(liveliness.alive_count, 0);
2198+
EXPECT_EQ(liveliness.not_alive_count, 0);
2199+
}
2200+
}
2201+
2202+
TEST_P(LivelinessQos, IndependentTopics_reliable)
2203+
{
2204+
test_liveliness_qos_independent_topics(TEST_TOPIC_NAME, eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS);
2205+
}
2206+
2207+
TEST_P(LivelinessQos, IndependentTopics_besteffort)
2208+
{
2209+
test_liveliness_qos_independent_topics(TEST_TOPIC_NAME, eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS);
2210+
}
2211+
20422212
#ifdef INSTANTIATE_TEST_SUITE_P
20432213
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
20442214
#else

0 commit comments

Comments
 (0)