@@ -80,18 +80,29 @@ object Extensions {
80
80
if (unbounded) " " else s " _ ${window.length}${window.timeUnit.str}"
81
81
82
82
def millis : Long = window.length.toLong * window.timeUnit.millis
83
+
84
+ def inverse : Window = {
85
+ if (window == null ) return null
86
+ window.deepCopy().setLength(0 - window.getLength)
87
+ }
88
+
83
89
}
84
90
85
91
object WindowUtils {
86
92
val Unbounded : Window = new Window (Int .MaxValue , TimeUnit .DAYS )
93
+
87
94
val Hour : Window = new Window (1 , TimeUnit .HOURS )
88
95
val Day : Window = new Window (1 , TimeUnit .DAYS )
96
+ val Null : Window = null
97
+
89
98
private val SecondMillis : Long = 1000
90
99
private val Minute : Long = 60 * SecondMillis
91
100
val FiveMinutes : Long = 5 * Minute
92
101
private val defaultPartitionSize : api.TimeUnit = api.TimeUnit .DAYS
93
102
val onePartition : api.Window = new api.Window (1 , defaultPartitionSize)
94
103
104
+ def hours (millis : Long ): Window = new Window ((millis / Hour .millis).toInt, TimeUnit .HOURS )
105
+
95
106
def millisToString (millis : Long ): String = {
96
107
if (millis % Day .millis == 0 ) {
97
108
new Window ((millis / Day .millis).toInt, TimeUnit .DAYS ).str
@@ -113,13 +124,32 @@ object Extensions {
113
124
timestampMs - (timestampMs % windowSizeMs)
114
125
}
115
126
116
- def convertUnits (window : Window , offsetUnit : api.TimeUnit ): Window = {
127
+ def convertUnits (window : Window , outputUnit : api.TimeUnit ): Window = {
117
128
if (window == null ) return null
118
- if (window.timeUnit == offsetUnit ) return window
129
+ if (window.timeUnit == outputUnit ) return window
119
130
120
- val offsetSpanMillis = new Window (1 , offsetUnit ).millis
131
+ val offsetSpanMillis = new Window (1 , outputUnit ).millis
121
132
val windowLength = math.ceil(window.millis.toDouble / offsetSpanMillis.toDouble).toInt
122
- new Window (windowLength, offsetUnit)
133
+ new Window (windowLength, outputUnit)
134
+ }
135
+
136
+ def plus (a : Window , b : Window ): Window = {
137
+ if (a == null ) return b
138
+ if (b == null ) return a
139
+
140
+ require(a.timeUnit == b.timeUnit, s " Cannot add windows with different time units ${a.timeUnit} vs. ${b.timeUnit}" )
141
+
142
+ new Window (a.length + b.length, a.timeUnit)
143
+ }
144
+
145
+ def minus (a : Window , b : Window ): Window = {
146
+ if (a == null ) return null
147
+ if (b == null ) return a
148
+
149
+ require(a.timeUnit == b.timeUnit,
150
+ s " Cannot subtract windows with different time units ${a.timeUnit} vs. ${b.timeUnit}" )
151
+
152
+ new Window (a.length - b.length, a.timeUnit)
123
153
}
124
154
125
155
def zero (timeUnits : api.TimeUnit = api.TimeUnit .DAYS ): Window = new Window (0 , timeUnits)
@@ -129,8 +159,12 @@ object Extensions {
129
159
def cleanName : String = metaData.name.sanitize
130
160
131
161
def outputTable : String = s " ${metaData.outputNamespace}. ${metaData.cleanName}"
162
+
163
+ // legacy way of generating label info - we might end-up doing views again, but probably with better names
132
164
def outputLabelTable : String = s " ${metaData.outputNamespace}. ${metaData.cleanName}_labels "
133
165
def outputFinalView : String = s " ${metaData.outputNamespace}. ${metaData.cleanName}_labeled "
166
+ def outputLatestLabelView : String = s " ${metaData.outputNamespace}. ${metaData.cleanName}_labeled_latest "
167
+
134
168
def outputLabelTableV2 : String =
135
169
s " ${metaData.outputNamespace}. ${metaData.cleanName}_with_labels " // Used for the LabelJoinV2 flow
136
170
def loggedTable : String = s " ${outputTable}_logged "
@@ -372,6 +406,13 @@ object Extensions {
372
406
else { source.getJoinSource.getJoin.metaData.outputTable }
373
407
}
374
408
409
+ def mutationsTable : Option [String ] = for (
410
+ entities <- Option (source.getEntities);
411
+ mutationsTable <- Option (entities.getMutationTable)
412
+ ) yield {
413
+ mutationsTable
414
+ }
415
+
375
416
def overwriteTable (table : String ): Unit = {
376
417
if (source.isSetEntities) { source.getEntities.setSnapshotTable(table) }
377
418
else if (source.isSetEvents) { source.getEvents.setTable(table) }
@@ -441,12 +482,26 @@ object Extensions {
441
482
*/
442
483
def cleanTopic : String = source.topic.cleanSpec
443
484
485
+ def partitionInterval : Window = Option (source.query.partitionInterval).getOrElse(WindowUtils .onePartition)
444
486
}
445
487
446
488
implicit class GroupByOps (groupBy : GroupBy ) extends GroupBy (groupBy) {
447
489
448
490
def keyNameForKvStore : String = {
449
- _keyNameForKvStore(groupBy.metaData, GroupByKeyword )
491
+ _keyNameForKvStore(groupBy.metaData, GroupByFolder )
492
+ }
493
+
494
+ def allWindows : Array [Window ] = {
495
+ groupBy.aggregations
496
+ .iterator()
497
+ .toScala
498
+ .flatMap { agg =>
499
+ Option (agg.windows)
500
+ .map(_.iterator().toScala)
501
+ .getOrElse(Array (WindowUtils .Null ).iterator)
502
+ }
503
+ .toArray
504
+ .distinct
450
505
}
451
506
452
507
def maxWindow : Option [Window ] = {
@@ -841,7 +896,7 @@ object Extensions {
841
896
842
897
implicit class JoinOps (val join : Join ) extends Serializable {
843
898
def keyNameForKvStore : String = {
844
- _keyNameForKvStore(join.metaData, JoinKeyword )
899
+ _keyNameForKvStore(join.metaData, JoinFolder )
845
900
}
846
901
847
902
@ transient lazy val logger : Logger = LoggerFactory .getLogger(getClass)
@@ -1242,13 +1297,13 @@ object Extensions {
1242
1297
1243
1298
implicit class StagingQueryOps (stagingQuery : StagingQuery ) {
1244
1299
def keyNameForKvStore : String = {
1245
- _keyNameForKvStore(stagingQuery.metaData, StagingQueryKeyword )
1300
+ _keyNameForKvStore(stagingQuery.metaData, StagingQueryFolder )
1246
1301
}
1247
1302
}
1248
1303
1249
1304
implicit class ModelOps (model : Model ) {
1250
1305
def keyNameForKvStore : String = {
1251
- _keyNameForKvStore(model.metaData, ModelKeyword )
1306
+ _keyNameForKvStore(model.metaData, ModelFolder )
1252
1307
}
1253
1308
}
1254
1309
0 commit comments