Skip to content

[22849] Refactor builtin endpoints creation #5679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 5 additions & 14 deletions src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <rtps/builtin/data/ParticipantProxyData.hpp>
#include <rtps/builtin/data/ReaderProxyData.hpp>
#include <rtps/builtin/data/WriterProxyData.hpp>
#include <rtps/builtin/discovery/participant/PDP.h>
#include <rtps/participant/RTPSParticipantImpl.hpp>
#include <rtps/reader/StatefulReader.hpp>
#include <rtps/RTPSDomainImpl.hpp>
Expand Down Expand Up @@ -557,28 +558,18 @@ bool TypeLookupManager::create_endpoints()
hatt.maximumReservedCaches = 1000;
hatt.payloadMaxSize = TypeLookupManager::typelookup_data_max_size;

WriterAttributes watt;
watt.endpoint.unicastLocatorList = builtin_protocols_->m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = builtin_protocols_->m_metatrafficMulticastLocatorList;
watt.endpoint.external_unicast_locators = builtin_protocols_->m_att.metatraffic_external_unicast_locators;
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
WriterAttributes watt = PDP::static_create_builtin_writer_attributes(participant_);
rtps::set_builtin_endpoint_locators(watt.endpoint, pattr, nullptr, builtin_protocols_->m_att, builtin_protocols_);
watt.endpoint.remoteLocatorList = builtin_protocols_->m_initialPeersList;
watt.matched_readers_allocation = pattr.allocation.participants;
watt.endpoint.topicKind = fastdds::rtps::NO_KEY;
watt.endpoint.reliabilityKind = fastdds::rtps::RELIABLE;
watt.endpoint.durabilityKind = fastdds::rtps::VOLATILE;
watt.mode = fastdds::rtps::ASYNCHRONOUS_WRITER;

ReaderAttributes ratt;
ratt.endpoint.unicastLocatorList = builtin_protocols_->m_metatrafficUnicastLocatorList;
ratt.endpoint.multicastLocatorList = builtin_protocols_->m_metatrafficMulticastLocatorList;
ratt.endpoint.external_unicast_locators = builtin_protocols_->m_att.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ReaderAttributes ratt = PDP::static_create_builtin_reader_attributes(participant_);
rtps::set_builtin_endpoint_locators(ratt.endpoint, pattr, nullptr, builtin_protocols_->m_att, builtin_protocols_);
ratt.endpoint.remoteLocatorList = builtin_protocols_->m_initialPeersList;
ratt.matched_writers_allocation = pattr.allocation.participants;
ratt.expects_inline_qos = true;
ratt.endpoint.topicKind = fastdds::rtps::NO_KEY;
ratt.endpoint.reliabilityKind = fastdds::rtps::RELIABLE;
ratt.endpoint.durabilityKind = fastdds::rtps::VOLATILE;

// Built-in request writer
Expand Down
65 changes: 50 additions & 15 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1614,20 +1614,19 @@ static void set_builtin_matched_allocation(
}
}

static void set_builtin_endpoint_locators(
void set_builtin_endpoint_locators(
EndpointAttributes& endpoint,
const PDP* pdp,
const BuiltinProtocols* builtin)
const RTPSParticipantAttributes& pattr,
const ParticipantProxyData* part_data,
const BuiltinAttributes& builtin_attr,
const BuiltinProtocols* builtin_protocol)
{
const RTPSParticipantAttributes& pattr = pdp->getRTPSParticipant()->get_attributes();

auto part_data = pdp->getLocalParticipantProxyData();
if (nullptr == part_data)
{
// Local participant data has not yet been created.
// This means we are creating the PDP endpoints, so we copy the locators from mp_builtin
endpoint.multicastLocatorList = builtin->m_metatrafficMulticastLocatorList;
endpoint.unicastLocatorList = builtin->m_metatrafficUnicastLocatorList;
endpoint.multicastLocatorList = builtin_protocol->m_metatrafficMulticastLocatorList;
endpoint.unicastLocatorList = builtin_protocol->m_metatrafficUnicastLocatorList;
}
else
{
Expand All @@ -1645,42 +1644,78 @@ static void set_builtin_endpoint_locators(
}

// External locators are always taken from the same place
endpoint.external_unicast_locators = pdp->builtin_attributes().metatraffic_external_unicast_locators;
endpoint.external_unicast_locators = builtin_attr.metatraffic_external_unicast_locators;
endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
}

ReaderAttributes PDP::create_builtin_reader_attributes() const
ReaderAttributes PDP::static_create_builtin_reader_attributes(
const RTPSParticipantImpl* RTPSParticipant)
{
ReaderAttributes attributes;

const RTPSParticipantAttributes& pattr = getRTPSParticipant()->get_attributes();
const RTPSParticipantAttributes& pattr = RTPSParticipant->get_attributes();
set_builtin_matched_allocation(attributes.matched_writers_allocation, pattr);
set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin);

// Builtin endpoints are always reliable, transient local, keyed topics
attributes.endpoint.reliabilityKind = RELIABLE;
attributes.endpoint.durabilityKind = TRANSIENT_LOCAL;
attributes.endpoint.topicKind = WITH_KEY;

attributes.endpoint.endpointKind = READER;

// Built-in readers never expect inline qos
attributes.expects_inline_qos = false;

attributes.times.heartbeat_response_delay = pdp_heartbeat_response_delay;

return attributes;
}

