
Summary upload #50


Merged
merged 60 commits into from
Nov 7, 2024
Commits
fba5c42
[WIP] Anomaly detection prototype with generated data
nikhil-zlai Sep 11, 2024
9b0c82a
discord -> slack + drift stuff + avro decoding benchmark
nikhil-zlai Sep 21, 2024
0aba15d
changes
nikhil-zlai Sep 21, 2024
dbe401a
overwatch
nikhil-zlai Oct 7, 2024
fbb7ffd
compile
nikhil-zlai Oct 8, 2024
c503295
pctile to pmf
nikhil-zlai Oct 14, 2024
2f11cb0
test v0
nikhil-zlai Oct 15, 2024
2712d19
more test fixes
nikhil-zlai Oct 15, 2024
8eb6042
adding incremental compute
nikhil-zlai Oct 15, 2024
8b0d460
printing queries in color
nikhil-zlai Oct 15, 2024
f7ff3c3
Partition Runner
nikhil-zlai Oct 17, 2024
c50c873
summary metrics thrift
nikhil-zlai Oct 18, 2024
5d59bbf
packing data into keybytes and value bytes
nikhil-zlai Oct 18, 2024
0956227
wiring up with partition runner + porting fraud data
nikhil-zlai Oct 21, 2024
697afbc
Merge branch 'main' of https://github.com/zipline-ai/chronon into sum…
chewys1024 Oct 22, 2024
4848221
Fix merge
chewys1024 Oct 22, 2024
cb20bb1
[WIP] Summary Uploader
chewys1024 Oct 22, 2024
425adb9
Added a mockKVStore
chewys1024 Oct 22, 2024
9b231ee
scalafixAll
chewys1024 Oct 22, 2024
90c157d
Add a check to only upload each partition once.
chewys1024 Oct 23, 2024
54cda0c
coderabbit changes
chewy-zlai Oct 23, 2024
5f1b30b
Summarizer (#17)
nikhil-zlai Oct 29, 2024
db22935
Restoring aggregator files to main
chewy-zlai Oct 29, 2024
4f9882a
Merge branch 'main' of https://github.com/zipline-ai/chronon into sum…
chewy-zlai Oct 29, 2024
24375e6
Merged with main to simplify change
chewy-zlai Oct 29, 2024
f15ab1f
revert build.sbt
chewy-zlai Oct 29, 2024
b8c0d8c
Merge branch 'main' of https://github.com/zipline-ai/chronon into sum…
chewy-zlai Oct 31, 2024
c1bbe1f
Add create to MockKVStore
chewy-zlai Oct 31, 2024
835ab23
Create map correctly
chewy-zlai Oct 31, 2024
673cb1e
Attempt to make it serializable
chewy-zlai Oct 31, 2024
f966868
Make MockKVStore and SummaryUploader serializable.
chewy-zlai Oct 31, 2024
3c50b70
Fix serialization by passing in a function to create the KVStore
chewy-zlai Oct 31, 2024
bc494fc
sbt scalafmt
chewy-zlai Oct 31, 2024
1b9f275
coderabbit suggested use of a variable
chewy-zlai Oct 31, 2024
9c46dbc
Clean up unintended changes to DriftTest
chewy-zlai Oct 31, 2024
27fdc18
Clean up unintended changes to DriftTest
chewy-zlai Oct 31, 2024
1405872
Batch putrequest defualting at 100 at a time
chewy-zlai Oct 31, 2024
3fe25fe
Store putRequests as an ArrayBuffer for better performance
chewy-zlai Oct 31, 2024
df6ccc1
Add a semaphore to limit puts
chewy-zlai Nov 4, 2024
2e4d2df
Add backoff to DynamoDBKVStoreImpl in case of ProvisionedThroughputEx…
chewy-zlai Nov 4, 2024
57c0291
Simplify KVStoreSemaphore via coderabbit suggestions
chewy-zlai Nov 4, 2024
a6447c8
Fixes error handling issue coderabbit noticed in DynamoDBKVStoreImpl
chewy-zlai Nov 4, 2024
d7860ee
Merge branch 'main' into summary-upload
chewy-zlai Nov 4, 2024
4e18f18
Switch semaphore to a ratelimiter per dataset for DynamoDBKVStoreImpl
chewy-zlai Nov 4, 2024
b5d08b8
Add ratelimiter to multiget as well for DDBKVSImpl
chewy-zlai Nov 4, 2024
23e710f
Merge branch 'summary-upload' of https://github.com/zipline-ai/chrono…
chewy-zlai Nov 4, 2024
f4d27fc
Remove parameter from creation of KVStore
chewy-zlai Nov 4, 2024
254e6c4
remove parameter from SummaryUploader
chewy-zlai Nov 4, 2024
9a9ac67
scalafixAll
chewy-zlai Nov 4, 2024
cd7cd8e
Handle partial errors from multiput
chewy-zlai Nov 4, 2024
ba9ea0c
Specificity in error handling
chewy-zlai Nov 4, 2024
6e75bcd
Fix guava dependency so RateLimiter will work.
chewy-zlai Nov 5, 2024
013c7d6
Update DynamoDBKVStore rateLimiters to use a TrieMap
chewy-zlai Nov 5, 2024
fb74aba
Switch to ConcurrentHashMap for RateLimiters
chewy-zlai Nov 5, 2024
f920ef2
remove semaphor
chewy-zlai Nov 5, 2024
436a063
remove use of semaphore
chewy-zlai Nov 5, 2024
1acf3b5
remove manual retry logic as AWS handles that for us
chewy-zlai Nov 5, 2024
4a8e5f9
more idiomatic use of concurrent maps
chewy-zlai Nov 5, 2024
40c52df
Move stats table name to Constants
chewy-zlai Nov 5, 2024
591c5ae
clean up redundant code
chewy-zlai Nov 7, 2024
@@ -18,16 +18,16 @@ package ai.chronon.aggregator.base

import ai.chronon.aggregator.base.FrequentItemType.ItemType
import ai.chronon.api._
-import com.yahoo.memory.Memory
-import com.yahoo.sketches.ArrayOfDoublesSerDe
-import com.yahoo.sketches.ArrayOfItemsSerDe
-import com.yahoo.sketches.ArrayOfLongsSerDe
-import com.yahoo.sketches.ArrayOfStringsSerDe
-import com.yahoo.sketches.cpc.CpcSketch
-import com.yahoo.sketches.cpc.CpcUnion
-import com.yahoo.sketches.frequencies.ErrorType
-import com.yahoo.sketches.frequencies.ItemsSketch
-import com.yahoo.sketches.kll.KllFloatsSketch
+import org.apache.datasketches.common.ArrayOfDoublesSerDe
+import org.apache.datasketches.common.ArrayOfItemsSerDe
+import org.apache.datasketches.common.ArrayOfLongsSerDe
+import org.apache.datasketches.common.ArrayOfStringsSerDe
+import org.apache.datasketches.cpc.CpcSketch
+import org.apache.datasketches.cpc.CpcUnion
+import org.apache.datasketches.frequencies.ErrorType
+import org.apache.datasketches.frequencies.ItemsSketch
+import org.apache.datasketches.kll.KllFloatsSketch
+import org.apache.datasketches.memory.Memory

import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
@@ -336,6 +336,13 @@ object CpcFriendly {
implicit val stringIsCpcFriendly: CpcFriendly[String] = new CpcFriendly[String] {
override def update(sketch: CpcSketch, input: String): Unit = sketch.update(input)
}
implicit val intIsCpcFriendly: CpcFriendly[Int] = new CpcFriendly[Int] {
override def update(sketch: CpcSketch, input: Int): Unit = sketch.update(input.toLong)
}

implicit val floatIsCpcFriendly: CpcFriendly[Float] = new CpcFriendly[Float] {
override def update(sketch: CpcSketch, input: Float): Unit = sketch.update(input.toDouble)
}

implicit val longIsCpcFriendly: CpcFriendly[Long] = new CpcFriendly[Long] {
override def update(sketch: CpcSketch, input: Long): Unit = sketch.update(input)
@@ -344,6 +351,10 @@ object CpcFriendly {
override def update(sketch: CpcSketch, input: Double): Unit = sketch.update(input)
}

implicit val decimalIsCpcFriendly: CpcFriendly[java.math.BigDecimal] = new CpcFriendly[java.math.BigDecimal] {
override def update(sketch: CpcSketch, input: java.math.BigDecimal): Unit = sketch.update(input.toPlainString)
}

implicit val BinaryIsCpcFriendly: CpcFriendly[Array[Byte]] = new CpcFriendly[Array[Byte]] {
override def update(sketch: CpcSketch, input: Array[Byte]): Unit = sketch.update(input)
}
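The new implicits extend the CpcFriendly typeclass so CPC sketches can ingest Int, Float, and BigDecimal inputs by normalizing them to a type the sketch already accepts (Int widened to Long, Float to Double, BigDecimal rendered via toPlainString). A minimal, dependency-free sketch of the same typeclass pattern — ToySketch, ToyFriendly, and ingest are illustrative stand-ins, not Chronon APIs:

```scala
// Toy stand-in for CpcSketch: just records distinct normalized inputs.
final class ToySketch {
  val seen = scala.collection.mutable.Set[String]()
  def update(s: String): Unit = seen += s
}

trait ToyFriendly[T] { def update(sketch: ToySketch, input: T): Unit }

object ToyFriendly {
  // Widen the same way the PR does: Int -> Long, BigDecimal -> plain string.
  implicit val intFriendly: ToyFriendly[Int] =
    (s, i) => s.update(i.toLong.toString)
  implicit val decimalFriendly: ToyFriendly[java.math.BigDecimal] =
    (s, d) => s.update(d.toPlainString)

  // The implicit resolution mirrors how CpcFriendly instances are picked up.
  def ingest[T](sketch: ToySketch, xs: Seq[T])(implicit ev: ToyFriendly[T]): Unit =
    xs.foreach(ev.update(sketch, _))
}
```

The benefit of the typeclass shape is that adding a new supported input type is one implicit value, with no change to the aggregator that consumes it.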
@@ -666,7 +677,7 @@ class ApproxPercentiles(k: Int = 128, percentiles: Array[Double] = Array(0.5))
override def irType: DataType = BinaryType

override def prepare(input: Float): KllFloatsSketch = {
-val sketch = new KllFloatsSketch(k)
+val sketch = KllFloatsSketch.newHeapInstance(k)
sketch.update(input)
sketch
}
Expand All @@ -682,7 +693,7 @@ class ApproxPercentiles(k: Int = 128, percentiles: Array[Double] = Array(0.5))
}

override def bulkMerge(irs: Iterator[KllFloatsSketch]): KllFloatsSketch = {
-val result = new KllFloatsSketch(k)
+val result = KllFloatsSketch.newHeapInstance(k)
irs.foreach(result.merge)
result
}
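The constructor-to-factory change reflects the yahoo-to-Apache DataSketches migration in this PR: heap KLL sketches are now obtained from a factory method rather than a public constructor. A sketch of the round-trip pattern the diff uses (assumes the org.apache.datasketches artifact is on the classpath; not runnable standalone):

```scala
import org.apache.datasketches.kll.KllFloatsSketch
import org.apache.datasketches.memory.Memory

val sketch = KllFloatsSketch.newHeapInstance(200) // was: new KllFloatsSketch(200)
(1 to 1000).foreach(i => sketch.update(i.toFloat))

// Serialize and heapify back, as StatsGenerator does with stored IRs.
val bytes: Array[Byte] = sketch.toByteArray
val restored = KllFloatsSketch.heapify(Memory.wrap(bytes))

// Rank of the midpoint value should come back near 0.5 (approximate).
val medianRank = restored.getRank(500f)
```

Note that Memory also moved, from com.yahoo.memory to org.apache.datasketches.memory, which is why the import blocks above change in lockstep.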
@@ -18,8 +18,8 @@ package ai.chronon.aggregator.row

import ai.chronon.api
import ai.chronon.api.Extensions._
-import com.yahoo.memory.Memory
-import com.yahoo.sketches.kll.KllFloatsSketch
+import org.apache.datasketches.kll.KllFloatsSketch
+import org.apache.datasketches.memory.Memory

import java.util
import scala.collection.Seq
@@ -131,11 +131,12 @@ object StatsGenerator {
metrics :+ MetricTransform(totalColumn, InputTransform.One, api.Operation.COUNT)
}

-def lInfKllSketch(sketch1: AnyRef, sketch2: AnyRef, bins: Int = 128): AnyRef = {
+def lInfKllSketch(sketch1: AnyRef, sketch2: AnyRef, bins: Int = 20): AnyRef = {
if (sketch1 == null || sketch2 == null) return None
val sketchIr1 = KllFloatsSketch.heapify(Memory.wrap(sketch1.asInstanceOf[Array[Byte]]))
val sketchIr2 = KllFloatsSketch.heapify(Memory.wrap(sketch2.asInstanceOf[Array[Byte]]))
-val keySet = sketchIr1.getQuantiles(bins).union(sketchIr2.getQuantiles(bins))
+val binsToDoubles = (0 to bins).map(_.toDouble / bins).toArray
+val keySet = sketchIr1.getQuantiles(binsToDoubles).union(sketchIr2.getQuantiles(binsToDoubles))
var linfSimple = 0.0
keySet.foreach { key =>
val cdf1 = sketchIr1.getRank(key)
@@ -156,15 +157,17 @@
* and PSI>0.25 means "significant shift, action required"
* https://scholarworks.wmich.edu/dissertations/3208
*/
-def PSIKllSketch(reference: AnyRef, comparison: AnyRef, bins: Int = 128, eps: Double = 0.000001): AnyRef = {
+def PSIKllSketch(reference: AnyRef, comparison: AnyRef, bins: Int = 20, eps: Double = 0.000001): AnyRef = {
if (reference == null || comparison == null) return None
val referenceSketch = KllFloatsSketch.heapify(Memory.wrap(reference.asInstanceOf[Array[Byte]]))
val comparisonSketch = KllFloatsSketch.heapify(Memory.wrap(comparison.asInstanceOf[Array[Byte]]))
-val keySet = referenceSketch.getQuantiles(bins).union(comparisonSketch.getQuantiles(bins)).toSet.toArray.sorted
+val binsToDoubles = (0 to bins).map(_.toDouble / bins).toArray
+val keySet =
+  referenceSketch.getQuantiles(binsToDoubles).union(comparisonSketch.getQuantiles(binsToDoubles)).distinct.sorted

🛠️ Refactor suggestion

Consider parameter validation and functional improvements.

While the documentation is good, the method could benefit from parameter validation and a more functional approach.

Consider these improvements:

   def PSIKllSketch(reference: AnyRef, comparison: AnyRef, bins: Int = 20, eps: Double = 0.000001): AnyRef = {
+    require(bins > 0, "Number of bins must be positive")
+    require(eps > 0, "Epsilon must be positive")
     if (reference == null || comparison == null) return None
     val referenceSketch = KllFloatsSketch.heapify(Memory.wrap(reference.asInstanceOf[Array[Byte]]))
     val comparisonSketch = KllFloatsSketch.heapify(Memory.wrap(comparison.asInstanceOf[Array[Byte]]))
     val binsToDoubles = (0 to bins).map(_.toDouble / bins).toArray
     val keySet =
       referenceSketch.getQuantiles(binsToDoubles).union(comparisonSketch.getQuantiles(binsToDoubles)).distinct.sorted
     val referencePMF = regularize(referenceSketch.getPMF(keySet), eps)
     val comparisonPMF = regularize(comparisonSketch.getPMF(keySet), eps)
-    var psi = 0.0
-    for (i <- referencePMF.indices) {
-      psi += (referencePMF(i) - comparisonPMF(i)) * Math.log(referencePMF(i) / comparisonPMF(i))
-    }
-    psi.asInstanceOf[AnyRef]
+    referencePMF.zip(comparisonPMF)
+      .map { case (ref, comp) => (ref - comp) * Math.log(ref / comp) }
+      .sum
+      .asInstanceOf[AnyRef]

Also applies to: 170-173

val referencePMF = regularize(referenceSketch.getPMF(keySet), eps)
val comparisonPMF = regularize(comparisonSketch.getPMF(keySet), eps)
var psi = 0.0
-for (i <- 0 until referencePMF.length) {
+for (i <- referencePMF.indices) {
psi += (referencePMF(i) - comparisonPMF(i)) * Math.log(referencePMF(i) / comparisonPMF(i))
}
psi.asInstanceOf[AnyRef]
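The PSI computed above is Σ (pᵢ − qᵢ)·ln(pᵢ/qᵢ) over binned PMFs, with ε-regularization so empty bins never divide by zero. A self-contained sketch of just that arithmetic, using plain arrays in place of KLL sketches — this `regularize` is an illustrative stand-in for Chronon's helper, not its actual implementation:

```scala
// Replace zero (or tiny) probabilities with eps, then renormalize to sum to 1.
def regularize(pmf: Array[Double], eps: Double): Array[Double] = {
  val padded = pmf.map(p => math.max(p, eps))
  val total = padded.sum
  padded.map(_ / total)
}

// Population Stability Index between a reference and a comparison PMF.
// Each term (p - q) * ln(p / q) is non-negative, so PSI >= 0,
// with 0 exactly when the two distributions match bin-for-bin.
def psi(reference: Array[Double], comparison: Array[Double], eps: Double = 1e-6): Double = {
  val ref = regularize(reference, eps)
  val comp = regularize(comparison, eps)
  ref.zip(comp).map { case (p, q) => (p - q) * math.log(p / q) }.sum
}

val uniform = Array(0.25, 0.25, 0.25, 0.25)
val shifted = Array(0.4, 0.3, 0.2, 0.1)
```

Against the rules of thumb quoted in the docstring (PSI < 0.1 "no shift", > 0.25 "significant shift"), identical PMFs give 0 and a visibly shifted one gives a positive score.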
@@ -18,8 +18,8 @@ package ai.chronon.aggregator.test

import ai.chronon.aggregator.base.ApproxPercentiles
import ai.chronon.aggregator.row.StatsGenerator
-import com.yahoo.sketches.kll.KllFloatsSketch
import junit.framework.TestCase
+import org.apache.datasketches.kll.KllFloatsSketch
import org.junit.Assert._
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -28,7 +28,8 @@ import scala.util.Random

class ApproxPercentilesTest extends TestCase {
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
-def testBasicImpl(nums: Int, slide: Int, k: Int, percentiles: Array[Double], errorPercent: Float): Unit = {
+
+def basicImplTestHelper(nums: Int, slide: Int, k: Int, percentiles: Array[Double], errorPercent: Float): Unit = {
val sorted = (0 to nums).map(_.toFloat)
val elems = Random.shuffle(sorted.toList).toArray
val chunks = elems.sliding(slide, slide)
@@ -58,14 +59,14 @@
def testBasicPercentiles: Unit = {
val percentiles_tested: Int = 31
val percentiles: Array[Double] = (0 to percentiles_tested).toArray.map(i => i * 1.0 / percentiles_tested)
-testBasicImpl(3000, 5, 100, percentiles, errorPercent = 4)
-testBasicImpl(30000, 50, 200, percentiles, errorPercent = 2)
-testBasicImpl(30000, 50, 50, percentiles, errorPercent = 5)
+basicImplTestHelper(3000, 5, 100, percentiles, errorPercent = 4)
+basicImplTestHelper(30000, 50, 200, percentiles, errorPercent = 2)
+basicImplTestHelper(30000, 50, 50, percentiles, errorPercent = 5)

🛠️ Refactor suggestion

Consider adding edge case tests

While the current test cases cover a good range of scenarios, consider adding tests for edge cases:

  • Empty or single-element input
  • Extreme percentiles (0.0 and 1.0)
  • Very small k values

Example addition:

basicImplTestHelper(0, 1, 50, Array(0.0, 1.0), errorPercent = 1)
basicImplTestHelper(1, 1, 2, Array(0.0, 0.5, 1.0), errorPercent = 1)

}

def getPSIDrift(sample1: Array[Float], sample2: Array[Float]): Double = {
-val sketch1 = new KllFloatsSketch(200)
-val sketch2 = new KllFloatsSketch(200)
+val sketch1 = KllFloatsSketch.newHeapInstance(200)
+val sketch2 = KllFloatsSketch.newHeapInstance(200)
sample1.map(sketch1.update)
sample2.map(sketch2.update)
val drift = StatsGenerator.PSIKllSketch(sketch1.toByteArray, sketch2.toByteArray).asInstanceOf[Double]
47 changes: 41 additions & 6 deletions aggregator/src/test/scala/ai/chronon/aggregator/test/DataGen.scala
@@ -35,6 +35,12 @@ abstract class CStream[+T: ClassTag] {
else min + ((max - min) * math.random)
}

protected def rollFloat(max: JDouble, min: JDouble = 0, nullRate: Double = 0.1): java.lang.Float = {
val dice: Double = math.random
if (dice < nullRate) null
else (min + ((max - min) * math.random)).toFloat
}

// roll a dice that gives max to min uniformly, with nulls interspersed as per null rate
protected def roll(max: JLong, min: JLong = 0, nullRate: Double = 0.1): JLong = {
val roll = rollDouble(max.toDouble, min.toDouble, nullRate)
@@ -45,9 +51,9 @@
Stream.fill(count)(next())
}

-def chunk(minSize: Long = 0, maxSize: Long = 10, nullRate: Double = 0.1): CStream[Seq[T]] = {
+def chunk(minSize: Long = 0, maxSize: Long = 10, nullRate: Double = 0.1): CStream[Any] = {
def innerNext(): T = next()
-new CStream[Seq[T]] {
+new CStream[Any] {

⚠️ Potential issue

Consider preserving type safety in the chunk method's return type

Changing the return type of chunk from CStream[Seq[T]] to CStream[Any] reduces type safety and may lead to runtime type casting issues. Preserving the specific type CStream[Seq[T]] ensures compile-time checks and maintains clarity about the data being generated.

Apply this diff to restore the original return type and maintain type safety:

-def chunk(minSize: Long = 0, maxSize: Long = 10, nullRate: Double = 0.1): CStream[Any] = {
+def chunk(minSize: Long = 0, maxSize: Long = 10, nullRate: Double = 0.1): CStream[Seq[T]] = {
     def innerNext(): T = next()
     new CStream[Seq[T]] {
         override def next(): Seq[T] = {

Committable suggestion was skipped due to low confidence.

override def next(): Seq[T] = {
val size = roll(minSize, maxSize, nullRate)
if (size != null) {
@@ -58,17 +64,36 @@ abstract class CStream[+T: ClassTag] {
}
}
}

def zipChunk[Other](other: CStream[Other], minSize: Int = 0, maxSize: Int = 20, nullRate: Double = 0.1): CStream[Any] = {
def nextKey(): T = next()
def nextValue(): Other = other.next()

new CStream[Any] {
override def next(): Map[T, Other] = {
val size = roll(minSize, maxSize, nullRate)
if (size != null) {
(0 until size.toInt).map { _ => nextKey() -> nextValue() }.toMap
} else {
null
}
}
}
}

⚠️ Potential issue

Preserve type information in the zipChunk method

The zipChunk method currently returns CStream[Any], which obscures the actual data type being generated. Since the method produces a Map[T, Other], updating the return type to CStream[Map[T, Other]] enhances type safety and usability.

Apply this diff to update the return type and maintain type safety:

-def zipChunk[Other](other: CStream[Other], minSize: Int = 0, maxSize: Int = 20, nullRate: Double = 0.1): CStream[Any] = {
+def zipChunk[Other](other: CStream[Other], minSize: Int = 0, maxSize: Int = 20, nullRate: Double = 0.1): CStream[Map[T, Other]] = {
     def nextKey(): T = next()
     def nextValue(): Other = other.next()

     new CStream[Map[T, Other]] {
-        override def next(): Map[T, Other] = {
+        override def next(): Map[T, Other] = {

Committable suggestion was skipped due to low confidence.

}
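Both review comments above make the same point: widening chunk and zipChunk to CStream[Any] discards the element type, forcing downstream casts. The effect is easy to see with a stripped-down covariant stream — Gen here is a toy illustration, not the Chronon CStream API:

```scala
abstract class Gen[+T] { self =>
  def next(): T

  // Returning Gen[Seq[T]] (not Gen[Any]) keeps compile-time checks downstream.
  // +T is legal here because T only appears in return (covariant) position.
  def chunk(size: Int): Gen[Seq[T]] = new Gen[Seq[T]] {
    def next(): Seq[T] = Seq.fill(size)(self.next())
  }
}

val ints = new Gen[Int] { def next(): Int = 42 }
val chunks: Gen[Seq[Int]] = ints.chunk(3) // no asInstanceOf needed
val total = chunks.next().sum
```

Had chunk returned Gen[Any], the `.sum` call would not compile without a cast, which is exactly the runtime-casting risk the reviewer flags.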

object CStream {
private type JLong = java.lang.Long
private type JDouble = java.lang.Double
private type JFloat = java.lang.Float

def genTimestamps(window: Window,
count: Int,
roundMillis: Int = 1,
maxTs: Long = System.currentTimeMillis()): Array[Long] =
-new CStream.TimeStream(window, roundMillis, maxTs).gen(count).toArray.sorted
+new CStream.TimeStream(window, roundMillis, maxTs).gen(count).toArray.sorted(new Ordering[Any] {
+  override def compare(x: Any, y: Any): Int = x.asInstanceOf[Long].compareTo(y.asInstanceOf[Long])
+})

def genPartitions(count: Int, partitionSpec: PartitionSpec): Array[String] = {
val today = partitionSpec.at(System.currentTimeMillis())
@@ -121,6 +146,11 @@ object CStream {
Option(rollDouble(max, 1, nullRate = nullRate)).map(java.lang.Double.valueOf(_)).orNull
}

class FloatStream(max: Double = 10000, nullRate: Double = 0.1) extends CStream[JFloat] {
override def next(): java.lang.Float =
Option(rollFloat(max, 1, nullRate = nullRate)).map(java.lang.Float.valueOf(_)).orNull
}

class ZippedStream(streams: CStream[Any]*)(tsIndex: Int) extends CStream[TestRow] {
override def next(): TestRow =
new TestRow(streams.map(_.next()).toArray: _*)(tsIndex)
@@ -139,7 +169,7 @@ object CStream {
}

case class Column(name: String, `type`: DataType, cardinality: Int, chunkSize: Int = 10, nullRate: Double = 0.1) {
-def genImpl(dtype: DataType, partitionColumn: String, partitionSpec: PartitionSpec): CStream[Any] =
+def genImpl(dtype: DataType, partitionColumn: String, partitionSpec: PartitionSpec, nullRate: Double): CStream[Any] =
dtype match {
case StringType =>
name match {
Expand All @@ -148,18 +178,23 @@ case class Column(name: String, `type`: DataType, cardinality: Int, chunkSize: I
}
case IntType => new IntStream(cardinality, nullRate)
case DoubleType => new DoubleStream(cardinality, nullRate)
case FloatType => new FloatStream(cardinality, nullRate)
case LongType =>
name match {
case Constants.TimeColumn => new TimeStream(new Window(cardinality, TimeUnit.DAYS))
case _ => new LongStream(cardinality, nullRate)
}
case ListType(elementType) =>
-genImpl(elementType, partitionColumn, partitionSpec).chunk(chunkSize)
+genImpl(elementType, partitionColumn, partitionSpec, nullRate).chunk(chunkSize)
case MapType(keyType, valueType) =>
val keyStream = genImpl(keyType, partitionColumn, partitionSpec, 0)
val valueStream = genImpl(valueType, partitionColumn, partitionSpec, nullRate)
keyStream.zipChunk(valueStream, maxSize = chunkSize)
case otherType => throw new UnsupportedOperationException(s"Can't generate random data for $otherType yet.")
}

def gen(partitionColumn: String, partitionSpec: PartitionSpec): CStream[Any] =
-genImpl(`type`, partitionColumn, partitionSpec)
+genImpl(`type`, partitionColumn, partitionSpec, nullRate)
def schema: (String, DataType) = name -> `type`
}
case class RowsWithSchema(rows: Array[TestRow], schema: Seq[(String, DataType)])
@@ -42,13 +42,13 @@ class TwoStackLiteAggregatorTest extends TestCase{
val topK = new TopK[Integer](IntType, 2)
val bankersBuffer = new TwoStackLiteAggregationBuffer(topK, 5)
assertEquals(null, bankersBuffer.query) // null
-Seq(7, 8, 9).map(x => new Integer(x)).foreach(i => bankersBuffer.push(i))
+Seq(7, 8, 9).map(x => Integer.valueOf(x)).foreach(i => bankersBuffer.push(i))
def assertBufferEquals(a: Seq[Int], b: java.util.ArrayList[Integer]): Unit = {
if(a==null || b == null) {
assertEquals(a, b)
} else {
assertArrayEquals(
-Option(a).map(_.map(x => new Integer(x).asInstanceOf[AnyRef]).toArray).orNull,
+Option(a).map(_.map(x => Integer.valueOf(x).asInstanceOf[AnyRef]).toArray).orNull,
Option(b).map(_.toArray).orNull)
}
}
@@ -59,7 +59,7 @@
assertBufferEquals(Seq(9), bankersBuffer.query)
bankersBuffer.pop()
assertBufferEquals(null, bankersBuffer.query)
-bankersBuffer.push(new Integer(10))
+bankersBuffer.push(Integer.valueOf(10))
assertBufferEquals(Seq(10), bankersBuffer.query)
}

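The `new Integer(x)` → `Integer.valueOf(x)` changes above swap a constructor that has been deprecated since Java 9 for the factory method, which draws small values from the JLS-mandated boxed-integer cache (−128 to 127). A quick illustration of the difference:

```scala
val a = Integer.valueOf(100)
val b = Integer.valueOf(100)
// Values in [-128, 127] come from a shared cache, so the references match.
val cached = a eq b

// Larger values may be freshly allocated, but value equality always holds.
val bigA = Integer.valueOf(1000)
val bigB = Integer.valueOf(1000)
val valueEqual = bigA == bigB
```

Since the test only cares about value equality, the swap is behavior-preserving while silencing the deprecation warning.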
8 changes: 4 additions & 4 deletions api/py/python-api-build.sh
@@ -17,8 +17,8 @@

set -o xtrace

-export CHRONON_VERSION_STR=$1
-export CHRONON_BRANCH_STR=$2
+export VERSION=$1
+export BRANCH=$2
ACTION=$3

echo "Finding working directory.."
@@ -37,8 +37,8 @@ elif [[ "${ACTION}" == "release" ]]; then
PYPI_REPOSITORY="chronon-pypi"
# Sanity checks, git state, Run Tests, Build, Release
# Make sure the version string doesn't contain SNAPSHOT if so it signifies development build and cannot be released.
-if [[ "${CHRONON_VERSION_STR}" == *"SNAPSHOT"* ]]; then
-echo "Python releases cannot be done for in development versions. Version: ${CHRONON_VERSION_STR}"
+if [[ "${VERSION}" == *"SNAPSHOT"* ]]; then
+echo "Python releases cannot be done for in development versions. Version: ${VERSION}"

⚠️ Potential issue

Fix variable name inconsistency in version check.

The script exports CHRONON_VERSION_STR but uses VERSION in the SNAPSHOT check. This mismatch will cause the version validation to fail.

Apply this fix:

-  if [[ "${VERSION}" == *"SNAPSHOT"* ]]; then
-    echo "Python releases cannot be done for in development versions. Version: ${VERSION}"
+  if [[ "${CHRONON_VERSION_STR}" == *"SNAPSHOT"* ]]; then
+    echo "Python releases cannot be done for in development versions. Version: ${CHRONON_VERSION_STR}"
📝 Committable suggestion


Suggested change
-if [[ "${VERSION}" == *"SNAPSHOT"* ]]; then
-echo "Python releases cannot be done for in development versions. Version: ${VERSION}"
+if [[ "${CHRONON_VERSION_STR}" == *"SNAPSHOT"* ]]; then
+echo "Python releases cannot be done for in development versions. Version: ${CHRONON_VERSION_STR}"

💡 Codebase verification

I can now provide the final response based on the analysis of the code:

Bug found: VERSION variable mismatch in python-api-build.sh

There's an inconsistency in the version variable usage in python-api-build.sh:

  • Line 21 exports CHRONON_VERSION_STR=$1
  • Line 40 checks ${VERSION} instead of ${CHRONON_VERSION_STR}

This is a bug as the script exports and uses different variable names for the same purpose, which will cause the SNAPSHOT version check to fail silently since ${VERSION} will always be empty.

🔗 Analysis chain

Verify version variable usage across related files.

Let's ensure consistent version variable naming across related files.


Let me analyze the version variable usage more deeply by checking the actual implementation and usage context.

The results will help us understand if there's any inconsistency in version handling between these files and if the VERSION check in python-api-build.sh is correctly implemented.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for inconsistent version variable usage
# Expected: All files should use the same version variable name

# Search for both version variable names in Python and shell scripts
echo "Checking version variable usage in setup.py and related files:"
rg -l "CHRONON_VERSION_STR|VERSION" --type py --type sh

echo -e "\nChecking specific version variable usage:"
rg "(CHRONON_VERSION_STR|VERSION)\s*=" --type py --type sh

Length of output: 672


Script:

#!/bin/bash
# Check how VERSION is used in python-api-build.sh
echo "Content of python-api-build.sh:"
cat api/py/python-api-build.sh

echo -e "\nChecking VERSION usage in setup.py:"
cat api/py/setup.py

Length of output: 4855

exit 1
fi
echo "Running tests, git check, build and release..."
4 changes: 2 additions & 2 deletions api/py/setup.py
@@ -29,8 +29,8 @@
__version__ = "local"
__branch__ = "main"
def get_version():
-version_str = os.environ.get("CHRONON_VERSION_STR", __version__)
-branch_str = os.environ.get("CHRONON_BRANCH_STR", __branch__)
+version_str = os.environ.get("VERSION", __version__)
+branch_str = os.environ.get("BRANCH", __branch__)

💡 Codebase verification

Update required in python-api-build.sh script for environment variable changes

The verification has identified that api/py/python-api-build.sh is using the old environment variable names (CHRONON_VERSION_STR and CHRONON_BRANCH_STR). This script needs to be updated to match the new simplified variable names (VERSION and BRANCH). No other files in the codebase appear to be using these environment variables.

  • api/py/python-api-build.sh: Update the environment variable exports:
    - export CHRONON_VERSION_STR=$1
    - export CHRONON_BRANCH_STR=$2
    + export VERSION=$1
    + export BRANCH=$2
🔗 Analysis chain

Environment variable names have been simplified.

The change to use simpler environment variable names (VERSION and BRANCH) is good for maintainability. However, this is a breaking change that requires coordination.

Let's verify the impact of this change:

Consider the following to ensure a smooth transition:

  1. Update CI/CD configurations that set these environment variables
  2. Update build scripts that reference these variables
  3. Update documentation to reflect the new variable names
  4. Consider adding temporary backward compatibility by checking both old and new variable names during a transition period

Would you like me to help create a GitHub issue to track these tasks?


Let me check the content of the identified shell script to understand the impact better.


Let me check if there are any CI/CD configuration files that might be using these environment variables.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any references to the old environment variable names
# that need to be updated.

echo "Checking for old environment variable references..."
rg -l "CHRONON_VERSION_STR|CHRONON_BRANCH_STR"

echo "Checking for any shell scripts that might be setting these variables..."
fd -e sh -x grep -l "CHRONON_VERSION_STR|CHRONON_BRANCH_STR" {}

Length of output: 409


Script:

#!/bin/bash
# Check the content of the build script that uses these variables
cat api/py/python-api-build.sh

Length of output: 2204


Script:

#!/bin/bash
# Check for CI/CD configuration files that might reference these variables
echo "Checking GitHub Actions workflows..."
fd -e yml -e yaml -p '.github/workflows' -x grep -l "CHRONON_VERSION_STR\|CHRONON_BRANCH_STR" {}

echo "Checking for other potential CI/CD configs..."
fd -e yml -e yaml -x grep -l "CHRONON_VERSION_STR\|CHRONON_BRANCH_STR" {}

echo "Checking for any Dockerfiles..."
fd Dockerfile -x grep -l "CHRONON_VERSION_STR\|CHRONON_BRANCH_STR" {}

Length of output: 507

# Replace "-SNAPSHOT" with ".dev"
version_str = version_str.replace("-SNAPSHOT", ".dev")
# If the prefix is the branch name, then convert it as suffix after '+' to make it Python PEP440 complaint
4 changes: 3 additions & 1 deletion api/src/main/scala/ai/chronon/api/Builders.scala
@@ -275,7 +275,9 @@ object Builders {
result.setProduction(production)
result.setCustomJson(customJson)
result.setOutputNamespace(namespace)
-result.setTeam(Option(team).getOrElse("chronon"))
+val effectiveTeam = Option(team).getOrElse(name.split("\\.").headOption.getOrElse("chronon"))
+
+result.setTeam(effectiveTeam)
result.setHistoricalBackfill(historicalBackill)
if (dependencies != null)
result.setDependencies(dependencies.toSeq.toJava)
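The new fallback in Builders resolves the team in three steps: use the explicit team if given, otherwise take the first dot-separated segment of the metadata name, and only then default to "chronon". A standalone sketch of that resolution order (the helper name is illustrative):

```scala
// Resolution order: explicit team > first segment of the name > "chronon".
def effectiveTeam(team: String, name: String): String =
  Option(team).getOrElse(name.split("\\.").headOption.getOrElse("chronon"))
```

One subtlety worth noting: `"".split("\\.")` returns `Array("")`, so for any non-null name the `headOption` branch always yields something (possibly the empty string) and the final `"chronon"` default is effectively unreachable — likely harmless here since metadata names are non-empty dotted paths.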
1 change: 1 addition & 0 deletions api/src/main/scala/ai/chronon/api/Constants.scala
@@ -48,6 +48,7 @@ object Constants {
val MutationAvroFields: Seq[StructField] = Seq(TimeField, ReversalField)
val MutationAvroColumns: Seq[String] = MutationAvroFields.map(_.name)
val MutationFields: Seq[StructField] = Seq(MutationTimeField, ReversalField)
val TileColumn: String = "_tile"
val TimedKvRDDKeySchemaKey: String = "__keySchema"
val TimedKvRDDValueSchemaKey: String = "__valueSchema"
val StatsKeySchema: StructType = StructType("keySchema", Array(StructField("JoinPath", StringType)))
13 changes: 9 additions & 4 deletions api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -43,14 +43,16 @@ object Extensions {
implicit class TimeUnitOps(timeUnit: TimeUnit) {
def str: String =
timeUnit match {
-case TimeUnit.HOURS => "h"
-case TimeUnit.DAYS => "d"
+case TimeUnit.HOURS   => "h"
+case TimeUnit.DAYS    => "d"
+case TimeUnit.MINUTES => "m"
}

def millis: Long =
timeUnit match {
-case TimeUnit.HOURS => 3600 * 1000
-case TimeUnit.DAYS => 24 * 3600 * 1000
+case TimeUnit.HOURS   => 3600 * 1000
+case TimeUnit.DAYS    => 24 * 3600 * 1000
+case TimeUnit.MINUTES => 60 * 1000
}
}
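With MINUTES added, the conversion table reads: minutes → 60 000 ms, hours → 3 600 000 ms, days → 86 400 000 ms. A dependency-free sketch mirroring the mapping — Chronon's TimeUnit is a Thrift enum, so this toy ADT is only a stand-in:

```scala
sealed trait TimeUnitT
case object Minutes extends TimeUnitT
case object Hours   extends TimeUnitT
case object Days    extends TimeUnitT

// Same arithmetic as the TimeUnitOps.millis extension in the diff.
def millis(tu: TimeUnitT): Long = tu match {
  case Minutes => 60L * 1000
  case Hours   => 3600L * 1000
  case Days    => 24L * 3600 * 1000
}
```

Exhaustive matching on the sealed trait is what makes this extension safe: adding MINUTES to the enum without extending both `str` and `millis` would surface as a compiler warning rather than a runtime MatchError in production.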

@@ -108,6 +110,9 @@ object Extensions {
def outputFinalView: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled"
def outputLatestLabelView: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled_latest"
def loggedTable: String = s"${outputTable}_logged"
def summaryTable: String = s"${outputTable}_summary"
def packedSummaryTable: String = s"${outputTable}_summary_packed"
def driftTable: String = s"${outputTable}_drift"

def bootstrapTable: String = s"${outputTable}_bootstrap"
