Skip to content

Commit 7e12e8f

Browse files
Collection of data and notification to listeners NETWORK_LATENCY (#1952)
* Refs 10791. Added RTPSStatisticsMessages.hpp. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. Improve RTPSMessageCreator::addSubmessageHeader. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. RTPSMessageGroup adds statistics submessage at the end. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. Account for new submessage on RTPSWriter::calculateMaxDataSize. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. MessageReceiver processing of new submessage. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. Shared mem transport reports correct locator kind. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. Do not process network statistics for SHM. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. MessageReceiver takes reception locator. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. Statistics submessage filled from transport. Signed-off-by: Miguel Company <[email protected]> * Refs 11450. Extend submessage with byte count. Signed-off-by: Miguel Company <[email protected]> * Refs 11450. SharedMemTransport removes statistics submessage. Signed-off-by: Miguel Company <[email protected]> * Refs 11450. Method to read a statistics submessage. Signed-off-by: Miguel Company <[email protected]> * Refs 11450. Processing moved to StatisticsParticipantImpl. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. Modified test to expect network_latency events. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. Calling listener. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. Linters. Signed-off-by: Miguel Company <[email protected]> * Refs 11450. Ensure space for statistics submessage. Signed-off-by: Miguel Company <[email protected]> * Refs 10791. Adressed peer review comments. Signed-off-by: Miguel Company <[email protected]>
1 parent cec6e08 commit 7e12e8f

File tree

15 files changed

+406
-33
lines changed

15 files changed

+406
-33
lines changed

include/fastdds/rtps/messages/MessageReceiver.h

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,13 @@ class MessageReceiver
5656

5757
/**
5858
* Process a new CDR message.
59-
* @param[in] loc Locator indicating the sending address.
60-
* @param[in] msg Pointer to the message
59+
* @param [in] source_locator Locator indicating the sending address.
60+
* @param [in] reception_locator Locator indicating the listening address.
61+
* @param [in] msg Pointer to the message
6162
*/
6263
void processCDRMsg(
63-
const Locator_t& loc,
64+
const Locator_t& source_locator,
65+
const Locator_t& reception_locator,
6466
CDRMessage_t* msg);
6567

6668
// Functions to associate/remove associatedendpoints
@@ -243,6 +245,21 @@ class MessageReceiver
243245
uint32_t fragment_starting_num,
244246
uint16_t fragments_in_submessage);
245247
///@}
248+
249+
/**
250+
* Looks for the statistics specific submessage and notifies statistics related to the received message.
251+
*
252+
* @param [in] source_locator Locator indicating the sending address.
253+
* @param [in] reception_locator Locator indicating the listening address.
254+
* @param [in] msg Pointer to the message
255+
*
256+
* @pre The message header has already been read and validated.
257+
*/
258+
void notify_network_statistics(
259+
const Locator_t& source_locator,
260+
const Locator_t& reception_locator,
261+
CDRMessage_t* msg);
262+
246263
};
247264

248265
} /* namespace rtps */

src/cpp/rtps/messages/MessageReceiver.cpp

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,19 @@
1919

2020
#include <fastdds/rtps/messages/MessageReceiver.h>
2121

22+
#include <cassert>
23+
#include <limits>
24+
#include <mutex>
25+
26+
#include <fastdds/core/policy/ParameterList.hpp>
2227
#include <fastdds/dds/log/Log.hpp>
2328

2429
#include <fastdds/rtps/reader/RTPSReader.h>
2530
#include <fastdds/rtps/writer/RTPSWriter.h>
2631

27-
#include <fastdds/core/policy/ParameterList.hpp>
2832
#include <rtps/participant/RTPSParticipantImpl.h>
29-
30-
#include <cassert>
31-
#include <limits>
32-
#include <mutex>
33+
#include <statistics/rtps/StatisticsBase.hpp>
34+
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>
3335

3436
#define INFO_SRC_SUBMSG_LENGTH 20
3537

@@ -311,11 +313,10 @@ void MessageReceiver::reset()
311313
}
312314

