From 5fe5b5d8ab4057353ee8cdc3acc780ebf9a91a58 Mon Sep 17 00:00:00 2001 From: Iker Luengo Date: Thu, 8 Apr 2021 11:54:04 +0200 Subject: [PATCH] Fix read_next_sample and take_next_sample This is a port of #1732 from to <2.2.x> * Refs 10476. Fix on DataReaderTests. Signed-off-by: Miguel Company * Refs 10476. Added xxx_next_sample tests to DataReaderTests.read_unread Signed-off-by: Miguel Company * Refs 10476. Strict real-time on read_or_take. Signed-off-by: Miguel Company * Refs 10476. Using LoanableTypedCollection on ReadTakeCommand and DataReaderLoanManager. Signed-off-by: Miguel Company * Refs 10476. Use ReadTakeCommand on take_next_sample. Signed-off-by: Miguel Company * Refs 10476. Use ReadTakeCommand on read_next_sample via read_or_take_next_sample. Signed-off-by: Miguel Company * Refs 10476. Adapt tests to new behavior of take Signed-off-by: Iker Luengo * Refs 10476. Added method get_first_change_with_minimum_ts to ReaderHistory. Signed-off-by: Miguel Company * Refs 10476. Add new method to mock. Signed-off-by: Miguel Company * Refs 10476. PubSubReader::last_seq is now a map for each instance. Signed-off-by: Miguel Company * Refs 10476. Added UserAllocatedSequence. Signed-off-by: Miguel Company * Refs 10476. Added take_first_data to PubSubReader. Signed-off-by: Miguel Company * Refs 10476. Using take_first_data on LifespanQos blackbox test. Signed-off-by: Miguel Company * Refs 10476. Uncrustify. Signed-off-by: Miguel Company * Refs 10476. Solve build error on non-windows platforms. Signed-off-by: Miguel Company * Refs 10480. Apply suggestions from code review Signed-off-by: Miguel Company Co-authored-by: IkerLuengo <57146230+IkerLuengo@users.noreply.github.com> Co-authored-by: Iker Luengo Co-authored-by: IkerLuengo <57146230+IkerLuengo@users.noreply.github.com> Signed-off-by: Iker Luengo --- .../dds/core/UserAllocatedSequence.hpp | 103 ++++++++++++++++++ include/fastdds/rtps/history/ReaderHistory.h | 3 + src/cpp/fastdds/subscriber/DataReaderImpl.cpp | 82 +++++++++----- src/cpp/fastdds/subscriber/DataReaderImpl.hpp | 5 + .../DataReaderImpl/DataReaderLoanManager.hpp | 4 +- .../DataReaderImpl/ReadTakeCommand.hpp | 4 +- src/cpp/rtps/history/ReaderHistory.cpp | 31 +++--- test/blackbox/api/dds-pim/PubSubReader.hpp | 85 ++++++++++++--- .../api/fastrtps_deprecated/PubSubReader.hpp | 28 ++++- .../common/BlackboxTestsLifespanQoS.cpp | 15 +-- .../common/DDSBlackboxTestsDataSharing.cpp | 34 ++---- .../fastdds/rtps/history/ReaderHistory.h | 7 ++ .../dds/subscriber/DataReaderTests.cpp | 45 +++++++- test/unittest/dds/subscriber/FooType.hpp | 6 + 14 files changed, 355 insertions(+), 97 deletions(-) create mode 100644 include/fastdds/dds/core/UserAllocatedSequence.hpp diff --git a/include/fastdds/dds/core/UserAllocatedSequence.hpp b/include/fastdds/dds/core/UserAllocatedSequence.hpp new file mode 100644 index 00000000000..eec686ab3ad --- /dev/null +++ b/include/fastdds/dds/core/UserAllocatedSequence.hpp @@ -0,0 +1,103 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file UserAllocatedSequence.hpp + */ + +#ifndef _FASTDDS_DDS_CORE_USERALLOCATEDSEQUENCE_HPP_ +#define _FASTDDS_DDS_CORE_USERALLOCATEDSEQUENCE_HPP_ + +#include +#include +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { + +/** + * A collection of generic opaque pointers allocated by the user. + * + * This kind of collection would always return @c true for @c has_ownership(), + * and thus would not be able to receive loans. + * It would also have an inmutable @c maximum(), so it would not allow @c length() to grow beyond the maximum + * value indicated on construction. + */ +struct UserAllocatedSequence : public LoanableCollection +{ + using size_type = LoanableCollection::size_type; + using element_type = LoanableCollection::element_type; + + /** + * Construct a UserAllocatedSequence. + * + * @param [in] items Pointer to the beginning of an array of @c num_items opaque pointers. + * @param [in] num_items Number of opaque pointers in @c items. + * + * @post buffer() == items + * @post has_ownership() == true + * @post length() == 0 + * @post maximum() == num_items + */ + UserAllocatedSequence( + element_type* items, + size_type num_items) + { + has_ownership_ = true; + maximum_ = num_items; + length_ = 0; + elements_ = items; + } + + ~UserAllocatedSequence() = default; + + // Non-copyable + UserAllocatedSequence( + const UserAllocatedSequence&) = delete; + UserAllocatedSequence& operator = ( + const UserAllocatedSequence&) = delete; + + // Non-moveable + UserAllocatedSequence( + UserAllocatedSequence&&) = delete; + UserAllocatedSequence& operator = ( + UserAllocatedSequence&&) = delete; + +protected: + + using LoanableCollection::maximum_; + using LoanableCollection::length_; + using LoanableCollection::elements_; + using LoanableCollection::has_ownership_; + + void resize( + size_type new_length) override + { + // This kind of collection cannot grow above its stack-allocated size + if (new_length > maximum_) + { + throw std::bad_alloc(); + } + } + +}; + +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif // _FASTDDS_DDS_CORE_USERALLOCATEDSEQUENCE_HPP_ diff --git a/include/fastdds/rtps/history/ReaderHistory.h b/include/fastdds/rtps/history/ReaderHistory.h index 856372a3cac..8b937dfd0a0 100644 --- a/include/fastdds/rtps/history/ReaderHistory.h +++ b/include/fastdds/rtps/history/ReaderHistory.h @@ -126,6 +126,9 @@ class ReaderHistory : public History RTPS_DllAPI void do_release_cache( CacheChange_t* ch) override; + iterator get_first_change_with_minimum_ts( + const Time_t timestamp); + //!Pointer to the reader RTPSReader* mp_reader; diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index ea76c46070d..a982265d154 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -21,8 +21,9 @@ #include -#include +#include #include +#include #include #include #include @@ -409,7 +410,19 @@ ReturnCode_t DataReaderImpl::read_or_take( return code; } - std::lock_guard lock(reader_->getMutex()); + auto max_blocking_time = std::chrono::steady_clock::now() + +#if HAVE_STRICT_REALTIME + std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time)); +#else + std::chrono::hours(24); +#endif // if HAVE_STRICT_REALTIME + + std::unique_lock lock(reader_->getMutex(), std::defer_lock); + + if (!lock.try_lock_until(max_blocking_time)) + { + return ReturnCode_t::RETCODE_TIMEOUT; + } auto it = history_.lookup_instance(handle, exact_instance); if (!it.first) @@ -560,9 +573,10 @@ ReturnCode_t DataReaderImpl::return_loan( return ReturnCode_t::RETCODE_OK; } -ReturnCode_t DataReaderImpl::read_next_sample( +ReturnCode_t DataReaderImpl::read_or_take_next_sample( void* data, - SampleInfo* info) + SampleInfo* info, + bool should_take) { if (reader_ == nullptr) { @@ -580,43 +594,51 @@ ReturnCode_t DataReaderImpl::read_next_sample( #else std::chrono::hours(24); #endif // if HAVE_STRICT_REALTIME - SampleInfo_t rtps_info; - if (history_.readNextData(data, &rtps_info, max_blocking_time)) - { - sample_info_to_dds(rtps_info, info); - return ReturnCode_t::RETCODE_OK; - } - return ReturnCode_t::RETCODE_ERROR; -} -ReturnCode_t DataReaderImpl::take_next_sample( - void* data, - SampleInfo* info) -{ - if (reader_ == nullptr) + std::unique_lock lock(reader_->getMutex(), std::defer_lock); + + if (!lock.try_lock_until(max_blocking_time)) { - return ReturnCode_t::RETCODE_NOT_ENABLED; + return ReturnCode_t::RETCODE_TIMEOUT; } - if (history_.getHistorySize() == 0) + auto it = history_.lookup_instance(HANDLE_NIL, false); + if (!it.first) { return ReturnCode_t::RETCODE_NO_DATA; } - auto max_blocking_time = std::chrono::steady_clock::now() + -#if HAVE_STRICT_REALTIME - std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time)); -#else - std::chrono::hours(24); -#endif // if HAVE_STRICT_REALTIME + StackAllocatedSequence data_values; + const_cast(data_values.buffer())[0] = data; + StackAllocatedSequence sample_infos; - SampleInfo_t rtps_info; - if (history_.takeNextData(data, &rtps_info, max_blocking_time)) + detail::StateFilter states{ NOT_READ_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE }; + detail::ReadTakeCommand cmd(*this, data_values, sample_infos, 1, states, it.second, false); + while (!cmd.is_finished()) { - sample_info_to_dds(rtps_info, info); - return ReturnCode_t::RETCODE_OK; + cmd.add_instance(should_take); + } + + ReturnCode_t code = cmd.return_value(); + if (ReturnCode_t::RETCODE_OK == code) + { + *info = sample_infos[0]; } - return ReturnCode_t::RETCODE_ERROR; + return code; +} + +ReturnCode_t DataReaderImpl::read_next_sample( + void* data, + SampleInfo* info) +{ + return read_or_take_next_sample(data, info, false); +} + +ReturnCode_t DataReaderImpl::take_next_sample( + void* data, + SampleInfo* info) +{ + return read_or_take_next_sample(data, info, true); } ReturnCode_t DataReaderImpl::get_first_untaken_info( diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index b713a5803b4..f9b9db12153 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -402,6 +402,11 @@ class DataReaderImpl bool single_instance, bool should_take); + ReturnCode_t read_or_take_next_sample( + void* data, + SampleInfo* info, + bool should_take); + /** * @brief A method called when a new cache change is added * @param change The cache change that has been added diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl/DataReaderLoanManager.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl/DataReaderLoanManager.hpp index 0af76ab3ac5..b563e94bbf0 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl/DataReaderLoanManager.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl/DataReaderLoanManager.hpp @@ -23,7 +23,7 @@ #include #include -#include +#include #include #include @@ -38,7 +38,7 @@ namespace detail { struct DataReaderLoanManager { - using SampleInfoSeq = LoanableSequence; + using SampleInfoSeq = LoanableTypedCollection; using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t; explicit DataReaderLoanManager( diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp index 4741167fd29..9e1895da4dd 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp @@ -23,7 +23,7 @@ #include #include -#include +#include #include #include @@ -51,7 +51,7 @@ struct ReadTakeCommand using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t; using RTPSReader = eprosima::fastrtps::rtps::RTPSReader; using WriterProxy = eprosima::fastrtps::rtps::WriterProxy; - using SampleInfoSeq = LoanableSequence; + using SampleInfoSeq = LoanableTypedCollection; ReadTakeCommand( DataReaderImpl& reader, diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp index c8b73058511..2621437ad4f 100644 --- a/src/cpp/rtps/history/ReaderHistory.cpp +++ b/src/cpp/rtps/history/ReaderHistory.cpp @@ -71,19 +71,8 @@ bool ReaderHistory::add_change( logError(RTPS_READER_HISTORY, "The Writer GUID_t must be defined"); } - if (!m_changes.empty() && a_change->sourceTimestamp < (*m_changes.rbegin())->sourceTimestamp) - { - auto it = std::lower_bound(m_changes.begin(), m_changes.end(), a_change, - [](const CacheChange_t* c1, const CacheChange_t* c2) -> bool - { - return c1->sourceTimestamp < c2->sourceTimestamp; - }); - m_changes.insert(it, a_change); - } - else - { - m_changes.push_back(a_change); - } + auto it = get_first_change_with_minimum_ts(a_change->sourceTimestamp); + m_changes.insert(it, a_change); logInfo(RTPS_READER_HISTORY, "Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes"); @@ -242,6 +231,22 @@ void ReaderHistory::do_release_cache( mp_reader->releaseCache(ch); } +History::iterator ReaderHistory::get_first_change_with_minimum_ts( + const Time_t timestamp) +{ + if (!m_changes.empty() && timestamp < (*m_changes.rbegin())->sourceTimestamp) + { + iterator it = std::lower_bound(m_changes.begin(), m_changes.end(), timestamp, + [](const CacheChange_t* c1, const Time_t& ts) -> bool + { + return c1->sourceTimestamp < ts; + }); + return it; + } + + return m_changes.end(); +} + } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index afaac3cedad..4934e6f0caa 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -31,18 +31,19 @@ #include #endif // _MSC_VER -#include +#include +#include #include +#include #include #include -#include -#include #include #include +#include +#include #include -#include +#include #include -#include #include #include #include @@ -413,7 +414,7 @@ class PubSubReader total_msgs_ = msgs; number_samples_expected_ = total_msgs_.size(); current_processed_count_ = 0; - last_seq = eprosima::fastrtps::rtps::SequenceNumber_t(); + last_seq.clear(); mutex_.unlock(); bool ret = false; @@ -424,7 +425,7 @@ class PubSubReader while (ret); receiving_.store(true); - return last_seq; + return get_last_sequence_received(); } void stopReception() @@ -463,7 +464,7 @@ class PubSubReader { block([this, seq]() -> bool { - return last_seq == seq; + return get_last_sequence_received() == seq; }); } @@ -499,6 +500,36 @@ class PubSubReader return current_processed_count_; } + void check_history_content( + std::list& expected_messages) + { + FASTDDS_SEQUENCE(DataSeq, type); + DataSeq data_seq; + eprosima::fastdds::dds::SampleInfoSeq info_seq; + + ReturnCode_t success = + datareader_->read(data_seq, info_seq, + eprosima::fastdds::dds::LENGTH_UNLIMITED, + eprosima::fastdds::dds::ANY_SAMPLE_STATE, + eprosima::fastdds::dds::ANY_VIEW_STATE, + eprosima::fastdds::dds::ANY_INSTANCE_STATE); + + if (ReturnCode_t::RETCODE_OK == success) + { + for (eprosima::fastdds::dds::LoanableCollection::size_type n = 0; n < info_seq.length(); ++n) + { + if (info_seq[n].valid_data) + { + auto it = std::find(expected_messages.begin(), expected_messages.end(), data_seq[n]); + ASSERT_NE(it, expected_messages.end()); + expected_messages.erase(it); + } + } + ASSERT_TRUE(expected_messages.empty()); + datareader_->return_loan(data_seq, info_seq); + } + } + void wait_discovery( std::chrono::seconds timeout = std::chrono::seconds::zero(), unsigned int min_writers = 1) @@ -657,7 +688,17 @@ class PubSubReader eprosima::fastrtps::rtps::SequenceNumber_t get_last_sequence_received() { - return last_seq; + if (last_seq.empty()) + { + return eprosima::fastrtps::rtps::SequenceNumber_t(); + } + + using pair_type = typename decltype(last_seq)::value_type; + auto seq_comp = [](const pair_type& v1, const pair_type& v2) -> bool + { + return v1.second < v2.second; + }; + return std::max_element(last_seq.cbegin(), last_seq.cend(), seq_comp)->second; } PubSubReader& deactivate_status_listener( @@ -1235,6 +1276,24 @@ class PubSubReader onEndpointDiscovery_ = f; } + bool take_first_data( + void* data) + { + using collection = eprosima::fastdds::dds::UserAllocatedSequence; + using info_seq_type = eprosima::fastdds::dds::SampleInfoSeq; + + collection::element_type buf[1] = { data }; + collection data_seq(buf, 1); + info_seq_type info_seq(1); + + if (ReturnCode_t::RETCODE_OK == datareader_->take(data_seq, info_seq)) + { + current_processed_count_++; + return true; + } + return false; + } + bool takeNextData( void* data) { @@ -1353,8 +1412,6 @@ class PubSubReader return participant_guid_; } -private: - void receive_one( eprosima::fastdds::dds::DataReader* datareader, bool& returnedValue) @@ -1373,8 +1430,8 @@ class PubSubReader std::unique_lock lock(mutex_); // Check order of changes. - ASSERT_LT(last_seq, info.sample_identity.sequence_number()); - last_seq = info.sample_identity.sequence_number(); + ASSERT_LT(last_seq[info.instance_handle], info.sample_identity.sequence_number()); + last_seq[info.instance_handle] = info.sample_identity.sequence_number(); if (info.instance_state == eprosima::fastdds::dds::ALIVE_INSTANCE_STATE) { @@ -1458,7 +1515,7 @@ class PubSubReader unsigned int participant_matched_; std::atomic receiving_; eprosima::fastdds::dds::TypeSupport type_; - eprosima::fastrtps::rtps::SequenceNumber_t last_seq; + std::map last_seq; size_t current_processed_count_; size_t number_samples_expected_; bool discovery_result_; diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index 5e80f2cad21..b74de5e5a80 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -338,7 +338,7 @@ class PubSubReader total_msgs_ = msgs; number_samples_expected_ = total_msgs_.size(); current_processed_count_ = 0; - last_seq = eprosima::fastrtps::rtps::SequenceNumber_t(); + last_seq.clear(); mutex_.unlock(); bool ret = false; @@ -387,7 +387,7 @@ class PubSubReader { block([this, seq]() -> bool { - return last_seq == seq; + return get_last_sequence_received() == seq; }); } @@ -560,7 +560,17 @@ class PubSubReader eprosima::fastrtps::rtps::SequenceNumber_t get_last_sequence_received() { - return last_seq; + if (last_seq.empty()) + { + return eprosima::fastrtps::rtps::SequenceNumber_t(); + } + + using pair_type = typename decltype(last_seq)::value_type; + auto seq_comp = [](const pair_type& v1, const pair_type& v2) -> bool + { + return v1.second < v2.second; + }; + return std::max_element(last_seq.cbegin(), last_seq.cend(), seq_comp)->second; } /*** Function to change QoS ***/ @@ -1094,6 +1104,12 @@ class PubSubReader onEndpointDiscovery_ = f; } + bool take_first_data( + void* data) + { + return takeNextData(data); + } + bool takeNextData( void* data) { @@ -1186,8 +1202,8 @@ class PubSubReader std::unique_lock lock(mutex_); // Check order of changes. - ASSERT_LT(last_seq, info.sample_identity.sequence_number()); - last_seq = info.sample_identity.sequence_number(); + ASSERT_LT(last_seq[info.instance_handle], info.sample_identity.sequence_number()); + last_seq[info.instance_handle] = info.sample_identity.sequence_number(); if (info.sampleKind == eprosima::fastrtps::rtps::ALIVE) { @@ -1267,7 +1283,7 @@ class PubSubReader unsigned int participant_matched_; std::atomic receiving_; type_support type_; - eprosima::fastrtps::rtps::SequenceNumber_t last_seq; + std::map last_seq; size_t current_processed_count_; size_t number_samples_expected_; bool discovery_result_; diff --git a/test/blackbox/common/BlackboxTestsLifespanQoS.cpp b/test/blackbox/common/BlackboxTestsLifespanQoS.cpp index 1faea46e2be..39709123030 100644 --- a/test/blackbox/common/BlackboxTestsLifespanQoS.cpp +++ b/test/blackbox/common/BlackboxTestsLifespanQoS.cpp @@ -124,9 +124,10 @@ TEST_P(LifespanQos, LongLifespan) // On the reader side we should be able to take the data HelloWorldType::type msg; - EXPECT_EQ(reader.takeNextData(&msg), true); - EXPECT_EQ(reader.takeNextData(&msg), true); - EXPECT_EQ(reader.takeNextData(&msg), true); + for (uint32_t i = 0; i < writer_samples; ++i) + { + EXPECT_EQ(reader.take_first_data(&msg), true); + } } TEST_P(LifespanQos, ShortLifespan) @@ -171,9 +172,10 @@ TEST_P(LifespanQos, ShortLifespan) // On the reader side we should not be able to take the data HelloWorldType::type msg; - EXPECT_EQ(reader.takeNextData(&msg), false); - EXPECT_EQ(reader.takeNextData(&msg), false); - EXPECT_EQ(reader.takeNextData(&msg), false); + for (uint32_t i = 0; i < writer_samples; ++i) + { + EXPECT_EQ(reader.take_first_data(&msg), false); + } } #ifdef INSTANTIATE_TEST_SUITE_P @@ -201,4 +203,3 @@ GTEST_INSTANTIATE_TEST_MACRO(LifespanQos, } }); - diff --git a/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp b/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp index 952b10462b3..5390c3cd3a3 100644 --- a/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp @@ -116,6 +116,9 @@ TEST(DDSDataSharing, TransientReader) TEST(DDSDataSharing, BestEffortDirtyPayloads) { + // The writer's pool is smaller than the reader history. + // The number of samples is larger than the pool size, so some payloads get rused + // leaving dirty payloads in the reader PubSubReader read_reader(TEST_TOPIC_NAME, false); PubSubWriter writer(TEST_TOPIC_NAME); @@ -144,13 +147,11 @@ TEST(DDSDataSharing, BestEffortDirtyPayloads) writer.wait_discovery(); read_reader.wait_discovery(); - // Send the data to fill the history and overwrite old changes - // The reader will receive all changes but the application will see only the last ones std::list data = default_fixed_sized_data_generator(writer_sent_data); - std::list received_data; + std::list data_in_history; auto data_it = data.begin(); std::advance(data_it, writer_sent_data - writer_history_depth - 1); - std::copy(data_it, data.end(), std::back_inserter(received_data)); + std::copy(data_it, data.end(), std::back_inserter(data_in_history)); // Send the data to fill the history and overwrite old changes // The reader will receive and process all changes so that the writer can reuse them, @@ -161,17 +162,14 @@ TEST(DDSDataSharing, BestEffortDirtyPayloads) read_reader.block_for_all(); // Doing a second read on the same history, the application will see only the last samples - read_reader.startReception(received_data); - FixedSizedType::type value; - while (read_reader.takeNextData ((void*)&value)) - { - default_receive_print(value); - } - read_reader.block_for_all(); + read_reader.check_history_content(data_in_history); } TEST(DDSDataSharing, ReliableDirtyPayloads) { + // The writer's pool is smaller than the reader history. + // The number of samples is larger than the pool size, so some payloads get rused + // leaving dirty payloads in the reader PubSubReader read_reader(TEST_TOPIC_NAME, false); PubSubWriter writer(TEST_TOPIC_NAME); @@ -200,13 +198,11 @@ TEST(DDSDataSharing, ReliableDirtyPayloads) writer.wait_discovery(); read_reader.wait_discovery(); - // Send the data to fill the history and overwrite old changes - // The reader will receive all changes but the application will see only the last ones std::list data = default_fixed_sized_data_generator(writer_sent_data); - std::list received_data; + std::list data_in_history; auto data_it = data.begin(); std::advance(data_it, writer_sent_data - writer_history_depth - 1); - std::copy(data_it, data.end(), std::back_inserter(received_data)); + std::copy(data_it, data.end(), std::back_inserter(data_in_history)); // Send the data to fill the history and overwrite old changes // The reader will receive and process all changes so that the writer can reuse them, @@ -217,13 +213,7 @@ TEST(DDSDataSharing, ReliableDirtyPayloads) read_reader.block_for_all(); // Doing a second read on the same history, the application will see only the last samples - read_reader.startReception(received_data); - FixedSizedType::type value; - while (read_reader.takeNextData ((void*)&value)) - { - default_receive_print(value); - } - read_reader.block_for_all(); + read_reader.check_history_content(data_in_history); } TEST(DDSDataSharing, DataSharingWriter_DifferentDomainReaders) diff --git a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h index 8b3add6941c..3a8af73ccd3 100644 --- a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h +++ b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h @@ -114,6 +114,13 @@ class ReaderHistory std::mutex samples_number_mutex_; unsigned int samples_number_; SequenceNumber_t last_sequence_number_; + + iterator get_first_change_with_minimum_ts( + const Time_t& /* timestamp */) + { + return m_changes.end(); + } + }; } // namespace rtps diff --git a/test/unittest/dds/subscriber/DataReaderTests.cpp b/test/unittest/dds/subscriber/DataReaderTests.cpp index 0b0e8f8146f..7995eff048d 100644 --- a/test/unittest/dds/subscriber/DataReaderTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderTests.cpp @@ -358,7 +358,6 @@ class DataReaderTests : public ::testing::Test DataType data; SampleInfo info; - EXPECT_EQ(code, data_reader->read_next_sample(&data, &info)); EXPECT_EQ(code, data_reader->take_next_sample(&data, &info)); if (ReturnCode_t::RETCODE_OK == code) { @@ -366,6 +365,7 @@ class DataReaderTests : public ::testing::Test data_writer_->write(&data); data_reader->wait_for_unread_message(time_to_wait); } + EXPECT_EQ(code, data_reader->read_next_sample(&data, &info)); } // Return code when requesting a bad instance @@ -1276,6 +1276,49 @@ TEST_F(DataReaderTests, read_unread) EXPECT_EQ(ok_code, data_reader_->return_loan(data_seq[i], info_seq[i])); } } + + // Check read_next_sample / take_next_sample + { + // Send a bunch of samples + for (char i = 0; i < num_samples; ++i) + { + data.message()[0] = i + '0'; + EXPECT_EQ(ok_code, data_writer_->write(&data, handle_ok_)); + } + + // Reader should have 10 samples with the following states (R = read, N = not-read, / = removed from history) + // {N, N, N, N, N, N, N, N, N, N} + + // Read a sample and take another + for (char i = 0; i < num_samples; i += 2) + { + FooType read_data; + FooType take_data; + SampleInfo read_info; + SampleInfo take_info; + + EXPECT_EQ(ok_code, data_reader_->read_next_sample(&read_data, &read_info)); + EXPECT_EQ(read_data.message()[0], i + '0'); + + EXPECT_EQ(ok_code, data_reader_->take_next_sample(&take_data, &take_info)); + EXPECT_EQ(take_data.message()[0], i + '1'); + + EXPECT_FALSE(read_data == take_data); + EXPECT_NE(read_info.sample_identity, take_info.sample_identity); + } + + // Reader sample states should be + // {R, /, R, /, R, /, R, /, R, /} + + // As all samples are read, read_next_sample and take_next_sample should not return data + { + FooType read_data; + SampleInfo read_info; + + EXPECT_EQ(no_data_code, data_reader_->read_next_sample(&read_data, &read_info)); + EXPECT_EQ(no_data_code, data_reader_->take_next_sample(&read_data, &read_info)); + } + } } TEST_F(DataReaderTests, TerminateWithoutDestroyingReader) diff --git a/test/unittest/dds/subscriber/FooType.hpp b/test/unittest/dds/subscriber/FooType.hpp index 08e9684753a..3a6f9f074c0 100644 --- a/test/unittest/dds/subscriber/FooType.hpp +++ b/test/unittest/dds/subscriber/FooType.hpp @@ -85,6 +85,12 @@ class FooType scdr << index_; } + inline bool operator ==( + const FooType& other) const + { + return (index_ == other.index_) && (message_ == other.message_); + } + private: uint32_t index_ = 0;