archival: Unify segment upload and reupload code paths #25731
Conversation
Force-pushed from 6b95872 to 4dd2795.
from the 'segment_collector' instance Signed-off-by: Evgeny Lazin <[email protected]>
...make_upload_candidate_stream method Signed-off-by: Evgeny Lazin <[email protected]>
"make_upload_candidate_stream" can skip offset now. The behavior is similar to how ntp_archiver works in this case. Signed-off-by: Evgeny Lazin <[email protected]>
Signed-off-by: Evgeny Lazin <[email protected]>
Adds a new collector mode which can be used to upload new segments. This makes 'segment_collector' suitable to replace some logic inside the 'archival_policy'. The long-term goal is to make 'segment_reupload' the only code path used to find upload candidates. Then the 'ntp_archiver' should start using the method of the segment collector that returns a stream instead of a set of segments. When that is done, the internals of the 'segment_collector' can be updated to use 'log_reader' instead of iterating through the segments. Signed-off-by: Evgeny Lazin <[email protected]>
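For orientation, here is a rough usage sketch of the collector in the new mode, pieced together from the commits and the diff further down; the construction arguments and the surrounding call-site glue are assumptions, not the actual code:

// Sketch only: the real collector is constructed inside archival_policy with
// more parameters (manifest, log, size quotas, ...).
segment_collector collector(/* start offset, manifest, log, quotas */);

// New mode added by this PR: collect freshly written, non-compacted segments
// for upload, alongside the existing compacted-reupload collection.
collector.collect_segments(segment_collector_mode::new_non_compacted);

// With the follow-up change discussed below, readiness also takes the offset
// range and the optional flush offset into account.
if (collector.segment_ready_for_upload(
      begin_inclusive, end_exclusive, flush_offset)) {
    // Either materialize a classic upload candidate ...
    auto candidate = co_await collector.make_upload_candidate(/* io priority, lock timeout */);
    // ... or, per the commits above, ask for a stream via
    // make_upload_candidate_stream() so ntp_archiver can consume it directly.
}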
Force-pushed from 4dd2795 to 1a722c1.
Force-pushed from 1a722c1 to 9b26977.
CI test results: build#64471, build#64541, build#64561, build#64583
If I'm not mistaken, we also need to reproduce this logic somewhere: src/v/cluster/archival/archival_policy.cc, lines 247 to 265 (at 5d8dc56).
BTW @Lazin, I'm hacking on this diff a bit locally, so I'm happy to push it over the line if you're working on other stuff. Up to you.
something like this?

diff --git a/src/v/cluster/archival/archival_policy.cc b/src/v/cluster/archival/archival_policy.cc
index 8255690645..c368507b51 100644
--- a/src/v/cluster/archival/archival_policy.cc
+++ b/src/v/cluster/archival/archival_policy.cc
@@ -203,7 +203,8 @@ ss::future<candidate_creation_result> archival_policy::get_next_segment(
segment_collector.collect_segments(
segment_collector_mode::new_non_compacted);
- if (!segment_collector.segment_ready_for_upload()) {
+ if (!segment_collector.segment_ready_for_upload(
+ begin_inclusive, end_exclusive, flush_offset)) {
co_return candidate_creation_error::no_segments_collected;
}
diff --git a/src/v/cluster/archival/segment_reupload.cc b/src/v/cluster/archival/segment_reupload.cc
index c6ad537c0b..de6dccb659 100644
--- a/src/v/cluster/archival/segment_reupload.cc
+++ b/src/v/cluster/archival/segment_reupload.cc
@@ -534,8 +534,47 @@ bool segment_collector::should_replace_manifest_segment() const {
return _can_replace_manifest_segment && _begin_inclusive < _end_inclusive;
}
-bool segment_collector::segment_ready_for_upload() const {
- return _begin_inclusive <= _end_inclusive && !_segments.empty();
+bool segment_collector::segment_ready_for_upload(
+ model::offset start_offset,
+ model::offset end_exclusive,
+ std::optional<model::offset> flush_offset) const {
+ if (_begin_inclusive > _end_inclusive || _segments.empty()) {
+ return false;
+ }
+ const auto* segment = _segments.front().get();
+ if (segment == nullptr) {
+ return false;
+ }
+
+ auto below_flush_offset = flush_offset.has_value()
+ && segment->offsets().get_base_offset()
+ <= flush_offset.value();
+ auto closed = !segment->has_appender();
+
+ if (!closed && !below_flush_offset) {
+ auto kafka_start_offset = _log.from_log_offset(start_offset);
+ auto kafka_lso = _log.from_log_offset(end_exclusive);
+
+ if (kafka_start_offset >= kafka_lso) {
+ // If timeboxed uploads are enabled and there is no producer
+ // activity, we can get into a nasty loop where we upload a segment,
+ // add an archival metadata batch, upload a segment containing that
+ // batch, add another archival metadata batch, etc. This leads to
+ // lots of small segments that don't contain data being uploaded. To
+ // avoid it, we check that kafka (translated) offset increases.
+ vlog(
+ archival_log.debug,
+ "Segment collector for {}: can't find candidate, only non-data "
+ "batches to upload (kafka start_offset: {}, kafka "
+ "last_stable_offset: {})",
+ _manifest.get_ntp(),
+ kafka_start_offset,
+ kafka_lso);
+ return false;
+ }
+ }
+
+ return true;
}
cloud_storage::segment_name segment_collector::adjust_segment_name() const {
@@ -633,10 +672,11 @@ ss::future<candidate_creation_result> segment_collector::make_upload_candidate(
for (const auto& s : _segments) {
fmt::print(
seg,
- "{}-{}/{}; ",
+ "{}-{}/{} [closed? {}]; ",
s->offsets().get_base_offset(),
s->offsets().get_committed_offset(),
- s->size_bytes());
+ s->size_bytes(),
+ !s->has_appender());
}
vlog(archival_log.debug, "Collected segments: {}", seg.str());
}
@@ -793,6 +833,8 @@ ss::future<candidate_creation_result> segment_collector::make_upload_candidate(
}
}
+ std::cerr << "create upload candidate" << std::endl;
+
co_return upload_candidate_with_locks{
upload_candidate{
.exposed_name = adjust_segment_name(),
diff --git a/src/v/cluster/archival/segment_reupload.h b/src/v/cluster/archival/segment_reupload.h
index fb1ff454d3..f7a7fd140c 100644
--- a/src/v/cluster/archival/segment_reupload.h
+++ b/src/v/cluster/archival/segment_reupload.h
@@ -91,7 +91,10 @@ public:
bool should_replace_manifest_segment() const;
/// Segments are found and can be uploaded to the cloud.
- bool segment_ready_for_upload() const;
+ bool segment_ready_for_upload(
+ model::offset start_offset,
+ model::offset end_exclusive,
+ std::optional<model::offset> flush_offset) const;
/// The starting point for the collection, this may not coincide with the
start of the first collected segment. It should be aligned
@oleiman IIUC, with the proposed change the upload candidate lookup happens every time, even if the only batch in the log is a config batch. I think it could be done without scanning the segments. I'll try to fix this today. If the PR is unchanged within a few hours from the moment I posted this message, feel free to push into my branch.
Force-pushed from 9b26977 to b9f8508.
I made the change in archival_policy. Now it checks that the upload will move the last uploaded kafka offset forward before looking at segments. Let's see how it works.
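In condensed form, the check being described looks roughly like this (a sketch based on the hunk quoted in the review below; whether the comparison should be strict or not is exactly what the following comments debate):

// Translate raw log offsets into kafka (data) offsets and require that an
// upload would actually advance the last uploaded kafka offset.
auto kafka_begin_inclusive = log->from_log_offset(begin_inclusive);
auto kafka_end_exclusive = log->from_log_offset(
  model::next_offset(end_inclusive.value()));
if (kafka_begin_inclusive > kafka_end_exclusive) { // '>' vs '>=' is discussed below
    // Only non-data batches (config / archival metadata) in the range, so the
    // scan over segments can be skipped entirely (return value illustrative).
    co_return candidate_creation_error::no_segments_collected;
}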
Previously, the search for segments that can be uploaded to cloud storage was implemented inside the archival_policy. For non-compacted segments the archival_policy used its own internal implementation, but for compacted reuploads it used the segment_collector. This commit introduces new segment search logic that uses the segment_collector, so both compacted and non-compacted uploads now go through the collector. Signed-off-by: Evgeny Lazin <[email protected]>
Signed-off-by: Evgeny Lazin <[email protected]>
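Conceptually, the commit above reduces the two code paths to a single mode switch on the collector; a hypothetical sketch of that enum follows (only new_non_compacted is a real name from the diff, the other value is a placeholder):

// Placeholder sketch, not the actual definition in segment_reupload.h.
enum class segment_collector_mode {
    compacted_reupload,  // placeholder name: the pre-existing reupload path
    new_non_compacted,   // added by this PR: collection of new segments for upload
};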
Force-pushed from b9f8508 to b670448.
auto kafka_begin_inclusive = log->from_log_offset(begin_inclusive);
auto kafka_end_exclusive = log->from_log_offset(
  model::next_offset(end_inclusive.value()));
if (kafka_begin_inclusive > kafka_end_exclusive) {
Suggested change:
-if (kafka_begin_inclusive > kafka_end_exclusive) {
+if (kafka_begin_inclusive >= kafka_end_exclusive) {
seems to do it 🙂
not quite :\
This PR changes the segment upload path.
Previously, we used segment_collector to reupload data (compacted reupload or adjacent segment merging), while archival_policy was used to upload new segments, so the code was scattered across two different places. This PR adds a new collection mode to the segment_collector. This mode allows the segment collector to upload new segments. It works similarly to archival_policy: it tries to upload whole segments if possible and falls back to scanning if needed. The longer-term goal is to replace the implementation of the segment_collector with one that uses the log_reader interface.
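As a purely illustrative sketch of that longer-term log_reader direction (not part of this PR; the reader configuration and consumption shown here are assumptions):

// Hypothetical future shape: build the upload payload from a log reader scan
// over [begin_inclusive, end_inclusive] instead of walking segment files.
storage::log_reader_config reader_cfg(
  begin_inclusive, end_inclusive, ss::default_priority_class());
auto reader = co_await log->make_reader(reader_cfg);
// The resulting record batches would then feed the upload stream returned by
// make_upload_candidate_stream().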
Backports Required
Release Notes