import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;

+ import com.google.cloud.datastream.v1.DestinationConfig;
+ import com.google.cloud.datastream.v1.SourceConfig;
+ import com.google.cloud.datastream.v1.Stream;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.common.io.Resources;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
+ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.beam.it.common.utils.PipelineUtils;
import org.apache.beam.it.conditions.ConditionCheck;
import org.apache.beam.it.gcp.TemplateTestBase;
+ import org.apache.beam.it.gcp.datastream.DatastreamResourceManager;
+ import org.apache.beam.it.gcp.datastream.JDBCSource;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
+ import org.apache.beam.it.gcp.spanner.matchers.SpannerAsserts;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +70,17 @@ public SpannerResourceManager setUpSpannerResourceManager() {
        .build();
  }

+   public String generateSessionFile(
+       int numOfTables, String srcDb, String spannerDb, List<String> tableNames, String sessionFile)
+       throws IOException {
+     String sessionFileContent =
+         sessionFile.replaceAll("SRC_DATABASE", srcDb).replaceAll("SP_DATABASE", spannerDb);
+     for (int i = 1; i <= numOfTables; i++) {
+       sessionFileContent = sessionFileContent.replaceAll("TABLE" + i, tableNames.get(i - 1));
+     }
+     return sessionFileContent;
+   }
+
  public SpannerResourceManager setUpShadowSpannerResourceManager() {
    // Create a separate spanner resource manager with different db name for shadow tables.
    SpannerResourceManager sp =
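For reference, a minimal sketch of how the new `generateSessionFile` helper might be used from a test subclass. The resource name `SessionTemplate.json` and the database/table names below are hypothetical, not part of this change:

```java
private String buildSessionContent() throws IOException {
  // Read a session template with SRC_DATABASE/SP_DATABASE/TABLE1... placeholders
  // from test resources (Guava Resources and StandardCharsets are already imported above).
  String template =
      Resources.toString(Resources.getResource("SessionTemplate.json"), StandardCharsets.UTF_8);
  // Substitute the actual database and table names into the template.
  return generateSessionFile(2, "src_db", "spanner_db", Arrays.asList("Users", "Orders"), template);
}
```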
@@ -207,13 +226,48 @@ protected LaunchInfo launchDataflowJob(
      String shardingContextFileResourceName,
      GcsResourceManager gcsResourceManager)
      throws IOException {
+     return launchDataflowJob(
+         identifierSuffix,
+         sessionFileResourceName,
+         transformationContextFileResourceName,
+         gcsPathPrefix,
+         spannerResourceManager,
+         pubsubResourceManager,
+         jobParameters,
+         customTransformation,
+         shardingContextFileResourceName,
+         gcsResourceManager,
+         null,
+         null,
+         null);
+   }
+
+   protected LaunchInfo launchDataflowJob(
+       String identifierSuffix,
+       String sessionFileResourceName,
+       String transformationContextFileResourceName,
+       String gcsPathPrefix,
+       SpannerResourceManager spannerResourceManager,
+       PubsubResourceManager pubsubResourceManager,
+       Map<String, String> jobParameters,
+       CustomTransformation customTransformation,
+       String shardingContextFileResourceName,
+       GcsResourceManager gcsResourceManager,
+       DatastreamResourceManager datastreamResourceManager,
+       String sessionResourceContent,
+       JDBCSource jdbcSource)
+       throws IOException {

    if (sessionFileResourceName != null) {
      gcsResourceManager.uploadArtifact(
          gcsPathPrefix + "/session.json",
          Resources.getResource(sessionFileResourceName).getPath());
    }

+     if (sessionResourceContent != null) {
+       gcsResourceManager.createArtifact(gcsPathPrefix + "/session.json", sessionResourceContent);
+     }
+
    if (transformationContextFileResourceName != null) {
      gcsResourceManager.uploadArtifact(
          gcsPathPrefix + "/transformationContext.json",
@@ -258,6 +312,18 @@ protected LaunchInfo launchDataflowJob(
          }
        };

+     if (jdbcSource != null) {
+       params.put(
+           "streamName",
+           createDataStream(
+                   datastreamResourceManager,
+                   gcsResourceManager,
+                   gcsPrefix,
+                   jdbcSource,
+                   DatastreamResourceManager.DestinationOutputFormat.JSON_FILE_FORMAT)
+               .getName());
+     }
+
    if (sessionFileResourceName != null) {
      params.put(
          "sessionFilePath", getGcsPath(gcsPathPrefix + "/session.json", gcsResourceManager));
@@ -318,4 +384,121 @@ public void createAndUploadJarToGcs(String gcsPathPrefix, GcsResourceManager gcs
        gcsPathPrefix + "/customTransformation.jar",
        "../spanner-custom-shard/target/spanner-custom-shard-1.0-SNAPSHOT.jar");
  }
+
+   public static Map<String, Object> createSessionTemplate(
+       int numTables,
+       List<Map<String, Object>> columnConfigs,
+       List<Map<String, Object>> primaryKeyConfig) {
+     Map<String, Object> sessionTemplate = new LinkedHashMap<>();
+     sessionTemplate.put("SessionName", "NewSession");
+     sessionTemplate.put("EditorName", "");
+     sessionTemplate.put("DatabaseType", "mysql");
+     sessionTemplate.put("DatabaseName", "SP_DATABASE");
+     sessionTemplate.put("Dialect", "google_standard_sql");
+     sessionTemplate.put("Notes", null);
+     sessionTemplate.put("Tags", null);
+     sessionTemplate.put("SpSchema", new LinkedHashMap<>());
+     sessionTemplate.put("SyntheticPKeys", new LinkedHashMap<>());
+     sessionTemplate.put("SrcSchema", new LinkedHashMap<>());
+     sessionTemplate.put("SchemaIssues", new LinkedHashMap<>());
+     sessionTemplate.put("Location", new LinkedHashMap<>());
+     sessionTemplate.put("TimezoneOffset", "+00:00");
+     sessionTemplate.put("SpDialect", "google_standard_sql");
+     sessionTemplate.put("UniquePKey", new LinkedHashMap<>());
+     sessionTemplate.put("Rules", new ArrayList<>());
+     sessionTemplate.put("IsSharded", false);
+     sessionTemplate.put("SpRegion", "");
+     sessionTemplate.put("ResourceValidation", false);
+     sessionTemplate.put("UI", false);
+
+     for (int i = 1; i <= numTables; i++) {
+       String tableName = "TABLE" + i;
+       List<String> colIds = new ArrayList<>();
+       Map<String, Object> colDefs = new LinkedHashMap<>();
+
+       for (int j = 0; j < columnConfigs.size(); j++) {
+         Map<String, Object> colConfig = columnConfigs.get(j);
+         String colId = (String) colConfig.getOrDefault("id", "c" + (j + 1));
+         colIds.add(colId);
+
+         Map<String, Object> colType = new LinkedHashMap<>();
+         colType.put("Name", colConfig.getOrDefault("Type", "STRING"));
+         colType.put("Len", colConfig.getOrDefault("Length", 0));
+         colType.put("IsArray", colConfig.getOrDefault("IsArray", false));
+
+         Map<String, Object> column = new LinkedHashMap<>();
+         column.put("Name", colConfig.getOrDefault("Name", "column_" + (j + 1)));
+         column.put("T", colType);
+         column.put("NotNull", colConfig.getOrDefault("NotNull", false));
+         column.put("Comment", colConfig.getOrDefault("Comment", ""));
+         column.put("Id", colId);
+         colDefs.put(colId, column);
+       }
+
+       List<Map<String, Object>> primaryKeys = new ArrayList<>();
+       for (Map<String, Object> pk : primaryKeyConfig) {
+         Map<String, Object> pkEntry = new LinkedHashMap<>();
+         pkEntry.put("ColId", pk.get("ColId"));
+         pkEntry.put("Desc", pk.getOrDefault("Desc", false));
+         pkEntry.put("Order", pk.getOrDefault("Order", 1));
+         primaryKeys.add(pkEntry);
+       }
+
+       Map<String, Object> spSchemaEntry = new LinkedHashMap<>();
+       spSchemaEntry.put("Name", tableName);
+       spSchemaEntry.put("ColIds", colIds);
+       spSchemaEntry.put("ShardIdColumn", "");
+       spSchemaEntry.put("ColDefs", colDefs);
+       spSchemaEntry.put("PrimaryKeys", primaryKeys);
+       spSchemaEntry.put("ForeignKeys", null);
+       spSchemaEntry.put("Indexes", null);
+       spSchemaEntry.put("ParentId", "");
+       spSchemaEntry.put("Comment", "Spanner schema for source table " + tableName);
+       spSchemaEntry.put("Id", "t" + i);
+       ((Map<String, Object>) sessionTemplate.get("SpSchema")).put("t" + i, spSchemaEntry);
+
+       Map<String, Object> srcSchemaEntry = new LinkedHashMap<>(spSchemaEntry);
+       srcSchemaEntry.put("Schema", "SRC_DATABASE");
+       ((Map<String, Object>) sessionTemplate.get("SrcSchema")).put("t" + i, srcSchemaEntry);
+
+       Map<String, Object> schemaIssuesEntry = new LinkedHashMap<>();
+       schemaIssuesEntry.put("ColumnLevelIssues", new LinkedHashMap<>());
+       schemaIssuesEntry.put("TableLevelIssues", null);
+       ((Map<String, Object>) sessionTemplate.get("SchemaIssues")).put("t" + i, schemaIssuesEntry);
+     }
+
+     return sessionTemplate;
+   }
+
+   /** Helper function for checking the rows of the destination Spanner tables. */
+   public static void checkSpannerTables(
+       SpannerResourceManager spannerResourceManager,
+       List<String> tableNames,
+       Map<String, List<Map<String, Object>>> cdcEvents,
+       List<String> cols) {
+     tableNames.forEach(
+         tableName -> {
+           SpannerAsserts.assertThatStructs(
+                   spannerResourceManager.readTableRecords(tableName, cols))
+               .hasRecordsUnorderedCaseInsensitiveColumns(cdcEvents.get(tableName));
+         });
+   }
+
+   protected Stream createDataStream(
+       DatastreamResourceManager datastreamResourceManager,
+       GcsResourceManager gcsResourceManager,
+       String gcsPrefix,
+       JDBCSource jdbcSource,
+       DatastreamResourceManager.DestinationOutputFormat destinationOutputFormat) {
+     SourceConfig sourceConfig =
+         datastreamResourceManager.buildJDBCSourceConfig("jdbc-profile", jdbcSource);
+
+     DestinationConfig destinationConfig =
+         datastreamResourceManager.buildGCSDestinationConfig(
+             "gcs-profile", gcsResourceManager.getBucket(), gcsPrefix, destinationOutputFormat);
+
+     Stream stream =
+         datastreamResourceManager.createStream("stream1", sourceConfig, destinationConfig);
+     datastreamResourceManager.startStream(stream);
+     return stream;
+   }
  }
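Putting the new helpers together, here is a hedged end-to-end sketch from a hypothetical test subclass: build a two-table session template, serialize it, substitute real names, and later verify the migrated rows. All literal names, configs, and expected rows are made up, and Gson is assumed to be available on the test classpath:

```java
private void buildSessionAndVerify(SpannerResourceManager spannerResourceManager)
    throws IOException {
  // Column and primary-key configs for the template (values here are illustrative only).
  Map<String, Object> idCol = new HashMap<>();
  idCol.put("Name", "id");
  idCol.put("Type", "INT64");
  Map<String, Object> nameCol = new HashMap<>();
  nameCol.put("Name", "name");
  nameCol.put("Type", "STRING");
  nameCol.put("Length", 100);
  Map<String, Object> pk = new HashMap<>();
  pk.put("ColId", "c1");

  Map<String, Object> template =
      createSessionTemplate(2, Arrays.asList(idCol, nameCol), Arrays.asList(pk));

  // Serialize the template and substitute the real database/table names for the
  // SRC_DATABASE/SP_DATABASE/TABLE1/TABLE2 placeholders it contains.
  String templateJson = new com.google.gson.Gson().toJson(template);
  String sessionContent =
      generateSessionFile(
          2, "src_db", "spanner_db", Arrays.asList("Users", "Orders"), templateJson);
  // sessionContent would be passed to launchDataflowJob as sessionResourceContent.

  // Once the pipeline has written to Spanner, compare the rows against the expected events.
  Map<String, Object> expectedUserRow = new HashMap<>();
  expectedUserRow.put("id", 1);
  expectedUserRow.put("name", "alice");
  Map<String, List<Map<String, Object>>> cdcEvents = new HashMap<>();
  cdcEvents.put("Users", Arrays.asList(expectedUserRow));
  checkSpannerTables(
      spannerResourceManager, Arrays.asList("Users"), cdcEvents, Arrays.asList("id", "name"));
}
```

Using `LinkedHashMap` throughout `createSessionTemplate` keeps the serialized JSON keys in insertion order, so the generated session files stay stable across runs.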