Skip to content

Commit 9a53750

Browse files
committed
Remove revision var, update usage in FragmentManagerCore, and add new revisions.
1 parent 819a754 commit 9a53750

File tree

5 files changed

+70
-41
lines changed

5 files changed

+70
-41
lines changed

src/meta/src/barrier/command.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::manager::{FragmentManagerRef, WorkerId};
4040
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
4141
use crate::storage::MetaStore;
4242
use crate::stream::{
43-
build_actor_connector_splits, RescheduleRevision, SourceManagerRef, SplitAssignment,
43+
build_actor_connector_splits, SourceManagerRef, SplitAssignment, TableRevision,
4444
};
4545
use crate::MetaResult;
4646

@@ -120,7 +120,6 @@ pub enum Command {
120120
/// very similar to `Create` and `Drop` commands, for added and removed actors, respectively.
121121
RescheduleFragment {
122122
reschedules: HashMap<FragmentId, Reschedule>,
123-
revision: RescheduleRevision,
124123
},
125124

126125
/// `ReplaceTable` command generates an `Update` barrier with the given `merge_updates`. This is
@@ -620,7 +619,6 @@ where
620619

621620
Command::RescheduleFragment {
622621
reschedules,
623-
revision,
624622
} => {
625623
let mut node_dropped_actors = HashMap::new();
626624
for table_fragments in self.fragment_manager.list_table_fragments().await? {
@@ -647,7 +645,7 @@ where
647645

648646
// Update fragment info after rescheduling in meta store.
649647
self.fragment_manager
650-
.post_apply_reschedules(reschedules.clone(), *revision)
648+
.post_apply_reschedules(reschedules.clone())
651649
.await?;
652650

653651
let mut stream_source_actor_splits = HashMap::new();

src/meta/src/manager/catalog/fragment.rs

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,12 @@ use crate::model::{
4242
ValTransaction,
4343
};
4444
use crate::storage::{MetaStore, Transaction};
45-
use crate::stream::{RescheduleRevision, SplitAssignment};
45+
use crate::stream::{SplitAssignment, TableRevision};
4646
use crate::MetaResult;
4747

4848
pub struct FragmentManagerCore {
4949
table_fragments: BTreeMap<TableId, TableFragments>,
50+
table_revision: TableRevision,
5051
}
5152

5253
impl FragmentManagerCore {
@@ -102,9 +103,14 @@ where
102103
.map(|tf| (tf.table_id(), tf))
103104
.collect();
104105

106+
let table_revision = TableRevision::get(env.meta_store()).await?;
107+
105108
Ok(Self {
106109
env,
107-
core: RwLock::new(FragmentManagerCore { table_fragments }),
110+
core: RwLock::new(FragmentManagerCore {
111+
table_fragments,
112+
table_revision,
113+
}),
108114
})
109115
}
110116

@@ -118,6 +124,10 @@ where
118124
Ok(map.values().cloned().collect())
119125
}
120126

127+
pub async fn get_revision(&self) -> TableRevision {
128+
self.core.read().await.table_revision
129+
}
130+
121131
pub async fn has_any_table_fragments(&self) -> bool {
122132
!self.core.read().await.table_fragments.is_empty()
123133
}
@@ -347,7 +357,10 @@ where
347357
/// Drop table fragments info and remove downstream actor infos in fragments from its dependent
348358
/// tables.
349359
pub async fn drop_table_fragments_vec(&self, table_ids: &HashSet<TableId>) -> MetaResult<()> {
350-
let map = &mut self.core.write().await.table_fragments;
360+
let mut guard = self.core.write().await;
361+
let current_revision = guard.table_revision;
362+
363+
let map = &mut guard.table_fragments;
351364
let to_delete_table_fragments = table_ids
352365
.iter()
353366
.filter_map(|table_id| map.get(table_id).cloned())
@@ -385,7 +398,19 @@ where
385398
});
386399
}
387400
}
388-
commit_meta!(self, table_fragments)?;
401+
402+
// new empty transaction
403+
let mut trx = Transaction::default();
404+
405+
// save next revision
406+
let next_revision = current_revision.next();
407+
next_revision.store(&mut trx);
408+
409+
// commit
410+
commit_meta_with_trx!(self, trx, table_fragments)?;
411+
412+
// update revision in memory
413+
guard.table_revision = next_revision;
389414

