Skip to content

Commit aa6e0f7

Browse files
authored
refactor(services/aliyun_drive): Move raw request to core (#6089)
* refactor(services/aliyun_drive): Move raw request to core * fix: clippy lints * feat(services/yandex_disk): back to streaming read * style: add newline * chore: change visibility of get_upload_url * chore: trigger CI again * fix: clippy lints * chore: trigger CI again
1 parent 66046dc commit aa6e0f7

File tree

3 files changed

+59
-59
lines changed

3 files changed

+59
-59
lines changed

core/src/services/aliyun_drive/backend.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ use std::sync::Arc;
2121

2222
use bytes::Buf;
2323
use chrono::Utc;
24-
use http::header;
25-
use http::Request;
2624
use http::Response;
2725
use http::StatusCode;
2826
use log::debug;
@@ -368,16 +366,8 @@ impl Access for AliyunDriveBackend {
368366
let res = self.core.get_by_path(path).await?;
369367
let file: AliyunDriveFile =
370368
serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
369+
let resp = self.core.download(&file.file_id, args.range()).await?;
371370

372-
let download_url = self.core.get_download_url(&file.file_id).await?;
373-
// TODO: this request should be done in core.rs and not here
374-
let req = Request::get(&download_url)
375-
.extension(Operation::Read)
376-
.header(header::RANGE, args.range().to_header())
377-
.body(Buffer::new())
378-
.map_err(new_request_build_error)?;
379-
380-
let resp = self.core.info.http_client().fetch(req).await?;
381371
let status = resp.status();
382372
match status {
383373
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {

core/src/services/aliyun_drive/core.rs

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ use bytes::Buf;
2222
use chrono::Utc;
2323
use http::header::HeaderValue;
2424
use http::header::{self};
25-
use http::Method;
2625
use http::Request;
26+
use http::{Method, Response};
2727
use serde::Deserialize;
2828
use serde::Serialize;
2929
use tokio::sync::Mutex;
@@ -198,7 +198,6 @@ impl AliyunDriveCore {
198198
})
199199
.map_err(new_json_serialize_error)?;
200200
let req = req
201-
// Inject operation to the request.
202201
.extension(Operation::Read)
203202
.body(Buffer::from(body))
204203
.map_err(new_request_build_error)?;
@@ -264,7 +263,6 @@ impl AliyunDriveCore {
264263
})
265264
.map_err(new_json_serialize_error)?;
266265
let req = Request::post(format!("{}/adrive/v1.0/openFile/create", self.endpoint))
267-
// Inject operation to the request.
268266
.extension(Operation::Write)
269267
.body(Buffer::from(body))
270268
.map_err(new_request_build_error)?;
@@ -282,27 +280,40 @@ impl AliyunDriveCore {
282280
.await
283281
}
284282

285-
pub async fn get_download_url(&self, file_id: &str) -> Result<String> {
283+
async fn get_download_url(&self, file_id: &str) -> Result<String> {
286284
let (token, drive_id) = self.get_token_and_drive().await?;
287285
let body = serde_json::to_vec(&FileRequest {
288286
drive_id: &drive_id,
289287
file_id,
290288
})
291289
.map_err(new_json_serialize_error)?;
290+
292291
let req = Request::post(format!(
293292
"{}/adrive/v1.0/openFile/getDownloadUrl",
294293
self.endpoint
295294
))
296-
// Inject operation to the request.
297295
.extension(Operation::Read)
298296
.body(Buffer::from(body))
299297
.map_err(new_request_build_error)?;
298+
300299
let res = self.send(req, token.as_deref()).await?;
300+
301301
let output: GetDownloadUrlResponse =
302302
serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
303+
303304
Ok(output.url)
304305
}
305306

307+
pub async fn download(&self, file_id: &str, range: BytesRange) -> Result<Response<HttpBody>> {
308+
let download_url = self.get_download_url(file_id).await?;
309+
let req = Request::get(download_url)
310+
.extension(Operation::Read)
311+
.header(header::RANGE, range.to_header())
312+
.body(Buffer::new())
313+
.map_err(new_request_build_error)?;
314+
self.info.http_client().fetch(req).await
315+
}
316+
306317
pub async fn move_path(&self, file_id: &str, to_parent_file_id: &str) -> Result<()> {
307318
let (token, drive_id) = self.get_token_and_drive().await?;
308319
let body = serde_json::to_vec(&MovePathRequest {
@@ -313,7 +324,6 @@ impl AliyunDriveCore {
313324
})
314325
.map_err(new_json_serialize_error)?;
315326
let req = Request::post(format!("{}/adrive/v1.0/openFile/move", self.endpoint))
316-
// Inject operation to the request.
317327
.extension(Operation::Write)
318328
.body(Buffer::from(body))
319329
.map_err(new_request_build_error)?;
@@ -331,7 +341,6 @@ impl AliyunDriveCore {
331341
})
332342
.map_err(new_json_serialize_error)?;
333343
let req = Request::post(format!("{}/adrive/v1.0/openFile/update", self.endpoint))
334-
// Inject operation to the request.
335344
.extension(Operation::Write)
336345
.body(Buffer::from(body))
337346
.map_err(new_request_build_error)?;
@@ -354,7 +363,6 @@ impl AliyunDriveCore {
354363
})
355364
.map_err(new_json_serialize_error)?;
356365
let req = Request::post(format!("{}/adrive/v1.0/openFile/copy", self.endpoint))
357-
// Inject operation to the request.
358366
.extension(Operation::Copy)
359367
.body(Buffer::from(body))
360368
.map_err(new_request_build_error)?;
@@ -369,7 +377,6 @@ impl AliyunDriveCore {
369377
})
370378
.map_err(new_json_serialize_error)?;
371379
let req = Request::post(format!("{}/adrive/v1.0/openFile/delete", self.endpoint))
372-
// Inject operation to the request.
373380
.extension(Operation::Delete)
374381
.body(Buffer::from(body))
375382
.map_err(new_request_build_error)?;
@@ -392,22 +399,12 @@ impl AliyunDriveCore {
392399
})
393400
.map_err(new_json_serialize_error)?;
394401
let req = Request::post(format!("{}/adrive/v1.0/openFile/list", self.endpoint))
395-
// Inject operation to the request.
396402
.extension(Operation::List)
397403
.body(Buffer::from(body))
398404
.map_err(new_request_build_error)?;
399405
self.send(req, token.as_deref()).await
400406
}
401407

