Skip to content

archival: Unify segment upload and reupload code paths #25731

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
377 changes: 51 additions & 326 deletions src/v/cluster/archival/archival_policy.cc

Large diffs are not rendered by default.

40 changes: 6 additions & 34 deletions src/v/cluster/archival/archival_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,16 @@ class archival_policy {
std::optional<segment_time_limit> limit = std::nullopt,
ss::io_priority_class io_priority = ss::default_priority_class());

/// \brief return next upload candidate
///
/// \param begin_inclusive is an inclusive beginning of the range
/// \param end_exclusive is an exclusive end of the range
/// \param lm is a log manager
/// \return initialized struct on success, empty struct on failure
ss::future<candidate_creation_result> get_next_candidate(
ss::future<candidate_creation_result> get_next_compacted_segment(
model::offset begin_inclusive,
model::offset end_exclusive,
std::optional<model::offset> flush_offset,
ss::shared_ptr<storage::log>,
ss::shared_ptr<storage::log> log,
const cloud_storage::partition_manifest& manifest,
ss::lowres_clock::duration segment_lock_duration);

ss::future<candidate_creation_result> get_next_compacted_segment(
ss::future<candidate_creation_result> get_next_segment(
model::offset begin_inclusive,
model::offset end_exclusive,
std::optional<model::offset> flush_offset,
ss::shared_ptr<storage::log> log,
const cloud_storage::partition_manifest& manifest,
ss::lowres_clock::duration segment_lock_duration);
Expand All @@ -119,33 +114,10 @@ class archival_policy {
/// result in partial upload.
bool upload_deadline_reached();

/// Value returned by find_segment(): bundles a segment handle with the
/// ntp configuration pointer and a flag.
struct lookup_result {
/// Shared handle to the segment.
/// NOTE(review): presumably null when no suitable segment exists -- confirm.
ss::lw_shared_ptr<storage::segment> segment;
/// Non-owning pointer to the ntp configuration.
/// NOTE(review): lifetime and nullability are not visible here -- confirm.
const storage::ntp_config* ntp_conf;
/// NOTE(review): presumably marks an upload that was forced (e.g. by the
/// upload deadline or a flush request) -- confirm against the definition.
bool forced;
};

/// Look up a segment in the supplied log and return it as a lookup_result.
/// NOTE(review): the exact roles of last_offset, adjusted_lso and
/// flush_offset are not visible in this declaration -- confirm against the
/// definition in archival_policy.cc.
lookup_result find_segment(
model::offset last_offset,
model::offset adjusted_lso,
std::optional<model::offset> flush_offset,
ss::shared_ptr<storage::log>);

model::ntp _ntp;
std::optional<segment_time_limit> _upload_limit;
std::optional<ss::lowres_clock::time_point> _upload_deadline;
ss::io_priority_class _io_priority;
};

/// This function computes offsets for the upload (incl. file offsets).
/// If the full segment is uploaded, the segment is not scanned.
/// If the upload is partial, a partial scan is performed if
/// the segment has an index, and a full scan otherwise.
/// NOTE(review): the returned optional presumably carries an error code on
/// failure and is empty on success -- confirm against the definition.
ss::future<std::optional<std::error_code>> get_file_range(
model::offset begin_inclusive,
std::optional<model::offset> end_inclusive,
ss::lw_shared_ptr<storage::segment> segment,
ss::lw_shared_ptr<upload_candidate> upl,
ss::io_priority_class io_priority);

} // namespace archival
11 changes: 7 additions & 4 deletions src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1703,17 +1703,20 @@ ntp_archiver::schedule_single_upload(const upload_context& upload_ctx) {

switch (upload_ctx.upload_kind) {
case segment_upload_kind::non_compacted:
candidate_result = co_await _policy.get_next_candidate(
candidate_result = co_await _policy.get_next_segment(
start_upload_offset,
last_stable_offset,
_flush_uploads_offset,
log,
manifest(),
_conf->segment_upload_timeout());
break;
case segment_upload_kind::compacted:
const auto& m = manifest();
candidate_result = co_await _policy.get_next_compacted_segment(
start_upload_offset, log, m, _conf->segment_upload_timeout());
start_upload_offset,
log,
manifest(),
_conf->segment_upload_timeout());
break;
}

Expand Down Expand Up @@ -3275,7 +3278,7 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) {
run->meta.size_bytes,
run->meta.committed_offset);
collector.collect_segments(
segment_collector_mode::collect_non_compacted);
segment_collector_mode::non_compacted_reupload);
auto candidate = co_await collector.make_upload_candidate(
_conf->upload_io_priority, _conf->segment_upload_timeout());

Expand Down
Loading