390415
for table_fragments in to_delete_table_fragments {
391416
if table_fragments.state() != State::Initial {
@@ -605,9 +630,11 @@ where
605630
pub async fn post_apply_reschedules(
606631
&self,
607632
mut reschedules: HashMap<FragmentId, Reschedule>,
608-
revision: RescheduleRevision,
609633
) -> MetaResult<()> {
610-
let map = &mut self.core.write().await.table_fragments;
634+
let mut guard = self.core.write().await;
635+
let current_version = guard.table_revision;
636+
637+
let map = &mut guard.table_fragments;
611638

612639
fn update_actors(
613640
actors: &mut Vec<ActorId>,
@@ -822,10 +849,20 @@ where
822849
}
823850

824851
assert!(reschedules.is_empty(), "all reschedules must be applied");
852+
853+
// new empty transaction
825854
let mut trx = Transaction::default();
826-
revision.store(&mut trx);
855+
856+
// save next revision
857+
let next_revision = current_version.next();
858+
next_revision.store(&mut trx);
859+
860+
// commit
827861
commit_meta_with_trx!(self, trx, table_fragments)?;
828862

863+
// update revision in memory
864+
guard.table_revision = next_revision;
865+
829866
for mapping in fragment_mapping_to_notify {
830867
self.env
831868
.notification_manager()

src/meta/src/rpc/service/scale_service.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,16 @@ where
139139

140140
let _guard = self.stream_manager.reschedule_revision_lock.lock().await;
141141

142-
let current_revision = self.stream_manager.reschedule_revision().await?;
142+
let current_revision = self.fragment_manager.get_revision().await;
143143

144144
if req.revision != current_revision.inner() {
145145
return Ok(Response::new(RescheduleResponse {
146146
success: false,
147-
revision: current_revision.into(),
147+
revision: current_revision.inner(),
148148
}));
149149
}
150150

151-
let next_revision = self
152-
.stream_manager
151+
self.stream_manager
153152
.reschedule_actors(
154153
req.reschedules
155154
.into_iter()
@@ -176,10 +175,11 @@ where
176175
)
177176
})
178177
.collect(),
179-
current_revision,
180178
)
181179
.await?;
182180

181+
let next_revision = self.fragment_manager.get_revision().await;
182+
183183
Ok(Response::new(RescheduleResponse {
184184
success: true,
185185
revision: next_revision.into(),

src/meta/src/stream/scale.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,17 @@ use crate::stream::GlobalStreamManager;
4444
use crate::{MetaError, MetaResult};
4545

4646
#[derive(Copy, Clone, Debug)]
47-
pub struct RescheduleRevision(u64);
47+
pub struct TableRevision(u64);
4848

4949
const RESCHEDULE_VERSION_KEY: &[u8] = b"reschedule_version";
5050

51-
impl From<RescheduleRevision> for u64 {
52-
fn from(value: RescheduleRevision) -> Self {
51+
impl From<TableRevision> for u64 {
52+
fn from(value: TableRevision) -> Self {
5353
value.0
5454
}
5555
}
5656

57-
impl RescheduleRevision {
57+
impl TableRevision {
5858
pub async fn get<S>(store: &S) -> MetaResult<Self>
5959
where
6060
S: MetaStore,
@@ -71,8 +71,8 @@ impl RescheduleRevision {
7171
Ok(Self(version))
7272
}
7373

74-
pub fn increase(&self) -> Self {
75-
RescheduleRevision(self.0 + 1)
74+
pub fn next(&self) -> Self {
75+
TableRevision(self.0 + 1)
7676
}
7777

7878
pub fn store(&self, txn: &mut Transaction) {
@@ -591,29 +591,26 @@ where
591591
pub async fn reschedule_actors(
592592
&self,
593593
reschedules: HashMap<FragmentId, ParallelUnitReschedule>,
594-
current_revision: RescheduleRevision,
595-
) -> MetaResult<RescheduleRevision> {
594+
) -> MetaResult<()> {
596595
let mut revert_funcs = vec![];
597-
match self
598-
.reschedule_actors_impl(&mut revert_funcs, reschedules, current_revision)
596+
if let Err(e) = self
597+
.reschedule_actors_impl(&mut revert_funcs, reschedules)
599598
.await
600599
{
601-
Err(e) => {
602-
for revert_func in revert_funcs.into_iter().rev() {
603-
revert_func.await;
604-
}
605-
Err(e)
600+
for revert_func in revert_funcs.into_iter().rev() {
601+
revert_func.await;
606602
}
607-
Ok(revision) => Ok(revision),
603+
return Err(e);
608604
}
605+
606+
Ok(())
609607
}
610608

611609
async fn reschedule_actors_impl(
612610
&self,
613611
revert_funcs: &mut Vec<BoxFuture<'_, ()>>,
614612
mut reschedules: HashMap<FragmentId, ParallelUnitReschedule>,
615-
current_revision: RescheduleRevision,
616-
) -> MetaResult<RescheduleRevision> {
613+
) -> MetaResult<()> {
617614
let ctx = self.build_reschedule_context(&mut reschedules).await?;
618615
// Index of actors to create/remove
619616
// Fragment Id => ( Actor Id => Parallel Unit Id )
@@ -1175,16 +1172,13 @@ where
11751172

11761173
tracing::debug!("reschedule plan: {:#?}", reschedule_fragment);
11771174

1178-
let next_revision = current_revision.increase();
1179-
11801175
self.barrier_scheduler
11811176
.run_command_with_paused(Command::RescheduleFragment {
11821177
reschedules: reschedule_fragment,
1183-
revision: next_revision,
11841178
})
11851179
.await?;
11861180

1187-
Ok(next_revision)
1181+
Ok(())
11881182
}
11891183

11901184
async fn create_actors_on_compute_node(

src/meta/src/stream/stream_manager.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::hummock::HummockManagerRef;
3434
use crate::manager::{ClusterManagerRef, FragmentManagerRef, MetaSrvEnv};
3535
use crate::model::{ActorId, TableFragments};
3636
use crate::storage::MetaStore;
37-
use crate::stream::{RescheduleRevision, SourceManagerRef};
37+
use crate::stream::{TableRevision, SourceManagerRef};
3838
use crate::{MetaError, MetaResult};
3939

4040
pub type GlobalStreamManagerRef<S> = Arc<GlobalStreamManager<S>>;
@@ -540,8 +540,8 @@ where
540540
self.creating_job_info.cancel_jobs(table_ids).await;
541541
}
542542

543-
pub async fn reschedule_revision(&self) -> MetaResult<RescheduleRevision> {
544-
RescheduleRevision::get(self.env.meta_store()).await
543+
pub async fn reschedule_revision(&self) -> MetaResult<TableRevision> {
544+
TableRevision::get(self.env.meta_store()).await
545545
}
546546
}
547547

0 commit comments

Comments
 (0)