Skip to content

Commit 25499e3

Browse files
fix(recovery): wait_epoch should be called in recovery (close risingwavelabs#8467) (risingwavelabs#8468)
1 parent 70f46f1 commit 25499e3

File tree

3 files changed

+33
-13
lines changed

3 files changed

+33
-13
lines changed

src/meta/src/barrier/command.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use risingwave_common::catalog::TableId;
2121
use risingwave_common::hash::ActorMapping;
2222
use risingwave_common::util::epoch::Epoch;
2323
use risingwave_connector::source::SplitImpl;
24+
use risingwave_hummock_sdk::HummockEpoch;
2425
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
2526
use risingwave_pb::stream_plan::add_mutation::Dispatchers;
2627
use risingwave_pb::stream_plan::barrier::Mutation;
@@ -493,6 +494,18 @@ where
493494
Ok(())
494495
}
495496

497+
pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
498+
let futures = self.info.node_map.values().map(|worker_node| async {
499+
let client = self.client_pool.get(worker_node).await?;
500+
let request = WaitEpochCommitRequest { epoch };
501+
client.wait_epoch_commit(request).await
502+
});
503+
504+
try_join_all(futures).await?;
505+
506+
Ok(())
507+
}
508+
496509
/// Do some stuffs after barriers are collected and the new storage version is committed, for
497510
/// the given command.
498511
pub async fn post_collect(&self) -> MetaResult<()> {
@@ -504,15 +517,7 @@ where
504517
// execution of the next command of `Update`, as some newly created operators may
505518
// immediately initialize their states on that barrier.
506519
Some(Mutation::Pause(..)) => {
507-
let futures = self.info.node_map.values().map(|worker_node| async {
508-
let client = self.client_pool.get(worker_node).await?;
509-
let request = WaitEpochCommitRequest {
510-
epoch: self.prev_epoch.0,
511-
};
512-
client.wait_epoch_commit(request).await
513-
});
514-
515-
try_join_all(futures).await?;
520+
self.wait_epoch_commit(self.prev_epoch.0).await?;
516521
}
517522

518523
_ => {}

src/meta/src/barrier/recovery.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,21 @@ where
166166
self.source_manager.clone(),
167167
));
168168

169+
#[cfg(not(all(test, feature = "failpoints")))]
170+
{
171+
use risingwave_common::util::epoch::INVALID_EPOCH;
172+
173+
let mce = self
174+
.hummock_manager
175+
.get_current_version()
176+
.await
177+
.max_committed_epoch;
178+
179+
if mce != INVALID_EPOCH {
180+
command_ctx.wait_epoch_commit(mce).await?;
181+
}
182+
}
183+
169184
let (barrier_complete_tx, mut barrier_complete_rx) =
170185
tokio::sync::mpsc::unbounded_channel();
171186
self.inject_barrier(command_ctx.clone(), barrier_complete_tx)

src/meta/src/stream/stream_manager.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,7 @@ mod tests {
668668
}
669669

670670
impl MockServices {
671-
async fn start(host: &str, port: u16) -> MetaResult<Self> {
671+
async fn start(host: &str, port: u16, enable_recovery: bool) -> MetaResult<Self> {
672672
let addr = SocketAddr::new(host.parse().unwrap(), port);
673673
let state = Arc::new(FakeFragmentState {
674674
actor_streams: Mutex::new(HashMap::new()),
@@ -692,7 +692,7 @@ mod tests {
692692

693693
sleep(Duration::from_secs(1)).await;
694694

695-
let env = MetaSrvEnv::for_test_opts(Arc::new(MetaOpts::test(true))).await;
695+
let env = MetaSrvEnv::for_test_opts(Arc::new(MetaOpts::test(enable_recovery))).await;
696696
let system_params = env.system_params_manager().get_params().await;
697697
let meta_metrics = Arc::new(MetaMetrics::new());
698698
let cluster_manager =
@@ -868,7 +868,7 @@ mod tests {
868868

869869
#[tokio::test]
870870
async fn test_drop_materialized_view() -> MetaResult<()> {
871-
let services = MockServices::start("127.0.0.1", 12334).await?;
871+
let services = MockServices::start("127.0.0.1", 12334, false).await?;
872872

873873
let table_id = TableId::new(0);
874874
let actors = make_mview_stream_actors(&table_id, 4);
@@ -926,7 +926,7 @@ mod tests {
926926
async fn test_failpoints_drop_mv_recovery() {
927927
let inject_barrier_err = "inject_barrier_err";
928928
let inject_barrier_err_success = "inject_barrier_err_success";
929-
let services = MockServices::start("127.0.0.1", 12335).await.unwrap();
929+
let services = MockServices::start("127.0.0.1", 12335, true).await.unwrap();
930930

931931
let table_id = TableId::new(0);
932932
let actors = make_mview_stream_actors(&table_id, 4);

0 commit comments

Comments
 (0)