Skip to content

Commit 34afd39

Browse files
BugenZhaoLi0k
authored andcommitted
feat(meta): add system parameter pause_on_next_bootstrap (#11936)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 55e13e3 commit 34afd39

File tree

25 files changed

+400
-120
lines changed

25 files changed

+400
-120
lines changed

proto/meta.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,7 @@ message SystemParams {
495495
optional bool telemetry_enabled = 10;
496496
optional uint32 parallel_compact_size_mb = 11;
497497
optional uint32 max_concurrent_creating_streaming_jobs = 12;
498+
optional bool pause_on_next_bootstrap = 13;
498499
}
499500

500501
message GetSystemParamsRequest {}

proto/stream_plan.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ message AddMutation {
2424
// We may embed a source change split mutation here.
2525
// TODO: we may allow multiple mutations in a single barrier.
2626
map<uint32, source.ConnectorSplits> actor_splits = 2;
27+
// We may embed a pause mutation here.
28+
// TODO: we may allow multiple mutations in a single barrier.
29+
bool pause = 4;
2730
}
2831

2932
message StopMutation {

src/common/src/config.rs

Lines changed: 6 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,10 @@ pub struct SystemConfig {
775775
/// Max number of concurrent creating streaming jobs.
776776
#[serde(default = "default::system::max_concurrent_creating_streaming_jobs")]
777777
pub max_concurrent_creating_streaming_jobs: Option<u32>,
778+
779+
/// Whether to pause all data sources on next bootstrap.
780+
#[serde(default = "default::system::pause_on_next_bootstrap")]
781+
pub pause_on_next_bootstrap: Option<bool>,
778782
}
779783

780784
impl SystemConfig {
@@ -792,6 +796,7 @@ impl SystemConfig {
792796
backup_storage_directory: self.backup_storage_directory,
793797
telemetry_enabled: self.telemetry_enabled,
794798
max_concurrent_creating_streaming_jobs: self.max_concurrent_creating_streaming_jobs,
799+
pause_on_next_bootstrap: self.pause_on_next_bootstrap,
795800
}
796801
}
797802
}
@@ -1165,55 +1170,7 @@ pub mod default {
11651170
}
11661171

11671172
pub mod system {
1168-
use crate::system_param;
1169-
1170-
pub fn barrier_interval_ms() -> Option<u32> {
1171-
system_param::default::barrier_interval_ms()
1172-
}
1173-
1174-
pub fn checkpoint_frequency() -> Option<u64> {
1175-
system_param::default::checkpoint_frequency()
1176-
}
1177-
1178-
pub fn parallel_compact_size_mb() -> Option<u32> {
1179-
system_param::default::parallel_compact_size_mb()
1180-
}
1181-
1182-
pub fn sstable_size_mb() -> Option<u32> {
1183-
system_param::default::sstable_size_mb()
1184-
}
1185-
1186-
pub fn block_size_kb() -> Option<u32> {
1187-
system_param::default::block_size_kb()
1188-
}
1189-
1190-
pub fn bloom_false_positive() -> Option<f64> {
1191-
system_param::default::bloom_false_positive()
1192-
}
1193-
1194-
pub fn state_store() -> Option<String> {
1195-
system_param::default::state_store()
1196-
}
1197-
1198-
pub fn data_directory() -> Option<String> {
1199-
system_param::default::data_directory()
1200-
}
1201-
1202-
pub fn backup_storage_url() -> Option<String> {
1203-
system_param::default::backup_storage_url()
1204-
}
1205-
1206-
pub fn backup_storage_directory() -> Option<String> {
1207-
system_param::default::backup_storage_directory()
1208-
}
1209-
1210-
pub fn telemetry_enabled() -> Option<bool> {
1211-
system_param::default::telemetry_enabled()
1212-
}
1213-
1214-
pub fn max_concurrent_creating_streaming_jobs() -> Option<u32> {
1215-
system_param::default::max_concurrent_creating_streaming_jobs()
1216-
}
1173+
pub use crate::system_param::default::*;
12171174
}
12181175

12191176
pub mod batch {

src/common/src/system_param/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ macro_rules! for_all_undeprecated_params {
5656
{ backup_storage_directory, String, Some("backup".to_string()), false },
5757
{ telemetry_enabled, bool, Some(true), true },
5858
{ max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true },
59-
$({ $field, $type, $default },)*
59+
{ pause_on_next_bootstrap, bool, Some(false), true },
60+
$({ $field, $type, $default, $is_mutable },)*
6061
}
6162
};
6263
}
@@ -67,7 +68,7 @@ macro_rules! for_all_params {
6768
($macro:ident) => {
6869
for_all_undeprecated_params!(
6970
$macro /* Define future deprecated params here, such as
70-
* ,{ backup_storage_directory, String, "backup".to_string() } */
71+
* ,{ backup_storage_directory, String, "backup".to_string(), true } */
7172
);
7273
};
7374
}
@@ -370,6 +371,7 @@ mod tests {
370371
(BACKUP_STORAGE_DIRECTORY_KEY, "a"),
371372
(TELEMETRY_ENABLED_KEY, "false"),
372373
(MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
374+
(PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
373375
];
374376

375377
// To kv - missing field.

src/common/src/system_param/reader.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ impl SystemParamsReader {
8080
self.prost.telemetry_enabled.unwrap()
8181
}
8282

83+
pub fn pause_on_next_bootstrap(&self) -> bool {
84+
self.prost.pause_on_next_bootstrap.unwrap_or(false)
85+
}
86+
8387
pub fn to_kv(&self) -> Vec<(String, String)> {
8488
system_params_to_kv(&self.prost).unwrap()
8589
}

src/config/example.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,4 @@ backup_storage_url = "memory"
151151
backup_storage_directory = "backup"
152152
telemetry_enabled = true
153153
max_concurrent_creating_streaming_jobs = 1
154+
pause_on_next_bootstrap = false

src/meta/src/backup_restore/restore.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ mod tests {
291291
use clap::Parser;
292292
use itertools::Itertools;
293293
use risingwave_backup::meta_snapshot::{ClusterMetadata, MetaSnapshot};
294+
use risingwave_common::config::SystemConfig;
294295
use risingwave_pb::hummock::HummockVersion;
295296
use risingwave_pb::meta::SystemParams;
296297

@@ -318,18 +319,9 @@ mod tests {
318319

319320
fn get_system_params() -> SystemParams {
320321
SystemParams {
321-
barrier_interval_ms: Some(101),
322-
checkpoint_frequency: Some(102),
323-
sstable_size_mb: Some(103),
324-
block_size_kb: Some(104),
325-
bloom_false_positive: Some(0.1),
326322
state_store: Some("state_store".to_string()),
327323
data_directory: Some("data_directory".to_string()),
328-
backup_storage_url: Some("backup_storage_url".to_string()),
329-
backup_storage_directory: Some("backup_storage_directory".to_string()),
330-
telemetry_enabled: Some(false),
331-
parallel_compact_size_mb: Some(255),
332-
max_concurrent_creating_streaming_jobs: Some(1),
324+
..SystemConfig::default().into_init_system_params()
333325
}
334326
}
335327

src/meta/src/barrier/command.rs

Lines changed: 79 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use super::info::BarrierActorInfo;
3636
use super::trace::TracedEpoch;
3737
use crate::barrier::CommandChanges;
3838
use crate::manager::{FragmentManagerRef, WorkerId};
39-
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
39+
use crate::model::{ActorId, DispatcherId, FragmentId, PausedReason, TableFragments};
4040
use crate::storage::MetaStore;
4141
use crate::stream::{build_actor_connector_splits, SourceManagerRef, SplitAssignment};
4242
use crate::MetaResult;
@@ -79,6 +79,15 @@ pub enum Command {
7979
/// After the barrier is collected, it does nothing.
8080
Plain(Option<Mutation>),
8181

82+
/// `Pause` command generates a `Pause` barrier with the provided [`PausedReason`] **only if**
83+
/// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
84+
Pause(PausedReason),
85+
86+
/// `Resume` command generates a `Resume` barrier with the provided [`PausedReason`] **only
87+
/// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
88+
/// will be generated.
89+
Resume(PausedReason),
90+
8291
/// `DropStreamingJobs` command generates a `Stop` barrier by the given
8392
/// [`HashSet<TableId>`]. The catalog has ensured that these streaming jobs are safe to be
8493
/// dropped by reference counts before.
@@ -142,18 +151,20 @@ impl Command {
142151
Self::Plain(None)
143152
}
144153

145-
pub fn pause() -> Self {
146-
Self::Plain(Some(Mutation::Pause(PauseMutation {})))
154+
pub fn pause(reason: PausedReason) -> Self {
155+
Self::Pause(reason)
147156
}
148157

149-
pub fn resume() -> Self {
150-
Self::Plain(Some(Mutation::Resume(ResumeMutation {})))
158+
pub fn resume(reason: PausedReason) -> Self {
159+
Self::Resume(reason)
151160
}
152161

153162
/// Changes to the actors to be sent or collected after this command is committed.
154163
pub fn changes(&self) -> CommandChanges {
155164
match self {
156165
Command::Plain(_) => CommandChanges::None,
166+
Command::Pause(_) => CommandChanges::None,
167+
Command::Resume(_) => CommandChanges::None,
157168
Command::CreateStreamingJob {
158169
table_fragments, ..
159170
} => CommandChanges::CreateTable(table_fragments.table_id()),
@@ -189,15 +200,15 @@ impl Command {
189200
/// injection. return true.
190201
pub fn should_pause_inject_barrier(&self) -> bool {
191202
// Note: the meaning for `Pause` is not pausing the periodic barrier injection, but for
192-
// pausing the sources on compute nodes. However, `Pause` is used for configuration change
193-
// like scaling and migration, which must pause the concurrent checkpoint to ensure the
203+
// pausing the sources on compute nodes. However, when `Pause` is used for configuration
204+
// change like scaling and migration, it must pause the concurrent checkpoint to ensure the
194205
// previous checkpoint has been done.
195-
matches!(self, Self::Plain(Some(Mutation::Pause(_))))
206+
matches!(self, Self::Pause(PausedReason::ConfigChange))
196207
}
197208

198209
pub fn need_checkpoint(&self) -> bool {
199210
// todo! Reviewing the flow of different command to reduce the amount of checkpoint
200-
!matches!(self, Command::Plain(None | Some(Mutation::Resume(_))))
211+
!matches!(self, Command::Plain(None) | Command::Resume(_))
201212
}
202213
}
203214

@@ -215,6 +226,8 @@ pub struct CommandContext<S: MetaStore> {
215226
pub prev_epoch: TracedEpoch,
216227
pub curr_epoch: TracedEpoch,
217228

229+
pub current_paused_reason: Option<PausedReason>,
230+
218231
pub command: Command,
219232

220233
pub kind: BarrierKind,
@@ -237,6 +250,7 @@ impl<S: MetaStore> CommandContext<S> {
237250
info: BarrierActorInfo,
238251
prev_epoch: TracedEpoch,
239252
curr_epoch: TracedEpoch,
253+
current_paused_reason: Option<PausedReason>,
240254
command: Command,
241255
kind: BarrierKind,
242256
source_manager: SourceManagerRef<S>,
@@ -248,6 +262,7 @@ impl<S: MetaStore> CommandContext<S> {
248262
info: Arc::new(info),
249263
prev_epoch,
250264
curr_epoch,
265+
current_paused_reason,
251266
command,
252267
kind,
253268
source_manager,
@@ -265,6 +280,24 @@ where
265280
let mutation = match &self.command {
266281
Command::Plain(mutation) => mutation.clone(),
267282

283+
Command::Pause(_) => {
284+
// Only pause when the cluster is not already paused.
285+
if self.current_paused_reason.is_none() {
286+
Some(Mutation::Pause(PauseMutation {}))
287+
} else {
288+
None
289+
}
290+
}
291+
292+
Command::Resume(reason) => {
293+
// Only resume when the cluster is paused with the same reason.
294+
if self.current_paused_reason == Some(*reason) {
295+
Some(Mutation::Resume(ResumeMutation {}))
296+
} else {
297+
None
298+
}
299+
}
300+
268301
Command::SourceSplitAssignment(change) => {
269302
let mut diff = HashMap::new();
270303

@@ -308,6 +341,8 @@ where
308341
actor_dispatchers,
309342
added_actors,
310343
actor_splits,
344+
// If the cluster is already paused, the new actors should be paused too.
345+
pause: self.current_paused_reason.is_some(),
311346
}))
312347
}
313348

@@ -478,6 +513,31 @@ where
478513
Ok(mutation)
479514
}
480515

516+
/// Returns the paused reason after executing the current command.
517+
pub fn next_paused_reason(&self) -> Option<PausedReason> {
518+
match &self.command {
519+
Command::Pause(reason) => {
520+
// Only pause when the cluster is not already paused.
521+
if self.current_paused_reason.is_none() {
522+
Some(*reason)
523+
} else {
524+
self.current_paused_reason
525+
}
526+
}
527+
528+
Command::Resume(reason) => {
529+
// Only resume when the cluster is paused with the same reason.
530+
if self.current_paused_reason == Some(*reason) {
531+
None
532+
} else {
533+
self.current_paused_reason
534+
}
535+
}
536+
537+
_ => self.current_paused_reason,
538+
}
539+
}
540+
481541
/// For `CreateStreamingJob`, returns the actors of the `Chain` nodes. For other commands,
482542
/// returns an empty set.
483543
pub fn actors_to_track(&self) -> HashSet<ActorId> {
@@ -562,18 +622,19 @@ where
562622
/// the given command.
563623
pub async fn post_collect(&self) -> MetaResult<()> {
564624
match &self.command {
565-
#[allow(clippy::single_match)]
566-
Command::Plain(mutation) => match mutation {
567-
// After the `Pause` barrier is collected and committed, we must ensure that the
568-
// storage version with this epoch is synced to all compute nodes before the
569-
// execution of the next command of `Update`, as some newly created operators may
570-
// immediately initialize their states on that barrier.
571-
Some(Mutation::Pause(..)) => {
625+
Command::Plain(_) => {}
626+
627+
Command::Pause(reason) => {
628+
if let PausedReason::ConfigChange = reason {
629+
// After the `Pause` barrier is collected and committed, we must ensure that the
630+
// storage version with this epoch is synced to all compute nodes before the
631+
// execution of the next command of `Update`, as some newly created operators
632+
// may immediately initialize their states on that barrier.
572633
self.wait_epoch_commit(self.prev_epoch.value().0).await?;
573634
}
635+
}
574636

575-
_ => {}
576-
},
637+
Command::Resume(_) => {}
577638

578639
Command::SourceSplitAssignment(split_assignment) => {
579640
self.fragment_manager

0 commit comments

Comments
 (0)