Skip to content

[22995] Fix MacOS nightly flaky tests (backport #5738) #5739

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 261 additions & 0 deletions test/blackbox/common/BlackboxTestsDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1474,6 +1474,267 @@ TEST_P(Discovery, single_unicast_pdp_response)
participants.clear();
}

<<<<<<< HEAD
=======
//! Regression test for redmine issue 22506
//! Test using a user's flowcontroller limiting the bandwidth and 5 remote participants waiting for the PDP sample.
TEST_P(Discovery, single_unicast_pdp_response_flowcontroller)
{
// Leverage intraprocess so transport is only used for participant discovery
if (INTRAPROCESS != GetParam())
{
GTEST_SKIP() << "Only makes sense on INTRAPROCESS";
return;
}

using namespace eprosima::fastdds::dds;

// All participants would restrict communication to UDP localhost.
// The main participant should send a single initial announcement, and have a big announcement period.
// This is to ensure that we only check the datagrams sent in response to the participant discovery,
// and not the ones sent in the periodic announcements.
// The main participant will use the test transport to count the number of unicast messages sent.

// This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set
// its value when the first multicast datagram is sent.
std::atomic<uint32_t> multicast_port{ 0 };
// Declare a test transport that will count the number of unicast messages sent
std::atomic<size_t> num_unicast_sends{ 0 };
auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
test_transport->interfaceWhiteList.push_back("127.0.0.1");
test_transport->locator_filter_ = [&num_unicast_sends, &multicast_port](
const eprosima::fastdds::rtps::Locator& destination)
{
if (IPLocator::isMulticast(destination))
{
uint32_t port = 0;
multicast_port.compare_exchange_strong(port, destination.port);
}
else
{
num_unicast_sends.fetch_add(1u, std::memory_order_seq_cst);
}

// Do not discard any message
return false;
};

// Create the main participant
auto main_participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0, 0, 0, 0);
WireProtocolConfigQos main_wire_protocol;
main_wire_protocol.builtin.avoid_builtin_multicast = true;
main_wire_protocol.builtin.discovery_config.leaseDuration = c_TimeInfinite;
main_wire_protocol.builtin.discovery_config.leaseDuration_announcementperiod = { 3600, 0 };
main_wire_protocol.builtin.discovery_config.initial_announcements.count = 1;
main_wire_protocol.builtin.discovery_config.initial_announcements.period = { 0, 100000000u };
main_wire_protocol.builtin.flow_controller_name = "TestFlowController";

// Flowcontroller to limit the bandwidth
auto test_flow_controller = std::make_shared<eprosima::fastdds::rtps::FlowControllerDescriptor>();
test_flow_controller->name = "TestFlowController";
test_flow_controller->max_bytes_per_period = 3700;
test_flow_controller->period_ms = static_cast<uint64_t>(100);

// The main participant will use the test transport, specific announcements configuration and a flowcontroller
main_participant->disable_builtin_transport().add_user_transport_to_pparams(test_transport)
.wire_protocol(main_wire_protocol)
.flow_controller(test_flow_controller);

// Start the main participant
ASSERT_TRUE(main_participant->init_participant());

// Wait for the initial announcements to be sent
std::this_thread::sleep_for(std::chrono::seconds(1));
// This would have set the multicast port
EXPECT_NE(multicast_port, 0u);

// The rest of the participants only send announcements to the main participant
// Calculate the metatraffic unicast port of the main participant
uint32_t port = multicast_port + main_wire_protocol.port.offsetd1 - main_wire_protocol.port.offsetd0;

// The rest of the participants only send announcements to the main participant
auto udp_localhost_transport = std::make_shared<test_UDPv4TransportDescriptor>();
udp_localhost_transport->interfaceWhiteList.push_back("127.0.0.1");
Locator peer_locator;
IPLocator::createLocator(LOCATOR_KIND_UDPv4, "127.0.0.1", port, peer_locator);
WireProtocolConfigQos wire_protocol;
wire_protocol.builtin.avoid_builtin_multicast = true;
wire_protocol.builtin.initialPeersList.push_back(peer_locator);
wire_protocol.builtin.discovery_config.leaseDuration = c_TimeInfinite;
wire_protocol.builtin.discovery_config.leaseDuration_announcementperiod = { 3600, 0 };
wire_protocol.builtin.discovery_config.initial_announcements.count = 1;
wire_protocol.builtin.discovery_config.initial_announcements.period = { 0, 100000000u };

std::vector<std::shared_ptr<PubSubParticipant<HelloWorldPubSubType>>> participants;
for (size_t i = 0; i < 5; ++i)
{
auto participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0, 0, 0, 0);
// All participants use the same transport
participant->disable_builtin_transport().add_user_transport_to_pparams(udp_localhost_transport)
.wire_protocol(wire_protocol);
participants.push_back(participant);
}

// Start the rest of the participants
for (auto& participant : participants)
{
ASSERT_TRUE(participant->init_participant());
participant->wait_discovery(std::chrono::seconds::zero(), 1, true);
}

main_participant->wait_discovery(std::chrono::seconds::zero(), 5, true);

// When in single threaded application, give some time for the builtin endpoints matching
std::this_thread::sleep_for(std::chrono::seconds(5));

// Destroy main participant
main_participant.reset();
for (auto& participant : participants)
{
participant->wait_discovery(std::chrono::seconds::zero(), 0, true);
}

// Check that the main participant sends two unicast messages to every other participant.
// One Data[P] and one Data[uP].
// Note that in a single core system, the number of unicast messages sent may be one
// per participant since the main participant's destruction races with
// the asynchronous Data[uP] in the locator selector (the unicast locator of the remote may not be there by the time)
// using the multicast instead.
EXPECT_GE(num_unicast_sends.load(std::memory_order::memory_order_seq_cst),
participants.size());

