
Commit 57734f9

wip
1 parent c249084 commit 57734f9

File tree

2 files changed: +32 -2 lines changed


cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala

Lines changed: 9 additions & 0 deletions
@@ -42,6 +42,15 @@ class BigQueryCatalogTest extends AnyFunSuite with MockitoSugar {
    })
  }

+  test("integration testing bigquery load table") {
+    val externalTable = "data.checkouts_parquet"
+    val table = tableUtils.loadTable(externalTable)
+    val partitioned = tableUtils.isPartitioned(externalTable)
+    val database = tableUtils.createDatabase("test_database")
+    val allParts = tableUtils.allPartitions(externalTable)
+    table.show
+  }
+
  ignore("integration testing bigquery partitions") {
    // TODO(tchow): This test is ignored because it requires a running instance of the bigquery. Need to figure out stubbing locally.
    // to run this:
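
Like the ignored partitions test, the new load-table test assumes a live BigQuery-backed catalog. A minimal sketch of how the surrounding `tableUtils` might be wired up for a local run, assuming a SparkSession whose catalog resolves `data.checkouts_parquet`; the catalog config key and implementation class below are placeholders, not values from this commit:

import ai.chronon.spark.TableUtils
import org.apache.spark.sql.SparkSession

// Build a local SparkSession. The catalog setting is a placeholder for
// whatever BigQuery connector the integration environment provides.
val spark: SparkSession = SparkSession
  .builder()
  .master("local[*]")
  .appName("BigQueryCatalogTest")
  .config("spark.sql.catalog.data", "<bigquery-catalog-implementation-class>")
  .getOrCreate()

// TableUtils is a case class over the session, so construction is direct.
val tableUtils: TableUtils = TableUtils(spark)

// Mirrors the new test: load the external table, then inspect it.
val df = tableUtils.loadTable("data.checkouts_parquet")
df.show()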

spark/src/main/scala/ai/chronon/spark/TableUtils.scala

Lines changed: 23 additions & 2 deletions
@@ -141,16 +141,24 @@ case class TableUtils(sparkSession: SparkSession) {
    rdd
  }

-  def tableExists(tableName: String): Boolean = sparkSession.catalog.tableExists(tableName)
+  // Needs provider
+  def tableExists(tableName: String): Boolean = {
+    sparkSession.catalog.tableExists(tableName)
+  }

-  def loadTable(tableName: String): DataFrame = sparkSession.table(tableName)
+  // Needs provider
+  def loadTable(tableName: String): DataFrame = {
+    sparkSession.table(tableName)
+  }

+  // Needs provider
  def isPartitioned(tableName: String): Boolean = {
    // TODO: use proper way to detect if a table is partitioned or not
    val schema = getSchemaFromTable(tableName)
    schema.fieldNames.contains(partitionColumn)
  }

+  // Needs provider
  def createDatabase(database: String): Boolean = {
    try {
      val command = s"CREATE DATABASE IF NOT EXISTS $database"
@@ -168,6 +176,7 @@ case class TableUtils(sparkSession: SparkSession) {

  def tableReadFormat(tableName: String): Format = tableFormatProvider.readFormat(tableName)

+  // Needs provider
  // return all specified partition columns in a table in format of Map[partitionName, PartitionValue]
  def allPartitions(tableName: String, partitionColumnsFilter: Seq[String] = Seq.empty): Seq[Map[String, String]] = {
    if (!tableExists(tableName)) return Seq.empty[Map[String, String]]
@@ -182,6 +191,7 @@ case class TableUtils(sparkSession: SparkSession) {
    }
  }

+  // Needs provider
  def partitions(tableName: String, subPartitionsFilter: Map[String, String] = Map.empty): Seq[String] = {
    if (!tableExists(tableName)) return Seq.empty[String]
    val format = tableReadFormat(tableName)
@@ -222,11 +232,13 @@ case class TableUtils(sparkSession: SparkSession) {
    }
  }

+  // Needs provider
  def getSchemaFromTable(tableName: String): StructType = {
    sparkSession.sql(s"SELECT * FROM $tableName LIMIT 1").schema
  }

  // method to check if a user has access to a table
+  // Needs provider
  def checkTablePermission(tableName: String,
                           fallbackPartition: String =
                             partitionSpec.before(partitionSpec.at(System.currentTimeMillis()))): Boolean = {
@@ -252,12 +264,15 @@ case class TableUtils(sparkSession: SparkSession) {
    }
  }

+  // Needs provider
  def lastAvailablePartition(tableName: String, subPartitionFilters: Map[String, String] = Map.empty): Option[String] =
    partitions(tableName, subPartitionFilters).reduceOption((x, y) => Ordering[String].max(x, y))

+  // Needs provider
  def firstAvailablePartition(tableName: String, subPartitionFilters: Map[String, String] = Map.empty): Option[String] =
    partitions(tableName, subPartitionFilters).reduceOption((x, y) => Ordering[String].min(x, y))

+  // Needs provider
  def insertPartitions(df: DataFrame,
                       tableName: String,
                       tableProperties: Map[String, String] = null,
@@ -351,6 +366,7 @@ case class TableUtils(sparkSession: SparkSession) {
    }
  }

+  // Needs provider
  def insertUnPartitioned(df: DataFrame,
                          tableName: String,
                          tableProperties: Map[String, String] = null,
@@ -412,6 +428,7 @@ case class TableUtils(sparkSession: SparkSession) {
    }.get
  }

+  // Needs provider
  private def repartitionAndWriteInternal(df: DataFrame,
                                          tableName: String,
                                          saveMode: SaveMode,
@@ -488,6 +505,7 @@ case class TableUtils(sparkSession: SparkSession) {
    }
  }

+  // Needs provider
  private def createTableSql(tableName: String,
                             schema: StructType,
                             partitionColumns: Seq[String],
@@ -526,6 +544,7 @@ case class TableUtils(sparkSession: SparkSession) {
    Seq(createFragment, partitionFragment, fileFormatString, propertiesFragment).mkString("\n")
  }

+  // Needs provider
  private def alterTablePropertiesSql(tableName: String, properties: Map[String, String]): String = {
    // Only SQL api exists for setting TBLPROPERTIES
    val propertiesString = properties
@@ -612,6 +631,7 @@ case class TableUtils(sparkSession: SparkSession) {
    Some(missingChunks)
  }

+  // Needs provider
  def getTableProperties(tableName: String): Option[Map[String, String]] = {
    try {
      val tableId = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
@@ -621,6 +641,7 @@ case class TableUtils(sparkSession: SparkSession) {
    }
  }

+  // Needs provider
  def dropTableIfExists(tableName: String): Unit = {
    val command = s"DROP TABLE IF EXISTS $tableName"
    logger.info(s"Dropping table with command: $command")
