Skip to content

Commit a69d9d9

Browse files
Avoid sending statistics msg with big msgs and no fragmentation (#5742) (#5745)
* Refs #23001: Add 100kb new Type Signed-off-by: cferreiragonz <[email protected]> * Refs #23001: Regression Test Signed-off-by: cferreiragonz <[email protected]> * Refs #23001: Avoid sending statistics with no_frag and big msg Signed-off-by: cferreiragonz <[email protected]> * Refs #23001: Remove stats buffer before alloc_buffer in SHM Signed-off-by: cferreiragonz <[email protected]> * Refs #23001: Fix Windows build Signed-off-by: cferreiragonz <[email protected]> * Refs #23001: Review Signed-off-by: cferreiragonz <[email protected]> --------- Signed-off-by: cferreiragonz <[email protected]> (cherry picked from commit 560dbde) Co-authored-by: Carlos Ferreira González <[email protected]>
1 parent feb9a2f commit a69d9d9

19 files changed

+1121
-20
lines changed

src/cpp/rtps/messages/RTPSMessageGroup.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,12 @@ void RTPSMessageGroup::send()
392392
#endif // if HAVE_SECURITY
393393

394394
#ifdef FASTDDS_STATISTICS
395-
add_stats_submsg();
395+
if (buffers_bytes_ <
396+
static_cast<uint32_t>(std::numeric_limits<uint16_t>::max() - RTPSMESSAGE_DATA_MIN_LENGTH))
397+
{
398+
// Avoid sending the data message for DATA that are not fragmented and exceed the 65 kB limit
399+
add_stats_submsg();
400+
}
396401
#endif // FASTDDS_STATISTICS
397402

398403
if (!sender_->send(*buffers_to_send_,

src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp

+3-4
Original file line numberDiff line numberDiff line change
@@ -470,14 +470,13 @@ std::shared_ptr<SharedMemManager::Buffer> SharedMemTransport::copy_to_shared_buf
470470

471471
assert(shared_mem_segment_);
472472

473-
std::shared_ptr<SharedMemManager::Buffer> shared_buffer =
474-
shared_mem_segment_->alloc_buffer(total_bytes, max_blocking_time_point);
475-
uint8_t* pos = static_cast<uint8_t*>(shared_buffer->data());
476-
477473
// Statistics submessage is always the last buffer to be added
478474
// If statistics message is present, skip last buffer
479475
auto it_end = remove_statistics_buffer(buffers.back(), total_bytes) ? std::prev(buffers.end()) : buffers.end();
480476

477+
std::shared_ptr<SharedMemManager::Buffer> shared_buffer =
478+
shared_mem_segment_->alloc_buffer(total_bytes, max_blocking_time_point);
479+
uint8_t* pos = static_cast<uint8_t*>(shared_buffer->data());
481480

482481
for (auto it = buffers.begin(); it != it_end; ++it)
483482
{

test/blackbox/CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ file(GLOB RTPS_BLACKBOXTESTS_TEST_SOURCE "common/RTPSBlackboxTests*.cpp")
6060
set(RTPS_BLACKBOXTESTS_SOURCE ${RTPS_BLACKBOXTESTS_TEST_SOURCE}
6161
types/Data1mbPubSubTypes.cxx
6262
types/Data1mbTypeObjectSupport.cxx
63+
types/Data100kbPubSubTypes.cxx
64+
types/Data100kbTypeObjectSupport.cxx
6365
types/Data64kbPubSubTypes.cxx
6466
types/Data64kbTypeObjectSupport.cxx
6567
types/FixedSizedPubSubTypes.cxx
@@ -103,6 +105,8 @@ file(GLOB BLACKBOXTESTS_TEST_SOURCE "common/BlackboxTests*.cpp")
103105
set(BLACKBOXTESTS_SOURCE ${BLACKBOXTESTS_TEST_SOURCE}
104106
types/Data1mbPubSubTypes.cxx
105107
types/Data1mbTypeObjectSupport.cxx
108+
types/Data100kbPubSubTypes.cxx
109+
types/Data100kbTypeObjectSupport.cxx
106110
types/Data64kbPubSubTypes.cxx
107111
types/Data64kbTypeObjectSupport.cxx
108112
types/FixedSizedPubSubTypes.cxx

test/blackbox/api/dds-pim/PubSubReader.hpp

+15-6
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ using eprosima::fastdds::rtps::UDPTransportDescriptor;
6767
using eprosima::fastdds::rtps::UDPv4TransportDescriptor;
6868
using eprosima::fastdds::rtps::UDPv6TransportDescriptor;
6969
using eprosima::fastdds::rtps::IPLocator;
70+
using eprosima::fastdds::rtps::BuiltinTransports;
71+
using eprosima::fastdds::rtps::BuiltinTransportsOptions;
7072

7173
using SampleLostStatusFunctor = std::function<void (const eprosima::fastdds::dds::SampleLostStatus&)>;
7274
using SampleRejectedStatusFunctor = std::function<void (const eprosima::fastdds::dds::SampleRejectedStatus&)>;
@@ -1040,15 +1042,15 @@ class PubSubReader
10401042
}
10411043

10421044
PubSubReader& setup_transports(
1043-
eprosima::fastdds::rtps::BuiltinTransports transports)
1045+
BuiltinTransports transports)
10441046
{
10451047
participant_qos_.setup_transports(transports);
10461048
return *this;
10471049
}
10481050

10491051
PubSubReader& setup_transports(
1050-
eprosima::fastdds::rtps::BuiltinTransports transports,
1051-
const eprosima::fastdds::rtps::BuiltinTransportsOptions& options)
1052+
BuiltinTransports transports,
1053+
const BuiltinTransportsOptions& options)
10521054
{
10531055
participant_qos_.setup_transports(transports, options);
10541056
return *this;
@@ -1057,9 +1059,10 @@ class PubSubReader
10571059
PubSubReader& setup_large_data_tcp(
10581060
bool v6 = false,
10591061
const uint16_t& port = 0,
1060-
const uint32_t& tcp_negotiation_timeout = 0)
1062+
const BuiltinTransportsOptions& options = BuiltinTransportsOptions())
10611063
{
10621064
participant_qos_.transport().use_builtin_transports = false;
1065+
participant_qos_.transport().max_msg_size_no_frag = options.maxMessageSize;
10631066

10641067
/* Transports configuration */
10651068
// UDP transport for PDP over multicast
@@ -1077,7 +1080,10 @@ class PubSubReader
10771080
data_transport->check_crc = false;
10781081
data_transport->apply_security = false;
10791082
data_transport->enable_tcp_nodelay = true;
1080-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
1083+
data_transport->maxMessageSize = options.maxMessageSize;
1084+
data_transport->sendBufferSize = options.sockets_buffer_size;
1085+
data_transport->receiveBufferSize = options.sockets_buffer_size;
1086+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
10811087
participant_qos_.transport().user_transports.push_back(data_transport);
10821088
}
10831089
else
@@ -1091,7 +1097,10 @@ class PubSubReader
10911097
data_transport->check_crc = false;
10921098
data_transport->apply_security = false;
10931099
data_transport->enable_tcp_nodelay = true;
1094-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
1100+
data_transport->maxMessageSize = options.maxMessageSize;
1101+
data_transport->sendBufferSize = options.sockets_buffer_size;
1102+
data_transport->receiveBufferSize = options.sockets_buffer_size;
1103+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
10951104
participant_qos_.transport().user_transports.push_back(data_transport);
10961105
}
10971106

test/blackbox/api/dds-pim/PubSubWriter.hpp

+21-6
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ using eprosima::fastdds::rtps::UDPTransportDescriptor;
6161
using eprosima::fastdds::rtps::UDPv4TransportDescriptor;
6262
using eprosima::fastdds::rtps::UDPv6TransportDescriptor;
6363
using eprosima::fastdds::rtps::IPLocator;
64+
using eprosima::fastdds::rtps::BuiltinTransports;
65+
using eprosima::fastdds::rtps::BuiltinTransportsOptions;
6466

6567
template<class TypeSupport>
6668
class PubSubWriter
@@ -999,15 +1001,15 @@ class PubSubWriter
9991001
}
10001002

10011003
PubSubWriter& setup_transports(
1002-
eprosima::fastdds::rtps::BuiltinTransports transports)
1004+
BuiltinTransports transports)
10031005
{
10041006
participant_qos_.setup_transports(transports);
10051007
return *this;
10061008
}
10071009

