Skip to content

[22339] Fix tsan potential deadlock between StatefulWriter and FlowController (backport #5432) #5497

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 2.10.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 40 additions & 35 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1233,55 +1233,58 @@ bool StatefulWriter::matched_reader_remove(
{
ReaderProxy* rproxy = nullptr;
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
std::unique_lock<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
std::unique_lock<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);

for (ReaderProxyIterator it = matched_local_readers_.begin();
it != matched_local_readers_.end(); ++it)
{
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_local_readers_.erase(it);
break;
}
}
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);

if (rproxy == nullptr)
{
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
it != matched_datasharing_readers_.end(); ++it)
for (ReaderProxyIterator it = matched_local_readers_.begin();
it != matched_local_readers_.end(); ++it)
{
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_datasharing_readers_.erase(it);
it = matched_local_readers_.erase(it);
break;
}
}
}

if (rproxy == nullptr)
{
for (ReaderProxyIterator it = matched_remote_readers_.begin();
it != matched_remote_readers_.end(); ++it)
if (rproxy == nullptr)
{
if ((*it)->guid() == reader_guid)
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
it != matched_datasharing_readers_.end(); ++it)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_remote_readers_.erase(it);
break;
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_datasharing_readers_.erase(it);
break;
}
}
}
}

locator_selector_general_.locator_selector.remove_entry(reader_guid);
locator_selector_async_.locator_selector.remove_entry(reader_guid);
update_reader_info(locator_selector_general_, false);
update_reader_info(locator_selector_async_, false);
if (rproxy == nullptr)
{
for (ReaderProxyIterator it = matched_remote_readers_.begin();
it != matched_remote_readers_.end(); ++it)
{
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_remote_readers_.erase(it);
break;
}
}
}

locator_selector_general_.locator_selector.remove_entry(reader_guid);
locator_selector_async_.locator_selector.remove_entry(reader_guid);
update_reader_info(locator_selector_general_, false);
update_reader_info(locator_selector_async_, false);
}

if (getMatchedReadersSize() == 0)
{
Expand All @@ -1297,12 +1300,14 @@ bool StatefulWriter::matched_reader_remove(

if (nullptr != mp_listener)
{
// call the listener without locks taken
guard_locator_selector_async.unlock();
guard_locator_selector_general.unlock();
// listener is called without locks taken
lock.unlock();
<<<<<<< HEAD

mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::REMOVED_READER, reader_guid, nullptr);
=======
listener_->on_reader_discovery(this, ReaderDiscoveryStatus::REMOVED_READER, reader_guid, nullptr);
>>>>>>> 8fcd7ca48 (Fix tsan potential deadlock between `StatefulWriter` and `FlowController` (#5432))
}
return true;
}
Expand Down
147 changes: 147 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,153 @@ TEST(DDSBasic, max_output_message_size_writer)

}

<<<<<<< HEAD
=======
/**
 * @test This test checks that it is possible to register two TypeSupport instances of the same type
 * under the same DomainParticipant. The second registration of an identical TopicDataType must
 * also return RETCODE_OK instead of failing as a duplicate.
 */
TEST(DDSBasic, register_two_identical_typesupports)
{
// Get the DomainParticipantFactory singleton
DomainParticipantFactory* factory = DomainParticipantFactory::get_instance();
ASSERT_NE(nullptr, factory);

// Create a DomainParticipant with default QoS
DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT);
ASSERT_NE(nullptr, participant);

// Register a first type support for HelloWorld
TypeSupport type_support_1;
type_support_1.reset(new HelloWorldPubSubType());
EXPECT_EQ(RETCODE_OK, participant->register_type(type_support_1));

// Register a second, distinct TypeSupport instance wrapping the same TopicDataType;
// this must succeed as well
TypeSupport type_support_2;
type_support_2.reset(new HelloWorldPubSubType());
EXPECT_EQ(RETCODE_OK, participant->register_type(type_support_2));
}

/**
 * @test This is a regression test for Redmine Issue 21293.
 * The destruction among intra-process participants should be correctly performed.
 * local_reader() has to return a valid pointer.
 */
