Skip to content

[17283] Fix issues in Dynamic Network Interfaces #5282

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f4177dc
Refs #21690. Parse `--rescan` argument on communication applications.
MiguelCompany Sep 27, 2024
c901d7a
Refs #21690. Implement rescan mechanism.
MiguelCompany Sep 27, 2024
0ac94ca
Refs #21690. Add docker infrastructure.
MiguelCompany Sep 30, 2024
d4a2a7a
Refs #21690. Add CMake infrastructure.
MiguelCompany Sep 30, 2024
ea60f90
Refs #21690. Ensure same domain and topic name are used.
MiguelCompany Sep 30, 2024
35d0424
Refs #21690. Add `--loops` argument to publisher.
MiguelCompany Sep 30, 2024
f6ede02
Refs #21690. Publisher exits after publishing all samples.
MiguelCompany Sep 30, 2024
77fef7a
Refs #21690. Improve subscriber script.
MiguelCompany Sep 30, 2024
13ecf3c
Refs #21690. Add test.
MiguelCompany Sep 30, 2024
1682174
Refs #21690. Make publisher wait subscriber.
MiguelCompany Sep 30, 2024
811c39b
Refs #21690. Possible fix.
MiguelCompany Sep 17, 2024
28f5093
Refs #21690. Clear locators before recalculating them.
MiguelCompany Sep 18, 2024
edbba1c
Refs #21690. Move local participant proxy update to PDP.
MiguelCompany Sep 18, 2024
89d0d66
Refs #21690. Improve new method's logic.
MiguelCompany Sep 18, 2024
42013fc
Refs #21690. Include what you use.
MiguelCompany Sep 18, 2024
dc2ef2c
Refs #21690. Add empty method to update endpoint locators.
MiguelCompany Sep 18, 2024
bfa66a1
Refs #21690. Add implementation for `update_endpoint_locators_if_defa…
MiguelCompany Sep 18, 2024
9582f1a
Refs #21690. Compare against old default locators.
MiguelCompany Sep 18, 2024
986acc4
Refs #21690. Update locators in attributes.
MiguelCompany Sep 18, 2024
77a37a1
Refs #17283. Avoid early return on `PDP::local_participant_attributes…
MiguelCompany Oct 3, 2024
407c552
Refs #17283. Apply suggestions.
MiguelCompany Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
#endif // if HAVE_SECURITY
#include <utils/shared_mutex.hpp>
#include <utils/TimeConversion.hpp>
#include <rtps/writer/BaseWriter.hpp>
#include <rtps/reader/BaseReader.hpp>

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -1698,6 +1700,123 @@ void PDP::add_builtin_security_attributes(

#endif // HAVE_SECURITY

void PDP::local_participant_attributes_update_nts(
const RTPSParticipantAttributes& new_atts)
{
// Update user data
auto participant_data = getLocalParticipantProxyData();
participant_data->m_userData.data_vec(new_atts.userData);

bool announce_locators = !mp_RTPSParticipant->is_intraprocess_only();
if (!announce_locators)
{
// If we are intraprocess only, we do not need to update locators
return;
}

// Clear all locators
participant_data->metatraffic_locators.unicast.clear();
participant_data->metatraffic_locators.multicast.clear();
participant_data->default_locators.unicast.clear();
participant_data->default_locators.multicast.clear();

// Update default locators
for (const Locator_t& loc : new_atts.defaultUnicastLocatorList)
{
participant_data->default_locators.add_unicast_locator(loc);
}
for (const Locator_t& loc : new_atts.defaultMulticastLocatorList)
{
participant_data->default_locators.add_multicast_locator(loc);
}

// Update metatraffic locators
for (const auto& locator : new_atts.builtin.metatrafficUnicastLocatorList)
{
participant_data->metatraffic_locators.add_unicast_locator(locator);
}
if (!new_atts.builtin.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty())
{
for (const auto& locator : new_atts.builtin.metatrafficMulticastLocatorList)
{
participant_data->metatraffic_locators.add_multicast_locator(locator);
}
}

fastdds::rtps::network::external_locators::add_external_locators(*participant_data,
new_atts.builtin.metatraffic_external_unicast_locators,
new_atts.default_external_unicast_locators);
}

void PDP::update_endpoint_locators_if_default_nts(
const std::vector<BaseWriter*>& writers,
const std::vector<BaseReader*>& readers,
const RTPSParticipantAttributes& old_atts,
const RTPSParticipantAttributes& new_atts)
{
// Check if default locators have changed
const auto& old_default_unicast = old_atts.defaultUnicastLocatorList;
const auto& old_default_multicast = old_atts.defaultMulticastLocatorList;
const auto& new_default_unicast = new_atts.defaultUnicastLocatorList;
const auto& new_default_multicast = new_atts.defaultMulticastLocatorList;

// Early return if there is no change in default unicast locators
if ((old_default_unicast == new_default_unicast) &&
(old_default_multicast == new_default_multicast))
{
return;
}

// Update proxies of endpoints with default configured locators
EDP* edp = get_edp();
for (BaseWriter* writer : writers)
{
if ((old_default_multicast == writer->getAttributes().multicastLocatorList) &&
(old_default_unicast == writer->getAttributes().unicastLocatorList))
{
writer->getAttributes().multicastLocatorList = new_default_multicast;
writer->getAttributes().unicastLocatorList = new_default_unicast;

WriterProxyData* wdata = nullptr;
GUID_t participant_guid;
wdata = addWriterProxyData(writer->getGuid(), participant_guid,
[](WriterProxyData* proxy, bool is_update, const ParticipantProxyData& participant)
{
static_cast<void>(is_update);
assert(is_update);

proxy->set_locators(participant.default_locators);
return true;
});
assert(wdata != nullptr);
edp->process_writer_proxy_data(writer, wdata);
}
}
for (BaseReader* reader : readers)
{
if ((old_default_multicast == reader->getAttributes().multicastLocatorList) &&
(old_default_unicast == reader->getAttributes().unicastLocatorList))
{
reader->getAttributes().multicastLocatorList = new_default_multicast;
reader->getAttributes().unicastLocatorList = new_default_unicast;

ReaderProxyData* rdata = nullptr;
GUID_t participant_guid;
rdata = addReaderProxyData(reader->getGuid(), participant_guid,
[](ReaderProxyData* proxy, bool is_update, const ParticipantProxyData& participant)
{
static_cast<void>(is_update);
assert(is_update);

proxy->set_locators(participant.default_locators);
return true;
});
assert(rdata != nullptr);
edp->process_reader_proxy_data(reader, rdata);
}
}
}

} /* namespace rtps */
} /* namespace fastdds */
} /* namespace eprosima */
38 changes: 27 additions & 11 deletions src/cpp/rtps/builtin/discovery/participant/PDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,27 @@
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <fastcdr/cdr/fixed_size_string.hpp>

