Skip to content

feat(object_store): add retry for Minio SlowDown and TooManyRequests errors #14739

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1305,6 +1305,11 @@ command = "target/${BUILD_MODE_DIR}/risedev-dev"
args = ["${@}"]
description = "Clean data and start a full RisingWave dev cluster using risedev-dev"

# CI teardown variant of `ci-kill` without that task's `l` dependency:
# the cluster is killed and its logs are checked, but the logs are not dumped.
[tasks.ci-kill-no-dump-logs]
category = "RiseDev - CI"
dependencies = ["k", "check-logs", "wait-processes-exit"]
description = "Kill cluster and check logs, do not dump logs"

[tasks.ci-kill]
category = "RiseDev - CI"
dependencies = ["k", "l", "check-logs", "wait-processes-exit"]
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/backfill-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ download_and_prepare_rw "$profile" source

################ TESTS

profile=$profile ./ci/scripts/run-backfill-tests.sh
BUILDKITE=${BUILDKITE:-} profile=$profile ./ci/scripts/run-backfill-tests.sh
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I ask what this change is for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to parameterize the script so that it runs differently locally and in CI.

In CI: the cluster configuration does not include the monitoring stack.
Locally: the cluster configuration includes the monitoring stack.

55 changes: 34 additions & 21 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ BACKGROUND_DDL_DIR=$TEST_DIR/background_ddl
COMMON_DIR=$BACKGROUND_DDL_DIR/common

CLUSTER_PROFILE='ci-1cn-1fe-kafka-with-recovery'

# Select the runtime profiles by environment: a non-empty BUILDKITE means the
# script runs in CI, where the monitoring stack is left out; local runs use the
# profiles that include monitoring.
echo "--- Configuring cluster profiles"
if [[ -n "${BUILDKITE:-}" ]]; then
  echo "Running in buildkite"
  RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe'
  MINIO_RATE_LIMIT_CLUSTER_PROFILE='ci-3cn-1fe-with-minio-rate-limit'
else
  echo "Running locally"
  RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe-with-monitoring'
  MINIO_RATE_LIMIT_CLUSTER_PROFILE='ci-3cn-1fe-with-monitoring-and-minio-rate-limit'
fi

# Single RUST_LOG export, with no trailing line continuation so that whatever
# follows can never be swallowed as an extra argument to `export`.
export RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info"

