Skip to content

refactor(services/aliyun_drive): Move raw request to core #6089

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 27, 2025
12 changes: 1 addition & 11 deletions core/src/services/aliyun_drive/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use std::sync::Arc;

use bytes::Buf;
use chrono::Utc;
use http::header;
use http::Request;
use http::Response;
use http::StatusCode;
use log::debug;
Expand Down Expand Up @@ -368,16 +366,8 @@ impl Access for AliyunDriveBackend {
let res = self.core.get_by_path(path).await?;
let file: AliyunDriveFile =
serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
let resp = self.core.download(&file.file_id, args.range()).await?;

let download_url = self.core.get_download_url(&file.file_id).await?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored this as @erickguan suggested #6089 (comment)

// TODO: this request should be done in core.rs and not here
let req = Request::get(&download_url)
.extension(Operation::Read)
.header(header::RANGE, args.range().to_header())
.body(Buffer::new())
.map_err(new_request_build_error)?;

let resp = self.core.info.http_client().fetch(req).await?;
let status = resp.status();
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
Expand Down
85 changes: 54 additions & 31 deletions core/src/services/aliyun_drive/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use bytes::Buf;
use chrono::Utc;
use http::header::HeaderValue;
use http::header::{self};
use http::Method;
use http::Request;
use http::{Method, Response};
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -198,7 +198,6 @@ impl AliyunDriveCore {
})
.map_err(new_json_serialize_error)?;
let req = req
// Inject operation to the request.
.extension(Operation::Read)
.body(Buffer::from(body))
.map_err(new_request_build_error)?;
Expand Down Expand Up @@ -264,7 +263,6 @@ impl AliyunDriveCore {
})
.map_err(new_json_serialize_error)?;
let req = Request::post(format!("{}/adrive/v1.0/openFile/create", self.endpoint))
// Inject operation to the request.
.extension(Operation::Write)
.body(Buffer::from(body))
.map_err(new_request_build_error)?;
Expand All @@ -282,27 +280,40 @@ impl AliyunDriveCore {
.await
}

pub async fn get_download_url(&self, file_id: &str) -> Result<String> {
async fn get_download_url(&self, file_id: &str) -> Result<String> {
let (token, drive_id) = self.get_token_and_drive().await?;
let body = serde_json::to_vec(&FileRequest {
drive_id: &drive_id,
file_id,
})
.map_err(new_json_serialize_error)?;

let req = Request::post(format!(
"{}/adrive/v1.0/openFile/getDownloadUrl",
self.endpoint
))
// Inject operation to the request.
.extension(Operation::Read)
.body(Buffer::from(body))
.map_err(new_request_build_error)?;

let res = self.send(req, token.as_deref()).await?;

let output: GetDownloadUrlResponse =
serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;

Ok(output.url)
}

pub async fn download(&self, file_id: &str, range: BytesRange) -> Result<Response<HttpBody>> {
let download_url = self.get_download_url(file_id).await?;
let req = Request::get(download_url)
.extension(Operation::Read)
.header(header::RANGE, range.to_header())
.body(Buffer::new())
.map_err(new_request_build_error)?;
self.info.http_client().fetch(req).await
}

pub async fn move_path(&self, file_id: &str, to_parent_file_id: &str) -> Result<()> {
let (token, drive_id) = self.get_token_and_drive().await?;
let body = serde_json::to_vec(&MovePathRequest {
Expand All @@ -313,7 +324,6 @@ impl AliyunDriveCore {
})
.map_err(new_json_serialize_error)?;
let req = Request::post(format!("{}/adrive/v1.0/openFile/move", self.endpoint))
// Inject operation to the request.
.extension(Operation::Write)
.body(Buffer::from(body))
.map_err(new_request_build_error)?;
Expand All @@ -331,7 +341,6 @@ impl AliyunDriveCore {
})
.map_err(new_json_serialize_error)?;
let req = Request::post(format!("{}/adrive/v1.0/openFile/update", self.endpoint))
// Inject operation to the request.
.extension(Operation::Write)
.body(Buffer::from(body))
.map_err(new_request_build_error)?;
Expand All @@ -354,7 +363,6 @@ impl AliyunDriveCore {
})
.map_err(new_json_serialize_error)?;
let req = Request::post(format!("{}/adrive/v1.0/openFile/copy", self.endpoint))
// Inject operation to the request.
.extension(Operation::Copy)
.body(Buffer::from(body))
.map_err(new_request_build_error)?;
Expand All @@ -369,7 +377,6 @@ impl AliyunDriveCore {
})
.map_err(new_json_serialize_error)?;
let req = Request::post(format!("{}/adrive/v1.0/openFile/delete", self.endpoint))
// Inject operation to the request.
.extension(Operation::Delete)
.body(Buffer::from(body))
.map_err(new_request_build_error)?;
Expand All @@ -392,22 +399,12 @@ impl AliyunDriveCore {
})
.map_err(new_json_serialize_error)?;
let req = Request::post(format!("{}/adrive/v1.0/openFile/list", self.endpoint))
// Inject operation to the request.
.extension(Operation::List)
.body(Buffer::from(body))
.map_err(new_request_build_error)?;
self.send(req, token.as_deref()).await
}

