
Commit bce1295 (merge)
2 parents: 1aaed62 + 6184a4c

File tree: 6 files changed (+60, -75 lines)

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala
Lines changed: 1 addition & 1 deletion

@@ -48,7 +48,7 @@ class BigQueryCatalogTest extends AnyFunSuite with MockitoSugar {
     table.show
   }

-  test("integration testing bigquery partitions") {
+  ignore("integration testing bigquery partitions") {
     // TODO(tchow): This test is ignored because it requires a running instance of the bigquery. Need to figure out stubbing locally.
     // to run this:
     // 1. Set up a tunnel to dataproc federation proxy:
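This change leans on ScalaTest's built-in `ignore` registration: `ignore` has the same signature as `test`, so swapping the keyword keeps the test body compiling but skips it at runtime, and it shows up as ignored in the report rather than silently disappearing. A minimal sketch of the idiom, assuming ScalaTest 3.x; the suite and test names are illustrative, not from this PR:

import org.scalatest.funsuite.AnyFunSuite

class ExampleSuite extends AnyFunSuite {
  test("always runs") {
    assert(1 + 1 == 2)
  }

  // Same signature as `test`, so flipping the keyword disables the
  // test without deleting its body; it is reported as ignored.
  ignore("needs a live BigQuery instance") {
    assert(false) // never executed
  }
}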

spark/src/main/scala/ai/chronon/spark/TableUtils.scala
Lines changed: 8 additions & 63 deletions

@@ -63,7 +63,8 @@ import scala.util.Try
  * retrieve metadata / configure it appropriately at creation time
  */

-case class TableUtils(sparkSession: SparkSession) {
+class TableUtils(val sparkSession: SparkSession) extends Serializable {
+
   @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)

   private val ARCHIVE_TIMESTAMP_FORMAT = "yyyyMMddHHmmss"

@@ -669,68 +670,6 @@ case class TableUtils(sparkSession: SparkSession) {
     }
   }

-  @deprecated
-  def dropPartitionsAfterHole(inputTable: String,
-                              outputTable: String,
-                              partitionRange: PartitionRange,
-                              subPartitionFilters: Map[String, String] = Map.empty): Option[String] = {
-
-    def partitionsInRange(table: String, partitionFilter: Map[String, String] = Map.empty): Set[String] = {
-      val allParts = partitions(table, partitionFilter)
-      val startPrunedParts = Option(partitionRange.start).map(start => allParts.filter(_ >= start)).getOrElse(allParts)
-      Option(partitionRange.end).map(end => startPrunedParts.filter(_ <= end)).getOrElse(startPrunedParts).toSet
-    }
-
-    val inputPartitions = partitionsInRange(inputTable)
-    val outputPartitions = partitionsInRange(outputTable, subPartitionFilters)
-    val earliestHoleOpt = (inputPartitions -- outputPartitions).reduceLeftOption(Ordering[String].min)
-    earliestHoleOpt.foreach { hole =>
-      val toDrop = outputPartitions.filter(_ > hole)
-      logger.info(s"""
-                     |Earliest hole at $hole in output table $outputTable, relative to $inputTable
-                     |Input Parts   : ${inputPartitions.toArray.sorted.mkString("Array(", ", ", ")")}
-                     |Output Parts  : ${outputPartitions.toArray.sorted.mkString("Array(", ", ", ")")}
-                     |Dropping Parts: ${toDrop.toArray.sorted.mkString("Array(", ", ", ")")}
-                     |Sub Partitions: ${subPartitionFilters.map(kv => s"${kv._1}=${kv._2}").mkString("Array(", ", ", ")")}
-                     """.stripMargin)
-      dropPartitions(outputTable, toDrop.toArray.sorted, partitionColumn, subPartitionFilters)
-    }
-    earliestHoleOpt
-  }
-
-  def dropPartitions(tableName: String,
-                     partitions: Seq[String],
-                     partitionColumn: String = partitionColumn,
-                     subPartitionFilters: Map[String, String] = Map.empty): Unit = {
-    if (partitions.nonEmpty && tableExists(tableName)) {
-      val partitionSpecs = partitions
-        .map { partition =>
-          val mainSpec = s"$partitionColumn='$partition'"
-          val specs = mainSpec +: subPartitionFilters.map {
-            case (key, value) => s"$key='$value'"
-          }.toSeq
-          specs.mkString("PARTITION (", ",", ")")
-        }
-        .mkString(",")
-      val dropSql = s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs"
-      sql(dropSql)
-    } else {
-      logger.info(s"$tableName doesn't exist, please double check before drop partitions")
-    }
-  }
-
-  def dropPartitionRange(tableName: String,
-                         startDate: String,
-                         endDate: String,
-                         subPartitionFilters: Map[String, String] = Map.empty): Unit = {
-    if (tableExists(tableName)) {
-      val toDrop = Stream.iterate(startDate)(partitionSpec.after).takeWhile(_ <= endDate)
-      dropPartitions(tableName, toDrop, partitionColumn, subPartitionFilters)
-    } else {
-      logger.info(s"$tableName doesn't exist, please double check before drop partitions")
-    }
-  }
-
   /*
    * This method detects new columns that appear in newSchema but not in current table,
    * and append those new columns at the end of the existing table. This allows continuous evolution

@@ -863,3 +802,9 @@ sealed case class IncompatibleSchemaException(inconsistencies: Seq[(String, Data
     |""".stripMargin
   }
 }
+
+object TableUtils {
+  def apply(spark: SparkSession): TableUtils = {
+    new TableUtils(spark)
+  }
+}
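The core of this diff is the switch from `case class TableUtils` to a plain class plus a hand-written companion `apply`. The companion keeps every existing `TableUtils(spark)` call site compiling, while the plain class becomes extensible, which the new TableTestUtils below relies on (Scala prohibits case-to-case inheritance, so the old case class could not be subclassed by another case class). A minimal sketch of the pattern under illustrative names (`Base`, `TestBase`); this is not the project code:

object ApplyPatternSketch {
  // A plain class stays open for extension; `Serializable` mirrors the diff.
  class Base(val name: String) extends Serializable {
    def describe(): String = s"Base($name)"
  }

  // A hand-written companion restores the `Base(args)` construction
  // syntax that the case class previously synthesized for free.
  object Base {
    def apply(name: String): Base = new Base(name)
  }

  // A case class may extend a plain class (only case-to-case
  // inheritance is illegal), exactly as TableTestUtils extends TableUtils.
  case class TestBase(override val name: String) extends Base(name)

  def main(args: Array[String]): Unit = {
    val a: Base = Base("prod")     // old call sites compile unchanged
    val b: Base = TestBase("test") // subtype substitutes anywhere a Base is expected
    println(a.describe() + " / " + b.describe())
  }
}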

spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala
Lines changed: 1 addition & 1 deletion

@@ -57,7 +57,7 @@ object TestRow {
 class JoinTest extends AnyFunSuite with TaggedFilterSuite {

   val spark: SparkSession = SparkSessionBuilder.build("JoinTest", local = true)
-  private implicit val tableUtils = TableUtils(spark)
+  private implicit val tableUtils = TableTestUtils(spark)

   private val today = tableUtils.partitionSpec.at(System.currentTimeMillis())
   private val monthAgo = tableUtils.partitionSpec.minus(today, new Window(30, TimeUnit.DAYS))

spark/src/test/scala/ai/chronon/spark/test/LabelJoinTest.scala
Lines changed: 1 addition & 1 deletion

@@ -37,7 +37,7 @@ class LabelJoinTest {
   private val namespace = "label_join"
   private val tableName = "test_label_join"
   private val labelDS = "2022-10-30"
-  private val tableUtils = TableUtils(spark)
+  private val tableUtils = TableTestUtils(spark)
   tableUtils.createDatabase(namespace)

   private val viewsGroupBy = TestUtils.createViewsGroupBy(namespace, spark)
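Both test diffs are one-line swaps because TableTestUtils is a subtype of TableUtils: everything downstream that expects a TableUtils, including implicit parameters like the one in JoinTest, accepts the subtype unchanged. A hypothetical, self-contained sketch of why the implicit still resolves (names are illustrative):

object ImplicitSubtypeSketch {
  class Utils
  class TestUtils extends Utils

  // An implicit value of a subtype satisfies an implicit parameter
  // declared with the parent type.
  def run()(implicit u: Utils): String = u.getClass.getSimpleName

  def main(args: Array[String]): Unit = {
    implicit val u: TestUtils = new TestUtils
    println(run()) // prints "TestUtils"
  }
}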
spark/src/test/scala/ai/chronon/spark/test/TableTestUtils.scala (new file)
Lines changed: 41 additions & 0 deletions

@@ -0,0 +1,41 @@
+package ai.chronon.spark.test
+
+import ai.chronon.spark.TableUtils
+import org.apache.spark.sql.SparkSession
+
+case class TableTestUtils(override val sparkSession: SparkSession) extends TableUtils(sparkSession: SparkSession) {
+
+  def dropPartitions(tableName: String,
+                     partitions: Seq[String],
+                     partitionColumn: String = partitionColumn,
+                     subPartitionFilters: Map[String, String] = Map.empty): Unit = {
+    if (partitions.nonEmpty && tableExists(tableName)) {
+      val partitionSpecs = partitions
+        .map { partition =>
+          val mainSpec = s"$partitionColumn='$partition'"
+          val specs = mainSpec +: subPartitionFilters.map {
+            case (key, value) => s"$key='$value'"
+          }.toSeq
+          specs.mkString("PARTITION (", ",", ")")
+        }
+        .mkString(",")
+      val dropSql = s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs"
+      sql(dropSql)
+    } else {
+      logger.info(s"$tableName doesn't exist, please double check before drop partitions")
+    }
+  }
+
+  def dropPartitionRange(tableName: String,
+                         startDate: String,
+                         endDate: String,
+                         subPartitionFilters: Map[String, String] = Map.empty): Unit = {
+    if (tableExists(tableName)) {
+      val toDrop = Stream.iterate(startDate)(partitionSpec.after).takeWhile(_ <= endDate)
+      dropPartitions(tableName, toDrop, partitionColumn, subPartitionFilters)
+    } else {
+      logger.info(s"$tableName doesn't exist, please double check before drop partitions")
+    }
+  }
+
+}
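For reference, the SQL that dropPartitions assembles: each partition value becomes a PARTITION (...) spec, any sub-partition filters are appended inside the same parentheses, and all specs are joined into a single ALTER TABLE statement. A standalone sketch of the string construction taken from the diff's logic; the table name, column names, and dates are illustrative:

object DropPartitionsSqlSketch {
  def main(args: Array[String]): Unit = {
    val partitionColumn = "ds"
    val subPartitionFilters = Map("label_ds" -> "2022-10-30")
    val partitions = Seq("2022-10-01", "2022-10-02")

    // Mirrors the spec-building in dropPartitions above.
    val partitionSpecs = partitions
      .map { partition =>
        val specs = s"$partitionColumn='$partition'" +: subPartitionFilters.map {
          case (key, value) => s"$key='$value'"
        }.toSeq
        specs.mkString("PARTITION (", ",", ")")
      }
      .mkString(",")

    println(s"ALTER TABLE ns.my_table DROP IF EXISTS $partitionSpecs")
    // Prints:
    // ALTER TABLE ns.my_table DROP IF EXISTS PARTITION (ds='2022-10-01',label_ds='2022-10-30'),PARTITION (ds='2022-10-02',label_ds='2022-10-30')
  }
}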

spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala
Lines changed: 8 additions & 9 deletions

@@ -38,8 +38,6 @@ import org.junit.Test

 import scala.util.Try

-
-
 class SimpleAddUDF extends UDF {
   def evaluate(value: Int): Int = {
     value + 20

@@ -48,7 +46,7 @@ class SimpleAddUDF extends UDF {

 class TableUtilsTest {
   lazy val spark: SparkSession = SparkSessionBuilder.build("TableUtilsTest", local = true)
-  private val tableUtils = TableUtils(spark)
+  private val tableUtils = TableTestUtils(spark)
   private implicit val partitionSpec: PartitionSpec = tableUtils.partitionSpec

   @Test

@@ -83,10 +81,12 @@ class TableUtilsTest {
       Seq(
         types.StructField("name", types.StringType, nullable = true),
         types.StructField("age", types.IntegerType, nullable = false),
-        types.StructField("address", types.StructType(Seq(
-          types.StructField("street", types.StringType, nullable = true),
-          types.StructField("city", types.StringType, nullable = true)
-        )))
+        types.StructField("address",
+                          types.StructType(
+                            Seq(
+                              types.StructField("street", types.StringType, nullable = true),
+                              types.StructField("city", types.StringType, nullable = true)
+                            )))
       )
     )
     val expectedFieldNames = Seq("name", "age", "address", "address.street", "address.city")

@@ -344,8 +344,7 @@ class TableUtilsTest {
     // verify the latest label version
     val labels = JoinUtils.getLatestLabelMapping(tableName, tableUtils)
     assertEquals(labels("2022-11-09"),
-                 List(PartitionRange("2022-10-01", "2022-10-02"),
-                      PartitionRange("2022-10-05", "2022-10-05")))
+                 List(PartitionRange("2022-10-01", "2022-10-02"), PartitionRange("2022-10-05", "2022-10-05")))
   }

   private def prepareTestDataWithSubPartitions(tableName: String): Unit = {