run_sql_file() {
psql -h localhost -p 4566 -d dev -U root -f "$@"
Expand Down Expand Up @@ -60,8 +65,8 @@ rename_logs_with_prefix() {
}

# Kill the cluster and check its logs without dumping them.
# The `ci-kill-no-dump-logs` task already lists `k` (presumably the short alias
# of `kill`) and `wait-processes-exit` among its dependencies, so those steps
# are not invoked separately here.
kill_cluster() {
  cargo make ci-kill-no-dump-logs
  # Wait for any background jobs of this shell before returning.
  wait
}

restart_cluster() {
Expand Down Expand Up @@ -150,7 +155,6 @@ test_backfill_tombstone() {
./risedev psql -c "CREATE MATERIALIZED VIEW m1 as select * from tomb;"
echo "--- Kill cluster"
kill_cluster
cargo make wait-processes-exit
wait
}

Expand All @@ -171,9 +175,7 @@ test_replication_with_column_pruning() {
run_sql_file "$PARENT_PATH"/sql/backfill/replication_with_column_pruning/select.sql </dev/null
run_sql_file "$PARENT_PATH"/sql/backfill/replication_with_column_pruning/drop.sql
echo "--- Kill cluster"
cargo make kill
cargo make wait-processes-exit
wait
kill_cluster
}

# Test sink backfill recovery
Expand All @@ -200,13 +202,11 @@ test_sink_backfill_recovery() {

# Verify data matches upstream table.
sqllogictest -p 4566 -d dev 'e2e_test/backfill/sink/validate_sink.slt'
cargo make kill
cargo make wait-processes-exit
wait
kill_cluster
}

test_arrangement_backfill_snapshot_and_upstream_runtime() {
echo "--- e2e, test_backfill_snapshot_and_upstream_runtime"
echo "--- e2e, test_arrangement_backfill_snapshot_and_upstream_runtime, $RUNTIME_CLUSTER_PROFILE"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
Expand All @@ -218,12 +218,11 @@ test_arrangement_backfill_snapshot_and_upstream_runtime() {

sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_arrangement.slt'

cargo make kill
cargo make wait-processes-exit
cargo make ci-kill
}

test_no_shuffle_backfill_snapshot_and_upstream_runtime() {
echo "--- e2e, test_backfill_snapshot_and_upstream_runtime"
echo "--- e2e, test_no_shuffle_backfill_snapshot_and_upstream_runtime, $RUNTIME_CLUSTER_PROFILE"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
Expand All @@ -235,12 +234,11 @@ test_no_shuffle_backfill_snapshot_and_upstream_runtime() {

sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_no_shuffle.slt'

cargo make kill
cargo make wait-processes-exit
kill_cluster
}

test_backfill_snapshot_runtime() {
echo "--- e2e, test_backfill_snapshot_runtime"
echo "--- e2e, test_backfill_snapshot_runtime, $RUNTIME_CLUSTER_PROFILE"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
Expand All @@ -249,8 +247,22 @@ test_backfill_snapshot_runtime() {
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_no_shuffle.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_arrangement.slt'

cargo make kill
cargo make wait-processes-exit
kill_cluster
}

# Snapshot backfill against a cluster whose storage throughput is throttled.
# Arrangement Backfill is expected to succeed despite the throttling.
test_backfill_snapshot_with_limited_storage_throughput() {
  echo "--- e2e, test_backfill_snapshot_with_limited_storage_throughput, $MINIO_RATE_LIMIT_CLUSTER_PROFILE"
  cargo make ci-start $MINIO_RATE_LIMIT_CLUSTER_PROFILE

  # Run the test files in order: set up the table and snapshot, create both
  # kinds of materialized view, then validate the rows of each.
  local slt_file
  for slt_file in \
    'e2e_test/backfill/runtime/create_table.slt' \
    'e2e_test/backfill/runtime/insert_snapshot.slt' \
    'e2e_test/backfill/runtime/create_arrangement_backfill_mv.slt' \
    'e2e_test/backfill/runtime/create_no_shuffle_mv.slt' \
    'e2e_test/backfill/runtime/validate_rows_no_shuffle.slt' \
    'e2e_test/backfill/runtime/validate_rows_arrangement.slt'
  do
    sqllogictest -p 4566 -d dev "$slt_file"
  done

  kill_cluster
}

main() {
Expand All @@ -270,6 +282,7 @@ main() {

# Backfill will happen in sequence here.
test_backfill_snapshot_runtime
test_backfill_snapshot_with_limited_storage_throughput

# No upstream only tests, because if there's no snapshot,
# Backfill will complete almost immediately.
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ steps:

- label: "Backfill tests"
key: "backfill-tests"
command: "ci/scripts/backfill-test.sh -p ci-release"
command: "BUILDKITE=${BUILDKITE:-} ci/scripts/backfill-test.sh -p ci-release"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-backfill-tests"
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ steps:
timeout_in_minutes: 40

- label: "Backfill tests"
command: "ci/scripts/backfill-test.sh -p ci-dev"
command: "BUILDKITE=${BUILDKITE:-} ci/scripts/backfill-test.sh -p ci-dev"
if: build.pull_request.labels includes "ci/run-backfill-tests" || build.env("CI_STEPS") =~ /(^|,)backfill-tests?(,|$$)/
depends_on:
- "build"
Expand Down
50 changes: 50 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,56 @@ profile:
- use: prometheus
- use: grafana

# Three compute nodes, one frontend, one compactor, no monitoring services;
# MinIO is started with explicit API request limits.
ci-3cn-1fe-with-minio-rate-limit:
config-path: src/config/ci.toml
steps:
- use: minio
# NOTE(review): presumably these cap MinIO's API requests (max 18, 1s deadline)
# so that excess load is rejected and the client retry path is exercised —
# confirm against the MinIO documentation.
api-requests-max: 18
api-requests-deadline: 1s
- use: etcd
unsafe-no-fsync: true
- use: meta-node
- use: compute-node
port: 5687
exporter-port: 1222
enable-tiered-cache: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-tiered-cache: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-tiered-cache: true
- use: frontend
- use: compactor

# Three compute nodes, one frontend, one compactor, plus prometheus and grafana;
# MinIO is started with explicit API request limits.
ci-3cn-1fe-with-monitoring-and-minio-rate-limit:
config-path: src/config/ci.toml
steps:
- use: minio
# NOTE(review): presumably these cap MinIO's API requests (max 18, 1s deadline)
# so that excess load is rejected and the client retry path is exercised —
# confirm against the MinIO documentation.
api-requests-max: 18
api-requests-deadline: 1s
- use: etcd
unsafe-no-fsync: true
- use: meta-node
- use: compute-node
port: 5687
exporter-port: 1222
enable-tiered-cache: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-tiered-cache: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-tiered-cache: true
- use: frontend
- use: compactor
# Monitoring services.
- use: prometheus
- use: grafana

ci-3cn-3fe:
config-path: src/config/ci.toml
steps:
Expand Down
34 changes: 30 additions & 4 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,9 +921,29 @@ pub struct S3ObjectStoreConfig {
pub object_store_req_retry_max_delay_ms: u64,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_attempts")]
pub object_store_req_retry_max_attempts: usize,
/// Whether to retry s3 sdk error from which no error metadata is provided.
#[serde(default = "default::object_store_config::s3::retry_unknown_service_error")]
/// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead.
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
)]
pub retry_unknown_service_error: bool,
#[serde(default)]
pub developer: S3ObjectStoreDeveloperConfig,
}

/// Developer options for the S3 object store, i.e. the
/// `[storage.object_store.s3.developer]` subsection of the configuration.
///
/// Each field falls back to the matching function in
/// `default::object_store_config::s3::developer` when it is absent.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct S3ObjectStoreDeveloperConfig {
/// Whether to retry S3 SDK service errors for which no error metadata
/// (error code) is provided.
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
)]
pub object_store_retry_unknown_service_error: bool,
/// Service error codes that should be retried,
/// e.g. `["SlowDown", "TooManyRequests"]`.
#[serde(
default = "default::object_store_config::s3::developer::object_store_retryable_service_error_codes"
)]
pub object_store_retryable_service_error_codes: Vec<String>,
}