TEST(DDSBasic, successful_destruction_among_intraprocess_participants)
{
    namespace dds = eprosima::fastdds::dds;
    auto factory = dds::DomainParticipantFactory::get_instance();

    // Set intraprocess delivery to full, remembering the previous settings so
    // they can be restored once every participant has been destroyed.
    LibrarySettings library_settings;
    factory->get_library_settings(library_settings);
    auto old_library_settings = library_settings;
    library_settings.intraprocess_delivery = INTRAPROCESS_FULL;
    factory->set_library_settings(library_settings);

    // Scope ensures all participants are destroyed before the settings are restored.
    {
        // One participant that publishes on TEST_TOPIC_NAME and subscribes to the
        // "_Return" topic the reception participants publish on.
        auto participant_1 = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(1u, 1u, 1u, 1u);

        ASSERT_TRUE(participant_1->init_participant());
        participant_1->pub_topic_name(TEST_TOPIC_NAME);
        ASSERT_TRUE(participant_1->init_publisher(0u));
        participant_1->sub_topic_name(TEST_TOPIC_NAME + "_Return");
        ASSERT_TRUE(participant_1->init_subscriber(0u));

        std::vector<std::shared_ptr<PubSubParticipant<HelloWorldPubSubType>>> reception_participants;

        size_t num_reception_participants = 50;

        // Each reception participant mirrors participant_1: subscribes to the main
        // topic and publishes on the "_Return" topic.
        for (size_t i = 0; i < num_reception_participants; i++)
        {
            reception_participants.push_back(std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(1u, 1u, 1u, 1u));
            ASSERT_TRUE(reception_participants.back()->init_participant());
            reception_participants.back()->sub_topic_name(TEST_TOPIC_NAME);
            ASSERT_TRUE(reception_participants.back()->init_subscriber(0u));
            reception_participants.back()->pub_topic_name(TEST_TOPIC_NAME + "_Return");
            ASSERT_TRUE(reception_participants.back()->init_publisher(0u));
        }

        // Wait until participant_1 has discovered every reception participant on
        // both its publisher and its subscriber.
        participant_1->wait_discovery(std::chrono::seconds::zero(), (uint8_t)num_reception_participants, true);

        participant_1->pub_wait_discovery((unsigned int)num_reception_participants);
        participant_1->sub_wait_discovery((unsigned int)num_reception_participants);

        auto data_12 = default_helloworld_data_generator();

        // participant_1 publishes concurrently with the reception participants.
        std::thread p1_thread([&participant_1, &data_12]()
                {
                    auto data_size = data_12.size();
                    for (size_t i = 0; i < data_size; i++)
                    {
                        participant_1->send_sample(data_12.back());
                        data_12.pop_back();
                    }
                });

        // Every reception participant sends its own samples and is then destroyed
        // from its own thread, exercising concurrent intra-process destruction.
        std::vector<std::thread> reception_threads;
        reception_threads.reserve(num_reception_participants);
        for (auto& reception_participant : reception_participants)
        {
            reception_threads.emplace_back([&reception_participant]()
                    {
                        auto data_21 = default_helloworld_data_generator();
                        for (auto& data : data_21)
                        {
                            reception_participant->send_sample(data);
                        }

                        // Destroy the participant while traffic may still be in flight.
                        reception_participant.reset();
                    });
        }

        p1_thread.join();
        for (auto& rec_thread : reception_threads)
        {
            rec_thread.join();
        }
    }

    // Restore the library settings saved above: the original code captured
    // old_library_settings but never used it, leaking INTRAPROCESS_FULL into
    // every test that runs afterwards in this process.
    factory->set_library_settings(old_library_settings);
}
/**
 * Checks that an asynchronous volatile writer matched with a reliable volatile
 * reader completes delivery while reception runs on a separate thread,
 * without the writer/flow-controller interaction locking up.
 */
TEST(DDSBasic, reliable_volatile_writer_secure_builtin_no_potential_deadlock)
{
    // Entities under test, sharing one topic
    PubSubWriter<HelloWorldPubSubType> writer("HelloWorldTopic_no_potential_deadlock");
    PubSubReader<HelloWorldPubSubType> reader("HelloWorldTopic_no_potential_deadlock");

    // Asynchronous, volatile, keep-last-20 writer
    writer.asynchronously(eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE)
            .durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
            .history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
            .history_depth(20)
            .init();
    ASSERT_TRUE(writer.isInitialized());

    // Reliable, volatile, keep-last-20 reader
    reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
            .history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
            .history_depth(20)
            .durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
            .init();
    ASSERT_TRUE(reader.isInitialized());

    auto samples = default_helloworld_data_generator(30);

    // Receive on a dedicated thread while the main thread publishes
    std::thread reception_thread([&reader, &samples]()
            {
                reader.startReception(samples);
                reader.block_for_at_least(5);
            });

    writer.wait_discovery();
    writer.send(samples);

    reception_thread.join();
    reader.destroy();
    writer.destroy();
}

>>>>>>> 8fcd7ca48 (Fix tsan potential deadlock between `StatefulWriter` and `FlowController` (#5432))
} // namespace dds
} // namespace fastdds
} // namespace eprosima