Skip to content

Commit 457140d

Browse files
committed
pass downstream pk when starting a sink
1 parent ae99e55 commit 457140d

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

src/stream/src/from_proto/sink.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ impl ExecutorBuilder for SinkExecutorBuilder {
3737
let sink_type = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
3838
let mut properties = sink_desc.get_properties().clone();
3939
let pk_indices = sink_desc
40-
.plan_pk
40+
.downstream_pk
4141
.iter()
42-
.map(|pk| pk.column_index as usize)
43-
.collect::<Vec<_>>();
42+
.map(|i| *i as usize)
43+
.collect_vec();
4444
let schema = sink_desc.columns.iter().map(Into::into).collect();
4545
// This field can be used to distinguish a specific actor in parallelism to prevent
4646
// transaction execution errors

0 commit comments

Comments
 (0)