Skip to content

feat: initial Operator::from_uri implementation #5482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f2267e6
feat: initial OperatorRegistry implementation
jorgehermo9 Dec 30, 2024
f581316
feat: continue with operator registry implementation
jorgehermo9 Dec 30, 2024
b67be38
feat: register enabled services & set global registry
jorgehermo9 Dec 30, 2024
b8c4fb1
feat: glue together Operator and global OperatorRegistry
jorgehermo9 Dec 30, 2024
a50c058
feat: glue together Operator and global OperatorRegistry
jorgehermo9 Dec 30, 2024
539e6ea
feat: implement Configurator::from_uri
jorgehermo9 Dec 30, 2024
3aba98b
test: add small doctest to Operator::from_uri
jorgehermo9 Dec 30, 2024
5979898
chore: add comment
jorgehermo9 Dec 30, 2024
6eed396
chore: remove changes
jorgehermo9 Dec 30, 2024
0e582ee
chore: remove changes
jorgehermo9 Dec 30, 2024
9d730c0
chore: add license header
jorgehermo9 Dec 30, 2024
976e60c
chore: fix typo
jorgehermo9 Dec 30, 2024
a4478a9
fix: clippy lint
jorgehermo9 Dec 30, 2024
a4e3866
retrigger ci
jorgehermo9 Dec 30, 2024
443c11d
feat: drop usage of the url crate
jorgehermo9 Jan 5, 2025
e7215b4
chore: remove TODO
jorgehermo9 Jan 5, 2025
b6dd1dc
feat: wrap OperatorRegistry in Arc Mutex
jorgehermo9 Jan 5, 2025
e887d24
feat: Initialize operator registry in new method
jorgehermo9 Jan 5, 2025
41790a5
chore: remove todo
jorgehermo9 Jan 5, 2025
27d65d9
Merge branch 'main' into feature/operator-from-uri
jorgehermo9 Jan 5, 2025
3b80f70
feat: move service registration to builder methods
jorgehermo9 Jan 5, 2025
c5d1c9c
chore: fix comment typo
jorgehermo9 Jan 5, 2025
e53cdd7
chore: fix comment typo
jorgehermo9 Jan 5, 2025
79091ac
chore: add comment
jorgehermo9 Jan 5, 2025
abd614d
Merge branch 'main' into feature/operator-from-uri
asukaminato0721 May 17, 2025
e71c0f1
fix bracket
asukaminato0721 May 17, 2025
176a855
clippy
asukaminato0721 May 17, 2025
0b341d3
feat: address comments & todos
jorgehermo9 May 22, 2025
fd227a5
feat: fast committed
jorgehermo9 May 22, 2025
5ecb178
feat: derive default
jorgehermo9 May 22, 2025
7309b4b
feat: simplify operator from config
jorgehermo9 May 22, 2025
56d38d9
feat: fast committed
jorgehermo9 May 22, 2025
2fd9561
feat: fast committed
jorgehermo9 May 22, 2025
088d7d0
Merge branch 'main' into feature/operator-from-uri
jorgehermo9 May 22, 2025
41c6874
test: add tests for query_pairs
jorgehermo9 May 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/raw/http_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ mod uri;
pub use uri::new_http_uri_invalid_error;
pub use uri::percent_decode_path;
pub use uri::percent_encode_path;
pub use uri::query_pairs;
pub use uri::QueryPairsWriter;

mod error;
Expand Down
55 changes: 55 additions & 0 deletions core/src/raw/http_util/uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,28 @@ pub fn percent_decode_path(path: &str) -> String {
}
}

/// query_pairs will parse a query string encoded as key-value pairs separated by `&` to a vector of key-value pairs.
/// It also does percent decoding for both key and value.
///
/// Note that `?` is not allowed in the query string, and it will be treated as a part of the first key if included.
pub fn query_pairs(query: &str) -> Vec<(String, String)> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be exposed as a public API?

