feat(meta): add system parameter pause_on_next_bootstrap #11936

Merged · 12 commits merged on Sep 1, 2023
Changes from 10 commits
1 change: 1 addition & 0 deletions proto/meta.proto
@@ -495,6 +495,7 @@ message SystemParams {
optional bool telemetry_enabled = 10;
optional uint32 parallel_compact_size_mb = 11;
optional uint32 max_concurrent_creating_streaming_jobs = 12;
optional bool pause_on_next_bootstrap = 13;
}

message GetSystemParamsRequest {}
3 changes: 3 additions & 0 deletions proto/stream_plan.proto
@@ -24,6 +24,9 @@ message AddMutation {
// We may embed a source change split mutation here.
// TODO: we may allow multiple mutations in a single barrier.
map<uint32, source.ConnectorSplits> actor_splits = 2;
// We may embed a pause mutation here.
// TODO: we may allow multiple mutations in a single barrier.
bool pause = 4;
}

message StopMutation {
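
The new `pause` flag on `AddMutation` lets the barrier tell compute nodes that the actors being added should start out paused, because the cluster itself is currently paused. Below is a minimal sketch of how a receiver could honor the flag; the struct and function names are illustrative stand-ins, not the actual compute-node types.

```rust
// Illustrative stand-ins only: the generated protobuf type and the compute-node
// actor state are not shown in this PR, so the names below are assumptions.
struct AddMutation {
    added_actors: Vec<u32>,
    pause: bool, // the new field 4: added actors should start paused
}

#[derive(Debug, PartialEq)]
enum ActorState {
    Running,
    Paused,
}

fn initial_state_for_added_actors(mutation: &AddMutation) -> ActorState {
    // If the cluster is paused while these actors are created, they must not
    // start consuming from their sources until a `Resume` barrier arrives.
    if mutation.pause {
        ActorState::Paused
    } else {
        ActorState::Running
    }
}

fn main() {
    let m = AddMutation { added_actors: vec![1001, 1002], pause: true };
    println!(
        "actors {:?} start as {:?}",
        m.added_actors,
        initial_state_for_added_actors(&m)
    );
}
```
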
55 changes: 6 additions & 49 deletions src/common/src/config.rs
@@ -775,6 +775,10 @@ pub struct SystemConfig {
/// Max number of concurrent creating streaming jobs.
#[serde(default = "default::system::max_concurrent_creating_streaming_jobs")]
pub max_concurrent_creating_streaming_jobs: Option<u32>,

/// Whether to pause all data sources on next bootstrap.
#[serde(default = "default::system::pause_on_next_bootstrap")]
pub pause_on_next_bootstrap: Option<bool>,
}

impl SystemConfig {
@@ -792,6 +796,7 @@ impl SystemConfig {
backup_storage_directory: self.backup_storage_directory,
telemetry_enabled: self.telemetry_enabled,
max_concurrent_creating_streaming_jobs: self.max_concurrent_creating_streaming_jobs,
pause_on_next_bootstrap: self.pause_on_next_bootstrap,
}
}
}
@@ -1165,55 +1170,7 @@ pub mod default {
}

pub mod system {
use crate::system_param;

pub fn barrier_interval_ms() -> Option<u32> {
system_param::default::barrier_interval_ms()
}

pub fn checkpoint_frequency() -> Option<u64> {
system_param::default::checkpoint_frequency()
}

pub fn parallel_compact_size_mb() -> Option<u32> {
system_param::default::parallel_compact_size_mb()
}

pub fn sstable_size_mb() -> Option<u32> {
system_param::default::sstable_size_mb()
}

pub fn block_size_kb() -> Option<u32> {
system_param::default::block_size_kb()
}

pub fn bloom_false_positive() -> Option<f64> {
system_param::default::bloom_false_positive()
}

pub fn state_store() -> Option<String> {
system_param::default::state_store()
}

pub fn data_directory() -> Option<String> {
system_param::default::data_directory()
}

pub fn backup_storage_url() -> Option<String> {
system_param::default::backup_storage_url()
}

pub fn backup_storage_directory() -> Option<String> {
system_param::default::backup_storage_directory()
}

pub fn telemetry_enabled() -> Option<bool> {
system_param::default::telemetry_enabled()
}

pub fn max_concurrent_creating_streaming_jobs() -> Option<u32> {
system_param::default::max_concurrent_creating_streaming_jobs()
}
pub use crate::system_param::default::*;
}

pub mod batch {
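
On the config side, the dozen hand-written delegating functions in `default::system` are collapsed into a single glob re-export, so any default added to `system_param::default` (including `pause_on_next_bootstrap`) is picked up automatically by the `#[serde(default = ...)]` attributes. A minimal sketch of the pattern, with invented module contents:

```rust
// Sketch of the glob re-export pattern; the module contents here are invented
// placeholders, not the real list of defaults.
mod system_param {
    pub mod default {
        pub fn telemetry_enabled() -> Option<bool> {
            Some(true)
        }

        pub fn pause_on_next_bootstrap() -> Option<bool> {
            Some(false)
        }
    }
}

mod config_defaults {
    pub mod system {
        // One re-export replaces a hand-written delegating function per parameter,
        // so new defaults (like `pause_on_next_bootstrap`) are picked up for free.
        pub use crate::system_param::default::*;
    }
}

fn main() {
    assert_eq!(config_defaults::system::pause_on_next_bootstrap(), Some(false));
    assert_eq!(config_defaults::system::telemetry_enabled(), Some(true));
}
```
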
6 changes: 4 additions & 2 deletions src/common/src/system_param/mod.rs
@@ -56,7 +56,8 @@ macro_rules! for_all_undeprecated_params {
{ backup_storage_directory, String, Some("backup".to_string()), false },
{ telemetry_enabled, bool, Some(true), true },
{ max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true },
$({ $field, $type, $default },)*
{ pause_on_next_bootstrap, bool, Some(false), true },
$({ $field, $type, $default, $is_mutable },)*
}
};
}
Expand All @@ -67,7 +68,7 @@ macro_rules! for_all_params {
($macro:ident) => {
for_all_undeprecated_params!(
$macro /* Define future deprecated params here, such as
* ,{ backup_storage_directory, String, "backup".to_string() } */
* ,{ backup_storage_directory, String, "backup".to_string(), true } */
);
};
}
@@ -370,6 +371,7 @@ mod tests {
(BACKUP_STORAGE_DIRECTORY_KEY, "a"),
(TELEMETRY_ENABLED_KEY, "false"),
(MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
(PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
];

// To kv - missing field.
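
Every entry in `for_all_undeprecated_params!` now carries a fourth element, the mutability flag, giving downstream macros the full `{ name, type, default, is_mutable }` tuple (the new parameter is declared mutable). The standalone sketch below shows one hypothetical consumer of that shape; it is not the real macro machinery, only an illustration of why the flag travels with each entry.

```rust
// A simplified consumer of `{ name, type, default, is_mutable }` entries; the real
// code routes these tuples through `for_all_undeprecated_params!`.
macro_rules! list_mutable_params {
    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr },)*) => {
        pub fn mutable_params() -> Vec<&'static str> {
            let mut names = Vec::new();
            $(
                if $is_mutable {
                    names.push(stringify!($field));
                }
            )*
            names
        }
    };
}

// A trimmed-down parameter list in the same shape as the entries above.
list_mutable_params! {
    { backup_storage_directory, String, Some("backup".to_string()), false },
    { telemetry_enabled, bool, Some(true), true },
    { pause_on_next_bootstrap, bool, Some(false), true },
}

fn main() {
    // `backup_storage_directory` is declared immutable, so only the last two names remain.
    assert_eq!(
        mutable_params(),
        vec!["telemetry_enabled", "pause_on_next_bootstrap"]
    );
}
```
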
4 changes: 4 additions & 0 deletions src/common/src/system_param/reader.rs
@@ -80,6 +80,10 @@ impl SystemParamsReader {
self.prost.telemetry_enabled.unwrap()
}

pub fn pause_on_next_bootstrap(&self) -> bool {
self.prost.pause_on_next_bootstrap.unwrap_or(false)
}

pub fn to_kv(&self) -> Vec<(String, String)> {
system_params_to_kv(&self.prost).unwrap()
}
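
`unwrap_or(false)` keeps the getter total even when the field is missing, for example when reading parameters persisted by an older version. The sketch below shows how a bootstrap path might consult the getter to pick the initial paused state; the types are stand-ins, and the `PausedReason::Manual` variant is an assumption for illustration, not something confirmed by this diff.

```rust
// Stand-ins for the real meta-crate types; only the shape matters here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
enum PausedReason {
    ConfigChange,
    Manual, // assumed variant for a user-requested pause on bootstrap
}

struct SystemParamsReader {
    pause_on_next_bootstrap: Option<bool>,
}

impl SystemParamsReader {
    fn pause_on_next_bootstrap(&self) -> bool {
        // A missing field (e.g. params persisted by an older version) means "do not pause".
        self.pause_on_next_bootstrap.unwrap_or(false)
    }
}

/// Pick the paused state the barrier manager starts with after recovery.
fn initial_paused_reason(params: &SystemParamsReader) -> Option<PausedReason> {
    params
        .pause_on_next_bootstrap()
        .then_some(PausedReason::Manual)
}

fn main() {
    let old = SystemParamsReader { pause_on_next_bootstrap: None };
    assert_eq!(initial_paused_reason(&old), None);

    let paused = SystemParamsReader { pause_on_next_bootstrap: Some(true) };
    assert_eq!(initial_paused_reason(&paused), Some(PausedReason::Manual));
}
```
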
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -151,3 +151,4 @@ backup_storage_url = "memory"
backup_storage_directory = "backup"
telemetry_enabled = true
max_concurrent_creating_streaming_jobs = 1
pause_on_next_bootstrap = false
12 changes: 2 additions & 10 deletions src/meta/src/backup_restore/restore.rs
@@ -291,6 +291,7 @@ mod tests {
use clap::Parser;
use itertools::Itertools;
use risingwave_backup::meta_snapshot::{ClusterMetadata, MetaSnapshot};
use risingwave_common::config::SystemConfig;
use risingwave_pb::hummock::HummockVersion;
use risingwave_pb::meta::SystemParams;

@@ -318,18 +319,9 @@

fn get_system_params() -> SystemParams {
SystemParams {
barrier_interval_ms: Some(101),
checkpoint_frequency: Some(102),
sstable_size_mb: Some(103),
block_size_kb: Some(104),
bloom_false_positive: Some(0.1),
state_store: Some("state_store".to_string()),
data_directory: Some("data_directory".to_string()),
backup_storage_url: Some("backup_storage_url".to_string()),
backup_storage_directory: Some("backup_storage_directory".to_string()),
telemetry_enabled: Some(false),
parallel_compact_size_mb: Some(255),
max_concurrent_creating_streaming_jobs: Some(1),
..SystemConfig::default().into_init_system_params()
}
}

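
The test no longer spells out every field by hand: it takes the whole parameter set from `SystemConfig::default().into_init_system_params()` via struct-update syntax, so adding a parameter such as `pause_on_next_bootstrap` no longer requires touching this test. The same pattern in miniature, with illustrative types:

```rust
// Miniature version of the struct-update pattern (types and defaults are illustrative).
struct SystemParams {
    barrier_interval_ms: Option<u32>,
    pause_on_next_bootstrap: Option<bool>,
}

fn default_params() -> SystemParams {
    SystemParams {
        barrier_interval_ms: Some(1000),
        pause_on_next_bootstrap: Some(false),
    }
}

fn main() {
    // Override one field, inherit everything else; adding a new parameter to
    // `default_params` never requires touching this call site again.
    let params = SystemParams {
        barrier_interval_ms: Some(101),
        ..default_params()
    };
    println!(
        "barrier_interval_ms={:?}, pause_on_next_bootstrap={:?}",
        params.barrier_interval_ms, params.pause_on_next_bootstrap
    );
}
```
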
97 changes: 79 additions & 18 deletions src/meta/src/barrier/command.rs
@@ -36,7 +36,7 @@ use super::info::BarrierActorInfo;
use super::trace::TracedEpoch;
use crate::barrier::CommandChanges;
use crate::manager::{FragmentManagerRef, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::model::{ActorId, DispatcherId, FragmentId, PausedReason, TableFragments};
use crate::storage::MetaStore;
use crate::stream::{build_actor_connector_splits, SourceManagerRef, SplitAssignment};
use crate::MetaResult;
@@ -79,6 +79,15 @@ pub enum Command {
/// After the barrier is collected, it does nothing.
Plain(Option<Mutation>),

/// `Pause` command generates a `Pause` barrier with the provided [`PausedReason`] **only if**
/// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
Pause(PausedReason),

/// `Resume` command generates a `Resume` barrier with the provided [`PausedReason`] **only
/// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
/// will be generated.
Resume(PausedReason),

/// `DropStreamingJobs` command generates a `Stop` barrier by the given
/// [`HashSet<TableId>`]. The catalog has ensured that these streaming jobs are safe to be
/// dropped by reference counts before.
@@ -142,18 +151,20 @@ impl Command {
Self::Plain(None)
}

pub fn pause() -> Self {
Self::Plain(Some(Mutation::Pause(PauseMutation {})))
pub fn pause(reason: PausedReason) -> Self {
Self::Pause(reason)
}

pub fn resume() -> Self {
Self::Plain(Some(Mutation::Resume(ResumeMutation {})))
pub fn resume(reason: PausedReason) -> Self {
Self::Resume(reason)
}

/// Changes to the actors to be sent or collected after this command is committed.
pub fn changes(&self) -> CommandChanges {
match self {
Command::Plain(_) => CommandChanges::None,
Command::Pause(_) => CommandChanges::None,
Command::Resume(_) => CommandChanges::None,
Command::CreateStreamingJob {
table_fragments, ..
} => CommandChanges::CreateTable(table_fragments.table_id()),
Expand Down Expand Up @@ -189,15 +200,15 @@ impl Command {
/// injection. return true.
pub fn should_pause_inject_barrier(&self) -> bool {
// Note: the meaning for `Pause` is not pausing the periodic barrier injection, but for
// pausing the sources on compute nodes. However, `Pause` is used for configuration change
// like scaling and migration, which must pause the concurrent checkpoint to ensure the
// pausing the sources on compute nodes. However, when `Pause` is used for configuration
// change like scaling and migration, it must pause the concurrent checkpoint to ensure the
// previous checkpoint has been done.
matches!(self, Self::Plain(Some(Mutation::Pause(_))))
matches!(self, Self::Pause(PausedReason::ConfigChange))
}

pub fn need_checkpoint(&self) -> bool {
// todo! Reviewing the flow of different command to reduce the amount of checkpoint
!matches!(self, Command::Plain(None | Some(Mutation::Resume(_))))
!matches!(self, Command::Plain(None) | Command::Resume(_))
}
}

@@ -215,6 +226,8 @@ pub struct CommandContext<S: MetaStore> {
pub prev_epoch: TracedEpoch,
pub curr_epoch: TracedEpoch,

pub current_paused_reason: Option<PausedReason>,

pub command: Command,

pub kind: BarrierKind,
@@ -237,6 +250,7 @@ impl<S: MetaStore> CommandContext<S> {
info: BarrierActorInfo,
prev_epoch: TracedEpoch,
curr_epoch: TracedEpoch,
current_paused_reason: Option<PausedReason>,
command: Command,
kind: BarrierKind,
source_manager: SourceManagerRef<S>,
Expand All @@ -248,6 +262,7 @@ impl<S: MetaStore> CommandContext<S> {
info: Arc::new(info),
prev_epoch,
curr_epoch,
current_paused_reason,
command,
kind,
source_manager,
@@ -265,6 +280,24 @@ where
let mutation = match &self.command {
Command::Plain(mutation) => mutation.clone(),

Command::Pause(_) => {
// Only pause when the cluster is not already paused.
if self.current_paused_reason.is_none() {
Some(Mutation::Pause(PauseMutation {}))
} else {
None
}
}

Command::Resume(reason) => {
// Only resume when the cluster is paused with the same reason.
if self.current_paused_reason == Some(*reason) {
Some(Mutation::Resume(ResumeMutation {}))
} else {
None
}
}

Command::SourceSplitAssignment(change) => {
let mut diff = HashMap::new();

@@ -308,6 +341,8 @@ where
actor_dispatchers,
added_actors,
actor_splits,
// If the cluster is already paused, the new actors should be paused too.
pause: self.current_paused_reason.is_some(),
}))
}

@@ -478,6 +513,31 @@ where
Ok(mutation)
}

/// Returns the paused reason after executing the current command.
pub fn next_paused_reason(&self) -> Option<PausedReason> {
match &self.command {
Command::Pause(reason) => {
// Only pause when the cluster is not already paused.
if self.current_paused_reason.is_none() {
Some(*reason)
} else {
self.current_paused_reason
}
}

Command::Resume(reason) => {
// Only resume when the cluster is paused with the same reason.
if self.current_paused_reason == Some(*reason) {
None
} else {
self.current_paused_reason
}
}

_ => self.current_paused_reason,
}
}

/// For `CreateStreamingJob`, returns the actors of the `Chain` nodes. For other commands,
/// returns an empty set.
pub fn actors_to_track(&self) -> HashSet<ActorId> {
@@ -562,18 +622,19 @@ where
/// the given command.
pub async fn post_collect(&self) -> MetaResult<()> {
match &self.command {
#[allow(clippy::single_match)]
Command::Plain(mutation) => match mutation {
// After the `Pause` barrier is collected and committed, we must ensure that the
// storage version with this epoch is synced to all compute nodes before the
// execution of the next command of `Update`, as some newly created operators may
// immediately initialize their states on that barrier.
Some(Mutation::Pause(..)) => {
Command::Plain(_) => {}

Command::Pause(reason) => {
if let PausedReason::ConfigChange = reason {
// After the `Pause` barrier is collected and committed, we must ensure that the
// storage version with this epoch is synced to all compute nodes before the
// execution of the next command of `Update`, as some newly created operators
// may immediately initialize their states on that barrier.
self.wait_epoch_commit(self.prev_epoch.value().0).await?;
}
}

_ => {}
},
Command::Resume(_) => {}

Command::SourceSplitAssignment(split_assignment) => {
self.fragment_manager
Expand Down