Skip to content

Commit 80ad7df

Browse files
authored
feat: mongo parse strong schema (#20100)
Signed-off-by: cl <[email protected]> Signed-off-by: cailue <[email protected]> Signed-off-by: 蔡略 <[email protected]>
1 parent 69d2c0e commit 80ad7df

File tree

8 files changed

+1407
-50
lines changed

8 files changed

+1407
-50
lines changed

e2e_test/source_legacy/basic/kafka.slt

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,20 @@ WITH (
386386
topic = 'debezium_mongo_json_customers')
387387
FORMAT DEBEZIUM_MONGO ENCODE JSON
388388

389+
statement ok
390+
CREATE TABLE mongo_customers_strong_schema (
391+
_id BIGINT PRIMARY KEY,
392+
email VARCHAR,
393+
first_name VARCHAR,
394+
last_name VARCHAR
395+
)
396+
WITH (
397+
connector = 'kafka',
398+
properties.bootstrap.server = 'message_queue:29092',
399+
topic = 'debezium_mongo_json_customers'
400+
)
401+
FORMAT DEBEZIUM_MONGO ENCODE JSON ( strong_schema = true )
402+
389403
statement ok
390404
CREATE TABLE mongo_customers_no_schema_field (
391405
_id BIGINT PRIMARY KEY,
@@ -397,6 +411,20 @@ WITH (
397411
topic = 'debezium_mongo_json_customers_no_schema_field')
398412
FORMAT DEBEZIUM_MONGO ENCODE JSON
399413

414+
statement ok
415+
CREATE TABLE mongo_customers_no_schema_field_strong_schema (
416+
_id BIGINT PRIMARY KEY,
417+
email VARCHAR,
418+
first_name VARCHAR,
419+
last_name VARCHAR
420+
)
421+
WITH (
422+
connector = 'kafka',
423+
properties.bootstrap.server = 'message_queue:29092',
424+
topic = 'debezium_mongo_json_customers_no_schema_field'
425+
)
426+
FORMAT DEBEZIUM_MONGO ENCODE JSON ( strong_schema = true )
427+
400428
statement ok
401429
CREATE TABLE upsert_students_default_key (
402430
"ID" INT,
@@ -749,6 +777,20 @@ ORDER BY
749777
1003 {"_id": {"$numberLong": "1003"}, "email": "[email protected]", "first_name": "Edward", "last_name": "Walker"}
750778
1004 {"_id": {"$numberLong": "1004"}, "email": "[email protected]", "first_name": "Anne", "last_name": "Kretchmar"}
751779

780+
query II
781+
SELECT
782+
*
783+
FROM
784+
mongo_customers_strong_schema
785+
ORDER BY
786+
_id;
787+
----
788+
1001 [email protected] Sally Thomas
789+
1002 [email protected] George Bailey
790+
1003 [email protected] Edward Walker
791+
1004 [email protected] Anne Kretchmar
792+
793+
752794
query II
753795
SELECT
754796
*
@@ -763,6 +805,19 @@ ORDER BY
763805
1004 {"_id": {"$numberLong": "1004"}, "email": "[email protected]", "first_name": "Anne", "last_name": "Kretchmar"}
764806

765807

808+
query II
809+
SELECT
810+
*
811+
FROM
812+
mongo_customers_no_schema_field_strong_schema
813+
ORDER BY
814+
_id;
815+
----
816+
1001 [email protected] Sally Thomas
817+
1002 [email protected] George Bailey
818+
1003 [email protected] Edward Walker
819+
1004 [email protected] Anne Kretchmar
820+
766821
query II
767822
SELECT
768823
"ID", "firstName", "lastName", "age", "height", "weight"

src/connector/src/parser/config.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::HashSet;
15+
use std::collections::{BTreeMap, HashSet};
1616

1717
use risingwave_common::bail;
1818
use risingwave_common::secret::LocalSecretManager;
@@ -27,6 +27,7 @@ use crate::error::ConnectorResult;
2727
use crate::parser::PROTOBUF_MESSAGES_AS_JSONB;
2828
use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY;
2929
use crate::schema::schema_registry::SchemaRegistryAuth;
30+
use crate::source::cdc::CDC_MONGODB_STRONG_SCHEMA_KEY;
3031
use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, extract_source_struct};
3132

3233
/// Note: this is created in `SourceReader::build_stream`
@@ -60,7 +61,7 @@ pub enum EncodingProperties {
6061
Protobuf(ProtobufProperties),
6162
Csv(CsvProperties),
6263
Json(JsonProperties),
63-
MongoJson,
64+
MongoJson(MongoProperties),
6465
Bytes(BytesProperties),
6566
Parquet,
6667
Native,
@@ -262,10 +263,8 @@ impl SpecificParserConfig {
262263
)?,
263264
}),
264265
(SourceFormat::DebeziumMongo, SourceEncode::Json) => {
265-
EncodingProperties::Json(JsonProperties {
266-
use_schema_registry: false,
267-
timestamptz_handling: None,
268-
})
266+
let props = MongoProperties::from(&format_encode_options_with_secret);
267+
EncodingProperties::MongoJson(props)
269268
}
270269
(SourceFormat::Plain, SourceEncode::Bytes) => {
271270
EncodingProperties::Bytes(BytesProperties { column_name: None })
@@ -350,3 +349,22 @@ pub struct JsonProperties {
350349
pub struct BytesProperties {
351350
pub column_name: Option<String>,
352351
}
352+
353+
#[derive(Debug, Default, Clone)]
354+
pub struct MongoProperties {
355+
pub strong_schema: bool,
356+
}
357+
358+
impl MongoProperties {
359+
pub fn new(strong_schema: bool) -> Self {
360+
Self { strong_schema }
361+
}
362+
}
363+
impl From<&BTreeMap<String, String>> for MongoProperties {
364+
fn from(config: &BTreeMap<String, String>) -> Self {
365+
let strong_schema = config
366+
.get(CDC_MONGODB_STRONG_SCHEMA_KEY)
367+
.is_some_and(|k| k.eq_ignore_ascii_case("true"));
368+
Self { strong_schema }
369+
}
370+
}

0 commit comments

Comments
 (0)