diff --git a/integration_tests/mysql-sink/create_mv.sql b/integration_tests/mysql-sink/create_mv.sql index 5196258a669c9..5d6527a77aeb9 100644 --- a/integration_tests/mysql-sink/create_mv.sql +++ b/integration_tests/mysql-sink/create_mv.sql @@ -14,4 +14,32 @@ FROM jdbc.url = 'jdbc:mysql://mysql:3306/mydb?user=root&password=123456', table.name = 'target_count', type = 'upsert' - ); \ No newline at end of file + ); + +-- ingest the table back to RW +CREATE TABLE rw_types ( + id BIGINT PRIMARY KEY, + varchar_column VARCHAR, + text_column TEXT, + integer_column INTEGER, + smallint_column SMALLINT, + bigint_column BIGINT, + decimal_column DECIMAL, + real_column REAL, + double_column DOUBLE PRECISION, + boolean_column BOOLEAN, + date_column DATE, + time_column TIME, + timestamp_column TIMESTAMP, + jsonb_column JSONB, + bytea_column BYTEA +) WITH ( + connector = 'mysql-cdc', + hostname = 'mysql', + port = '3306', + username = 'root', + password = '123456', + database.name = 'mydb', + table.name = 'data_types', + server.id = '3' +); diff --git a/integration_tests/mysql-sink/create_source.sql b/integration_tests/mysql-sink/create_source.sql index 7a9e3d3add4c8..d4a3ade260bca 100644 --- a/integration_tests/mysql-sink/create_source.sql +++ b/integration_tests/mysql-sink/create_source.sql @@ -11,4 +11,39 @@ CREATE SOURCE user_behaviors ( topic = 'user_behaviors', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest' -) ROW FORMAT JSON; \ No newline at end of file +) ROW FORMAT JSON; + +CREATE TABLE data_types ( + id BIGINT PRIMARY KEY, + varchar_column VARCHAR, + text_column TEXT, + integer_column INTEGER, + smallint_column SMALLINT, + bigint_column BIGINT, + decimal_column DECIMAL, + real_column REAL, + double_column DOUBLE PRECISION, + boolean_column BOOLEAN, + date_column DATE, + time_column TIME, + timestamp_column TIMESTAMP, + jsonb_column JSONB, + bytea_column BYTEA +); + +CREATE SINK data_types_mysql_sink +FROM + data_types WITH ( + connector = 'jdbc', + jdbc.url = 'jdbc:mysql://mysql:3306/mydb?user=root&password=123456', + table.name = 'data_types', + type = 'upsert' + ); + +INSERT INTO data_types (id, varchar_column, text_column, integer_column, smallint_column, bigint_column, decimal_column, real_column, double_column, boolean_column, date_column, time_column, timestamp_column, jsonb_column, bytea_column) +VALUES + (1, 'Varchar value 1', 'Text value 1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '{"key": "value"}', E'\\xDEADBEEF'), + (2, 'Varchar value 2', 'Text value 2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '{"key": "value2"}', E'\\xFEEDBEEF'), + (3, 'Varchar value 3', 'Text value 3', 345, 678, 901, 34.56, 78.90, 12.34, TRUE, '2023-05-24', '12:34:56', '2023-05-24 12:34:56', '{"key": "value3"}', E'\\xCAFEBABE'), + (4, 'Varchar value 4', 'Text value 4', 456, 789, 012, 45.67, 89.01, 23.45, FALSE, '2023-05-25', '23:45:01', '2023-05-25 23:45:01', '{"key": "value4"}', E'\\xBABEC0DE'), + (5, 'Varchar value 5', 'Text value 5', 567, 890, 123, 56.78, 90.12, 34.56, TRUE, '2023-05-26', '12:34:56', '2023-05-26 12:34:56', '{"key": "value5"}', E'\\xDEADBABE'); diff --git a/integration_tests/mysql-sink/data_check b/integration_tests/mysql-sink/data_check index 3835eb979b86e..a443bbb5d13ce 100644 --- a/integration_tests/mysql-sink/data_check +++ b/integration_tests/mysql-sink/data_check @@ -1 +1 @@ -user_behaviors,target_count \ No newline at end of file +user_behaviors,target_count,rw_types \ No newline at end of file diff --git a/integration_tests/mysql-sink/mysql_prepare.sql b/integration_tests/mysql-sink/mysql_prepare.sql index ded9b4cec97cd..724950f66a90b 100644 --- a/integration_tests/mysql-sink/mysql_prepare.sql +++ b/integration_tests/mysql-sink/mysql_prepare.sql @@ -1,4 +1,24 @@ CREATE TABLE target_count ( target_id VARCHAR(128) primary key, target_count BIGINT +); + + +-- sink table +CREATE TABLE data_types ( + id BIGINT PRIMARY KEY, + varchar_column VARCHAR(255), + text_column TEXT, + integer_column INT, + smallint_column SMALLINT, + bigint_column BIGINT, + decimal_column DECIMAL(10,2), + real_column FLOAT, + double_column DOUBLE, + boolean_column BOOLEAN, + date_column DATE, + time_column TIME, + timestamp_column TIMESTAMP, + jsonb_column JSON, + bytea_column BLOB ); \ No newline at end of file diff --git a/integration_tests/mysql-sink/query.sql b/integration_tests/mysql-sink/query.sql index e09c66a255f10..6fbe4cc96813e 100644 --- a/integration_tests/mysql-sink/query.sql +++ b/integration_tests/mysql-sink/query.sql @@ -2,5 +2,12 @@ SELECT * FROM target_count +LIMIT + 10; + +SELECT + * +FROM + data_types LIMIT 10; \ No newline at end of file diff --git a/integration_tests/postgres-sink/create_mv.sql b/integration_tests/postgres-sink/create_mv.sql index 9870d62c45a1c..3853010ebb42b 100644 --- a/integration_tests/postgres-sink/create_mv.sql +++ b/integration_tests/postgres-sink/create_mv.sql @@ -14,4 +14,36 @@ FROM jdbc.url = 'jdbc:postgresql://postgres:5432/mydb?user=myuser&password=123456', table.name = 'target_count', type = 'upsert' - ); \ No newline at end of file + ); + +-- ingest back to RW +CREATE table rw_types ( + id BIGINT PRIMARY KEY, + varchar_column VARCHAR, + text_column TEXT, + integer_column INTEGER, + smallint_column SMALLINT, + bigint_column BIGINT, + decimal_column DECIMAL, + real_column REAL, + double_column DOUBLE PRECISION, + boolean_column BOOLEAN, + date_column DATE, + time_column TIME, + timestamp_column TIMESTAMP, + timestamptz_column TIMESTAMPTZ, + interval_column INTERVAL, + jsonb_column JSONB, + bytea_column BYTEA, + array_column VARCHAR[] +) WITH ( + connector = 'postgres-cdc', + hostname = 'postgres', + port = '5432', + username = 'myuser', + password = '123456', + database.name = 'mydb', + schema.name = 'public', + table.name = 'data_types', + slot.name = 'data_types' +); diff --git a/integration_tests/postgres-sink/create_source.sql b/integration_tests/postgres-sink/create_source.sql index 7a9e3d3add4c8..1499ca6867f58 100644 --- a/integration_tests/postgres-sink/create_source.sql +++ b/integration_tests/postgres-sink/create_source.sql @@ -11,4 +11,44 @@ CREATE SOURCE user_behaviors ( topic = 'user_behaviors', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest' -) ROW FORMAT JSON; \ No newline at end of file +) ROW FORMAT JSON; + +CREATE TABLE data_types ( + id BIGINT PRIMARY KEY, + varchar_column VARCHAR, + text_column TEXT, + integer_column INTEGER, + smallint_column SMALLINT, + bigint_column BIGINT, + decimal_column DECIMAL, + real_column REAL, + double_column DOUBLE PRECISION, + boolean_column BOOLEAN, + date_column DATE, + time_column TIME, + timestamp_column TIMESTAMP, + timestamptz_column TIMESTAMPTZ, + interval_column INTERVAL, + jsonb_column JSONB, + bytea_column BYTEA, + array_column VARCHAR[] +); + +-- sink data_type table to pg +CREATE SINK data_types_postgres_sink +FROM + data_types WITH ( + connector = 'jdbc', + jdbc.url = 'jdbc:postgresql://postgres:5432/mydb?user=myuser&password=123456', + table.name = 'data_types', + type='upsert' +); + +INSERT INTO data_types (id, varchar_column, text_column, integer_column, smallint_column, bigint_column, decimal_column, real_column, double_column, boolean_column, date_column, time_column, timestamp_column, timestamptz_column, interval_column, jsonb_column, bytea_column, array_column) +VALUES + (1, 'Varchar value 1', 'Text value 1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '2023-05-22 12:34:56+00:00', '1 day', '{"key": "value"}', E'\\xDEADBEEF', ARRAY['Value 1', 'Value 2']), + (2, 'Varchar value 2', 'Text value 2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '2023-05-23 23:45:01+00:00', '2 days', '{"key": "value2"}', E'\\xFEEDBEEF', ARRAY['Value 3', 'Value 4']), + (3, 'Varchar value 3', 'Text value 3', 345, 678, 901, 34.56, 78.90, 12.34, TRUE, '2023-05-24', '12:34:56', '2023-05-24 12:34:56', '2023-05-24 12:34:56+00:00', '3 days', '{"key": "value3"}', E'\\xCAFEBABE', ARRAY['Value 5', 'Value 6']), + (4, 'Varchar value 4', 'Text value 4', 456, 789, 012, 45.67, 89.01, 23.45, FALSE, '2023-05-25', '23:45:01', '2023-05-25 23:45:01', '2023-05-25 23:45:01+00:00', '4 days', '{"key": "value4"}', E'\\xBABEC0DE', ARRAY['Value 7', 'Value 8']), + (5, 'Varchar value 5', 'Text value 5', 567, 890, 123, 56.78, 90.12, 34.56, TRUE, '2023-05-26', '12:34:56', '2023-05-26 12:34:56', '2023-05-26 12:34:56+00:00', '5 days', '{"key": "value5"}', E'\\xDEADBABE', ARRAY['Value 9', 'Value 10']); + diff --git a/integration_tests/postgres-sink/data_check b/integration_tests/postgres-sink/data_check index 3835eb979b86e..a443bbb5d13ce 100644 --- a/integration_tests/postgres-sink/data_check +++ b/integration_tests/postgres-sink/data_check @@ -1 +1 @@ -user_behaviors,target_count \ No newline at end of file +user_behaviors,target_count,rw_types \ No newline at end of file diff --git a/integration_tests/postgres-sink/postgres_prepare.sql b/integration_tests/postgres-sink/postgres_prepare.sql index ded9b4cec97cd..595cf960121d7 100644 --- a/integration_tests/postgres-sink/postgres_prepare.sql +++ b/integration_tests/postgres-sink/postgres_prepare.sql @@ -1,4 +1,25 @@ CREATE TABLE target_count ( target_id VARCHAR(128) primary key, target_count BIGINT +); + +CREATE TABLE data_types ( + id BIGINT PRIMARY KEY, + varchar_column VARCHAR, + text_column TEXT, + integer_column INTEGER, + smallint_column SMALLINT, + bigint_column BIGINT, + decimal_column DECIMAL, + real_column REAL, + double_column DOUBLE PRECISION, + boolean_column BOOLEAN, + date_column DATE, + time_column TIME, + timestamp_column TIMESTAMP, + timestamptz_column TIMESTAMPTZ, + interval_column INTERVAL, + jsonb_column JSONB, + bytea_column BYTEA, + array_column VARCHAR[] ); \ No newline at end of file