
Commit a5affa6

Merge branch 'v0.19.0-rc' into auto-v0.19.0-rc-952633dcc2d95d8ba14a805b627332ba1d998ecd
2 parents: 2efc542 + 2d2015c

6 files changed: +133 / -55 lines

e2e_test/source/cdc/cdc.check.slt

Lines changed: 11 additions & 0 deletions
@@ -26,3 +26,14 @@ query V
 select count(*) as cnt from mytable;
 ----
 4
+
+# Skipped due to https://github.com/risingwavelabs/risingwave/issues/10206
+# query IIII
+# select count(*) from orders_2;
+# ----
+# 3
+
+query IIII
+select count(*) from shipments_2;
+----
+3

e2e_test/source/cdc/cdc.load.slt

Lines changed: 36 additions & 0 deletions
@@ -87,3 +87,39 @@ create table mytable (
   table.name = 'mytable',
   server.id = '5087'
 );
+
+# Some columns missing and reordered (mysql-cdc)
+statement ok
+create table orders_2 (
+  order_id int,
+  price decimal,
+  customer_name string,
+  PRIMARY KEY (order_id)
+) with (
+  connector = 'mysql-cdc',
+  hostname = 'mysql',
+  port = '3306',
+  username = 'root',
+  password = '123456',
+  database.name = 'mydb',
+  table.name = 'orders',
+  server.id = '5087'
+);
+
+# Some columns missing and reordered (postgres-cdc)
+statement ok
+create table shipments_2 (
+  origin STRING,
+  destination STRING,
+  shipment_id INTEGER,
+  order_id INTEGER,
+  PRIMARY KEY (shipment_id)
+) with (
+  connector = 'postgres-cdc',
+  hostname = 'db',
+  port = '5432',
+  username = 'postgres',
+  password = 'postgres',
+  database.name = 'cdc_test',
+  table.name = 'shipments'
+);

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java

Lines changed: 17 additions & 15 deletions
@@ -20,6 +20,7 @@
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;

@@ -127,31 +128,32 @@ private void validateTableSchema() throws SQLException {
                 jdbcConnection.prepareStatement(ValidatorUtils.getSql("mysql.table_schema"))) {
             stmt.setString(1, userProps.get(DbzConnectorConfig.DB_NAME));
             stmt.setString(2, userProps.get(DbzConnectorConfig.TABLE_NAME));
-            var res = stmt.executeQuery();
+
+            // Field name in lower case -> data type
+            var schema = new HashMap<String, String>();
             var pkFields = new HashSet<String>();
-            int index = 0;
+            var res = stmt.executeQuery();
             while (res.next()) {
                 var field = res.getString(1);
                 var dataType = res.getString(2);
                 var key = res.getString(3);
-
-                if (index >= tableSchema.getNumColumns()) {
-                    throw ValidatorUtils.invalidArgument("The number of columns mismatch");
+                schema.put(field.toLowerCase(), dataType);
+                if (key.equalsIgnoreCase("PRI")) {
+                    // RisingWave always use lower case for column name
+                    pkFields.add(field.toLowerCase());
                 }
+            }

-                var srcColName = tableSchema.getColumnNames()[index++];
-                if (!srcColName.equalsIgnoreCase(field)) {
+            // All columns defined must exist in upstream database
+            for (var e : tableSchema.getColumnTypes().entrySet()) {
+                var pgDataType = schema.get(e.getKey().toLowerCase());
+                if (pgDataType == null) {
                     throw ValidatorUtils.invalidArgument(
-                            String.format("column name mismatch: %s, [%s]", field, srcColName));
+                            "Column '" + e.getKey() + "' not found in the upstream database");
                 }
-
-                if (!isDataTypeCompatible(dataType, tableSchema.getColumnType(srcColName))) {
+                if (!isDataTypeCompatible(pgDataType, e.getValue())) {
                     throw ValidatorUtils.invalidArgument(
-                            String.format("incompatible data type of column %s", srcColName));
-                }
-                if (key.equalsIgnoreCase("PRI")) {
-                    // RisingWave always use lower case for column name
-                    pkFields.add(field.toLowerCase());
+                            "Incompatible data type of column " + e.getKey());
                 }
             }

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java

Lines changed: 14 additions & 11 deletions
@@ -132,29 +132,32 @@ private void validateTableSchema() throws SQLException {
                 throw ValidatorUtils.invalidArgument("Primary key mismatch");
             }
         }
-        // check whether source schema match table schema on upstream
+
+        // Check whether source schema match table schema on upstream
+        // All columns defined must exist in upstream database
         try (var stmt =
                 jdbcConnection.prepareStatement(ValidatorUtils.getSql("postgres.table_schema"))) {
             stmt.setString(1, schemaName);
             stmt.setString(2, tableName);
             var res = stmt.executeQuery();
-            int index = 0;
+
+            // Field name in lower case -> data type
+            Map<String, String> schema = new HashMap<>();
             while (res.next()) {
                 var field = res.getString(1);
                 var dataType = res.getString(2);
-                if (index >= tableSchema.getNumColumns()) {
-                    throw ValidatorUtils.invalidArgument("The number of columns mismatch");
-                }
+                schema.put(field.toLowerCase(), dataType);
+            }

-                var srcColName = tableSchema.getColumnNames()[index++];
-                if (!srcColName.equalsIgnoreCase(field)) {
+            for (var e : tableSchema.getColumnTypes().entrySet()) {
+                var pgDataType = schema.get(e.getKey().toLowerCase());
+                if (pgDataType == null) {
                     throw ValidatorUtils.invalidArgument(
-                            "table column defined in the source mismatches upstream column "
-                                    + field);
+                            "Column '" + e.getKey() + "' not found in the upstream database");
                 }
-                if (!isDataTypeCompatible(dataType, tableSchema.getColumnType(srcColName))) {
+                if (!isDataTypeCompatible(pgDataType, e.getValue())) {
                     throw ValidatorUtils.invalidArgument(
-                            "incompatible data type of column " + srcColName);
+                            "Incompatible data type of column " + e.getKey());
                 }
             }
         }
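
Both validators now apply the same rule: every column declared in the RisingWave table must exist in the upstream table, matched case-insensitively, with a compatible data type; extra upstream columns and a different declaration order are tolerated, which is what the orders_2 and shipments_2 tests above exercise. A minimal Rust sketch of that rule (hypothetical names; the real compatibility logic lives in the Java validators' isDataTypeCompatible):

use std::collections::HashMap;

// Hypothetical stand-in for the Java isDataTypeCompatible helper; the real
// rules live in MySqlValidator/PostgresValidator.
fn is_data_type_compatible(upstream_type: &str, defined_type: &str) -> bool {
    upstream_type.eq_ignore_ascii_case(defined_type)
}

// Every column declared in the source table must exist upstream (matched
// case-insensitively) with a compatible type; column order and extra
// upstream columns do not matter.
fn validate_columns(
    defined: &HashMap<String, String>,  // declared column name -> data type
    upstream: &HashMap<String, String>, // lower-cased upstream column name -> data type
) -> Result<(), String> {
    for (name, defined_type) in defined {
        match upstream.get(&name.to_lowercase()) {
            None => {
                return Err(format!(
                    "Column '{}' not found in the upstream database",
                    name
                ))
            }
            Some(upstream_type) if !is_data_type_compatible(upstream_type, defined_type) => {
                return Err(format!("Incompatible data type of column {}", name));
            }
            Some(_) => {}
        }
    }
    Ok(())
}

The previous code walked the upstream result set positionally against tableSchema.getColumnNames()[index++], so any missing or reordered column was rejected with a mismatch error.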

src/connector/src/parser/debezium/avro_parser.rs

Lines changed: 52 additions & 28 deletions
@@ -26,6 +26,7 @@ use risingwave_common::error::{Result, RwError};
 use risingwave_pb::plan_common::ColumnDesc;

 use super::operators::*;
+use crate::common::UpsertMessage;
 use crate::impl_common_parser_logic;
 use crate::parser::avro::util::{
     avro_field_to_column_desc, extract_inner_field_schema, from_avro_value,
@@ -192,42 +193,57 @@ impl DebeziumAvroParser {
         payload: Vec<u8>,
         mut writer: SourceStreamChunkRowWriter<'_>,
     ) -> Result<WriteGuard> {
+        // https://debezium.io/documentation/reference/stable/transformations/event-flattening.html#event-flattening-behavior:
+        //
+        // A database DELETE operation causes Debezium to generate two Kafka records:
+        // - A record that contains "op": "d", the before row data, and some other fields.
+        // - A tombstone record that has the same key as the deleted row and a value of null. This
+        //   record is a marker for Apache Kafka. It indicates that log compaction can remove
+        //   all records that have this key.

+        let UpsertMessage {
+            primary_key: key,
+            record: payload,
+        } = bincode::deserialize(&payload[..]).unwrap();
+
+        // If message value == null, it must be a tombstone message. Emit DELETE to downstream using
+        // message key as the DELETE row. Throw an error if message key is empty.
+        if payload.is_empty() {
+            let (schema_id, mut raw_payload) = extract_schema_id(&key)?;
+            let key_schema = self.schema_resolver.get(schema_id).await?;
+            let key = from_avro_datum(key_schema.as_ref(), &mut raw_payload, None)
+                .map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
+            return writer.delete(|column| {
+                let field_schema =
+                    extract_inner_field_schema(&self.inner_schema, Some(&column.name))?;
+                from_avro_value(
+                    get_field_from_avro_value(&key, column.name.as_str())?.clone(),
+                    field_schema,
+                )
+            });
+        }
+
         let (schema_id, mut raw_payload) = extract_schema_id(&payload)?;
         let writer_schema = self.schema_resolver.get(schema_id).await?;
-
         let avro_value = from_avro_datum(writer_schema.as_ref(), &mut raw_payload, None)
             .map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
-
         let op = get_field_from_avro_value(&avro_value, OP)?;
+
         if let Value::String(op_str) = op {
             match op_str.as_str() {
-                DEBEZIUM_UPDATE_OP => {
-                    let before = get_field_from_avro_value(&avro_value, BEFORE)
-                        .map_err(|_| {
-                            RwError::from(ProtocolError(
-                                "before is missing for updating event. If you are using postgres, you may want to try ALTER TABLE $TABLE_NAME REPLICA IDENTITY FULL;".to_string(),
-                            ))
-                        })?;
-                    let after = get_field_from_avro_value(&avro_value, AFTER)?;
-
-                    writer.update(|column| {
-                        let field_schema =
-                            extract_inner_field_schema(&self.inner_schema, Some(&column.name))?;
-                        let before = from_avro_value(
-                            get_field_from_avro_value(before, column.name.as_str())?.clone(),
-                            field_schema,
-                        )?;
-                        let after = from_avro_value(
-                            get_field_from_avro_value(after, column.name.as_str())?.clone(),
-                            field_schema,
-                        )?;
+                DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP | DEBEZIUM_READ_OP => {
+                    // - If debezium op == CREATE, emit INSERT to downstream using the after field
+                    //   in the debezium value as the INSERT row.
+                    // - If debezium op == UPDATE, emit INSERT to downstream using the after field
+                    //   in the debezium value as the INSERT row.

-                        Ok((before, after))
-                    })
-                }
-                DEBEZIUM_CREATE_OP | DEBEZIUM_READ_OP => {
                     let after = get_field_from_avro_value(&avro_value, AFTER)?;
-
+                    if *after == Value::Null {
+                        return Err(RwError::from(ProtocolError(format!(
+                            "after is null for {} event",
+                            op_str
+                        ))));
+                    }
                     writer.insert(|column| {
                         let field_schema =
                             extract_inner_field_schema(&self.inner_schema, Some(&column.name))?;
@@ -238,12 +254,20 @@ impl DebeziumAvroParser {
                     })
                 }
                 DEBEZIUM_DELETE_OP => {
+                    // If debezium op == DELETE, emit DELETE to downstream using the before field as
+                    //   the DELETE row.
+
                     let before = get_field_from_avro_value(&avro_value, BEFORE)
                         .map_err(|_| {
                             RwError::from(ProtocolError(
-                                "before is missing for updating event. If you are using postgres, you may want to try ALTER TABLE $TABLE_NAME REPLICA IDENTITY FULL;".to_string(),
+                                "before is missing for the Debezium delete op. If you are using postgres, you may want to try ALTER TABLE $TABLE_NAME REPLICA IDENTITY FULL;".to_string(),
                             ))
                         })?;
+                    if *before == Value::Null {
+                        return Err(RwError::from(ProtocolError(
+                            "before is null for DELETE event".to_string(),
+                        )));
+                    }

                     writer.delete(|column| {
                         let field_schema =
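
Stripped of the Avro decoding details, the new control flow is: deserialize the payload into an UpsertMessage (Kafka key plus value); an empty value is a tombstone and becomes a DELETE keyed by the message key; otherwise the Debezium op field decides whether the after image is inserted (create, update, read) or the before image is deleted. A simplified sketch of that dispatch, with hypothetical Row and ChangeEvent types standing in for the real writer and Avro values:

// Hypothetical simplified view of a decoded Debezium change event.
struct ChangeEvent {
    op: String,          // "c", "u", "r" or "d"
    before: Option<Row>, // row image before the change
    after: Option<Row>,  // row image after the change
}

struct Row; // placeholder for a decoded row

enum Action {
    Insert(Row),
    Delete(Row),
}

// Decide what to emit for one Kafka record (key, value), mirroring the new control flow.
fn plan_action(key_row: Row, value: Option<ChangeEvent>) -> Result<Action, String> {
    let event = match value {
        // Tombstone: the value is null, so delete the row identified by the message key.
        None => return Ok(Action::Delete(key_row)),
        Some(event) => event,
    };
    match event.op.as_str() {
        // CREATE, UPDATE and READ all emit an INSERT built from the after image.
        "c" | "u" | "r" => match event.after {
            Some(row) => Ok(Action::Insert(row)),
            None => Err(format!("after is null for {} event", event.op)),
        },
        // DELETE emits a DELETE built from the before image.
        "d" => match event.before {
            Some(row) => Ok(Action::Delete(row)),
            None => Err("before is null for DELETE event".to_string()),
        },
        other => Err(format!("unknown Debezium op: {}", other)),
    }
}

Emitting updates as plain inserts relies on the downstream treating the stream as an upsert keyed by primary key, which is consistent with DebeziumAvro being added to is_upsert() in the next diff.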

src/connector/src/parser/mod.rs

Lines changed: 3 additions & 1 deletion
@@ -426,7 +426,9 @@ impl SpecificParserConfig {
     pub fn is_upsert(&self) -> bool {
         matches!(
             self,
-            SpecificParserConfig::UpsertJson | SpecificParserConfig::UpsertAvro(_)
+            SpecificParserConfig::UpsertJson
+                | SpecificParserConfig::UpsertAvro(_)
+                | SpecificParserConfig::DebeziumAvro(_)
         )
     }
