Skip to content

feat(postgres-cdc): support replicating Postgres schema change #18760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ extend-exclude = [
# Ideally, we should just ignore that line: https://github.com/crate-ci/typos/issues/316
"src/common/src/cast/mod.rs",
"src/tests/simulation/tests/integration_tests/scale/shared_source.rs",
"java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/postgresql/*.java",
]
164 changes: 164 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_change_pg.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# Turn on variable substitution; the `${PGHOST:localhost}`-style placeholders in
# the source definition further down rely on it.
control substitution on

# Recreate the upstream Postgres table from scratch. Every non-key column has a
# DEFAULT, and the seed INSERT sets only id, name and age, so the remaining
# columns of the three rows take those defaults.
system ok
psql -c "
DROP TABLE IF EXISTS test_schema_change;
CREATE TABLE IF NOT EXISTS test_schema_change(
id int,
name varchar(255) DEFAULT 'default_name',
age int DEFAULT 18,
v1 real DEFAULT 1.1,
v2 double precision DEFAULT 2.2,
v3 numeric DEFAULT 3.3,
v4 boolean DEFAULT false,
v5 date DEFAULT '2020-01-01',
v6 time DEFAULT '12:34:56',
v7 timestamp DEFAULT '2020-01-01 12:34:56',
v8 timestamptz DEFAULT '2020-01-01 12:34:56+00',
v9 interval DEFAULT '1 day',
v10 jsonb DEFAULT '{}',
PRIMARY KEY (id)
);
INSERT INTO test_schema_change(id,name,age) VALUES (1, 'name1', 20), (2, 'name2', 21), (3, 'name3', 22);
"

# Create the Postgres CDC source with auto.schema.change enabled; connection
# settings come from the PG* environment variables, with fallbacks.
statement ok
create source pg_source with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
slot.name = 'pg_slot',
auto.schema.change = 'true'
);

# Create the CDC table on top of the source. No columns are listed here: `(*)`
# is used instead, and the describe check that follows expects the upstream columns.
statement ok
create table rw_test_schema_change (*) from pg_source table 'public.test_schema_change';


# The table was created with `(*)`, so its schema is expected to list the 13
# upstream columns, with id as both primary key and distribution key.
# Name, Type, Is Hidden, Description
query TTTT
describe rw_test_schema_change;
----
id integer false NULL
name character varying false NULL
age integer false NULL
v1 real false NULL
v2 double precision false NULL
v3 numeric false NULL
v4 boolean false NULL
v5 date false NULL
v6 time without time zone false NULL
v7 timestamp without time zone false NULL
v8 timestamp with time zone false NULL
v9 interval false NULL
v10 jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_test_schema_change NULL NULL

# NOTE(review): fixed wait, presumably so the initial rows are ingested before
# the read below — confirm it is long enough on slow CI.
sleep 2s

# The three seeded rows, with the upstream defaults in every column the seed
# INSERT omitted.
query TTTTTTTTTTTTT
SELECT * from rw_test_schema_change order by id;
----
1 name1 20 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
2 name2 21 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
3 name3 22 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}

# NOTE(review): fixed wait before altering the upstream table; presumably lets
# the CDC stream settle first — confirm.
sleep 3s

# Upstream schema change #1: add two columns with defaults inside a single
# transaction, then insert a row (id 11) that does not set them.
system ok
psql -c "
BEGIN;
ALTER TABLE test_schema_change ADD COLUMN v11 varchar DEFAULT 'hello';
ALTER TABLE test_schema_change ADD COLUMN v12 decimal DEFAULT '1.2345';
COMMIT;
INSERT INTO test_schema_change (id,name,age) values (11,'aaa', 11);
"

# NOTE(review): fixed wait, presumably for the schema change and the new row to
# be replicated — confirm it is long enough on slow CI.
sleep 3s

# v11 and v12 are expected to appear after v10, giving 15 columns.
# Name, Type, Is Hidden, Description
query TTTT
describe rw_test_schema_change;
----
id integer false NULL
name character varying false NULL
age integer false NULL
v1 real false NULL
v2 double precision false NULL
v3 numeric false NULL
v4 boolean false NULL
v5 date false NULL
v6 time without time zone false NULL
v7 timestamp without time zone false NULL
v8 timestamp with time zone false NULL
v9 interval false NULL
v10 jsonb false NULL
v11 character varying false NULL
v12 numeric false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_test_schema_change NULL NULL


# Every row, including the three that existed before the ALTER, is expected to
# show the new columns' defaults ('hello', 1.2345); row 11 is the new insert.
query TTTTTTTTTTTTTTT
SELECT * from rw_test_schema_change order by id;
----
1 name1 20 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
2 name2 21 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
3 name3 22 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
11 aaa 11 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345


# Upstream schema change #2: drop v1 and v2. Unlike the ADD COLUMN step, the two
# ALTERs are not wrapped in an explicit BEGIN/COMMIT.
system ok
psql -c "
ALTER TABLE test_schema_change DROP COLUMN v1;
ALTER TABLE test_schema_change DROP COLUMN v2;
"

# Insert a row (id 12) in a separate psql invocation, after the columns are gone.
system ok
psql -c "
INSERT INTO test_schema_change (id,name,age) values (12,'bbb', 12);
"

# NOTE(review): fixed wait, presumably for the column drops and the new row to
# be replicated — confirm it is long enough on slow CI.
sleep 3s


# v1 and v2 are expected to be gone, leaving 13 columns.
# Name, Type, Is Hidden, Description
query TTTT
describe rw_test_schema_change;
----
id integer false NULL
name character varying false NULL
age integer false NULL
v3 numeric false NULL
v4 boolean false NULL
v5 date false NULL
v6 time without time zone false NULL
v7 timestamp without time zone false NULL
v8 timestamp with time zone false NULL
v9 interval false NULL
v10 jsonb false NULL
v11 character varying false NULL
v12 numeric false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_test_schema_change NULL NULL


# After v1 and v2 are dropped the table has 13 columns, so the type string has
# 13 characters (one per result column). Rows 1-3 and 11 predate the drop;
# row 12 was inserted afterwards.
query TTTTTTTTTTTTT
SELECT * from rw_test_schema_change order by id;
----
1 name1 20 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
2 name2 21 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
3 name3 22 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
11 aaa 11 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345
12 bbb 12 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {} hello 1.2345


# Teardown: drop the source; `cascade` is used so objects built on it (here
# rw_test_schema_change) go with it. The upstream Postgres table is left in
# place and is dropped by the setup step's DROP TABLE IF EXISTS on the next run.
statement ok
drop source pg_source cascade;
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ var record = event.value();
.setSourceTsMs(sourceTsMs)
.build();
LOG.debug(
"offset => {}, key => {}, payload => {}",
"[schema] offset => {}, key => {}, payload => {}",
message.getOffset(),
message.getKey(),
message.getPayload());
Expand Down Expand Up @@ -269,7 +269,7 @@ var record = event.value();
.setSourceTsMs(sourceTsMs)
.build();
LOG.debug(
"offset => {}, key => {}, payload => {}",
"[data] offset => {}, key => {}, payload => {}",
message.getOffset(),
message.getKey(),
message.getPayload());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ database.password=${password}
database.dbname=${database.name}
database.sslmode=${ssl.mode:-prefer}
table.include.list=${schema.name}.${table.name}
include.schema.changes=${auto.schema.change:-false}
# The name of the PostgreSQL replication slot
slot.name=${slot.name}
# default plugin name is 'pgoutput'
Expand Down
Loading
Loading