@@ -18,8 +18,9 @@ use std::sync::Arc;
18
18
19
19
use anyhow:: { anyhow, Context } ;
20
20
use itertools:: Itertools ;
21
+ use risingwave_common:: buffer:: Bitmap ;
21
22
use risingwave_common:: catalog:: TableId ;
22
- use risingwave_common:: hash:: ParallelUnitId ;
23
+ use risingwave_common:: hash:: { ActorMapping , ParallelUnitMapping } ;
23
24
use risingwave_common:: { bail, try_match_expand} ;
24
25
use risingwave_connector:: source:: SplitImpl ;
25
26
use risingwave_pb:: common:: { ParallelUnit , WorkerNode } ;
@@ -736,44 +737,54 @@ where
736
737
let actor_status = table_fragment. actor_status . clone ( ) ;
737
738
let fragment = table_fragment. fragments . get_mut ( & fragment_id) . unwrap ( ) ;
738
739
740
+ fragment
741
+ . actors
742
+ . retain ( |a| !removed_actor_ids. contains ( & a. actor_id ) ) ;
743
+
739
744
// update vnode mapping for actors.
740
745
for actor in & mut fragment. actors {
741
746
if let Some ( bitmap) = vnode_bitmap_updates. get ( & actor. actor_id ) {
742
747
actor. vnode_bitmap = Some ( bitmap. to_protobuf ( ) ) ;
743
748
}
744
749
}
745
750
746
- fragment
747
- . actors
748
- . retain ( |a| !removed_actor_ids. contains ( & a. actor_id ) ) ;
749
-
750
751
// update fragment's vnode mapping
751
- if let Some ( vnode_mapping) = fragment. vnode_mapping . as_mut ( ) {
752
- let mut actor_to_parallel_unit = HashMap :: with_capacity ( fragment. actors . len ( ) ) ;
753
- for actor in & fragment. actors {
754
- if let Some ( actor_status) = actor_status. get ( & actor. actor_id ) {
755
- if let Some ( parallel_unit) = actor_status. parallel_unit . as_ref ( ) {
756
- actor_to_parallel_unit. insert (
757
- actor. actor_id as ActorId ,
758
- parallel_unit. id as ParallelUnitId ,
759
- ) ;
760
- }
761
- }
752
+ let mut actor_to_parallel_unit = HashMap :: with_capacity ( fragment. actors . len ( ) ) ;
753
+ let mut actor_to_vnode_bitmap = HashMap :: with_capacity ( fragment. actors . len ( ) ) ;
754
+ for actor in & fragment. actors {
755
+ let actor_status = & actor_status[ & actor. actor_id ] ;
756
+ let parallel_unit_id = actor_status. parallel_unit . as_ref ( ) . unwrap ( ) . id ;
757
+ actor_to_parallel_unit. insert ( actor. actor_id , parallel_unit_id) ;
758
+
759
+ if let Some ( vnode_bitmap) = & actor. vnode_bitmap {
760
+ let bitmap = Bitmap :: from ( vnode_bitmap) ;
761
+ actor_to_vnode_bitmap. insert ( actor. actor_id , bitmap) ;
762
762
}
763
+ }
763
764
764
- if let Some ( actor_mapping) = upstream_dispatcher_mapping. as_ref ( ) {
765
- * vnode_mapping = actor_mapping
766
- . to_parallel_unit ( & actor_to_parallel_unit)
767
- . to_protobuf ( ) ;
768
- }
765
+ let vnode_mapping = if actor_to_vnode_bitmap. is_empty ( ) {
766
+ // If there's no `vnode_bitmap`, then the fragment must be a singleton fragment.
767
+ // We directly use the single parallel unit to construct the mapping.
768
+ // TODO: also fill `vnode_bitmap` for the actor of singleton fragment so that we
769
+ // don't need this branch.
770
+ let parallel_unit = * actor_to_parallel_unit. values ( ) . exactly_one ( ) . unwrap ( ) ;
771
+ ParallelUnitMapping :: new_single ( parallel_unit)
772
+ } else {
773
+ // Generate the parallel unit mapping from the fragment's actor bitmaps.
774
+ assert_eq ! ( actor_to_vnode_bitmap. len( ) , actor_to_parallel_unit. len( ) ) ;
775
+ ActorMapping :: from_bitmaps ( & actor_to_vnode_bitmap)
776
+ . to_parallel_unit ( & actor_to_parallel_unit)
777
+ }
778
+ . to_protobuf ( ) ;
769
779
770
- if !fragment. state_table_ids . is_empty ( ) {
771
- let fragment_mapping = FragmentParallelUnitMapping {
772
- fragment_id : fragment_id as FragmentId ,
773
- mapping : Some ( vnode_mapping. clone ( ) ) ,
774
- } ;
775
- fragment_mapping_to_notify. push ( fragment_mapping) ;
776
- }
780
+ * fragment. vnode_mapping . as_mut ( ) . unwrap ( ) = vnode_mapping. clone ( ) ;
781
+
782
+ if !fragment. state_table_ids . is_empty ( ) {
783
+ let fragment_mapping = FragmentParallelUnitMapping {
784
+ fragment_id : fragment_id as FragmentId ,
785
+ mapping : Some ( vnode_mapping) ,
786
+ } ;
787
+ fragment_mapping_to_notify. push ( fragment_mapping) ;
777
788
}
778
789
779
790
// Second step, update upstream fragments
0 commit comments