Skip to content

Commit f36ee11

Browse files
StrikeWStrikeW
authored and
StrikeW
committed
feat(postgres-cdc): support replicating Postgres schema change (#18760)
1 parent 4149f21 commit f36ee11

File tree

14 files changed

+3722
-37
lines changed

14 files changed

+3722
-37
lines changed

.typos.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,7 @@ extend-exclude = [
3838
"src/common/src/cast/mod.rs",
3939
"src/tests/simulation/tests/integration_tests/scale/shared_source.rs",
4040
# We don't want to fix "Divy" here, but may want in other places.
41-
"integration_tests/deltalake-sink/spark-script/run-sql-file.sh"
41+
"integration_tests/deltalake-sink/spark-script/run-sql-file.sh",
42+
# These files are copied from debezium connector, we don't want to fix their typos
43+
"java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/postgresql/*.java"
4244
]
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
control substitution on
2+
3+
system ok
4+
psql -c "
5+
DROP TABLE IF EXISTS test_schema_change;
6+
CREATE TABLE IF NOT EXISTS test_schema_change(
7+
id int,
8+
name varchar(255) DEFAULT 'default_name',
9+
age int DEFAULT 18,
10+
v1 real DEFAULT 1.1,
11+
v2 double precision DEFAULT 2.2,
12+
v3 numeric DEFAULT 3.3,
13+
v4 boolean DEFAULT false,
14+
v5 date DEFAULT '2020-01-01',
15+
v6 time DEFAULT '12:34:56',
16+
v7 timestamp DEFAULT '2020-01-01 12:34:56',
17+
v8 timestamptz DEFAULT '2020-01-01 12:34:56+00',
18+
v9 interval DEFAULT '1 day',
19+
v10 jsonb DEFAULT '{}',
20+
PRIMARY KEY (id)
21+
);
22+
INSERT INTO test_schema_change(id,name,age) VALUES (1, 'name1', 20), (2, 'name2', 21), (3, 'name3', 22);
23+
"
24+
25+
statement ok
26+
create source pg_source with (
27+
connector = 'postgres-cdc',
28+
hostname = '${PGHOST:localhost}',
29+
port = '${PGPORT:5432}',
30+
username = '${PGUSER:$USER}',
31+
password = '${PGPASSWORD:}',
32+
database.name = '${PGDATABASE:postgres}',
33+
slot.name = 'pg_slot',
34+
auto.schema.change = 'true'
35+
);
36+
37+
statement ok
38+
create table rw_test_schema_change (*) from pg_source table 'public.test_schema_change';
39+
40+
41+
# Name, Type, Is Hidden, Description
42+
query TTTT
43+
describe rw_test_schema_change;
44+
----
45+
id integer false NULL
46+
name character varying false NULL
47+
age integer false NULL
48+
v1 real false NULL
49+
v2 double precision false NULL
50+
v3 numeric false NULL
51+
v4 boolean false NULL
52+
v5 date false NULL
53+
v6 time without time zone false NULL
54+
v7 timestamp without time zone false NULL
55+
v8 timestamp with time zone false NULL
56+
v9 interval false NULL
57+
v10 jsonb false NULL
58+
primary key id NULL NULL
59+
distribution key id NULL NULL
60+
table description rw_test_schema_change NULL NULL
61+
62+
sleep 2s
63+
64+
query TTTTTTTTTTTTT
65+
SELECT * from rw_test_schema_change order by id;
66+
----
67+
1 name1 20 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
68+
2 name2 21 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
69+
3 name3 22 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
70+
71+
sleep 3s
72+
73+
system ok
74+
psql -c "
75+
BEGIN;
76+
ALTER TABLE test_schema_change ADD COLUMN v11 varchar DEFAULT 'hello';
77+
ALTER TABLE test_schema_change ADD COLUMN v12 decimal DEFAULT '1.2345';
78+
COMMIT;
79+
INSERT INTO test_schema_change (id,name,age) values (11,'aaa', 11);
80+
"
81+
82+
sleep 3s
83+
84+
# Name, Type, Is Hidden, Description
85+
query TTTT
86+
describe rw_test_schema_change;
87+
----
88+
id integer false NULL
89+
name character varying false NULL
90+
age integer false NULL
91+
v1 real false NULL
92+
v2 double precision false NULL
93+
v3 numeric false NULL
94+
v4 boolean false NULL
95+
v5 date false NULL
96+
v6 time without time zone false NULL
97+
v7 timestamp without time zone false NULL
98+
v8 timestamp with time zone false NULL
99+
v9 interval false NULL
100+
v10 jsonb false NULL
101+
v11 character varying false NULL
102+
v12 numeric false NULL
103+
primary key id NULL NULL
104+
distribution key id NULL NULL
105+
table description rw_test_schema_change NULL NULL
106+
107+
108+
query TTTTTTTTTTTTTTT
109+
SELECT * from rw_test_schema_change order by id;
110+
----
111+
1 name1 20 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
112+
2 name2 21 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
113+
3 name3 22 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
114+
11 aaa 11 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
115+
116+
117+
system ok
118+
psql -c "
119+
ALTER TABLE test_schema_change DROP COLUMN v1;
120+
ALTER TABLE test_schema_change DROP COLUMN v2;
121+
"
122+
123+
system ok
124+
psql -c "
125+
INSERT INTO test_schema_change (id,name,age) values (12,'bbb', 12);
126+
"
127+
128+
sleep 3s
129+
130+
131+
# Name, Type, Is Hidden, Description
132+
query TTTT
133+
describe rw_test_schema_change;
134+
----
135+
id integer false NULL
136+
name character varying false NULL
137+
age integer false NULL
138+
v3 numeric false NULL
139+
v4 boolean false NULL
140+
v5 date false NULL
141+
v6 time without time zone false NULL
142+
v7 timestamp without time zone false NULL
143+
v8 timestamp with time zone false NULL
144+
v9 interval false NULL
145+
v10 jsonb false NULL
146+
v11 character varying false NULL
147+
v12 numeric false NULL
148+
primary key id NULL NULL
149+
distribution key id NULL NULL
150+
table description rw_test_schema_change NULL NULL
151+
152+
153+
query TTTTTTTTTTTTTTT
154+
SELECT * from rw_test_schema_change order by id;
155+
----
156+
1 name1 20 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
157+
2 name2 21 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
158+
3 name3 22 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
159+
11 aaa 11 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
160+
12 bbb 12 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
161+
162+
163+
statement ok
164+
drop source pg_source cascade;

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ var record = event.value();
213213
.setSourceTsMs(sourceTsMs)
214214
.build();
215215
LOG.debug(
216-
"offset => {}, key => {}, payload => {}",
216+
"[schema] offset => {}, key => {}, payload => {}",
217217
message.getOffset(),
218218
message.getKey(),
219219
message.getPayload());
@@ -270,7 +270,7 @@ var record = event.value();
270270
.setSourceTsMs(sourceTsMs)
271271
.build();
272272
LOG.debug(
273-
"offset => {}, key => {}, payload => {}",
273+
"[data] offset => {}, key => {}, payload => {}",
274274
message.getOffset(),
275275
message.getKey(),
276276
message.getPayload());

java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ database.password=${password}
99
database.dbname=${database.name}
1010
database.sslmode=${ssl.mode:-prefer}
1111
table.include.list=${schema.name}.${table.name}
12+
include.schema.changes=${auto.schema.change:-false}
1213
# The name of the PostgreSQL replication slot
1314
slot.name=${slot.name}
1415
# default plugin name is 'pgoutput'

0 commit comments

Comments
 (0)