Skip to content

Commit 6e7d002

Browse files
MiguelCompanymergify[bot]
authored andcommitted
Filter interested readers on PDP writer (#5604)
* Refs #22506. Regression test. Signed-off-by: Miguel Company <[email protected]> * Refs #22506. Add `PDPStatelessWriter` extending `StatelessWriter` Signed-off-by: Miguel Company <[email protected]> * Refs #22506. Use `PDPStatelessWriter` in `SimplePDPEndpoints` Signed-off-by: Miguel Company <[email protected]> * Refs #22506. Move specific API to `PDPStatelessWriter`. Signed-off-by: Miguel Company <[email protected]> * Refs #22506. Initial work on management of interested writers. Signed-off-by: Miguel Company <[email protected]> * Refs #22506. Take advantage of single sample history. Signed-off-by: Miguel Company <[email protected]> * Refs #22506. Reset state after sending datagram. Signed-off-by: Miguel Company <[email protected]> * Refs #22506. Filter interested readers. Signed-off-by: Miguel Company <[email protected]> * Refs #22506. Do not reset unsent changes upon participant discovery. Signed-off-by: Miguel Company <[email protected]> * Refs #22506. Fix AsymmeticIgnoreParticipantFlags test. Signed-off-by: Miguel Company <[email protected]> * Refs #22506. Create sender resources for matched readers. Signed-off-by: Miguel Company <[email protected]> * Refs #22708. Rename methods. Signed-off-by: Miguel Company <[email protected]> * Refs #22708. Avoid allocations using a `ResourceLimitedVector` instead of an `std::set` Signed-off-by: Miguel Company <[email protected]> * Refs #22604. Improve monitor service blackbox tests. Reducing announcement period to improve discovery timing. Signed-off-by: Miguel Company <[email protected]> --------- Signed-off-by: Miguel Company <[email protected]> (cherry picked from commit 8eb1072)
1 parent e03357e commit 6e7d002

File tree

13 files changed

+506
-66
lines changed

13 files changed

+506
-66
lines changed

src/cpp/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ set(${PROJECT_NAME}_source_files
141141
rtps/builtin/discovery/participant/PDPServer.cpp
142142
rtps/builtin/discovery/participant/PDPServerListener.cpp
143143
rtps/builtin/discovery/participant/PDPSimple.cpp
144+
rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp
144145
rtps/builtin/discovery/participant/timedevent/DSClientEvent.cpp
145146
rtps/builtin/discovery/participant/timedevent/DServerEvent.cpp
146147
rtps/builtin/liveliness/WLP.cpp

src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp

+3-8
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ void PDPSimple::announceParticipantState(
287287

288288
if (!(dispose || new_change))
289289
{
290-
endpoints->writer.writer_->unsent_changes_reset();
290+
endpoints->writer.writer_->send_periodic_announcement();
291291
}
292292
}
293293
}
@@ -403,7 +403,7 @@ bool PDPSimple::create_dcps_participant_endpoints()
403403
if (mp_RTPSParticipant->createWriter(&rtps_writer, watt, writer.history_.get(),
404404
nullptr, writer_entity_id, true))
405405
{
406-
writer.writer_ = dynamic_cast<StatelessWriter*>(rtps_writer);
406+
writer.writer_ = dynamic_cast<PDPStatelessWriter*>(rtps_writer);
407407
assert(nullptr != writer.writer_);
408408

409409
#if HAVE_SECURITY
@@ -455,7 +455,7 @@ bool PDPSimple::create_dcps_participant_endpoints()
455455
EPROSIMA_LOG_WARNING(RTPS_PDP, "Ignoring initial peers locator " << loc << " : not allowed.");
456456
}
457457
}
458-
writer.writer_->set_fixed_locators(fixed_locators);
458+
writer.writer_->set_initial_peers(fixed_locators);
459459
}
460460
else
461461
{
@@ -725,11 +725,6 @@ void PDPSimple::match_pdp_remote_endpoints(
725725
{
726726
writer->matched_reader_add_edp(*temp_reader_data);
727727
}
728-
729-
if (!writer_only && (dds::BEST_EFFORT_RELIABILITY_QOS == reliability_kind))
730-
{
731-
endpoints->writer.writer_->unsent_changes_reset();
732-
}
733728
}
734729
}
735730

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* @file PDPStatelessWriter.cpp
17+
*/
18+
19+
#include <rtps/builtin/discovery/participant/simple/PDPStatelessWriter.hpp>
20+
21+
#include <algorithm>
22+
#include <cassert>
23+
#include <chrono>
24+
#include <cstdint>
25+
#include <mutex>
26+
#include <set>
27+
#include <vector>
28+
29+
#include <fastdds/rtps/common/LocatorList.hpp>
30+
#include <fastdds/rtps/history/WriterHistory.hpp>
31+
#include <fastdds/rtps/transport/NetworkBuffer.hpp>
32+
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>
33+
#include <fastdds/utils/TimedMutex.hpp>
34+
35+
#include <rtps/builtin/data/ReaderProxyData.hpp>
36+
#include <rtps/participant/RTPSParticipantImpl.hpp>
37+
#include <rtps/writer/StatelessWriter.hpp>
38+
39+
namespace eprosima {
40+
namespace fastdds {
41+
namespace rtps {
42+
43+
PDPStatelessWriter::PDPStatelessWriter(
44+
RTPSParticipantImpl* participant,
45+
const GUID_t& guid,
46+
const WriterAttributes& attributes,
47+
FlowController* flow_controller,
48+
WriterHistory* history,
49+
WriterListener* listener)
50+
: StatelessWriter(participant, guid, attributes, flow_controller, history, listener)
51+
, interested_readers_(participant->get_attributes().allocation.participants)
52+
{
53+
}
54+
55+
bool PDPStatelessWriter::matched_reader_add_edp(
56+
const ReaderProxyData& data)
57+
{
58+
bool ret = StatelessWriter::matched_reader_add_edp(data);
59+
if (ret)
60+
{
61+
// Mark new reader as interested
62+
add_interested_reader(data.guid());
63+
// Send announcement to new reader
64+
reschedule_all_samples();
65+
}
66+
return ret;
67+
}
68+
69+
bool PDPStatelessWriter::matched_reader_remove(
70+
const GUID_t& reader_guid)
71+
{
72+
bool ret = StatelessWriter::matched_reader_remove(reader_guid);
73+
if (ret)
74+
{
75+
// Mark reader as not interested
76+
remove_interested_reader(reader_guid);
77+
}
78+
return ret;
79+
}
80+
81+
void PDPStatelessWriter::unsent_change_added_to_history(
82+
CacheChange_t* change,
83+
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
84+
{
85+
mark_all_readers_interested();
86+
StatelessWriter::unsent_change_added_to_history(change, max_blocking_time);
87+
}
88+
89+
void PDPStatelessWriter::set_initial_peers(
90+
const LocatorList& locator_list)
91+
{
92+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
93+
94+
initial_peers_.push_back(locator_list);
95+
mp_RTPSParticipant->createSenderResources(initial_peers_);
96+
}
97+
98+
void PDPStatelessWriter::send_periodic_announcement()
99+
{
100+
mark_all_readers_interested();
101+
reschedule_all_samples();
102+
}
103+
104+
bool PDPStatelessWriter::send_to_fixed_locators(
105+
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
106+
const uint32_t& total_bytes,
107+
std::chrono::steady_clock::time_point& max_blocking_time_point) const
108+
{
109+
bool ret = true;
110+
111+
if (should_reach_all_destinations_)
112+
{
113+
ret = initial_peers_.empty() ||
114+
mp_RTPSParticipant->sendSync(buffers, total_bytes, m_guid,
115+
Locators(initial_peers_.begin()), Locators(initial_peers_.end()),
116+
max_blocking_time_point);
117+
118+
if (ret)
119+
{
120+
fixed_locators_.clear();
121+
should_reach_all_destinations_ = false;
122+
}
123+
}
124+
else
125+
{
126+
interested_readers_.clear();
127+
}
128+
129+
return ret;
130+
}
131+
132+
bool PDPStatelessWriter::is_relevant(
133+
const fastdds::rtps::CacheChange_t& /* change */,
134+
const fastdds::rtps::GUID_t& reader_guid) const
135+
{
136+
return interested_readers_.end() !=
137+
std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid);
138+
}
139+
140+
void PDPStatelessWriter::mark_all_readers_interested()
141+
{
142+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
143+
should_reach_all_destinations_ = true;
144+
interested_readers_.clear();
145+
fixed_locators_.clear();
146+
fixed_locators_.push_back(initial_peers_);
147+
reader_data_filter(nullptr);
148+
}
149+
150+
void PDPStatelessWriter::add_interested_reader(
151+
const GUID_t& reader_guid)
152+
{
153+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
154+
if (!should_reach_all_destinations_)
155+
{
156+
auto it = std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid);
157+
if (it == interested_readers_.end())
158+
{
159+
interested_readers_.emplace_back(reader_guid);
160+
reader_data_filter(this);
161+
}
162+
}
163+
}
164+
165+
void PDPStatelessWriter::remove_interested_reader(
166+
const GUID_t& reader_guid)
167+
{
168+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
169+
interested_readers_.remove(reader_guid);
170+
}
171+
172+
void PDPStatelessWriter::reschedule_all_samples()
173+
{
174+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
175+
size_t n_samples = history_->getHistorySize();
176+
if (0 < n_samples)
177+
{
178+
assert(1 == n_samples);
179+
auto it = history_->changesBegin();
180+
CacheChange_t* change = *it;
181+
flow_controller_->add_new_sample(this, change, std::chrono::steady_clock::now() + std::chrono::hours(24));
182+
}
183+
}
184+
185+
} // namespace rtps
186+
} // namespace fastdds
187+
} // namespace eprosima
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* @file PDPStatelessWriter.hpp
17+
*/
18+
19+
#ifndef FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP
20+
#define FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP
21+
22+
#include <chrono>
23+
24+
#include <fastdds/rtps/common/LocatorList.hpp>
25+
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
26+
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>
27+
28+
#include <rtps/writer/StatelessWriter.hpp>
29+
30+
namespace eprosima {
31+
namespace fastdds {
32+
namespace rtps {
33+
34+
/**
35+
* Class PDPStatelessWriter, specialization of StatelessWriter with specific behavior for PDP.
36+
*/
37+
class PDPStatelessWriter : public StatelessWriter, private IReaderDataFilter
38+
{
39+
40+
public:
41+
42+
PDPStatelessWriter(
43+
RTPSParticipantImpl* participant,
44+
const GUID_t& guid,
45+
const WriterAttributes& attributes,
46+
FlowController* flow_controller,
47+
WriterHistory* history,
48+
WriterListener* listener);
49+
50+
virtual ~PDPStatelessWriter() = default;
51+
52+
//vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv
53+
54+
bool matched_reader_add_edp(
55+
const ReaderProxyData& data) final;
56+
57+
bool matched_reader_remove(
58+
const GUID_t& reader_guid) final;
59+
60+
//^^^^^^^^^^^^^^^^^^^^^^ [Exported API] ^^^^^^^^^^^^^^^^^^^^^^^
61+
62+
//vvvvvvvvvvvvvvvvvvvvv [BaseWriter API] vvvvvvvvvvvvvvvvvvvvvv
63+
64+
void unsent_change_added_to_history(
65+
CacheChange_t* change,
66+
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) final;
67+
68+
//^^^^^^^^^^^^^^^^^^^^^^ [BaseWriter API] ^^^^^^^^^^^^^^^^^^^^^^^
69+
70+
/**
71+
* @brief Set the locators to which the writer should send periodic announcements.
72+
*
73+
* This method is used to configure the initial peers list on the PDP writer.
74+
*
75+
* @param locator_list List of locators to which the writer should send periodic announcements.
76+
*
77+
* @return true if the locators were set successfully.
78+
*/
79+
void set_initial_peers(
80+
const LocatorList& locator_list);
81+
82+
/**
83+
* Reset the unsent changes.
84+
*/
85+
void send_periodic_announcement();
86+
87+
protected:
88+
89+
bool send_to_fixed_locators(
90+
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
91+
const uint32_t& total_bytes,
92+
std::chrono::steady_clock::time_point& max_blocking_time_point) const override;
93+
94+
private:
95+
96+
/**
97+
* This method checks whether a CacheChange_t is relevant for the specified reader
98+
* This callback should return always the same result given the same arguments
99+
* @param change The CacheChange_t to be evaluated
100+
* @param reader_guid remote reader GUID_t
101+
* @return true if relevant, false otherwise.
102+
*/
103+
bool is_relevant(
104+
const fastdds::rtps::CacheChange_t& change,
105+
const fastdds::rtps::GUID_t& reader_guid) const final;
106+
107+
/**
108+
* @brief Mark all readers as interested.
109+
*
110+
* This method sets the flag indicating that all readers are interested in the data sent by this writer.
111+
* It is used to ensure that all readers are considered when sending data.
112+
* The flag will be reset when all the samples from this writer have been sent.
113+
*/
114+
void mark_all_readers_interested();
115+
116+
/**
117+
* @brief Mark an interested reader.
118+
*
119+
* Add the guid of a reader to the list of interested readers.
120+
*
121+
* @param reader_guid The GUID of the reader to mark as interested.
122+
*/
123+
void add_interested_reader(
124+
const GUID_t& reader_guid);
125+
126+
/**
127+
* @brief Unmark an interested reader.
128+
*
129+
* Remove the guid of a reader from the list of interested readers.
130+
*
131+
* @param reader_guid The GUID of the reader to mark as interested.
132+
*/
133+
void remove_interested_reader(
134+
const GUID_t& reader_guid);
135+
136+
/**
137+
* @brief Add all samples from this writer to the flow controller.
138+
*/
139+
void reschedule_all_samples();
140+
141+
//! Configured initial peers
142+
LocatorList initial_peers_{};
143+
//! The set of readers interested
144+
mutable ResourceLimitedVector<GUID_t> interested_readers_;
145+
//! Whether we have set that all destinations are interested
146+
mutable bool should_reach_all_destinations_ = false;
147+
148+
};
149+
150+
} // namespace rtps
151+
} // namespace fastdds
152+
} // namespace eprosima
153+
154+
#endif // FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP
155+

0 commit comments

Comments
 (0)