Skip to content

Commit f6094b6

Browse files
Fix related_sample_identity handling in RPC entities (#5760)
* Refs #23048. Fix includes in ReplierImpl. Signed-off-by: Miguel Company <[email protected]> * Refs #23048. Factor-out code to fill `related_sample_identity`. Signed-off-by: Miguel Company <[email protected]> * Refs #23048. Only update `related_sample_identity` when necessary. Signed-off-by: Miguel Company <[email protected]> * Refs #23048. Doxygen. Signed-off-by: Miguel Company <[email protected]> * Refs #23048. Similar changes in RequesterImpl. Signed-off-by: Miguel Company <[email protected]> * Refs #23048. Add Blackbox tests. Signed-off-by: Miguel Company <[email protected]> * Refs #23048. Uncrustify. Signed-off-by: Miguel Company <[email protected]> * Refs #23048. Fix includes in `RequesterImpl`. Signed-off-by: Miguel Company <[email protected]> --------- Signed-off-by: Miguel Company <[email protected]>
1 parent a7f2258 commit f6094b6

File tree

5 files changed

+176
-6
lines changed

5 files changed

+176
-6
lines changed

src/cpp/fastdds/rpc/ReplierImpl.cpp

+45-4
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,60 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include "ReplierImpl.hpp"
16+
17+
#include <string>
18+
19+
#include <fastdds/dds/core/detail/DDSReturnCode.hpp>
20+
#include <fastdds/dds/core/LoanableCollection.hpp>
21+
#include <fastdds/dds/core/LoanableSequence.hpp>
1522
#include <fastdds/dds/core/status/StatusMask.hpp>
23+
#include <fastdds/dds/domain/qos/ReplierQos.hpp>
1624
#include <fastdds/dds/log/Log.hpp>
25+
#include <fastdds/dds/rpc/RequestInfo.hpp>
26+
#include <fastdds/rtps/common/SampleIdentity.hpp>
1727
#include <fastdds/rtps/common/WriteParams.hpp>
1828

19-
#include "ReplierImpl.hpp"
2029
#include "ServiceImpl.hpp"
2130

2231
namespace eprosima {
2332
namespace fastdds {
2433
namespace dds {
2534
namespace rpc {
2635

36+
/**
37+
* @brief Fills the related sample identity of the request.
38+
*
39+
* This will fill the related sample identity of the request with values taken from the sample identity.
40+
* Values different from unknown are preserved.
41+
*
42+
* @param info [in,out] The request information to update.
43+
*/
44+
static void fill_related_sample_identity(
45+
RequestInfo& info)
46+
{
47+
// When sending a reply, the code here expects that related_sample_identity
48+
// has the sample_identity of the corresponding request.
49+
50+
static const rtps::SampleIdentity unknown_identity = rtps::SampleIdentity::unknown();
51+
52+
// If the related guid is unknown, we consider that the request is not related to a previous one,
53+
// so we set the related sample identity to the received sample identity
54+
if (unknown_identity.writer_guid() == info.related_sample_identity.writer_guid())
55+
{
56+
info.related_sample_identity = info.sample_identity;
57+
return;
58+
}
59+
60+
// There is a special case where only the related guid is set.
61+
// This is used in ROS 2 to convey the GUID of the reply reader.
62+
// In this case we just set the sequence number of the related sample identity
63+
if (unknown_identity.sequence_number() == info.related_sample_identity.sequence_number())
64+
{
65+
info.related_sample_identity.sequence_number() = info.sample_identity.sequence_number();
66+
}
67+
}
68+
2769
ReplierImpl::ReplierImpl(
2870
ServiceImpl* service,
2971
const ReplierQos& qos)
@@ -79,8 +121,7 @@ ReturnCode_t ReplierImpl::take_request(
79121
}
80122

81123
retcode = replier_reader_->take_next_sample(data, &info);
82-
// Related sample identity is stored in sample_indentity member of info. Change it to related_sample_identity
83-
info.related_sample_identity = info.sample_identity;
124+
fill_related_sample_identity(info);
84125

85126
return retcode;
86127
}
@@ -104,7 +145,7 @@ ReturnCode_t ReplierImpl::take_request(
104145
// Fill related_sample_identity attribute
105146
for (LoanableCollection::size_type i = 0; i < info.length(); ++i)
106147
{
107-
info[i].related_sample_identity = info[i].sample_identity;
148+
fill_related_sample_identity(info[i]);
108149
}
109150

110151
return retcode;

src/cpp/fastdds/rpc/RequesterImpl.cpp

+25-2
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,21 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include "RequesterImpl.hpp"
16+
17+
#include <string>
18+
19+
#include <fastdds/dds/core/detail/DDSReturnCode.hpp>
20+
#include <fastdds/dds/core/LoanableCollection.hpp>
21+
#include <fastdds/dds/core/LoanableSequence.hpp>
1522
#include <fastdds/dds/core/status/StatusMask.hpp>
23+
#include <fastdds/dds/domain/qos/RequesterQos.hpp>
1624
#include <fastdds/dds/log/Log.hpp>
25+
#include <fastdds/dds/rpc/RequestInfo.hpp>
26+
#include <fastdds/rtps/common/Guid.hpp>
27+
#include <fastdds/rtps/common/SequenceNumber.hpp>
1728
#include <fastdds/rtps/common/WriteParams.hpp>
1829

19-
#include "RequesterImpl.hpp"
2030
#include "ServiceImpl.hpp"
2131

2232
namespace eprosima {
@@ -59,11 +69,24 @@ ReturnCode_t RequesterImpl::send_request(
5969
}
6070

6171
rtps::WriteParams wparams;
72+
wparams.related_sample_identity(info.related_sample_identity);
6273
ReturnCode_t ret = requester_writer_->write(data, wparams);
6374
if (RETCODE_OK == ret)
6475
{
6576
// Fill RequestInfo's related sample identity with the information expected for the corresponding reply
66-
info.related_sample_identity = wparams.related_sample_identity();
77+
info.sample_identity = wparams.sample_identity();
78+
79+
// Set the full related sample identity when the writer guid is unknown
80+
if (rtps::GUID_t::unknown() == info.related_sample_identity.writer_guid())
81+
{
82+
info.related_sample_identity = wparams.related_sample_identity();
83+
}
84+
85+
// Set the sequence number of the related sample identity when it is unknown
86+
if (rtps::SequenceNumber_t::unknown() == info.related_sample_identity.sequence_number())
87+
{
88+
info.related_sample_identity.sequence_number() = wparams.related_sample_identity().sequence_number();
89+
}
6790
}
6891
return ret;
6992
}

test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp

+41
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,37 @@ void ReqRepHelloWorldRequester::send(
189189
ASSERT_NE(related_sample_identity_.sequence_number(), SequenceNumber_t());
190190
}
191191

192+
void ReqRepHelloWorldRequester::send(
193+
const uint16_t number,
194+
const eprosima::fastdds::rtps::SampleIdentity& related_sample_identity)
195+
{
196+
RequestInfo info;
197+
info.related_sample_identity = related_sample_identity;
198+
HelloWorld hello;
199+
hello.index(number);
200+
hello.message("HelloWorld");
201+
202+
{
203+
std::unique_lock<std::mutex> lock(mutex_);
204+
current_number_ = number;
205+
}
206+
207+
ASSERT_EQ(requester_->send_request((void*)&hello, info), RETCODE_OK);
208+
related_sample_identity_ = info.related_sample_identity;
209+
210+
ASSERT_NE(related_sample_identity_.sequence_number(), SequenceNumber_t());
211+
212+
if (eprosima::fastdds::rtps::GUID_t::unknown() != related_sample_identity.writer_guid())
213+
{
214+
ASSERT_EQ(related_sample_identity_.writer_guid(), related_sample_identity.writer_guid());
215+
}
216+
217+
if (eprosima::fastdds::rtps::SequenceNumber_t::unknown() != related_sample_identity.sequence_number())
218+
{
219+
ASSERT_EQ(related_sample_identity_.sequence_number(), related_sample_identity.sequence_number());
220+
}
221+
}
222+
192223
const Duration_t ReqRepHelloWorldRequester::datawriter_latency_budget_duration() const
193224
{
194225
return requester_->get_requester_writer()->get_qos().latency_budget().duration;
@@ -199,6 +230,16 @@ const Duration_t ReqRepHelloWorldRequester::datareader_latency_budget_duration()
199230
return requester_->get_requester_reader()->get_qos().latency_budget().duration;
200231
}
201232

233+
const eprosima::fastdds::rtps::GUID_t& ReqRepHelloWorldRequester::get_reader_guid() const
234+
{
235+
return requester_->get_requester_reader()->guid();
236+
}
237+
238+
const eprosima::fastdds::rtps::SampleIdentity& ReqRepHelloWorldRequester::get_last_related_sample_identity() const
239+
{
240+
return related_sample_identity_;
241+
}
242+
202243
void ReqRepHelloWorldRequester::init_processing_thread()
203244
{
204245
wait_set_.attach_condition(stop_processing_thread_);

test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp

+9
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include <fastdds/dds/rpc/Requester.hpp>
3333
#include <fastdds/dds/rpc/RequestInfo.hpp>
3434
#include <fastdds/dds/rpc/Service.hpp>
35+
#include <fastdds/rtps/common/SampleIdentity.hpp>
3536

3637
#include "../../common/BlackboxTests.hpp"
3738

@@ -80,13 +81,21 @@ class ReqRepHelloWorldRequester
8081
void send(
8182
const uint16_t number);
8283

84+
void send(
85+
const uint16_t number,
86+
const eprosima::fastdds::rtps::SampleIdentity& related_sample_identity);
87+
8388
const eprosima::fastdds::dds::Duration_t datawriter_latency_budget_duration() const;
8489

8590
const eprosima::fastdds::dds::Duration_t datareader_latency_budget_duration() const;
8691

8792
eprosima::fastdds::dds::RequesterQos create_requester_qos(
8893
bool volatile_durability_qos = false);
8994

95+
const eprosima::fastdds::rtps::GUID_t& get_reader_guid() const;
96+
97+
const eprosima::fastdds::rtps::SampleIdentity& get_last_related_sample_identity() const;
98+
9099
private:
91100

92101
ReqRepHelloWorldRequester& operator =(

test/blackbox/common/BlackboxTestsPubSubBasic.cpp

+56
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,62 @@ TEST_P(PubSubBasic, ReqRepAsReliableHelloworld)
284284
}
285285
}
286286

287+
TEST_P(PubSubBasic, ReqRepAsReliableHelloworldReaderGUID)
288+
{
289+
ReqRepHelloWorldRequester requester;
290+
ReqRepHelloWorldReplier replier;
291+
const uint16_t nmsgs = 10;
292+
293+
requester.init();
294+
295+
ASSERT_TRUE(requester.isInitialized());
296+
297+
replier.init();
298+
299+
requester.wait_discovery();
300+
replier.wait_discovery();
301+
302+
ASSERT_TRUE(replier.isInitialized());
303+
304+
for (uint16_t count = 0; count < nmsgs; ++count)
305+
{
306+
eprosima::fastdds::rtps::SampleIdentity related_sample_identity{};
307+
related_sample_identity.writer_guid(requester.get_reader_guid());
308+
requester.send(count, related_sample_identity);
309+
requester.block(std::chrono::seconds(5));
310+
}
311+
}
312+
313+
TEST_P(PubSubBasic, ReqRepAsReliableHelloworldConsecutive)
314+
{
315+
ReqRepHelloWorldRequester requester;
316+
ReqRepHelloWorldReplier replier;
317+
const uint16_t nmsgs = 10;
318+
319+
requester.init();
320+
321+
ASSERT_TRUE(requester.isInitialized());
322+
323+
replier.init();
324+
325+
requester.wait_discovery();
326+
replier.wait_discovery();
327+
328+
ASSERT_TRUE(replier.isInitialized());
329+
330+
requester.send(0);
331+
requester.block(std::chrono::seconds(5));
332+
333+
eprosima::fastdds::rtps::SampleIdentity related_sample_identity{};
334+
related_sample_identity = requester.get_last_related_sample_identity();
335+
336+
for (uint16_t count = 1; count < nmsgs; ++count)
337+
{
338+
requester.send(count, related_sample_identity);
339+
requester.block(std::chrono::seconds(5));
340+
}
341+
}
342+
287343
TEST_P(PubSubBasic, PubSubAsReliableData64kb)
288344
{
289345
PubSubReader<Data64kbPubSubType> reader(TEST_TOPIC_NAME);

0 commit comments

Comments
 (0)