Skip to content

Commit 32100f3

Browse files
authored
feat(sink): set parallelism of iceberg sink to 1 (risingwavelabs#8476)
1 parent a6c8c86 commit 32100f3

File tree

1 file changed

+15
-2
lines changed

1 file changed

+15
-2
lines changed

src/frontend/src/optimizer/plan_node/stream_sink.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use risingwave_connector::sink::{
2626
SINK_FORMAT_APPEND_ONLY, SINK_FORMAT_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
2727
};
2828
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
29+
use tracing::info;
2930

3031
use super::derive::{derive_columns, derive_pk};
3132
use super::utils::IndicesDisplay;
@@ -72,10 +73,22 @@ impl StreamSink {
7273
let required_dist = match input.distribution() {
7374
Distribution::Single => RequiredDist::single(),
7475
_ => {
75-
assert_matches!(user_distributed_by, RequiredDist::Any);
76-
RequiredDist::shard_by_key(input.schema().len(), input.logical_pk())
76+
match properties.get("connector") {
77+
Some(s) if s == "iceberg" => {
78+
// iceberg with multiple parallelism will fail easily with concurrent commit
79+
// on metadata
80+
// TODO: reset iceberg sink to have multiple parallelism
81+
info!("setting iceberg sink parallelism to singleton");
82+
RequiredDist::single()
83+
}
84+
_ => {
85+
assert_matches!(user_distributed_by, RequiredDist::Any);
86+
RequiredDist::shard_by_key(input.schema().len(), input.logical_pk())
87+
}
88+
}
7789
}
7890
};
91+
7992
let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
8093
let columns = derive_columns(input.schema(), out_names, &user_cols)?;
8194

0 commit comments

Comments
 (0)