@@ -22,7 +22,7 @@ import logging
22
22
import sys
23
23
import time
24
24
import traceback
25
- from typing import Optional
25
+ from typing import Dict , Optional , Set
26
26
27
27
import yaml
28
28
@@ -292,6 +292,34 @@ class Porter(object):
292
292
293
293
return table , already_ported , total_to_port , forward_chunk , backward_chunk
294
294
295
+ async def get_table_constraints (self ) -> Dict [str , Set [str ]]:
296
+ """Returns a map of tables that have foreign key constraints to tables they depend on.
297
+ """
298
+
299
+ def _get_constraints (txn ):
300
+ # We can pull the information about foreign key constraints out from
301
+ # the postgres schema tables.
302
+ sql = """
303
+ SELECT DISTINCT
304
+ tc.table_name,
305
+ ccu.table_name AS foreign_table_name
306
+ FROM
307
+ information_schema.table_constraints AS tc
308
+ INNER JOIN information_schema.constraint_column_usage AS ccu
309
+ USING (table_schema, constraint_name)
310
+ WHERE tc.constraint_type = 'FOREIGN KEY';
311
+ """
312
+ txn .execute (sql )
313
+
314
+ results = {}
315
+ for table , foreign_table in txn :
316
+ results .setdefault (table , set ()).add (foreign_table )
317
+ return results
318
+
319
+ return await self .postgres_store .db_pool .runInteraction (
320
+ "get_table_constraints" , _get_constraints
321
+ )
322
+
295
323
async def handle_table (
296
324
self , table , postgres_size , table_size , forward_chunk , backward_chunk
297
325
):
@@ -619,15 +647,41 @@ class Porter(object):
619
647
consumeErrors = True ,
620
648
)
621
649
)
650
+ tables_to_port_info_map = {r [0 ]: r [1 :] for r in setup_res }
622
651
623
652
# Step 4. Do the copying.
653
+ #
654
+ # This is slightly convoluted as we need to ensure tables are ported
655
+ # in the correct order due to foreign key constraints.
624
656
self .progress .set_state ("Copying to postgres" )
625
- await make_deferred_yieldable (
626
- defer .gatherResults (
627
- [run_in_background (self .handle_table , * res ) for res in setup_res ],
628
- consumeErrors = True ,
657
+
658
+ constraints = await self .get_table_constraints ()
659
+ tables_ported = set () # type: Set[str]
660
+
661
+ while tables_to_port_info_map :
662
+ # Pulls out all tables that are still to be ported and which
663
+ # only depend on tables that are already ported (if any).
664
+ tables_to_port = [
665
+ table
666
+ for table in tables_to_port_info_map
667
+ if not constraints .get (table , set ()) - tables_ported
668
+ ]
669
+
670
+ await make_deferred_yieldable (
671
+ defer .gatherResults (
672
+ [
673
+ run_in_background (
674
+ self .handle_table ,
675
+ table ,
676
+ * tables_to_port_info_map .pop (table ),
677
+ )
678
+ for table in tables_to_port
679
+ ],
680
+ consumeErrors = True ,
681
+ )
629
682
)
630
- )
683
+
684
+ tables_ported .update (tables_to_port )
631
685
632
686
# Step 5. Set up sequences
633
687
self .progress .set_state ("Setting up sequence generators" )
0 commit comments