@@ -1354,6 +1354,376 @@ TEST_P(Discovery, AsymmeticIgnoreParticipantFlags)
1354
1354
EXPECT_EQ (messages_on_port, allowed_messages_on_port);
1355
1355
}
1356
1356
1357
+ <<<<<<< HEAD
1358
+ =======
1359
+ // ! Regression test for redmine issue 22506
1360
+ TEST_P (Discovery, single_unicast_pdp_response)
1361
+ {
1362
+ // Leverage intraprocess so transport is only used for participant discovery
1363
+ if (INTRAPROCESS != GetParam ())
1364
+ {
1365
+ GTEST_SKIP () << " Only makes sense on INTRAPROCESS" ;
1366
+ return ;
1367
+ }
1368
+
1369
+ using namespace eprosima ::fastdds::dds;
1370
+
1371
+ // All participants would restrict communication to UDP localhost.
1372
+ // The main participant should send a single initial announcement, and have a big announcement period.
1373
+ // This is to ensure that we only check the datagrams sent in response to the participant discovery,
1374
+ // and not the ones sent in the periodic announcements.
1375
+ // The main participant will use the test transport to count the number of unicast messages sent.
1376
+
1377
+ // This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set
1378
+ // its value when the first multicast datagram is sent.
1379
+ std::atomic<uint32_t > multicast_port{ 0 };
1380
+ // Declare a test transport that will count the number of unicast messages sent
1381
+ std::atomic<size_t > num_unicast_sends{ 0 };
1382
+ auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
1383
+ test_transport->interfaceWhiteList .push_back (" 127.0.0.1" );
1384
+ test_transport->locator_filter_ = [&num_unicast_sends, &multicast_port](
1385
+ const eprosima::fastdds::rtps::Locator& destination)
1386
+ {
1387
+ if (IPLocator::isMulticast (destination))
1388
+ {
1389
+ uint32_t port = 0 ;
1390
+ multicast_port.compare_exchange_strong (port, destination.port );
1391
+ }
1392
+ else
1393
+ {
1394
+ num_unicast_sends.fetch_add (1u , std::memory_order_seq_cst);
1395
+ }
1396
+
1397
+ // Do not discard any message
1398
+ return false ;
1399
+ };
1400
+
1401
+ // Create the main participant
1402
+ auto main_participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0 , 0 , 0 , 0 );
1403
+ WireProtocolConfigQos main_wire_protocol;
1404
+ main_wire_protocol.builtin .avoid_builtin_multicast = true ;
1405
+ main_wire_protocol.builtin .discovery_config .leaseDuration = c_TimeInfinite;
1406
+ main_wire_protocol.builtin .discovery_config .leaseDuration_announcementperiod = { 3600 , 0 };
1407
+ main_wire_protocol.builtin .discovery_config .initial_announcements .count = 1 ;
1408
+ main_wire_protocol.builtin .discovery_config .initial_announcements .period = { 0 , 100000000 };
1409
+
1410
+ // The main participant will use the test transport and a specific announcements configuration
1411
+ main_participant->disable_builtin_transport ().add_user_transport_to_pparams (test_transport)
1412
+ .wire_protocol (main_wire_protocol);
1413
+
1414
+ // Start the main participant
1415
+ ASSERT_TRUE (main_participant->init_participant ());
1416
+
1417
+ // Wait for the initial announcements to be sent
1418
+ std::this_thread::sleep_for (std::chrono::seconds (1 ));
1419
+ // This would have set the multicast port
1420
+ EXPECT_NE (multicast_port, 0u );
1421
+
1422
+ // The rest of the participants only send announcements to the main participant
1423
+ // Calculate the metatraffic unicast port of the main participant
1424
+ uint32_t port = multicast_port + main_wire_protocol.port .offsetd1 - main_wire_protocol.port .offsetd0 ;
1425
+
1426
+ // The rest of the participants only send announcements to the main participant
1427
+ auto udp_localhost_transport = std::make_shared<test_UDPv4TransportDescriptor>();
1428
+ udp_localhost_transport->interfaceWhiteList .push_back (" 127.0.0.1" );
1429
+ Locator peer_locator;
1430
+ IPLocator::createLocator (LOCATOR_KIND_UDPv4, " 127.0.0.1" , port, peer_locator);
1431
+ WireProtocolConfigQos wire_protocol;
1432
+ wire_protocol.builtin .avoid_builtin_multicast = true ;
1433
+ wire_protocol.builtin .initialPeersList .push_back (peer_locator);
1434
+
1435
+ std::vector<std::shared_ptr<PubSubParticipant<HelloWorldPubSubType>>> participants;
1436
+ for (size_t i = 0 ; i < 5 ; ++i)
1437
+ {
1438
+ auto participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0 , 0 , 0 , 0 );
1439
+ // All participants use the same transport
1440
+ participant->disable_builtin_transport ().add_user_transport_to_pparams (udp_localhost_transport)
1441
+ .wire_protocol (wire_protocol);
1442
+ participants.push_back (participant);
1443
+ }
1444
+
1445
+ // Start the rest of the participants
1446
+ for (auto & participant : participants)
1447
+ {
1448
+ ASSERT_TRUE (participant->init_participant ());
1449
+ participant->wait_discovery ();
1450
+ }
1451
+
1452
+ // Destroy main participant
1453
+ main_participant.reset ();
1454
+ for (auto & participant : participants)
1455
+ {
1456
+ participant->wait_discovery (std::chrono::seconds::zero (), 0 , true );
1457
+ }
1458
+
1459
+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
1460
+ // Check that only two unicast messages per participant were sent
1461
+ EXPECT_EQ (num_unicast_sends.load (std::memory_order::memory_order_seq_cst),
1462
+ participants.size () + participants.size ());
1463
+
1464
+ // Clean up
1465
+ participants.clear ();
1466
+ }
1467
+
1468
+ // ! Regression test for redmine issue 22506
1469
+ // ! Test using a user's flowcontroller limiting the bandwidth and 5 remote participants waiting for the PDP sample.
1470
+ TEST_P (Discovery, single_unicast_pdp_response_flowcontroller)
1471
+ {
1472
+ // Leverage intraprocess so transport is only used for participant discovery
1473
+ if (INTRAPROCESS != GetParam ())
1474
+ {
1475
+ GTEST_SKIP () << " Only makes sense on INTRAPROCESS" ;
1476
+ return ;
1477
+ }
1478
+
1479
+ using namespace eprosima ::fastdds::dds;
1480
+
1481
+ // All participants would restrict communication to UDP localhost.
1482
+ // The main participant should send a single initial announcement, and have a big announcement period.
1483
+ // This is to ensure that we only check the datagrams sent in response to the participant discovery,
1484
+ // and not the ones sent in the periodic announcements.
1485
+ // The main participant will use the test transport to count the number of unicast messages sent.
1486
+
1487
+ // This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set
1488
+ // its value when the first multicast datagram is sent.
1489
+ std::atomic<uint32_t > multicast_port{ 0 };
1490
+ // Declare a test transport that will count the number of unicast messages sent
1491
+ std::atomic<size_t > num_unicast_sends{ 0 };
1492
+ auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
1493
+ test_transport->interfaceWhiteList .push_back (" 127.0.0.1" );
1494
+ test_transport->locator_filter_ = [&num_unicast_sends, &multicast_port](
1495
+ const eprosima::fastdds::rtps::Locator& destination)
1496
+ {
1497
+ if (IPLocator::isMulticast (destination))
1498
+ {
1499
+ uint32_t port = 0 ;
1500
+ multicast_port.compare_exchange_strong (port, destination.port );
1501
+ }
1502
+ else
1503
+ {
1504
+ num_unicast_sends.fetch_add (1u , std::memory_order_seq_cst);
1505
+ }
1506
+
1507
+ // Do not discard any message
1508
+ return false ;
1509
+ };
1510
+
1511
+ // Create the main participant
1512
+ auto main_participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0 , 0 , 0 , 0 );
1513
+ WireProtocolConfigQos main_wire_protocol;
1514
+ main_wire_protocol.builtin .avoid_builtin_multicast = true ;
1515
+ main_wire_protocol.builtin .discovery_config .leaseDuration = c_TimeInfinite;
1516
+ main_wire_protocol.builtin .discovery_config .leaseDuration_announcementperiod = { 3600 , 0 };
1517
+ main_wire_protocol.builtin .discovery_config .initial_announcements .count = 1 ;
1518
+ main_wire_protocol.builtin .discovery_config .initial_announcements .period = { 0 , 100000000u };
1519
+ main_wire_protocol.builtin .flow_controller_name = " TestFlowController" ;
1520
+
1521
+ // Flowcontroller to limit the bandwidth
1522
+ auto test_flow_controller = std::make_shared<eprosima::fastdds::rtps::FlowControllerDescriptor>();
1523
+ test_flow_controller->name = " TestFlowController" ;
1524
+ test_flow_controller->max_bytes_per_period = 3700 ;
1525
+ test_flow_controller->period_ms = static_cast <uint64_t >(100 );
1526
+
1527
+ // The main participant will use the test transport, specific announcements configuration and a flowcontroller
1528
+ main_participant->disable_builtin_transport ().add_user_transport_to_pparams (test_transport)
1529
+ .wire_protocol (main_wire_protocol)
1530
+ .flow_controller (test_flow_controller);
1531
+
1532
+ // Start the main participant
1533
+ ASSERT_TRUE (main_participant->init_participant ());
1534
+
1535
+ // Wait for the initial announcements to be sent
1536
+ std::this_thread::sleep_for (std::chrono::seconds (1 ));
1537
+ // This would have set the multicast port
1538
+ EXPECT_NE (multicast_port, 0u );
1539
+
1540
+ // The rest of the participants only send announcements to the main participant
1541
+ // Calculate the metatraffic unicast port of the main participant
1542
+ uint32_t port = multicast_port + main_wire_protocol.port .offsetd1 - main_wire_protocol.port .offsetd0 ;
1543
+
1544
+ // The rest of the participants only send announcements to the main participant
1545
+ auto udp_localhost_transport = std::make_shared<test_UDPv4TransportDescriptor>();
1546
+ udp_localhost_transport->interfaceWhiteList .push_back (" 127.0.0.1" );
1547
+ Locator peer_locator;
1548
+ IPLocator::createLocator (LOCATOR_KIND_UDPv4, " 127.0.0.1" , port, peer_locator);
1549
+ WireProtocolConfigQos wire_protocol;
1550
+ wire_protocol.builtin .avoid_builtin_multicast = true ;
1551
+ wire_protocol.builtin .initialPeersList .push_back (peer_locator);
1552
+ wire_protocol.builtin .discovery_config .leaseDuration = c_TimeInfinite;
1553
+ wire_protocol.builtin .discovery_config .leaseDuration_announcementperiod = { 3600 , 0 };
1554
+ wire_protocol.builtin .discovery_config .initial_announcements .count = 1 ;
1555
+ wire_protocol.builtin .discovery_config .initial_announcements .period = { 0 , 100000000u };
1556
+
1557
+ std::vector<std::shared_ptr<PubSubParticipant<HelloWorldPubSubType>>> participants;
1558
+ for (size_t i = 0 ; i < 5 ; ++i)
1559
+ {
1560
+ auto participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0 , 0 , 0 , 0 );
1561
+ // All participants use the same transport
1562
+ participant->disable_builtin_transport ().add_user_transport_to_pparams (udp_localhost_transport)
1563
+ .wire_protocol (wire_protocol);
1564
+ participants.push_back (participant);
1565
+ }
1566
+
1567
+ // Start the rest of the participants
1568
+ for (auto & participant : participants)
1569
+ {
1570
+ ASSERT_TRUE (participant->init_participant ());
1571
+ participant->wait_discovery (std::chrono::seconds::zero (), 1 , true );
1572
+ }
1573
+
1574
+ main_participant->wait_discovery (std::chrono::seconds::zero (), 5 , true );
1575
+
1576
+ // When in single threaded application, give some time for the builtin endpoints matching
1577
+ std::this_thread::sleep_for (std::chrono::seconds (5 ));
1578
+
1579
+ // Destroy main participant
1580
+ main_participant.reset ();
1581
+ for (auto & participant : participants)
1582
+ {
1583
+ participant->wait_discovery (std::chrono::seconds::zero (), 0 , true );
1584
+ }
1585
+
1586
+ // Check that the main participant sends two unicast messages to every other participant.
1587
+ // One Data[P] and one Data[uP].
1588
+ // Note that in a single core system, the number of unicast messages sent may be one
1589
+ // per participant since the main participant's destruction races with
1590
+ // the asynchronous Data[uP] in the locator selector (the unicast locator of the remote may not be there by the time)
1591
+ // using the multicast instead.
1592
+ EXPECT_GE (num_unicast_sends.load (std::memory_order::memory_order_seq_cst),
1593
+ participants.size ());
1594
+
1595
+ // Clean up
1596
+ participants.clear ();
1597
+ }
1598
+
1599
+ // ! Regression test for redmine issue 22506
1600
+ // ! Same test as single_unicast_pdp_response_flowcontroller but the main participant's builtin controller is so limited
1601
+ // ! that it will not be able to send all the initial announcements.
1602
+ TEST_P (Discovery, single_unicast_pdp_response_flowcontroller_limited)
1603
+ {
1604
+ // Leverage intraprocess so transport is only used for participant discovery
1605
+ if (INTRAPROCESS != GetParam ())
1606
+ {
1607
+ GTEST_SKIP () << " Only makes sense on INTRAPROCESS" ;
1608
+ return ;
1609
+ }
1610
+
1611
+ using namespace eprosima ::fastdds::dds;
1612
+
1613
+ // All participants would restrict communication to UDP localhost.
1614
+ // The main participant should send a single initial announcement, and have a big announcement period.
1615
+ // This is to ensure that we only check the datagrams sent in response to the participant discovery,
1616
+ // and not the ones sent in the periodic announcements.
1617
+ // The main participant will use the test transport to count the number of unicast messages sent.
1618
+
1619
+ // This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set
1620
+ // its value when the first multicast datagram is sent.
1621
+ std::atomic<uint32_t > multicast_port{ 0 };
1622
+ // Declare a test transport that will count the number of unicast messages sent
1623
+ std::atomic<size_t > num_unicast_sends{ 0 };
1624
+ auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
1625
+ test_transport->interfaceWhiteList .push_back (" 127.0.0.1" );
1626
+ test_transport->locator_filter_ = [&num_unicast_sends, &multicast_port](
1627
+ const eprosima::fastdds::rtps::Locator& destination)
1628
+ {
1629
+ if (IPLocator::isMulticast (destination))
1630
+ {
1631
+ uint32_t port = 0 ;
1632
+ multicast_port.compare_exchange_strong (port, destination.port );
1633
+ }
1634
+ else
1635
+ {
1636
+ num_unicast_sends.fetch_add (1u , std::memory_order_seq_cst);
1637
+ }
1638
+
1639
+ // Do not discard any message
1640
+ return false ;
1641
+ };
1642
+
1643
+ // Create the main participant
1644
+ auto main_participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0 , 0 , 0 , 0 );
1645
+ WireProtocolConfigQos main_wire_protocol;
1646
+ main_wire_protocol.builtin .avoid_builtin_multicast = true ;
1647
+ main_wire_protocol.builtin .discovery_config .leaseDuration = c_TimeInfinite;
1648
+ main_wire_protocol.builtin .discovery_config .leaseDuration_announcementperiod = { 3600 , 0 };
1649
+ main_wire_protocol.builtin .discovery_config .initial_announcements .count = 1 ;
1650
+ main_wire_protocol.builtin .discovery_config .initial_announcements .period = { 0 , 100000000u };
1651
+ main_wire_protocol.builtin .flow_controller_name = " TestFlowController" ;
1652
+
1653
+ // Flowcontroller to limit the bandwidth
1654
+ auto test_flow_controller = std::make_shared<eprosima::fastdds::rtps::FlowControllerDescriptor>();
1655
+ test_flow_controller->name = " TestFlowController" ;
1656
+ test_flow_controller->max_bytes_per_period = 3700 ;
1657
+ test_flow_controller->period_ms = static_cast <uint64_t >(100000 );
1658
+
1659
+ // The main participant will use the test transport, specific announcements configuration and a flowcontroller
1660
+ main_participant->disable_builtin_transport ().add_user_transport_to_pparams (test_transport)
1661
+ .wire_protocol (main_wire_protocol)
1662
+ .flow_controller (test_flow_controller);
1663
+
1664
+ // Start the main participant
1665
+ ASSERT_TRUE (main_participant->init_participant ());
1666
+
1667
+ // Wait for the initial announcements to be sent
1668
+ std::this_thread::sleep_for (std::chrono::seconds (1 ));
1669
+ // This would have set the multicast port
1670
+ EXPECT_NE (multicast_port, 0u );
1671
+
1672
+ // The rest of the participants only send announcements to the main participant
1673
+ // Calculate the metatraffic unicast port of the main participant
1674
+ uint32_t port = multicast_port + main_wire_protocol.port .offsetd1 - main_wire_protocol.port .offsetd0 ;
1675
+
1676
+ // The rest of the participants only send announcements to the main participant
1677
+ auto udp_localhost_transport = std::make_shared<test_UDPv4TransportDescriptor>();
1678
+ udp_localhost_transport->interfaceWhiteList .push_back (" 127.0.0.1" );
1679
+ Locator peer_locator;
1680
+ IPLocator::createLocator (LOCATOR_KIND_UDPv4, " 127.0.0.1" , port, peer_locator);
1681
+ WireProtocolConfigQos wire_protocol;
1682
+ wire_protocol.builtin .avoid_builtin_multicast = true ;
1683
+ wire_protocol.builtin .initialPeersList .push_back (peer_locator);
1684
+ wire_protocol.builtin .discovery_config .leaseDuration = c_TimeInfinite;
1685
+ wire_protocol.builtin .discovery_config .leaseDuration_announcementperiod = { 3600 , 0 };
1686
+ wire_protocol.builtin .discovery_config .initial_announcements .count = 1 ;
1687
+ wire_protocol.builtin .discovery_config .initial_announcements .period = { 0 , 100000000u };
1688
+
1689
+ std::vector<std::shared_ptr<PubSubParticipant<HelloWorldPubSubType>>> participants;
1690
+ for (size_t i = 0 ; i < 10 ; ++i)
1691
+ {
1692
+ auto participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0 , 0 , 0 , 0 );
1693
+ // All participants use the same transport
1694
+ participant->disable_builtin_transport ().add_user_transport_to_pparams (udp_localhost_transport)
1695
+ .wire_protocol (wire_protocol);
1696
+ participants.push_back (participant);
1697
+ }
1698
+
1699
+ // Start the rest of the participants
1700
+ for (auto & participant : participants)
1701
+ {
1702
+ ASSERT_TRUE (participant->init_participant ());
1703
+ participant->wait_discovery (std::chrono::seconds (1 ), 1 , true );
1704
+ }
1705
+
1706
+ // The builtin flowcontroller of the main participant will not be able to send all the initial announcements as the max byter per period has already
1707
+ // been reached. In fact no more messages will be sent from the builtin writers of the main participant.
1708
+ EXPECT_LT (num_unicast_sends.load (std::memory_order::memory_order_seq_cst), participants.size ());
1709
+ auto num_unicast_sends_limit = num_unicast_sends.load (std::memory_order::memory_order_seq_cst);
1710
+
1711
+ // Destroy main participant
1712
+ main_participant.reset ();
1713
+ for (auto & participant : participants)
1714
+ {
1715
+ participant->wait_discovery (std::chrono::seconds (1 ), 0 , true );
1716
+ }
1717
+
1718
+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
1719
+ // No more messages have been sent sin the limit was reached
1720
+ EXPECT_EQ (num_unicast_sends.load (std::memory_order::memory_order_seq_cst), num_unicast_sends_limit);
1721
+
1722
+ // Clean up
1723
+ participants.clear ();
1724
+ }
1725
+
1726
+ >>>>>>> 6d0d853c (Fix `MacOS` nightly flaky tests (#5738 ))
1357
1727
#ifdef INSTANTIATE_TEST_SUITE_P
1358
1728
#define GTEST_INSTANTIATE_TEST_MACRO (x, y, z, w ) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
1359
1729
#else
0 commit comments