@@ -2,21 +2,45 @@ package ai.chronon.spark.stats.drift
 
 import ai.chronon.online.KVStore
 import ai.chronon.online.KVStore.PutRequest
+import ai.chronon.spark.TableUtils
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types
+
+import java.io.FileNotFoundException
+
+class SummaryUploader(summaryDF: DataFrame, kvStore: KVStore)(implicit tu: TableUtils) {
+  val completed_schema: types.StructType = types.StructType(
+    Seq(
+      types.StructField(tu.partitionColumn, types.StringType, nullable = false)
+    )
+  )
 
-class SummaryUploader(summaryDF: DataFrame, kvStore: KVStore) {
   def run(): Unit = {
     summaryDF.rdd.foreachPartition(rows => {
       var putRequests: List[PutRequest] = Nil
+      var newPartitions: List[String] = Nil
       for (row <- rows) {
+        val partition = row.getAs[String](tu.partitionColumn)
 
-        putRequests = putRequests :+ PutRequest(
-          if (!row.isNullAt(row.fieldIndex("keyBytes"))) row.getAs[Array[Byte]]("keyBytes") else Array.empty[Byte],
-          if (!row.isNullAt(row.fieldIndex("valueBytes"))) row.getAs[Array[Byte]]("valueBytes") else Array.empty[Byte],
-          "drift_statistics"
-        )
+        try {
+          tu.sparkSession.read.parquet(s"${partition}_completed")
+        } catch {
+          case _: FileNotFoundException => {
+            putRequests = putRequests :+ PutRequest(
+              if (!row.isNullAt(row.fieldIndex("keyBytes"))) row.getAs[Array[Byte]]("keyBytes") else Array.empty[Byte],
+              if (!row.isNullAt(row.fieldIndex("valueBytes"))) row.getAs[Array[Byte]]("valueBytes")
+              else Array.empty[Byte],
+              "drift_statistics"
+            )
+            newPartitions = newPartitions :+ partition
+          }
+        }
       }
       kvStore.multiPut(putRequests)
+      for (partition <- newPartitions) {
+        val df = tu.sparkSession.emptyDataFrame
+        df.write.parquet(s"${partition}_completed")
+      }
     })
   }
 }
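
For context, a minimal wiring sketch of the new constructor signature. The session setup, summary table name, and KVStore construction below are illustrative placeholders and not part of this change; `TableUtils(spark)` is assumed to be constructible from a SparkSession as elsewhere in chronon-spark.

```scala
import ai.chronon.online.KVStore
import ai.chronon.spark.TableUtils
import ai.chronon.spark.stats.drift.SummaryUploader
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("summary-upload").getOrCreate()
implicit val tu: TableUtils = TableUtils(spark)    // assumed constructor, matching usage elsewhere in the module
val summaryDF = spark.table("drift_summaries")     // placeholder: table with keyBytes/valueBytes plus the partition column
val kvStore: KVStore = ???                         // supply a concrete KVStore implementation here
new SummaryUploader(summaryDF, kvStore).run()      // uploads only partitions without an existing "_completed" marker
```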