Skip to content

Commit 0e5a586

Browse files
tchow-zlaithomaschow
authored andcommitted
fix: support non-string types for iceberg partition listing (#436)
## Summary - Everywhere else we want to handle partitions that could be non-string types. This is similar to the change in: https://github.com/zipline-ai/chronon/blob/3d2e77da18e8fa81a5471935a7358937ed8f9f13/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala#L122-L128 ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced partition date display by introducing configurable date formatting. - Partition dates are now consistently formatted based on user configuration, ensuring reliable and predictable output across the system. - Improved retrieval of partition format for BigQuery operations, allowing for broader usage across different packages. <!-- end of auto-generated comment: release notes by coderabbit.ai --> <!-- av pr metadata This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR. ``` {"parent":"main","parentHead":"","trunk":"main"} ``` --> --------- Co-authored-by: Thomas Chow <[email protected]>
1 parent 2ca0ae2 commit 0e5a586

File tree

3 files changed

+7
-5
lines changed

3 files changed

+7
-5
lines changed

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
106106
// TODO: remove temporary hack. this is done because the existing raw data is in the date format yyyy-MM-dd
107107
// but partition values in bigquery's INFORMATION_SCHEMA.PARTITIONS are in yyyyMMdd format.
108108
// moving forward, for bigquery gcp we should default to storing raw data in yyyyMMdd format.
109-
val partitionFormat = sparkSession.conf.get("spark.chronon.partition.format", "yyyyMMdd")
109+
val partitionFormat = TableUtils(sparkSession).partitionFormat
110110

111111
val partitionInfoDf = sparkSession.read
112112
.format("bigquery")

spark/src/main/scala/ai/chronon/spark/TableUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
5959
.withZone(ZoneId.systemDefault())
6060
val partitionColumn: String =
6161
sparkSession.conf.get("spark.chronon.partition.column", "ds")
62-
private[spark] val partitionFormat: String =
62+
val partitionFormat: String =
6363
sparkSession.conf.get("spark.chronon.partition.format", "yyyy-MM-dd")
6464
val partitionSpec: PartitionSpec = PartitionSpec(partitionFormat, WindowUtils.Day.millis)
6565
val smallModelEnabled: Boolean =

spark/src/main/scala/ai/chronon/spark/format/Iceberg.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package ai.chronon.spark.format
22

3+
import ai.chronon.spark.TableUtils
34
import org.apache.spark.sql.SparkSession
5+
import org.apache.spark.sql.functions.{col, date_format}
46
import org.apache.spark.sql.types.StructType
57

68
case object Iceberg extends Format {
@@ -29,12 +31,12 @@ case object Iceberg extends Format {
2931
.load(s"$tableName.partitions")
3032

3133
val index = partitionsDf.schema.fieldIndex("partition")
32-
34+
val partitionFmt = TableUtils(sparkSession).partitionFormat
3335
if (partitionsDf.schema(index).dataType.asInstanceOf[StructType].fieldNames.contains("hr")) {
3436
// Hour filter is currently buggy in iceberg. https://github.com/apache/iceberg/issues/4718
3537
// so we collect and then filter.
3638
partitionsDf
37-
.select("partition.ds", "partition.hr")
39+
.select(date_format(col("partition.ds"), partitionFmt), col("partition.hr"))
3840
.collect()
3941
.filter(_.get(1) == null)
4042
.map(_.getString(0))
@@ -43,7 +45,7 @@ case object Iceberg extends Format {
4345
} else {
4446

4547
partitionsDf
46-
.select("partition.ds")
48+
.select(date_format(col("partition.ds"), partitionFmt))
4749
.collect()
4850
.map(_.getString(0))
4951
.toSeq

0 commit comments

Comments
 (0)