Skip to content

Commit 5543490

Browse files
committed
inline fragment ids in proto field to repeated uint32
1 parent d447492 commit 5543490

File tree

4 files changed

+11
-25
lines changed

4 files changed

+11
-25
lines changed

proto/stream_plan.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ message AddMutation {
3131
bool pause = 4;
3232
repeated SubscriptionUpstreamInfo subscriptions_to_add = 5;
3333
// nodes which should start backfill
34-
common.Uint32Vector backfill_nodes_to_start = 6;
34+
repeated uint32 backfill_nodes_to_start = 6;
3535
}
3636

3737
message StopMutation {

src/meta/src/barrier/command.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use risingwave_connector::source::SplitImpl;
2626
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
2727
use risingwave_meta_model::WorkerId;
2828
use risingwave_pb::catalog::{CreateType, Table};
29-
use risingwave_pb::common::{ActorInfo, Uint32Vector};
29+
use risingwave_pb::common::ActorInfo;
3030
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
3131
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
3232
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
@@ -754,9 +754,7 @@ impl Command {
754754
// If the cluster is already paused, the new actors should be paused too.
755755
pause: is_currently_paused,
756756
subscriptions_to_add,
757-
backfill_nodes_to_start: Some(Uint32Vector {
758-
data: backfill_nodes_to_start,
759-
}),
757+
backfill_nodes_to_start,
760758
}));
761759

762760
if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
@@ -1009,7 +1007,7 @@ impl Command {
10091007
upstream_mv_table_id: upstream_mv_table_id.table_id,
10101008
subscriber_id: *subscription_id,
10111009
}],
1012-
backfill_nodes_to_start: Some(Uint32Vector { data: vec![] }),
1010+
backfill_nodes_to_start: vec![],
10131011
})),
10141012
Command::DropSubscription {
10151013
upstream_mv_table_id,

src/meta/src/barrier/rpc.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use risingwave_common::util::epoch::Epoch;
3131
use risingwave_common::util::tracing::TracingContext;
3232
use risingwave_connector::source::SplitImpl;
3333
use risingwave_meta_model::WorkerId;
34-
use risingwave_pb::common::{HostAddress, Uint32Vector, WorkerNode};
34+
use risingwave_pb::common::{HostAddress, WorkerNode};
3535
use risingwave_pb::hummock::HummockVersionStats;
3636
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
3737
use risingwave_pb::stream_plan::{
@@ -487,12 +487,10 @@ impl ControlStreamManager {
487487
pause: is_paused,
488488
subscriptions_to_add: Default::default(),
489489
// TODO(kwannoel): recover using backfill order plan
490-
backfill_nodes_to_start: Some(Uint32Vector {
491-
data: background_jobs
492-
.values()
493-
.flat_map(|(_, fragments)| fragments.fragment_ids())
494-
.collect_vec(),
495-
}),
490+
backfill_nodes_to_start: background_jobs
491+
.values()
492+
.flat_map(|(_, fragments)| fragments.fragment_ids())
493+
.collect_vec(),
496494
});
497495

498496
fn resolve_jobs_committed_epoch(

src/stream/src/executor/mod.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
165165
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
166166

167167
pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
168-
use risingwave_pb::common::Uint32Vector;
169168
use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
170169
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
171170

@@ -722,9 +721,7 @@ impl Mutation {
722721
upstream_mv_table_id: table_id.table_id,
723722
})
724723
.collect(),
725-
backfill_nodes_to_start: Some(Uint32Vector {
726-
data: backfill_nodes_to_start.iter().cloned().collect(),
727-
}),
724+
backfill_nodes_to_start: backfill_nodes_to_start.iter().copied().collect(),
728725
}),
729726
Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
730727
actor_splits: changes
@@ -877,14 +874,7 @@ impl Mutation {
877874
},
878875
)
879876
.collect(),
880-
backfill_nodes_to_start: add
881-
.backfill_nodes_to_start
882-
.as_ref()
883-
.expect("not None")
884-
.data
885-
.iter()
886-
.copied()
887-
.collect(),
877+
backfill_nodes_to_start: add.backfill_nodes_to_start.iter().copied().collect(),
888878
}),
889879

890880
PbMutation::Splits(s) => {

0 commit comments

Comments
 (0)