@@ -25,6 +25,7 @@ use tokio::sync::oneshot;
25
25
use super :: progress:: ChainState ;
26
26
use super :: CollectResult ;
27
27
use crate :: error:: { StreamError , StreamResult } ;
28
+ use crate :: executor:: monitor:: GLOBAL_STREAMING_METRICS ;
28
29
use crate :: executor:: Barrier ;
29
30
use crate :: task:: ActorId ;
30
31
@@ -84,85 +85,71 @@ impl ManagedBarrierState {
84
85
85
86
/// Notify if we have collected barriers from all actor ids. The state must be `Issued`.
86
87
fn may_notify ( & mut self , curr_epoch : u64 ) {
87
- let to_notify = match self . epoch_barrier_state_map . get ( & curr_epoch) {
88
- Some ( BarrierState {
89
- inner :
90
- ManagedBarrierStateInner :: Issued {
91
- remaining_actors, ..
92
- } ,
93
- ..
94
- } ) => remaining_actors. is_empty ( ) ,
95
- _ => unreachable ! ( ) ,
96
- } ;
88
+ // Report if there's progress on the earliest in-flight barrier.
89
+ if self . epoch_barrier_state_map . keys ( ) . next ( ) == Some ( & curr_epoch) {
90
+ GLOBAL_STREAMING_METRICS . barrier_manager_progress . inc ( ) ;
91
+ }
97
92
98
- if to_notify {
99
- while let Some ( (
100
- _,
101
- BarrierState {
102
- inner : barrier_inner,
103
- ..
104
- } ,
105
- ) ) = self . epoch_barrier_state_map . first_key_value ( )
106
- {
107
- match barrier_inner {
108
- ManagedBarrierStateInner :: Issued {
109
- remaining_actors, ..
110
- } => {
111
- if !remaining_actors. is_empty ( ) {
112
- break ;
113
- }
114
- }
115
- _ => break ,
116
- }
117
- let ( epoch, barrier_state) = self . epoch_barrier_state_map . pop_first ( ) . unwrap ( ) ;
118
- let create_mview_progress = self
119
- . create_mview_progress
120
- . remove ( & epoch)
121
- . unwrap_or_default ( )
122
- . into_iter ( )
123
- . map ( |( actor, state) | CreateMviewProgress {
124
- chain_actor_id : actor,
125
- done : matches ! ( state, ChainState :: Done ) ,
126
- consumed_epoch : match state {
127
- ChainState :: ConsumingUpstream ( consumed_epoch, _) => consumed_epoch,
128
- ChainState :: Done => epoch,
129
- } ,
130
- consumed_rows : match state {
131
- ChainState :: ConsumingUpstream ( _, consumed_rows) => consumed_rows,
132
- ChainState :: Done => 0 ,
133
- } ,
134
- } )
135
- . collect ( ) ;
93
+ while let Some ( entry) = self . epoch_barrier_state_map . first_entry ( ) {
94
+ let to_notify = matches ! (
95
+ & entry. get( ) . inner,
96
+ ManagedBarrierStateInner :: Issued {
97
+ remaining_actors, ..
98
+ } if remaining_actors. is_empty( ) ,
99
+ ) ;
136
100
137
- let kind = barrier_state. kind ;
138
- match kind {
139
- BarrierKind :: Unspecified => unreachable ! ( ) ,
140
- BarrierKind :: Initial => tracing:: info!(
141
- epoch = barrier_state. prev_epoch,
142
- "ignore sealing data for the first barrier"
143
- ) ,
144
- BarrierKind :: Barrier | BarrierKind :: Checkpoint => {
145
- dispatch_state_store ! ( & self . state_store, state_store, {
146
- state_store. seal_epoch( barrier_state. prev_epoch, kind. is_checkpoint( ) ) ;
147
- } ) ;
148
- }
101
+ if !to_notify {
102
+ break ;
103
+ }
104
+
105
+ let ( epoch, barrier_state) = entry. remove_entry ( ) ;
106
+ let create_mview_progress = self
107
+ . create_mview_progress
108
+ . remove ( & epoch)
109
+ . unwrap_or_default ( )
110
+ . into_iter ( )
111
+ . map ( |( actor, state) | CreateMviewProgress {
112
+ chain_actor_id : actor,
113
+ done : matches ! ( state, ChainState :: Done ) ,
114
+ consumed_epoch : match state {
115
+ ChainState :: ConsumingUpstream ( consumed_epoch, _) => consumed_epoch,
116
+ ChainState :: Done => epoch,
117
+ } ,
118
+ consumed_rows : match state {
119
+ ChainState :: ConsumingUpstream ( _, consumed_rows) => consumed_rows,
120
+ ChainState :: Done => 0 ,
121
+ } ,
122
+ } )
123
+ . collect ( ) ;
124
+
125
+ let kind = barrier_state. kind ;
126
+ match kind {
127
+ BarrierKind :: Unspecified => unreachable ! ( ) ,
128
+ BarrierKind :: Initial => tracing:: info!(
129
+ epoch = barrier_state. prev_epoch,
130
+ "ignore sealing data for the first barrier"
131
+ ) ,
132
+ BarrierKind :: Barrier | BarrierKind :: Checkpoint => {
133
+ dispatch_state_store ! ( & self . state_store, state_store, {
134
+ state_store. seal_epoch( barrier_state. prev_epoch, kind. is_checkpoint( ) ) ;
135
+ } ) ;
149
136
}
137
+ }
150
138
151
- match barrier_state. inner {
152
- ManagedBarrierStateInner :: Issued {
153
- collect_notifier, ..
154
- } => {
155
- // Notify about barrier finishing.
156
- let result = CollectResult {
157
- create_mview_progress,
158
- kind,
159
- } ;
160
- if collect_notifier. unwrap ( ) . send ( Ok ( result) ) . is_err ( ) {
161
- warn ! ( "failed to notify barrier collection with epoch {}" , epoch)
162
- }
139
+ match barrier_state. inner {
140
+ ManagedBarrierStateInner :: Issued {
141
+ collect_notifier, ..
142
+ } => {
143
+ // Notify about barrier finishing.
144
+ let result = CollectResult {
145
+ create_mview_progress,
146
+ kind,
147
+ } ;
148
+ if collect_notifier. unwrap ( ) . send ( Ok ( result) ) . is_err ( ) {
149
+ warn ! ( "failed to notify barrier collection with epoch {}" , epoch)
163
150
}
164
- _ => unreachable ! ( ) ,
165
151
}
152
+ _ => unreachable ! ( ) ,
166
153
}
167
154
}
168
155
}
0 commit comments