Skip to content

[22604] Filter interested readers on PDP writer #5604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ set(${PROJECT_NAME}_source_files
rtps/builtin/discovery/participant/PDPServer.cpp
rtps/builtin/discovery/participant/PDPServerListener.cpp
rtps/builtin/discovery/participant/PDPSimple.cpp
rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp
rtps/builtin/discovery/participant/timedevent/DSClientEvent.cpp
rtps/builtin/discovery/participant/timedevent/DServerEvent.cpp
rtps/builtin/liveliness/WLP.cpp
Expand Down
11 changes: 3 additions & 8 deletions src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ void PDPSimple::announceParticipantState(

if (!(dispose || new_change))
{
endpoints->writer.writer_->unsent_changes_reset();
endpoints->writer.writer_->send_periodic_announcement();
}
}
}
Expand Down Expand Up @@ -403,7 +403,7 @@ bool PDPSimple::create_dcps_participant_endpoints()
if (mp_RTPSParticipant->createWriter(&rtps_writer, watt, writer.history_.get(),
nullptr, writer_entity_id, true))
{
writer.writer_ = dynamic_cast<StatelessWriter*>(rtps_writer);
writer.writer_ = dynamic_cast<PDPStatelessWriter*>(rtps_writer);
assert(nullptr != writer.writer_);

#if HAVE_SECURITY
Expand Down Expand Up @@ -455,7 +455,7 @@ bool PDPSimple::create_dcps_participant_endpoints()
EPROSIMA_LOG_WARNING(RTPS_PDP, "Ignoring initial peers locator " << loc << " : not allowed.");
}
}
writer.writer_->set_fixed_locators(fixed_locators);
writer.writer_->set_initial_peers(fixed_locators);
}
else
{
Expand Down Expand Up @@ -727,11 +727,6 @@ void PDPSimple::match_pdp_remote_endpoints(
{
writer->matched_reader_add_edp(*temp_reader_data);
}

if (!writer_only && (dds::BEST_EFFORT_RELIABILITY_QOS == reliability_kind))
{
endpoints->writer.writer_->unsent_changes_reset();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file PDPStatelessWriter.cpp
*/

#include <rtps/builtin/discovery/participant/simple/PDPStatelessWriter.hpp>

#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <mutex>
#include <set>
#include <vector>

#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/history/WriterHistory.hpp>
#include <fastdds/rtps/transport/NetworkBuffer.hpp>
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>
#include <fastdds/utils/TimedMutex.hpp>

#include <rtps/builtin/data/ReaderProxyData.hpp>
#include <rtps/participant/RTPSParticipantImpl.hpp>
#include <rtps/writer/StatelessWriter.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {

PDPStatelessWriter::PDPStatelessWriter(
RTPSParticipantImpl* participant,
const GUID_t& guid,
const WriterAttributes& attributes,
FlowController* flow_controller,
WriterHistory* history,
WriterListener* listener)
: StatelessWriter(participant, guid, attributes, flow_controller, history, listener)
, interested_readers_(participant->get_attributes().allocation.participants)
{
}

bool PDPStatelessWriter::matched_reader_add_edp(
const ReaderProxyData& data)
{
bool ret = StatelessWriter::matched_reader_add_edp(data);
if (ret)
{
// Mark new reader as interested
add_interested_reader(data.guid());
// Send announcement to new reader
reschedule_all_samples();
}
return ret;
}

bool PDPStatelessWriter::matched_reader_remove(
const GUID_t& reader_guid)
{
bool ret = StatelessWriter::matched_reader_remove(reader_guid);
if (ret)
{
// Mark reader as not interested
remove_interested_reader(reader_guid);
}
return ret;
}

void PDPStatelessWriter::unsent_change_added_to_history(
CacheChange_t* change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
mark_all_readers_interested();
StatelessWriter::unsent_change_added_to_history(change, max_blocking_time);
}

void PDPStatelessWriter::set_initial_peers(
const LocatorList& locator_list)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);

initial_peers_.push_back(locator_list);
mp_RTPSParticipant->createSenderResources(initial_peers_);
}

void PDPStatelessWriter::send_periodic_announcement()
{
mark_all_readers_interested();
reschedule_all_samples();
}

bool PDPStatelessWriter::send_to_fixed_locators(
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point& max_blocking_time_point) const
{
bool ret = true;

if (should_reach_all_destinations_)
{
ret = initial_peers_.empty() ||
mp_RTPSParticipant->sendSync(buffers, total_bytes, m_guid,
Locators(initial_peers_.begin()), Locators(initial_peers_.end()),
max_blocking_time_point);

if (ret)
{
fixed_locators_.clear();
should_reach_all_destinations_ = false;
}
}
else
{
interested_readers_.clear();
}

return ret;
}

bool PDPStatelessWriter::is_relevant(
const fastdds::rtps::CacheChange_t& /* change */,
const fastdds::rtps::GUID_t& reader_guid) const
{
return interested_readers_.end() !=
std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid);
}

