Skip to content

Commit 7ef397c

Browse files
[22648] Unacknowledged sample removed in KeepAll mode (#5618)
* Refs #22648: Regression test Signed-off-by: Juanjo Garcia <[email protected]> * Refs #22648: corrected bug Signed-off-by: Juanjo Garcia <[email protected]> * Update comment Signed-off-by: EugenioCollado <[email protected]> --------- Signed-off-by: Juanjo Garcia <[email protected]> Signed-off-by: EugenioCollado <[email protected]> Co-authored-by: EugenioCollado <[email protected]> (cherry picked from commit 68f97fe)
1 parent bc53ad5 commit 7ef397c

File tree

2 files changed

+93
-1
lines changed

2 files changed

+93
-1
lines changed

src/cpp/fastdds/publisher/DataWriterHistory.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ bool DataWriterHistory::prepare_change(
151151
if (history_qos_.kind == KEEP_ALL_HISTORY_QOS)
152152
{
153153
ret = this->mp_writer->try_remove_change(max_blocking_time, lock);
154+
// If change was removed (ret == 1) in KeepAllHistory, it must have been acked
155+
is_acked = ret;
154156
}
155157
else if (history_qos_.kind == KEEP_LAST_HISTORY_QOS)
156158
{

test/blackbox/common/DDSBlackboxTestsListeners.cpp

+91-1
Original file line numberDiff line numberDiff line change
@@ -3400,7 +3400,7 @@ TEST(DDSStatus, entire_history_acked_volatile_unknown_pointer)
34003400
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
34013401
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);
34023402

3403-
writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, eprosima::fastrtps::Duration_t (200, 0))
3403+
writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, eprosima::fastdds::dds::Duration_t (200, 0))
34043404
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
34053405
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
34063406
.resource_limits_max_instances(1)
@@ -3428,6 +3428,96 @@ TEST(DDSStatus, entire_history_acked_volatile_unknown_pointer)
34283428
}
34293429
}
34303430

3431+
/*¡
3432+
* Regression Test for 22648: on_unacknowledged_sample_removed callback is called when writer with keep all
3433+
* history is used, when the history was full but before max_blocking_time a sample was acknowledged, as is_acked was
3434+
* checked before the waiting time, and is not re-checked. This should not happen.
3435+
*/
3436+
TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call)
3437+
{
3438+
auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
3439+
test_transport->drop_data_messages_filter_ = [](eprosima::fastdds::rtps::CDRMessage_t& msg) -> bool
3440+
{
3441+
static std::vector<std::pair<eprosima::fastdds::rtps::SequenceNumber_t,
3442+
std::chrono::steady_clock::time_point>> delayed_messages;
3443+
3444+
uint32_t old_pos = msg.pos;
3445+
3446+
// Parse writer ID and sequence number
3447+
msg.pos += 2; // flags
3448+
msg.pos += 2; // inline QoS
3449+
msg.pos += 4; // reader ID
3450+
auto writerID = eprosima::fastdds::helpers::cdr_parse_entity_id((char*)&msg.buffer[msg.pos]);
3451+
msg.pos += 4;
3452+
eprosima::fastdds::rtps::SequenceNumber_t sn;
3453+
sn.high = (int32_t)eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);
3454+
msg.pos += 4;
3455+
sn.low = eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);
3456+
3457+
// Restore buffer position
3458+
msg.pos = old_pos;
3459+
3460+
// Delay logic for user endpoints only
3461+
if ((writerID.value[3] & 0xC0) == 0) // only user endpoints
3462+
{
3463+
auto now = std::chrono::steady_clock::now();
3464+
auto it = std::find_if(delayed_messages.begin(), delayed_messages.end(),
3465+
[&sn](const auto& pair)
3466+
{
3467+
return pair.first == sn;
3468+
});
3469+
3470+
if (it == delayed_messages.end())
3471+
{
3472+
// If the sequence number is encountered for the first time, start the delay
3473+
delayed_messages.emplace_back(sn, now + std::chrono::milliseconds(750)); // Add delay
3474+
return true; // Start dropping this message
3475+
}
3476+
else if (now < it->second)
3477+
{
3478+
// If the delay period has not elapsed, keep dropping the message
3479+
return true;
3480+
}
3481+
else
3482+
{
3483+
// Once the delay has elapsed, allow the message to proceed
3484+
delayed_messages.erase(it);
3485+
}
3486+
}
3487+
return false; // Allow message to proceed
3488+
};
3489+
3490+
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
3491+
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);
3492+
3493+
writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, eprosima::fastdds::dds::Duration_t (200, 0))
3494+
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
3495+
.resource_limits_max_instances(1)
3496+
.resource_limits_max_samples(1)
3497+
.resource_limits_max_samples_per_instance(1)
3498+
.disable_builtin_transport()
3499+
.add_user_transport_to_pparams(test_transport)
3500+
.init();
3501+
ASSERT_TRUE(writer.isInitialized());
3502+
3503+
reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
3504+
.init();
3505+
ASSERT_TRUE(reader.isInitialized());
3506+
3507+
// Wait for discovery
3508+
writer.wait_discovery();
3509+
reader.wait_discovery();
3510+
3511+
auto data = default_helloworld_data_generator(2);
3512+
3513+
for (auto sample : data)
3514+
{
3515+
writer.send_sample(sample);
3516+
}
3517+
3518+
EXPECT_EQ(writer.times_unack_sample_removed(), 0u);
3519+
}
3520+
34313521
/*!
34323522
* Test that checks with a writer of each type that having the same listener attached, the notified writer in the
34333523
* callback is the corresponding writer that has removed a sample unacknowledged.

0 commit comments

Comments
 (0)