-
Notifications
You must be signed in to change notification settings - Fork 591
feat: initial Operator::from_uri
implementation
#5482
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 32 commits
f2267e6
f581316
b67be38
b8c4fb1
a50c058
539e6ea
3aba98b
5979898
6eed396
0e582ee
9d730c0
976e60c
a4478a9
a4e3866
443c11d
e7215b4
b6dd1dc
e887d24
41790a5
27d65d9
3b80f70
c5d1c9c
e53cdd7
79091ac
abd614d
e71c0f1
176a855
0b341d3
fd227a5
5ecb178
7309b4b
56d38d9
2fd9561
088d7d0
41c6874
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,11 +17,13 @@ | |
|
||
use std::fmt::Debug; | ||
|
||
use http::Uri; | ||
use serde::de::DeserializeOwned; | ||
use serde::Serialize; | ||
|
||
use crate::raw::*; | ||
use crate::*; | ||
use types::operator::{OperatorFactory, OperatorRegistry}; | ||
|
||
/// Builder is used to set up underlying services. | ||
/// | ||
|
@@ -56,6 +58,15 @@ pub trait Builder: Default + 'static { | |
|
||
/// Consume the accessor builder to build a service. | ||
fn build(self) -> Result<impl Access>; | ||
|
||
/// Register this builder in the given registry. | ||
/// | ||
/// Creates an [`OperatorFactory`] that parses `Self::Config` from the URI and | ||
/// options via `from_uri`, converts it into a builder, and finishes an | ||
/// [`Operator`] from it. The factory is stored under `Self::SCHEME`. | ||
fn register_in_registry(registry: &mut OperatorRegistry) { | ||
// The closure captures nothing, so it can be used as the `fn` pointer type. | ||
let operator_factory: OperatorFactory = |uri, options| { | ||
asukaminato0721 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let builder = Self::Config::from_uri(uri, options)?.into_builder(); | ||
Ok(Operator::new(builder)?.finish()) | ||
}; | ||
registry.register(Self::SCHEME.into_static(), operator_factory) | ||
jorgehermo9 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
/// Dummy implementation of builder | ||
|
@@ -137,6 +148,15 @@ pub trait Configurator: Serialize + DeserializeOwned + Debug + 'static { | |
}) | ||
} | ||
|
||
/// Build a configuration from a URI and additional options. | ||
/// | ||
/// Only the query component of `uri` is read here: it is turned into pairs by | ||
/// `query_pairs`, the entries of `options` are appended after them, and the | ||
/// combined sequence is handed to `Self::from_iter`. A URI without a query | ||
/// falls back to the default (no query pairs). | ||
/// | ||
/// NOTE(review): when the same key appears in both the query and `options`, | ||
/// the winner depends on how `from_iter` treats duplicate keys; confirm and | ||
/// document the intended precedence. | ||
fn from_uri(uri: &Uri, options: impl IntoIterator<Item = (String, String)>) -> Result<Self> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it okay to accept Or should we change it to |
||
let query_pairs = uri.query().map(query_pairs).unwrap_or_default(); | ||
|
||
let merged_options = query_pairs.into_iter().chain(options); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unsure about Which one should take precedence? query_pairs should overwrite the options, or should the options overwrite the query_pairs? I'm thinking on the case that query_pairs and options contain the same key Right now, I think that the behaviour with |
||
|
||
Self::from_iter(merged_options) | ||
} | ||
|
||
/// Convert this configuration into a service builder. | ||
fn into_builder(self) -> Self::Builder; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,3 +32,8 @@ pub use info::OperatorInfo; | |
|
||
pub mod operator_functions; | ||
pub mod operator_futures; | ||
|
||
mod registry; | ||
pub use registry::OperatorFactory; | ||
pub use registry::OperatorRegistry; | ||
pub use registry::GLOBAL_OPERATOR_REGISTRY; | ||
asukaminato0721 marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+37
to
+39
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we expose these three as public API, @Xuanwo, or should we make them `pub(crate)` instead? Or should we only expose |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,204 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use std::collections::HashMap; | ||
use std::sync::LazyLock; | ||
|
||
use http::Uri; | ||
|
||
use crate::services::*; | ||
use crate::*; | ||
|
||
/// Shared registry of operator factories. | ||
/// | ||
/// The value is created lazily by [`OperatorRegistry::initialized`] the first | ||
/// time the static is accessed. | ||
pub static GLOBAL_OPERATOR_REGISTRY: LazyLock<OperatorRegistry> = | ||
LazyLock::new(OperatorRegistry::initialized); | ||
|
||
/// Function pointer that builds an [`Operator`] from a parsed [`Uri`] and a map | ||
/// of string options. | ||
pub type OperatorFactory = fn(&Uri, HashMap<String, String>) -> Result<Operator>; | ||
|
||
/// Registry that maps a URI scheme to the [`OperatorFactory`] used to build an | ||
/// [`Operator`] for that scheme. | ||
#[derive(Clone, Debug, Default)] | ||
pub struct OperatorRegistry { | ||
registry: HashMap<String, OperatorFactory>, | ||
} | ||
|
||
impl OperatorRegistry { | ||
/// Create an empty registry with no factories registered. | ||
pub fn new() -> Self { | ||
jorgehermo9 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Self { | ||
registry: HashMap::new(), | ||
} | ||
} | ||
/// Create a registry pre-populated with the built-in services. | ||
/// | ||
/// Each service registers itself through `register_in_registry`, and every | ||
/// registration is guarded by its `services-*` Cargo feature, so only services | ||
/// enabled for this build are registered. | ||
pub fn initialized() -> Self { | ||
let mut registry = Self::new(); | ||
|
||
#[cfg(feature = "services-aliyun-drive")] | ||
jorgehermo9 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
AliyunDrive::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-atomicserver")] | ||
Atomicserver::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-alluxio")] | ||
Alluxio::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-azblob")] | ||
Azblob::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-azdls")] | ||
Azdls::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-azfile")] | ||
Azfile::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-b2")] | ||
B2::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-cacache")] | ||
Cacache::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-cos")] | ||
Cos::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-compfs")] | ||
Compfs::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-dashmap")] | ||
Dashmap::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-dropbox")] | ||
Dropbox::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-etcd")] | ||
Etcd::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-foundationdb")] | ||
Foundationdb::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-fs")] | ||
Fs::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-ftp")] | ||
Ftp::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-gcs")] | ||
Gcs::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-ghac")] | ||
Ghac::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-hdfs")] | ||
Hdfs::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-http")] | ||
Http::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-huggingface")] | ||
Huggingface::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-ipfs")] | ||
Ipfs::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-ipmfs")] | ||
Ipmfs::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-icloud")] | ||
Icloud::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-memcached")] | ||
Memcached::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-memory")] | ||
Memory::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-mini-moka")] | ||
MiniMoka::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-moka")] | ||
Moka::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-monoiofs")] | ||
Monoiofs::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-mysql")] | ||
Mysql::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-obs")] | ||
Obs::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-onedrive")] | ||
Onedrive::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-postgresql")] | ||
Postgresql::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-gdrive")] | ||
Gdrive::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-oss")] | ||
Oss::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-persy")] | ||
Persy::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-redis")] | ||
Redis::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-rocksdb")] | ||
Rocksdb::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-s3")] | ||
S3::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-seafile")] | ||
Seafile::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-upyun")] | ||
Upyun::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-yandex-disk")] | ||
YandexDisk::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-pcloud")] | ||
Pcloud::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-sftp")] | ||
Sftp::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-sled")] | ||
Sled::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-sqlite")] | ||
Sqlite::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-swift")] | ||
Swift::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-tikv")] | ||
Tikv::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-vercel-artifacts")] | ||
VercelArtifacts::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-vercel-blob")] | ||
VercelBlob::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-webdav")] | ||
Webdav::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-webhdfs")] | ||
Webhdfs::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-redb")] | ||
Redb::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-mongodb")] | ||
Mongodb::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-hdfs-native")] | ||
HdfsNative::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-surrealdb")] | ||
Surrealdb::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-lakefs")] | ||
Lakefs::register_in_registry(&mut registry); | ||
#[cfg(feature = "services-nebula-graph")] | ||
NebulaGraph::register_in_registry(&mut registry); | ||
|
||
registry | ||
} | ||
|
||
/// TODO: document this | ||
pub fn register<T: Into<String>>(&mut self, scheme: T, factory: OperatorFactory) { | ||
self.registry.insert(scheme.into(), factory); | ||
} | ||
|
||
/// Build an [`Operator`] from a URI string and extra options. | ||
/// | ||
/// The URI's scheme selects a registered [`OperatorFactory`], which receives | ||
/// the parsed URI together with `options` collected into a map. | ||
/// | ||
/// The scheme is looked up exactly as written first and then in lowercase, so | ||
/// a URI such as `S3://...` resolves to a factory registered as `s3`. | ||
/// | ||
/// # Errors | ||
/// | ||
/// Returns `ErrorKind::ConfigInvalid` if `uri` cannot be parsed, has no scheme, | ||
/// or names a scheme with no registered factory. Errors returned by the | ||
/// selected factory are passed through unchanged. | ||
pub fn parse( | ||
&self, | ||
uri: &str, | ||
options: impl IntoIterator<Item = (String, String)>, | ||
) -> Result<Operator> { | ||
let parsed_uri = uri.parse::<Uri>().map_err(|err| { | ||
Error::new(ErrorKind::ConfigInvalid, "uri is invalid") | ||
.with_context("uri", uri) | ||
.set_source(err) | ||
})?; | ||
|
||
let scheme = parsed_uri.scheme_str().ok_or_else(|| { | ||
Error::new(ErrorKind::ConfigInvalid, "uri is missing scheme").with_context("uri", uri) | ||
})?; | ||
|
||
// URI schemes are case-insensitive (RFC 3986, section 3.1), but the parsed | ||
// scheme may keep the caller's casing. Try the exact spelling first so that | ||
// existing registrations keep resolving, then fall back to lowercase. | ||
let factory = self | ||
.registry | ||
.get(scheme) | ||
.or_else(|| self.registry.get(scheme.to_ascii_lowercase().as_str())) | ||
.ok_or_else(|| { | ||
Error::new( | ||
ErrorKind::ConfigInvalid, | ||
"could not find any operator factory for the given scheme", | ||
) | ||
.with_context("uri", uri) | ||
.with_context("scheme", scheme) | ||
})?; | ||
|
||
// The target collection type is inferred from the factory's signature. | ||
let options = options.into_iter().collect(); | ||
|
||
factory(&parsed_uri, options) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be exposed as a public API?
IIUC, the only public entry point to the registry is `OperatorRegistry::parse()`, which takes a URI as a string. Is there a need for users to manually parse a URI string and split its query into parts for further customization? If not, I'd suggest keeping `query_pairs()` as `pub(crate)` (at least for now) to keep the API surface minimal.
Query parsing isn't really the main responsibility of OpenDAL, so IMO it would make sense to keep such utilities internal if we can.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are absolutely right! Thanks for noticing this. Will change it asap