Skip to content

Commit 860f5b2

Browse files
authored
feat: Reduce stat operation if we are reading all (#5146)
* feat: Reduce stat request if we are reading to end Signed-off-by: Xuanwo <[email protected]> * Fix sftp read Signed-off-by: Xuanwo <[email protected]> * Try fix sftp read Signed-off-by: Xuanwo <[email protected]> * Fix typo Signed-off-by: Xuanwo <[email protected]> * Should be split off Signed-off-by: Xuanwo <[email protected]> * fix sftp read Signed-off-by: Xuanwo <[email protected]> --------- Signed-off-by: Xuanwo <[email protected]>
1 parent 43e4d7d commit 860f5b2

File tree

8 files changed

+179
-162
lines changed

8 files changed

+179
-162
lines changed

core/src/services/sftp/backend.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ impl Access for SftpBackend {
404404

405405
Ok((
406406
RpRead::default(),
407-
SftpReader::new(client, f, args.range().size().unwrap_or(u64::MAX) as _),
407+
SftpReader::new(client, f, args.range().size()),
408408
))
409409
}
410410

core/src/services/sftp/reader.rs

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,17 @@ pub struct SftpReader {
3030

3131
file: File,
3232
chunk: usize,
33-
size: usize,
33+
size: Option<usize>,
3434
read: usize,
3535
buf: BytesMut,
3636
}
3737

3838
impl SftpReader {
39-
pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: usize) -> Self {
39+
pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: Option<u64>) -> Self {
4040
Self {
4141
_conn: conn,
4242
file,
43-
size,
43+
size: size.map(|v| v as usize),
4444
chunk: 2 * 1024 * 1024,
4545
read: 0,
4646
buf: BytesMut::new(),
@@ -50,50 +50,29 @@ impl SftpReader {
5050

5151
impl oio::Read for SftpReader {
5252
async fn read(&mut self) -> Result<Buffer> {
53-
// let client = self.inner.connect().await?;
54-
//
55-
// let mut fs = client.fs();
56-
// fs.set_cwd(&self.root);
57-
//
58-
// let path = fs
59-
// .canonicalize(&self.path)
60-
// .await
61-
// .map_err(parse_sftp_error)?;
62-
//
63-
// let mut f = client
64-
// .open(path.as_path())
65-
// .await
66-
// .map_err(parse_sftp_error)?;
67-
68-
// f.seek(SeekFrom::Start(offset))
69-
// .await
70-
// .map_err(new_std_io_error)?;
71-
72-
// let mut size = size;
73-
// if size == 0 {
74-
// return Ok(Buffer::new());
75-
// }
76-
77-
if self.read >= self.size {
53+
if self.read >= self.size.unwrap_or(usize::MAX) {
7854
return Ok(Buffer::new());
7955
}
8056

81-
let size = (self.size - self.read).min(self.chunk);
57+
let size = if let Some(size) = self.size {
58+
(size - self.read).min(self.chunk)
59+
} else {
60+
self.chunk
61+
};
8262
self.buf.reserve(size);
8363

8464
let Some(bytes) = self
8565
.file
86-
.read(size as u32, self.buf.split_off(size))
66+
.read(size as u32, self.buf.split_off(0))
8767
.await
8868
.map_err(parse_sftp_error)?
8969
else {
90-
return Err(Error::new(
91-
ErrorKind::RangeNotSatisfied,
92-
"sftp read file reaching EoF",
93-
));
70+
return Ok(Buffer::new());
9471
};
9572

9673
self.read += bytes.len();
97-
Ok(Buffer::from(bytes.freeze()))
74+
self.buf = bytes;
75+
let bs = self.buf.split();
76+
Ok(Buffer::from(bs.freeze()))
9877
}
9978
}

core/src/types/blocking_read/buffer_iterator.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::ops::Range;
18+
use std::ops::RangeBounds;
1919
use std::sync::Arc;
2020

2121
use crate::raw::*;
@@ -30,8 +30,8 @@ struct IteratingReader {
3030
impl IteratingReader {
3131
/// Create a new iterating reader.
3232
#[inline]
33-
fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
34-
let generator = ReadGenerator::new(ctx.clone(), range);
33+
fn new(ctx: Arc<ReadContext>, range: BytesRange) -> Self {
34+
let generator = ReadGenerator::new(ctx.clone(), range.offset(), range.size());
3535
Self {
3636
generator,
3737
reader: None,
@@ -73,9 +73,9 @@ pub struct BufferIterator {
7373
impl BufferIterator {
7474
/// Create a new buffer iterator.
7575
#[inline]
76-
pub fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
76+
pub fn new(ctx: Arc<ReadContext>, range: impl RangeBounds<u64>) -> Self {
7777
Self {
78-
inner: IteratingReader::new(ctx, range),
78+
inner: IteratingReader::new(ctx, range.into()),
7979
}
8080
}
8181
}

core/src/types/context/read.rs

Lines changed: 100 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::ops::Range;
18+
use std::ops::{Bound, Range, RangeBounds};
1919
use std::sync::Arc;
2020

2121
use crate::raw::*;
@@ -68,6 +68,38 @@ impl ReadContext {
6868
pub fn options(&self) -> &OpReader {
6969
&self.options
7070
}
71+
72+
/// Parse the range bounds into a range.
73+
pub(crate) async fn parse_into_range(
74+
&self,
75+
range: impl RangeBounds<u64>,
76+
) -> Result<Range<u64>> {
77+
let start = match range.start_bound() {
78+
Bound::Included(v) => *v,
79+
Bound::Excluded(v) => v + 1,
80+
Bound::Unbounded => 0,
81+
};
82+
83+
let end = match range.end_bound() {
84+
Bound::Included(v) => v + 1,
85+
Bound::Excluded(v) => *v,
86+
Bound::Unbounded => {
87+
let mut op_stat = OpStat::new();
88+
89+
if let Some(v) = self.args().version() {
90+
op_stat = op_stat.with_version(v);
91+
}
92+
93+
self.accessor()
94+
.stat(self.path(), op_stat)
95+
.await?
96+
.into_metadata()
97+
.content_length()
98+
}
99+
};
100+
101+
Ok(start..end)
102+
}
71103
}
72104

73105
/// ReadGenerator is used to generate new readers.
@@ -83,62 +115,65 @@ pub struct ReadGenerator {
83115
ctx: Arc<ReadContext>,
84116

85117
offset: u64,
86-
end: u64,
118+
size: Option<u64>,
87119
}
88120

89121
impl ReadGenerator {
90122
/// Create a new ReadGenerator.
91123
#[inline]
92-
pub fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
93-
Self {
94-
ctx,
95-
offset: range.start,
96-
end: range.end,
124+
pub fn new(ctx: Arc<ReadContext>, offset: u64, size: Option<u64>) -> Self {
125+
Self { ctx, offset, size }
126+
}
127+
128+
/// Generate next range to read.
129+
fn next_range(&mut self) -> Option<BytesRange> {
130+
if self.size == Some(0) {
131+
return None;
97132
}
133+
134+
let next_offset = self.offset;
135+
let next_size = match self.size {
136+
// Given size is None, read all data.
137+
None => {
138+
// Update size to Some(0) to indicate that there is no more data to read.
139+
self.size = Some(0);
140+
None
141+
}
142+
Some(remaining) => {
143+
// If chunk is set, read data in chunks.
144+
let read_size = self
145+
.ctx
146+
.options
147+
.chunk()
148+
.map_or(remaining, |chunk| remaining.min(chunk as u64));
149+
// Update (offset, size) before building future.
150+
self.offset += read_size;
151+
self.size = Some(remaining - read_size);
152+
Some(read_size)
153+
}
154+
};
155+
156+
Some(BytesRange::new(next_offset, next_size))
98157
}
99158

100159
/// Generate next reader.
101160
pub async fn next_reader(&mut self) -> Result<Option<oio::Reader>> {
102-
if self.offset >= self.end {
161+
let Some(range) = self.next_range() else {
103162
return Ok(None);
104-
}
163+
};
105164

106-
let offset = self.offset;
107-
let mut size = (self.end - self.offset) as usize;
108-
if let Some(chunk) = self.ctx.options.chunk() {
109-
size = size.min(chunk)
110-
}
111-
112-
// Update self.offset before building future.
113-
self.offset += size as u64;
114-
let args = self
115-
.ctx
116-
.args
117-
.clone()
118-
.with_range(BytesRange::new(offset, Some(size as u64)));
165+
let args = self.ctx.args.clone().with_range(range);
119166
let (_, r) = self.ctx.acc.read(&self.ctx.path, args).await?;
120167
Ok(Some(r))
121168
}
122169

123170
/// Generate next blocking reader.
124171
pub fn next_blocking_reader(&mut self) -> Result<Option<oio::BlockingReader>> {
125-
if self.offset >= self.end {
172+
let Some(range) = self.next_range() else {
126173
return Ok(None);
127-
}
174+
};
128175

129-
let offset = self.offset;
130-
let mut size = (self.end - self.offset) as usize;
131-
if let Some(chunk) = self.ctx.options.chunk() {
132-
size = size.min(chunk)
133-
}
134-
135-
// Update self.offset before building future.
136-
self.offset += size as u64;
137-
let args = self
138-
.ctx
139-
.args
140-
.clone()
141-
.with_range(BytesRange::new(offset, Some(size as u64)));
176+
let args = self.ctx.args.clone().with_range(range);
142177
let (_, r) = self.ctx.acc.blocking_read(&self.ctx.path, args)?;
143178
Ok(Some(r))
144179
}
@@ -167,7 +202,7 @@ mod tests {
167202
OpRead::new(),
168203
OpReader::new().with_chunk(3),
169204
));
170-
let mut generator = ReadGenerator::new(ctx, 0..10);
205+
let mut generator = ReadGenerator::new(ctx, 0, Some(10));
171206
let mut readers = vec![];
172207
while let Some(r) = generator.next_reader().await? {
173208
readers.push(r);
@@ -177,6 +212,32 @@ mod tests {
177212
Ok(())
178213
}
179214

215+
#[tokio::test]
216+
async fn test_next_reader_without_size() -> Result<()> {
217+
let op = Operator::via_iter(Scheme::Memory, [])?;
218+
op.write(
219+
"test",
220+
Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
221+
)
222+
.await?;
223+
224+
let acc = op.into_inner();
225+
let ctx = Arc::new(ReadContext::new(
226+
acc,
227+
"test".to_string(),
228+
OpRead::new(),
229+
OpReader::new().with_chunk(3),
230+
));
231+
let mut generator = ReadGenerator::new(ctx, 0, None);
232+
let mut readers = vec![];
233+
while let Some(r) = generator.next_reader().await? {
234+
readers.push(r);
235+
}
236+
237+
pretty_assertions::assert_eq!(readers.len(), 1);
238+
Ok(())
239+
}
240+
180241
#[test]
181242
fn test_next_blocking_reader() -> Result<()> {
182243
let op = Operator::via_iter(Scheme::Memory, [])?;
@@ -192,7 +253,7 @@ mod tests {
192253
OpRead::new(),
193254
OpReader::new().with_chunk(3),
194255
));
195-
let mut generator = ReadGenerator::new(ctx, 0..10);
256+
let mut generator = ReadGenerator::new(ctx, 0, Some(10));
196257
let mut readers = vec![];
197258
while let Some(r) = generator.next_blocking_reader()? {
198259
readers.push(r);

0 commit comments

Comments
 (0)