313315
void MessageReceiver::processCDRMsg(
314-
const Locator_t& loc,
316+
const Locator_t& source_locator,
317+
const Locator_t& reception_locator,
315318
CDRMessage_t* msg)
316319
{
317-
(void)loc;
318-
319320
if (msg->length < RTPSMESSAGE_HEADER_SIZE)
320321
{
321322
logWarning(RTPS_MSG_IN, IDSTRING "Received message too short, ignoring");
@@ -335,6 +336,8 @@ void MessageReceiver::processCDRMsg(
335336
return;
336337
}
337338

339+
notify_network_statistics(source_locator, reception_locator, msg);
340+
338341
#if HAVE_SECURITY
339342
security::SecurityManager& security = participant_->security_manager();
340343
CDRMessage_t* auxiliary_buffer = &crypto_msg_;
@@ -358,7 +361,6 @@ void MessageReceiver::processCDRMsg(
358361

359362
// Loop until there are no more submessages
360363
bool valid;
361-
int count = 0;
362364
SubmessageHeader_t submsgh; //Current submessage header
363365

364366
while (msg->pos < msg->length)// end of the message
@@ -386,7 +388,6 @@ void MessageReceiver::processCDRMsg(
386388
}
387389

388390
valid = true;
389-
count++;
390391
uint32_t next_msg_pos = submessage->pos;
391392
next_msg_pos += (submsgh.submessageLength + 3u) & ~3u;
392393
switch (submsgh.submessageId)
@@ -1310,6 +1311,61 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag(
13101311
return true;
13111312
}
13121313

1314+
void MessageReceiver::notify_network_statistics(
1315+
const Locator_t& source_locator,
1316+
const Locator_t& reception_locator,
1317+
CDRMessage_t* msg)
1318+
{
1319+
static_cast<void>(source_locator);
1320+
static_cast<void>(reception_locator);
1321+
static_cast<void>(msg);
1322+
1323+
#ifdef FASTDDS_STATISTICS
1324+
using namespace eprosima::fastdds::statistics;
1325+
using namespace eprosima::fastdds::statistics::rtps;
1326+
1327+
if ((c_VendorId_eProsima != source_vendor_id_) ||
1328+
(LOCATOR_KIND_SHM == source_locator.kind))
1329+
{
1330+
return;
1331+
}
1332+
1333+
// Keep track of current position, so we can restore it later.
1334+
auto initial_pos = msg->pos;
1335+
while (msg->pos < msg->length)
1336+
{
1337+
SubmessageHeader_t header;
1338+
if (!readSubmessageHeader(msg, &header))
1339+
{
1340+
break;
1341+
}
1342+
1343+
if (FASTDDS_STATISTICS_NETWORK_SUBMESSAGE == header.submessageId)
1344+
{
1345+
// Check submessage validity
1346+
if ((statistics_submessage_data_length != header.submessageLength) ||
1347+
((msg->pos + header.submessageLength) > msg->length))
1348+
{
1349+
break;
1350+
}
1351+
1352+
StatisticsSubmessageData data;
1353+
read_statistics_submessage(msg, data);
1354+
participant_->on_network_statistics(source_guid_prefix_, source_locator, reception_locator, data);
1355+
break;
1356+
}
1357+
1358+
if (header.is_last)
1359+
{
1360+
break;
1361+
}
1362+
msg->pos += (header.submessageLength + 3u) & ~3u;
1363+
}
1364+
1365+
msg->pos = initial_pos;
1366+
#endif // FASTDDS_STATISTICS
1367+
}
1368+
13131369
} /* namespace rtps */
13141370
} /* namespace fastrtps */
13151371
} /* namespace eprosima */

