diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index bd364a73194..f5595b54a33 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -141,6 +141,7 @@ set(${PROJECT_NAME}_source_files rtps/builtin/discovery/participant/PDPServer.cpp rtps/builtin/discovery/participant/PDPServerListener.cpp rtps/builtin/discovery/participant/PDPSimple.cpp + rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp rtps/builtin/discovery/participant/timedevent/DSClientEvent.cpp rtps/builtin/discovery/participant/timedevent/DServerEvent.cpp rtps/builtin/liveliness/WLP.cpp diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp index 20c70bac635..1a37a608b82 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp @@ -287,7 +287,7 @@ void PDPSimple::announceParticipantState( if (!(dispose || new_change)) { - endpoints->writer.writer_->unsent_changes_reset(); + endpoints->writer.writer_->send_periodic_announcement(); } } } @@ -403,7 +403,7 @@ bool PDPSimple::create_dcps_participant_endpoints() if (mp_RTPSParticipant->createWriter(&rtps_writer, watt, writer.history_.get(), nullptr, writer_entity_id, true)) { - writer.writer_ = dynamic_cast(rtps_writer); + writer.writer_ = dynamic_cast(rtps_writer); assert(nullptr != writer.writer_); #if HAVE_SECURITY @@ -455,7 +455,7 @@ bool PDPSimple::create_dcps_participant_endpoints() EPROSIMA_LOG_WARNING(RTPS_PDP, "Ignoring initial peers locator " << loc << " : not allowed."); } } - writer.writer_->set_fixed_locators(fixed_locators); + writer.writer_->set_initial_peers(fixed_locators); } else { @@ -727,11 +727,6 @@ void PDPSimple::match_pdp_remote_endpoints( { writer->matched_reader_add_edp(*temp_reader_data); } - - if (!writer_only && (dds::BEST_EFFORT_RELIABILITY_QOS == reliability_kind)) - { - endpoints->writer.writer_->unsent_changes_reset(); - } } } diff --git a/src/cpp/rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp b/src/cpp/rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp new file mode 100644 index 00000000000..d26a68555b3 --- /dev/null +++ b/src/cpp/rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp @@ -0,0 +1,187 @@ +// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file PDPStatelessWriter.cpp + */ + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +PDPStatelessWriter::PDPStatelessWriter( + RTPSParticipantImpl* participant, + const GUID_t& guid, + const WriterAttributes& attributes, + FlowController* flow_controller, + WriterHistory* history, + WriterListener* listener) + : StatelessWriter(participant, guid, attributes, flow_controller, history, listener) + , interested_readers_(participant->get_attributes().allocation.participants) +{ +} + +bool PDPStatelessWriter::matched_reader_add_edp( + const ReaderProxyData& data) +{ + bool ret = StatelessWriter::matched_reader_add_edp(data); + if (ret) + { + // Mark new reader as interested + add_interested_reader(data.guid()); + // Send announcement to new reader + reschedule_all_samples(); + } + return ret; +} + +bool PDPStatelessWriter::matched_reader_remove( + const GUID_t& reader_guid) +{ + bool ret = StatelessWriter::matched_reader_remove(reader_guid); + if (ret) + { + // Mark reader as not interested + remove_interested_reader(reader_guid); + } + return ret; +} + +void PDPStatelessWriter::unsent_change_added_to_history( + CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) +{ + mark_all_readers_interested(); + StatelessWriter::unsent_change_added_to_history(change, max_blocking_time); +} + +void PDPStatelessWriter::set_initial_peers( + const LocatorList& locator_list) +{ + std::lock_guard guard(mp_mutex); + + initial_peers_.push_back(locator_list); + mp_RTPSParticipant->createSenderResources(initial_peers_); +} + +void PDPStatelessWriter::send_periodic_announcement() +{ + mark_all_readers_interested(); + reschedule_all_samples(); +} + +bool PDPStatelessWriter::send_to_fixed_locators( + const std::vector& buffers, + const uint32_t& total_bytes, + std::chrono::steady_clock::time_point& max_blocking_time_point) const +{ + bool ret = true; + + if (should_reach_all_destinations_) + { + ret = initial_peers_.empty() || + mp_RTPSParticipant->sendSync(buffers, total_bytes, m_guid, + Locators(initial_peers_.begin()), Locators(initial_peers_.end()), + max_blocking_time_point); + + if (ret) + { + fixed_locators_.clear(); + should_reach_all_destinations_ = false; + } + } + else + { + interested_readers_.clear(); + } + + return ret; +} + +bool PDPStatelessWriter::is_relevant( + const fastdds::rtps::CacheChange_t& /* change */, + const fastdds::rtps::GUID_t& reader_guid) const +{ + return interested_readers_.end() != + std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid); +} + +void PDPStatelessWriter::mark_all_readers_interested() +{ + std::lock_guard guard(mp_mutex); + should_reach_all_destinations_ = true; + interested_readers_.clear(); + fixed_locators_.clear(); + fixed_locators_.push_back(initial_peers_); + reader_data_filter(nullptr); +} + +void PDPStatelessWriter::add_interested_reader( + const GUID_t& reader_guid) +{ + std::lock_guard guard(mp_mutex); + if (!should_reach_all_destinations_) + { + auto it = std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid); + if (it == interested_readers_.end()) + { + interested_readers_.emplace_back(reader_guid); + reader_data_filter(this); + } + } +} + +void PDPStatelessWriter::remove_interested_reader( + const GUID_t& reader_guid) +{ + std::lock_guard guard(mp_mutex); + interested_readers_.remove(reader_guid); +} + +void PDPStatelessWriter::reschedule_all_samples() +{ + std::lock_guard guard(mp_mutex); + size_t n_samples = history_->getHistorySize(); + if (0 < n_samples) + { + assert(1 == n_samples); + auto it = history_->changesBegin(); + CacheChange_t* change = *it; + flow_controller_->add_new_sample(this, change, std::chrono::steady_clock::now() + std::chrono::hours(24)); + } +} + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/rtps/builtin/discovery/participant/simple/PDPStatelessWriter.hpp b/src/cpp/rtps/builtin/discovery/participant/simple/PDPStatelessWriter.hpp new file mode 100644 index 00000000000..7d21b6a41bc --- /dev/null +++ b/src/cpp/rtps/builtin/discovery/participant/simple/PDPStatelessWriter.hpp @@ -0,0 +1,155 @@ +// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file PDPStatelessWriter.hpp + */ + +#ifndef FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP +#define FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP + +#include + +#include +#include +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +/** + * Class PDPStatelessWriter, specialization of StatelessWriter with specific behavior for PDP. + */ +class PDPStatelessWriter : public StatelessWriter, private IReaderDataFilter +{ + +public: + + PDPStatelessWriter( + RTPSParticipantImpl* participant, + const GUID_t& guid, + const WriterAttributes& attributes, + FlowController* flow_controller, + WriterHistory* history, + WriterListener* listener); + + virtual ~PDPStatelessWriter() = default; + + //vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv + + bool matched_reader_add_edp( + const ReaderProxyData& data) final; + + bool matched_reader_remove( + const GUID_t& reader_guid) final; + + //^^^^^^^^^^^^^^^^^^^^^^ [Exported API] ^^^^^^^^^^^^^^^^^^^^^^^ + + //vvvvvvvvvvvvvvvvvvvvv [BaseWriter API] vvvvvvvvvvvvvvvvvvvvvv + + void unsent_change_added_to_history( + CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) final; + + //^^^^^^^^^^^^^^^^^^^^^^ [BaseWriter API] ^^^^^^^^^^^^^^^^^^^^^^^ + + /** + * @brief Set the locators to which the writer should send periodic announcements. + * + * This method is used to configure the initial peers list on the PDP writer. + * + * @param locator_list List of locators to which the writer should send periodic announcements. + * + * @return true if the locators were set successfully. + */ + void set_initial_peers( + const LocatorList& locator_list); + + /** + * Reset the unsent changes. + */ + void send_periodic_announcement(); + +protected: + + bool send_to_fixed_locators( + const std::vector& buffers, + const uint32_t& total_bytes, + std::chrono::steady_clock::time_point& max_blocking_time_point) const override; + +private: + + /** + * This method checks whether a CacheChange_t is relevant for the specified reader + * This callback should return always the same result given the same arguments + * @param change The CacheChange_t to be evaluated + * @param reader_guid remote reader GUID_t + * @return true if relevant, false otherwise. + */ + bool is_relevant( + const fastdds::rtps::CacheChange_t& change, + const fastdds::rtps::GUID_t& reader_guid) const final; + + /** + * @brief Mark all readers as interested. + * + * This method sets the flag indicating that all readers are interested in the data sent by this writer. + * It is used to ensure that all readers are considered when sending data. + * The flag will be reset when all the samples from this writer have been sent. + */ + void mark_all_readers_interested(); + + /** + * @brief Mark an interested reader. + * + * Add the guid of a reader to the list of interested readers. + * + * @param reader_guid The GUID of the reader to mark as interested. + */ + void add_interested_reader( + const GUID_t& reader_guid); + + /** + * @brief Unmark an interested reader. + * + * Remove the guid of a reader from the list of interested readers. + * + * @param reader_guid The GUID of the reader to mark as interested. + */ + void remove_interested_reader( + const GUID_t& reader_guid); + + /** + * @brief Add all samples from this writer to the flow controller. + */ + void reschedule_all_samples(); + + //! Configured initial peers + LocatorList initial_peers_{}; + //! The set of readers interested + mutable ResourceLimitedVector interested_readers_; + //! Whether we have set that all destinations are interested + mutable bool should_reach_all_destinations_ = false; + +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP + diff --git a/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpoints.hpp b/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpoints.hpp index 791390abba5..6cff91e90d0 100644 --- a/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpoints.hpp +++ b/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpoints.hpp @@ -26,9 +26,9 @@ #include #include #include +#include #include #include -#include namespace eprosima { namespace fastdds { @@ -86,7 +86,7 @@ struct SimplePDPEndpoints : public PDPEndpoints BuiltinReader reader; //! Builtin Simple PDP writer - BuiltinWriter writer; + BuiltinWriter writer; }; } // namespace rtps diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 9bdc3175d90..f54d4d8014d 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -53,6 +53,7 @@ #include #include #include +#include #include #include #include @@ -1231,7 +1232,7 @@ bool RTPSParticipantImpl::create_writer( return false; } - auto callback = [hist, listen, this] + auto callback = [hist, listen, entityId, this] (const GUID_t& guid, WriterAttributes& watt, FlowController* flow_controller, IPersistenceService* persistence, bool is_reliable) -> BaseWriter* { @@ -1251,7 +1252,11 @@ bool RTPSParticipantImpl::create_writer( } else { - if (persistence != nullptr) + if (entityId == c_EntityId_SPDPWriter) + { + writer = new PDPStatelessWriter(this, guid, watt, flow_controller, hist, listen); + } + else if (persistence != nullptr) { writer = new StatelessPersistentWriter(this, guid, watt, flow_controller, hist, listen, persistence); diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 16a64f3a486..0adbb0e2bae 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -563,6 +563,17 @@ bool StatelessWriter::matched_reader_add_edp( << " as remote reader"); } + // Create sender resources for the case when we send to a single reader + locator_selector_.locator_selector.reset(false); + locator_selector_.locator_selector.enable(data.guid()); + mp_RTPSParticipant->network_factory().select_locators(locator_selector_.locator_selector); + RTPSParticipantImpl* part = mp_RTPSParticipant; + locator_selector_.locator_selector.for_each([part](const Locator_t& loc) + { + part->createSenderResources(loc); + }); + + // Create sender resources for the case when we send to all readers update_reader_info(true); if (nullptr != listener_) @@ -587,26 +598,6 @@ bool StatelessWriter::matched_reader_add_edp( return true; } -bool StatelessWriter::set_fixed_locators( - const LocatorList_t& locator_list) -{ -#if HAVE_SECURITY - if (getAttributes().security_attributes().is_submessage_protected || - getAttributes().security_attributes().is_payload_protected) - { - EPROSIMA_LOG_ERROR(RTPS_WRITER, "A secure besteffort writer cannot add a lonely locator"); - return false; - } -#endif // if HAVE_SECURITY - - std::lock_guard guard(mp_mutex); - - fixed_locators_.push_back(locator_list); - mp_RTPSParticipant->createSenderResources(fixed_locators_); - - return true; -} - bool StatelessWriter::matched_reader_remove( const GUID_t& reader_guid) { @@ -698,16 +689,6 @@ bool StatelessWriter::matched_reader_is_matched( ); } -void StatelessWriter::unsent_changes_reset() -{ - std::lock_guard guard(mp_mutex); - std::for_each(history_->changesBegin(), history_->changesEnd(), [&](CacheChange_t* change) - { - flow_controller_->add_new_sample(this, change, - std::chrono::steady_clock::now() + std::chrono::hours(24)); - }); -} - bool StatelessWriter::process_acknack( const GUID_t& writer_guid, const GUID_t& reader_guid, @@ -757,6 +738,14 @@ bool StatelessWriter::send_nts( return false; } + return send_to_fixed_locators(buffers, total_bytes, max_blocking_time_point); +} + +bool StatelessWriter::send_to_fixed_locators( + const std::vector& buffers, + const uint32_t& total_bytes, + std::chrono::steady_clock::time_point& max_blocking_time_point) const +{ return fixed_locators_.empty() || mp_RTPSParticipant->sendSync(buffers, total_bytes, m_guid, Locators(fixed_locators_.begin()), Locators(fixed_locators_.end()), diff --git a/src/cpp/rtps/writer/StatelessWriter.hpp b/src/cpp/rtps/writer/StatelessWriter.hpp index ac426c111c3..f30bbd7b1bb 100644 --- a/src/cpp/rtps/writer/StatelessWriter.hpp +++ b/src/cpp/rtps/writer/StatelessWriter.hpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -62,10 +63,10 @@ class StatelessWriter : public BaseWriter //vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv bool matched_reader_add_edp( - const ReaderProxyData& data) final; + const ReaderProxyData& data) override; bool matched_reader_remove( - const GUID_t& reader_guid) final; + const GUID_t& reader_guid) override; bool matched_reader_is_matched( const GUID_t& reader_guid) final; @@ -176,23 +177,6 @@ class StatelessWriter : public BaseWriter //^^^^^^^^^^^^^^^^^^^^^^ [BaseWriter API] ^^^^^^^^^^^^^^^^^^^^^^^ - /** - * @brief Set the locators to which the writer should always send data. - * - * This method is used to configure the initial peers list on the PDP writer. - * - * @param locator_list List of locators to which the writer should always send data. - * - * @return true if the locators were set successfully. - */ - bool set_fixed_locators( - const LocatorList_t& locator_list); - - /** - * Reset the unsent changes. - */ - void unsent_changes_reset(); - /** * Get the number of matched readers * @return Number of the matched readers @@ -205,6 +189,15 @@ class StatelessWriter : public BaseWriter + matched_datasharing_readers_.size(); } +protected: + + mutable LocatorList_t fixed_locators_; + + virtual bool send_to_fixed_locators( + const std::vector& buffers, + const uint32_t& total_bytes, + std::chrono::steady_clock::time_point& max_blocking_time_point) const; + private: void init( @@ -226,7 +219,6 @@ class StatelessWriter : public BaseWriter ReaderLocator& reader_locator); bool is_inline_qos_expected_ = false; - LocatorList_t fixed_locators_; ResourceLimitedVector> matched_remote_readers_; std::condition_variable_any unsent_changes_cond_; diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index 16c1f724e29..b4948032e4e 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -42,6 +42,7 @@ #include "PubSubReader.hpp" #include "PubSubWriter.hpp" #include "PubSubWriterReader.hpp" +#include "PubSubParticipant.hpp" using namespace eprosima::fastdds; using namespace eprosima::fastdds::rtps; @@ -1327,9 +1328,8 @@ TEST_P(Discovery, AsymmeticIgnoreParticipantFlags) // This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set // its value when the first multicast datagram is sent. std::atomic multicast_port{ 0 }; - // Only two multicast datagrams are allowed: the initial DATA(p) and the DATA(p) sent in response of the discovery - // of p1. - constexpr uint32_t allowed_messages_on_port = 2; + // Only one multicast datagram is allowed: the initial DATA(p) + constexpr uint32_t allowed_messages_on_port = 1; auto test_transport = std::make_shared(); @@ -1365,6 +1365,115 @@ TEST_P(Discovery, AsymmeticIgnoreParticipantFlags) EXPECT_EQ(messages_on_port, allowed_messages_on_port); } +//! Regression test for redmine issue 22506 +TEST_P(Discovery, single_unicast_pdp_response) +{ + // Leverage intraprocess so transport is only used for participant discovery + if (INTRAPROCESS != GetParam()) + { + GTEST_SKIP() << "Only makes sense on INTRAPROCESS"; + return; + } + + using namespace eprosima::fastdds::dds; + + // All participants would restrict communication to UDP localhost. + // The main participant should send a single initial announcement, and have a big announcement period. + // This is to ensure that we only check the datagrams sent in response to the participant discovery, + // and not the ones sent in the periodic announcements. + // The main participant will use the test transport to count the number of unicast messages sent. + + // This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set + // its value when the first multicast datagram is sent. + std::atomic multicast_port{ 0 }; + // Declare a test transport that will count the number of unicast messages sent + std::atomic num_unicast_sends{ 0 }; + auto test_transport = std::make_shared(); + test_transport->interfaceWhiteList.push_back("127.0.0.1"); + test_transport->locator_filter_ = [&num_unicast_sends, &multicast_port]( + const eprosima::fastdds::rtps::Locator& destination) + { + if (IPLocator::isMulticast(destination)) + { + uint32_t port = 0; + multicast_port.compare_exchange_strong(port, destination.port); + } + else + { + num_unicast_sends.fetch_add(1u, std::memory_order_seq_cst); + } + + // Do not discard any message + return false; + }; + + // Create the main participant + auto main_participant = std::make_shared>(0, 0, 0, 0); + WireProtocolConfigQos main_wire_protocol; + main_wire_protocol.builtin.avoid_builtin_multicast = true; + main_wire_protocol.builtin.discovery_config.leaseDuration = c_TimeInfinite; + main_wire_protocol.builtin.discovery_config.leaseDuration_announcementperiod = { 3600, 0 }; + main_wire_protocol.builtin.discovery_config.initial_announcements.count = 1; + main_wire_protocol.builtin.discovery_config.initial_announcements.period = { 0, 100000000 }; + + // The main participant will use the test transport and a specific announcments configuration + main_participant->disable_builtin_transport().add_user_transport_to_pparams(test_transport) + .wire_protocol(main_wire_protocol); + + // Start the main participant + ASSERT_TRUE(main_participant->init_participant()); + + // Wait for the initial announcements to be sent + std::this_thread::sleep_for(std::chrono::seconds(1)); + // This would have set the multicast port + EXPECT_NE(multicast_port, 0u); + + // The rest of the participants only send announcements to the main participant + // Calculate the metatraffic unicast port of the main participant + uint32_t port = multicast_port + main_wire_protocol.port.offsetd1 - main_wire_protocol.port.offsetd0; + + // The rest of the participants only send announcements to the main participant + auto udp_localhost_transport = std::make_shared(); + udp_localhost_transport->interfaceWhiteList.push_back("127.0.0.1"); + Locator peer_locator; + IPLocator::createLocator(LOCATOR_KIND_UDPv4, "127.0.0.1", port, peer_locator); + WireProtocolConfigQos wire_protocol; + wire_protocol.builtin.avoid_builtin_multicast = true; + wire_protocol.builtin.initialPeersList.push_back(peer_locator); + + std::vector>> participants; + for (size_t i = 0; i < 5; ++i) + { + auto participant = std::make_shared>(0, 0, 0, 0); + // All participants use the same transport + participant->disable_builtin_transport().add_user_transport_to_pparams(udp_localhost_transport) + .wire_protocol(wire_protocol); + participants.push_back(participant); + } + + // Start the rest of the participants + for (auto& participant : participants) + { + ASSERT_TRUE(participant->init_participant()); + participant->wait_discovery(); + } + + // Destroy main participant + main_participant.reset(); + for (auto& participant : participants) + { + participant->wait_discovery(std::chrono::seconds::zero(), 0, true); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // Check that only two unicast messages per participant were sent + EXPECT_EQ(num_unicast_sends.load(std::memory_order::memory_order_seq_cst), + participants.size() + participants.size()); + + // Clean up + participants.clear(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else diff --git a/test/blackbox/common/DDSBlackboxTestsMonitorService.cpp b/test/blackbox/common/DDSBlackboxTestsMonitorService.cpp index bdcfc85b47e..c4339cdd33a 100644 --- a/test/blackbox/common/DDSBlackboxTestsMonitorService.cpp +++ b/test/blackbox/common/DDSBlackboxTestsMonitorService.cpp @@ -12,15 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include +#include +#include #include #include #include #include // TODO(jlbueno): remove private header -#include #include #include "../types/statistics/monitorservice_typesPubSubTypes.hpp" #include "BlackboxTests.hpp" @@ -118,6 +120,7 @@ class MonitorServiceParticipant void setup() { DomainParticipantQos pqos; + pqos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = { 0, 250000000 }; pqos.name() = "Monitor_Service_Participant"; auto participant = DomainParticipantFactory::get_instance()-> create_participant((uint32_t)GET_PID() % 230, pqos); diff --git a/test/unittest/dds/publisher/CMakeLists.txt b/test/unittest/dds/publisher/CMakeLists.txt index 81fe89d5633..3f0ff8d3b53 100644 --- a/test/unittest/dds/publisher/CMakeLists.txt +++ b/test/unittest/dds/publisher/CMakeLists.txt @@ -106,6 +106,7 @@ set(DATAWRITERTESTS_SOURCE DataWriterTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/timedevent/DSClientEvent.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/timedevent/DServerEvent.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/liveliness/WLP.cpp diff --git a/test/unittest/rtps/reader/CMakeLists.txt b/test/unittest/rtps/reader/CMakeLists.txt index 3c841189e05..7be873e41eb 100644 --- a/test/unittest/rtps/reader/CMakeLists.txt +++ b/test/unittest/rtps/reader/CMakeLists.txt @@ -205,6 +205,7 @@ set(STATEFUL_READER_TESTS_SOURCE StatefulReaderTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/timedevent/DSClientEvent.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/timedevent/DServerEvent.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/liveliness/WLP.cpp diff --git a/test/unittest/statistics/dds/CMakeLists.txt b/test/unittest/statistics/dds/CMakeLists.txt index 5053fa246c7..1e538e3e991 100644 --- a/test/unittest/statistics/dds/CMakeLists.txt +++ b/test/unittest/statistics/dds/CMakeLists.txt @@ -209,6 +209,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS AND NOT QNX) ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/timedevent/DSClientEvent.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/timedevent/DServerEvent.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/liveliness/WLP.cpp @@ -391,6 +392,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS AND NOT QNX) ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/timedevent/DSClientEvent.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/timedevent/DServerEvent.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/liveliness/WLP.cpp