Skip to content

Commit 8206e4b

Browse files
juanjo4936EugenioCollado
authored andcommitted
[22648] Unacknowledged sample removed in KeepAll mode (#5618)
* Refs #22648: Regression test Signed-off-by: Juanjo Garcia <[email protected]> * Refs #22648: corrected bug Signed-off-by: Juanjo Garcia <[email protected]> * Update comment Signed-off-by: EugenioCollado <[email protected]> --------- Signed-off-by: Juanjo Garcia <[email protected]> Signed-off-by: EugenioCollado <[email protected]> Co-authored-by: EugenioCollado <[email protected]> (cherry picked from commit 68f97fe)
1 parent 90890a8 commit 8206e4b

File tree

2 files changed

+92
-0
lines changed

2 files changed

+92
-0
lines changed

src/cpp/fastdds/publisher/DataWriterHistory.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ bool DataWriterHistory::prepare_change(
160160
if (history_qos_.kind == KEEP_ALL_HISTORY_QOS)
161161
{
162162
ret = this->mp_writer->try_remove_change(max_blocking_time, lock);
163+
// If change was removed (ret == 1) in KeepAllHistory, it must have been acked
164+
is_acked = ret;
163165
}
164166
else if (history_qos_.kind == KEEP_LAST_HISTORY_QOS)
165167
{

test/blackbox/common/DDSBlackboxTestsListeners.cpp

+90
Original file line numberDiff line numberDiff line change
@@ -3476,6 +3476,96 @@ TEST(DDSStatus, entire_history_acked_volatile_unknown_pointer)
34763476
}
34773477
}
34783478

3479+
/*¡
3480+
* Regression Test for 22648: on_unacknowledged_sample_removed callback is called when writer with keep all
3481+
* history is used, when the history was full but before max_blocking_time a sample was acknowledged, as is_acked was
3482+
* checked before the waiting time, and is not re-checked. This should not happen.
3483+
*/
3484+
TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call)
3485+
{
3486+
auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
3487+
test_transport->drop_data_messages_filter_ = [](eprosima::fastdds::rtps::CDRMessage_t& msg) -> bool
3488+
{
3489+
static std::vector<std::pair<eprosima::fastdds::rtps::SequenceNumber_t,
3490+
std::chrono::steady_clock::time_point>> delayed_messages;
3491+
3492+
uint32_t old_pos = msg.pos;
3493+
3494+
// Parse writer ID and sequence number
3495+
msg.pos += 2; // flags
3496+
msg.pos += 2; // inline QoS
3497+
msg.pos += 4; // reader ID
3498+
auto writerID = eprosima::fastdds::helpers::cdr_parse_entity_id((char*)&msg.buffer[msg.pos]);
3499+
msg.pos += 4;
3500+
eprosima::fastdds::rtps::SequenceNumber_t sn;
3501+
sn.high = (int32_t)eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);
3502+
msg.pos += 4;
3503+
sn.low = eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);
3504+
3505+
// Restore buffer position
3506+
msg.pos = old_pos;
3507+
3508+
// Delay logic for user endpoints only
3509+
if ((writerID.value[3] & 0xC0) == 0) // only user endpoints
3510+
{
3511+
auto now = std::chrono::steady_clock::now();
3512+
auto it = std::find_if(delayed_messages.begin(), delayed_messages.end(),
3513+
[&sn](const auto& pair)
3514+
{
3515+
return pair.first == sn;
3516+
});
3517+
3518+
if (it == delayed_messages.end())
3519+
{
3520+
// If the sequence number is encountered for the first time, start the delay
3521+
delayed_messages.emplace_back(sn, now + std::chrono::milliseconds(750)); // Add delay
3522+
return true; // Start dropping this message
3523+
}
3524+
else if (now < it->second)
3525+
{
3526+
// If the delay period has not elapsed, keep dropping the message
3527+
return true;
3528+
}
3529+
else
3530+
{
3531+
// Once the delay has elapsed, allow the message to proceed
3532+
delayed_messages.erase(it);
3533+
}
3534+
}
3535+
return false; // Allow message to proceed
3536+
};
3537+
3538+
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
3539+
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);
3540+
3541+
writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, eprosima::fastdds::dds::Duration_t (200, 0))
3542+
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
3543+
.resource_limits_max_instances(1)
3544+
.resource_limits_max_samples(1)
3545+
.resource_limits_max_samples_per_instance(1)
3546+
.disable_builtin_transport()
3547+
.add_user_transport_to_pparams(test_transport)
3548+
.init();
3549+
ASSERT_TRUE(writer.isInitialized());
3550+
3551+
reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
3552+
.init();
3553+
ASSERT_TRUE(reader.isInitialized());
3554+
3555+
// Wait for discovery
3556+
writer.wait_discovery();
3557+
reader.wait_discovery();
3558+
3559+
auto data = default_helloworld_data_generator(2);
3560+
3561+
for (auto sample : data)
3562+
{
3563+
writer.send_sample(sample);
3564+
}
3565+
3566+
EXPECT_EQ(writer.times_unack_sample_removed(), 0u);
3567+
}
3568+
34793569
/*!
34803570
* Test that checks with a writer of each type that having the same listener attached, the notified writer in the
34813571
* callback is the corresponding writer that has removed a sample unacknowledged.

0 commit comments

Comments
 (0)