Skip to content

Commit 3b30cb1

Browse files
feat(services/s3): Support request payer option (#6070)
* feat(services/s3): Support request payer option * chore: remove req shadowing where it was not necessary
1 parent 85e6ea5 commit 3b30cb1

File tree

4 files changed

+104
-23
lines changed

4 files changed

+104
-23
lines changed

core/src/services/s3/backend.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,14 @@ impl S3Builder {
420420
self
421421
}
422422

423+
/// Enable request payer so that OpenDAL will send requests with `x-amz-request-payer` header.
424+
///
425+
/// With this option the client accepts to pay for the request and data transfer costs.
426+
pub fn enable_request_payer(mut self) -> Self {
427+
self.config.enable_request_payer = true;
428+
self
429+
}
430+
423431
/// Disable load credential from ec2 metadata.
424432
///
425433
/// This option is used to disable the default behavior of opendal
@@ -1005,6 +1013,7 @@ impl Builder for S3Builder {
10051013
default_storage_class,
10061014
allow_anonymous: self.config.allow_anonymous,
10071015
disable_list_objects_v2: self.config.disable_list_objects_v2,
1016+
enable_request_payer: self.config.enable_request_payer,
10081017
signer,
10091018
loader,
10101019
credential_loaded: AtomicBool::new(false),

core/src/services/s3/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,9 @@ pub struct S3Config {
193193
/// However, some legacy services do not yet support V2.
194194
/// This option allows users to switch back to the older List Objects V1.
195195
pub disable_list_objects_v2: bool,
196+
197+
/// Indicates whether the client agrees to pay for the requests made to the S3 bucket.
198+
pub enable_request_payer: bool,
196199
}
197200

198201
impl Debug for S3Config {

core/src/services/s3/core.rs

Lines changed: 91 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub mod constants {
5353
pub const X_AMZ_COPY_SOURCE: &str = "x-amz-copy-source";
5454

5555
pub const X_AMZ_SERVER_SIDE_ENCRYPTION: &str = "x-amz-server-side-encryption";
56+
pub const X_AMZ_SERVER_REQUEST_PAYER: (&str, &str) = ("x-amz-request-payer", "requester");
5657
pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str =
5758
"x-amz-server-side-encryption-customer-algorithm";
5859
pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
@@ -98,6 +99,7 @@ pub struct S3Core {
9899
pub default_storage_class: Option<HeaderValue>,
99100
pub allow_anonymous: bool,
100101
pub disable_list_objects_v2: bool,
102+
pub enable_request_payer: bool,
101103

102104
pub signer: AwsV4Signer,
103105
pub loader: Box<dyn AwsCredentialLoad>,
@@ -340,6 +342,19 @@ impl S3Core {
340342
}
341343
req
342344
}
345+
346+
pub fn insert_request_payer_header(
347+
&self,
348+
mut req: http::request::Builder,
349+
) -> http::request::Builder {
350+
if self.enable_request_payer {
351+
req = req.header(
352+
HeaderName::from_static(constants::X_AMZ_SERVER_REQUEST_PAYER.0),
353+
HeaderValue::from_static(constants::X_AMZ_SERVER_REQUEST_PAYER.1),
354+
);
355+
}
356+
req
357+
}
343358
}
344359

345360
impl S3Core {
@@ -406,8 +421,11 @@ impl S3Core {
406421
);
407422
}
408423

424+
// Set request payer header if enabled.
425+
req = self.insert_request_payer_header(req);
426+
409427
// Inject operation to the request.
410-
let req = req.extension(Operation::Stat);
428+
req = req.extension(Operation::Stat);
411429

412430
let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
413431

@@ -487,12 +505,15 @@ impl S3Core {
487505
);
488506
}
489507

508+
// Set request payer header if enabled.
509+
req = self.insert_request_payer_header(req);
510+
490511
// Set SSE headers.
491512
// TODO: how will this work with presign?
492513
req = self.insert_sse_headers(req, false);
493514

494515
// Inject operation to the request.
495-
let req = req.extension(Operation::Read);
516+
req = req.extension(Operation::Read);
496517

497518
let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
498519

@@ -527,6 +548,9 @@ impl S3Core {
527548

528549
req = self.insert_metadata_headers(req, size, args);
529550

551+
// Set request payer header if enabled.
552+
req = self.insert_request_payer_header(req);
553+
530554
// Set SSE headers.
531555
req = self.insert_sse_headers(req, true);
532556

@@ -537,7 +561,7 @@ impl S3Core {
537561
}
538562

539563
// Inject operation to the request.
540-
let req = req.extension(Operation::Write);
564+
req = req.extension(Operation::Write);
541565

542566
// Set body
543567
let req = req.body(body).map_err(new_request_build_error)?;
@@ -567,11 +591,14 @@ impl S3Core {
567591

568592
req = req.header(constants::X_AMZ_WRITE_OFFSET_BYTES, position.to_string());
569593

594+
// Set request payer header if enabled.
595+
req = self.insert_request_payer_header(req);
596+
570597
// Set SSE headers.
571598
req = self.insert_sse_headers(req, true);
572599

573600
// Inject operation to the request.
574-
let req = req.extension(Operation::Write);
601+
req = req.extension(Operation::Write);
575602

576603
// Set body
577604
let req = req.body(body).map_err(new_request_build_error)?;
@@ -606,7 +633,12 @@ impl S3Core {
606633
url.push_str(&format!("?{}", query_args.join("&")));
607634
}
608635

609-
let mut req = Request::delete(&url)
636+
let mut req = Request::delete(&url);
637+
638+
// Set request payer header if enabled.
639+
req = self.insert_request_payer_header(req);
640+
641+
let mut req = req
610642
// Inject operation to the request.
611643
.extension(Operation::Delete)
612644
.body(Buffer::new())
@@ -665,6 +697,9 @@ impl S3Core {
665697
)
666698
}
667699

700+
// Set request payer header if enabled.
701+
req = self.insert_request_payer_header(req);
702+
668703
let mut req = req
669704
// Inject operation to the request.
670705
.extension(Operation::Copy)
@@ -701,7 +736,12 @@ impl S3Core {
701736
url = url.push("marker", &percent_encode_path(marker));
702737
}
703738

704-
let mut req = Request::get(url.finish())
739+
let mut req = Request::get(url.finish());
740+
741+
// Set request payer header if enabled.
742+
req = self.insert_request_payer_header(req);
743+
744+
let mut req = req
705745
// Inject operation to the request.
706746
.extension(Operation::List)
707747
.body(Buffer::new())
@@ -748,7 +788,12 @@ impl S3Core {
748788
);
749789
}
750790

751-
let mut req = Request::get(url.finish())
791+
let mut req = Request::get(url.finish());
792+
793+
// Set request payer header if enabled.
794+
req = self.insert_request_payer_header(req);
795+
796+
let mut req = req
752797
// Inject operation to the request.
753798
.extension(Operation::List)
754799
.body(Buffer::new())
@@ -794,14 +839,17 @@ impl S3Core {
794839
}
795840
}
796841

842+
// Set request payer header if enabled.
843+
req = self.insert_request_payer_header(req);
844+
797845
// Set SSE headers.
798-
let req = self.insert_sse_headers(req, true);
846+
req = self.insert_sse_headers(req, true);
799847

800848
// Set SSE headers.
801-
let req = self.insert_checksum_type_header(req);
849+
req = self.insert_checksum_type_header(req);
802850

803851
// Inject operation to the request.
804-
let req = req.extension(Operation::Write);
852+
req = req.extension(Operation::Write);
805853

806854
let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
807855

@@ -833,6 +881,9 @@ impl S3Core {
833881

834882
req = req.header(CONTENT_LENGTH, size);
835883

884+
// Set request payer header if enabled.
885+
req = self.insert_request_payer_header(req);
886+
836887
// Set SSE headers.
837888
req = self.insert_sse_headers(req, true);
838889

@@ -842,7 +893,7 @@ impl S3Core {
842893
}
843894

844895
// Inject operation to the request.
845-
let req = req.extension(Operation::Write);
896+
req = req.extension(Operation::Write);
846897

847898
// Set body
848899
let req = req.body(body).map_err(new_request_build_error)?;
@@ -865,20 +916,23 @@ impl S3Core {
865916
percent_encode_path(upload_id)
866917
);
867918

868-
let req = Request::post(&url);
919+
let mut req = Request::post(&url);
869920

870921
// Set SSE headers.
871-
let req = self.insert_sse_headers(req, true);
922+
req = self.insert_sse_headers(req, true);
872923

873924
let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
874925
.map_err(new_xml_serialize_error)?;
875926
// Make sure content length has been set to avoid post with chunked encoding.
876-
let req = req.header(CONTENT_LENGTH, content.len());
927+
req = req.header(CONTENT_LENGTH, content.len());
877928
// Set content-type to `application/xml` to avoid mixed with form post.
878-
let req = req.header(CONTENT_TYPE, "application/xml");
929+
req = req.header(CONTENT_TYPE, "application/xml");
930+
931+
// Set request payer header if enabled.
932+
req = self.insert_request_payer_header(req);
879933

880934
// Inject operation to the request.
881-
let req = req.extension(Operation::Write);
935+
req = req.extension(Operation::Write);
882936

883937
let mut req = req
884938
.body(Buffer::from(Bytes::from(content)))
@@ -904,11 +958,17 @@ impl S3Core {
904958
percent_encode_path(upload_id)
905959
);
906960

907-
let mut req = Request::delete(&url)
961+
let mut req = Request::delete(&url);
962+
963+
// Set request payer header if enabled.
964+
req = self.insert_request_payer_header(req);
965+
966+
let mut req = req
908967
// Inject operation to the request.
909968
.extension(Operation::Write)
910969
.body(Buffer::new())
911970
.map_err(new_request_build_error)?;
971+
912972
self.sign(&mut req).await?;
913973
self.send(req).await
914974
}
@@ -919,7 +979,7 @@ impl S3Core {
919979
) -> Result<Response<Buffer>> {
920980
let url = format!("{}/?delete", self.endpoint);
921981

922-
let req = Request::post(&url);
982+
let mut req = Request::post(&url);
923983

924984
let content = quick_xml::se::to_string(&DeleteObjectsRequest {
925985
object: paths
@@ -933,14 +993,17 @@ impl S3Core {
933993
.map_err(new_xml_serialize_error)?;
934994

935995
// Make sure content length has been set to avoid post with chunked encoding.
936-
let req = req.header(CONTENT_LENGTH, content.len());
996+
req = req.header(CONTENT_LENGTH, content.len());
937997
// Set content-type to `application/xml` to avoid mixed with form post.
938-
let req = req.header(CONTENT_TYPE, "application/xml");
998+
req = req.header(CONTENT_TYPE, "application/xml");
939999
// Set content-md5 as required by API.
940-
let req = req.header("CONTENT-MD5", format_content_md5(content.as_bytes()));
1000+
req = req.header("CONTENT-MD5", format_content_md5(content.as_bytes()));
1001+
1002+
// Set request payer header if enabled.
1003+
req = self.insert_request_payer_header(req);
9411004

9421005
// Inject operation to the request.
943-
let req = req.extension(Operation::Delete);
1006+
req = req.extension(Operation::Delete);
9441007

9451008
let mut req = req
9461009
.body(Buffer::from(Bytes::from(content)))
@@ -986,7 +1049,12 @@ impl S3Core {
9861049
.expect("write into string must succeed");
9871050
}
9881051

989-
let mut req = Request::get(&url)
1052+
let mut req = Request::get(&url);
1053+
1054+
// Set request payer header if enabled.
1055+
req = self.insert_request_payer_header(req);
1056+
1057+
let mut req = req
9901058
// Inject operation to the request.
9911059
.extension(Operation::List)
9921060
.body(Buffer::new())

core/src/services/s3/docs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ This service can be used to:
3232
- `disable_config_load`: Disable aws config load from env.
3333
- `enable_virtual_host_style`: Enable virtual host style.
3434
- `disable_write_with_if_match`: Disable write with if match.
35+
- `enable_request_payer`: Enable the request payer for backend.
3536

3637
Refer to [`S3Builder`]'s public API docs for more information.
3738

0 commit comments

Comments
 (0)