10081010
PubSubWriter& setup_transports(
1009-
eprosima::fastdds::rtps::BuiltinTransports transports,
1010-
const eprosima::fastdds::rtps::BuiltinTransportsOptions& options)
1011+
BuiltinTransports transports,
1012+
const BuiltinTransportsOptions& options)
10111013
{
10121014
participant_qos_.setup_transports(transports, options);
10131015
return *this;
@@ -1016,9 +1018,10 @@ class PubSubWriter
10161018
PubSubWriter& setup_large_data_tcp(
10171019
bool v6 = false,
10181020
const uint16_t& port = 0,
1019-
const uint32_t& tcp_negotiation_timeout = 0)
1021+
const BuiltinTransportsOptions& options = BuiltinTransportsOptions())
10201022
{
10211023
participant_qos_.transport().use_builtin_transports = false;
1024+
participant_qos_.transport().max_msg_size_no_frag = options.maxMessageSize;
10221025

10231026
/* Transports configuration */
10241027
// UDP transport for PDP over multicast
@@ -1028,6 +1031,9 @@ class PubSubWriter
10281031
if (v6)
10291032
{
10301033
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv6TransportDescriptor>();
1034+
pdp_transport->maxMessageSize = options.maxMessageSize;
1035+
pdp_transport->sendBufferSize = options.sockets_buffer_size;
1036+
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
10311037
participant_qos_.transport().user_transports.push_back(pdp_transport);
10321038

10331039
auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv6TransportDescriptor>();
@@ -1036,12 +1042,18 @@ class PubSubWriter
10361042
data_transport->check_crc = false;
10371043
data_transport->apply_security = false;
10381044
data_transport->enable_tcp_nodelay = true;
1039-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
1045+
data_transport->maxMessageSize = options.maxMessageSize;
1046+
data_transport->sendBufferSize = options.sockets_buffer_size;
1047+
data_transport->receiveBufferSize = options.sockets_buffer_size;
1048+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
10401049
participant_qos_.transport().user_transports.push_back(data_transport);
10411050
}
10421051
else
10431052
{
10441053
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
1054+
pdp_transport->maxMessageSize = options.maxMessageSize;
1055+
pdp_transport->sendBufferSize = options.sockets_buffer_size;
1056+
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
10451057
participant_qos_.transport().user_transports.push_back(pdp_transport);
10461058

10471059
auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
@@ -1050,7 +1062,10 @@ class PubSubWriter
10501062
data_transport->check_crc = false;
10511063
data_transport->apply_security = false;
10521064
data_transport->enable_tcp_nodelay = true;
1053-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
1065+
data_transport->maxMessageSize = options.maxMessageSize;
1066+
data_transport->sendBufferSize = options.sockets_buffer_size;
1067+
data_transport->receiveBufferSize = options.sockets_buffer_size;
1068+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
10541069
participant_qos_.transport().user_transports.push_back(data_transport);
10551070
}
10561071

test/blackbox/common/BlackboxTests.hpp

+14
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#endif // if defined(_WIN32)
3333

3434
#include "../types/Data1mbPubSubTypes.hpp"
35+
#include "../types/Data100kbPubSubTypes.hpp"
3536
#include "../types/Data64kbPubSubTypes.hpp"
3637
#include "../types/FixedSizedPubSubTypes.hpp"
3738
#include "../types/HelloWorldPubSubTypes.hpp"
@@ -89,6 +90,10 @@ template<>
8990
void default_receive_print(
9091
const Data64kb& data);
9192

93+
template<>
94+
void default_receive_print(
95+
const Data100kb& data);
96+
9297
template<>
9398
void default_receive_print(
9499
const Data1mb& data);
@@ -128,6 +133,10 @@ template<>
128133
void default_send_print(
129134
const Data64kb& data);
130135

136+
template<>
137+
void default_send_print(
138+
const Data100kb& data);
139+
131140
template<>
132141
void default_send_print(
133142
const Data1mb& data);
@@ -169,6 +178,9 @@ std::list<Data1mb> default_data300kb_mix_data_generator(
169178
std::list<Data1mb> default_data96kb_data300kb_data_generator(
170179
size_t max = 0);
171180

181+
std::list<Data100kb> default_data100kb_data_generator(
182+
size_t max = 0);
183+
172184
std::list<KeyedData1mb> default_keyeddata300kb_data_generator(
173185
size_t max = 0);
174186

@@ -186,6 +198,8 @@ extern const std::function<void(const StringTest&)> default_string_print;
186198

187199
extern const std::function<void(const Data64kb&)> default_data64kb_print;
188200

201+
extern const std::function<void(const Data100kb&)> default_data100kb_print;
202+
189203
extern const std::function<void(const Data1mb&)> default_data300kb_print;
190204

191205
template<typename T>

test/blackbox/common/BlackboxTestsTransportCustom.cpp

+77
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,16 @@ class BuiltinTransportsTest
179179
}
180180
}
181181