ReaderAttributes PDP::create_builtin_reader_attributes()
{
ReaderAttributes attributes = static_create_builtin_reader_attributes(getRTPSParticipant());

set_builtin_endpoint_locators(attributes.endpoint, getRTPSParticipant()->get_attributes(),
this->getLocalParticipantProxyData(), this->builtin_attributes(), mp_builtin);

return attributes;
}

WriterAttributes PDP::create_builtin_writer_attributes() const
WriterAttributes PDP::static_create_builtin_writer_attributes(
const RTPSParticipantImpl* RTPSParticipant)
{
WriterAttributes attributes;

const RTPSParticipantAttributes& pattr = getRTPSParticipant()->get_attributes();
const RTPSParticipantAttributes& pattr = RTPSParticipant->get_attributes();
set_builtin_matched_allocation(attributes.matched_readers_allocation, pattr);
set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin);

// Builtin endpoints are always reliable, transient local, keyed topics
attributes.endpoint.reliabilityKind = RELIABLE;
attributes.endpoint.durabilityKind = TRANSIENT_LOCAL;
attributes.endpoint.topicKind = WITH_KEY;

attributes.endpoint.endpointKind = WRITER;

// We assume that if we have at least one flow controller defined, we use async flow controller
if (!pattr.flow_controllers.empty())
{
attributes.mode = ASYNCHRONOUS_WRITER;
}

attributes.times.heartbeat_period = pdp_heartbeat_period;
attributes.times.nack_response_delay = pdp_nack_response_delay;
attributes.times.nack_supression_duration = pdp_nack_supression_duration;

return attributes;
}

WriterAttributes PDP::create_builtin_writer_attributes()
{
WriterAttributes attributes = static_create_builtin_writer_attributes(getRTPSParticipant());

set_builtin_endpoint_locators(attributes.endpoint, getRTPSParticipant()->get_attributes(),
this->getLocalParticipantProxyData(), this->builtin_attributes(), mp_builtin);

return attributes;
}

Expand Down
27 changes: 25 additions & 2 deletions src/cpp/rtps/builtin/discovery/participant/PDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,25 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable
return temp_writer_proxies_;
}

ReaderAttributes create_builtin_reader_attributes() const;
ReaderAttributes create_builtin_reader_attributes();

WriterAttributes create_builtin_writer_attributes() const;
WriterAttributes create_builtin_writer_attributes();

/**
* Create the attributes common to any builtin reader
* It is a static method to allow TypeLookupManager to create builtin readers
* @return ReaderAttributes
*/
static ReaderAttributes static_create_builtin_reader_attributes(
const RTPSParticipantImpl* RTPSParticipant);

/**
* Create the attributes common to any builtin writer
* It is a static method to allow TypeLookupManager to create builtin writers
* @return WriterAttributes
*/
static WriterAttributes static_create_builtin_writer_attributes(
const RTPSParticipantImpl* RTPSParticipant);

#if HAVE_SECURITY
void add_builtin_security_attributes(
Expand Down Expand Up @@ -679,6 +695,13 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable

};

void set_builtin_endpoint_locators(
EndpointAttributes& endpoint,
const RTPSParticipantAttributes& pattr,
const ParticipantProxyData* part_data,
const BuiltinAttributes& builtin_attr,
const BuiltinProtocols* builtin_protocol);


