Skip to content

Commit 2ed0fa1

Browse files
authored
feat(cdc): auto schema change for mysql cdc (#17876) (cherry-pick) (#18138)
1 parent 5eb2c21 commit 2ed0fa1

File tree

71 files changed

+1086
-128
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+1086
-128
lines changed

e2e_test/error_ui/simple/main.slt

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,8 @@ Caused by these errors (recent errors listed first):
1919
2: invalid IPv4 address
2020

2121

22-
statement error
22+
statement error Failed to run the query
2323
create function int_42() returns int as int_42 using link '55.55.55.55:5555';
24-
----
25-
db error: ERROR: Failed to run the query
26-
27-
Caused by these errors (recent errors listed first):
28-
1: failed to check UDF signature
29-
2: failed to send requests to UDF service
30-
3: status: Unavailable, message: "error trying to connect: tcp connect error: deadline has elapsed", details: [], metadata: MetadataMap { headers: {} }
31-
4: transport error
32-
5: error trying to connect
33-
6: tcp connect error
34-
7: deadline has elapsed
3524

3625

3726
statement error
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
control substitution on
2+
3+
system ok
4+
mysql -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;"
5+
6+
system ok
7+
mysql -e "
8+
USE mytest;
9+
DROP TABLE IF EXISTS customers;
10+
CREATE TABLE customers(
11+
id BIGINT PRIMARY KEY,
12+
modified DATETIME,
13+
custinfo JSON
14+
);
15+
ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) );
16+
"
17+
18+
statement ok
19+
create source mysql_source with (
20+
connector = 'mysql-cdc',
21+
hostname = '${MYSQL_HOST:localhost}',
22+
port = '${MYSQL_TCP_PORT:8306}',
23+
username = 'root',
24+
password = '${MYSQL_PWD:}',
25+
database.name = 'mytest',
26+
server.id = '5701',
27+
auto.schema.change = 'true'
28+
);
29+
30+
statement ok
31+
create table rw_customers (id bigint, modified timestamp, custinfo jsonb, primary key (id)) from mysql_source table 'mytest.customers';
32+
33+
# Name, Type, Is Hidden, Description
34+
query TTTT
35+
describe rw_customers;
36+
----
37+
id bigint false NULL
38+
modified timestamp without time zone false NULL
39+
custinfo jsonb false NULL
40+
primary key id NULL NULL
41+
distribution key id NULL NULL
42+
table description rw_customers NULL NULL
43+
44+
45+
system ok
46+
mysql -e "
47+
USE mytest;
48+
ALTER TABLE customers ADD COLUMN v1 VARCHAR(255);
49+
ALTER TABLE customers ADD COLUMN v2 double(5,2);
50+
"
51+
52+
sleep 3s
53+
54+
# Name, Type, Is Hidden, Description
55+
query TTTT
56+
describe rw_customers;
57+
----
58+
id bigint false NULL
59+
modified timestamp without time zone false NULL
60+
custinfo jsonb false NULL
61+
v1 character varying false NULL
62+
v2 double precision false NULL
63+
primary key id NULL NULL
64+
distribution key id NULL NULL
65+
table description rw_customers NULL NULL
66+
67+
68+
statement ok
69+
drop source mysql_source cascade;

java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ table.include.list=${database.name}.${table.name:-*}
1313
schema.history.internal.store.only.captured.tables.ddl=true
1414
schema.history.internal.store.only.captured.databases.ddl=true
1515
# default to disable schema change events
16-
include.schema.changes=${debezium.include.schema.changes:-false}
16+
include.schema.changes=${auto.schema.change:-false}
1717
database.server.id=${server.id}
1818
# default to use unencrypted connection
1919
database.ssl.mode=${ssl.mode:-disabled}

