@@ -25,8 +25,7 @@ import ai.chronon.api.Extensions.GroupByOps
25
25
import ai .chronon .api .Extensions .MetadataOps
26
26
import ai .chronon .api .Extensions .SourceOps
27
27
import ai .chronon .api .TilingUtils
28
- import ai .chronon .online .AvroConversions
29
- import ai .chronon .online .KVStore
28
+ import ai .chronon .online .{AvroConversions , KVStore , TileCodec }
30
29
import ai .chronon .spark .GenericRowHandler
31
30
import ai .chronon .spark .GroupByUpload
32
31
import ai .chronon .spark .SparkSessionBuilder
@@ -35,6 +34,7 @@ import ai.chronon.spark.streaming.GroupBy
35
34
import ai .chronon .spark .streaming .JoinSourceRunner
36
35
import ai .chronon .spark .utils .InMemoryKvStore
37
36
import ai .chronon .spark .utils .InMemoryStream
37
+ import ai .chronon .spark .utils .InMemoryStream .TileUpdate
38
38
import ai .chronon .spark .utils .MockApi
39
39
import org .apache .spark .sql .SparkSession
40
40
import org .apache .spark .sql .streaming .Trigger
@@ -52,12 +52,16 @@ object OnlineUtils {
52
52
debug : Boolean ,
53
53
dropDsOnWrite : Boolean ,
54
54
isTiled : Boolean ): Unit = {
55
+
55
56
val inputStreamDf = groupByConf.dataModel match {
57
+
56
58
case DataModel .Entities =>
57
59
val entity = groupByConf.streamingSource.get
58
60
val df = tableUtils.sql(s " SELECT * FROM ${entity.getEntities.mutationTable} WHERE ds = ' $ds' " )
61
+
59
62
df.withColumnRenamed(entity.query.reversalColumn, Constants .ReversalColumn )
60
63
.withColumnRenamed(entity.query.mutationTimeColumn, Constants .MutationTimeColumn )
64
+
61
65
case DataModel .Events =>
62
66
val table = groupByConf.streamingSource.get.table
63
67
tableUtils.sql(s " SELECT * FROM $table WHERE ds >= ' $ds' " )
@@ -66,6 +70,7 @@ object OnlineUtils {
66
70
val inputStream = new InMemoryStream
67
71
val mockApi = new MockApi (kvStore, namespace)
68
72
var inputModified = inputStreamDf
73
+
69
74
if (dropDsOnWrite && inputStreamDf.schema.fieldNames.contains(tableUtils.partitionColumn)) {
70
75
inputModified = inputStreamDf.drop(tableUtils.partitionColumn)
71
76
}
@@ -79,43 +84,48 @@ object OnlineUtils {
79
84
}
80
85
81
86
if (isTiled) {
82
- val memoryStream : Array [(Array [Any ], Long , Array [Byte ])] =
87
+
88
+ val (memoryStream : Array [TileUpdate ], tileCodec : TileCodec ) =
83
89
inputStream.getInMemoryTiledStreamArray(session, inputModified, groupByConf)
84
90
val inMemoryKvStore : KVStore = kvStore()
85
91
86
- val fetcher = mockApi.buildFetcher(false )
92
+ val fetcher = mockApi.buildFetcher(debug = false )
87
93
val groupByServingInfo = fetcher.metadataStore.getGroupByServingInfo(groupByConf.getMetaData.getName).get
88
94
89
95
val keyZSchema : api.StructType = groupByServingInfo.keyChrononSchema
90
96
val keyToBytes = AvroConversions .encodeBytes(keyZSchema, GenericRowHandler .func)
91
97
92
- val putRequests = memoryStream.map { entry =>
93
- val keys = entry._1
94
- val timestamp = entry._2
95
- val tileBytes = entry._3
98
+ val putRequests = memoryStream.map { entry : TileUpdate =>
99
+ val keyBytes = keyToBytes(entry.keys)
100
+ val tileIrBytes = tileCodec.makeTileIr(entry.ir, isComplete = false )
96
101
97
- val keyBytes = keyToBytes(keys)
98
102
val tileKey = TilingUtils .buildTileKey(
99
103
groupByConf.streamingDataset,
100
104
keyBytes,
101
105
Some (ResolutionUtils .getSmallestWindowResolutionInMillis(groupByServingInfo.groupBy)),
102
106
None )
107
+
103
108
KVStore .PutRequest (TilingUtils .serializeTileKey(tileKey),
104
- tileBytes ,
109
+ tileIrBytes ,
105
110
groupByConf.streamingDataset,
106
- Some (timestamp ))
111
+ Some (entry.tileTimestamp ))
107
112
}
113
+
108
114
inMemoryKvStore.multiPut(putRequests)
115
+
109
116
} else {
117
+
110
118
val groupByStreaming =
111
119
new GroupBy (inputStream.getInMemoryStreamDF(session, inputModified),
112
120
session,
113
121
groupByConf,
114
122
mockApi,
115
123
debug = debug)
124
+
116
125
// We modify the arguments for running to make sure all data gets into the KV Store before fetching.
117
126
val dataStream = groupByStreaming.buildDataStream()
118
127
val query = dataStream.trigger(Trigger .Once ()).start()
128
+
119
129
query.awaitTermination()
120
130
}
121
131
}
0 commit comments