Skip to content

Commit 8fcd7ca

Browse files
authored
Fix tsan potential deadlock between StatefulWriter and FlowController (#5432)
* Refs #22339: Add BB test Signed-off-by: Mario Dominguez <[email protected]> * Refs #22339: Fix tsan deadlock report Signed-off-by: Mario Dominguez <[email protected]> * Refs #22339: Take writer's mutex before rproxy->stop() and check_acked_status() Signed-off-by: Mario Dominguez <[email protected]> * Refs #22339: Apply Miguels suggestion Signed-off-by: Mario Dominguez <[email protected]> --------- Signed-off-by: Mario Dominguez <[email protected]>
1 parent 0deeb14 commit 8fcd7ca

File tree

2 files changed

+73
-36
lines changed

2 files changed

+73
-36
lines changed

src/cpp/rtps/writer/StatefulWriter.cpp

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1218,55 +1218,58 @@ bool StatefulWriter::matched_reader_remove(
12181218
{
12191219
ReaderProxy* rproxy = nullptr;
12201220
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
1221-
std::unique_lock<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
1222-
std::unique_lock<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);
12231221

1224-
for (ReaderProxyIterator it = matched_local_readers_.begin();
1225-
it != matched_local_readers_.end(); ++it)
12261222
{
1227-
if ((*it)->guid() == reader_guid)
1228-
{
1229-
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1230-
rproxy = std::move(*it);
1231-
it = matched_local_readers_.erase(it);
1232-
break;
1233-
}
1234-
}
1223+
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
1224+
std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);
12351225

1236-
if (rproxy == nullptr)
1237-
{
1238-
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
1239-
it != matched_datasharing_readers_.end(); ++it)
1226+
for (ReaderProxyIterator it = matched_local_readers_.begin();
1227+
it != matched_local_readers_.end(); ++it)
12401228
{
12411229
if ((*it)->guid() == reader_guid)
12421230
{
12431231
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
12441232
rproxy = std::move(*it);
1245-
it = matched_datasharing_readers_.erase(it);
1233+
it = matched_local_readers_.erase(it);
12461234
break;
12471235
}
12481236
}
1249-
}
12501237

1251-
if (rproxy == nullptr)
1252-
{
1253-
for (ReaderProxyIterator it = matched_remote_readers_.begin();
1254-
it != matched_remote_readers_.end(); ++it)
1238+
if (rproxy == nullptr)
12551239
{
1256-
if ((*it)->guid() == reader_guid)
1240+
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
1241+
it != matched_datasharing_readers_.end(); ++it)
12571242
{
1258-
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1259-
rproxy = std::move(*it);
1260-
it = matched_remote_readers_.erase(it);
1261-
break;
1243+
if ((*it)->guid() == reader_guid)
1244+
{
1245+
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1246+
rproxy = std::move(*it);
1247+
it = matched_datasharing_readers_.erase(it);
1248+
break;
1249+
}
12621250
}
12631251
}
1264-
}
12651252

1266-
locator_selector_general_.locator_selector.remove_entry(reader_guid);
1267-
locator_selector_async_.locator_selector.remove_entry(reader_guid);
1268-
update_reader_info(locator_selector_general_, false);
1269-
update_reader_info(locator_selector_async_, false);
1253+
if (rproxy == nullptr)
1254+
{
1255+
for (ReaderProxyIterator it = matched_remote_readers_.begin();
1256+
it != matched_remote_readers_.end(); ++it)
1257+
{
1258+
if ((*it)->guid() == reader_guid)
1259+
{
1260+
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1261+
rproxy = std::move(*it);
1262+
it = matched_remote_readers_.erase(it);
1263+
break;
1264+
}
1265+
}
1266+
}
1267+
1268+
locator_selector_general_.locator_selector.remove_entry(reader_guid);
1269+
locator_selector_async_.locator_selector.remove_entry(reader_guid);
1270+
update_reader_info(locator_selector_general_, false);
1271+
update_reader_info(locator_selector_async_, false);
1272+
}
12701273

12711274
if (get_matched_readers_size() == 0)
12721275
{
@@ -1282,11 +1285,8 @@ bool StatefulWriter::matched_reader_remove(
12821285

12831286
if (nullptr != listener_)
12841287
{
1285-
// call the listener without locks taken
1286-
guard_locator_selector_async.unlock();
1287-
guard_locator_selector_general.unlock();
1288+
// listener is called without locks taken
12881289
lock.unlock();
1289-
12901290
listener_->on_reader_discovery(this, ReaderDiscoveryStatus::REMOVED_READER, reader_guid, nullptr);
12911291
}
12921292

test/blackbox/common/DDSBlackboxTestsBasic.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,43 @@ TEST(DDSBasic, successful_destruction_among_intraprocess_participants)
10141014
}
10151015
}
10161016
}
1017+
TEST(DDSBasic, reliable_volatile_writer_secure_builtin_no_potential_deadlock)
1018+
{
1019+
// Create
1020+
PubSubWriter<HelloWorldPubSubType> writer("HelloWorldTopic_no_potential_deadlock");
1021+
PubSubReader<HelloWorldPubSubType> reader("HelloWorldTopic_no_potential_deadlock");
1022+
1023+
writer.asynchronously(eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE)
1024+
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
1025+
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
1026+
.history_depth(20)
1027+
.init();
1028+
1029+
ASSERT_TRUE(writer.isInitialized());
1030+
1031+
reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
1032+
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
1033+
.history_depth(20)
1034+
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
1035+
.init();
1036+
1037+
ASSERT_TRUE(reader.isInitialized());
1038+
1039+
auto data = default_helloworld_data_generator(30);
1040+
1041+
std::thread th([&]()
1042+
{
1043+
reader.startReception(data);
1044+
reader.block_for_at_least(5);
1045+
});
1046+
1047+
writer.wait_discovery();
1048+
writer.send(data);
1049+
1050+
th.join();
1051+
reader.destroy();
1052+
writer.destroy();
1053+
}
10171054

10181055
} // namespace dds
10191056
} // namespace fastdds

0 commit comments

Comments
 (0)