impl SystemConfig {
Expand Down Expand Up @@ -1526,8 +1546,14 @@ pub mod default {
DEFAULT_RETRY_MAX_ATTEMPTS
}

pub fn retry_unknown_service_error() -> bool {
false
/// Default values for the `[storage.object_store.s3.developer]` section.
pub mod developer {
    /// Default for `object_store_retry_unknown_service_error`: disabled.
    pub fn object_store_retry_unknown_service_error() -> bool {
        false
    }

    /// Default for `object_store_retryable_service_error_codes`:
    /// `SlowDown` and `TooManyRequests`.
    pub fn object_store_retryable_service_error_codes() -> Vec<String> {
        ["SlowDown", "TooManyRequests"]
            .iter()
            .map(|&code| String::from(code))
            .collect()
    }
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/config/ci.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ imm_merge_threshold = 2
[system]
barrier_interval_ms = 250
checkpoint_frequency = 5
max_concurrent_creating_streaming_jobs = 0
max_concurrent_creating_streaming_jobs = 0
4 changes: 4 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ object_store_req_retry_max_delay_ms = 10000
object_store_req_retry_max_attempts = 8
retry_unknown_service_error = false

[storage.object_store.s3.developer]
object_store_retry_unknown_service_error = false
object_store_retryable_service_error_codes = ["SlowDown", "TooManyRequests"]

[system]
barrier_interval_ms = 1000
checkpoint_frequency = 1
Expand Down
30 changes: 24 additions & 6 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,12 +938,18 @@ impl From<RetryError> for ObjectError {

/// Settings that decide which failed S3 requests are retried.
struct RetryCondition {
/// Retry service errors that carry no error code.
retry_unknown_service_error: bool,
/// Service error codes that are retried (matched ASCII case-insensitively).
retryable_service_error_codes: Vec<String>,
}

impl RetryCondition {
    /// Builds the retry condition from the S3 object store configuration.
    ///
    /// Retrying of unknown service errors is enabled when either the
    /// `developer` flag or the legacy top-level `retry_unknown_service_error`
    /// flag is set. The retryable error codes are copied from the `developer`
    /// section.
    fn new(config: &S3ObjectStoreConfig) -> Self {
        Self {
            // Initialise the field exactly once: a struct literal may not
            // specify the same field twice.
            retry_unknown_service_error: config.developer.object_store_retry_unknown_service_error
                || config.retry_unknown_service_error,
            retryable_service_error_codes: config
                .developer
                .object_store_retryable_service_error_codes
                .clone(),
        }
    }
}
Expand All @@ -958,12 +964,24 @@ impl tokio_retry::Condition<RetryError> for RetryCondition {
return true;
}
}
SdkError::ServiceError(e) => {
if self.retry_unknown_service_error && e.err().code().is_none() {
tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request.");
return true;
SdkError::ServiceError(e) => match e.err().code() {
None => {
if self.retry_unknown_service_error {
tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request.");
return true;
}
}
}
Some(code) => {
if self
.retryable_service_error_codes
.iter()
.any(|s| s.as_str().eq_ignore_ascii_case(code))
{
tracing::warn!(target: "retryable_service_error", "{e:?} occurs, retry S3 get_object request.");
return true;
}
}
},
_ => {}
},
Either::Right(_) => {
Expand Down