Skip to content

[opt](storage) Add log and metric when aws/azure sdk do retry operation #51485

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,18 @@ DEFINE_mInt32(zone_map_row_num_threshold, "20");
// Info = 4,
// Debug = 5,
// Trace = 6
DEFINE_Int32(aws_log_level, "2");
DEFINE_Int32(aws_log_level, "3");
DEFINE_Validator(aws_log_level,
[](const int config) -> bool { return config >= 0 && config <= 6; });

// azure sdk log level
// Verbose = 1,
// Informational = 2,
// Warning = 3,
// Error = 4
DEFINE_Int32(azure_log_level, "3");
DEFINE_Validator(azure_log_level,
[](const int config) -> bool { return config >= 1 && config <= 4; });

// the buffer size when read data from remote storage like s3
DEFINE_mInt32(remote_storage_read_buffer_mb, "16");
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,13 @@ DECLARE_mInt32(zone_map_row_num_threshold);
// Trace = 6
DECLARE_Int32(aws_log_level);

// azure sdk log level
// Verbose = 1,
// Informational = 2,
// Warning = 3,
// Error = 4
DECLARE_Int32(azure_log_level);

// the buffer size when read data from remote storage like s3
DECLARE_mInt32(remote_storage_read_buffer_mb);

Expand Down
36 changes: 35 additions & 1 deletion be/src/util/s3_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include <atomic>
#ifdef USE_AZURE
#include <azure/core/diagnostics/logger.hpp>
#include <azure/storage/blobs/blob_container_client.hpp>
#endif
#include <cstdlib>
Expand Down Expand Up @@ -150,6 +151,33 @@ S3ClientFactory::S3ClientFactory() {
config::s3_put_token_per_second, config::s3_put_bucket_tokens,
config::s3_put_token_limit,
metric_func_factory(put_rate_limit_ns, put_rate_limit_exceed_req_num))};

#ifdef USE_AZURE
auto azureLogLevel =
static_cast<Azure::Core::Diagnostics::Logger::Level>(config::azure_log_level);
Azure::Core::Diagnostics::Logger::SetLevel(azureLogLevel);
Azure::Core::Diagnostics::Logger::SetListener(
[&](Azure::Core::Diagnostics::Logger::Level level, const std::string& message) {
switch (level) {
case Azure::Core::Diagnostics::Logger::Level::Verbose:
LOG(INFO) << message;
break;
case Azure::Core::Diagnostics::Logger::Level::Informational:
LOG(INFO) << message;
break;
case Azure::Core::Diagnostics::Logger::Level::Warning:
LOG(WARNING) << message;
break;
case Azure::Core::Diagnostics::Logger::Level::Error:
LOG(ERROR) << message;
break;
default:
LOG(WARNING) << "Unknown level: " << static_cast<int>(level)
<< ", message: " << message;
break;
}
});
#endif
}

S3ClientFactory::~S3ClientFactory() {
Expand Down Expand Up @@ -204,7 +232,13 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_azure_client(
}
}

auto containerClient = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri, cred);
Azure::Storage::Blobs::BlobClientOptions options;
options.Retry.StatusCodes.insert(Azure::Core::Http::HttpStatusCode::TooManyRequests);
options.Retry.MaxRetries = config::max_s3_client_retry;
options.PerRetryPolicies.emplace_back(std::make_unique<AzureRetryRecordPolicy>());

auto containerClient = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(
uri, cred, std::move(options));
LOG_INFO("create one azure client with {}", s3_conf.to_string());
return std::make_shared<io::AzureObjStorageClient>(std::move(containerClient));
#else
Expand Down
18 changes: 11 additions & 7 deletions be/test/io/fs/azure_obj_storage_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "io/fs/file_system.h"
#include "io/fs/obj_storage_client.h"
#include "util/s3_util.h"

