Skip to content

Commit a4c4782

Browse files
MiguelCompanyJesusPoderoso
authored andcommitted
Fix issue with exclusive ownership and unordered samples (#5182)
* Refs #20866. Regression test. Signed-off-by: Miguel Company <[email protected]> * Refs #20866. Additional regression test. Signed-off-by: Miguel Company <[email protected]> * Refs #20866. Fix issue. Signed-off-by: Miguel Company <[email protected]> * Refs #20866. Fix unit tests. Signed-off-by: Miguel Company <[email protected]> * Refs #20866. Refactor test to run several cases. Signed-off-by: Miguel Company <[email protected]> --------- Signed-off-by: Miguel Company <[email protected]> (cherry picked from commit b1a7fe2) Signed-off-by: JesusPoderoso <[email protected]> # Conflicts: # test/unittest/dds/subscriber/DataReaderHistoryTests.cpp
1 parent 68b7e03 commit a4c4782

File tree

3 files changed

+208
-81
lines changed

3 files changed

+208
-81
lines changed

src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -858,13 +858,39 @@ bool DataReaderHistory::update_instance_nts(
858858

859859
assert(vit != instances_.end());
860860
assert(false == change->isRead);
861+
auto previous_owner = vit->second->current_owner.first;
861862
++counters_.samples_unread;
862863
bool ret =
863864
vit->second->update_state(counters_, change->kind, change->writerGUID,
864865
change->reader_info.writer_ownership_strength);
865866
change->reader_info.disposed_generation_count = vit->second->disposed_generation_count;
866867
change->reader_info.no_writers_generation_count = vit->second->no_writers_generation_count;
867868

869+
auto current_owner = vit->second->current_owner.first;
870+
if (current_owner != previous_owner)
871+
{
872+
assert(current_owner == change->writerGUID);
873+
874+
// Remove all changes from different owners after the change.
875+
DataReaderInstance::ChangeCollection& changes = vit->second->cache_changes;
876+
auto it = std::lower_bound(changes.begin(), changes.end(), change, rtps::history_order_cmp);
877+
assert(it != changes.end());
878+
assert(*it == change);
879+
++it;
880+
while (it != changes.end())
881+
{
882+
if ((*it)->writerGUID != current_owner)
883+
{
884+
// Remove from history
885+
remove_change_sub(*it, it);
886+
887+
// Current iterator will point to change next to the one removed. Avoid incrementing.
888+
continue;
889+
}
890+
++it;
891+
}
892+
}
893+
868894
return ret;
869895
}
870896

test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2161,6 +2161,142 @@ TEST_P(OwnershipQos, exclusive_kind_keyed_besteffort_disposing_instance)
21612161
exclusive_kind_keyed_disposing_instance(false);
21622162
}
21632163