182+
static void test_api_100kb(
183+
const BuiltinTransports& builtin_transports,
184+
const BuiltinTransportsOptions* const builtin_transports_options = nullptr)
185+
{
186+
if (builtin_transports != BuiltinTransports::NONE)
187+
{
188+
run_test_api_100kb(builtin_transports, builtin_transports_options);
189+
}
190+
}
191+
182192
private:
183193

184194
static void run_test(
@@ -302,6 +312,57 @@ class BuiltinTransportsTest
302312
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(3)));
303313
}
304314

315+
static void run_test_api_100kb(
316+
const BuiltinTransports& builtin_transports,
317+
const BuiltinTransportsOptions* const builtin_transports_options = nullptr)
318+
{
319+
/* Test configuration */
320+
PubSubWriter<Data100kbPubSubType> writer(TEST_TOPIC_NAME);
321+
PubSubReader<Data100kbPubSubType> reader(TEST_TOPIC_NAME);
322+
323+
// Reliable keep all to wait of all acked as end condition
324+
writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
325+
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS);
326+
327+
reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
328+
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS);
329+
330+
// Builtin transport configuration
331+
if (builtin_transports_options != nullptr)
332+
{
333+
writer.setup_transports(builtin_transports, *builtin_transports_options);
334+
reader.setup_transports(builtin_transports, *builtin_transports_options);
335+
}
336+
else
337+
{
338+
writer.setup_transports(builtin_transports);
339+
reader.setup_transports(builtin_transports);
340+
}
341+
342+
/* Run test */
343+
// Init writer
344+
writer.init();
345+
ASSERT_TRUE(writer.isInitialized());
346+
347+
// Init reader
348+
reader.init();
349+
ASSERT_TRUE(reader.isInitialized());
350+
351+
// Wait for discovery
352+
writer.wait_discovery();
353+
reader.wait_discovery();
354+
355+
// Send data
356+
auto data = default_data100kb_data_generator();
357+
reader.startReception(data);
358+
writer.send(data);
359+
ASSERT_TRUE(data.empty());
360+
361+
// Wait for reception acknowledgement
362+
reader.block_for_all();
363+
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(3)));
364+
}
365+
305366
static const std::string env_var_name_;
306367
};
307368

