Skip to content

Commit 63df47a

Browse files
Fix tsan potential deadlock between StatefulWriter and FlowController (#5432)
* Refs #22339: Add BB test Signed-off-by: Mario Dominguez <[email protected]> * Refs #22339: Fix tsan deadlock report Signed-off-by: Mario Dominguez <[email protected]> * Refs #22339: Take writer's mutex before rproxy->stop() and check_acked_status() Signed-off-by: Mario Dominguez <[email protected]> * Refs #22339: Apply Miguels suggestion Signed-off-by: Mario Dominguez <[email protected]> --------- Signed-off-by: Mario Dominguez <[email protected]> (cherry picked from commit 8fcd7ca) Fix conflicts Signed-off-by: Eugenio Collado <[email protected]>
1 parent f8e3baf commit 63df47a

File tree

2 files changed

+73
-36
lines changed

2 files changed

+73
-36
lines changed

src/cpp/rtps/writer/StatefulWriter.cpp

+36-36
Original file line numberDiff line numberDiff line change
@@ -1266,55 +1266,58 @@ bool StatefulWriter::matched_reader_remove(
12661266
{
12671267
ReaderProxy* rproxy = nullptr;
12681268
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
1269-
std::unique_lock<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
1270-
std::unique_lock<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);
12711269

1272-
for (ReaderProxyIterator it = matched_local_readers_.begin();
1273-
it != matched_local_readers_.end(); ++it)
12741270
{
1275-
if ((*it)->guid() == reader_guid)
1276-
{
1277-
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1278-
rproxy = std::move(*it);
1279-
it = matched_local_readers_.erase(it);
1280-
break;
1281-
}
1282-
}
1271+
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
1272+
std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);
12831273

1284-
if (rproxy == nullptr)
1285-
{
1286-
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
1287-
it != matched_datasharing_readers_.end(); ++it)
1274+
for (ReaderProxyIterator it = matched_local_readers_.begin();
1275+
it != matched_local_readers_.end(); ++it)
12881276
{
12891277
if ((*it)->guid() == reader_guid)
12901278
{
12911279
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
12921280
rproxy = std::move(*it);
1293-
it = matched_datasharing_readers_.erase(it);
1281+
it = matched_local_readers_.erase(it);
12941282
break;
12951283
}
12961284
}
1297-
}
12981285

1299-
if (rproxy == nullptr)
1300-
{
1301-
for (ReaderProxyIterator it = matched_remote_readers_.begin();
1302-
it != matched_remote_readers_.end(); ++it)
1286+
if (rproxy == nullptr)
13031287
{
1304-
if ((*it)->guid() == reader_guid)
1288+
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
1289+
it != matched_datasharing_readers_.end(); ++it)
13051290
{
1306-
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1307-
rproxy = std::move(*it);
1308-
it = matched_remote_readers_.erase(it);
1309-
break;
1291+
if ((*it)->guid() == reader_guid)
1292+
{
1293+
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1294+
rproxy = std::move(*it);
1295+
it = matched_datasharing_readers_.erase(it);
1296+
break;
1297+
}
13101298
}
13111299
}
1312-
}
13131300

1314-
locator_selector_general_.locator_selector.remove_entry(reader_guid);
1315-
locator_selector_async_.locator_selector.remove_entry(reader_guid);
1316-
update_reader_info(locator_selector_general_, false);
1317-
update_reader_info(locator_selector_async_, false);
1301+
if (rproxy == nullptr)
1302+
{
1303+
for (ReaderProxyIterator it = matched_remote_readers_.begin();
1304+
it != matched_remote_readers_.end(); ++it)
1305+
{
1306+
if ((*it)->guid() == reader_guid)
1307+
{
1308+
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1309+
rproxy = std::move(*it);
1310+
it = matched_remote_readers_.erase(it);
1311+
break;
1312+
}
1313+
}
1314+
}
1315+
1316+
locator_selector_general_.locator_selector.remove_entry(reader_guid);
1317+
locator_selector_async_.locator_selector.remove_entry(reader_guid);
1318+
update_reader_info(locator_selector_general_, false);
1319+
update_reader_info(locator_selector_async_, false);
1320+
}
13181321

13191322
if (getMatchedReadersSize() == 0)
13201323
{
@@ -1330,11 +1333,8 @@ bool StatefulWriter::matched_reader_remove(
13301333

13311334
if (nullptr != mp_listener)
13321335
{
1333-
// call the listener without locks taken
1334-
guard_locator_selector_async.unlock();
1335-
guard_locator_selector_general.unlock();
1336+
// listener is called without locks taken
13361337
lock.unlock();
1337-
13381338
mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::REMOVED_READER, reader_guid, nullptr);
13391339
}
13401340

test/blackbox/common/DDSBlackboxTestsBasic.cpp

+37
Original file line numberDiff line numberDiff line change
@@ -981,6 +981,43 @@ TEST(DDSBasic, successful_destruction_among_intraprocess_participants)
981981
// Restore library settings
982982
fastrtps::xmlparser::XMLProfileManager::library_settings(old_library_settings);
983983
}
984+
TEST(DDSBasic, reliable_volatile_writer_secure_builtin_no_potential_deadlock)
985+
{
986+
// Create
987+
PubSubWriter<HelloWorldPubSubType> writer("HelloWorldTopic_no_potential_deadlock");
988+
PubSubReader<HelloWorldPubSubType> reader("HelloWorldTopic_no_potential_deadlock");
989+
990+
writer.asynchronously(eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE)
991+
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
992+
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
993+
.history_depth(20)
994+
.init();
995+
996+
ASSERT_TRUE(writer.isInitialized());
997+
998+
reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
999+
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
1000+
.history_depth(20)
1001+
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
1002+
.init();
1003+
1004+
ASSERT_TRUE(reader.isInitialized());
1005+
1006+
auto data = default_helloworld_data_generator(30);
1007+
1008+
std::thread th([&]()
1009+
{
1010+
reader.startReception(data);
1011+
reader.block_for_at_least(5);
1012+
});
1013+
1014+
writer.wait_discovery();
1015+
writer.send(data);
1016+
1017+
th.join();
1018+
reader.destroy();
1019+
writer.destroy();
1020+
}
9841021

9851022
} // namespace dds
9861023
} // namespace fastdds

0 commit comments

Comments
 (0)