@@ -36,7 +36,7 @@ use super::info::BarrierActorInfo;
36
36
use super :: trace:: TracedEpoch ;
37
37
use crate :: barrier:: CommandChanges ;
38
38
use crate :: manager:: { FragmentManagerRef , WorkerId } ;
39
- use crate :: model:: { ActorId , DispatcherId , FragmentId , TableFragments } ;
39
+ use crate :: model:: { ActorId , DispatcherId , FragmentId , PausedReason , TableFragments } ;
40
40
use crate :: storage:: MetaStore ;
41
41
use crate :: stream:: { build_actor_connector_splits, SourceManagerRef , SplitAssignment } ;
42
42
use crate :: MetaResult ;
@@ -79,6 +79,15 @@ pub enum Command {
79
79
/// After the barrier is collected, it does nothing.
80
80
Plain ( Option < Mutation > ) ,
81
81
82
+ /// `Pause` command generates a `Pause` barrier with the provided [`PausedReason`] **only if**
83
+ /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
84
+ Pause ( PausedReason ) ,
85
+
86
+ /// `Resume` command generates a `Resume` barrier with the provided [`PausedReason`] **only
87
+ /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
88
+ /// will be generated.
89
+ Resume ( PausedReason ) ,
90
+
82
91
/// `DropStreamingJobs` command generates a `Stop` barrier by the given
83
92
/// [`HashSet<TableId>`]. The catalog has ensured that these streaming jobs are safe to be
84
93
/// dropped by reference counts before.
@@ -142,18 +151,20 @@ impl Command {
142
151
Self :: Plain ( None )
143
152
}
144
153
145
/// Builds a [`Command::Pause`] carrying the given `reason`.
- pub fn pause ( ) -> Self {
146
- Self :: Plain ( Some ( Mutation :: Pause ( PauseMutation { } ) ) )
154
+ pub fn pause ( reason : PausedReason ) -> Self {
155
+ Self :: Pause ( reason )
147
156
}
148
157
149
/// Builds a [`Command::Resume`] carrying the given `reason`.
- pub fn resume ( ) -> Self {
150
- Self :: Plain ( Some ( Mutation :: Resume ( ResumeMutation { } ) ) )
158
+ pub fn resume ( reason : PausedReason ) -> Self {
159
+ Self :: Resume ( reason )
151
160
}
152
161
153
162
/// Changes to the actors to be sent or collected after this command is committed.
154
163
pub fn changes ( & self ) -> CommandChanges {
155
164
match self {
156
165
Command :: Plain ( _) => CommandChanges :: None ,
166
+ Command :: Pause ( _) => CommandChanges :: None ,
167
+ Command :: Resume ( _) => CommandChanges :: None ,
157
168
Command :: CreateStreamingJob {
158
169
table_fragments, ..
159
170
} => CommandChanges :: CreateTable ( table_fragments. table_id ( ) ) ,
@@ -189,15 +200,15 @@ impl Command {
189
200
/// injection. return true.
190
201
pub fn should_pause_inject_barrier ( & self ) -> bool {
191
202
// Note: `Pause` does not mean pausing the periodic barrier injection itself, but rather
192
- // pausing the sources on compute nodes. However, `Pause` is used for configuration change
193
- // like scaling and migration, which must pause the concurrent checkpoint to ensure the
203
+ // pausing the sources on compute nodes. However, when `Pause` is issued for a configuration
204
+ // change such as scaling or migration, it must pause the concurrent checkpoint to ensure the
194
205
// previous checkpoint has completed. Hence only `Pause(PausedReason::ConfigChange)` returns true.
195
- matches ! ( self , Self :: Plain ( Some ( Mutation :: Pause ( _ ) ) ) )
206
+ matches ! ( self , Self :: Pause ( PausedReason :: ConfigChange ) )
196
207
}
197
208
198
209
pub fn need_checkpoint ( & self ) -> bool {
199
210
// TODO: review the flow of each command to reduce the number of checkpoints. For now, every command except `Plain(None)` and `Resume` needs a checkpoint.
200
- !matches ! ( self , Command :: Plain ( None | Some ( Mutation :: Resume ( _) ) ) )
211
+ !matches ! ( self , Command :: Plain ( None ) | Command :: Resume ( _) )
201
212
}
202
213
}
203
214
@@ -215,6 +226,8 @@ pub struct CommandContext<S: MetaStore> {
215
226
pub prev_epoch : TracedEpoch ,
216
227
pub curr_epoch : TracedEpoch ,
217
228
229
+ pub current_paused_reason : Option < PausedReason > ,
230
+
218
231
pub command : Command ,
219
232
220
233
pub kind : BarrierKind ,
@@ -237,6 +250,7 @@ impl<S: MetaStore> CommandContext<S> {
237
250
info : BarrierActorInfo ,
238
251
prev_epoch : TracedEpoch ,
239
252
curr_epoch : TracedEpoch ,
253
+ current_paused_reason : Option < PausedReason > ,
240
254
command : Command ,
241
255
kind : BarrierKind ,
242
256
source_manager : SourceManagerRef < S > ,
@@ -248,6 +262,7 @@ impl<S: MetaStore> CommandContext<S> {
248
262
info : Arc :: new ( info) ,
249
263
prev_epoch,
250
264
curr_epoch,
265
+ current_paused_reason,
251
266
command,
252
267
kind,
253
268
source_manager,
@@ -265,6 +280,24 @@ where
265
280
let mutation = match & self . command {
266
281
Command :: Plain ( mutation) => mutation. clone ( ) ,
267
282
283
+ Command :: Pause ( _) => {
284
+ // Only pause when the cluster is not already paused.
285
+ if self . current_paused_reason . is_none ( ) {
286
+ Some ( Mutation :: Pause ( PauseMutation { } ) )
287
+ } else {
288
+ None
289
+ }
290
+ }
291
+
292
+ Command :: Resume ( reason) => {
293
+ // Only resume when the cluster is paused with the same reason.
294
+ if self . current_paused_reason == Some ( * reason) {
295
+ Some ( Mutation :: Resume ( ResumeMutation { } ) )
296
+ } else {
297
+ None
298
+ }
299
+ }
300
+
268
301
Command :: SourceSplitAssignment ( change) => {
269
302
let mut diff = HashMap :: new ( ) ;
270
303
@@ -308,6 +341,8 @@ where
308
341
actor_dispatchers,
309
342
added_actors,
310
343
actor_splits,
344
+ // If the cluster is already paused, the new actors should be paused too.
345
+ pause : self . current_paused_reason . is_some ( ) ,
311
346
} ) )
312
347
}
313
348
@@ -478,6 +513,31 @@ where
478
513
Ok ( mutation)
479
514
}
480
515
516
+ /// Returns the paused reason that will be in effect after executing the current command: `Pause` sets its reason only if the cluster is not paused, `Resume` clears it only if the reason matches, and any other command keeps `current_paused_reason` as is.
517
+ pub fn next_paused_reason ( & self ) -> Option < PausedReason > {
518
+ match & self . command {
519
+ Command :: Pause ( reason) => {
520
+ // Only pause when the cluster is not already paused; otherwise the existing reason is kept.
521
+ if self . current_paused_reason . is_none ( ) {
522
+ Some ( * reason)
523
+ } else {
524
+ self . current_paused_reason
525
+ }
526
+ }
527
+
528
+ Command :: Resume ( reason) => {
529
+ // Only resume when the cluster is paused with the same reason; a mismatched reason (or a cluster that is not paused) leaves the state unchanged.
530
+ if self . current_paused_reason == Some ( * reason) {
531
+ None
532
+ } else {
533
+ self . current_paused_reason
534
+ }
535
+ }
536
+
537
+ _ => self . current_paused_reason , // all other commands leave the paused reason untouched
538
+ }
539
+ }
540
+
481
541
/// For `CreateStreamingJob`, returns the actors of the `Chain` nodes. For other commands,
482
542
/// returns an empty set.
483
543
pub fn actors_to_track ( & self ) -> HashSet < ActorId > {
@@ -562,18 +622,19 @@ where
562
622
/// the given command.
563
623
pub async fn post_collect ( & self ) -> MetaResult < ( ) > {
564
624
match & self . command {
565
- #[ allow( clippy:: single_match) ]
566
- Command :: Plain ( mutation) => match mutation {
567
- // After the `Pause` barrier is collected and committed, we must ensure that the
568
- // storage version with this epoch is synced to all compute nodes before the
569
- // execution of the next command of `Update`, as some newly created operators may
570
- // immediately initialize their states on that barrier.
571
- Some ( Mutation :: Pause ( ..) ) => {
625
+ Command :: Plain ( _) => { }
626
+
627
+ Command :: Pause ( reason) => {
628
+ if let PausedReason :: ConfigChange = reason {
629
+ // After the `Pause` barrier is collected and committed, we must ensure that the
630
+ // storage version with this epoch is synced to all compute nodes before the
631
+ // execution of the next command of `Update`, as some newly created operators
632
+ // may immediately initialize their states on that barrier.
572
633
self . wait_epoch_commit ( self . prev_epoch . value ( ) . 0 ) . await ?;
573
634
}
635
+ }
574
636
575
- _ => { }
576
- } ,
637
+ Command :: Resume ( _) => { }
577
638
578
639
Command :: SourceSplitAssignment ( split_assignment) => {
579
640
self . fragment_manager
0 commit comments