// Clean up
participants.clear();
}

//! Regression test for redmine issue 22506
//! Same test as single_unicast_pdp_response_flowcontroller but the main participant's builtin controller is so limited
//! that it will not be able to send all the initial announcements.
TEST_P(Discovery, single_unicast_pdp_response_flowcontroller_limited)
{
// Leverage intraprocess so transport is only used for participant discovery
if (INTRAPROCESS != GetParam())
{
GTEST_SKIP() << "Only makes sense on INTRAPROCESS";
return;
}

using namespace eprosima::fastdds::dds;

// All participants would restrict communication to UDP localhost.
// The main participant should send a single initial announcement, and have a big announcement period.
// This is to ensure that we only check the datagrams sent in response to the participant discovery,
// and not the ones sent in the periodic announcements.
// The main participant will use the test transport to count the number of unicast messages sent.

// This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set
// its value when the first multicast datagram is sent.
std::atomic<uint32_t> multicast_port{ 0 };
// Declare a test transport that will count the number of unicast messages sent
std::atomic<size_t> num_unicast_sends{ 0 };
auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
test_transport->interfaceWhiteList.push_back("127.0.0.1");
test_transport->locator_filter_ = [&num_unicast_sends, &multicast_port](
const eprosima::fastdds::rtps::Locator& destination)
{
if (IPLocator::isMulticast(destination))
{
uint32_t port = 0;
multicast_port.compare_exchange_strong(port, destination.port);
}
else
{
num_unicast_sends.fetch_add(1u, std::memory_order_seq_cst);
}

// Do not discard any message
return false;
};

// Create the main participant
auto main_participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0, 0, 0, 0);
WireProtocolConfigQos main_wire_protocol;
main_wire_protocol.builtin.avoid_builtin_multicast = true;
main_wire_protocol.builtin.discovery_config.leaseDuration = c_TimeInfinite;
main_wire_protocol.builtin.discovery_config.leaseDuration_announcementperiod = { 3600, 0 };
main_wire_protocol.builtin.discovery_config.initial_announcements.count = 1;
main_wire_protocol.builtin.discovery_config.initial_announcements.period = { 0, 100000000u };
main_wire_protocol.builtin.flow_controller_name = "TestFlowController";

// Flowcontroller to limit the bandwidth
auto test_flow_controller = std::make_shared<eprosima::fastdds::rtps::FlowControllerDescriptor>();
test_flow_controller->name = "TestFlowController";
test_flow_controller->max_bytes_per_period = 3700;
test_flow_controller->period_ms = static_cast<uint64_t>(100000);

// The main participant will use the test transport, specific announcements configuration and a flowcontroller
main_participant->disable_builtin_transport().add_user_transport_to_pparams(test_transport)
.wire_protocol(main_wire_protocol)
.flow_controller(test_flow_controller);

// Start the main participant
ASSERT_TRUE(main_participant->init_participant());

// Wait for the initial announcements to be sent
std::this_thread::sleep_for(std::chrono::seconds(1));
// This would have set the multicast port
EXPECT_NE(multicast_port, 0u);

// The rest of the participants only send announcements to the main participant
// Calculate the metatraffic unicast port of the main participant
uint32_t port = multicast_port + main_wire_protocol.port.offsetd1 - main_wire_protocol.port.offsetd0;

// The rest of the participants only send announcements to the main participant
auto udp_localhost_transport = std::make_shared<test_UDPv4TransportDescriptor>();
udp_localhost_transport->interfaceWhiteList.push_back("127.0.0.1");
Locator peer_locator;
IPLocator::createLocator(LOCATOR_KIND_UDPv4, "127.0.0.1", port, peer_locator);
WireProtocolConfigQos wire_protocol;
wire_protocol.builtin.avoid_builtin_multicast = true;
wire_protocol.builtin.initialPeersList.push_back(peer_locator);
wire_protocol.builtin.discovery_config.leaseDuration = c_TimeInfinite;
wire_protocol.builtin.discovery_config.leaseDuration_announcementperiod = { 3600, 0 };
wire_protocol.builtin.discovery_config.initial_announcements.count = 1;
wire_protocol.builtin.discovery_config.initial_announcements.period = { 0, 100000000u };

std::vector<std::shared_ptr<PubSubParticipant<HelloWorldPubSubType>>> participants;
for (size_t i = 0; i < 10; ++i)
{
auto participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0, 0, 0, 0);
// All participants use the same transport
participant->disable_builtin_transport().add_user_transport_to_pparams(udp_localhost_transport)
.wire_protocol(wire_protocol);
participants.push_back(participant);
}

// Start the rest of the participants
for (auto& participant : participants)
{
ASSERT_TRUE(participant->init_participant());
participant->wait_discovery(std::chrono::seconds(1), 1, true);
}

// The builtin flowcontroller of the main participant will not be able to send all the initial announcements as the max byter per period has already
// been reached. In fact no more messages will be sent from the builtin writers of the main participant.
EXPECT_LT(num_unicast_sends.load(std::memory_order::memory_order_seq_cst), participants.size());
auto num_unicast_sends_limit = num_unicast_sends.load(std::memory_order::memory_order_seq_cst);

// Destroy main participant
main_participant.reset();
for (auto& participant : participants)
{
participant->wait_discovery(std::chrono::seconds(1), 0, true);
}

std::this_thread::sleep_for(std::chrono::milliseconds(100));
// No more messages have been sent sin the limit was reached
EXPECT_EQ(num_unicast_sends.load(std::memory_order::memory_order_seq_cst), num_unicast_sends_limit);

// Clean up
participants.clear();
}

>>>>>>> 6d0d853c (Fix `MacOS` nightly flaky tests (#5738))
#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
Loading