File tree 1 file changed +18
-0
lines changed 1 file changed +18
-0
lines changed Original file line number Diff line number Diff line change @@ -1447,6 +1447,7 @@ impl CatalogController {
1447
1447
. exec ( & txn)
1448
1448
. await ?;
1449
1449
1450
+ // add new actors
1450
1451
for (
1451
1452
PbStreamActor {
1452
1453
actor_id,
@@ -1554,6 +1555,23 @@ impl CatalogController {
1554
1555
actor. update ( & txn) . await ?;
1555
1556
}
1556
1557
1558
+ // Update actor_splits for existing actors
1559
+ for ( actor_id, splits) in actor_splits {
1560
+ if new_created_actors. contains ( & ( actor_id as ActorId ) ) {
1561
+ continue ;
1562
+ }
1563
+
1564
+ let actor = Actor :: find_by_id ( actor_id as ActorId )
1565
+ . one ( & txn)
1566
+ . await ?
1567
+ . ok_or_else ( || MetaError :: catalog_id_not_found ( "actor" , actor_id) ) ?;
1568
+
1569
+ let mut actor = actor. into_active_model ( ) ;
1570
+ let splits = splits. iter ( ) . map ( PbConnectorSplit :: from) . collect_vec ( ) ;
1571
+ actor. splits = Set ( Some ( ( & PbConnectorSplits { splits } ) . into ( ) ) ) ;
1572
+ actor. update ( & txn) . await ?;
1573
+ }
1574
+
1557
1575
// fragment update
1558
1576
let fragment = Fragment :: find_by_id ( fragment_id)
1559
1577
. one ( & txn)
You can’t perform that action at this time.
0 commit comments