File tree: 1 file changed (+4 -3 lines changed)
spark/src/main/scala/ai/chronon/spark/format
1 file changed (+4 -3 lines changed)
Original file line number | Diff line number | Diff line change
1
1
package ai .chronon .spark .format
2
2
3
3
import org .apache .spark .sql .SparkSession
4
+ import org .apache .spark .sql .functions .{col , date_format }
4
5
import org .apache .spark .sql .types .StructType
5
6
6
7
case object Iceberg extends Format {
@@ -29,12 +30,12 @@ case object Iceberg extends Format {
29
30
.load(s " $tableName.partitions " )
30
31
31
32
val index = partitionsDf.schema.fieldIndex(" partition" )
32
-
33
+ val partitionFmt = sparkSession.conf.get( " spark.chronon.partition.format " , " yyyyMMdd " )
33
34
if (partitionsDf.schema(index).dataType.asInstanceOf [StructType ].fieldNames.contains(" hr" )) {
34
35
// The hour filter is currently buggy in Iceberg (see https://github.com/apache/iceberg/issues/4718),
35
36
// so we collect the rows first and then filter them afterwards.
36
37
partitionsDf
37
- .select(" partition.ds" , " partition.hr" )
38
+ .select(date_format(col( " partition.ds" ), partitionFmt), col( " partition.hr" ) )
38
39
.collect()
39
40
.filter(_.get(1 ) == null )
40
41
.map(_.getString(0 ))
@@ -43,7 +44,7 @@ case object Iceberg extends Format {
43
44
} else {
44
45
45
46
partitionsDf
46
- .select(" partition.ds" )
47
+ .select(date_format(col( " partition.ds" ), partitionFmt) )
47
48
.collect()
48
49
.map(_.getString(0 ))
49
50
.toSeq
You can’t perform that action at this time.
0 commit comments