From 93076ee283a7a1f80f934472d2cf23eaffbb4ba0 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 28 Sep 2024 12:53:34 +0800 Subject: [PATCH 1/6] feat: Reduce stat request if we are reading to end Signed-off-by: Xuanwo --- .../types/blocking_read/buffer_iterator.rs | 10 +- core/src/types/context/read.rs | 139 +++++++++++++----- core/src/types/read/buffer_stream.rs | 49 ++++-- core/src/types/read/futures_async_reader.rs | 12 +- core/src/types/read/futures_bytes_stream.rs | 21 ++- core/src/types/read/reader.rs | 59 +------- 6 files changed, 164 insertions(+), 126 deletions(-) diff --git a/core/src/types/blocking_read/buffer_iterator.rs b/core/src/types/blocking_read/buffer_iterator.rs index 91a98130cc4c..f0902d5aac87 100644 --- a/core/src/types/blocking_read/buffer_iterator.rs +++ b/core/src/types/blocking_read/buffer_iterator.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::ops::Range; +use std::ops::RangeBounds; use std::sync::Arc; use crate::raw::*; @@ -30,8 +30,8 @@ struct IteratingReader { impl IteratingReader { /// Create a new iterating reader. #[inline] - fn new(ctx: Arc, range: Range) -> Self { - let generator = ReadGenerator::new(ctx.clone(), range); + fn new(ctx: Arc, range: BytesRange) -> Self { + let generator = ReadGenerator::new(ctx.clone(), range.offset(), range.size()); Self { generator, reader: None, @@ -73,9 +73,9 @@ pub struct BufferIterator { impl BufferIterator { /// Create a new buffer iterator. #[inline] - pub fn new(ctx: Arc, range: Range) -> Self { + pub fn new(ctx: Arc, range: impl RangeBounds) -> Self { Self { - inner: IteratingReader::new(ctx, range), + inner: IteratingReader::new(ctx, range.into()), } } } diff --git a/core/src/types/context/read.rs b/core/src/types/context/read.rs index 955a1ff1455e..d3cf432a35ad 100644 --- a/core/src/types/context/read.rs +++ b/core/src/types/context/read.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::ops::Range; +use std::ops::{Bound, Range, RangeBounds}; use std::sync::Arc; use crate::raw::*; @@ -68,6 +68,38 @@ impl ReadContext { pub fn options(&self) -> &OpReader { &self.options } + + /// Parse the range bounds into a range. + pub(crate) async fn parse_into_range( + &self, + range: impl RangeBounds, + ) -> Result> { + let start = match range.start_bound() { + Bound::Included(v) => *v, + Bound::Excluded(v) => v + 1, + Bound::Unbounded => 0, + }; + + let end = match range.end_bound() { + Bound::Included(v) => v + 1, + Bound::Excluded(v) => *v, + Bound::Unbounded => { + let mut op_stat = OpStat::new(); + + if let Some(v) = self.args().version() { + op_stat = op_stat.with_version(v); + } + + self.accessor() + .stat(self.path(), op_stat) + .await? + .into_metadata() + .content_length() + } + }; + + Ok(start..end) + } } /// ReadGenerator is used to generate new readers. @@ -83,62 +115,65 @@ pub struct ReadGenerator { ctx: Arc, offset: u64, - end: u64, + size: Option, } impl ReadGenerator { /// Create a new ReadGenerator. #[inline] - pub fn new(ctx: Arc, range: Range) -> Self { - Self { - ctx, - offset: range.start, - end: range.end, + pub fn new(ctx: Arc, offset: u64, size: Option) -> Self { + Self { ctx, offset, size } + } + + /// Generate next range to read. + fn next_range(&mut self) -> Option { + if self.size == Some(0) { + return None; } + + let next_offset = self.offset; + let next_size = match self.size { + // Given size is None, read all data. 
+ None => { + // Update size to Some(0) to indicate that there is no more data to read. + self.size = Some(0); + None + } + Some(remaining) => { + // If chunk is set, read data in chunks. + let read_size = self + .ctx + .options + .chunk() + .map_or(remaining, |chunk| remaining.min(chunk as u64)); + // Update (offset, size) before building future. + self.offset += read_size; + self.size = Some(remaining - read_size); + Some(read_size) + } + }; + + Some(BytesRange::new(next_offset, next_size)) } /// Generate next reader. pub async fn next_reader(&mut self) -> Result> { - if self.offset >= self.end { + let Some(range) = self.next_range() else { return Ok(None); - } + }; - let offset = self.offset; - let mut size = (self.end - self.offset) as usize; - if let Some(chunk) = self.ctx.options.chunk() { - size = size.min(chunk) - } - - // Update self.offset before building future. - self.offset += size as u64; - let args = self - .ctx - .args - .clone() - .with_range(BytesRange::new(offset, Some(size as u64))); + let args = self.ctx.args.clone().with_range(range); let (_, r) = self.ctx.acc.read(&self.ctx.path, args).await?; Ok(Some(r)) } /// Generate next blocking reader. pub fn next_blocking_reader(&mut self) -> Result> { - if self.offset >= self.end { + let Some(range) = self.next_range() else { return Ok(None); - } + }; - let offset = self.offset; - let mut size = (self.end - self.offset) as usize; - if let Some(chunk) = self.ctx.options.chunk() { - size = size.min(chunk) - } - - // Update self.offset before building future. - self.offset += size as u64; - let args = self - .ctx - .args - .clone() - .with_range(BytesRange::new(offset, Some(size as u64))); + let args = self.ctx.args.clone().with_range(range); let (_, r) = self.ctx.acc.blocking_read(&self.ctx.path, args)?; Ok(Some(r)) } @@ -167,7 +202,7 @@ mod tests { OpRead::new(), OpReader::new().with_chunk(3), )); - let mut generator = ReadGenerator::new(ctx, 0..10); + let mut generator = ReadGenerator::new(ctx, 0, Some(10)); let mut readers = vec![]; while let Some(r) = generator.next_reader().await? { readers.push(r); @@ -177,6 +212,32 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_next_reader_without_size() -> Result<()> { + let op = Operator::via_iter(Scheme::Memory, [])?; + op.write( + "test", + Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]), + ) + .await?; + + let acc = op.into_inner(); + let ctx = Arc::new(ReadContext::new( + acc, + "test".to_string(), + OpRead::new(), + OpReader::new().with_chunk(3), + )); + let mut generator = ReadGenerator::new(ctx, 0, None); + let mut readers = vec![]; + while let Some(r) = generator.next_reader().await? { + readers.push(r); + } + + pretty_assertions::assert_eq!(readers.len(), 1); + Ok(()) + } + #[test] fn test_next_blocking_reader() -> Result<()> { let op = Operator::via_iter(Scheme::Memory, [])?; @@ -192,7 +253,7 @@ mod tests { OpRead::new(), OpReader::new().with_chunk(3), )); - let mut generator = ReadGenerator::new(ctx, 0..10); + let mut generator = ReadGenerator::new(ctx, 0, Some(10)); let mut readers = vec![]; while let Some(r) = generator.next_blocking_reader()? { readers.push(r); diff --git a/core/src/types/read/buffer_stream.rs b/core/src/types/read/buffer_stream.rs index 1464ef6164a1..eff0287e5e6d 100644 --- a/core/src/types/read/buffer_stream.rs +++ b/core/src/types/read/buffer_stream.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use std::ops::Range; +use std::ops::RangeBounds; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -40,8 +40,8 @@ pub struct StreamingReader { impl StreamingReader { /// Create a new streaming reader. #[inline] - fn new(ctx: Arc, range: Range) -> Self { - let generator = ReadGenerator::new(ctx.clone(), range); + fn new(ctx: Arc, range: BytesRange) -> Self { + let generator = ReadGenerator::new(ctx.clone(), range.offset(), range.size()); Self { generator, reader: None, @@ -86,7 +86,7 @@ impl ChunkedReader { /// # Notes /// /// We don't need to handle `Executor::timeout` since we are outside of the layer. - fn new(ctx: Arc, range: Range) -> Self { + fn new(ctx: Arc, range: BytesRange) -> Self { let tasks = ConcurrentTasks::new( ctx.args().executor().cloned().unwrap_or_default(), ctx.options().concurrent(), @@ -99,7 +99,7 @@ impl ChunkedReader { }) }, ); - let generator = ReadGenerator::new(ctx, range); + let generator = ReadGenerator::new(ctx, range.offset(), range.size()); Self { generator, tasks, @@ -144,17 +144,40 @@ enum State { } impl BufferStream { - /// Create a new buffer stream. - pub fn new(ctx: Arc, range: Range) -> Self { + /// Create a new buffer stream with already calculated offset and size. + pub fn new(ctx: Arc, offset: u64, size: Option) -> Self { + debug_assert!( + size.is_some() || ctx.options().chunk().is_none(), + "size must be known if chunk is set" + ); + let reader = if ctx.options().chunk().is_some() { - TwoWays::Two(ChunkedReader::new(ctx, range)) + TwoWays::Two(ChunkedReader::new(ctx, BytesRange::new(offset, size))) } else { - TwoWays::One(StreamingReader::new(ctx, range)) + TwoWays::One(StreamingReader::new(ctx, BytesRange::new(offset, size))) }; + Self { state: State::Idle(Some(reader)), } } + + /// Create a new buffer stream with given range bound. + /// + /// If users is going to perform chunked read but the read size is unknown, we will parse + /// into range first. 
+ pub async fn create(ctx: Arc, range: impl RangeBounds) -> Result { + let reader = if ctx.options().chunk().is_some() { + let range = ctx.parse_into_range(range).await?; + TwoWays::Two(ChunkedReader::new(ctx, range.into())) + } else { + TwoWays::One(StreamingReader::new(ctx, range.into())) + }; + + Ok(Self { + state: State::Idle(Some(reader)), + }) + } } impl Stream for BufferStream { @@ -198,8 +221,8 @@ mod tests { use super::*; - #[test] - fn test_trait() -> Result<()> { + #[tokio::test] + async fn test_trait() -> Result<()> { let acc = Operator::via_iter(Scheme::Memory, [])?.into_inner(); let ctx = Arc::new(ReadContext::new( acc, @@ -207,7 +230,7 @@ mod tests { OpRead::new(), OpReader::new(), )); - let v = BufferStream::new(ctx, 4..8); + let v = BufferStream::create(ctx, 4..8).await?; let _: Box = Box::new(v); @@ -231,7 +254,7 @@ mod tests { OpReader::new(), )); - let s = BufferStream::new(ctx, 4..8); + let s = BufferStream::create(ctx, 4..8).await?; let bufs: Vec<_> = s.try_collect().await.unwrap(); assert_eq!(bufs.len(), 1); assert_eq!(bufs[0].chunk(), "o".as_bytes()); diff --git a/core/src/types/read/futures_async_reader.rs b/core/src/types/read/futures_async_reader.rs index 13e659e2b68e..bf3bc212e2c9 100644 --- a/core/src/types/read/futures_async_reader.rs +++ b/core/src/types/read/futures_async_reader.rs @@ -63,7 +63,7 @@ impl FuturesAsyncReader { #[inline] pub(super) fn new(ctx: Arc, range: Range) -> Self { let (start, end) = (range.start, range.end); - let stream = BufferStream::new(ctx.clone(), range); + let stream = BufferStream::new(ctx.clone(), start, Some(end - start)); FuturesAsyncReader { ctx, @@ -157,7 +157,11 @@ impl AsyncSeek for FuturesAsyncReader { self.buf.advance(cnt as _); } else { self.buf = Buffer::new(); - self.stream = BufferStream::new(self.ctx.clone(), new_pos + self.start..self.end); + self.stream = BufferStream::new( + self.ctx.clone(), + new_pos + self.start, + Some(self.end - self.start - new_pos), + ); } self.pos = new_pos; @@ -177,8 +181,8 @@ mod tests { use super::*; - #[test] - fn test_trait() -> Result<()> { + #[tokio::test] + async fn test_trait() -> Result<()> { let acc = Operator::via_iter(Scheme::Memory, [])?.into_inner(); let ctx = Arc::new(ReadContext::new( acc, diff --git a/core/src/types/read/futures_bytes_stream.rs b/core/src/types/read/futures_bytes_stream.rs index 99e18b5ad8c2..df401f850b21 100644 --- a/core/src/types/read/futures_bytes_stream.rs +++ b/core/src/types/read/futures_bytes_stream.rs @@ -16,7 +16,7 @@ // under the License. use std::io; -use std::ops::Range; +use std::ops::RangeBounds; use std::pin::Pin; use std::sync::Arc; use std::task::ready; @@ -46,14 +46,13 @@ unsafe impl Sync for FuturesBytesStream {} impl FuturesBytesStream { /// NOTE: don't allow users to create FuturesStream directly. 
- #[inline] - pub(crate) fn new(ctx: Arc, range: Range) -> Self { - let stream = BufferStream::new(ctx, range); + pub(crate) async fn new(ctx: Arc, range: impl RangeBounds) -> Result { + let stream = BufferStream::create(ctx, range).await?; - FuturesBytesStream { + Ok(FuturesBytesStream { stream, buf: Buffer::new(), - } + }) } } @@ -88,8 +87,8 @@ mod tests { use super::*; - #[test] - fn test_trait() -> Result<()> { + #[tokio::test] + async fn test_trait() -> Result<()> { let acc = Operator::via_iter(Scheme::Memory, [])?.into_inner(); let ctx = Arc::new(ReadContext::new( acc, @@ -97,7 +96,7 @@ mod tests { OpRead::new(), OpReader::new(), )); - let v = FuturesBytesStream::new(ctx, 4..8); + let v = FuturesBytesStream::new(ctx, 4..8).await?; let _: Box = Box::new(v); @@ -121,7 +120,7 @@ mod tests { OpReader::new(), )); - let s = FuturesBytesStream::new(ctx, 4..8); + let s = FuturesBytesStream::new(ctx, 4..8).await?; let bufs: Vec = s.try_collect().await.unwrap(); assert_eq!(&bufs[0], "o".as_bytes()); assert_eq!(&bufs[1], "Wor".as_bytes()); @@ -146,7 +145,7 @@ mod tests { OpReader::new().with_concurrent(3).with_chunk(1), )); - let s = FuturesBytesStream::new(ctx, 4..8); + let s = FuturesBytesStream::new(ctx, 4..8).await?; let bufs: Vec = s.try_collect().await.unwrap(); assert_eq!(&bufs[0], "o".as_bytes()); assert_eq!(&bufs[1], "W".as_bytes()); diff --git a/core/src/types/read/reader.rs b/core/src/types/read/reader.rs index c7c68c0f70fe..3ec284e5bb89 100644 --- a/core/src/types/read/reader.rs +++ b/core/src/types/read/reader.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::ops::Bound; use std::ops::Range; use std::ops::RangeBounds; use std::sync::Arc; @@ -25,7 +24,6 @@ use futures::stream; use futures::StreamExt; use futures::TryStreamExt; -use crate::raw::*; use crate::*; /// Reader is designed to read data from given path in an asynchronous @@ -94,9 +92,6 @@ use crate::*; #[derive(Clone)] pub struct Reader { ctx: Arc, - - /// Total size of the reader. - size: Arc, } impl Reader { @@ -108,48 +103,7 @@ impl Reader { /// We don't want to expose those details to users so keep this function /// in crate only. pub(crate) fn new(ctx: ReadContext) -> Self { - Reader { - ctx: Arc::new(ctx), - size: Arc::new(AtomicContentLength::new()), - } - } - - /// Parse users input range bounds into valid `Range`. - /// - /// To avoid duplicated stat call, we will cache the size of the reader. - async fn parse_range(&self, range: impl RangeBounds) -> Result> { - let start = match range.start_bound() { - Bound::Included(v) => *v, - Bound::Excluded(v) => v + 1, - Bound::Unbounded => 0, - }; - - let end = match range.end_bound() { - Bound::Included(v) => v + 1, - Bound::Excluded(v) => *v, - Bound::Unbounded => match self.size.load() { - Some(v) => v, - None => { - let mut op_stat = OpStat::new(); - - if let Some(v) = self.ctx.args().version() { - op_stat = op_stat.with_version(v); - } - - let size = self - .ctx - .accessor() - .stat(self.ctx.path(), op_stat) - .await? - .into_metadata() - .content_length(); - self.size.store(size); - size - } - }, - }; - - Ok(start..end) + Reader { ctx: Arc::new(ctx) } } /// Read give range from reader into [`Buffer`]. @@ -246,8 +200,7 @@ impl Reader { /// And the name `BufferStream` is not good enough to expose to users. /// Let's keep it inside for now. 
async fn into_stream(self, range: impl RangeBounds) -> Result { - let range = self.parse_range(range).await?; - Ok(BufferStream::new(self.ctx, range)) + BufferStream::create(self.ctx, range).await } /// Convert reader into [`FuturesAsyncReader`] which implements [`futures::AsyncRead`], @@ -312,7 +265,7 @@ impl Reader { self, range: impl RangeBounds, ) -> Result { - let range = self.parse_range(range).await?; + let range = self.ctx.parse_into_range(range).await?; Ok(FuturesAsyncReader::new(self.ctx, range)) } @@ -372,21 +325,19 @@ impl Reader { self, range: impl RangeBounds, ) -> Result { - let range = self.parse_range(range).await?; - Ok(FuturesBytesStream::new(self.ctx, range)) + FuturesBytesStream::new(self.ctx, range).await } } #[cfg(test)] mod tests { - use bytes::Bytes; use rand::rngs::ThreadRng; use rand::Rng; use rand::RngCore; use super::*; - use crate::raw::MaybeSend; + use crate::raw::*; use crate::services; use crate::Operator; From c611f34bb2b311bcdcabf9a7ed08c95018b7072a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 28 Sep 2024 13:15:37 +0800 Subject: [PATCH 2/6] Fix sftp read Signed-off-by: Xuanwo --- core/src/services/sftp/backend.rs | 2 +- core/src/services/sftp/reader.rs | 43 +++++++------------------------ 2 files changed, 11 insertions(+), 34 deletions(-) diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 48b87e0493b4..8e3c103f7862 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -404,7 +404,7 @@ impl Access for SftpBackend { Ok(( RpRead::default(), - SftpReader::new(client, f, args.range().size().unwrap_or(u64::MAX) as _), + SftpReader::new(client, f, args.range().size()), )) } diff --git a/core/src/services/sftp/reader.rs b/core/src/services/sftp/reader.rs index f007c3bba64b..90a231472f2c 100644 --- a/core/src/services/sftp/reader.rs +++ b/core/src/services/sftp/reader.rs @@ -30,17 +30,17 @@ pub struct SftpReader { file: File, chunk: usize, - size: usize, + size: Option, read: usize, buf: BytesMut, } impl SftpReader { - pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: usize) -> Self { + pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: Option) -> Self { Self { _conn: conn, file, - size, + size: size.map(|v| v as usize), chunk: 2 * 1024 * 1024, read: 0, buf: BytesMut::new(), @@ -50,35 +50,15 @@ impl SftpReader { impl oio::Read for SftpReader { async fn read(&mut self) -> Result { - // let client = self.inner.connect().await?; - // - // let mut fs = client.fs(); - // fs.set_cwd(&self.root); - // - // let path = fs - // .canonicalize(&self.path) - // .await - // .map_err(parse_sftp_error)?; - // - // let mut f = client - // .open(path.as_path()) - // .await - // .map_err(parse_sftp_error)?; - - // f.seek(SeekFrom::Start(offset)) - // .await - // .map_err(new_std_io_error)?; - - // let mut size = size; - // if size == 0 { - // return Ok(Buffer::new()); - // } - - if self.read >= self.size { + if Some(self.read) >= self.size { return Ok(Buffer::new()); } - let size = (self.size - self.read).min(self.chunk); + let size = if let Some(size) = self.size { + (size - self.read).min(self.chunk) + } else { + self.chunk + }; self.buf.reserve(size); let Some(bytes) = self @@ -87,10 +67,7 @@ impl oio::Read for SftpReader { .await .map_err(parse_sftp_error)? 
else { - return Err(Error::new( - ErrorKind::RangeNotSatisfied, - "sftp read file reaching EoF", - )); + return Ok(Buffer::new()); }; self.read += bytes.len(); From e12a106d6890fc32b23ca4c35fa547625923c05c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 28 Sep 2024 13:38:00 +0800 Subject: [PATCH 3/6] Try fix sftp read Signed-off-by: Xuanwo --- core/src/services/sftp/backend.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 8e3c103f7862..d6ec3c080410 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -396,6 +396,8 @@ impl Access for SftpBackend { .await .map_err(parse_sftp_error)?; + let meta = f.metadata().await.map_err(parse_sftp_error)?; + if args.range().offset() != 0 { f.seek(SeekFrom::Start(args.range().offset())) .await @@ -404,7 +406,7 @@ impl Access for SftpBackend { Ok(( RpRead::default(), - SftpReader::new(client, f, args.range().size()), + SftpReader::new(client, f, args.range().size().and(meta.len())), )) } From ae464347a8766e47d787245aa24b08b7941ed879 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 28 Sep 2024 13:44:17 +0800 Subject: [PATCH 4/6] Fix typo Signed-off-by: Xuanwo --- core/src/services/sftp/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/sftp/reader.rs b/core/src/services/sftp/reader.rs index 90a231472f2c..861015ba4a44 100644 --- a/core/src/services/sftp/reader.rs +++ b/core/src/services/sftp/reader.rs @@ -63,7 +63,7 @@ impl oio::Read for SftpReader { let Some(bytes) = self .file - .read(size as u32, self.buf.split_off(size)) + .read(size as u32, self.buf.split_to(size)) .await .map_err(parse_sftp_error)? else { From f1d92b13685b3ba5e423c0b6616b1ac1642651a3 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 28 Sep 2024 14:00:58 +0800 Subject: [PATCH 5/6] Should be split off Signed-off-by: Xuanwo --- core/src/services/sftp/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/sftp/reader.rs b/core/src/services/sftp/reader.rs index 861015ba4a44..6b4ceeff1cc4 100644 --- a/core/src/services/sftp/reader.rs +++ b/core/src/services/sftp/reader.rs @@ -63,7 +63,7 @@ impl oio::Read for SftpReader { let Some(bytes) = self .file - .read(size as u32, self.buf.split_to(size)) + .read(size as u32, self.buf.split_off(0)) .await .map_err(parse_sftp_error)? 
else { From f65cdfc9fa2f17649f1d2bc60bd03b3e22338f61 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 28 Sep 2024 14:25:55 +0800 Subject: [PATCH 6/6] fix sftp read Signed-off-by: Xuanwo --- core/src/services/sftp/backend.rs | 4 +--- core/src/services/sftp/reader.rs | 6 ++++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index d6ec3c080410..8e3c103f7862 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -396,8 +396,6 @@ impl Access for SftpBackend { .await .map_err(parse_sftp_error)?; - let meta = f.metadata().await.map_err(parse_sftp_error)?; - if args.range().offset() != 0 { f.seek(SeekFrom::Start(args.range().offset())) .await @@ -406,7 +404,7 @@ impl Access for SftpBackend { Ok(( RpRead::default(), - SftpReader::new(client, f, args.range().size().and(meta.len())), + SftpReader::new(client, f, args.range().size()), )) } diff --git a/core/src/services/sftp/reader.rs b/core/src/services/sftp/reader.rs index 6b4ceeff1cc4..936e315acd13 100644 --- a/core/src/services/sftp/reader.rs +++ b/core/src/services/sftp/reader.rs @@ -50,7 +50,7 @@ impl SftpReader { impl oio::Read for SftpReader { async fn read(&mut self) -> Result { - if Some(self.read) >= self.size { + if self.read >= self.size.unwrap_or(usize::MAX) { return Ok(Buffer::new()); } @@ -71,6 +71,8 @@ impl oio::Read for SftpReader { }; self.read += bytes.len(); - Ok(Buffer::from(bytes.freeze())) + self.buf = bytes; + let bs = self.buf.split(); + Ok(Buffer::from(bs.freeze())) } }
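
To illustrate the intended effect of this series from an application's point of view, here is a minimal sketch (not part of the patches themselves) against the public API exercised by the unit tests in patch 1. The path and payload are made up, and the comments reflect a reading of patch 1's `ReadContext::parse_into_range` / `BufferStream::create` changes rather than guaranteed behavior.

use opendal::{Operator, Result, Scheme};

#[tokio::main]
async fn main() -> Result<()> {
    // In-memory backend, same setup as the unit tests added in patch 1.
    let op = Operator::via_iter(Scheme::Memory, [])?;
    op.write("test", "Hello, World!").await?;

    // Unbounded range with no chunk configured: the streaming path can read
    // straight to EOF, so no extra `stat` request should be needed just to
    // learn the total content length.
    let reader = op.reader("test").await?;
    let buf = reader.read(..).await?;
    assert_eq!(buf.to_bytes(), "Hello, World!".as_bytes());

    // With a chunk size set, the total size is still required to bound the
    // chunked reads; after this series the range is resolved lazily inside
    // `BufferStream::create` (via `ReadContext::parse_into_range`) instead of
    // eagerly in `Reader`.
    let reader = op.reader_with("test").chunk(4).await?;
    let buf = reader.read(7..).await?;
    assert_eq!(buf.to_bytes(), "World!".as_bytes());

    Ok(())
}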