pub async fn upload(&self, upload_url: &str, body: Buffer) -> Result<Buffer> {
let req = Request::put(upload_url)
// Inject operation to the request.
.extension(Operation::Write)
.body(body)
.map_err(new_request_build_error)?;
self.send(req, None).await
}

pub async fn complete(&self, file_id: &str, upload_id: &str) -> Result<Buffer> {
let (token, drive_id) = self.get_token_and_drive().await?;
let body = serde_json::to_vec(&CompleteRequest {
Expand All @@ -417,41 +414,67 @@ impl AliyunDriveCore {
})
.map_err(new_json_serialize_error)?;
let req = Request::post(format!("{}/adrive/v1.0/openFile/complete", self.endpoint))
// Inject operation to the request.
.extension(Operation::Write)
.body(Buffer::from(body))
.map_err(new_request_build_error)?;
self.send(req, token.as_deref()).await
}

pub async fn get_upload_url(
async fn get_upload_url(
&self,
file_id: &str,
upload_id: &str,
part_number: Option<usize>,
) -> Result<Buffer> {
part_number: usize,
) -> Result<String> {
let (token, drive_id) = self.get_token_and_drive().await?;
let part_info_list = part_number.map(|part_number| {
vec![PartInfoItem {
part_number: Some(part_number),
}]
});
let part_info_list = vec![PartInfoItem {
part_number: Some(part_number),
}];
let body = serde_json::to_vec(&GetUploadRequest {
drive_id: &drive_id,
file_id,
upload_id,
part_info_list,
part_info_list: Some(part_info_list),
})
.map_err(new_json_serialize_error)?;

let req = Request::post(format!(
"{}/adrive/v1.0/openFile/getUploadUrl",
self.endpoint
))
// Inject operation to the request.
.extension(Operation::Write)
.body(Buffer::from(body))
.map_err(new_request_build_error)?;
self.send(req, token.as_deref()).await

let res = self.send(req, token.as_deref()).await?;

let mut output: UploadUrlResponse =
serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;

let Some(upload_url) = output
.part_info_list
.take()
.map(|mut list| list.swap_remove(0))
.map(|part_info| part_info.upload_url)
else {
return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
};

Ok(upload_url)
}
pub async fn upload(
&self,
file_id: &str,
upload_id: &str,
part_number: usize,
body: Buffer,
) -> Result<Buffer> {
let upload_url = self.get_upload_url(file_id, upload_id, part_number).await?;
let req = Request::put(upload_url)
.extension(Operation::Write)
.body(body)
.map_err(new_request_build_error)?;
self.send(req, None).await
}
}

Expand Down
21 changes: 4 additions & 17 deletions core/src/services/aliyun_drive/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use super::core::AliyunDriveCore;
use super::core::CheckNameMode;
use super::core::CreateResponse;
use super::core::CreateType;
use super::core::UploadUrlResponse;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -81,23 +80,11 @@ impl oio::Write for AliyunDriveWriter {
}
};

let res = self
if let Err(err) = self
.core
.get_upload_url(file_id, upload_id, Some(self.part_number))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored this as @erickguan suggested #6089 (comment)

.await?;
let output: UploadUrlResponse =
serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;

let Some(upload_url) = output
.part_info_list
.as_ref()
.and_then(|list| list.first())
.map(|part_info| &part_info.upload_url)
else {
return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
};

if let Err(err) = self.core.upload(upload_url, bs).await {
.upload(file_id, upload_id, self.part_number, bs)
.await
{
if err.kind() != ErrorKind::AlreadyExists {
return Err(err);
}
Expand Down
Loading