diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
index f3adbe905f..0e3f8882a3 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
@@ -106,7 +106,7 @@ case class BigQueryFormat(project: String, bqClient: BigQuery, override val opti
     // TODO: remove temporary hack. this is done because the existing raw data is in the date format yyyy-MM-dd
     // but partition values in bigquery's INFORMATION_SCHEMA.PARTITIONS are in yyyyMMdd format.
     // moving forward, for bigquery gcp we should default to storing raw data in yyyyMMdd format.
-    val partitionFormat = sparkSession.conf.get("spark.chronon.partition.format", "yyyyMMdd")
+    val partitionFormat = TableUtils(sparkSession).partitionFormat
 
     val partitionInfoDf = sparkSession.read
       .format("bigquery")
diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala
index f6bba01bb8..1239e57d62 100644
--- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala
+++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala
@@ -59,7 +59,7 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
     .withZone(ZoneId.systemDefault())
 
   val partitionColumn: String = sparkSession.conf.get("spark.chronon.partition.column", "ds")
-  private[spark] val partitionFormat: String =
+  val partitionFormat: String =
     sparkSession.conf.get("spark.chronon.partition.format", "yyyy-MM-dd")
   val partitionSpec: PartitionSpec = PartitionSpec(partitionFormat, WindowUtils.Day.millis)
   val smallModelEnabled: Boolean =
diff --git a/spark/src/main/scala/ai/chronon/spark/format/Iceberg.scala b/spark/src/main/scala/ai/chronon/spark/format/Iceberg.scala
index 969c61e24f..f3903be1a5 100644
--- a/spark/src/main/scala/ai/chronon/spark/format/Iceberg.scala
+++ b/spark/src/main/scala/ai/chronon/spark/format/Iceberg.scala
@@ -1,6 +1,8 @@
 package ai.chronon.spark.format
 
+import ai.chronon.spark.TableUtils
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.{col, date_format}
 import org.apache.spark.sql.types.StructType
 
 case object Iceberg extends Format {
@@ -29,12 +31,12 @@ case object Iceberg extends Format {
       .load(s"$tableName.partitions")
 
     val index = partitionsDf.schema.fieldIndex("partition")
-
+    val partitionFmt = TableUtils(sparkSession).partitionFormat
     if (partitionsDf.schema(index).dataType.asInstanceOf[StructType].fieldNames.contains("hr")) {
       // Hour filter is currently buggy in iceberg. https://github.com/apache/iceberg/issues/4718
       // so we collect and then filter.
       partitionsDf
-        .select("partition.ds", "partition.hr")
+        .select(date_format(col("partition.ds"), partitionFmt), col("partition.hr"))
         .collect()
         .filter(_.get(1) == null)
         .map(_.getString(0))
@@ -43,7 +45,7 @@ case object Iceberg extends Format {
 
     } else {
       partitionsDf
-        .select("partition.ds")
+        .select(date_format(col("partition.ds"), partitionFmt))
         .collect()
         .map(_.getString(0))
         .toSeq