402-
pub async fn upload(&self, upload_url: &str, body: Buffer) -> Result<Buffer> {
403-
let req = Request::put(upload_url)
404-
// Inject operation to the request.
405-
.extension(Operation::Write)
406-
.body(body)
407-
.map_err(new_request_build_error)?;
408-
self.send(req, None).await
409-
}
410-
411408
pub async fn complete(&self, file_id: &str, upload_id: &str) -> Result<Buffer> {
412409
let (token, drive_id) = self.get_token_and_drive().await?;
413410
let body = serde_json::to_vec(&CompleteRequest {
@@ -417,41 +414,67 @@ impl AliyunDriveCore {
417414
})
418415
.map_err(new_json_serialize_error)?;
419416
let req = Request::post(format!("{}/adrive/v1.0/openFile/complete", self.endpoint))
420-
// Inject operation to the request.
421417
.extension(Operation::Write)
422418
.body(Buffer::from(body))
423419
.map_err(new_request_build_error)?;
424420
self.send(req, token.as_deref()).await
425421
}
426422

427-
pub async fn get_upload_url(
423+
async fn get_upload_url(
428424
&self,
429425
file_id: &str,
430426
upload_id: &str,
431-
part_number: Option<usize>,
432-
) -> Result<Buffer> {
427+
part_number: usize,
428+
) -> Result<String> {
433429
let (token, drive_id) = self.get_token_and_drive().await?;
434-
let part_info_list = part_number.map(|part_number| {
435-
vec![PartInfoItem {
436-
part_number: Some(part_number),
437-
}]
438-
});
430+
let part_info_list = vec![PartInfoItem {
431+
part_number: Some(part_number),
432+
}];
439433
let body = serde_json::to_vec(&GetUploadRequest {
440434
drive_id: &drive_id,
441435
file_id,
442436
upload_id,
443-
part_info_list,
437+
part_info_list: Some(part_info_list),
444438
})
445439
.map_err(new_json_serialize_error)?;
440+
446441
let req = Request::post(format!(
447442
"{}/adrive/v1.0/openFile/getUploadUrl",
448443
self.endpoint
449444
))
450-
// Inject operation to the request.
451445
.extension(Operation::Write)
452446
.body(Buffer::from(body))
453447
.map_err(new_request_build_error)?;
454-
self.send(req, token.as_deref()).await
448+
449+
let res = self.send(req, token.as_deref()).await?;
450+
451+
let mut output: UploadUrlResponse =
452+
serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
453+
454+
let Some(upload_url) = output
455+
.part_info_list
456+
.take()
457+
.map(|mut list| list.swap_remove(0))
458+
.map(|part_info| part_info.upload_url)
459+
else {
460+
return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
461+
};
462+
463+
Ok(upload_url)
464+
}
465+
pub async fn upload(
466+
&self,
467+
file_id: &str,
468+
upload_id: &str,
469+
part_number: usize,
470+
body: Buffer,
471+
) -> Result<Buffer> {
472+
let upload_url = self.get_upload_url(file_id, upload_id, part_number).await?;
473+
let req = Request::put(upload_url)
474+
.extension(Operation::Write)
475+
.body(body)
476+
.map_err(new_request_build_error)?;
477+
self.send(req, None).await
455478
}
456479
}
457480

core/src/services/aliyun_drive/writer.rs

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use super::core::AliyunDriveCore;
2323
use super::core::CheckNameMode;
2424
use super::core::CreateResponse;
2525
use super::core::CreateType;
26-
use super::core::UploadUrlResponse;
2726
use crate::raw::*;
2827
use crate::*;
2928

@@ -81,23 +80,11 @@ impl oio::Write for AliyunDriveWriter {
8180
}
8281
};
8382

84-
let res = self
83+
if let Err(err) = self
8584
.core
86-
.get_upload_url(file_id, upload_id, Some(self.part_number))
87-
.await?;
88-
let output: UploadUrlResponse =
89-
serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
90-
91-
let Some(upload_url) = output
92-
.part_info_list
93-
.as_ref()
94-
.and_then(|list| list.first())
95-
.map(|part_info| &part_info.upload_url)
96-
else {
97-
return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
98-
};
99-
100-
if let Err(err) = self.core.upload(upload_url, bs).await {
85+
.upload(file_id, upload_id, self.part_number, bs)
86+
.await
87+
{
10188
if err.kind() != ErrorKind::AlreadyExists {
10289
return Err(err);
10390
}

0 commit comments

Comments
 (0)