void PDPStatelessWriter::mark_all_readers_interested()
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
should_reach_all_destinations_ = true;
interested_readers_.clear();
fixed_locators_.clear();
fixed_locators_.push_back(initial_peers_);
reader_data_filter(nullptr);
}

void PDPStatelessWriter::add_interested_reader(
const GUID_t& reader_guid)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
if (!should_reach_all_destinations_)
{
auto it = std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid);
if (it == interested_readers_.end())
{
interested_readers_.emplace_back(reader_guid);
reader_data_filter(this);
}
}
}

void PDPStatelessWriter::remove_interested_reader(
const GUID_t& reader_guid)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
interested_readers_.remove(reader_guid);
}

void PDPStatelessWriter::reschedule_all_samples()
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
size_t n_samples = history_->getHistorySize();
if (0 < n_samples)
{
assert(1 == n_samples);
auto it = history_->changesBegin();
CacheChange_t* change = *it;
flow_controller_->add_new_sample(this, change, std::chrono::steady_clock::now() + std::chrono::hours(24));
}
}

} // namespace rtps
} // namespace fastdds
} // namespace eprosima
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file PDPStatelessWriter.hpp
*/

#ifndef FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP
#define FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP

#include <chrono>

#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>

#include <rtps/writer/StatelessWriter.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {

/**
* Class PDPStatelessWriter, specialization of StatelessWriter with specific behavior for PDP.
*/
class PDPStatelessWriter : public StatelessWriter, private IReaderDataFilter
{

public:

PDPStatelessWriter(
RTPSParticipantImpl* participant,
const GUID_t& guid,
const WriterAttributes& attributes,
FlowController* flow_controller,
WriterHistory* history,
WriterListener* listener);

virtual ~PDPStatelessWriter() = default;

//vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv

bool matched_reader_add_edp(
const ReaderProxyData& data) final;

bool matched_reader_remove(
const GUID_t& reader_guid) final;

//^^^^^^^^^^^^^^^^^^^^^^ [Exported API] ^^^^^^^^^^^^^^^^^^^^^^^

//vvvvvvvvvvvvvvvvvvvvv [BaseWriter API] vvvvvvvvvvvvvvvvvvvvvv

void unsent_change_added_to_history(
CacheChange_t* change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) final;

//^^^^^^^^^^^^^^^^^^^^^^ [BaseWriter API] ^^^^^^^^^^^^^^^^^^^^^^^

/**
* @brief Set the locators to which the writer should send periodic announcements.
*
* This method is used to configure the initial peers list on the PDP writer.
*
* @param locator_list List of locators to which the writer should send periodic announcements.
*
* @return true if the locators were set successfully.
*/
void set_initial_peers(
const LocatorList& locator_list);

/**
* Reset the unsent changes.
*/
void send_periodic_announcement();

protected:

bool send_to_fixed_locators(
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point& max_blocking_time_point) const override;

private:

/**
* This method checks whether a CacheChange_t is relevant for the specified reader
* This callback should return always the same result given the same arguments
* @param change The CacheChange_t to be evaluated
* @param reader_guid remote reader GUID_t
* @return true if relevant, false otherwise.
*/
bool is_relevant(
const fastdds::rtps::CacheChange_t& change,
const fastdds::rtps::GUID_t& reader_guid) const final;

/**
* @brief Mark all readers as interested.
*
* This method sets the flag indicating that all readers are interested in the data sent by this writer.
* It is used to ensure that all readers are considered when sending data.
* The flag will be reset when all the samples from this writer have been sent.
*/
void mark_all_readers_interested();

/**
* @brief Mark an interested reader.
*
* Add the guid of a reader to the list of interested readers.
*
* @param reader_guid The GUID of the reader to mark as interested.
*/
void add_interested_reader(
const GUID_t& reader_guid);

/**
* @brief Unmark an interested reader.
*
* Remove the guid of a reader from the list of interested readers.
*
* @param reader_guid The GUID of the reader to mark as interested.
*/
void remove_interested_reader(
const GUID_t& reader_guid);

/**
* @brief Add all samples from this writer to the flow controller.
*/
void reschedule_all_samples();

//! Configured initial peers
LocatorList initial_peers_{};
//! The set of readers interested
mutable ResourceLimitedVector<GUID_t> interested_readers_;
//! Whether we have set that all destinations are interested
mutable bool should_reach_all_destinations_ = false;

};

} // namespace rtps
} // namespace fastdds
} // namespace eprosima

#endif // FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP

Loading
Loading