Skip to content

Commit 32092e5

Browse files
Fix tsan potential deadlock between StatefulWriter and FlowController (#5432) (#5494)
* Refs #22339: Add BB test Signed-off-by: Mario Dominguez <[email protected]> * Refs #22339: Fix tsan deadlock report Signed-off-by: Mario Dominguez <[email protected]> * Refs #22339: Take writer's mutex before rproxy->stop() and check_acked_status() Signed-off-by: Mario Dominguez <[email protected]> * Refs #22339: Apply Miguels suggestion Signed-off-by: Mario Dominguez <[email protected]> --------- Signed-off-by: Mario Dominguez <[email protected]> (cherry picked from commit 8fcd7ca) Co-authored-by: Mario Domínguez López <[email protected]>
1 parent e16f87d commit 32092e5

File tree

2 files changed

+73
-36
lines changed

2 files changed

+73
-36
lines changed

src/cpp/rtps/writer/StatefulWriter.cpp

+36-36
Original file line numberDiff line numberDiff line change
@@ -1213,55 +1213,58 @@ bool StatefulWriter::matched_reader_remove(
12131213
{
12141214
ReaderProxy* rproxy = nullptr;
12151215
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
1216-
std::unique_lock<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
1217-
std::unique_lock<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);
12181216

1219-
for (ReaderProxyIterator it = matched_local_readers_.begin();
1220-
it != matched_local_readers_.end(); ++it)
12211217
{
1222-
if ((*it)->guid() == reader_guid)
1223-
{
1224-
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1225-
rproxy = std::move(*it);
1226-
it = matched_local_readers_.erase(it);
1227-
break;
1228-
}
1229-
}
1218+
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
1219+
std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);
12301220

1231-
if (rproxy == nullptr)
1232-
{
1233-
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
1234-
it != matched_datasharing_readers_.end(); ++it)
1221+
for (ReaderProxyIterator it = matched_local_readers_.begin();
1222+
it != matched_local_readers_.end(); ++it)
12351223
{
12361224
if ((*it)->guid() == reader_guid)
12371225
{
12381226
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
12391227
rproxy = std::move(*it);
1240-
it = matched_datasharing_readers_.erase(it);
1228+
it = matched_local_readers_.erase(it);
12411229
break;
12421230
}
12431231
}
1244-
}
12451232

1246-
if (rproxy == nullptr)
1247-
{
1248-
for (ReaderProxyIterator it = matched_remote_readers_.begin();
1249-
it != matched_remote_readers_.end(); ++it)
1233+
if (rproxy == nullptr)
12501234
{
1251-
if ((*it)->guid() == reader_guid)
1235+
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
1236+
it != matched_datasharing_readers_.end(); ++it)
12521237
{
1253-
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1254-
rproxy = std::move(*it);
1255-
it = matched_remote_readers_.erase(it);
1256-
break;
1238+
if ((*it)->guid() == reader_guid)
1239+
{
1240+
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1241+
rproxy = std::move(*it);
1242+
it = matched_datasharing_readers_.erase(it);
1243+
break;
1244+
}
12571245
}
12581246
}
1259-
}
12601247

1261-
locator_selector_general_.locator_selector.remove_entry(reader_guid);
1262-
locator_selector_async_.locator_selector.remove_entry(reader_guid);
1263-
update_reader_info(locator_selector_general_, false);
1264-
update_reader_info(locator_selector_async_, false);
1248+
if (rproxy == nullptr)
1249+
{
1250+
for (ReaderProxyIterator it = matched_remote_readers_.begin();
1251+
it != matched_remote_readers_.end(); ++it)
1252+
{
1253+
if ((*it)->guid() == reader_guid)
1254+
{
1255+
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1256+
rproxy = std::move(*it);
1257+
it = matched_remote_readers_.erase(it);
1258+
break;
1259+
}
1260+
}
1261+
}
1262+
1263+
locator_selector_general_.locator_selector.remove_entry(reader_guid);
1264+
locator_selector_async_.locator_selector.remove_entry(reader_guid);
1265+
update_reader_info(locator_selector_general_, false);
1266+
update_reader_info(locator_selector_async_, false);
1267+
}
12651268

12661269
if (get_matched_readers_size() == 0)
12671270
{
@@ -1277,11 +1280,8 @@ bool StatefulWriter::matched_reader_remove(
12771280

12781281
if (nullptr != listener_)
12791282
{
1280-
// call the listener without locks taken
1281-
guard_locator_selector_async.unlock();
1282-
guard_locator_selector_general.unlock();
1283+
// listener is called without locks taken
12831284
lock.unlock();
1284-
12851285
listener_->on_reader_discovery(this, ReaderDiscoveryStatus::REMOVED_READER, reader_guid, nullptr);
12861286
}
12871287

test/blackbox/common/DDSBlackboxTestsBasic.cpp

+37
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,43 @@ TEST(DDSBasic, successful_destruction_among_intraprocess_participants)
10141014
}
10151015
}
10161016
}
1017+
TEST(DDSBasic, reliable_volatile_writer_secure_builtin_no_potential_deadlock)
1018+
{
1019+
// Create
1020+
PubSubWriter<HelloWorldPubSubType> writer("HelloWorldTopic_no_potential_deadlock");
1021+
PubSubReader<HelloWorldPubSubType> reader("HelloWorldTopic_no_potential_deadlock");
1022+
1023+
writer.asynchronously(eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE)
1024+
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
1025+
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
1026+
.history_depth(20)
1027+
.init();
1028+
1029+
ASSERT_TRUE(writer.isInitialized());
1030+
1031+
reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
1032+
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
1033+
.history_depth(20)
1034+
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
1035+
.init();
1036+
1037+
ASSERT_TRUE(reader.isInitialized());
1038+
1039+
auto data = default_helloworld_data_generator(30);
1040+
1041+
std::thread th([&]()
1042+
{
1043+
reader.startReception(data);
1044+
reader.block_for_at_least(5);
1045+
});
1046+
1047+
writer.wait_discovery();
1048+
writer.send(data);
1049+
1050+
th.join();
1051+
reader.destroy();
1052+
writer.destroy();
1053+
}
10171054

10181055
} // namespace dds
10191056
} // namespace fastdds

0 commit comments

Comments
 (0)