Skip to content

Commit 2e39b51

Browse files
committed
source: soft-fail with a flag to achive backwards compatibility
1 parent 22b8911 commit 2e39b51

File tree

21 files changed

+161
-34
lines changed

21 files changed

+161
-34
lines changed

src/batch/src/executor/source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ impl BoxedExecutorBuilder for SourceExecutor {
6363
// prepare connector source
6464
let source_props: HashMap<String, String> =
6565
HashMap::from_iter(source_node.properties.clone());
66-
let config = ConnectorProperties::extract(source_props).map_err(BatchError::connector)?;
66+
let config =
67+
ConnectorProperties::extract(source_props, false).map_err(BatchError::connector)?;
6768

6869
let info = source_node.get_info().unwrap();
6970
let parser_config = SpecificParserConfig::new(info, &source_node.properties)?;

src/connector/src/sink/kafka.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ impl From<KafkaConfig> for KafkaProperties {
263263
common: val.common,
264264
rdkafka_properties_common: val.rdkafka_properties_common,
265265
rdkafka_properties_consumer: Default::default(),
266+
unknown_fields: Default::default(),
266267
}
267268
}
268269
}

src/connector/src/source/base.rs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@ const SPLIT_TYPE_FIELD: &str = "split_type";
5555
const SPLIT_INFO_FIELD: &str = "split_info";
5656
pub const UPSTREAM_SOURCE_KEY: &str = "connector";
5757