IIUC the only public entrypoint to the registry is OperatorRegistry::parse() which takes a URI as a string. Is there a need for users to manually parse a URI string and split its query into parts for further customization? If not, I'd suggest to keep query_pairs() pub(crate) (at least for now) to keep the API surface minimal.

Query parsing isn't really the main responsibility of OpenDAL so IMO it would make sense to keep such utilities internal if we can.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are absolutely right! Thanks for noticing this. Will change it asap

query
.split('&')
.filter_map(|pair| {
let mut iter = pair.splitn(2, '=');

let key = iter.next()?;
if key.is_empty() {
return None;
}

let value = iter.next().unwrap_or("");
Some((key, value))
})
.map(|(key, value)| (percent_decode_path(key), percent_decode_path(value)))
.collect()
}

/// QueryPairsWriter is used to write query pairs to a url.
pub struct QueryPairsWriter {
base: String,
Expand Down Expand Up @@ -186,4 +208,37 @@ mod tests {
assert_eq!(actual, expected, "{name}");
}
}

/// Table-driven test for `query_pairs`.
///
/// Covers well-formed input, percent-decoding, and the edge cases handled by
/// the parser: pairs without a `=`, pairs with an empty key, empty segments
/// produced by stray `&`, and values that themselves contain `=`.
#[test]
fn test_query_pairs() {
    // (case name, raw query string, expected decoded pairs)
    let cases: Vec<(&str, &str, Vec<(String, String)>)> = vec![
        (
            "single pair",
            "key=value",
            vec![("key".into(), "value".into())],
        ),
        (
            "multiple pairs",
            "key=value&key2=value2&key3=value3",
            vec![
                ("key".into(), "value".into()),
                ("key2".into(), "value2".into()),
                ("key3".into(), "value3".into()),
            ],
        ),
        ("empty value", "key=", vec![("key".into(), "".into())]),
        ("empty input", "", vec![]),
        (
            "Unicode Characters",
            "unicode%20param=%E4%BD%A0%E5%A5%BD%EF%BC%8C%E4%B8%96%E7%95%8C%EF%BC%81%E2%9D%A4",
            vec![("unicode param".into(), "你好,世界!❤".into())],
        ),
        // A key with no `=` at all is kept with an empty value.
        (
            "key without separator",
            "key",
            vec![("key".into(), "".into())],
        ),
        // Pairs whose key is empty are dropped.
        (
            "empty key is skipped",
            "=value&key=value",
            vec![("key".into(), "value".into())],
        ),
        // Leading, trailing and doubled `&` yield empty segments, which are dropped.
        (
            "empty segments are skipped",
            "&key=value&&",
            vec![("key".into(), "value".into())],
        ),
        // Only the first `=` separates key from value.
        (
            "value containing separator",
            "key=a=b",
            vec![("key".into(), "a=b".into())],
        ),
    ];

    for (name, input, expected) in cases {
        let actual = query_pairs(input);

        assert_eq!(actual, expected, "{name}");
    }
}
}
20 changes: 20 additions & 0 deletions core/src/types/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

use std::fmt::Debug;

use http::Uri;
use serde::de::DeserializeOwned;
use serde::Serialize;

use crate::raw::*;
use crate::*;
use types::operator::{OperatorFactory, OperatorRegistry};

/// Builder is used to set up underlying services.
///
Expand Down Expand Up @@ -56,6 +58,15 @@ pub trait Builder: Default + 'static {

/// Consume the accessor builder to build a service.
fn build(self) -> Result<impl Access>;

/// Register a factory for this builder under `Self::SCHEME` in `registry`.
///
/// The registered factory builds `Self::Config` from the URI and options,
/// turns it into a builder, and finishes an [`Operator`] from it.
fn register_in_registry(registry: &mut OperatorRegistry) {
    // A non-capturing closure, so it coerces to the `OperatorFactory` fn pointer.
    let factory: OperatorFactory = |uri, options| {
        let config = Self::Config::from_uri(uri, options)?;
        let operator = Operator::new(config.into_builder())?.finish();
        Ok(operator)
    };
    let scheme = Self::SCHEME.into_static();
    registry.register(scheme, factory)
}
}

