Skip to content

Commit 961e342

Browse files
authored
feat(sink): enable user-defined primary key for upsert sink (risingwavelabs#8610)
1 parent 88aa6a4 commit 961e342

File tree

24 files changed

+219
-162
lines changed

24 files changed

+219
-162
lines changed

ci/scripts/e2e-sink-test.sh

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -88,29 +88,29 @@ cargo make ci-start ci-1cn-1fe
8888

8989
echo "--- testing sinks"
9090
sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'
91-
# sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
91+
sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
9292
sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt'
9393
sleep 1
9494

9595
# check sink destination postgres
96-
# sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt'
97-
# sleep 1
98-
# sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
99-
# sleep 1
96+
sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt'
97+
sleep 1
98+
sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
99+
sleep 1
100100

101101
# check sink destination mysql using shell
102-
# if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{
103-
# if ($1 == 1 && $2 == "Alex") c1++;
104-
# if ($1 == 3 && $2 == "Carl") c2++;
105-
# if ($1 == 4 && $2 == "Doris") c3++;
106-
# if ($1 == 5 && $2 == "Eve") c4++;
107-
# if ($1 == 6 && $2 == "Frank") c5++; }
108-
# END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1); }'; then
109-
# echo "mysql sink check passed"
110-
# else
111-
# echo "The output is not as expected."
112-
# exit 1
113-
# fi
102+
if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{
103+
if ($1 == 1 && $2 == "Alex") c1++;
104+
if ($1 == 3 && $2 == "Carl") c2++;
105+
if ($1 == 4 && $2 == "Doris") c3++;
106+
if ($1 == 5 && $2 == "Eve") c4++;
107+
if ($1 == 6 && $2 == "Frank") c5++; }
108+
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1); }'; then
109+
echo "mysql sink check passed"
110+
else
111+
echo "The output is not as expected."
112+
exit 1
113+
fi
114114

115115
echo "--- Kill cluster"
116116
pkill -f connector-node

dashboard/proto/gen/catalog.ts

Lines changed: 16 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dashboard/proto/gen/stream_plan.ts

Lines changed: 14 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

e2e_test/batch/explain.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ statement ok
55
explain create index i on t(v);
66

77
statement ok
8-
explain create sink sink_t from t with ( connector = 'kafka', format = 'append_only' )
8+
explain create sink sink_t from t with ( connector = 'kafka', type = 'append-only' )
99

1010
statement ok
1111
drop table t;

e2e_test/ddl/table.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ statement ok
3939
explain select v2 from ddl_t;
4040

4141
statement ok
42-
explain create sink sink_t from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );
42+
explain create sink sink_t from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' );
4343

4444
statement ok
45-
explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );
45+
explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' );
4646

4747
# Create a mview with duplicated name.
4848
statement error

e2e_test/sink/append_only_sink.slt

Lines changed: 7 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,20 @@
11
statement ok
2-
create table t1 (v1 int, v2 int);
3-
4-
statement error No primary key for the upsert sink
5-
create sink s1 from t1 with (connector = 'console');
6-
7-
statement ok
8-
create sink s1 as select v1, v2, _row_id from t1 with (connector = 'console');
9-
10-
statement ok
11-
create table t2 (v1 int, v2 int primary key);
12-
13-
statement ok
14-
create sink s2 from t2 with (connector = 'console');
15-
16-
statement error No primary key for the upsert sink
17-
create sink s3 as select avg(v1) from t2 with (connector = 'console');
2+
create table t (v1 int, v2 int);
183

194
statement ok
20-
create sink s3 as select avg(v1) from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');
5+
create sink s1 from t with (connector = 'console');
216

227
statement ok
23-
create sink s4 as select avg(v1), v2 from t2 group by v2 with (connector = 'console');
8+
create sink s2 as select avg(v1), v2 from t group by v2 with (connector = 'console');
249

2510
statement error The sink cannot be append-only
26-
create sink s5 from t2 with (connector = 'console', format = 'append_only');
11+
create sink s3 from t with (connector = 'console', type = 'append-only');
2712

2813
statement ok
29-
create sink s5 from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');
14+
create sink s3 from t with (connector = 'console', type = 'append-only', force_append_only = 'true');
3015

3116
statement error Cannot force the sink to be append-only
32-
create sink s6 from t2 with (connector = 'console', format = 'upsert', force_append_only = 'true');
17+
create sink s4 from t with (connector = 'console', type = 'upsert', force_append_only = 'true');
3318

3419
statement ok
3520
drop sink s1
@@ -41,13 +26,4 @@ statement ok
4126
drop sink s3
4227

4328
statement ok
44-
drop sink s4
45-
46-
statement ok
47-
drop sink s5
48-
49-
statement ok
50-
drop table t1
51-
52-
statement ok
53-
drop table t2
29+
drop table t