2164+
/*!
2165+
* This is a regression test for redmine issue 20866.
2166+
*
2167+
* This test checks that a reader keeping a long number of samples and with an exclusive ownership policy only
2168+
* returns the data from the writer with the highest strength.
2169+
*
2170+
* @param use_keep_all_history Whether to use KEEP_ALL history or KEEP_LAST(20).
2171+
* @param mixed_data Whether to send data from both writers in an interleaved way.
2172+
*/
2173+
static void test_exclusive_kind_big_history(
2174+
bool use_keep_all_history,
2175+
bool mixed_data)
2176+
{
2177+
PubSubReader<KeyedHelloWorldPubSubType> reader(TEST_TOPIC_NAME);
2178+
PubSubWriter<KeyedHelloWorldPubSubType> low_strength_writer(TEST_TOPIC_NAME);
2179+
PubSubWriter<KeyedHelloWorldPubSubType> high_strength_writer(TEST_TOPIC_NAME);
2180+
2181+
// Configure history QoS.
2182+
if (use_keep_all_history)
2183+
{
2184+
reader.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS);
2185+
low_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS);
2186+
high_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS);
2187+
}
2188+
else
2189+
{
2190+
reader.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20);
2191+
low_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20);
2192+
high_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20);
2193+
}
2194+
2195+
// Prepare data.
2196+
std::list<KeyedHelloWorld> generated_data = default_keyedhelloworld_data_generator(20);
2197+
auto middle = std::next(generated_data.begin(), 10);
2198+
std::list<KeyedHelloWorld> low_strength_data(generated_data.begin(), middle);
2199+
std::list<KeyedHelloWorld> high_strength_data(middle, generated_data.end());
2200+
auto expected_data = high_strength_data;
2201+
2202+
if (mixed_data)
2203+
{
2204+
// Expect reception of the first two samples from the low strength writer (one per instance).
2205+
auto it = low_strength_data.begin();
2206+
expected_data.push_front(*it++);
2207+
expected_data.push_front(*it);
2208+
}
2209+
2210+
// Initialize writers.
2211+
low_strength_writer.ownership_strength(3)
2212+
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
2213+
.init();
2214+
ASSERT_TRUE(low_strength_writer.isInitialized());
2215+
2216+
// High strength writer will use a custom transport to ensure its data is received after the low strength data.
2217+
auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
2218+
std::atomic<bool> drop_messages(false);
2219+
test_transport->messages_filter_ = [&drop_messages](eprosima::fastdds::rtps::CDRMessage_t&)
2220+
{
2221+
return drop_messages.load();
2222+
};
2223+
high_strength_writer.ownership_strength(4)
2224+
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
2225+
.disable_builtin_transport()
2226+
.add_user_transport_to_pparams(test_transport)
2227+
.init();
2228+
ASSERT_TRUE(high_strength_writer.isInitialized());
2229+
2230+
// Initialize reader.
2231+
reader.ownership_exclusive()
2232+
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
2233+
.init();
2234+
ASSERT_TRUE(reader.isInitialized());
2235+
2236+
// Wait for discovery.
2237+
low_strength_writer.wait_discovery();
2238+
high_strength_writer.wait_discovery();
2239+
reader.wait_discovery(std::chrono::seconds::zero(), 2);
2240+
2241+
// Drop the messages from the high strength writer, so they arrive later to the reader.
2242+
drop_messages.store(true);
2243+
2244+
if (mixed_data)
2245+
{
2246+
// Send one sample from each writer, with low strength data first.
2247+
while (!low_strength_data.empty() && !high_strength_data.empty())
2248+
{
2249+
EXPECT_TRUE(low_strength_writer.send_sample(low_strength_data.front()));
2250+
EXPECT_TRUE(high_strength_writer.send_sample(high_strength_data.front()));
2251+
low_strength_data.pop_front();
2252+
high_strength_data.pop_front();
2253+
}
2254+
}
2255+
else
2256+
{
2257+
// Send high strength data first, so it has the lowest source timestamps, but drop the messages, so they arrive
2258+
// later to the reader.
2259+
high_strength_writer.send(high_strength_data);
2260+
EXPECT_TRUE(high_strength_data.empty());
2261+
2262+
// Send low strength data, so it has the highest source timestamps.
2263+
low_strength_writer.send(low_strength_data);
2264+
EXPECT_TRUE(low_strength_data.empty());
2265+
}
2266+
2267+
// Wait for the reader to receive the low strength data.
2268+
EXPECT_TRUE(low_strength_writer.waitForAllAcked(std::chrono::seconds(1)));
2269+
2270+
// Let high strength writer send the data, and wait for the reader to receive it.
2271+
drop_messages.store(false);
2272+
EXPECT_TRUE(high_strength_writer.waitForAllAcked(std::chrono::seconds(1)));
2273+
2274+
// Make the reader process the data, expecting only the required data.
2275+
// The issue was reproduced by the reader complaining about reception of unexpected data.
2276+
reader.startReception(expected_data);
2277+
reader.block_for_all();
2278+
}
2279+
2280+
TEST(OwnershipQos, exclusive_kind_keep_all_reliable)
2281+
{
2282+
test_exclusive_kind_big_history(true, false);
2283+
}
2284+
2285+
TEST(OwnershipQos, exclusive_kind_keep_all_reliable_mixed)
2286+
{
2287+
test_exclusive_kind_big_history(true, true);
2288+
}
2289+
2290+
TEST(OwnershipQos, exclusive_kind_keep_last_reliable)
2291+
{
2292+
test_exclusive_kind_big_history(false, false);
2293+
}
2294+
2295+
TEST(OwnershipQos, exclusive_kind_keep_last_reliable_mixed)
2296+
{
2297+
test_exclusive_kind_big_history(false, true);
2298+
}
2299+
21642300
#ifdef INSTANTIATE_TEST_SUITE_P
21652301
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
21662302
#else

0 commit comments

Comments
 (0)