File tree 1 file changed +0
-43
lines changed
src/stream/src/executor/source
1 file changed +0
-43
lines changed Original file line number Diff line number Diff line change @@ -121,40 +121,6 @@ impl<S: StateStore> SourceExecutor<S> {
121
121
. map_err ( StreamExecutorError :: connector_error)
122
122
}
123
123
124
- fn check_split_assignment_is_migration (
125
- & self ,
126
- actor_splits : & HashMap < ActorId , Vec < SplitImpl > > ,
127
- ) -> bool {
128
- let core = self . stream_source_core . as_ref ( ) . unwrap ( ) ;
129
-
130
- let mut split_to_actors_index = HashMap :: new ( ) ;
131
-
132
- for ( actor_id, splits) in actor_splits {
133
- for split in splits {
134
- split_to_actors_index
135
- . entry ( split. id ( ) )
136
- . or_insert ( vec ! [ ] )
137
- . push ( * actor_id) ;
138
- }
139
- }
140
-
141
- for split_id in core. state_cache . keys ( ) {
142
- if let Some ( actor_ids) = split_to_actors_index. remove ( split_id) {
143
- if !actor_ids. contains ( & self . actor_ctx . id ) {
144
- tracing:: warn!(
145
- "split {} migration from {} detected, target might be {:?}" ,
146
- split_id,
147
- self . actor_ctx. id,
148
- actor_ids
149
- ) ;
150
- return true ;
151
- }
152
- }
153
- }
154
-
155
- false
156
- }
157
-
158
124
#[ inline]
159
125
fn get_metric_labels ( & self ) -> [ String ; 3 ] {
160
126
[
@@ -520,15 +486,6 @@ impl<S: StateStore> SourceExecutor<S> {
520
486
"source change split received"
521
487
) ;
522
488
523
- // In the context of split changes, we do not allow
524
- // split
525
- // migration because it can lead to inconsistent states.
526
- // Therefore, all split migration must be done via
527
- // update
528
- // mutation and pause/resume
529
- assert ! ( !self
530
- . check_split_assignment_is_migration( actor_splits) ) ;
531
-
532
489
target_state = self
533
490
. apply_split_change (
534
491
& source_desc,
You can’t perform that action at this time.
0 commit comments