Skip to content

Commit 035697d

Browse files
committed
refactor(connector): directly deserialize config from map
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 35c3aac commit 035697d

File tree

19 files changed

+86
-57
lines changed

19 files changed

+86
-57
lines changed

src/connector/src/lib.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,20 @@ use risingwave_rpc_client::ConnectorClient;
4343
use serde::de;
4444

4545
pub mod aws_utils;
46+
pub mod common;
4647
pub mod error;
4748
mod macros;
48-
4949
pub mod parser;
5050
pub mod schema;
5151
pub mod sink;
5252
pub mod source;
53-
54-
pub mod common;
55-
56-
pub use paste::paste;
57-
53+
pub mod utils;
5854
mod with_options;
59-
6055
#[cfg(test)]
6156
mod with_options_test;
6257

58+
pub use paste::paste;
59+
6360
#[derive(Clone, Debug, Default)]
6461
pub struct ConnectorParams {
6562
pub connector_client: Option<ConnectorClient>,

src/connector/src/parser/mod.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use crate::source::{
5757
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId,
5858
StreamChunkWithState,
5959
};
60+
use crate::utils::DeserializeFromMap;
6061

6162
pub mod additional_columns;
6263
mod avro;
@@ -1009,10 +1010,8 @@ impl SpecificParserConfig {
10091010
config.client_config = SchemaRegistryAuth::from(&info.format_encode_options);
10101011
} else {
10111012
config.aws_auth_props = Some(
1012-
serde_json::from_value::<AwsAuthProps>(
1013-
serde_json::to_value(info.format_encode_options.clone()).unwrap(),
1014-
)
1015-
.map_err(|e| anyhow::anyhow!(e))?,
1013+
AwsAuthProps::deserialize_from_map(&info.format_encode_options)
1014+
.map_err(|e| anyhow::anyhow!(e))?,
10161015
);
10171016
}
10181017
EncodingProperties::Avro(config)
@@ -1041,10 +1040,8 @@ impl SpecificParserConfig {
10411040
config.client_config = SchemaRegistryAuth::from(&info.format_encode_options);
10421041
} else {
10431042
config.aws_auth_props = Some(
1044-
serde_json::from_value::<AwsAuthProps>(
1045-
serde_json::to_value(info.format_encode_options.clone()).unwrap(),
1046-
)
1047-
.map_err(|e| anyhow::anyhow!(e))?,
1043+
AwsAuthProps::deserialize_from_map(&info.format_encode_options)
1044+
.map_err(|e| anyhow::anyhow!(e))?,
10481045
);
10491046
}
10501047
EncodingProperties::Protobuf(config)

src/connector/src/sink/big_query.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use crate::sink::writer::SinkWriterExt;
4545
use crate::sink::{
4646
DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam,
4747
};
48+
use crate::utils::DeserializeFromMap;
4849

4950
pub const BIGQUERY_SINK: &str = "bigquery";
5051
const BIGQUERY_INSERT_MAX_NUMS: usize = 1024;
@@ -99,9 +100,8 @@ pub struct BigQueryConfig {
99100
}
100101
impl BigQueryConfig {
101102
pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
102-
let config =
103-
serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
104-
.map_err(|e| SinkError::Config(anyhow!(e)))?;
103+
let config = BigQueryConfig::deserialize_from_map(&properties)
104+
.map_err(|e| SinkError::Config(anyhow!(e)))?;
105105
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
106106
return Err(SinkError::Config(anyhow!(
107107
"`{}` must be {}, or {}",

src/connector/src/sink/clickhouse.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
14+
1415
use core::fmt::Debug;
1516
use std::collections::{HashMap, HashSet};
1617
use std::time::Duration;
@@ -37,6 +38,7 @@ use crate::sink::writer::{
3738
use crate::sink::{
3839
Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
3940
};
41+
use crate::utils::DeserializeFromMap;
4042

4143
const QUERY_ENGINE: &str =
4244
"select distinct ?fields from system.tables where database = ? and table = ?";
@@ -168,8 +170,7 @@ pub struct ClickHouseSink {
168170
impl ClickHouseConfig {
169171
pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
170172
let config =
171-
serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
172-
.map_err(|e| SinkError::Config(anyhow!(e)))?;
173+
Self::deserialize_from_map(&properties).map_err(|e| SinkError::Config(anyhow!(e)))?;
173174
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
174175
return Err(SinkError::Config(anyhow!(
175176
"`{}` must be {}, or {}",

src/connector/src/sink/deltalake.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use super::{
4545
SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
4646
};
4747
use crate::sink::writer::SinkWriterExt;
48+
use crate::utils::DeserializeFromMap;
4849

4950
pub const DELTALAKE_SINK: &str = "deltalake";
5051
pub const DEFAULT_REGION: &str = "us-east-1";
@@ -131,10 +132,8 @@ pub struct DeltaLakeConfig {
131132

132133
impl DeltaLakeConfig {
133134
pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
134-
let config = serde_json::from_value::<DeltaLakeConfig>(
135-
serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
136-
)
137-
.map_err(|e| SinkError::Config(anyhow!(e)))?;
135+
let config =
136+
Self::deserialize_from_map(&properties).map_err(|e| SinkError::Config(anyhow!(e)))?;
138137
Ok(config)
139138
}
140139
}

src/connector/src/sink/doris.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYP
4141
use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
4242
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
4343
use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};
44+
use crate::utils::DeserializeFromMap;
4445

4546
pub const DORIS_SINK: &str = "doris";
4647

@@ -81,8 +82,7 @@ pub struct DorisConfig {
8182
impl DorisConfig {
8283
pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
8384
let config =
84-
serde_json::from_value::<DorisConfig>(serde_json::to_value(properties).unwrap())
85-
.map_err(|e| SinkError::Config(anyhow!(e)))?;
85+
Self::deserialize_from_map(&properties).map_err(|e| SinkError::Config(anyhow!(e)))?;
8686
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
8787
return Err(SinkError::Config(anyhow!(
8888
"`{}` must be {}, or {}",

src/connector/src/sink/iceberg/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use crate::deserialize_bool_from_string;
5757
use crate::sink::coordinate::CoordinatedSinkWriter;
5858
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
5959
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
60+
use crate::utils::DeserializeFromMap;
6061

6162
/// This iceberg sink is WIP. When it ready, we will change this name to "iceberg".
6263
pub const ICEBERG_SINK: &str = "iceberg";
@@ -132,9 +133,8 @@ where
132133

133134
impl IcebergConfig {
134135
pub fn from_hashmap(values: HashMap<String, String>) -> Result<Self> {
135-
let mut config =
136-
serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
137-
.map_err(|e| SinkError::Config(anyhow!(e)))?;
136+
let mut config = IcebergConfig::deserialize_from_map(&values)
137+
.map_err(|e| SinkError::Config(anyhow!(e)))?;
138138

139139
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
140140
return Err(SinkError::Config(anyhow!(

src/connector/src/sink/kafka.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use crate::sink::writer::{
4343
use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
4444
use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext};
4545
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
46+
use crate::utils::DeserializeFromMap;
4647
use crate::{
4748
deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl,
4849
};
@@ -236,8 +237,8 @@ pub struct KafkaConfig {
236237

237238
impl KafkaConfig {
238239
pub fn from_hashmap(values: HashMap<String, String>) -> Result<Self> {
239-
let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
240-
.map_err(|e| SinkError::Config(anyhow!(e)))?;
240+
let config =
241+
Self::deserialize_from_map(&values).map_err(|e| SinkError::Config(anyhow!(e)))?;
241242

242243
Ok(config)
243244
}

src/connector/src/sink/kinesis.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::sink::writer::{
3838
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
3939
};
4040
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam};
41+
use crate::utils::DeserializeFromMap;
4142

4243
pub const KINESIS_SINK: &str = "kinesis";
4344

@@ -136,9 +137,8 @@ pub struct KinesisSinkConfig {
136137

137138
impl KinesisSinkConfig {
138139
pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
139-
let config =
140-
serde_json::from_value::<KinesisSinkConfig>(serde_json::to_value(properties).unwrap())
141-
.map_err(|e| SinkError::Config(anyhow!(e)))?;
140+
let config = KinesisSinkConfig::deserialize_from_map(&properties)
141+
.map_err(|e| SinkError::Config(anyhow!(e)))?;
142142
Ok(config)
143143
}
144144
}

src/connector/src/sink/nats.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::sink::writer::{
3636
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
3737
};
3838
use crate::sink::{Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY};
39+
use crate::utils::DeserializeFromMap;
3940

4041
pub const NATS_SINK: &str = "nats";
4142

@@ -66,8 +67,8 @@ pub struct NatsSinkWriter {
6667
/// Basic data types for use with the nats interface
6768
impl NatsConfig {
6869
pub fn from_hashmap(values: HashMap<String, String>) -> Result<Self> {
69-
let config = serde_json::from_value::<NatsConfig>(serde_json::to_value(values).unwrap())
70-
.map_err(|e| SinkError::Config(anyhow!(e)))?;
70+
let config =
71+
NatsConfig::deserialize_from_map(&values).map_err(|e| SinkError::Config(anyhow!(e)))?;
7172
if config.r#type != SINK_TYPE_APPEND_ONLY {
7273
Err(SinkError::Config(anyhow!(
7374
"Nats sink only support append-only mode"

src/connector/src/sink/pulsar.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::sink::writer::{
3737
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
3838
};
3939
use crate::sink::{DummySinkCommitCoordinator, Result};
40+
use crate::utils::DeserializeFromMap;
4041
use crate::{deserialize_duration_from_string, dispatch_sink_formatter_str_key_impl};
4142

4243
pub const PULSAR_SINK: &str = "pulsar";
@@ -127,7 +128,7 @@ pub struct PulsarConfig {
127128

128129
impl PulsarConfig {
129130
pub fn from_hashmap(values: HashMap<String, String>) -> Result<Self> {
130-
let config = serde_json::from_value::<PulsarConfig>(serde_json::to_value(values).unwrap())
131+
let config = PulsarConfig::deserialize_from_map(&values)
131132
.map_err(|e| SinkError::Config(anyhow!(e)))?;
132133

133134
Ok(config)

src/connector/src/sink/redis.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::sink::writer::{
3535
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
3636
};
3737
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriterParam};
38+
use crate::utils::DeserializeFromMap;
3839

3940
pub const REDIS_SINK: &str = "redis";
4041
pub const KEY_FORMAT: &str = "key_format";
@@ -61,9 +62,8 @@ pub struct RedisConfig {
6162

6263
impl RedisConfig {
6364
pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
64-
let config =
65-
serde_json::from_value::<RedisConfig>(serde_json::to_value(properties).unwrap())
66-
.map_err(|e| SinkError::Config(anyhow!("{:?}", e)))?;
65+
let config = RedisConfig::deserialize_from_map(&properties)
66+
.map_err(|e| SinkError::Config(anyhow!("{:?}", e)))?;
6767
Ok(config)
6868
}
6969
}

src/connector/src/sink/starrocks.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use super::writer::LogSinkerOf;
3838
use super::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
3939
use crate::sink::writer::SinkWriterExt;
4040
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};
41+
use crate::utils::DeserializeFromMap;
4142

4243
pub const STARROCKS_SINK: &str = "starrocks";
4344
const STARROCK_MYSQL_PREFER_SOCKET: &str = "false";
@@ -72,9 +73,8 @@ pub struct StarrocksConfig {
7273
}
7374
impl StarrocksConfig {
7475
pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
75-
let config =
76-
serde_json::from_value::<StarrocksConfig>(serde_json::to_value(properties).unwrap())
77-
.map_err(|e| SinkError::Config(anyhow!(e)))?;
76+
let config = StarrocksConfig::deserialize_from_map(&properties)
77+
.map_err(|e| SinkError::Config(anyhow!(e)))?;
7878
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
7979
return Err(SinkError::Config(anyhow!(
8080
"`{}` must be {}, or {}",

src/connector/src/source/base.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use crate::parser::ParserConfig;
4848
pub(crate) use crate::source::common::CommonSplitReader;
4949
use crate::source::filesystem::FsPageItem;
5050
use crate::source::monitor::EnumeratorMetrics;
51+
use crate::utils::DeserializeFromMap;
5152
use crate::with_options::WithOptions;
5253
use crate::{
5354
dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties,
@@ -84,8 +85,7 @@ pub trait UnknownFields {
8485

8586
impl<P: DeserializeOwned + UnknownFields> TryFromHashmap for P {
8687
fn try_from_hashmap(props: HashMap<String, String>, deny_unknown_fields: bool) -> Result<Self> {
87-
let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?;
88-
let res = serde_json::from_value::<P>(json_value).map_err(|e| anyhow!(e.to_string()))?;
88+
let res = P::deserialize_from_map(&props).map_err(|e| anyhow!(e.to_string()))?;
8989

9090
if !deny_unknown_fields || res.unknown_fields().is_empty() {
9191
Ok(res)

src/connector/src/source/cdc/external/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::error::ConnectorError;
3636
use crate::parser::mysql_row_to_owned_row;
3737
use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
3838
use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
39+
use crate::utils::DeserializeFromMap;
3940

4041
pub type ConnectorResult<T> = std::result::Result<T, ConnectorError>;
4142

@@ -299,10 +300,7 @@ impl MySqlExternalTableReader {
299300
) -> ConnectorResult<Self> {
300301
tracing::debug!(?rw_schema, "create mysql external table reader");
301302

302-
let config = serde_json::from_value::<ExternalTableConfig>(
303-
serde_json::to_value(with_properties).unwrap(),
304-
)
305-
.map_err(|e| {
303+
let config = ExternalTableConfig::deserialize_from_map(&with_properties).map_err(|e| {
306304
ConnectorError::Config(anyhow!("fail to extract mysql connector properties: {}", e))
307305
})?;
308306

src/connector/src/source/cdc/external/postgres.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::source::cdc::external::{
3333
CdcOffset, ConnectorResult, DebeziumOffset, ExternalTableConfig, ExternalTableReader,
3434
SchemaTableName,
3535
};
36+
use crate::utils::DeserializeFromMap;
3637

3738
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
3839
pub struct PostgresOffset {
@@ -128,10 +129,7 @@ impl PostgresExternalTableReader {
128129
) -> ConnectorResult<Self> {
129130
tracing::debug!(?rw_schema, "create postgres external table reader");
130131

131-
let config = serde_json::from_value::<ExternalTableConfig>(
132-
serde_json::to_value(properties).unwrap(),
133-
)
134-
.map_err(|e| {
132+
let config = ExternalTableConfig::deserialize_from_map(&properties).map_err(|e| {
135133
ConnectorError::Config(anyhow!(
136134
"fail to extract postgres connector properties: {}",
137135
e

src/connector/src/source/kafka/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ mod test {
189189
use maplit::hashmap;
190190

191191
use super::*;
192+
use crate::utils::DeserializeFromMap;
192193

193194
#[test]
194195
fn test_parse_config_consumer_common() {
@@ -210,8 +211,7 @@ mod test {
210211
"properties.fetch.queue.backoff.ms".to_string() => "114514".to_string(),
211212
};
212213

213-
let props: KafkaProperties =
214-
serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();
214+
let props = KafkaProperties::deserialize_from_map(&config).unwrap();
215215

216216
assert_eq!(props.scan_startup_mode, Some("earliest".to_string()));
217217
assert_eq!(

src/connector/src/source/kinesis/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ mod test {
7070
use maplit::hashmap;
7171

7272
use super::*;
73+
use crate::utils::DeserializeFromMap;
7374

7475
#[test]
7576
fn test_parse_kinesis_timestamp_offset() {
@@ -80,8 +81,7 @@ mod test {
8081
"scan.startup.timestamp.millis".to_string() => "123456789".to_string(),
8182
};
8283

83-
let kinesis_props: KinesisProperties =
84-
serde_json::from_value(serde_json::to_value(props).unwrap()).unwrap();
84+
let kinesis_props = KinesisProperties::deserialize_from_map(&props).unwrap();
8585
assert_eq!(kinesis_props.timestamp_offset, Some(123456789));
8686
}
8787
}

0 commit comments

Comments
 (0)