Skip to content

[21706] Feature: Extended incompatible QoS for monitor service #5385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions include/fastdds/statistics/monitorservice_types.idl
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,28 @@ module statistics {
typedef BaseStatus_s InconsistentTopicStatus_s;
typedef BaseStatus_s SampleLostStatus_s;

struct ExtendedIncompatibleQoSStatus_s
{
detail::GUID_s remote_guid;
sequence<unsigned long> current_incompatible_policies;
};

typedef sequence<ExtendedIncompatibleQoSStatus_s> ExtendedIncompatibleQoSStatusSeq_s;

module StatusKind
{
typedef unsigned long StatusKind;

const StatusKind PROXY = 0;
const StatusKind CONNECTION_LIST = 1;
const StatusKind INCOMPATIBLE_QOS = 2;
const StatusKind INCONSISTENT_TOPIC = 3;
const StatusKind LIVELINESS_LOST = 4;
const StatusKind LIVELINESS_CHANGED = 5;
const StatusKind DEADLINE_MISSED = 6;
const StatusKind SAMPLE_LOST = 7;
const StatusKind STATUSES_SIZE = 8;
const StatusKind PROXY = 0;
const StatusKind CONNECTION_LIST = 1;
const StatusKind INCOMPATIBLE_QOS = 2;
const StatusKind INCONSISTENT_TOPIC = 3;
const StatusKind LIVELINESS_LOST = 4;
const StatusKind LIVELINESS_CHANGED = 5;
const StatusKind DEADLINE_MISSED = 6;
const StatusKind SAMPLE_LOST = 7;
const StatusKind EXTENDED_INCOMPATIBLE_QOS = 8;
const StatusKind STATUSES_SIZE = 9;
}; // module StatusKind

union MonitorServiceData switch(StatusKind::StatusKind)
Expand All @@ -107,6 +116,8 @@ module statistics {
DeadlineMissedStatus_s deadline_missed_status;
case StatusKind::SAMPLE_LOST:
SampleLostStatus_s sample_lost_status;
case StatusKind::EXTENDED_INCOMPATIBLE_QOS:
ExtendedIncompatibleQoSStatusSeq_s extended_incompatible_qos_status;
case StatusKind::STATUSES_SIZE:
octet statuses_size;
};
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/rtps/builtin/discovery/endpoint/EDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,7 @@ bool EDP::pairingReader(
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && reader->get_listener() != nullptr)
{
reader->get_listener()->on_requested_incompatible_qos(reader, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(R->getGuid(), wdatait->guid(), incompatible_qos);
}

//EPROSIMA_LOG_INFO(RTPS_EDP,RTPS_CYAN<<"Valid Matching to writerProxy: "<<wdatait->m_guid<<RTPS_DEF<<endl);
Expand Down Expand Up @@ -964,6 +965,7 @@ bool EDP::pairingWriter(
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && writer->get_listener() != nullptr)
{
writer->get_listener()->on_offered_incompatible_qos(writer, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(W->getGuid(), rdatait->guid(), incompatible_qos);
}

//EPROSIMA_LOG_INFO(RTPS_EDP,RTPS_CYAN<<"Valid Matching to writerProxy: "<<wdatait->m_guid<<RTPS_DEF<<endl);
Expand Down Expand Up @@ -1039,6 +1041,7 @@ bool EDP::pairing_reader_proxy_with_any_local_writer(
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && w.get_listener() != nullptr)
{
w.get_listener()->on_offered_incompatible_qos(&w, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(w.getGuid(), rdata->guid(), incompatible_qos);
}

if (w.matched_reader_is_matched(reader_guid)
Expand Down Expand Up @@ -1107,6 +1110,7 @@ bool EDP::pairing_reader_proxy_with_local_writer(
w.get_listener() != nullptr)
{
w.get_listener()->on_offered_incompatible_qos(&w, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(local_writer, rdata.guid(), incompatible_qos);
}

if (w.matched_reader_is_matched(reader_guid)
Expand Down Expand Up @@ -1230,6 +1234,7 @@ bool EDP::pairing_writer_proxy_with_any_local_reader(
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && r.get_listener() != nullptr)
{
r.get_listener()->on_requested_incompatible_qos(&r, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(r.getGuid(), wdata->guid(), incompatible_qos);
}

if (r.matched_writer_is_matched(writer_guid)
Expand Down Expand Up @@ -1298,6 +1303,7 @@ bool EDP::pairing_writer_proxy_with_local_reader(
r.get_listener() != nullptr)
{
r.get_listener()->on_requested_incompatible_qos(&r, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(local_reader, wdata.guid(), incompatible_qos);
}

if (r.matched_writer_is_matched(writer_guid)
Expand Down
1 change: 0 additions & 1 deletion src/cpp/rtps/builtin/discovery/endpoint/EDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ class EDP
bool checkDataRepresentationQos(
const WriterProxyData* wdata,
const ReaderProxyData* rdata) const;

};

} // namespace rtps
Expand Down
37 changes: 37 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,15 @@ bool PDP::removeReaderProxyData(
listener->on_reader_discovery(participant, reason, info, should_be_ignored);
}

#ifdef FASTDDS_STATISTICS
auto proxy_observer = get_proxy_observer();
// notify monitor service
if (nullptr != proxy_observer)
{
proxy_observer->on_remote_proxy_data_removed(pR->guid());
}
#endif // ifdef FASTDDS_STATISTICS

// Clear reader proxy data and move to pool in order to allow reuse
pR->clear();
pit->m_readers->erase(rit);
Expand Down Expand Up @@ -848,6 +857,15 @@ bool PDP::removeWriterProxyData(
listener->on_writer_discovery(participant, status, info, should_be_ignored);
}

#ifdef FASTDDS_STATISTICS
auto proxy_observer = get_proxy_observer();
// notify monitor service
if (nullptr != get_proxy_observer())
{
proxy_observer->on_remote_proxy_data_removed(pW->guid());
}
#endif // ifdef FASTDDS_STATISTICS

// Clear writer proxy data and move to pool in order to allow reuse
pW->clear();
pit->m_writers->erase(wit);
Expand Down Expand Up @@ -1746,6 +1764,25 @@ void PDP::local_participant_attributes_update_nts(
}
}

void PDP::notify_incompatible_qos_matching(
const GUID_t& local_guid,
const GUID_t& remote_guid,
const fastdds::dds::PolicyMask& incompatible_qos) const
{
#ifdef FASTDDS_STATISTICS
auto proxy_observer = get_proxy_observer();
// Notify the IProxyObserver implementor of a qos incompatibility
if (nullptr != proxy_observer)
{
proxy_observer->on_incompatible_qos_matching(local_guid, remote_guid, incompatible_qos);
}
#else
static_cast<void>(local_guid);
static_cast<void>(remote_guid);
static_cast<void>(incompatible_qos);
#endif // FASTDDS_STATISTICS
}

void PDP::update_endpoint_locators_if_default_nts(
const std::vector<BaseWriter*>& writers,
const std::vector<BaseReader*>& readers,
Expand Down
15 changes: 14 additions & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable
void set_proxy_observer(
const fastdds::statistics::rtps::IProxyObserver* proxy_observer);

const fastdds::statistics::rtps::IProxyObserver* get_proxy_observer()
const fastdds::statistics::rtps::IProxyObserver* get_proxy_observer() const
{
return proxy_observer_.load();
}
Expand Down Expand Up @@ -500,6 +500,19 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable
const RTPSParticipantAttributes& old_atts,
const RTPSParticipantAttributes& new_atts);

/**
* @brief Notify monitor the IProxyObserver implementor about
* any incompatible QoS matching between a local and a remote entity.
*
* @param local_guid GUID of the local entity.
* @param remote_guid GUID of the remote entity.
* @param incompatible_qos The PolicyMask with the incompatible QoS.
*/
void notify_incompatible_qos_matching(
const GUID_t& local_guid,
const GUID_t& remote_guid,
const fastdds::dds::PolicyMask& incompatible_qos) const;

protected:

//!Pointer to the builtin protocols object.
Expand Down
132 changes: 132 additions & 0 deletions src/cpp/statistics/rtps/monitor-service/MonitorService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,26 @@ bool MonitorService::disable_monitor_service()
bool MonitorService::remove_local_entity(
const fastdds::rtps::EntityId_t& entity_id)
{
// Remove the entity from the extended incompatible QoS collection
{
std::lock_guard<std::mutex> lock(extended_incompatible_qos_mtx_);
GUID_t entity_guid = {local_participant_guid_.guidPrefix, entity_id};
extended_incompatible_qos_collection_.erase(entity_guid);
}

// Remove the entity from the local entities
{
std::lock_guard<std::mutex> lock (mtx_);

//! Add the entity to the changed entities if was not already present
if (!local_entities_[entity_id].second)
{
changed_entities_.push_back(entity_id);
if (!timer_active_.load())
{
event_->restart_timer();
timer_active_.store(true);
}
}

//! But remove it from the collection of entities
Expand Down Expand Up @@ -326,6 +339,12 @@ bool MonitorService::write_status(
status_retrieved = status_queryable_.get_monitoring_status(local_entity_guid, data);
break;
}
case StatusKind::EXTENDED_INCOMPATIBLE_QOS:
{
std::lock_guard<std::mutex> lock(extended_incompatible_qos_mtx_);
data.extended_incompatible_qos_status(extended_incompatible_qos_collection_[local_entity_guid]);
break;
}
default:
{
EPROSIMA_LOG_ERROR(MONITOR_SERVICE, "Referring to an unknown status");
Expand Down Expand Up @@ -570,6 +589,119 @@ bool MonitorService::spin_queue()
return re_schedule;
}

void MonitorService::on_incompatible_qos_matching(
const fastdds::rtps::GUID_t& local_guid,
const fastdds::rtps::GUID_t& remote_guid,
const fastdds::dds::PolicyMask& incompatible_qos_policies)
{
// Convert the PolicyMask to a vector of policy ids
std::vector<uint32_t> incompatible_policies;
for (uint32_t id = 1; id < dds::NEXT_QOS_POLICY_ID; ++id)
{
if (incompatible_qos_policies.test(id))
{
incompatible_policies.push_back(id);
}
}

std::lock_guard<std::mutex> lock(extended_incompatible_qos_mtx_);

if (!incompatible_policies.empty())
{
// Check if the local_guid is already in the collection. If not, create a new entry
auto local_entity_incompatibilites =
extended_incompatible_qos_collection_.insert({local_guid, {}});

bool first_incompatibility_with_remote = false;

// Local entity already in the collection (has any incompatible QoS with any remote entity)
if (!local_entity_incompatibilites.second)
{
// Check if the local entitiy already had an incompatibility with this remote entity
auto it = std::find_if(
local_entity_incompatibilites.first->second.begin(),
local_entity_incompatibilites.first->second.end(),
[&remote_guid](const ExtendedIncompatibleQoSStatus_s& status)
{
return to_fastdds_type(status.remote_guid()) == remote_guid;
});

if (it == local_entity_incompatibilites.first->second.end())
{
// First incompatibility with that remote entity
first_incompatibility_with_remote = true;
}
else
{
// Already had an incompatibility with that remote entity.
// Update them
it->current_incompatible_policies(incompatible_policies);
push_entity_update(local_guid.entityId, StatusKind::EXTENDED_INCOMPATIBLE_QOS);
}
}
else
{
// This will be the first incompatibility of this entity
first_incompatibility_with_remote = true;
}

if (first_incompatibility_with_remote)
{
ExtendedIncompatibleQoSStatus_s status;
status.remote_guid(to_statistics_type(remote_guid));
status.current_incompatible_policies(incompatible_policies);
local_entity_incompatibilites.first->second.emplace_back(status);
push_entity_update(local_guid.entityId, StatusKind::EXTENDED_INCOMPATIBLE_QOS);
}
}
else
{
// Remove remote guid from the local guid incompatibilities collection
auto it = extended_incompatible_qos_collection_.find(local_guid);

if (it != extended_incompatible_qos_collection_.end())
{
auto it_remote = std::find_if(
it->second.begin(),
it->second.end(),
[&remote_guid](const ExtendedIncompatibleQoSStatus_s& status)
{
return to_fastdds_type(status.remote_guid()) == remote_guid;
});

if (it_remote != it->second.end())
{
it->second.erase(it_remote);
push_entity_update(local_guid.entityId, StatusKind::EXTENDED_INCOMPATIBLE_QOS);
}
}
}
}

void MonitorService::on_remote_proxy_data_removed(
const fastdds::rtps::GUID_t& removed_proxy_guid)
{
auto& ext_incompatible_qos_collection = extended_incompatible_qos_collection_;
std::lock_guard<std::mutex> lock(extended_incompatible_qos_mtx_);

for (auto& local_entity : ext_incompatible_qos_collection)
{
auto it = std::find_if(
local_entity.second.begin(),
local_entity.second.end(),
[&removed_proxy_guid](const ExtendedIncompatibleQoSStatus_s& status)
{
return to_fastdds_type(status.remote_guid()) == removed_proxy_guid;
});

if (it != local_entity.second.end())
{
local_entity.second.erase(it);
push_entity_update(local_entity.first.entityId, StatusKind::EXTENDED_INCOMPATIBLE_QOS);
}
}
}

} // namespace rtps
} // namespace statistics
} // namespace fastdds
Expand Down
31 changes: 31 additions & 0 deletions src/cpp/statistics/rtps/monitor-service/MonitorService.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,30 @@ class MonitorService
const fastdds::rtps::EntityId_t& entity_id,
const uint32_t& status_id);

/**
* @brief Process any updates regarding
* remote entities incompatible QoS matching.
*
* @param local_guid The GUID_t identifying the local entity
* @param remote_guid The GUID_t identifying the remote entity
* @param incompatible_qos The PolicyMask with the incompatible QoS
*
*/
void on_incompatible_qos_matching(
const fastdds::rtps::GUID_t& local_guid,
const fastdds::rtps::GUID_t& remote_guid,
const fastdds::dds::PolicyMask& incompatible_qos_policies);

/**
* @brief Notifies that a remote proxy data has been removed.
* This is interesting to notify proxy removals independently
* of the remote entity being matched or not.
*
* @param removed_proxy_guid GUID of the removed proxy.
*/
void on_remote_proxy_data_removed(
const fastdds::rtps::GUID_t& removed_proxy_guid);

private:

/**
Expand Down Expand Up @@ -257,6 +281,13 @@ class MonitorService
endpoint_registrator_t endpoint_registrator_;

MonitorServiceStatusDataPubSubType type_;

// Stores the current extended incompatible qos status
// of local entities with remote entities and their policies.
std::map<fastdds::rtps::GUID_t, ExtendedIncompatibleQoSStatusSeq_s>
extended_incompatible_qos_collection_;

std::mutex extended_incompatible_qos_mtx_;
};

#endif // FASTDDS_STATISTICS
Expand Down
Loading
Loading