@@ -523,6 +584,14 @@ TEST(ChainingTransportTests, builtin_transports_api_shm)
523584
BuiltinTransportsTest::test_api(BuiltinTransports::SHM);
524585
}
525586

587+
TEST(ChainingTransportTests, builtin_transports_api_shm_no_frag)
588+
{
589+
BuiltinTransportsOptions options;
590+
options.maxMessageSize = 200000;
591+
options.sockets_buffer_size = 200000;
592+
BuiltinTransportsTest::test_api_100kb(BuiltinTransports::SHM, &options);
593+
}
594+
526595
TEST(ChainingTransportTests, builtin_transports_api_udpv4)
527596
{
528597
BuiltinTransportsTest::test_api(BuiltinTransports::UDPv4);
@@ -577,6 +646,14 @@ TEST(ChainingTransportTests, builtin_transports_api_large_datav6)
577646
}
578647
#endif // __APPLE__
579648

649+
TEST(ChainingTransportTests, builtin_transports_api_large_data_no_frag)
650+
{
651+
BuiltinTransportsOptions options;
652+
options.maxMessageSize = 200000;
653+
options.sockets_buffer_size = 200000;
654+
BuiltinTransportsTest::test_api_100kb(BuiltinTransports::LARGE_DATA, &options);
655+
}
656+
580657
TEST(ChainingTransportTests, builtin_transports_env_none)
581658
{
582659
BuiltinTransportsTest::test_env("NONE");

0 commit comments

Comments
 (0)