e2e_test/sink/iceberg_sink.slt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
77
statement ok
88
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
99
connector = 'iceberg',
10-
sink.mode='append-only',
10+
type = 'upsert',
11+
primary_key = 'v1',
1112
warehouse.path = 's3://iceberg',
1213
s3.endpoint = 'http://127.0.0.1:9301',
1314
s3.access.key = 'hummockadmin',

e2e_test/source/basic/kafka.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ from
7979
s5 with (
8080
properties.bootstrap.server = '127.0.0.1:29092',
8181
topic = 'sink_target',
82-
format = 'append_only',
82+
type = 'append-only',
8383
connector = 'kafka'
8484
)
8585

integration_tests/iceberg-sink/create_sink.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ CREATE SINK bhv_iceberg_sink
22
FROM
33
bhv_mv WITH (
44
connector = 'iceberg',
5-
sink.mode='upsert',
5+
type = 'upsert',
66
warehouse.path = 's3://hummock001/iceberg-data',
77
s3.endpoint = 'http://minio-0:9301',
88
s3.access.key = 'hummockadmin',

java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,6 @@ public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchem
102102
.collect(Collectors.toList()));
103103
}
104104

105-
/** @deprecated pk here is from Risingwave, it may not match the pk in the database */
106-
@Deprecated
107105
public List<String> getPrimaryKeys() {
108106
return primaryKeys;
109107
}

java/connector-node/python-client/integration_tests.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ def test_print_sink(input_file):
162162

163163
def test_iceberg_sink(input_file):
164164
test_sink("iceberg",
165-
{"sink.mode":"append-only",
165+
{"type":"append-only",
166166
"warehouse.path":"s3a://bucket",
167167
"s3.endpoint": "http://127.0.0.1:9000",
168168
"s3.access.key": "minioadmin",
@@ -173,7 +173,7 @@ def test_iceberg_sink(input_file):
173173

174174
def test_upsert_iceberg_sink(input_file):
175175
test_upsert_sink("iceberg",
176-
{"sink.mode":"upsert",
176+
{"type":"upsert",
177177
"warehouse.path":"s3a://bucket",
178178
"s3.endpoint": "http://127.0.0.1:9000",
179179
"s3.access.key": "minioadmin",

java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class IcebergSinkFactory implements SinkFactory {
3939

4040
private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkFactory.class);
4141

42-
public static final String SINK_MODE_PROP = "sink.mode";
42+
public static final String SINK_TYPE_PROP = "type";
4343
public static final String WAREHOUSE_PATH_PROP = "warehouse.path";
4444
public static final String DATABASE_NAME_PROP = "database.name";
4545
public static final String TABLE_NAME_PROP = "table.name";
@@ -58,7 +58,7 @@ public class IcebergSinkFactory implements SinkFactory {
5858

5959
@Override
6060
public SinkBase create(TableSchema tableSchema, Map<String, String> tableProperties) {
61-
String mode = tableProperties.get(SINK_MODE_PROP);
61+
String mode = tableProperties.get(SINK_TYPE_PROP);
6262
String warehousePath = getWarehousePath(tableProperties);
6363
String databaseName = tableProperties.get(DATABASE_NAME_PROP);
6464
String tableName = tableProperties.get(TABLE_NAME_PROP);
@@ -93,22 +93,22 @@ public SinkBase create(TableSchema tableSchema, Map<String, String> tablePropert
9393
@Override
9494
public void validate(
9595
TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType) {
96-
if (!tableProperties.containsKey(SINK_MODE_PROP) // only append-only, upsert
96+
if (!tableProperties.containsKey(SINK_TYPE_PROP) // only append-only, upsert
9797
|| !tableProperties.containsKey(WAREHOUSE_PATH_PROP)
9898
|| !tableProperties.containsKey(DATABASE_NAME_PROP)
9999
|| !tableProperties.containsKey(TABLE_NAME_PROP)) {
100100
throw INVALID_ARGUMENT
101101
.withDescription(
102102
String.format(
103103
"%s, %s, %s or %s is not specified",
104-
SINK_MODE_PROP,
104+
SINK_TYPE_PROP,
105105
WAREHOUSE_PATH_PROP,
106106
DATABASE_NAME_PROP,
107107
TABLE_NAME_PROP))
108108
.asRuntimeException();
109109
}
110110

111-
String mode = tableProperties.get(SINK_MODE_PROP);
111+
String mode = tableProperties.get(SINK_TYPE_PROP);
112112
String databaseName = tableProperties.get(DATABASE_NAME_PROP);
113113
String tableName = tableProperties.get(TABLE_NAME_PROP);
114114
String warehousePath = getWarehousePath(tableProperties);

java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void testCreate() throws IOException {
6464
sinkFactory.create(
6565
TableSchema.getMockTableSchema(),
6666
Map.of(
67-
IcebergSinkFactory.SINK_MODE_PROP,
67+
IcebergSinkFactory.SINK_TYPE_PROP,
6868
sinkMode,
6969
IcebergSinkFactory.WAREHOUSE_PATH_PROP,
7070
warehousePath,

0 commit comments

Comments
 (0)