@@ -34,7 +34,7 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
34
34
" spark.chronon.partition.column" -> " ds" ,
35
35
" spark.hadoop.fs.gs.impl" -> classOf [GoogleHadoopFileSystem ].getName,
36
36
" spark.hadoop.fs.AbstractFileSystem.gs.impl" -> classOf [GoogleHadoopFS ].getName,
37
- " spark.sql.catalogImplementation" -> " in-memory" ,
37
+ " spark.sql.catalogImplementation" -> " in-memory"
38
38
39
39
// Uncomment to test
40
40
// "spark.sql.defaultCatalog" -> "default_iceberg",
@@ -116,6 +116,19 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
116
116
SparkBigQueryUtil .sparkDateToBigQuery(nonJava8Date)
117
117
}
118
118
119
it should "bigquery connector converts spark timestamp regardless of setting" in {
  // Single-row frame whose "ts" column is castable to a Spark timestamp.
  val frame = spark.createDataFrame(Seq((1, "2025-04-28 12:30:45"))).toDF("id", "ts")

  // Collects the first timestamp value with the java8 datetime API toggled as requested.
  def firstTimestamp(java8Enabled: Boolean): Any = {
    spark.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, java8Enabled)
    frame.select(col("id"), col("ts").cast("timestamp")).collect.take(1).head.get(1)
  }

  // With the java8 API on, Spark surfaces timestamps as java.time.Instant;
  // the connector helper must accept that representation.
  val java8Value = firstTimestamp(java8Enabled = true)
  assert(java8Value.isInstanceOf[java.time.Instant])
  SparkBigQueryUtil.sparkTimestampToBigQuery(java8Value)

  // With the java8 API off, the legacy java.sql.Timestamp comes back;
  // the helper must accept that representation too.
  val legacyValue = firstTimestamp(java8Enabled = false)
  assert(legacyValue.isInstanceOf[java.sql.Timestamp])
  SparkBigQueryUtil.sparkTimestampToBigQuery(legacyValue)
}
131
+
119
132
it should " integration testing bigquery native table" ignore {
120
133
val nativeTable = " data.checkouts"
121
134
val table = tableUtils.loadTable(nativeTable)
@@ -141,9 +154,8 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
141
154
142
155
val singleFilter = tableUtils.loadTable(iceberg, List (" ds = '2023-11-30'" ))
143
156
val multiFilter = tableUtils.loadTable(iceberg, List (" ds = '2023-11-30'" , " ds = '2023-11-30'" ))
144
- assertEquals(
145
- singleFilter.select(" user_id" , " ds" ).as[(String , String )].collect.toList,
146
- multiFilter.select(" user_id" , " ds" ).as[(String , String )].collect.toList)
157
+ assertEquals(singleFilter.select(" user_id" , " ds" ).as[(String , String )].collect.toList,
158
+ multiFilter.select(" user_id" , " ds" ).as[(String , String )].collect.toList)
147
159
}
148
160
149
161
it should " integration testing formats" ignore {
@@ -180,37 +192,34 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
180
192
assertTrue(dneFormat.isEmpty)
181
193
}
182
194
183
-
184
195
it should " integration testing bigquery partitions" ignore {
185
196
// TODO(tchow): This test is ignored because it requires a running instance of the bigquery. Need to figure out stubbing locally.
186
197
// to run, set `GOOGLE_APPLICATION_CREDENTIALS=<path_to_application_default_credentials.json>
187
198
val externalPartitions = tableUtils.partitions(" data.checkouts_parquet_partitioned" )
188
- assertEquals(Seq (" 2023-11-30" ), externalPartitions)
199
+ assertEquals(Seq (" 2023-11-30" ), externalPartitions)
189
200
val nativePartitions = tableUtils.partitions(" data.purchases" )
190
201
assertEquals(
191
- Set (20231118 , 20231122 , 20231125 , 20231102 , 20231123 , 20231119 , 20231130 , 20231101 , 20231117 , 20231110 , 20231108 , 20231112 , 20231115 , 20231116 , 20231113 , 20231104 , 20231103 , 20231106 , 20231121 , 20231124 , 20231128 , 20231109 , 20231127 , 20231129 , 20231126 , 20231114 , 20231107 , 20231111 , 20231120 , 20231105 ).map(_.toString), nativePartitions.toSet)
202
+ Set (20231118 , 20231122 , 20231125 , 20231102 , 20231123 , 20231119 , 20231130 , 20231101 , 20231117 , 20231110 , 20231108 ,
203
+ 20231112 , 20231115 , 20231116 , 20231113 , 20231104 , 20231103 , 20231106 , 20231121 , 20231124 , 20231128 , 20231109 ,
204
+ 20231127 , 20231129 , 20231126 , 20231114 , 20231107 , 20231111 , 20231120 , 20231105 ).map(_.toString),
205
+ nativePartitions.toSet
206
+ )
192
207
193
208
val df = tableUtils.loadTable(" `canary-443022.data`.purchases" )
194
209
df.show
195
210
196
- tableUtils.insertPartitions(
197
- df,
198
- " data.tchow_test_iceberg" ,
199
- Map (
200
- " file_format" -> " PARQUET" ,
201
- " table_type" -> " iceberg" ),
202
- List (" ds" ))
203
-
211
+ tableUtils.insertPartitions(df,
212
+ " data.tchow_test_iceberg" ,
213
+ Map (" file_format" -> " PARQUET" , " table_type" -> " iceberg" ),
214
+ List (" ds" ))
204
215
205
216
val icebergCols = spark.catalog.listColumns(" data.tchow_test_iceberg" )
206
217
val externalCols = spark.catalog.listColumns(" data.checkouts_parquet_partitioned" )
207
218
val nativeCols = spark.catalog.listColumns(" data.purchases" )
208
219
209
220
val icebergPartitions = spark.sql(" SELECT * FROM data.tchow_test_iceberg.partitions" )
210
221
211
-
212
- val sqlDf = tableUtils.sql(
213
- s """
222
+ val sqlDf = tableUtils.sql(s """
214
223
|SELECT ds FROM data.checkouts_parquet_partitioned -- external parquet
215
224
|UNION ALL
216
225
|SELECT ds FROM data.purchases -- bigquery native
@@ -272,8 +281,7 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
272
281
input.close();
273
282
274
283
assertNotNull(" Deserialized object should not be null" , deserializedObj);
275
- assertTrue(" Deserialized object should be an instance of GCSFileIO" ,
276
- deserializedObj.isInstanceOf [GCSFileIO ]);
284
+ assertTrue(" Deserialized object should be an instance of GCSFileIO" , deserializedObj.isInstanceOf [GCSFileIO ]);
277
285
assertEquals(original.properties(), deserializedObj.asInstanceOf [GCSFileIO ].properties())
278
286
}
279
287
}
0 commit comments