proto/catalog.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,11 @@ message Table {
411411
// conflict" operations.
412412
optional uint32 version_column_index = 38;
413413

414+
// The unique identifier of the upstream table if it is a CDC table.
415+
// It will be used in auto schema change to get the Table which mapped to the
416+
// upstream table.
417+
optional string cdc_table_id = 39;
418+
414419
// Per-table catalog version, used by schema change. `None` for internal
415420
// tables and tests. Not to be confused with the global catalog version for
416421
// notification service.

proto/common.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ message WorkerNode {
5454
bool is_streaming = 1;
5555
bool is_serving = 2;
5656
bool is_unschedulable = 3;
57+
// This is used for frontend node to register its rpc address
58+
string internal_rpc_host_addr = 4;
5759
}
5860
message Resource {
5961
string rw_version = 1;

proto/ddl_service.proto

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,14 +454,20 @@ message TableSchemaChange {
454454
}
455455

456456
TableChangeType change_type = 1;
457-
string cdc_table_name = 2;
457+
string cdc_table_id = 2;
458458
repeated plan_common.ColumnCatalog columns = 3;
459459
}
460460

461461
message SchemaChangeEnvelope {
462462
repeated TableSchemaChange table_changes = 1;
463463
}
464464

465+
message AutoSchemaChangeRequest {
466+
SchemaChangeEnvelope schema_change = 1;
467+
}
468+
469+
message AutoSchemaChangeResponse {}
470+
465471
service DdlService {
466472
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
467473
rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse);
@@ -500,4 +506,5 @@ service DdlService {
500506
rpc GetTables(GetTablesRequest) returns (GetTablesResponse);
501507
rpc Wait(WaitRequest) returns (WaitResponse);
502508
rpc CommentOn(CommentOnRequest) returns (CommentOnResponse);
509+
rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse);
503510
}

proto/frontend_service.proto

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
syntax = "proto3";
2+
3+
package frontend_service;
4+
5+
import "ddl_service.proto";
6+
7+
option java_package = "com.risingwave.proto";
8+
option optimize_for = SPEED;
9+
10+
message GetTableReplacePlanRequest {
11+
uint32 database_id = 1;
12+
uint32 owner = 2;
13+
string table_name = 3;
14+
ddl_service.TableSchemaChange table_change = 4;
15+
}
16+
17+
message GetTableReplacePlanResponse {
18+
ddl_service.ReplaceTablePlan replace_plan = 1;
19+
}
20+
21+
service FrontendService {
22+
rpc GetTableReplacePlan(GetTableReplacePlanRequest) returns (GetTableReplacePlanResponse);
23+
}

proto/meta.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,8 @@ message AddWorkerNodeRequest {
312312
bool is_streaming = 2;
313313
bool is_serving = 3;
314314
bool is_unschedulable = 4;
315+
// This is used for frontend node to register its rpc address
316+
string internal_rpc_host_addr = 5;
315317
}
316318
common.WorkerType worker_type = 1;
317319
common.HostAddress host = 2;

src/batch/src/executor/source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ impl SourceExecutor {
174174
rate_limit: None,
175175
},
176176
ConnectorProperties::default(),
177+
None,
177178
));
178179
let stream = self
179180
.source

src/batch/src/worker_manager/worker_node_manager.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@ mod tests {
430430
is_unschedulable: false,
431431
is_serving: true,
432432
is_streaming: true,
433+
internal_rpc_host_addr: "".to_string(),
433434
}),
434435
transactional_id: Some(1),
435436
..Default::default()
@@ -444,6 +445,7 @@ mod tests {
444445
is_unschedulable: false,
445446
is_serving: true,
446447
is_streaming: false,
448+
internal_rpc_host_addr: "".to_string(),
447449
}),
448450
transactional_id: Some(2),
449451
..Default::default()

src/cmd_all/src/standalone.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,7 @@ mod test {
505505
],
506506
),
507507
prometheus_listener_addr: "127.0.0.1:1234",
508-
health_check_listener_addr: "127.0.0.1:6786",
508+
frontend_rpc_listener_addr: "127.0.0.1:6786",
509509
config_path: "src/config/test.toml",
510510
metrics_level: None,
511511
enable_barrier_read: None,

src/common/src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,6 +1027,10 @@ pub struct StreamingDeveloperConfig {
10271027
/// If not specified, the value of `server.connection_pool_size` will be used.
10281028
#[serde(default = "default::developer::stream_exchange_connection_pool_size")]
10291029
pub exchange_connection_pool_size: Option<u16>,
1030+
1031+
/// A flag to allow disabling the auto schema change handling
1032+
#[serde(default = "default::developer::stream_enable_auto_schema_change")]
1033+
pub enable_auto_schema_change: bool,
10301034
}
10311035

10321036
/// The subsections `[batch.developer]`.
@@ -1903,6 +1907,10 @@ pub mod default {
19031907
pub fn enable_actor_tokio_metrics() -> bool {
19041908
false
19051909
}
1910+
1911+
pub fn stream_enable_auto_schema_change() -> bool {
1912+
true
1913+
}
19061914
}
19071915

19081916
pub use crate::system_param::default as system;

src/common/src/vnode_mapping/vnode_placement.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ mod tests {
213213
is_unschedulable: false,
214214
is_serving: true,
215215
is_streaming: false,
216+
internal_rpc_host_addr: "".to_string(),
216217
};
217218

