Skip to content

Commit db22935

Browse files
committed
Restoring aggregator files to main
1 parent 5f1b30b commit db22935

File tree

4 files changed

+11
-11
lines changed

4 files changed

+11
-11
lines changed

aggregator/src/test/scala/ai/chronon/aggregator/test/DataGen.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
2525
import scala.util.Random
2626

2727
// utility classes to generate random data
28-
abstract class CStream[T: ClassTag] {
28+
abstract class CStream[+T: ClassTag] {
2929
def next(): T
3030

3131
// roll a dice that gives max to min uniformly, with nulls interspersed as per null rate
@@ -105,12 +105,12 @@ object CStream {
105105
.toArray
106106
}
107107

108-
class PartitionStream(count: Int, partitionSpec: PartitionSpec) extends CStream[Any] {
108+
class PartitionStream(count: Int, partitionSpec: PartitionSpec) extends CStream[String] {
109109
val keys: Array[String] = genPartitions(count, partitionSpec)
110110
override def next(): String = Option(roll(keys.length, nullRate = 0)).map(dice => keys(dice.toInt)).get
111111
}
112112

113-
class StringStream(count: Int, prefix: String, nullRate: Double = 0.2) extends CStream[Any] {
113+
class StringStream(count: Int, prefix: String, nullRate: Double = 0.2) extends CStream[String] {
114114
val keyCount: Int = (count * (1 - nullRate)).ceil.toInt
115115
val keys: Array[String] = {
116116
val fullKeySet = (1 until (count + 1)).map(i => s"$prefix$i")
@@ -121,7 +121,7 @@ object CStream {
121121
}
122122

123123
class TimeStream(window: Window, roundMillis: Long = 1, maxTs: Long = System.currentTimeMillis())
124-
extends CStream[Any] {
124+
extends CStream[Long] {
125125
private val minTs = maxTs - window.millis
126126

127127
def round(v: Long): Long = (v / roundMillis) * roundMillis
@@ -131,17 +131,17 @@ object CStream {
131131
}
132132
}
133133

134-
class IntStream(max: Int = 10000, nullRate: Double = 0.1) extends CStream[Any] {
134+
class IntStream(max: Int = 10000, nullRate: Double = 0.1) extends CStream[Integer] {
135135
override def next(): Integer =
136136
Option(roll(max, 1, nullRate = nullRate)).map(dice => Integer.valueOf(dice.toInt)).orNull
137137
}
138138

139-
class LongStream(max: Int = 10000, nullRate: Double = 0.1) extends CStream[Any] {
139+
class LongStream(max: Int = 10000, nullRate: Double = 0.1) extends CStream[JLong] {
140140
override def next(): JLong =
141141
Option(roll(max, 1, nullRate = nullRate)).map(java.lang.Long.valueOf(_)).orNull
142142
}
143143

144-
class DoubleStream(max: Double = 10000, nullRate: Double = 0.1) extends CStream[Any] {
144+
class DoubleStream(max: Double = 10000, nullRate: Double = 0.1) extends CStream[JDouble] {
145145
override def next(): JDouble =
146146
Option(rollDouble(max, 1, nullRate = nullRate)).map(java.lang.Double.valueOf(_)).orNull
147147
}

aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothAggregatorTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class SawtoothAggregatorTest extends TestCase {
5050

5151
def testTailAccuracy(): Unit = {
5252
val timer = new Timer
53-
val queries = CStream.genTimestamps(new Window(30, TimeUnit.DAYS), 10000, 5 * 60 * 1000).map(_.asInstanceOf[Long])
53+
val queries = CStream.genTimestamps(new Window(30, TimeUnit.DAYS), 10000, 5 * 60 * 1000)
5454

5555
val columns = Seq(Column("ts", LongType, 180), Column("num", LongType, 1000))
5656
val events = CStream.gen(columns, 10000).rows
@@ -121,7 +121,7 @@ class SawtoothAggregatorTest extends TestCase {
121121

122122
def testRealTimeAccuracy(): Unit = {
123123
val timer = new Timer
124-
val queries = CStream.genTimestamps(new Window(1, TimeUnit.DAYS), 1000).map(_.asInstanceOf[Long])
124+
val queries = CStream.genTimestamps(new Window(1, TimeUnit.DAYS), 1000)
125125
val columns = Seq(Column("ts", LongType, 180),
126126
Column("num", LongType, 1000),
127127
Column("age", LongType, 100),

aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothOnlineAggregatorTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class SawtoothOnlineAggregatorTest extends TestCase {
3737
def testConsistency(): Unit = {
3838
val queryEndTs = TsUtils.round(System.currentTimeMillis(), WindowUtils.Day.millis)
3939
val batchEndTs = queryEndTs - WindowUtils.Day.millis
40-
val queries = CStream.genTimestamps(new Window(1, TimeUnit.DAYS), 1000).map(_.asInstanceOf[Long])
40+
val queries = CStream.genTimestamps(new Window(1, TimeUnit.DAYS), 1000)
4141
val eventCount = 10000
4242

4343
val columns = Seq(Column("ts", LongType, 60),

aggregator/src/test/scala/ai/chronon/aggregator/test/TwoStackLiteAggregatorTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class TwoStackLiteAggregatorTest extends TestCase{
6565

6666
def testAgainstSawtooth(): Unit = {
6767
val timer = new Timer
68-
val queries = CStream.genTimestamps(new Window(30, TimeUnit.DAYS), 100000, 5 * 60 * 1000).map(_.asInstanceOf[Long])
68+
val queries = CStream.genTimestamps(new Window(30, TimeUnit.DAYS), 100000, 5 * 60 * 1000)
6969

7070
val columns = Seq(Column("ts", LongType, 180), Column("num", LongType, 1000))
7171
val events = CStream.gen(columns, 10000).rows

0 commit comments

Comments
 (0)