@@ -39,11 +39,11 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
39
39
40
40
// Test starts here.
41
41
42
- val submitter = new DataprocSubmitter (
43
- mockJobControllerClient,
44
- SubmitterConf (" test-project" , " test-region" , " test-cluster" ))
42
+ val submitter =
43
+ new DataprocSubmitter (mockJobControllerClient, SubmitterConf (" test-project" , " test-region" , " test-cluster" ))
45
44
46
- val submittedJobId = submitter.submit(spark.SparkJob , Map (MainClass -> " test-main-class" , JarURI -> " test-jar-uri" ), List .empty)
45
+ val submittedJobId =
46
+ submitter.submit(spark.SparkJob , Map (MainClass -> " test-main-class" , JarURI -> " test-jar-uri" ), List .empty)
47
47
assertEquals(submittedJobId, jobId)
48
48
}
49
49
@@ -54,29 +54,36 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
54
54
// Manual/local-only check: submits a Flink job to a real Dataproc cluster,
// so it is kept `ignore`d in CI. Reconstructed from the diff post-image;
// NOTE(review): string literals de-garbled from the scrape — confirm against repo.
it should "test flink job locally" ignore {
  val submitter = DataprocSubmitter()

  // Job properties: the Flink entry point plus the two jars Dataproc needs —
  // the Flink assembly (main jar) and the GCP/BigTable integration jar.
  val flinkJobProps = Map(
    MainClass -> "ai.chronon.flink.FlinkJob",
    FlinkMainJarURI -> "gs://zipline-jars/flink-assembly-0.1.0-SNAPSHOT.jar",
    JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"
  )

  submitter.submit(
    spark.FlinkJob,
    flinkJobProps,
    List.empty, // no extra file URIs
    // Application arguments forwarded to the Flink job.
    "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
    "--groupby-name=e2e-count",
    "-ZGCP_PROJECT_ID=bigtable-project-id",
    "-ZGCP_INSTANCE_ID=bigtable-instance-id"
  )
}
67
71
68
72
// Manual/local-only check: submits a Flink Kafka ingestion job against a live
// managed Kafka cluster, hence `ignore`d in CI. Reconstructed from the diff
// post-image; NOTE(review): literals de-garbled from the scrape — confirm against repo.
it should "test flink kafka ingest job locally" ignore {
  val submitter = DataprocSubmitter()

  // Driver class plus the jars required on the Dataproc side: the Kafka-ingest
  // Flink assembly (main jar) and the GCP/BigTable integration jar.
  val kafkaIngestProps = Map(
    MainClass -> "ai.chronon.flink.FlinkKafkaBeaconEventDriver",
    FlinkMainJarURI -> "gs://zipline-jars/flink_kafka_ingest-assembly-0.1.0-SNAPSHOT.jar",
    JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"
  )

  val submittedJobId =
    submitter.submit(
      spark.FlinkJob,
      kafkaIngestProps,
      List.empty, // no extra file URIs
      // Arguments for the Kafka beacon-event driver.
      "--kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",
      "--kafka-topic=test-beacon-main",
      "--data-file-name=gs://zl-warehouse/beacon_events/beacon-output.avro"
    )

  // Local debugging aid only — no assertion is made on the returned id here.
  println(submittedJobId)
}
@@ -88,10 +95,10 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
88
95
submitter.submit(
89
96
spark.SparkJob ,
90
97
Map (MainClass -> " ai.chronon.spark.Driver" ,
91
- JarURI -> " gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar" ),
98
+ JarURI -> " gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar" ),
92
99
List (" gs://zipline-jars/training_set.v1" ,
93
- " gs://zipline-jars/dataproc-submitter-conf.yaml" ,
94
- " gs://zipline-jars/additional-confs.yaml" ),
100
+ " gs://zipline-jars/dataproc-submitter-conf.yaml" ,
101
+ " gs://zipline-jars/additional-confs.yaml" ),
95
102
" join" ,
96
103
" --end-date=2024-12-10" ,
97
104
" --additional-conf-path=additional-confs.yaml" ,
@@ -107,7 +114,7 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
107
114
submitter.submit(
108
115
spark.SparkJob ,
109
116
Map (MainClass -> " ai.chronon.spark.Driver" ,
110
- JarURI -> " gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar" ),
117
+ JarURI -> " gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar" ),
111
118
List .empty,
112
119
" groupby-upload-bulk-load" ,
113
120
" -ZGCP_PROJECT_ID=bigtable-project-id" ,
@@ -116,7 +123,8 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
116
123
" --online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl" ,
117
124
" --src-offline-table=data.test_gbu" ,
118
125
" --groupby-name=quickstart.purchases.v1" ,
119
- " --partition-string=2024-01-01" )
126
+ " --partition-string=2024-01-01"
127
+ )
120
128
println(submittedJobId)
121
129
assertEquals(submittedJobId, " mock-job-id" )
122
130
}
0 commit comments