#ifdef USE_AZURE
#include <azure/storage/blobs.hpp>
Expand Down Expand Up @@ -49,13 +50,16 @@ class AzureObjStorageClientTest : public testing::Test {
std::string accountKey = std::getenv("AZURE_ACCOUNT_KEY");
std::string containerName = std::getenv("AZURE_CONTAINER_NAME");

auto cred = std::make_shared<Azure::Storage::StorageSharedKeyCredential>(accountName,
accountKey);
const std::string uri =
fmt::format("https://{}.blob.core.windows.net/{}", accountName, containerName);
auto containerClient = std::make_shared<BlobContainerClient>(uri, cred);
AzureObjStorageClientTest::obj_storage_client =
std::make_shared<io::AzureObjStorageClient>(std::move(containerClient));
// Initialize Azure SDK
[[maybe_unused]] auto& s3ClientFactory = S3ClientFactory::instance();

AzureObjStorageClientTest::obj_storage_client = S3ClientFactory::instance().create(
{.endpoint = fmt::format("https://{}.blob.core.windows.net", accountName),
.region = "dummy-region",
.ak = accountName,
.sk = accountKey,
.bucket = containerName,
.provider = io::ObjStorageType::AZURE});
}

void SetUp() override {
Expand Down
12 changes: 11 additions & 1 deletion cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,17 @@ CONF_mBool(enable_batch_get_mow_tablet_stats_and_meta, "true");
// Info = 4,
// Debug = 5,
// Trace = 6
CONF_Int32(aws_log_level, "2");
CONF_Int32(aws_log_level, "3");
CONF_Validator(aws_log_level, [](const int config) -> bool { return config >= 0 && config <= 6; });

// azure sdk log level
// Verbose = 1,
// Informational = 2,
// Warning = 3,
// Error = 4
CONF_Int32(azure_log_level, "3");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we reuse aws_log_level to reduce #configurations?

CONF_Validator(azure_log_level,
[](const int config) -> bool { return config >= 1 && config <= 4; });

// ca_cert_file is in this path by default, Normally no modification is required
// ca cert default path is different from different OS
Expand Down
31 changes: 29 additions & 2 deletions cloud/src/recycler/s3_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <algorithm>
#ifdef USE_AZURE
#include <azure/core/diagnostics/logger.hpp>
#include <azure/storage/blobs/blob_container_client.hpp>
#include <azure/storage/common/storage_credential.hpp>
#endif
Expand Down Expand Up @@ -120,6 +121,33 @@ class S3Environment {
return std::make_shared<DorisAWSLogger>(logLevel);
};
Aws::InitAPI(aws_options_);

#ifdef USE_AZURE
auto azureLogLevel =
static_cast<Azure::Core::Diagnostics::Logger::Level>(config::azure_log_level);
Azure::Core::Diagnostics::Logger::SetLevel(azureLogLevel);
Azure::Core::Diagnostics::Logger::SetListener(
[&](Azure::Core::Diagnostics::Logger::Level level, const std::string& message) {
switch (level) {
case Azure::Core::Diagnostics::Logger::Level::Verbose:
LOG(INFO) << message;
break;
case Azure::Core::Diagnostics::Logger::Level::Informational:
LOG(INFO) << message;
break;
case Azure::Core::Diagnostics::Logger::Level::Warning:
LOG(WARNING) << message;
break;
case Azure::Core::Diagnostics::Logger::Level::Error:
LOG(ERROR) << message;
break;
default:
LOG(WARNING) << "Unknown level: " << static_cast<int>(level)
<< ", message: " << message;
break;
}
});
#endif
}