/// Dummy implementation of builder
Expand Down Expand Up @@ -137,6 +148,15 @@ pub trait Configurator: Serialize + DeserializeOwned + Debug + 'static {
})
}

/// TODO: document this.
fn from_uri(uri: &Uri, options: impl IntoIterator<Item = (String, String)>) -> Result<Self> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay to accept &Uri here? This way, we don't have to parse the uri again (it is only done here right now)

Or should we change it to &str and parse the uri twice?

let query_pairs = uri.query().map(query_pairs).unwrap_or_default();

let merged_options = query_pairs.into_iter().chain(options);
Copy link
Contributor Author

@jorgehermo9 jorgehermo9 May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsure about
let merged_options = query_pairs.into_iter().chain(options);
or
let merged_options = options.chain(query_pairs.into_iter());

Which one should take precedence? Should query_pairs overwrite the options, or should the options overwrite query_pairs? I'm thinking of the case where query_pairs and options contain the same key.

Right now, I think the behaviour with query_pairs.into_iter().chain(options) is that options take precedence and overwrite whatever is in query_pairs when a key is present in both.


Self::from_iter(merged_options)
}

/// Convert this configuration into a service builder.
fn into_builder(self) -> Self::Builder;
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ pub use operator::operator_futures;
pub use operator::BlockingOperator;
pub use operator::Operator;
pub use operator::OperatorBuilder;
pub use operator::OperatorFactory;
pub use operator::OperatorInfo;
pub use operator::OperatorRegistry;
pub use operator::GLOBAL_OPERATOR_REGISTRY;

mod builder;
pub use builder::Builder;
Expand Down
21 changes: 21 additions & 0 deletions core/src/types/operator/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use super::registry::GLOBAL_OPERATOR_REGISTRY;
use crate::layers::*;
use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -95,6 +96,26 @@ impl Operator {
Ok(OperatorBuilder::new(acc))
}

/// Create a new operator from a URI string and a set of extra options.
///
/// This forwards `uri` and `options` to `parse` on the
/// `GLOBAL_OPERATOR_REGISTRY` and returns its result unchanged.
///
/// # Errors
///
/// Returns whatever error the registry's `parse` produces. As implemented
/// there, that is an `ErrorKind::ConfigInvalid` error when the URI cannot be
/// parsed, has no scheme, or uses a scheme with no registered factory; errors
/// raised by the selected factory are returned as-is.
///
/// TODO: improve these examples.
/// # Examples
/// ```
/// # use anyhow::Result;
/// use opendal::Operator;
///
/// fn test() -> Result<()> {
/// Operator::from_uri("fs://?root=/tmp/test", vec![])?;
/// Ok(())
/// }
/// ```
pub fn from_uri(
uri: &str,
options: impl IntoIterator<Item = (String, String)>,
) -> Result<Self> {
GLOBAL_OPERATOR_REGISTRY.parse(uri, options)
}

/// Create a new operator from given iterator in static dispatch.
///
/// # Notes
Expand Down
5 changes: 5 additions & 0 deletions core/src/types/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ pub use info::OperatorInfo;

pub mod operator_functions;
pub mod operator_futures;

mod registry;
pub use registry::OperatorFactory;
pub use registry::OperatorRegistry;
pub use registry::GLOBAL_OPERATOR_REGISTRY;
Comment on lines +37 to +39
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we expose these 3 as public api @Xuanwo? or should we pub(crate) them instead?

Or should we only expose OperatorFactory and OperatorRegistry but not GLOBAL_OPERATOR_REGISTRY?

204 changes: 204 additions & 0 deletions core/src/types/operator/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::LazyLock;

use http::Uri;

use crate::services::*;
use crate::*;

/// Process-wide [`OperatorRegistry`].
///
/// It is wrapped in a [`LazyLock`], so the registry is built by
/// [`OperatorRegistry::initialized`] the first time it is accessed.
pub static GLOBAL_OPERATOR_REGISTRY: LazyLock<OperatorRegistry> =
LazyLock::new(OperatorRegistry::initialized);