src/cpp/rtps/messages/RTPSMessageCreator.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ bool RTPSMessageCreator::addSubmessageHeader(
7878
octet flags,
7979
uint16_t size)
8080
{
81+
#if FASTDDS_IS_BIG_ENDIAN_TARGET
82+
msg->msg_endian = BIGEND;
83+
#else
84+
flags = flags | BIT(0);
85+
msg->msg_endian = LITTLEEND;
86+
#endif // if FASTDDS_IS_BIG_ENDIAN_TARGET
87+
8188
CDRMessage::addOctet(msg, id);
8289
CDRMessage::addOctet(msg, flags);
8390
CDRMessage::addUInt16(msg, size);

src/cpp/rtps/messages/RTPSMessageGroup.cpp

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,27 @@
3030
#include <rtps/messages/RTPSMessageGroup_t.hpp>
3131
#include <rtps/participant/RTPSParticipantImpl.h>
3232

33+
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>
34+
3335
namespace eprosima {
3436
namespace fastrtps {
3537
namespace rtps {
3638

39+
static bool append_message(
40+
CDRMessage_t* full_msg,
41+
CDRMessage_t* submsg)
42+
{
43+
#ifndef FASTDDS_STATISTICS
44+
return CDRMessage::appendMsg(full_msg, submsg);
45+
#else
46+
// Keep room for the statistics submessage by reducing max_size while appending submessage
47+
full_msg->max_size -= eprosima::fastdds::statistics::rtps::statistics_submessage_length;
48+
bool ret_val = CDRMessage::appendMsg(full_msg, submsg);
49+
full_msg->max_size += eprosima::fastdds::statistics::rtps::statistics_submessage_length;
50+
return ret_val;
51+
#endif // FASTDDS_STATISTICS
52+
}
53+
3754
bool sort_changes_group (
3855
CacheChange_t* c1,
3956
CacheChange_t* c2)
@@ -213,6 +230,8 @@ void RTPSMessageGroup::send()
213230
}
214231
#endif // if HAVE_SECURITY
215232

233+
eprosima::fastdds::statistics::rtps::add_statistics_submessage(msgToSend);
234+
216235
if (!sender_.send(msgToSend, max_blocking_time_point_))
217236
{
218237
throw timeout();
@@ -246,20 +265,13 @@ bool RTPSMessageGroup::insert_submessage(
246265
const GuidPrefix_t& destination_guid_prefix,
247266
bool is_big_submessage)
248267
{
249-
if (!CDRMessage::appendMsg(full_msg_, submessage_msg_))
268+
if (!append_message(full_msg_, submessage_msg_))
250269
{
251270
// Retry
252-
flush();
253-
254-
current_dst_ = c_GuidPrefix_Unknown;
255-
256-
if (!add_info_dst_in_buffer(full_msg_, destination_guid_prefix))
257-
{
258-
logError(RTPS_WRITER, "Cannot add INFO_DST submessage to the CDRMessage. Buffer too small");
259-
return false;
260-
}
271+
flush_and_reset();
272+
add_info_dst_in_buffer(full_msg_, destination_guid_prefix);
261273

262-
if (!CDRMessage::appendMsg(full_msg_, submessage_msg_))
274+
if (!append_message(full_msg_, submessage_msg_))
263275
{
264276
logError(RTPS_WRITER, "Cannot add RTPS submesage to the CDRMessage. Buffer too small");
265277
return false;

src/cpp/rtps/network/ReceiverResource.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ void ReceiverResource::OnDataReceived(
118118
msg.reserved_size = size;
119119

120120
// TODO: Should we unlock in case UnregisterReceiver is called from callback ?
121-
rcv->processCDRMsg(remoteLocator, &msg);
121+
rcv->processCDRMsg(remoteLocator, localLocator, &msg);
122122
}
123123

124124
}

src/cpp/rtps/transport/TCPTransportInterface.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include <rtps/transport/TCPChannelResourceSecure.h>
3333
#include <rtps/transport/TCPAcceptorSecure.h>
3434
#endif // if TLS_FOUND
35+
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>
3536
#include <utils/SystemInfo.hpp>
3637

3738
using namespace std;
@@ -1055,6 +1056,8 @@ bool TCPTransportInterface::send(
10551056
std::shared_ptr<TCPChannelResource>& channel,
10561057
const Locator& remote_locator)
10571058
{
1059+
using namespace eprosima::fastdds::statistics::rtps;
1060+
10581061
bool locator_mismatch = false;
10591062

10601063
if (channel->locator() != IPLocator::toPhysicalLocator(remote_locator))
@@ -1109,6 +1112,8 @@ bool TCPTransportInterface::send(
11091112
if (channel->is_logical_port_opened(logical_port))
11101113
{
11111114
TCPHeader tcp_header;
1115+
StatisticsSubmessageData::Sequence seq;
1116+
set_statistics_submessage_from_transport(send_buffer, send_buffer_size, seq);
11121117
fill_rtcp_header(tcp_header, send_buffer, send_buffer_size, logical_port);
11131118

11141119
{

src/cpp/rtps/transport/UDPTransportInterface.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <fastdds/dds/log/Log.hpp>
2525
#include <fastrtps/utils/IPLocator.h>
2626
#include <rtps/transport/UDPSenderResource.hpp>
27+
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>
2728

2829
using namespace std;
2930
using namespace asio;
@@ -501,6 +502,8 @@ bool UDPTransportInterface::send(
501502
bool only_multicast_purpose,
502503
const std::chrono::microseconds& timeout)
503504
{
505+
using namespace eprosima::fastdds::statistics::rtps;
506+
504507
if (send_buffer_size > configuration()->sendBufferSize)
505508
{
506509
return false;
@@ -527,6 +530,8 @@ bool UDPTransportInterface::send(
527530
#endif // ifndef _WIN32
528531

529532
asio::error_code ec;
533+
StatisticsSubmessageData::Sequence seq;
534+
set_statistics_submessage_from_transport(send_buffer, send_buffer_size, seq);
530535
bytesSent = getSocketPtr(socket)->send_to(asio::buffer(send_buffer,
531536
send_buffer_size), destinationEndpoint, 0, ec);
532537
if (!!ec)

src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ class SharedMemChannelResource : public ChannelResource
175175
virtual std::shared_ptr<SharedMemManager::Buffer> Receive(
176176
Locator& remote_locator)
177177
{
178-
(void)remote_locator;
178+
remote_locator.kind = LOCATOR_KIND_SHM;
179179

180180
try
181181
{

src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
#include <rtps/transport/shared_mem/SharedMemTransport.h>
3030
#include <rtps/transport/shared_mem/SharedMemSenderResource.hpp>
3131
#include <rtps/transport/shared_mem/SharedMemChannelResource.hpp>
32-
3332
#include <rtps/transport/shared_mem/SharedMemManager.hpp>
33+
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>
3434

3535
#define SHM_MANAGER_DOMAIN ("fastrtps")
3636

@@ -408,6 +408,8 @@ bool SharedMemTransport::send(
408408
fastrtps::rtps::LocatorsIterator* destination_locators_end,
409409
const std::chrono::steady_clock::time_point& max_blocking_time_point)
410410
{
411+
using namespace eprosima::fastdds::statistics::rtps;
412+
411413
fastrtps::rtps::LocatorsIterator& it = *destination_locators_begin;
412414

413415
bool ret = true;
@@ -423,6 +425,7 @@ bool SharedMemTransport::send(
423425
// Only copy the first time
424426
if (shared_buffer == nullptr)
425427
{
428+
remove_statistics_submessage(send_buffer, send_buffer_size);
426429
shared_buffer = copy_to_shared_buffer(send_buffer, send_buffer_size, max_blocking_time_point);
427430
}
428431

src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class test_SharedMemChannelResource : public SharedMemChannelResource
5555
std::shared_ptr<SharedMemManager::Buffer> Receive(
5656
Locator& remote_locator) override
5757
{
58-
(void)remote_locator;
58+
remote_locator.kind = LOCATOR_KIND_SHM;
5959

6060
try
6161
{

src/cpp/rtps/writer/RTPSWriter.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include <fastdds/rtps/messages/RTPSMessageCreator.h>
3939

4040
#include <statistics/rtps/StatisticsBase.hpp>
41+
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>
4142

4243
namespace eprosima {
4344
namespace fastrtps {
@@ -304,6 +305,10 @@ uint32_t RTPSWriter::calculateMaxDataSize(
304305
}
305306
#endif // if HAVE_SECURITY
306307

308+
#ifdef FASTDDS_STATISTICS
309+
maxDataSize -= eprosima::fastdds::statistics::rtps::statistics_submessage_length;
310+
#endif // FASTDDS_STATISTICS
311+
307312
return maxDataSize;
308313
}
309314

0 commit comments

Comments
 (0)