~S3Environment() { Aws::ShutdownAPI(aws_options_); }
Expand Down Expand Up @@ -308,8 +336,7 @@ int S3Accessor::init() {
// Within the RetryPolicy, the nextPolicy is called multiple times inside a loop.
// All policies in the PerRetryPolicies are downstream of the RetryPolicy.
// Therefore, you only need to add a policy to check if the response code is 429 and if the retry count meets the condition, it can record the retry count.
options.PerRetryPolicies.emplace_back(
std::make_unique<AzureRetryRecordPolicy>(config::max_s3_client_retry));
options.PerRetryPolicies.emplace_back(std::make_unique<AzureRetryRecordPolicy>());
auto container_client = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(
uri_, cred, std::move(options));
// uri format for debug: ${scheme}://${ak}.blob.core.windows.net/${bucket}/${prefix}
Expand Down
2 changes: 1 addition & 1 deletion common/cpp/aws_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include <aws/core/utils/logging/LogLevel.h>
#include <aws/core/utils/logging/LogSystemInterface.h>
#include <glog/logging.h> // IWYU pragma: export
#include <glog/logging.h>

class DorisAWSLogger final : public Aws::Utils::Logging::LogSystemInterface {
public:
Expand Down
38 changes: 24 additions & 14 deletions common/cpp/obj_retry_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

#include <aws/core/http/HttpResponse.h>
#include <bvar/reducer.h>
#include <glog/logging.h>

namespace doris {

bvar::Adder<int64_t> s3_too_many_request_retry_cnt("s3_too_many_request_retry_cnt");
bvar::Adder<int64_t> object_request_retry_count("object_request_retry_count");

S3CustomRetryStrategy::S3CustomRetryStrategy(int maxRetries) : DefaultRetryStrategy(maxRetries) {}

Expand All @@ -35,33 +36,42 @@ bool S3CustomRetryStrategy::ShouldRetry(const Aws::Client::AWSError<Aws::Client:
}

if (Aws::Http::IsRetryableHttpResponseCode(error.GetResponseCode()) || error.ShouldRetry()) {
s3_too_many_request_retry_cnt << 1;
object_request_retry_count << 1;
LOG(INFO) << "retry due to error: " << error << ", attempt: " << attemptedRetries + 1 << "/"
<< m_maxRetries;
return true;
}

return false;
}
#ifdef USE_AZURE
AzureRetryRecordPolicy::AzureRetryRecordPolicy(int retry_cnt) : retry_cnt(retry_cnt) {}

AzureRetryRecordPolicy::~AzureRetryRecordPolicy() = default;

std::unique_ptr<Azure::Core::Http::RawResponse> AzureRetryRecordPolicy::Send(
Azure::Core::Http::Request& request, Azure::Core::Http::Policies::NextHttpPolicy nextPolicy,
Azure::Core::Context const& context) const {
auto resp = nextPolicy.Send(request, context);
if (retry_cnt != 0 &&
resp->GetStatusCode() == Azure::Core::Http::HttpStatusCode::TooManyRequests) {
retry_cnt--;
s3_too_many_request_retry_cnt << 1;
// https://learn.microsoft.com/en-us/azure/developer/cpp/sdk/fundamentals/http-pipelines-and-retries

std::unique_ptr<Azure::Core::Http::RawResponse> response = nextPolicy.Send(request, context);
int32_t retry_count =
Azure::Core::Http::Policies::_internal::RetryPolicy::GetRetryCount(context);

if (static_cast<int>(response->GetStatusCode()) > 299 ||
static_cast<int>(response->GetStatusCode()) < 200) {
if (retry_count > 0) {
object_request_retry_count << 1;
}

// If the response is not successful, we log the retry attempt and status code.
LOG(INFO) << "azure retry retry_count: " << retry_count
<< ", status code: " << static_cast<int>(response->GetStatusCode())
<< ", reason: " << response->GetReasonPhrase();
}
return resp;

return response;
}

std::unique_ptr<AzureRetryRecordPolicy::HttpPolicy> AzureRetryRecordPolicy::Clone() const {
auto ret = std::make_unique<AzureRetryRecordPolicy>(*this);
ret->retry_cnt = 0;
return ret;
return std::make_unique<AzureRetryRecordPolicy>(*this);
}
#endif
} // namespace doris
8 changes: 3 additions & 5 deletions common/cpp/obj_retry_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,14 @@ class S3CustomRetryStrategy final : public Aws::Client::DefaultRetryStrategy {
#ifdef USE_AZURE
class AzureRetryRecordPolicy final : public Azure::Core::Http::Policies::HttpPolicy {
public:
AzureRetryRecordPolicy(int retry_cnt);
~AzureRetryRecordPolicy() override;
AzureRetryRecordPolicy() = default;
~AzureRetryRecordPolicy() override = default;

std::unique_ptr<HttpPolicy> Clone() const override;
std::unique_ptr<Azure::Core::Http::RawResponse> Send(
Azure::Core::Http::Request& request,
Azure::Core::Http::Policies::NextHttpPolicy nextPolicy,
Azure::Core::Context const& context) const override;

private:
mutable int retry_cnt;
};
#endif
} // namespace doris
9 changes: 8 additions & 1 deletion conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ sys_log_level = INFO
# Debug = 5,
# Trace = 6
# Default to turn off aws sdk log, because aws sdk errors that need to be cared will be output through Doris logs
aws_log_level=2
aws_log_level = 3

# azure sdk log level
# Verbose = 1,
# Informational = 2,
# Warning = 3,
# Error = 4
azure_log_level = 3
## If you are not running in aws cloud, you can disable EC2 metadata
AWS_EC2_METADATA_DISABLED=true
Loading