Skip to content

refactor(meta): refine pause/resume response #12079

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,30 @@ message FlushResponse {
hummock.HummockSnapshot snapshot = 2;
}

// The reason why the data sources in the cluster are paused.
enum PausedReason {
// Default value. Consumers treat this as "not paused", since `prost` getters on
// optional enum fields fall back to this variant when the field is unset.
PAUSED_REASON_UNSPECIFIED = 0;
// The cluster is paused due to configuration change, e.g. altering table schema and scaling.
PAUSED_REASON_CONFIG_CHANGE = 1;
// The cluster is paused due to manual operation, e.g. `risectl` command or the
// `pause_on_next_bootstrap` system variable.
PAUSED_REASON_MANUAL = 2;
}

message PauseRequest {}

message PauseResponse {
// The paused reason in effect before this request took effect, if any.
optional PausedReason prev = 1;
// The paused reason in effect after this request took effect, if any.
optional PausedReason curr = 2;
}

message ResumeRequest {}

message ResumeResponse {
// The paused reason in effect before this request took effect, if any.
optional PausedReason prev = 1;
// The paused reason in effect after this request took effect, if any.
optional PausedReason curr = 2;
}

message CancelCreatingJobsRequest {
message CreatingJobInfo {
uint32 database_id = 1;
Expand Down Expand Up @@ -215,6 +239,8 @@ message ListActorStatesResponse {

service StreamManagerService {
rpc Flush(FlushRequest) returns (FlushResponse);
rpc Pause(PauseRequest) returns (PauseResponse);
rpc Resume(ResumeRequest) returns (ResumeResponse);
rpc CancelCreatingJobs(CancelCreatingJobsRequest) returns (CancelCreatingJobsResponse);
rpc ListTableFragments(ListTableFragmentsRequest) returns (ListTableFragmentsResponse);
rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse);
Expand Down Expand Up @@ -389,14 +415,6 @@ service NotificationService {
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse);
}

message PauseRequest {}

message PauseResponse {}

message ResumeRequest {}

message ResumeResponse {}

message GetClusterInfoRequest {}

message GetClusterInfoResponse {
Expand Down Expand Up @@ -455,9 +473,6 @@ message GetReschedulePlanResponse {
}

service ScaleService {
// TODO(Kexiang): delete them when config change interface is finished
rpc Pause(PauseRequest) returns (PauseResponse);
rpc Resume(ResumeRequest) returns (ResumeResponse);
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc Reschedule(RescheduleRequest) returns (RescheduleResponse);
rpc GetReschedulePlan(GetReschedulePlanRequest) returns (GetReschedulePlanResponse);
Expand Down
28 changes: 24 additions & 4 deletions src/ctl/src/cmd_impl/meta/pause_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,44 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::meta::PausedReason;

use crate::CtlContext;

fn desc(reason: PausedReason) -> &'static str {
// Method on optional enums derived from `prost` will use `Unspecified` if unset. So we treat
// `Unspecified` as not paused here.
match reason {
PausedReason::Unspecified => "not paused",
PausedReason::ConfigChange => "paused due to configuration change",
PausedReason::Manual => "paused manually",
}
}

pub async fn pause(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;

meta_client.pause().await?;
let response = meta_client.pause().await?;

println!("Paused");
println!(
"Done.\nPrevious: {}\nCurrent: {}",
desc(response.prev()),
desc(response.curr())
);

Ok(())
}

pub async fn resume(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;

meta_client.resume().await?;
let response = meta_client.resume().await?;

println!("Resumed");
println!(
"Done.\nPrevious: {}\nCurrent: {}",
desc(response.prev()),
desc(response.curr())
);

Ok(())
}
3 changes: 2 additions & 1 deletion src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation};
use risingwave_pb::stream_plan::update_mutation::*;
Expand All @@ -36,7 +37,7 @@ use super::info::BarrierActorInfo;
use super::trace::TracedEpoch;
use crate::barrier::CommandChanges;
use crate::manager::{FragmentManagerRef, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, PausedReason, TableFragments};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::stream::{build_actor_connector_splits, SourceManagerRef, SplitAssignment};
use crate::MetaResult;

Expand Down
30 changes: 22 additions & 8 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::Barrier;
use risingwave_pb::stream_service::{
Expand All @@ -48,6 +49,7 @@ use self::command::CommandContext;
use self::info::BarrierActorInfo;
use self::notifier::Notifier;
use self::progress::TrackingCommand;
use crate::barrier::notifier::BarrierInfo;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::BarrierEpochState::{Completed, InFlight};
use crate::hummock::HummockManagerRef;
Expand All @@ -56,7 +58,7 @@ use crate::manager::{
CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification, MetaSrvEnv,
WorkerId,
};
use crate::model::{ActorId, BarrierManagerState, PausedReason};
use crate::model::{ActorId, BarrierManagerState};
use crate::rpc::metrics::MetaMetrics;
use crate::stream::SourceManagerRef;
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -671,7 +673,7 @@ impl GlobalBarrierManager {

let Scheduled {
command,
notifiers,
mut notifiers,
send_latency_timer,
checkpoint,
span,
Expand All @@ -695,25 +697,37 @@ impl GlobalBarrierManager {
self.fragment_manager.clone(),
self.env.stream_client_pool_ref(),
info,
prev_epoch,
curr_epoch,
prev_epoch.clone(),
curr_epoch.clone(),
state.paused_reason(),
command,
kind,
self.source_manager.clone(),
span.clone(),
));
let mut notifiers = notifiers;
notifiers.iter_mut().for_each(Notifier::notify_to_send);

send_latency_timer.observe_duration();

checkpoint_control.enqueue_command(command_ctx.clone(), notifiers);
self.inject_barrier(command_ctx.clone(), barrier_complete_tx)
.instrument(span)
.await;

// Notify about the injection.
let prev_paused_reason = state.paused_reason();
let curr_paused_reason = command_ctx.next_paused_reason();

let info = BarrierInfo {
prev_epoch: prev_epoch.value(),
curr_epoch: curr_epoch.value(),
prev_paused_reason,
curr_paused_reason,
};
notifiers.iter_mut().for_each(|n| n.notify_injected(info));

// Update the paused state after the barrier is injected.
state.set_paused_reason(command_ctx.next_paused_reason());
state.set_paused_reason(curr_paused_reason);
// Record the in-flight barrier.
checkpoint_control.enqueue_command(command_ctx.clone(), notifiers);
}

/// Inject a barrier to all CNs and spawn a task to collect it
Expand Down
24 changes: 18 additions & 6 deletions src/meta/src/barrier/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::util::epoch::Epoch;
use risingwave_pb::meta::PausedReason;
use tokio::sync::oneshot;

use crate::{MetaError, MetaResult};

/// The barrier info sent back to the caller when a barrier is injected.
#[derive(Debug, Clone, Copy)]
pub struct BarrierInfo {
// Epoch before this barrier was injected.
pub prev_epoch: Epoch,
// Epoch introduced by this barrier.
pub curr_epoch: Epoch,

// Paused reason in effect before this barrier, if the cluster was paused.
pub prev_paused_reason: Option<PausedReason>,
// Paused reason in effect after this barrier, if the cluster is paused.
pub curr_paused_reason: Option<PausedReason>,
}

/// Used for notifying the status of a scheduled command/barrier.
#[derive(Debug, Default)]
pub(super) struct Notifier {
/// Get notified when scheduled barrier is about to send.
pub to_send: Option<oneshot::Sender<()>>,
/// Get notified when scheduled barrier is injected to compute nodes.
pub injected: Option<oneshot::Sender<BarrierInfo>>,

/// Get notified when scheduled barrier is collected or failed.
pub collected: Option<oneshot::Sender<MetaResult<()>>>,
Expand All @@ -30,10 +42,10 @@ pub(super) struct Notifier {
}

impl Notifier {
/// Notify when we are about to send a barrier.
pub fn notify_to_send(&mut self) {
if let Some(tx) = self.to_send.take() {
tx.send(()).ok();
/// Notify when we have injected a barrier to compute nodes.
///
/// The `injected` sender is consumed via `take()`, so only the first call
/// delivers the notification; later calls are no-ops. A send error (receiver
/// already dropped) is deliberately ignored with `.ok()` — the caller may not
/// be waiting for the injection event.
pub fn notify_injected(&mut self, info: BarrierInfo) {
if let Some(tx) = self.injected.take() {
tx.send(info).ok();
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::time::{Duration, Instant};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation};
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_service::{
Expand All @@ -33,7 +34,7 @@ use crate::barrier::command::CommandContext;
use crate::barrier::info::BarrierActorInfo;
use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager};
use crate::manager::WorkerId;
use crate::model::{BarrierManagerState, MigrationPlan, PausedReason};
use crate::model::{BarrierManagerState, MigrationPlan};
use crate::stream::build_actor_connector_splits;
use crate::MetaResult;

Expand Down
53 changes: 31 additions & 22 deletions src/meta/src/barrier/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use std::time::Instant;
use anyhow::anyhow;
use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::HummockSnapshot;
use risingwave_pb::meta::PausedReason;
use tokio::sync::{oneshot, watch, RwLock};

use super::notifier::Notifier;
use super::notifier::{BarrierInfo, Notifier};
use super::{Command, Scheduled};
use crate::hummock::HummockManagerRef;
use crate::model::PausedReason;
use crate::rpc::metrics::MetaMetrics;
use crate::{MetaError, MetaResult};

Expand Down Expand Up @@ -237,42 +237,41 @@ impl BarrierScheduler {
/// Run multiple commands and return when they're all completely finished. It's ensured that
/// multiple commands are executed continuously.
///
/// Returns the barrier info of each command.
///
/// TODO: atomicity of multiple commands is not guaranteed.
pub async fn run_multiple_commands(&self, commands: Vec<Command>) -> MetaResult<()> {
struct Context {
collect_rx: oneshot::Receiver<MetaResult<()>>,
finish_rx: oneshot::Receiver<()>,
}

async fn run_multiple_commands(&self, commands: Vec<Command>) -> MetaResult<Vec<BarrierInfo>> {
let mut contexts = Vec::with_capacity(commands.len());
let mut scheduleds = Vec::with_capacity(commands.len());

for command in commands {
let (injected_tx, injected_rx) = oneshot::channel();
let (collect_tx, collect_rx) = oneshot::channel();
let (finish_tx, finish_rx) = oneshot::channel();

contexts.push(Context {
collect_rx,
finish_rx,
});
contexts.push((injected_rx, collect_rx, finish_rx));
scheduleds.push(self.inner.new_scheduled(
command.need_checkpoint(),
command,
once(Notifier {
injected: Some(injected_tx),
collected: Some(collect_tx),
finished: Some(finish_tx),
..Default::default()
}),
));
}

self.push(scheduleds).await?;

for Context {
collect_rx,
finish_rx,
} in contexts
{
let mut infos = Vec::with_capacity(contexts.len());

for (injected_rx, collect_rx, finish_rx) in contexts {
// Wait for this command to be injected, and record the result.
let info = injected_rx
.await
.map_err(|e| anyhow!("failed to inject barrier: {}", e))?;
infos.push(info);

// Throw the error if it occurs when collecting this barrier.
collect_rx
.await
Expand All @@ -284,23 +283,33 @@ impl BarrierScheduler {
.map_err(|e| anyhow!("failed to finish command: {}", e))?;
}

Ok(())
Ok(infos)
}

/// Run a command with a `Pause` command before and `Resume` command after it. Used for
/// configuration change.
pub async fn run_config_change_command_with_pause(&self, command: Command) -> MetaResult<()> {
///
/// Returns the barrier info of the actual command.
pub async fn run_config_change_command_with_pause(
&self,
command: Command,
) -> MetaResult<BarrierInfo> {
self.run_multiple_commands(vec![
Command::pause(PausedReason::ConfigChange),
command,
Command::resume(PausedReason::ConfigChange),
])
.await
.map(|i| i[1])
}

/// Run a command and return when it's completely finished.
pub async fn run_command(&self, command: Command) -> MetaResult<()> {
self.run_multiple_commands(vec![command]).await
///
/// Returns the barrier info of the actual command.
pub async fn run_command(&self, command: Command) -> MetaResult<BarrierInfo> {
self.run_multiple_commands(vec![command])
.await
.map(|i| i[0])
}

/// Flush means waiting for the next barrier to collect.
Expand Down
Loading