#include <fastdds/dds/core/Time_t.hpp>
#include <fastdds/dds/core/policy/ParameterTypes.hpp>
#include <fastdds/dds/core/policy/QosPolicies.hpp>
#include <fastdds/rtps/attributes/ReaderAttributes.hpp>
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.hpp>
#include <fastdds/rtps/attributes/WriterAttributes.hpp>
#include <fastdds/rtps/common/CDRMessage_t.hpp>
#include <fastdds/rtps/common/Guid.hpp>
#include <fastdds/rtps/common/GuidPrefix_t.hpp>
#include <fastdds/rtps/common/InstanceHandle.hpp>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/common/Types.hpp>
#include <fastdds/rtps/common/WriteParams.hpp>
#include <fastdds/rtps/history/IPayloadPool.hpp>
#include <fastdds/rtps/participant/ParticipantDiscoveryInfo.hpp>
Expand Down Expand Up @@ -65,19 +79,11 @@ class TypeIdentifier;

namespace rtps {

class PDPServerListener;
class PDPEndpoints;

} // namespace rtps
} // namespace fastdds

namespace fastdds {
namespace rtps {

class RTPSWriter;
class RTPSReader;
class BaseWriter;
class BaseReader;
class WriterHistory;
class ReaderHistory;
struct RTPSParticipantAllocationAttributes;
class RTPSParticipantImpl;
class RTPSParticipantListener;
class BuiltinProtocols;
Expand All @@ -87,6 +93,7 @@ class ReaderProxyData;
class WriterProxyData;
class ParticipantProxyData;
class ReaderListener;
class PDPEndpoints;
class PDPListener;
class PDPServerListener;
class ITopicPayloadPool;
Expand Down Expand Up @@ -484,6 +491,15 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable

#endif // FASTDDS_STATISTICS

virtual void local_participant_attributes_update_nts(
const RTPSParticipantAttributes& new_atts);

virtual void update_endpoint_locators_if_default_nts(
const std::vector<BaseWriter*>& writers,
const std::vector<BaseReader*>& readers,
const RTPSParticipantAttributes& old_atts,
const RTPSParticipantAttributes& new_atts);

protected:

//!Pointer to the builtin protocols object.
Expand Down
23 changes: 6 additions & 17 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,7 @@ void RTPSParticipantImpl::update_attributes(
if (internal_metatraffic_locators_)
{
LocatorList_t metatraffic_unicast_locator_list = temp_atts.builtin.metatrafficUnicastLocatorList;
temp_atts.builtin.metatrafficUnicastLocatorList.clear();
get_default_metatraffic_locators(temp_atts);
if (!(metatraffic_unicast_locator_list == temp_atts.builtin.metatrafficUnicastLocatorList))
{
Expand All @@ -1455,6 +1456,7 @@ void RTPSParticipantImpl::update_attributes(
if (internal_default_locators_)
{
LocatorList_t default_unicast_locator_list = temp_atts.defaultUnicastLocatorList;
temp_atts.defaultUnicastLocatorList.clear();
get_default_unicast_locators(temp_atts);
if (!(default_unicast_locator_list == temp_atts.defaultUnicastLocatorList))
{
Expand Down Expand Up @@ -1529,25 +1531,12 @@ void RTPSParticipantImpl::update_attributes(

{
std::lock_guard<std::recursive_mutex> lock(*pdp->getMutex());
pdp->local_participant_attributes_update_nts(temp_atts);

// Update user data
auto local_participant_proxy_data = pdp->getLocalParticipantProxyData();
local_participant_proxy_data->m_userData.data_vec(temp_atts.userData);

// Update metatraffic locators
for (auto locator : temp_atts.builtin.metatrafficMulticastLocatorList)
{
local_participant_proxy_data->metatraffic_locators.add_multicast_locator(locator);
}
for (auto locator : temp_atts.builtin.metatrafficUnicastLocatorList)
{
local_participant_proxy_data->metatraffic_locators.add_unicast_locator(locator);
}

// Update default locators
for (auto locator : temp_atts.defaultUnicastLocatorList)
if (local_interfaces_changed && internal_default_locators_)
{
local_participant_proxy_data->default_locators.add_unicast_locator(locator);
std::lock_guard<shared_mutex> _(endpoints_list_mutex);
pdp->update_endpoint_locators_if_default_nts(m_userWriterList, m_userReaderList, m_att, temp_atts);
}

if (local_interfaces_changed)
Expand Down
4 changes: 4 additions & 0 deletions test/dds/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,7 @@ if(Python3_Interpreter_FOUND)
endif()

endif()

if(UNIX AND NOT(APPLE) AND NOT(QNXNTO) AND NOT(ANDROID))
add_subdirectory(dyn_network)
endif()
6 changes: 3 additions & 3 deletions test/dds/communication/PubSubMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void publisher_run(
publisher->wait_discovery(wait);
}

publisher->run(samples, loops, interval);
publisher->run(samples, 0, loops, interval);
}

int main(
Expand Down Expand Up @@ -196,7 +196,7 @@ int main(
DomainParticipantFactory::get_instance()->load_XML_profiles_file(xml_file);
}

SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy);
SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy, false, false);
PublisherModule publisher(exit_on_lost_liveliness, fixed_type, zero_copy);

uint32_t result = 1;
Expand All @@ -207,7 +207,7 @@ int main(

if (subscriber.init(seed, magic))
{
result = subscriber.run(notexit, timeout) ? 0 : -1;
result = subscriber.run(notexit, 0, timeout) ? 0 : -1;
}

publisher_thread.join();
Expand Down
28 changes: 26 additions & 2 deletions test/dds/communication/PublisherMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ using namespace eprosima::fastdds::dds;
* --seed <int>
* --wait <int>
* --samples <int>
* --loops <int>
* --interval <int>
* --magic <str>
* --xmlfile <path>
* --interval <int>
* --rescan <int>
*/

int main(
Expand All @@ -46,7 +48,9 @@ int main(
uint32_t wait = 0;
char* xml_file = nullptr;
uint32_t samples = 4;
uint32_t loops = 0;
uint32_t interval = 250;
uint32_t rescan_interval = 0;
std::string magic;

while (arg_count < argc)
Expand Down Expand Up @@ -93,6 +97,16 @@ int main(

samples = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--loops") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--loops expects a parameter" << std::endl;
return -1;
}

loops = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--interval") == 0)
{
if (++arg_count >= argc)
Expand Down Expand Up @@ -123,6 +137,16 @@ int main(

xml_file = argv[arg_count];
}
else if (strcmp(argv[arg_count], "--rescan") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--rescan expects a parameter" << std::endl;
return -1;
}

rescan_interval = strtol(argv[arg_count], nullptr, 10);
}
else
{
std::cout << "Wrong argument " << argv[arg_count] << std::endl;
Expand All @@ -146,7 +170,7 @@ int main(
publisher.wait_discovery(wait);
}

publisher.run(samples, 0, interval);
publisher.run(samples, rescan_interval, loops, interval);
return 0;
}

Expand Down
20 changes: 20 additions & 0 deletions test/dds/communication/PublisherModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,30 @@ void PublisherModule::wait_discovery(

void PublisherModule::run(
uint32_t samples,
const uint32_t rescan_interval,
const uint32_t loops,
uint32_t interval)
{
uint32_t current_loop = 0;
uint16_t index = 1;
void* sample = nullptr;

std::thread net_rescan_thread([this, rescan_interval]()
{
if (rescan_interval > 0)
{
auto interval = std::chrono::seconds(rescan_interval);
while (run_)
{
std::this_thread::sleep_for(interval);
if (run_)
{
participant_->set_qos(participant_->get_qos());
}
}
}
});

while (run_ && (loops == 0 || loops > current_loop))
{
if (zero_copy_)
Expand Down Expand Up @@ -187,6 +204,9 @@ void PublisherModule::run(

std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}

run_ = false;
net_rescan_thread.join();
}

void PublisherModule::on_publication_matched(
Expand Down
Loading
Loading