Skip to content

Commit 59a0947

Browse files
authored
fix(sink): pass downstream pk when starting a sink (risingwavelabs#8660)
1 parent 8c5489e commit 59a0947

File tree

3 files changed

+10
-4
lines changed

3 files changed

+10
-4
lines changed

ci/scripts/e2e-iceberg-sink-test.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
8686

8787
# check sink destination using shell
8888
if cat ./spark-output/*.csv | sort | awk -F "," '{
89-
if ($1 == 1 && $2 == 2 && $3 == "1-2") c1++;
89+
if ($1 == 1 && $2 == 50 && $3 == "1-50") c1++;
9090
if ($1 == 13 && $2 == 2 && $3 == "13-2") c2++;
9191
if ($1 == 21 && $2 == 2 && $3 == "21-2") c3++;
9292
if ($1 == 2 && $2 == 2 && $3 == "2-2") c4++;

e2e_test/sink/iceberg_sink.slt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ INSERT INTO t6 VALUES (1, 2, '1-2'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2')
2323
statement ok
2424
FLUSH;
2525

26+
statement ok
27+
INSERT INTO t6 VALUES (1, 50, '1-50');
28+
29+
statement ok
30+
FLUSH;
31+
2632
statement ok
2733
DROP SINK s6;
2834

src/stream/src/from_proto/sink.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ impl ExecutorBuilder for SinkExecutorBuilder {
3737
let sink_type = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
3838
let mut properties = sink_desc.get_properties().clone();
3939
let pk_indices = sink_desc
40-
.plan_pk
40+
.downstream_pk
4141
.iter()
42-
.map(|pk| pk.column_index as usize)
43-
.collect::<Vec<_>>();
42+
.map(|i| *i as usize)
43+
.collect_vec();
4444
let schema = sink_desc.columns.iter().map(Into::into).collect();
4545
// This field can be used to distinguish a specific actor in parallelism to prevent
4646
// transaction execution errors

0 commit comments

Comments
 (0)