/// Function pointer that builds an [`Operator`] from a parsed [`Uri`] and a
/// map of string options.
pub type OperatorFactory = fn(&Uri, HashMap<String, String>) -> Result<Operator>;

/// Registry that maps a scheme string to the [`OperatorFactory`] used to build
/// an [`Operator`] for URIs with that scheme.
#[derive(Clone, Debug, Default)]
pub struct OperatorRegistry {
// Factories keyed by the scheme string they were registered under.
registry: HashMap<String, OperatorFactory>,
}

impl OperatorRegistry {
/// Create an empty registry with no factories registered.
pub fn new() -> Self {
    let registry = HashMap::new();
    Self { registry }
}
/// Create a registry pre-populated with the services compiled into this build.
///
/// Starts from an empty registry and calls `register_in_registry` for each
/// service listed below; every call is gated on that service's
/// `services-*` cargo feature, so only enabled services are registered.
pub fn initialized() -> Self {
let mut registry = Self::new();

// One cfg-gated registration per supported service.
#[cfg(feature = "services-aliyun-drive")]
AliyunDrive::register_in_registry(&mut registry);
#[cfg(feature = "services-atomicserver")]
Atomicserver::register_in_registry(&mut registry);
#[cfg(feature = "services-alluxio")]
Alluxio::register_in_registry(&mut registry);
#[cfg(feature = "services-azblob")]
Azblob::register_in_registry(&mut registry);
#[cfg(feature = "services-azdls")]
Azdls::register_in_registry(&mut registry);
#[cfg(feature = "services-azfile")]
Azfile::register_in_registry(&mut registry);
#[cfg(feature = "services-b2")]
B2::register_in_registry(&mut registry);
#[cfg(feature = "services-cacache")]
Cacache::register_in_registry(&mut registry);
#[cfg(feature = "services-cos")]
Cos::register_in_registry(&mut registry);
#[cfg(feature = "services-compfs")]
Compfs::register_in_registry(&mut registry);
#[cfg(feature = "services-dashmap")]
Dashmap::register_in_registry(&mut registry);
#[cfg(feature = "services-dropbox")]
Dropbox::register_in_registry(&mut registry);
#[cfg(feature = "services-etcd")]
Etcd::register_in_registry(&mut registry);
#[cfg(feature = "services-foundationdb")]
Foundationdb::register_in_registry(&mut registry);
#[cfg(feature = "services-fs")]
Fs::register_in_registry(&mut registry);
#[cfg(feature = "services-ftp")]
Ftp::register_in_registry(&mut registry);
#[cfg(feature = "services-gcs")]
Gcs::register_in_registry(&mut registry);
#[cfg(feature = "services-ghac")]
Ghac::register_in_registry(&mut registry);
#[cfg(feature = "services-hdfs")]
Hdfs::register_in_registry(&mut registry);
#[cfg(feature = "services-http")]
Http::register_in_registry(&mut registry);
#[cfg(feature = "services-huggingface")]
Huggingface::register_in_registry(&mut registry);
#[cfg(feature = "services-ipfs")]
Ipfs::register_in_registry(&mut registry);
#[cfg(feature = "services-ipmfs")]
Ipmfs::register_in_registry(&mut registry);
#[cfg(feature = "services-icloud")]
Icloud::register_in_registry(&mut registry);
#[cfg(feature = "services-memcached")]
Memcached::register_in_registry(&mut registry);
#[cfg(feature = "services-memory")]
Memory::register_in_registry(&mut registry);
#[cfg(feature = "services-mini-moka")]
MiniMoka::register_in_registry(&mut registry);
#[cfg(feature = "services-moka")]
Moka::register_in_registry(&mut registry);
#[cfg(feature = "services-monoiofs")]
Monoiofs::register_in_registry(&mut registry);
#[cfg(feature = "services-mysql")]
Mysql::register_in_registry(&mut registry);
#[cfg(feature = "services-obs")]
Obs::register_in_registry(&mut registry);
#[cfg(feature = "services-onedrive")]
Onedrive::register_in_registry(&mut registry);
#[cfg(feature = "services-postgresql")]
Postgresql::register_in_registry(&mut registry);
#[cfg(feature = "services-gdrive")]
Gdrive::register_in_registry(&mut registry);
#[cfg(feature = "services-oss")]
Oss::register_in_registry(&mut registry);
#[cfg(feature = "services-persy")]
Persy::register_in_registry(&mut registry);
#[cfg(feature = "services-redis")]
Redis::register_in_registry(&mut registry);
#[cfg(feature = "services-rocksdb")]
Rocksdb::register_in_registry(&mut registry);
#[cfg(feature = "services-s3")]
S3::register_in_registry(&mut registry);
#[cfg(feature = "services-seafile")]
Seafile::register_in_registry(&mut registry);
#[cfg(feature = "services-upyun")]
Upyun::register_in_registry(&mut registry);
#[cfg(feature = "services-yandex-disk")]
YandexDisk::register_in_registry(&mut registry);
#[cfg(feature = "services-pcloud")]
Pcloud::register_in_registry(&mut registry);
#[cfg(feature = "services-sftp")]
Sftp::register_in_registry(&mut registry);
#[cfg(feature = "services-sled")]
Sled::register_in_registry(&mut registry);
#[cfg(feature = "services-sqlite")]
Sqlite::register_in_registry(&mut registry);
#[cfg(feature = "services-swift")]
Swift::register_in_registry(&mut registry);
#[cfg(feature = "services-tikv")]
Tikv::register_in_registry(&mut registry);
#[cfg(feature = "services-vercel-artifacts")]
VercelArtifacts::register_in_registry(&mut registry);
#[cfg(feature = "services-vercel-blob")]
VercelBlob::register_in_registry(&mut registry);
#[cfg(feature = "services-webdav")]
Webdav::register_in_registry(&mut registry);
#[cfg(feature = "services-webhdfs")]
Webhdfs::register_in_registry(&mut registry);
#[cfg(feature = "services-redb")]
Redb::register_in_registry(&mut registry);
#[cfg(feature = "services-mongodb")]
Mongodb::register_in_registry(&mut registry);
#[cfg(feature = "services-hdfs-native")]
HdfsNative::register_in_registry(&mut registry);
#[cfg(feature = "services-surrealdb")]
Surrealdb::register_in_registry(&mut registry);
#[cfg(feature = "services-lakefs")]
Lakefs::register_in_registry(&mut registry);
#[cfg(feature = "services-nebula-graph")]
NebulaGraph::register_in_registry(&mut registry);

registry
}

/// Register `factory` as the operator factory for `scheme`.
///
/// The scheme is converted into an owned `String` and used as the map key;
/// a factory previously stored under the same key is replaced.
pub fn register<T: Into<String>>(&mut self, scheme: T, factory: OperatorFactory) {
    let key: String = scheme.into();
    self.registry.insert(key, factory);
}

/// TODO: document this
pub fn parse(
&self,
uri: &str,
options: impl IntoIterator<Item = (String, String)>,
) -> Result<Operator> {
let parsed_uri = uri.parse::<Uri>().map_err(|err| {
Error::new(ErrorKind::ConfigInvalid, "uri is invalid")
.with_context("uri", uri)
.set_source(err)
})?;

let scheme = parsed_uri.scheme_str().ok_or_else(|| {
Error::new(ErrorKind::ConfigInvalid, "uri is missing scheme").with_context("uri", uri)
})?;

let factory = self.registry.get(scheme).ok_or_else(|| {
Error::new(
ErrorKind::ConfigInvalid,
"could not find any operator factory for the given scheme",
)
.with_context("uri", uri)
.with_context("scheme", scheme)
})?;

let options = options.into_iter().collect();

factory(&parsed_uri, options)
}
}
Loading