218219
let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
@@ -231,7 +232,7 @@ mod tests {
231232
let worker_1 = WorkerNode {
232233
id: 1,
233234
parallelism: 1,
234-
property: Some(serving_property),
235+
property: Some(serving_property.clone()),
235236
..Default::default()
236237
};
237238

@@ -246,7 +247,7 @@ mod tests {
246247
let worker_2 = WorkerNode {
247248
id: 2,
248249
parallelism: 50,
249-
property: Some(serving_property),
250+
property: Some(serving_property.clone()),
250251
..Default::default()
251252
};
252253

@@ -265,7 +266,7 @@ mod tests {
265266
let worker_3 = WorkerNode {
266267
id: 3,
267268
parallelism: 60,
268-
property: Some(serving_property),
269+
property: Some(serving_property.clone()),
269270
..Default::default()
270271
};
271272
let re_pu_mapping_2 = place_vnode(

src/compute/src/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ pub async fn compute_node_serve(
127127
is_streaming: opts.role.for_streaming(),
128128
is_serving: opts.role.for_serving(),
129129
is_unschedulable: false,
130+
internal_rpc_host_addr: "".to_string(),
130131
},
131132
&config.meta,
132133
)

src/config/example.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ stream_enable_arrangement_backfill = true
124124
stream_high_join_amplification_threshold = 2048
125125
stream_enable_actor_tokio_metrics = false
126126
stream_exchange_connection_pool_size = 1
127+
stream_enable_auto_schema_change = true
127128

128129
[storage]
129130
share_buffers_sync_parallelism = 1

src/connector/src/parser/debezium/schema_change.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,16 @@ impl From<&str> for TableChangeType {
6565

6666
#[derive(Debug)]
6767
pub struct TableSchemaChange {
68-
pub(crate) cdc_table_name: String,
68+
pub(crate) cdc_table_id: String,
6969
pub(crate) columns: Vec<ColumnCatalog>,
7070
pub(crate) change_type: TableChangeType,
7171
}
7272

7373
impl SchemaChangeEnvelope {
74+
pub fn is_empty(&self) -> bool {
75+
self.table_changes.is_empty()
76+
}
77+
7478
pub fn to_protobuf(&self) -> PbSchemaChangeEnvelope {
7579
let table_changes = self
7680
.table_changes
@@ -83,12 +87,19 @@ impl SchemaChangeEnvelope {
8387
.collect();
8488
PbTableSchemaChange {
8589
change_type: table_change.change_type.to_proto() as _,
86-
cdc_table_name: table_change.cdc_table_name.clone(),
90+
cdc_table_id: table_change.cdc_table_id.clone(),
8791
columns,
8892
}
8993
})
9094
.collect();
9195

9296
PbSchemaChangeEnvelope { table_changes }
9397
}
98+
99+
pub fn table_names(&self) -> Vec<String> {
100+
self.table_changes
101+
.iter()
102+
.map(|table_change| table_change.cdc_table_id.clone())
103+
.collect()
104+
}
94105
}

src/connector/src/parser/mod.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -836,8 +836,26 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
836836
}
837837
},
838838

839-
Ok(ParseResult::SchemaChange(_)) => {
840-
// TODO
839+
Ok(ParseResult::SchemaChange(schema_change)) => {
840+
if schema_change.is_empty() {
841+
continue;
842+
}
843+
844+
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
845+
// we bubble up the schema change event to the source executor via channel,
846+
// and wait for the source executor to finish the schema change process before
847+
// parsing the following messages.
848+
if let Some(ref tx) = parser.source_ctx().schema_change_tx {
849+
tx.send((schema_change, oneshot_tx))
850+
.await
851+
.expect("send schema change to executor");
852+
match oneshot_rx.await {
853+
Ok(()) => {}
854+
Err(e) => {
855+
tracing::error!(error = %e.as_report(), "failed to wait for schema change");
856+
}
857+
}
858+
}
841859
}
842860
}
843861
}

src/connector/src/parser/plain_parser.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ mod tests {
508508
SchemaChangeEnvelope {
509509
table_changes: [
510510
TableSchemaChange {
511-
cdc_table_name: "mydb.test",
511+
cdc_table_id: "mydb.test",
512512
columns: [
513513
ColumnCatalog {
514514
column_desc: ColumnDesc {

src/connector/src/parser/unified/debezium.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ pub fn parse_schema_change(
211211
}
212212
}
213213
schema_changes.push(TableSchemaChange {
214-
cdc_table_name: id.replace('"', ""), // remove the double quotes
214+
cdc_table_id: id.replace('"', ""), // remove the double quotes
215215
columns: column_descs
216216
.into_iter()
217217
.map(|column_desc| ColumnCatalog {

0 commit comments

Comments
 (0)