58-
pub trait TryFromHashmap: Sized {
59-
fn try_from_hashmap(props: HashMap<String, String>) -> Result<Self>;
58+
pub trait TryFromHashmap: Sized + UnknownFields {
59+
/// Used to initialize the source properties from the raw untyped `WITH` options.
60+
fn try_from_hashmap(props: HashMap<String, String>, deny_unknown_fields: bool) -> Result<Self>;
6061
}
6162

6263
pub trait SourceProperties: TryFromHashmap + Clone {
@@ -68,10 +69,24 @@ pub trait SourceProperties: TryFromHashmap + Clone {
6869
fn init_from_pb_source(&mut self, _source: &PbSource) {}
6970
}
7071

71-
impl<P: DeserializeOwned> TryFromHashmap for P {
72-
fn try_from_hashmap(props: HashMap<String, String>) -> Result<Self> {
72+
pub trait UnknownFields {
73+
/// Unrecognized fields in the `WITH` clause.
74+
fn unknown_fields(&self) -> HashMap<String, String>;
75+
}
76+
77+
impl<P: DeserializeOwned + UnknownFields> TryFromHashmap for P {
78+
fn try_from_hashmap(props: HashMap<String, String>, deny_unknown_fields: bool) -> Result<Self> {
7379
let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?;
74-
serde_json::from_value::<P>(json_value).map_err(|e| anyhow!(e.to_string()))
80+
let res = serde_json::from_value::<P>(json_value).map_err(|e| anyhow!(e.to_string()))?;
81+
82+
if !deny_unknown_fields || res.unknown_fields().is_empty() {
83+
Ok(res)
84+
} else {
85+
Err(anyhow!(
86+
"Unknown fields in the WITH clause: {:?}",
87+
res.unknown_fields()
88+
))
89+
}
7590
}
7691
}
7792

@@ -397,13 +412,16 @@ impl ConnectorProperties {
397412
/// Creates typed source properties from the raw `WITH` properties.
398413
///
399414
/// It checks the `connector` field, and them dispatches to the corresponding type's `try_from_hashmap` method.
400-
pub fn extract(mut props: HashMap<String, String>) -> Result<Self> {
415+
///
416+
/// `deny_unknown_fields`: Since `WITH` options are persisted in meta, we do not deny unknown fields when restoring from
417+
/// existing data to avoid breaking backwards compatibility. We only deny unknown fields when creating new sources.
418+
pub fn extract(mut props: HashMap<String, String>, deny_unknown_fields: bool) -> Result<Self> {
401419
if Self::is_new_fs_connector_hash_map(&props) {
402420
_ = props
403421
.remove(UPSTREAM_SOURCE_KEY)
404422
.ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
405423
return Ok(ConnectorProperties::S3(Box::new(
406-
S3Properties::try_from_hashmap(props)?,
424+
S3Properties::try_from_hashmap(props, deny_unknown_fields)?,
407425
)));
408426
}
409427

@@ -413,7 +431,7 @@ impl ConnectorProperties {
413431
match_source_name_str!(
414432
connector.to_lowercase().as_str(),
415433
PropType,
416-
PropType::try_from_hashmap(props).map(ConnectorProperties::from),
434+
PropType::try_from_hashmap(props, deny_unknown_fields).map(ConnectorProperties::from),
417435
|other| Err(anyhow!("connector '{}' is not supported", other))
418436
)
419437
}
@@ -683,7 +701,7 @@ mod tests {
683701
"nexmark.split.num" => "1",
684702
));
685703

686-
let props = ConnectorProperties::extract(props).unwrap();
704+
let props = ConnectorProperties::extract(props, true).unwrap();
687705

688706
if let ConnectorProperties::Nexmark(props) = props {
689707
assert_eq!(props.table_type, Some(EventType::Person));
@@ -703,7 +721,7 @@ mod tests {
703721
"broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
704722
));
705723

706-
let props = ConnectorProperties::extract(props).unwrap();
724+
let props = ConnectorProperties::extract(props, true).unwrap();
707725
if let ConnectorProperties::Kafka(k) = props {
708726
assert!(k.common.broker_rewrite_map.is_some());
709727
println!("{:?}", k.common.broker_rewrite_map);
@@ -737,7 +755,7 @@ mod tests {
737755
"table.name" => "orders",
738756
));
739757

740-
let conn_props = ConnectorProperties::extract(user_props_mysql).unwrap();
758+
let conn_props = ConnectorProperties::extract(user_props_mysql, true).unwrap();
741759
if let ConnectorProperties::MysqlCdc(c) = conn_props {
742760
assert_eq!(c.props.get("connector_node_addr").unwrap(), "localhost");
743761
assert_eq!(c.props.get("database.hostname").unwrap(), "127.0.0.1");
@@ -750,7 +768,7 @@ mod tests {
750768
panic!("extract cdc config failed");
751769
}
752770

753-
let conn_props = ConnectorProperties::extract(user_props_postgres).unwrap();
771+
let conn_props = ConnectorProperties::extract(user_props_postgres, true).unwrap();
754772
if let ConnectorProperties::PostgresCdc(c) = conn_props {
755773
assert_eq!(c.props.get("connector_node_addr").unwrap(), "localhost");
756774
assert_eq!(c.props.get("database.hostname").unwrap(), "127.0.0.1");

src/connector/src/source/cdc/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,10 @@ pub struct CdcProperties<T: CdcSourceTypeTrait> {
7878
}
7979

8080
impl<T: CdcSourceTypeTrait> TryFromHashmap for CdcProperties<T> {
81-
fn try_from_hashmap(props: HashMap<String, String>) -> anyhow::Result<Self> {
81+
fn try_from_hashmap(
82+
props: HashMap<String, String>,
83+
_deny_unknown_fields: bool,
84+
) -> anyhow::Result<Self> {
8285
Ok(CdcProperties {
8386
props,
8487
table_schema: Default::default(),
@@ -124,6 +127,13 @@ where
124127
}
125128
}
126129

130+
impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
131+
fn unknown_fields(&self) -> HashMap<String, String> {
132+
// FIXME: CDC does not handle unknown fields yet
133+
HashMap::new()
134+
}
135+
}
136+
127137
impl<T: CdcSourceTypeTrait> CdcProperties<T> {
128138
pub fn get_source_type_pb(&self) -> SourceType {
129139
SourceType::from(T::source_type())

src/connector/src/source/datagen/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ impl SourceProperties for DatagenProperties {
6565
const SOURCE_NAME: &'static str = DATAGEN_CONNECTOR;
6666
}
6767

68+
impl crate::source::UnknownFields for DatagenProperties {
69+
fn unknown_fields(&self) -> HashMap<String, String> {
70+
// FIXME: datagen does not handle unknown fields yet
71+
HashMap::new()
72+
}
73+
}
74+
6875
fn default_rows_per_second() -> u64 {
6976
10
7077
}

src/connector/src/source/filesystem/s3/enumerator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ mod tests {
136136
access: None,
137137
secret: None,
138138
endpoint_url: None,
139+
unknown_fields: Default::default(),
139140
};
140141
let mut enumerator =
141142
S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())

src/connector/src/source/filesystem/s3/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,20 @@
1313
// limitations under the License.
1414
mod enumerator;
1515

16+
use std::collections::HashMap;
17+
1618
pub use enumerator::S3SplitEnumerator;
1719
mod source;
1820
use serde::Deserialize;
1921
pub use source::S3FileReader;
2022

2123
use crate::common::AwsAuthProps;
2224
use crate::source::filesystem::FsSplit;
23-
use crate::source::SourceProperties;
25+
use crate::source::{SourceProperties, UnknownFields};
2426

2527
pub const S3_CONNECTOR: &str = "s3";
2628

2729
#[derive(Clone, Debug, Deserialize)]
28-
#[serde(deny_unknown_fields)]
2930
pub struct S3Properties {
3031
#[serde(rename = "s3.region_name")]
3132
pub region_name: String,
@@ -39,6 +40,9 @@ pub struct S3Properties {
3940
pub secret: Option<String>,
4041
#[serde(rename = "s3.endpoint_url")]
4142
pub endpoint_url: Option<String>,
43+
44+
#[serde(flatten)]
45+
pub unknown_fields: HashMap<String, String>,
4246
}
4347

4448
impl SourceProperties for S3Properties {
@@ -49,6 +53,12 @@ impl SourceProperties for S3Properties {
4953
const SOURCE_NAME: &'static str = S3_CONNECTOR;
5054
}
5155

56+
impl UnknownFields for S3Properties {
57+
fn unknown_fields(&self) -> HashMap<String, String> {
58+
self.unknown_fields.clone()
59+
}
60+
}
61+
5262
impl From<&S3Properties> for AwsAuthProps {
5363
fn from(props: &S3Properties) -> Self {
5464
Self {

src/connector/src/source/filesystem/s3/source/reader.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ mod tests {
264264
access: None,
265265
secret: None,
266266
endpoint_url: None,
267+
unknown_fields: Default::default(),
267268
};
268269
let mut enumerator =
269270
S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())

src/connector/src/source/google_pubsub/mod.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use serde::{Deserialize, Serialize};
15+
use std::collections::HashMap;
16+
17+
use serde::Deserialize;
1618

1719
pub mod enumerator;
1820
pub mod source;
@@ -29,8 +31,7 @@ use crate::source::SourceProperties;
2931
pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub";
3032

3133
#[serde_as]
32-
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Hash, WithOptions)]
33-
#[serde(deny_unknown_fields)]
34+
#[derive(Clone, Debug, Deserialize, WithOptions)]
3435
pub struct PubsubProperties {
3536
#[serde_as(as = "DisplayFromStr")]
3637
#[serde(rename = "pubsub.split_count")]
@@ -72,6 +73,9 @@ pub struct PubsubProperties {
7273
/// more details.
7374
#[serde(rename = "pubsub.start_snapshot")]
7475
pub start_snapshot: Option<String>,
76+
77+
#[serde(flatten)]
78+
pub unknown_fields: HashMap<String, String>,
7579
}
7680

7781
impl SourceProperties for PubsubProperties {
@@ -82,6 +86,12 @@ impl SourceProperties for PubsubProperties {
8286
const SOURCE_NAME: &'static str = GOOGLE_PUBSUB_CONNECTOR;
8387
}
8488

89+
impl crate::source::UnknownFields for PubsubProperties {
90+
fn unknown_fields(&self) -> HashMap<String, String> {
91+
self.unknown_fields.clone()
92+
}
93+
}
94+
8595
impl PubsubProperties {
8696
/// `initialize_env` sets environment variables read by the `google-cloud-pubsub` crate
8797
pub(crate) fn initialize_env(&self) {
@@ -121,6 +131,8 @@ mod tests {
121131
start_offset: None,
122132
start_snapshot: None,
123133
subscription: String::from("test-subscription"),
134+
135+
unknown_fields: Default::default(),
124136
};
125137

126138
let properties = PubsubProperties {

src/connector/src/source/kafka/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
16+
1517
use serde::Deserialize;
1618
use serde_with::{serde_as, DisplayFromStr};
1719

@@ -88,7 +90,6 @@ pub struct RdKafkaPropertiesConsumer {
8890
}
8991

9092
#[derive(Clone, Debug, Deserialize, WithOptions)]
91-
#[serde(deny_unknown_fields)]
9293
pub struct KafkaProperties {
9394
/// This parameter is not intended to be exposed to users.
9495
/// This parameter specifies only for one parallelism. The parallelism of kafka source
@@ -127,6 +128,9 @@ pub struct KafkaProperties {
127128

128129
#[serde(flatten)]
129130
pub rdkafka_properties_consumer: RdKafkaPropertiesConsumer,
131+
132+
#[serde(flatten)]
133+
pub unknown_fields: HashMap<String, String>,
130134
}
131135

132136
impl SourceProperties for KafkaProperties {
@@ -137,6 +141,12 @@ impl SourceProperties for KafkaProperties {
137141
const SOURCE_NAME: &'static str = KAFKA_CONNECTOR;
138142
}
139143

144+
impl crate::source::UnknownFields for KafkaProperties {
145+
fn unknown_fields(&self) -> HashMap<String, String> {
146+
self.unknown_fields.clone()
147+
}
148+
}
149+
140150
impl KafkaProperties {
141151
pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
142152
self.rdkafka_properties_common.set_client(c);

src/connector/src/source/kinesis/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ pub mod enumerator;
1616
pub mod source;
1717
pub mod split;
1818

19+
use std::collections::HashMap;
20+
1921
use serde::Deserialize;
2022
use with_options::WithOptions;
2123

@@ -28,7 +30,6 @@ use crate::source::SourceProperties;
2830
pub const KINESIS_CONNECTOR: &str = "kinesis";
2931

3032
#[derive(Clone, Debug, Deserialize, WithOptions)]
31-
#[serde(deny_unknown_fields)]
3233
pub struct KinesisProperties {
3334
#[serde(rename = "scan.startup.mode", alias = "kinesis.scan.startup.mode")]
3435
// accepted values: "latest", "earliest", "timestamp"
@@ -39,6 +40,9 @@ pub struct KinesisProperties {
3940

4041
#[serde(flatten)]
4142
pub common: KinesisCommon,
43+
44+
#[serde(flatten)]
45+
pub unknown_fields: HashMap<String, String>,
4246
}
4347

4448
impl SourceProperties for KinesisProperties {
@@ -48,3 +52,9 @@ impl SourceProperties for KinesisProperties {
4852

4953
const SOURCE_NAME: &'static str = KINESIS_CONNECTOR;
5054
}
55+
56+
impl crate::source::UnknownFields for KinesisProperties {
57+
fn unknown_fields(&self) -> HashMap<String, String> {
58+
self.unknown_fields.clone()
59+
}
60+
}

src/connector/src/source/kinesis/source/reader.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,8 @@ mod tests {
322322

323323
scan_startup_mode: None,
324324
timestamp_offset: Some(123456789098765432),
325+
326+
unknown_fields: Default::default(),
325327
};
326328
let client = KinesisSplitReader::new(
327329
properties,
@@ -355,6 +357,8 @@ mod tests {
355357

356358
scan_startup_mode: None,
357359
timestamp_offset: None,
360+
361+
unknown_fields: Default::default(),
358362
};
359363

360364
let trim_horizen_reader = KinesisSplitReader::new(

0 commit comments

Comments
 (0)