fix: misuse of props and options in CREATE SOURCE #13762

Merged · 28 commits · Dec 15, 2023
Changes from 10 commits (of 28 total):
08ffe34
fix duplicate keys of AwsAuth in props and options
Rossil2012 Nov 27, 2023
d46b94c
not merge options into props
Rossil2012 Dec 1, 2023
a92d67f
fix handle_create_source
Rossil2012 Dec 4, 2023
08bda7f
Merge branch 'kanzhen/fix-options-props' into kanzhen/fix-prop-option…
Rossil2012 Dec 4, 2023
da9ce5c
chores
Rossil2012 Dec 4, 2023
1295a58
consume aws options
Rossil2012 Dec 4, 2023
b19fe0b
fmt
Rossil2012 Dec 4, 2023
0d28278
add PbSource.options
Rossil2012 Dec 4, 2023
f9092b9
persist options in meta
Rossil2012 Dec 4, 2023
a914777
unit test
Rossil2012 Dec 4, 2023
cae3290
Merge branch 'main' into kanzhen/fix-prop-option-misuse
Rossil2012 Dec 7, 2023
fbb0aad
fix create table with connector
Rossil2012 Dec 7, 2023
f69c6e8
rename options and properties
Rossil2012 Dec 7, 2023
a5d73e8
rename options and properties
Rossil2012 Dec 7, 2023
2e0fdad
fix model def in meta sql store
Rossil2012 Dec 8, 2023
c4caa53
move format_encode_options into StreamSourceInfo
Rossil2012 Dec 8, 2023
dfccb7d
fix dashboard
Rossil2012 Dec 8, 2023
857cd6f
remove unnecessary wrap
Rossil2012 Dec 8, 2023
bef1e78
fmt dashboard
Rossil2012 Dec 8, 2023
3721341
fix dashboard
Rossil2012 Dec 8, 2023
eae190d
fix backward compatibility
Rossil2012 Dec 11, 2023
e28db0f
fix SCHEMA_REGISTRY_USERNAME/PASSWORD
Rossil2012 Dec 11, 2023
213f6b5
fix backward compatibility in CatalogManager
Rossil2012 Dec 11, 2023
b70b1d8
Merge branch 'main' into kanzhen/fix-prop-option-misuse
Rossil2012 Dec 13, 2023
3135e9f
fix AwsAuthProps in options
Rossil2012 Dec 13, 2023
9f817c3
dashboard: remove properties in type Relation
Rossil2012 Dec 14, 2023
185a590
copy to format_encode_options instead of move
Rossil2012 Dec 15, 2023
47e6ff3
Merge branch 'main' into kanzhen/fix-prop-option-misuse
Rossil2012 Dec 15, 2023
3 changes: 3 additions & 0 deletions proto/catalog.proto
@@ -99,6 +99,9 @@ message Source {
optional uint64 initialized_at_epoch = 15;
optional uint64 created_at_epoch = 16;

// Options specified by the user in the FORMAT ... ENCODE clause.
map<string, string> options = 17;

// Per-source catalog version, used by schema change.
uint64 version = 100;
}
2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
@@ -66,7 +66,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
let config = ConnectorProperties::extract(source_props).map_err(BatchError::connector)?;

let info = source_node.get_info().unwrap();
let parser_config = SpecificParserConfig::new(info, &source_node.properties)?;
let parser_config = SpecificParserConfig::new(info, &source_node.properties, None)?;

let columns: Vec<_> = source_node
.columns
2 changes: 1 addition & 1 deletion src/connector/src/parser/avro/parser.rs
@@ -299,7 +299,7 @@ mod test {
row_encode: PbEncodeType::Avro.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
let parser_config = SpecificParserConfig::new(&info, &HashMap::new(), None)?;
AvroParserConfig::new(parser_config.encoding_config).await
}

2 changes: 1 addition & 1 deletion src/connector/src/parser/debezium/avro_parser.rs
@@ -304,7 +304,7 @@ mod tests {
row_encode: PbEncodeType::Avro.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(&info, &props)?;
let parser_config = SpecificParserConfig::new(&info, &props, None)?;
let config = DebeziumAvroParserConfig::new(parser_config.clone().encoding_config).await?;
let columns = config
.map_to_columns()?
5 changes: 2 additions & 3 deletions src/connector/src/parser/json_parser.rs
@@ -141,12 +141,11 @@ impl JsonParser {

pub async fn schema_to_columns(
schema_location: &str,
use_schema_registry: bool,
schema_registry_auth: Option<SchemaRegistryAuth>,
props: &HashMap<String, String>,
) -> anyhow::Result<Vec<ColumnDesc>> {
let url = handle_sr_list(schema_location)?;
let schema_content = if use_schema_registry {
let schema_registry_auth = SchemaRegistryAuth::from(props);
let schema_content = if let Some(schema_registry_auth) = schema_registry_auth {
let client = Client::new(url, &schema_registry_auth)?;
let topic = get_kafka_topic(props)?;
let resolver = ConfluentSchemaResolver::new(client);
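The `json_parser.rs` hunk above replaces the pair `use_schema_registry: bool` plus a separately derived auth value with a single `Option<SchemaRegistryAuth>` parameter, so the "registry in use" flag and its credentials can no longer disagree. Here is a minimal stdlib-only sketch of that pattern; the struct fields, key names, and `resolve_schema_source` helper are hypothetical stand-ins, not the connector's real API:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the connector's SchemaRegistryAuth.
#[derive(Debug, Clone, PartialEq)]
struct SchemaRegistryAuth {
    username: Option<String>,
    password: Option<String>,
}

impl SchemaRegistryAuth {
    // Build auth from a props map, mirroring `SchemaRegistryAuth::from(props)`.
    fn from_props(props: &HashMap<String, String>) -> Self {
        SchemaRegistryAuth {
            username: props.get("schema.registry.username").cloned(),
            password: props.get("schema.registry.password").cloned(),
        }
    }
}

// After the refactor: the caller passes `Some(auth)` when a schema registry
// is in use and `None` otherwise — no boolean flag to keep in sync.
fn resolve_schema_source(auth: Option<&SchemaRegistryAuth>, location: &str) -> String {
    match auth {
        Some(a) => format!(
            "registry:{}:user={}",
            location,
            a.username.clone().unwrap_or_default()
        ),
        None => format!("file:{}", location),
    }
}
```

The `if let Some(schema_registry_auth) = schema_registry_auth` branch in the diff follows the same shape: the presence of the value itself selects the registry path.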
36 changes: 22 additions & 14 deletions src/connector/src/parser/mod.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;

use auto_enums::auto_enum;
@@ -872,7 +872,11 @@ pub enum ProtocolProperties {

impl SpecificParserConfig {
// The validity of (format, encode) is ensured by `extract_format_encode`
pub fn new(info: &StreamSourceInfo, props: &HashMap<String, String>) -> Result<Self> {
pub fn new(
info: &StreamSourceInfo,
props: &HashMap<String, String>,
row_options: Option<&mut BTreeMap<String, String>>,
) -> Result<Self> {
let source_struct = extract_source_struct(info)?;
let format = source_struct.format;
let encode = source_struct.encode;
@@ -916,12 +920,14 @@
config.topic = get_kafka_topic(props)?.clone();
config.client_config = SchemaRegistryAuth::from(props);
} else {
config.aws_auth_props = Some(
serde_json::from_value::<AwsAuthProps>(
serde_json::to_value(props).unwrap(),
)
.map_err(|e| anyhow::anyhow!(e))?,
);
config.aws_auth_props = row_options
.map(|options| {
serde_json::from_value::<AwsAuthProps>(
serde_json::to_value(options).unwrap(),
)
.map_err(|e| anyhow::anyhow!(e))
})
.transpose()?;
}
EncodingProperties::Avro(config)
}
@@ -948,12 +954,14 @@
config.topic = get_kafka_topic(props)?.clone();
config.client_config = SchemaRegistryAuth::from(props);
} else {
config.aws_auth_props = Some(
serde_json::from_value::<AwsAuthProps>(
serde_json::to_value(props).unwrap(),
)
.map_err(|e| anyhow::anyhow!(e))?,
);
config.aws_auth_props = row_options
.map(|options| {
serde_json::from_value::<AwsAuthProps>(
serde_json::to_value(options).unwrap(),
)
.map_err(|e| anyhow::anyhow!(e))
})
.transpose()?;
}
EncodingProperties::Protobuf(config)
}
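The `parser/mod.rs` hunks change `config.aws_auth_props` from always being built out of the connector `props` to being built only from `row_options` when the caller supplies them, using `Option::map` followed by `Option::transpose` to turn an `Option<Result<_, _>>` into a `Result<Option<_>, _>` that `?` can propagate. A stdlib-only sketch of that control flow (the `AwsAuth` type, the `aws.region` key, and string-based parsing instead of the PR's serde_json round-trip are all illustrative assumptions):

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for AwsAuthProps, parsed fallibly from a string map.
#[derive(Debug, PartialEq)]
struct AwsAuth {
    region: String,
}

fn parse_aws_auth(options: &BTreeMap<String, String>) -> Result<AwsAuth, String> {
    let region = options
        .get("aws.region")
        .ok_or_else(|| "missing aws.region".to_string())?;
    Ok(AwsAuth { region: region.clone() })
}

// Mirrors the PR's shape (simplified to a shared reference): options in,
// `Result<Option<AwsAuth>, _>` out. `.map(...)` yields Option<Result<..>>;
// `.transpose()` flips it so an error propagates while absent options
// simply stay `None`.
fn aws_auth_from_row_options(
    row_options: Option<&BTreeMap<String, String>>,
) -> Result<Option<AwsAuth>, String> {
    row_options.map(parse_aws_auth).transpose()
}
```

This is why the diff ends in `.transpose()?`: a missing options map is not an error, but a present-yet-invalid one is.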
8 changes: 4 additions & 4 deletions src/connector/src/parser/protobuf/parser.rs
@@ -622,7 +622,7 @@ mod test {
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
let parser_config = SpecificParserConfig::new(&info, &HashMap::new(), None)?;
let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?;
let value = DynamicMessage::decode(conf.message_descriptor, PRE_GEN_PROTO_DATA).unwrap();

@@ -667,7 +667,7 @@
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
let parser_config = SpecificParserConfig::new(&info, &HashMap::new(), None)?;
let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?;
let columns = conf.map_to_columns().unwrap();

@@ -716,7 +716,7 @@
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(&info, &HashMap::new()).unwrap();
let parser_config = SpecificParserConfig::new(&info, &HashMap::new(), None).unwrap();
let conf = ProtobufParserConfig::new(parser_config.encoding_config)
.await
.unwrap();
@@ -744,7 +744,7 @@
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(&info, &HashMap::new()).unwrap();
let parser_config = SpecificParserConfig::new(&info, &HashMap::new(), None).unwrap();

ProtobufParserConfig::new(parser_config.encoding_config)
.await
8 changes: 6 additions & 2 deletions src/frontend/src/catalog/source_catalog.rs
@@ -37,6 +37,7 @@ pub struct SourceCatalog {
pub info: StreamSourceInfo,
pub row_id_index: Option<usize>,
pub properties: BTreeMap<String, String>,
pub options: BTreeMap<String, String>,
pub watermark_descs: Vec<WatermarkDesc>,
pub associated_table_id: Option<TableId>,
pub definition: String,
@@ -62,6 +63,7 @@ impl SourceCatalog {
columns: self.columns.iter().map(|c| c.to_protobuf()).collect(),
pk_column_ids: self.pk_col_ids.iter().map(Into::into).collect(),
properties: self.properties.clone().into_iter().collect(),
options: self.options.clone().into_iter().collect(),
owner: self.owner,
info: Some(self.info.clone()),
watermark_descs: self.watermark_descs.clone(),
@@ -93,7 +95,8 @@ impl From<&PbSource> for SourceCatalog {
.into_iter()
.map(Into::into)
.collect();
let with_options = WithOptions::new(prost.properties.clone());
let properties = WithOptions::new(prost.properties.clone()).into_inner();
let options = WithOptions::new(prost.options.clone()).into_inner();
Member:
We might need to rename the type alias now... 🥵

@xxchan (Member), Dec 4, 2023:
Besides, properties and options are not meaningful names. Can we change them to sth like with_options and encode_format_options/row_options?

Also in all other places, including proto

Contributor (author):

How about with_properties and row_options?

Contributor:

Most of the fields in options are related to parser behavior; I prefer encode_option or parser_option.

Contributor:

I have been calling them format options for sink 😂

Contributor (author):

parser_options

Member:

Wait a minute, the syntax seems to be FORMAT ... ENCODE ... (...), so at least it should be format_encode_options 🤣

Member:

From the syntax, the options indeed look like they're for ENCODE 🤔.

From the parser's definition it seems to be called row_options, so I guess that is what it was meant to be when it was designed (and not only for ENCODE). So it might be worth asking @st1page what the best name for it is.

pub struct ConnectorSchema {
pub format: Format,
pub row_encode: Encode,
pub row_options: Vec<SqlOption>,
}

Most of the fields in options are related to parser behavior; I prefer encode_option or parser_option.

I'm wondering whether parser_options will be confusing to users. We need to consider this in the error messages (maybe also syntax reference, but currently it also doesn't have a name)

@st1page (Contributor), Dec 6, 2023:
c.c. @tabVersion
I think it is the historical reason... format_encode_options LGTM.
Btw, we might need some options in the KEY ENCODE in the future.

format upsert 
encode avro (
  schema.registry = 'http://message_queue:8081/',
  schema.registry.name.strategy = 'typo')
key encode avro (
  schema.registry = 'http://message_queue:8081/',
  schema.registry.name.strategy = 'typo')
)

or maybe this syntax

format upsert 
encode avro (
  schema.registry = 'http://message_queue:8081/',
  schema.registry.name.strategy = 'typo')
INCLUDE KEY AS kafka_key ENCODE avro (
  schema.registry = 'http://message_queue:8081/',
  schema.registry.name.strategy = 'typo')
)

Considering the unknown part of the other options... I think format_encode_options is the most specific and clear name.

@tabVersion (Contributor), Dec 8, 2023:

I think it is the historical reason... format_encode_options LGTM.
Btw, we might need some options in the KEY ENCODE in the future.

+1 for format_encode_options
For KEY ENCODE, I think it is OK to add some fields to the options, because the clause is optional in SQL and we don't expect it to be as essential as the value part.

Member:

BTW, should we consider backward compatibility here?

Contributor (author):

For now, legacy code paths that look up format_encode_options keys in with_properties have been identified and corrected, so unmerging format_encode_options from with_properties will not cause key-not-found errors. Is this the backward compatibility you had in mind?

Member:

IMO the most important thing is: already-created sources can still work. It seems the compute node doesn't use format_encode_options at all and is thus not broken? (I'm not very sure; correct me if I'm wrong.)

Besides that, there is the semantics change on the frontend. I'm still not sure whether anything changed (i.e., worked previously but doesn't now, or vice versa). But actually that's acceptable to me...

Glad to hear others' thoughts about detailed backward compatibility concerns. @BugenZhao zkss

Member:

already created source can still work.

Yes!

It seems compute node doesn't use format_encode_options at all and thus not broken?

Also, the meta service part: does it handle the case where the new fields do not exist?

TBH, I'm not that familiar with the connector part. 🤣 Practice is the sole criterion for testing truth.

Member:

I just added a backwards-compat test for invalid WITH options in #13824. Can we, or do we need to, add something similar for this PR?

Contributor (author):

IMO the most important thing is: already created source can still work.

Do you mean that, for example, a user creates a source on RW 1.4 and then upgrades to RW 1.5, so the code in RW 1.5 should recognize it as a legacy source and look for option keys in properties? Is this the upgrade scenario you are concerned about?

Member:

Yes

Contributor (author):

Got it. We do need to add more code to handle this issue.

Member:

FYI 😇 this is my solution for rejecting unknown WITH options:
2e39b51#diff-a80dd5cfb3d2b493a15182ad5c447741b10820fa16b8eea2719b9c0ae6f003a4R415-R418

let columns = prost_columns.into_iter().map(ColumnCatalog::from).collect();
let row_id_index = prost.row_id_index.map(|idx| idx as _);

@@ -120,7 +123,8 @@ impl From<&PbSource> for SourceCatalog {
owner,
info: prost.info.clone().unwrap(),
row_id_index,
properties: with_options.into_inner(),
properties,
options,
watermark_descs,
associated_table_id: associated_table_id.map(|x| x.into()),
definition: prost.definition.clone(),
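The `source_catalog.rs` diff splits the single persisted `properties` map into `properties` and `options`, and the review thread above discusses the resulting backward-compatibility question: a source persisted by an older version has everything in `properties` and an empty `options`. One way a loader could handle that, sketched with stdlib types only (the key list, function name, and empty-map-means-legacy heuristic are illustrative assumptions, not the PR's actual migration code):

```rust
use std::collections::BTreeMap;

// Keys that belong to the FORMAT ... ENCODE (...) clause rather than
// WITH (...). The exact list here is an assumption for illustration.
const FORMAT_ENCODE_KEYS: &[&str] = &["schema.registry", "schema.registry.username"];

// For a source persisted before the split, `options` is empty and the
// format-encode keys still live in `properties`; recover them on load.
fn split_legacy_properties(
    properties: &BTreeMap<String, String>,
    options: &BTreeMap<String, String>,
) -> (BTreeMap<String, String>, BTreeMap<String, String>) {
    if !options.is_empty() {
        // Already written by a post-split version: take both maps as-is.
        return (properties.clone(), options.clone());
    }
    let mut props = BTreeMap::new();
    let mut opts = BTreeMap::new();
    for (k, v) in properties {
        if FORMAT_ENCODE_KEYS.contains(&k.as_str()) {
            opts.insert(k.clone(), v.clone());
        } else {
            props.insert(k.clone(), v.clone());
        }
    }
    (props, opts)
}
```

Because a missing proto3 map field decodes as an empty map, a catalog entry written before field 17 existed naturally takes the legacy branch here.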