// configuration values for PDP reliable entities.
extern const dds::Duration_t pdp_heartbeat_period;
Expand Down
35 changes: 3 additions & 32 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,6 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(

EPROSIMA_LOG_INFO(RTPS_PDP, "Beginning PDPClient Endpoints creation");

const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->get_attributes();

/***********************************
* PDP READER
***********************************/
Expand All @@ -322,17 +320,8 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
hatt.memoryPolicy = mp_builtin->m_att.readerHistoryMemoryPolicy;
endpoints.reader.history_.reset(new ReaderHistory(hatt));

ReaderAttributes ratt;
ratt.expects_inline_qos = false;
ratt.endpoint.endpointKind = READER;
ratt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
ratt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
ratt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ratt.endpoint.topicKind = WITH_KEY;
ratt.endpoint.durabilityKind = TRANSIENT_LOCAL;
ratt.endpoint.reliabilityKind = RELIABLE;
ratt.times.heartbeat_response_delay = pdp_heartbeat_response_delay;
ReaderAttributes ratt = create_builtin_reader_attributes();

#if HAVE_SECURITY
if (is_discovery_protected)
{
Expand Down Expand Up @@ -377,18 +366,7 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
hatt.memoryPolicy = mp_builtin->m_att.writerHistoryMemoryPolicy;
endpoints.writer.history_.reset(new WriterHistory(hatt));

WriterAttributes watt;
watt.endpoint.endpointKind = WRITER;
watt.endpoint.durabilityKind = TRANSIENT_LOCAL;
watt.endpoint.reliabilityKind = RELIABLE;
watt.endpoint.topicKind = WITH_KEY;
watt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
watt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
watt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
watt.times.heartbeat_period = pdp_heartbeat_period;
watt.times.nack_response_delay = pdp_nack_response_delay;
watt.times.nack_supression_duration = pdp_nack_supression_duration;
WriterAttributes watt = create_builtin_writer_attributes();

#if HAVE_SECURITY
if (is_discovery_protected)
Expand All @@ -399,13 +377,6 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
}
#endif // HAVE_SECURITY

// We assume that if we have at least one flow controller defined, we use async flow controller
if (!pattr.flow_controllers.empty())
{
watt.mode = ASYNCHRONOUS_WRITER;
watt.flow_controller_name = fastdds::rtps::async_flow_controller_name;
}

RTPSWriter* wout = nullptr;
#if HAVE_SECURITY
EntityId_t writer_entity =
Expand Down
29 changes: 6 additions & 23 deletions src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
bool secure)
{
static_cast<void>(secure);
const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->get_attributes();

/***********************************
* PDP READER
Expand All @@ -349,18 +348,10 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
endpoints.reader.history_.reset(new ReaderHistory(hatt));

// PDP Reader Attributes
ReaderAttributes ratt;
ratt.expects_inline_qos = false;
ratt.endpoint.endpointKind = READER;
ratt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
ratt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
ratt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ratt.endpoint.topicKind = WITH_KEY;
ReaderAttributes ratt = create_builtin_reader_attributes();
// Change depending on backup mode
ratt.endpoint.durabilityKind = durability_;
ratt.endpoint.reliabilityKind = RELIABLE;
ratt.times.heartbeat_response_delay = pdp_heartbeat_response_delay;

#if HAVE_SECURITY
if (secure)
{
Expand Down Expand Up @@ -422,8 +413,8 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
endpoints.writer.history_.reset(new WriterHistory(hatt));

// PDP Writer Attributes
WriterAttributes watt;
watt.endpoint.endpointKind = WRITER;
WriterAttributes watt = create_builtin_writer_attributes();

// VOLATILE durability to highlight that on steady state the history is empty (except for announcement DATAs)
// this setting is incompatible with CLIENTs TRANSIENT_LOCAL PDP readers but not validation is done on builitin
// endpoints
Expand All @@ -435,16 +426,8 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
get_writer_persistence_file_name()));
#endif // HAVE_SQLITE3

watt.endpoint.reliabilityKind = RELIABLE;
watt.endpoint.topicKind = WITH_KEY;
watt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
watt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
watt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
watt.times.heartbeat_period = pdp_heartbeat_period;
watt.times.nack_response_delay = pdp_nack_response_delay;
watt.times.nack_supression_duration = pdp_nack_supression_duration;
watt.mode = ASYNCHRONOUS_WRITER;
watt.mode = ASYNCHRONOUS_WRITER; //

// Enable separate sending so the filter can be called for each change and reader proxy
watt.separate_sending = true;
#if HAVE_SECURITY
Expand Down
6 changes: 0 additions & 6 deletions src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,6 @@ bool PDPSimple::create_dcps_participant_endpoints()
mp_RTPSParticipant->createSenderResources(entry);
}

// We assume that if we have at least one flow controller defined, we use async flow controller
if (!pattr.flow_controllers.empty())
{
watt.mode = ASYNCHRONOUS_WRITER;
watt.flow_controller_name = fastdds::rtps::async_flow_controller_name;
}

RTPSWriter* rtps_writer = nullptr;
if (mp_RTPSParticipant->createWriter(&rtps_writer, watt, writer.history_.get(),
Expand Down
Loading
Loading