Commit f6d67ab

Vz/add test case for different partition formats (#753)
1 parent: 5615e95

51 files changed: +2285 additions, −1561 deletions


.github/workflows/test_scala_2_12_spark.yaml

Lines changed: 3 additions & 3 deletions

@@ -71,7 +71,7 @@ jobs:
         //spark:batch_test

   fetcher_tests:
-    runs-on: ubuntu-8_cores-32_gb
+    runs-on: ubuntu_32_core_128gb
     container:
       image: ghcr.io/${{ github.repository }}-ci:latest
       credentials:
@@ -97,7 +97,7 @@ jobs:
         //spark:fetcher_test

   join_tests:
-    runs-on: ubuntu-8_cores-32_gb
+    runs-on: ubuntu_32_core_128gb
     container:
       image: ghcr.io/${{ github.repository }}-ci:latest
       credentials:
@@ -123,7 +123,7 @@ jobs:
         //spark:join_test

   groupby_tests:
-    runs-on: ubuntu-8_cores-32_gb
+    runs-on: ubuntu_32_core_128gb
     container:
       image: ghcr.io/${{ github.repository }}-ci:latest
       credentials:

.github/workflows/test_scala_2_13_spark.yaml

Lines changed: 3 additions & 3 deletions

@@ -74,7 +74,7 @@ jobs:
         //spark:batch_test

   fetcher_tests:
-    runs-on: ubuntu-8_cores-32_gb
+    runs-on: ubuntu_32_core_128gb
     container:
       image: ghcr.io/${{ github.repository }}-ci:latest
       credentials:
@@ -101,7 +101,7 @@ jobs:
         //spark:fetcher_test

   join_tests:
-    runs-on: ubuntu-8_cores-32_gb
+    runs-on: ubuntu_32_core_128gb
     container:
       image: ghcr.io/${{ github.repository }}-ci:latest
       credentials:
@@ -128,7 +128,7 @@ jobs:
         //spark:groupby_test

   groupby_tests:
-    runs-on: ubuntu-8_cores-32_gb
+    runs-on: ubuntu_32_core_128gb
     container:
       image: ghcr.io/${{ github.repository }}-ci:latest
       credentials:

.gitignore

Lines changed: 2 additions & 0 deletions

@@ -110,3 +110,5 @@ releases
 MODULE.bazel*

 /plugins/vscode/node_modules
+
+**/.claude/settings.local.json

aggregator/src/main/scala/ai/chronon/aggregator/base/MinHeap.scala

Lines changed: 1 addition & 1 deletion

@@ -74,7 +74,7 @@ class MinHeap[T](maxSize: Int, ordering: Ordering[T]) {
     arr
   }

-  //mutating arr1 / arr2 intact
+  // mutating arr1 / arr2 intact
  def merge(
      arr1: ContainerType[T],
      arr2: ContainerType[T]

aggregator/src/main/scala/ai/chronon/aggregator/base/TimedAggregators.scala

Lines changed: 2 additions & 2 deletions

@@ -72,7 +72,7 @@ abstract class TimeOrdered(inputType: DataType) extends TimedAggregator[Any, Tim
 }

 class First(inputType: DataType) extends TimeOrdered(inputType) {
-  //mutating
+  // mutating
   override def update(
       ir: util.ArrayList[Any],
       input: Any,
@@ -92,7 +92,7 @@ class First(inputType: DataType) extends TimeOrdered(inputType) {
 }

 class Last(inputType: DataType) extends TimeOrdered(inputType) {
-  //mutating
+  // mutating
   override def update(
       ir: util.ArrayList[Any],
       input: Any,

api/python/test/sample/scripts/data-loader.scala

Lines changed: 2 additions & 3 deletions

@@ -3,7 +3,6 @@
 import java.io.File
 import org.apache.spark.sql.types._

-
 spark.sql("CREATE DATABASE IF NOT EXISTS data;")
 // Directory containing CSV files
 val folderPath = "/srv/chronon/data/"
@@ -27,9 +26,9 @@ files.foreach { file =>
   val customSchema = StructType(
     columns.map { columnName =>
       val dataType = columnName match {
-        case "ts"                                                     => LongType
+        case "ts" => LongType
         case name if name.endsWith("_price") || name.endsWith("_amt") => LongType
-        case _                                                        => StringType
+        case _ => StringType
       }
       StructField(columnName, dataType, nullable = true)
     }

api/src/main/scala/ai/chronon/api/DataRange.scala

Lines changed: 10 additions & 1 deletion

@@ -88,7 +88,7 @@ case class PartitionRange(start: String, end: String)(implicit val partitionSpec
   }

   def partitions: Seq[String] = {
-    assert(wellDefined, s"Invalid partition range $this")
+    require(wellDefined, s"Invalid partition range $this")
     Stream
       .iterate(start)(partitionSpec.after)
       .takeWhile(_ <= end)
@@ -151,6 +151,15 @@ case class PartitionRange(start: String, end: String)(implicit val partitionSpec
      compareDate(this.end, that.end)
    }
  }
+
+  def translate(otherSpec: PartitionSpec): PartitionRange = {
+
+    val newStart = Option(start).map(d => partitionSpec.translate(d, otherSpec)).orNull
+    val newEnd = Option(end).map(d => partitionSpec.translate(d, otherSpec)).orNull
+
+    PartitionRange(newStart, newEnd)(otherSpec)
+  }
+
   override def toString: String = s"[$start...$end]"
 }
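
The new translate helper re-expresses a range under another PartitionSpec, passing null (open-ended) bounds through untouched. Below is a minimal usage sketch, not part of the commit: it assumes only the constructors visible in the hunk headers above, and the two specs are made-up examples.

  import ai.chronon.api.{PartitionRange, PartitionSpec}

  // Hypothetical specs: same daily span, different date formats.
  val daySpec = PartitionSpec("ds", "yyyy-MM-dd", 24 * 60 * 60 * 1000L)
  val compactSpec = PartitionSpec("ds", "yyyyMMdd", 24 * 60 * 60 * 1000L)

  val range = PartitionRange("2024-01-01", "2024-01-31")(daySpec)

  // Same calendar span, re-rendered in the target spec's format.
  val translated = range.translate(compactSpec) // [20240101...20240131]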

api/src/main/scala/ai/chronon/api/PartitionSpec.scala

Lines changed: 13 additions & 0 deletions

@@ -80,6 +80,19 @@ case class PartitionSpec(column: String, format: String, spanMillis: Long) {
   def now: String = at(System.currentTimeMillis())

   def shiftBackFromNow(days: Int): String = shift(now, 0 - days)
+
+  def intervalWindow: Window = {
+    if (spanMillis == WindowUtils.Day.millis) WindowUtils.Day
+    else if (spanMillis == WindowUtils.Hour.millis) WindowUtils.Hour
+    else
+      throw new UnsupportedOperationException(
+        s"Partition Intervals should be either hour or day - found ${spanMillis / (60 * 1000)} minutes")
+  }
+
+  def translate(date: String, targetSpec: PartitionSpec): String = {
+    val millis = epochMillis(date)
+    targetSpec.at(millis)
+  }
 }

 object PartitionSpec {
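
intervalWindow maps a spec's span onto the only two granularities it supports (hourly and daily), and translate converts a single partition value between formats by round-tripping through epoch millis. A hedged sketch under the same assumptions as above; the spec values are illustrative, not from the commit.

  import ai.chronon.api.PartitionSpec

  val hourly = PartitionSpec("ds", "yyyy-MM-dd-HH", 60 * 60 * 1000L)
  val daily = PartitionSpec("ds", "yyyy-MM-dd", 24 * 60 * 60 * 1000L)

  hourly.intervalWindow // WindowUtils.Hour
  daily.intervalWindow // WindowUtils.Day
  // Any other span (e.g. 15 minutes) throws UnsupportedOperationException.

  // Render a daily partition value in a compact target format.
  daily.translate("2024-01-02", PartitionSpec("ds", "yyyyMMdd", 24 * 60 * 60 * 1000L)) // "20240102"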

cloud_aws/src/test/scala/ai/chronon/integrations/aws/HudiTableUtilsTest.scala

Lines changed: 4 additions & 3 deletions

@@ -18,12 +18,13 @@ class HudiTableUtilsTest extends AnyFlatSpec {
         "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
         "spark.sql.extensions" -> "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
         "spark.chronon.table_write.format" -> "hudi",
-        "spark.kryo.registrator" -> classOf[ChrononHudiKryoRegistrator].getName,
+        "spark.kryo.registrator" -> classOf[ChrononHudiKryoRegistrator].getName
       )
-    ))
+    )
+  )
   private val tableUtils = TableUtils(spark)

-  //todo(tchow): Fix once we support HUDI
+  // todo(tchow): Fix once we support HUDI
   it should "create a hudi table and read the hudi table" ignore {
     import spark.implicits._
     val tableName = "db.test_create_table"

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala

Lines changed: 1 addition & 1 deletion

@@ -123,7 +123,7 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
         fileBasedTable
       }
       case _: StandardTableDefinition => {
-        //todo(tchow): Support partitioning
+        // todo(tchow): Support partitioning

         // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
         // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1340

0 commit comments