diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index f96385200a8..8d11ce792c6 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -1961,6 +1961,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( bool found = false, found_in_users = false; Endpoint* p_endpoint = nullptr; BaseReader* reader = nullptr; + BaseWriter* writer {nullptr}; if (endpoint.entityId.is_writer()) { @@ -1970,6 +1971,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( { if ((*wit)->getGuid().entityId == endpoint.entityId) //Found it { + writer = *wit; m_userWriterList.erase(wit); found_in_users = true; break; @@ -1980,6 +1982,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( { if ((*wit)->getGuid().entityId == endpoint.entityId) //Found it { + writer = *wit; p_endpoint = *wit; m_allWriterList.erase(wit); found = true; @@ -2069,6 +2072,10 @@ bool RTPSParticipantImpl::deleteUserEndpoint( { reader->local_actions_on_reader_removed(); } + else if (writer) + { + writer->local_actions_on_writer_removed(); + } delete(p_endpoint); return true; } @@ -2160,6 +2167,10 @@ void RTPSParticipantImpl::deleteAllUserEndpoints() { static_cast(endpoint)->local_actions_on_reader_removed(); } + else if (WRITER == kind) + { + static_cast(endpoint)->local_actions_on_writer_removed(); + } // remove the endpoints delete(endpoint); diff --git a/src/cpp/rtps/writer/BaseWriter.cpp b/src/cpp/rtps/writer/BaseWriter.cpp index 5402955f052..cd8aa6bfc88 100644 --- a/src/cpp/rtps/writer/BaseWriter.cpp +++ b/src/cpp/rtps/writer/BaseWriter.cpp @@ -392,7 +392,7 @@ void BaseWriter::init( } } -void BaseWriter::deinit() +void BaseWriter::local_actions_on_writer_removed() { // First, unregister changes from FlowController. This action must be protected. { diff --git a/src/cpp/rtps/writer/BaseWriter.hpp b/src/cpp/rtps/writer/BaseWriter.hpp index 4b5c17e71b4..1b14d50cda9 100644 --- a/src/cpp/rtps/writer/BaseWriter.hpp +++ b/src/cpp/rtps/writer/BaseWriter.hpp @@ -79,6 +79,8 @@ class BaseWriter bool is_async() const final; + virtual void local_actions_on_writer_removed(); + #ifdef FASTDDS_STATISTICS bool add_statistics_listener( @@ -354,8 +356,6 @@ class BaseWriter void init( const WriterAttributes& att); - void deinit(); - void add_guid( LocatorSelectorSender& locator_selector, const GUID_t& remote_guid); diff --git a/src/cpp/rtps/writer/StatefulPersistentWriter.cpp b/src/cpp/rtps/writer/StatefulPersistentWriter.cpp index 0a2e97722a1..a058dcaa0ce 100644 --- a/src/cpp/rtps/writer/StatefulPersistentWriter.cpp +++ b/src/cpp/rtps/writer/StatefulPersistentWriter.cpp @@ -44,7 +44,6 @@ StatefulPersistentWriter::StatefulPersistentWriter( StatefulPersistentWriter::~StatefulPersistentWriter() { - deinit(); } /* diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 904ccb296d7..ced15bae493 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -252,6 +252,11 @@ void StatefulWriter::init( StatefulWriter::~StatefulWriter() { EPROSIMA_LOG_INFO(RTPS_WRITER, "StatefulWriter destructor"); +} + +void StatefulWriter::local_actions_on_writer_removed() +{ + EPROSIMA_LOG_INFO(RTPS_WRITER, "StatefulWriter local_actions_on_writer_removed"); // Disable timed events, because their callbacks use cache changes if (disable_positive_acks_) @@ -267,7 +272,7 @@ StatefulWriter::~StatefulWriter() } // This must be the next action, as it frees CacheChange_t from the async thread. - deinit(); + BaseWriter::local_actions_on_writer_removed(); // Stop all active proxies and pass them to the pool { diff --git a/src/cpp/rtps/writer/StatefulWriter.hpp b/src/cpp/rtps/writer/StatefulWriter.hpp index c81de5af951..a03075576ef 100644 --- a/src/cpp/rtps/writer/StatefulWriter.hpp +++ b/src/cpp/rtps/writer/StatefulWriter.hpp @@ -57,6 +57,8 @@ class StatefulWriter : public BaseWriter virtual ~StatefulWriter(); + void local_actions_on_writer_removed() override; + //vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv bool matched_reader_add_edp( diff --git a/src/cpp/rtps/writer/StatelessPersistentWriter.cpp b/src/cpp/rtps/writer/StatelessPersistentWriter.cpp index 7aa760635e6..a904f180cec 100644 --- a/src/cpp/rtps/writer/StatelessPersistentWriter.cpp +++ b/src/cpp/rtps/writer/StatelessPersistentWriter.cpp @@ -42,7 +42,6 @@ StatelessPersistentWriter::StatelessPersistentWriter( StatelessPersistentWriter::~StatelessPersistentWriter() { - deinit(); } /* diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 3cd88fa6fd7..16a64f3a486 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -186,7 +186,12 @@ void StatelessWriter::init( StatelessWriter::~StatelessWriter() { EPROSIMA_LOG_INFO(RTPS_WRITER, "StatelessWriter destructor"; ); - deinit(); +} + +void StatelessWriter::local_actions_on_writer_removed() +{ + EPROSIMA_LOG_INFO(RTPS_WRITER, "StatelessWriter local_actions_on_writer_removed"; ); + BaseWriter::local_actions_on_writer_removed(); } void StatelessWriter::get_builtin_guid() diff --git a/src/cpp/rtps/writer/StatelessWriter.hpp b/src/cpp/rtps/writer/StatelessWriter.hpp index 214655af774..ac426c111c3 100644 --- a/src/cpp/rtps/writer/StatelessWriter.hpp +++ b/src/cpp/rtps/writer/StatelessWriter.hpp @@ -57,6 +57,8 @@ class StatelessWriter : public BaseWriter virtual ~StatelessWriter(); + void local_actions_on_writer_removed() override; + //vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv bool matched_reader_add_edp( diff --git a/test/blackbox/common/BlackboxTestsPubSubFlowControllers.cpp b/test/blackbox/common/BlackboxTestsPubSubFlowControllers.cpp index 3de86bdf621..75d3c529c24 100644 --- a/test/blackbox/common/BlackboxTestsPubSubFlowControllers.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubFlowControllers.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -228,6 +229,60 @@ TEST_P(PubSubFlowControllers, AsyncMultipleWritersFlowController64kb) entities.block_for_all(); } +TEST_P(PubSubFlowControllers, AsyncPubSubAsReliableData64kbWithParticipantFlowControlAndPersistence) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + // Get info about current test + auto info = ::testing::UnitTest::GetInstance()->current_test_info(); + + // Create DB file name from test name and PID + std::ostringstream ss; + std::string test_case_name(info->test_case_name()); + std::string test_name(info->name()); + ss << + test_case_name.replace(test_case_name.find_first_of('/'), 1, "_") << "_" << + test_name.replace(test_name.find_first_of('/'), 1, "_") << "_" << GET_PID() << ".db"; + std::string db_file_name = {ss.str()}; + + reader.history_depth(3). + reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS).init(); + + ASSERT_TRUE(reader.isInitialized()); + + uint32_t bytesPerPeriod = 65000; + uint32_t periodInMs = 500; + writer.add_flow_controller_descriptor_to_pparams(scheduler_policy_, bytesPerPeriod, periodInMs); + + writer.history_depth(3) + .asynchronously(eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE) + .make_transient(db_file_name, "33.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64") + .init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_data64kb_data_generator(3); + + reader.startReception(data); + + // Send data + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block reader until reception finished or timeout. + reader.block_for_all(); + + reader.destroy(); + writer.destroy(); + std::remove(db_file_name.c_str()); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else diff --git a/test/blackbox/common/DDSBlackboxTestsPersistence.cpp b/test/blackbox/common/DDSBlackboxTestsPersistence.cpp index 2d2881fb930..3c18ea03695 100644 --- a/test/blackbox/common/DDSBlackboxTestsPersistence.cpp +++ b/test/blackbox/common/DDSBlackboxTestsPersistence.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include