feat(bindings/python): Enhance Reader and Writer #6086

Open
wants to merge 9 commits into
base: main
238 changes: 117 additions & 121 deletions bindings/python/python/opendal/__base.pyi

Large diffs are not rendered by default.

19 changes: 18 additions & 1 deletion bindings/python/python/opendal/__init__.pyi
@@ -200,12 +200,15 @@ class AsyncOperator(_Base):
"""
def __init__(self, scheme: str, **options: Any) -> None: ...
def layer(self, layer: Layer) -> "AsyncOperator": ...
async def open(self, path: PathBuf, mode: str) -> AsyncFile:
async def open(self, path: PathBuf, mode: str, **options: Any) -> AsyncFile:
"""Open a file at the given path for reading or writing.

Args:
path (str|Path): The path to the file.
mode (str): The mode to open the file. Can be "rb" or "wb".
**options (any): Reader options if mode == "rb" and
writer options if mode == "wb".
See the documentation `reader_with` and `writer_with` for more details.

Returns:
A file-like object that can be used to read or write the file.
@@ -472,6 +475,20 @@ class AsyncFile:
Args:
bs (bytes): The content to write.
"""
async def write_from(self, other: AsyncFile) -> None:
Member

I wonder whether it would be a good idea for us to provide something like shutil.copyfileobj. Would that be friendlier for Python users?

cc @messense and @erickguan for comments.

Contributor

Operator works with shutil.copyfileobj, but AsyncOperator does not. Perhaps something like this as a utility function:

async def async_copyfileobj(reader, writer, bufsize: int = 64 * 1024):
    while True:
        chunk = await reader.read(bufsize)
        if not chunk:
            break
        await writer.write(chunk)

It would be great if Python offered an async version of shutil.
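
As a rough illustration of how the helper above would be driven, the sketch below runs the same loop against two in-memory stand-ins. `MemoryReader`, `MemoryWriter` and `copy_in_memory` are hypothetical names written for this example; they only mimic the `read(size)` / `write(bytes)` coroutine shape and are not part of opendal.

```python
import asyncio


class MemoryReader:
    """Hypothetical stand-in for a file opened with mode "rb"."""

    def __init__(self, data: bytes) -> None:
        self._data = data
        self._pos = 0

    async def read(self, size: int) -> bytes:
        # Return at most `size` bytes; an empty result signals end of data.
        chunk = self._data[self._pos : self._pos + size]
        self._pos += len(chunk)
        return chunk


class MemoryWriter:
    """Hypothetical stand-in for a file opened with mode "wb"."""

    def __init__(self) -> None:
        self.buffer = bytearray()

    async def write(self, bs: bytes) -> int:
        self.buffer.extend(bs)
        return len(bs)


async def async_copyfileobj(reader, writer, bufsize: int = 64 * 1024) -> None:
    # Same loop as in the comment above: read until an empty chunk is returned.
    while True:
        chunk = await reader.read(bufsize)
        if not chunk:
            break
        await writer.write(chunk)


def copy_in_memory(data: bytes, bufsize: int = 4) -> bytes:
    """Run the coroutine to completion and return what the writer received."""
    writer = MemoryWriter()
    asyncio.run(async_copyfileobj(MemoryReader(data), writer, bufsize))
    return bytes(writer.buffer)
```

A small `bufsize` is used in `copy_in_memory` only to force several loop iterations; it says nothing about a good value for real storage backends.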

The Rust-based write_from appears to improve copy throughput by 18~28% over the naive Python loop, based on the benchmark shared here: #5943 (comment)

Is the suggestion to provide a function with a signature similar to shutil.copyfileobj, while the copying itself still happens on the Rust side?

On the Rust side, the chunk size is already an option on both the reader and the writer, so I'm not sure there is a need to ask for bufsize again. (I assume bufsize would be optimal when derived from the chunk size options set on the reader and writer.)
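
One possible reading of the question above, sketched under assumptions: a helper with the `shutil.copyfileobj(fsrc, fdst, length)` shape that hands the copy to `write_from` when the destination provides it, and falls back to the Python loop otherwise. `copyfileobj_async`, `Source`, `PlainSink`, `NativeSink` and `demo` are all hypothetical names for this illustration; nothing here is confirmed as the API the PR will adopt.

```python
import asyncio


async def copyfileobj_async(fsrc, fdst, length: int = 64 * 1024) -> None:
    """Hypothetical helper shaped like shutil.copyfileobj(fsrc, fdst, length)."""
    write_from = getattr(fdst, "write_from", None)
    if write_from is not None:
        # Native path: chunking is left to the reader/writer options,
        # so `length` is deliberately ignored here.
        await write_from(fsrc)
        return
    # Fallback path for destinations without write_from: plain Python loop.
    while True:
        chunk = await fsrc.read(length)
        if not chunk:
            break
        await fdst.write(chunk)


class Source:
    """In-memory stand-in for a readable file (not an opendal class)."""

    def __init__(self, data: bytes) -> None:
        self._data = data
        self._pos = 0

    async def read(self, size: int = -1) -> bytes:
        end = len(self._data) if size < 0 else self._pos + size
        chunk = self._data[self._pos:end]
        self._pos += len(chunk)
        return chunk


class PlainSink:
    """In-memory stand-in for a writable file without write_from."""

    def __init__(self) -> None:
        self.data = b""
        self.used_native = False

    async def write(self, bs: bytes) -> int:
        self.data += bs
        return len(bs)


class NativeSink(PlainSink):
    """Stand-in that also offers write_from, like the AsyncFile in this PR."""

    async def write_from(self, other) -> None:
        self.used_native = True
        self.data += await other.read()


def demo(native: bool, payload: bytes = b"abcdefghij") -> tuple:
    """Copy `payload` and report (bytes received, whether write_from was used)."""
    sink = NativeSink() if native else PlainSink()
    asyncio.run(copyfileobj_async(Source(payload), sink, length=3))
    return sink.data, sink.used_native
```

Ignoring `length` on the native path reflects the point raised in the comment: if chunk sizes are already reader and writer options, a second buffer-size parameter would be redundant there.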

"""Write the content of another AsyncFile to this file.

This method efficiently transfers data from `other` to this file
without loading the entire content into memory. It's suitable for
large files where memory usage is a concern.

Args:
other (AsyncFile): The AsyncFile to read data from.
It must be opened in binary read mode ('rb').

Raises:
IOError: If this file is not opened in a writable binary mode ('wb').
"""
async def seek(self, pos: int, whence: int = 0) -> int:
"""Set the file's current position.

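
A minimal usage sketch for the `open(path, mode, **options)` and `write_from` stubs in this file, assuming the handles expose an awaitable `close()` as the existing `AsyncFile` does. `copy_object` is a hypothetical helper, and `FakeOperator` / `FakeFile` are in-memory stand-ins so the sketch runs without a storage backend; with the real bindings `op` would be an `opendal.AsyncOperator`.

```python
import asyncio


async def copy_object(op, src: str, dst: str, **writer_options) -> None:
    """Copy `src` to `dst` through two handles opened on the same operator."""
    reader = await op.open(src, "rb")
    try:
        # Writer options (for example content_type) are forwarded as keywords.
        writer = await op.open(dst, "wb", **writer_options)
        try:
            await writer.write_from(reader)
        finally:
            await writer.close()
    finally:
        await reader.close()


class FakeFile:
    """Stand-in for AsyncFile; only the calls used above are modelled."""

    def __init__(self, store: dict, path: str, mode: str) -> None:
        self.store, self.path, self.mode = store, path, mode

    async def write_from(self, other: "FakeFile") -> None:
        if self.mode != "wb" or other.mode != "rb":
            raise IOError("destination is not writable or source is not readable")
        self.store[self.path] = self.store[other.path]

    async def close(self) -> None:
        pass


class FakeOperator:
    """Stand-in for AsyncOperator backed by a plain dict."""

    def __init__(self, store: dict) -> None:
        self.store = store
        self.last_options = {}

    async def open(self, path: str, mode: str, **options) -> FakeFile:
        self.last_options = options
        return FakeFile(self.store, path, mode)


def run_copy(store: dict, src: str, dst: str, **writer_options) -> dict:
    """Drive copy_object against the fake operator and return the store."""
    asyncio.run(copy_object(FakeOperator(store), src, dst, **writer_options))
    return store
```

The nested `try`/`finally` keeps the reader from leaking if opening the writer fails; whether closing the writer is also what commits the data on a given backend is not shown by this diff.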
2 changes: 1 addition & 1 deletion bindings/python/python/opendal/layers.pyi
@@ -37,4 +37,4 @@ class ConcurrentLimitLayer(Layer):

@final
class MimeGuessLayer(Layer):
def __init__(self) -> None: ...
def __init__(self) -> None: ...
65 changes: 43 additions & 22 deletions bindings/python/src/capability.rs
@@ -28,44 +28,57 @@ pub struct Capability {
/// If operator supports stat with if none match.
pub stat_with_if_none_match: bool,

/// If operator supports read.
/// Indicates if the operator supports read operations.
pub read: bool,
/// If operator supports read with if match.
/// Indicates if conditional read operations using If-Match are supported.
pub read_with_if_match: bool,
/// If operator supports read with if none match.
/// Indicates if conditional read operations using If-None-Match are supported.
pub read_with_if_none_match: bool,
/// if operator supports read with override cache control.
/// Indicates if conditional read operations using If-Modified-Since are supported.
pub read_with_if_modified_since: bool,
/// Indicates if conditional read operations using If-Unmodified-Since are supported.
pub read_with_if_unmodified_since: bool,
/// Indicates if Cache-Control header override is supported during read operations.
pub read_with_override_cache_control: bool,
/// if operator supports read with override content disposition.
/// Indicates if Content-Disposition header override is supported during read operations.
pub read_with_override_content_disposition: bool,
/// if operator supports read with override content type.
/// Indicates if Content-Type header override is supported during read operations.
pub read_with_override_content_type: bool,
/// Indicates if versioned read operations are supported.
pub read_with_version: bool,

/// If operator supports write.
/// Indicates if the operator supports write operations.
pub write: bool,
/// If operator supports write can be called in multi times.
/// Indicates if multiple write operations can be performed on the same object.
pub write_can_multi: bool,
/// If operator supports write with empty content.
/// Indicates if writing empty content is supported.
pub write_can_empty: bool,
/// If operator supports write by append.
/// Indicates if append operations are supported.
pub write_can_append: bool,
/// If operator supports write with content type.
/// Indicates if Content-Type can be specified during write operations.
pub write_with_content_type: bool,
/// If operator supports write with content disposition.
/// Indicates if Content-Disposition can be specified during write operations.
pub write_with_content_disposition: bool,
/// If operator supports write with cache control.
/// Indicates if Content-Encoding can be specified during write operations.
pub write_with_content_encoding: bool,
/// Indicates if Cache-Control can be specified during write operations.
pub write_with_cache_control: bool,
/// write_multi_max_size is the max size that services support in write_multi.
///
/// For example, AWS S3 supports 5GiB as max in write_multi.
/// Indicates if conditional write operations using If-Match are supported.
pub write_with_if_match: bool,
/// Indicates if conditional write operations using If-None-Match are supported.
pub write_with_if_none_match: bool,
/// Indicates if write operations can be conditional on object non-existence.
pub write_with_if_not_exists: bool,
/// Indicates if custom user metadata can be attached during write operations.
pub write_with_user_metadata: bool,
/// Maximum size supported for multipart uploads.
/// For example, AWS S3 supports up to 5GiB per part in multipart uploads.
pub write_multi_max_size: Option<usize>,
/// write_multi_min_size is the min size that services support in write_multi.
///
/// For example, AWS S3 requires at least 5MiB in write_multi expect the last one.
/// Minimum size required for multipart uploads (except for the last part).
/// For example, AWS S3 requires at least 5MiB per part.
pub write_multi_min_size: Option<usize>,
/// write_total_max_size is the max size that services support in write_total.
///
/// For example, Cloudflare D1 supports 1MB as max in write_total.
/// Maximum total size supported for write operations.
/// For example, Cloudflare D1 has a 1MB total size limit.
pub write_total_max_size: Option<usize>,

/// If operator supports create dir.
@@ -120,6 +133,9 @@ impl Capability {
read_with_override_content_disposition: capability
.read_with_override_content_disposition,
read_with_override_content_type: capability.read_with_override_content_type,
read_with_if_modified_since: capability.read_with_if_modified_since,
read_with_if_unmodified_since: capability.read_with_if_unmodified_since,
read_with_version: capability.read_with_version,
write: capability.write,
write_can_multi: capability.write_can_multi,
write_can_empty: capability.write_can_empty,
@@ -130,6 +146,11 @@ impl Capability {
write_multi_max_size: capability.write_multi_max_size,
write_multi_min_size: capability.write_multi_min_size,
write_total_max_size: capability.write_total_max_size,
write_with_content_encoding: capability.write_with_content_encoding,
write_with_if_match: capability.write_with_if_match,
write_with_if_none_match: capability.write_with_if_none_match,
write_with_if_not_exists: capability.write_with_if_not_exists,
write_with_user_metadata: capability.write_with_user_metadata,
create_dir: capability.create_dir,
delete: capability.delete,
copy: capability.copy,
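
The new `write_with_*` flags line up by name with the `WriterOptions` fields added elsewhere in this PR. The sketch below shows how a caller might check options against a capability object before opening a writer. It rests on an assumption not stated in the diff: that each option is gated by the like-named flag. `OPTION_TO_CAPABILITY`, `unsupported_writer_options` and `demo_capability` are hypothetical names, and the `SimpleNamespace` stands in for a real `Capability`.

```python
from __future__ import annotations

from types import SimpleNamespace

# Writer option name -> Capability flag. Both sets of names appear in this
# PR; the pairing itself is an assumption based on naming.
OPTION_TO_CAPABILITY = {
    "append": "write_can_append",
    "cache_control": "write_with_cache_control",
    "content_type": "write_with_content_type",
    "content_disposition": "write_with_content_disposition",
    "content_encoding": "write_with_content_encoding",
    "if_not_exists": "write_with_if_not_exists",
    "user_metadata": "write_with_user_metadata",
}


def unsupported_writer_options(capability, options: dict) -> list[str]:
    """Return the option names whose capability flag is missing or False.

    Options without a mapped flag (for example chunk or concurrent) are
    treated as always allowed.
    """
    unsupported = []
    for name in options:
        flag = OPTION_TO_CAPABILITY.get(name)
        if flag is not None and not getattr(capability, flag, False):
            unsupported.append(name)
    return sorted(unsupported)


# Stand-in for a real Capability so the sketch runs without a backend.
demo_capability = SimpleNamespace(
    write_can_append=True,
    write_with_content_type=True,
    write_with_user_metadata=False,
)
```

A caller could raise early or drop the reported options, depending on whether silently ignoring an option is acceptable for the workload.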
32 changes: 29 additions & 3 deletions bindings/python/src/file.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;

use futures::AsyncReadExt;
use futures::AsyncSeekExt;
use futures::AsyncWriteExt;
use pyo3::buffer::PyBuffer;
use pyo3::exceptions::PyIOError;
use pyo3::exceptions::PyValueError;
@@ -335,7 +336,7 @@ pub struct AsyncFile(Arc<Mutex<AsyncFileState>>);

enum AsyncFileState {
Reader(ocore::FuturesAsyncReader),
Writer(ocore::Writer),
Writer(ocore::FuturesAsyncWriter),
Closed,
}

@@ -344,7 +345,7 @@ impl AsyncFile {
Self(Arc::new(Mutex::new(AsyncFileState::Reader(reader))))
}

pub fn new_writer(writer: ocore::Writer) -> Self {
pub fn new_writer(writer: ocore::FuturesAsyncWriter) -> Self {
Self(Arc::new(Mutex::new(AsyncFileState::Writer(writer))))
}
}
@@ -422,13 +423,38 @@ impl AsyncFile {

let len = bs.len();
writer
.write(bs)
.write_all(&bs)
.await
.map(|_| len)
.map_err(|err| PyIOError::new_err(err.to_string()))
})
}

pub fn write_from<'p>(
&'p mut self,
py: Python<'p>,
from: PyRef<'p, Self>,
) -> PyResult<Bound<'p, PyAny>> {
let writer_state = self.0.clone();
let reader_state = from.0.clone();

future_into_py(py, async move {
let mut writer_guard = writer_state.lock().await;
let mut reader_guard = reader_state.lock().await;

match (reader_guard.deref_mut(), writer_guard.deref_mut()) {
(AsyncFileState::Reader(fr), AsyncFileState::Writer(fw)) => {
futures::io::copy(fr, fw)
.await
.map_err(|err| PyIOError::new_err(err.to_string()))
}
_ => {
Err(PyIOError::new_err("I/O operation failed: Either the destination file is not writable or the source is not readable."))
}
}
})
}

/// Change the stream position to the given byte offset.
/// offset is interpreted relative to the position indicated by `whence`.
/// The default value for whence is `SEEK_SET`. Values for `whence` are:
3 changes: 2 additions & 1 deletion bindings/python/src/lib.rs
@@ -53,7 +53,8 @@ fn _opendal(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PresignedRequest>()?;
m.add_class::<Capability>()?;

m.add_class::<WriteOptions>()?;
m.add_class::<WriterOptions>()?;
m.add_class::<ReaderOptions>()?;

// Layer module
let layers_module = PyModule::new(py, "layers")?;
30 changes: 21 additions & 9 deletions bindings/python/src/operator.rs
@@ -119,7 +119,7 @@ impl Operator {

/// Write bytes into given path.
#[pyo3(signature = (path, bs, **kwargs))]
pub fn write(&self, path: PathBuf, bs: Vec<u8>, kwargs: Option<WriteOptions>) -> PyResult<()> {
pub fn write(&self, path: PathBuf, bs: Vec<u8>, kwargs: Option<WriterOptions>) -> PyResult<()> {
let path = path.to_string_lossy().to_string();
let mut kwargs = kwargs.unwrap_or_default();
let mut write = self
@@ -322,28 +322,40 @@ impl AsyncOperator {
}

/// Open a file-like reader for the given path.
#[pyo3(signature = (path, mode, *, **kwargs))]
pub fn open<'p>(
&'p self,
py: Python<'p>,
path: PathBuf,
mode: String,
kwargs: Option<&Bound<PyDict>>,
) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();

let reader_opts = kwargs
.map(|v| v.extract::<ReaderOptions>())
.transpose()?
.unwrap_or_default();

let writer_opts = kwargs
.map(|v| v.extract::<WriterOptions>())
.transpose()?
.unwrap_or_default();

future_into_py(py, async move {
if mode == "rb" {
let r = this
.reader(&path)
.await
.map_err(format_pyerr)?
.into_futures_async_read(..)
let reader = reader_opts.create_reader(&this, path).await?;
let range = reader_opts.make_range();

let r = reader
.into_futures_async_read(range)
.await
.map_err(format_pyerr)?;
Ok(AsyncFile::new_reader(r))
} else if mode == "wb" {
let w = this.writer(&path).await.map_err(format_pyerr)?;
Ok(AsyncFile::new_writer(w))
let w = writer_opts.create_writer(&this, path).await?;
Ok(AsyncFile::new_writer(w.into_futures_async_write()))
} else {
Err(Unsupported::new_err(format!(
"OpenDAL doesn't support mode: {mode}"
@@ -369,7 +381,7 @@ impl AsyncOperator {
py: Python<'p>,
path: PathBuf,
bs: &Bound<PyBytes>,
kwargs: Option<WriteOptions>,
kwargs: Option<WriterOptions>,
) -> PyResult<Bound<'p, PyAny>> {
let mut kwargs = kwargs.unwrap_or_default();
let this = self.core.clone();
93 changes: 91 additions & 2 deletions bindings/python/src/options.rs
@@ -15,17 +15,106 @@
// specific language governing permissions and limitations
// under the License.

use crate::format_pyerr;
use crate::ocore::{Operator, Reader, Writer};
use dict_derive::FromPyObject;
use pyo3::prelude::PyResult;
use pyo3::pyclass;
use std::collections::HashMap;

use std::ops::Bound as RangeBound;

#[pyclass(module = "opendal")]
#[derive(FromPyObject, Default)]
pub struct ReaderOptions {
Member

I suggest that we don't change the class name.

pub version: Option<String>,
pub concurrent: Option<usize>,
pub chunk: Option<usize>,
pub gap: Option<usize>,
pub range_start: Option<usize>,
pub range_end: Option<usize>,
}

impl ReaderOptions {
pub fn make_range(&self) -> (RangeBound<u64>, RangeBound<u64>) {
let start_bound = self
.range_start
.map_or(RangeBound::Unbounded, |s| RangeBound::Included(s as u64));
let end_bound = self
.range_end
.map_or(RangeBound::Unbounded, |e| RangeBound::Excluded(e as u64));

(start_bound, end_bound)
}

pub async fn create_reader(&self, op: &Operator, path: String) -> PyResult<Reader> {
let mut fr = op.reader_with(&path);

if let Some(version) = &self.version {
fr = fr.version(version);
};
if let Some(concurrent) = self.concurrent {
fr = fr.concurrent(concurrent);
};
if let Some(chunk) = self.chunk {
fr = fr.chunk(chunk);
};
if let Some(gap) = self.gap {
fr = fr.gap(gap);
};

let reader = fr.await.map_err(format_pyerr)?;
Ok(reader)
}
}

#[pyclass(module = "opendal")]
#[derive(FromPyObject, Default)]
pub struct WriteOptions {
pub struct WriterOptions {
pub append: Option<bool>,
pub chunk: Option<usize>,
pub concurrent: Option<usize>,
pub cache_control: Option<String>,
pub content_type: Option<String>,
pub content_disposition: Option<String>,
pub cache_control: Option<String>,
pub content_encoding: Option<String>,
pub if_not_exists: Option<bool>,
pub user_metadata: Option<HashMap<String, String>>,
}

impl WriterOptions {
pub async fn create_writer(&self, op: &Operator, path: String) -> PyResult<Writer> {
let mut fw = op.writer_with(&path);

if let Some(append) = self.append {
fw = fw.append(append);
};
if let Some(chunk) = self.chunk {
fw = fw.chunk(chunk);
};
if let Some(concurrent) = self.concurrent {
fw = fw.concurrent(concurrent);
};
if let Some(cache_control) = &self.cache_control {
fw = fw.cache_control(cache_control);
};
if let Some(content_type) = &self.content_type {
fw = fw.content_type(content_type);
};
if let Some(content_disposition) = &self.content_disposition {
fw = fw.content_disposition(content_disposition);
};
if let Some(content_encoding) = &self.content_encoding {
fw = fw.content_encoding(content_encoding);
};
if let Some(if_not_exists) = self.if_not_exists {
fw = fw.if_not_exists(if_not_exists);
};
if let Some(user_metadata) = &self.user_metadata {
fw = fw.user_metadata(user_metadata.clone());
};

let writer = fw.await.map_err(format_pyerr)?;
Ok(writer)
}
}
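
To make the bounds in `make_range` concrete: a missing `range_start` or `range_end` is unbounded, the start is inclusive, and the end is exclusive. That matches Python slicing for non-negative values, which the sketch below uses as an analogue. `make_slice` and `read_range` are hypothetical helpers for illustration and are not part of the bindings.

```python
from typing import Optional


def make_slice(range_start: Optional[int] = None, range_end: Optional[int] = None) -> slice:
    """Python analogue of ReaderOptions::make_range.

    None maps to an unbounded side; start is inclusive, end is exclusive.
    Negative values are rejected because the Rust fields are unsigned.
    """
    for name, value in (("range_start", range_start), ("range_end", range_end)):
        if value is not None and value < 0:
            raise ValueError(f"{name} must be non-negative, got {value}")
    return slice(range_start, range_end)


def read_range(
    data: bytes,
    range_start: Optional[int] = None,
    range_end: Optional[int] = None,
) -> bytes:
    """Return the bytes a reader limited to [range_start, range_end) would see."""
    return data[make_slice(range_start, range_end)]
```

Under these semantics, `range_start=2, range_end=5` selects three bytes (offsets 2, 3 and 4), and omitting both options reads the whole object.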