diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs index d0ac3ff9b1ea..a456b5e32d4e 100644 --- a/core/src/services/hdfs_native/backend.rs +++ b/core/src/services/hdfs_native/backend.rs @@ -26,6 +26,7 @@ use crate::*; use hdfs_native::HdfsError; use hdfs_native::WriteOptions; use log::debug; +use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; @@ -89,6 +90,13 @@ impl HdfsNativeBuilder { self.config.enable_append = enable_append; self } + /// Set other configs of this backend. + /// + /// This is a map of key-value pairs. + pub fn configs(mut self, configs: Option>) -> Self { + self.config.configs = configs; + self + } } impl Builder for HdfsNativeBuilder { @@ -109,8 +117,15 @@ impl Builder for HdfsNativeBuilder { let root = normalize_root(&self.config.root.unwrap_or_default()); debug!("backend use root {}", root); - let client = hdfs_native::Client::new(name_node).map_err(parse_hdfs_error)?; - + let client = { + match self.config.configs { + None => hdfs_native::Client::new(name_node).map_err(parse_hdfs_error)?, + Some(ref configs) => { + hdfs_native::Client::new_with_config(name_node, configs.clone()) + .map_err(parse_hdfs_error)? + } + } + }; // need to check if root dir exists, create if not Ok(HdfsNativeBackend { root, diff --git a/core/src/services/hdfs_native/config.rs b/core/src/services/hdfs_native/config.rs index fcfe19a2c4c3..ad80db84fe37 100644 --- a/core/src/services/hdfs_native/config.rs +++ b/core/src/services/hdfs_native/config.rs @@ -21,6 +21,8 @@ use std::fmt::Formatter; use serde::Deserialize; use serde::Serialize; +use std::collections::HashMap; + /// Config for HdfsNative services support. #[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] @@ -32,6 +34,9 @@ pub struct HdfsNativeConfig { pub name_node: Option, /// enable the append capacity pub enable_append: bool, + /// other configs + #[serde(skip_serializing_if = "Option::is_none")] + pub configs: Option>, } impl Debug for HdfsNativeConfig { @@ -40,6 +45,7 @@ impl Debug for HdfsNativeConfig { .field("root", &self.root) .field("name_node", &self.name_node) .field("enable_append", &self.enable_append) + .field("configs", &self.configs) .finish_non_exhaustive() } }