
[WIP] Drift metrics #59


Merged · 24 commits · Nov 21, 2024
Changes from 22 commits
4 changes: 2 additions & 2 deletions .github/workflows/test_scala_and_python.yaml
@@ -49,7 +49,7 @@ jobs:
path: api/py/htmlcov

other_spark_tests:
runs-on: ubuntu-latest
runs-on: ubuntu-8_cores-32_gb

⚠️ Potential issue

Critical: Invalid runner label will break CI

The runner label ubuntu-8_cores-32_gb is not a standard GitHub-hosted runner. This will cause the CI workflow to fail.

For GitHub-hosted runners with more resources, use:

  • ubuntu-latest-8-cores for an 8-core runner
  • ubuntu-latest-16-cores for a 16-core runner

Apply this diff to use a valid GitHub-hosted runner:

-    runs-on: ubuntu-8_cores-32_gb
+    runs-on: ubuntu-latest-8-cores

If you're using self-hosted runners:

  1. Ensure the runner label is properly configured
  2. Add an actionlint.yaml to specify custom runner labels
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
runs-on: ubuntu-8_cores-32_gb
runs-on: ubuntu-latest-8-cores
🧰 Tools
🪛 actionlint

52-52: label "ubuntu-8_cores-32_gb" is unknown. available labels are "windows-latest", "windows-latest-8-cores", "windows-2022", "windows-2019", "ubuntu-latest", "ubuntu-latest-4-cores", "ubuntu-latest-8-cores", "ubuntu-latest-16-cores", "ubuntu-24.04", "ubuntu-22.04", "ubuntu-20.04", "macos-latest", "macos-latest-xl", "macos-latest-xlarge", "macos-latest-large", "macos-15-xlarge", "macos-15-large", "macos-15", "macos-14-xl", "macos-14-xlarge", "macos-14-large", "macos-14", "macos-14.0", "macos-13-xl", "macos-13-xlarge", "macos-13-large", "macos-13", "macos-13.0", "macos-12-xl", "macos-12-xlarge", "macos-12-large", "macos-12", "macos-12.0", "self-hosted", "x64", "arm", "arm64", "linux", "macos", "windows". if it is a custom label for self-hosted runner, set list of labels in actionlint.yaml config file

(runner-label)

container:
image: ghcr.io/${{ github.repository }}-ci:latest
credentials:
@@ -64,7 +64,7 @@ jobs:

- name: Run other spark tests
run: |
export SBT_OPTS="-Xmx8G -Xms2G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
export SBT_OPTS="-Xmx24G -Xms4G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
sbt "spark/testOnly"

join_spark_tests:
7 changes: 5 additions & 2 deletions api/src/main/scala/ai/chronon/api/Builders.scala
@@ -267,7 +267,8 @@ object Builders {
samplePercent: Double = 100,
consistencySamplePercent: Double = 5,
tableProperties: Map[String, String] = Map.empty,
historicalBackill: Boolean = true
historicalBackfill: Boolean = true,
driftSpec: DriftSpec = null
): MetaData = {
val result = new MetaData()
result.setName(name)
@@ -283,7 +284,7 @@
}

result.setTeam(effectiveTeam)
result.setHistoricalBackfill(historicalBackill)
result.setHistoricalBackfill(historicalBackfill)
if (dependencies != null)
result.setDependencies(dependencies.toSeq.toJava)
if (samplePercent > 0)
@@ -292,6 +293,8 @@
result.setConsistencySamplePercent(consistencySamplePercent)
if (tableProperties.nonEmpty)
result.setTableProperties(tableProperties.toJava)
if (driftSpec != null)
result.setDriftSpec(driftSpec)
result
}
}
7 changes: 5 additions & 2 deletions api/src/main/scala/ai/chronon/api/Constants.scala
@@ -37,7 +37,7 @@ object Constants {
val ChrononDynamicTable = "chronon_dynamic_table"
val ChrononOOCTable: String = "chronon_ooc_table"
val ChrononLogTable: String = "chronon_log_table"
val ChrononMetadataKey = "ZIPLINE_METADATA"
val MetadataDataset = "CHRONON_METADATA"
val SchemaPublishEvent = "SCHEMA_PUBLISH_EVENT"
val StatsBatchDataset = "CHRONON_STATS_BATCH"
val ConsistencyMetricsDataset = "CHRONON_CONSISTENCY_METRICS_STATS_BATCH"
@@ -62,5 +62,8 @@ object Constants {
val LabelViewPropertyFeatureTable: String = "feature_table"
val LabelViewPropertyKeyLabelTable: String = "label_table"
val ChrononRunDs: String = "CHRONON_RUN_DS"
val DriftStatsTable: String = "drift_statistics"

val TiledSummaryDataset: String = "TILE_SUMMARIES"

val DefaultDriftTileSize: Window = new Window(30, TimeUnit.MINUTES)

⚠️ Potential issue

Add missing Window import and document the tile size choice

Two issues to address:

  1. The Window class import is missing
  2. The 30-minute default tile size needs documentation explaining the rationale

Add the following import at the top of the file:

+ import ai.chronon.core.Window

Consider adding a comment explaining why 30 minutes was chosen as the default tile size for drift calculations.

Committable suggestion skipped: line range outside the PR's diff.

}
88 changes: 71 additions & 17 deletions api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -158,6 +158,15 @@ object Extensions {
val teamOverride = Try(customJsonLookUp(Constants.TeamOverride).asInstanceOf[String]).toOption
teamOverride.getOrElse(metaData.team)
}

// if drift spec is set but tile size is not set, default to 30 minutes
def driftTileSize: Option[Window] = {
Option(metaData.getDriftSpec) match {
case Some(driftSpec) =>
Option(driftSpec.getTileSize).orElse(Some(Constants.DefaultDriftTileSize))
case None => None
}
}
}

// one per output column - so single window
@@ -879,24 +888,69 @@ object Extensions {
partHashes ++ Map(leftSourceKey -> leftHash, join.metaData.bootstrapTable -> bootstrapHash) ++ derivedHashMap
}

/*
External features computed in online env and logged
This method will get the external feature column names
*/
def getExternalFeatureCols: Seq[String] = {
Option(join.onlineExternalParts)
.map(_.toScala
.map { part =>
{
val keys = part.source.getKeySchema.params.toScala
.map(_.name)
val values = part.source.getValueSchema.params.toScala
.map(_.name)
keys ++ values
def externalPartColumns: Map[String, Array[String]] =
Option(join.onlineExternalParts) match {
case Some(parts) =>
parts.toScala.map { part =>
val keys = part.source.getKeySchema.params.toScala.map(_.name)
val values = part.source.getValueSchema.params.toScala.map(_.name)
part.fullName -> (keys ++ values).toArray
}.toMap
case None => Map.empty
}
Comment on lines +891 to +900

⚠️ Potential issue

Ensure safe handling of potential null values in externalPartColumns.

When accessing part.source.getKeySchema and part.source.getValueSchema, there is a possibility of encountering null values, which could lead to a NullPointerException. It's important to safely handle these potential nulls.

Consider modifying the code to safely handle null values:

-parts.toScala.map { part =>
-  val keys = part.source.getKeySchema.params.toScala.map(_.name)
-  val values = part.source.getValueSchema.params.toScala.map(_.name)
-  part.fullName -> (keys ++ values).toArray
+parts.toScala.map { part =>
+  val keys = Option(part.source.getKeySchema)
+    .map(_.params.toScala.map(_.name))
+    .getOrElse(Seq.empty)
+  val values = Option(part.source.getValueSchema)
+    .map(_.params.toScala.map(_.name))
+    .getOrElse(Seq.empty)
+  part.fullName -> (keys ++ values).toArray
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def externalPartColumns: Map[String, Array[String]] =
Option(join.onlineExternalParts) match {
case Some(parts) =>
parts.toScala.map { part =>
val keys = part.source.getKeySchema.params.toScala.map(_.name)
val values = part.source.getValueSchema.params.toScala.map(_.name)
part.fullName -> (keys ++ values).toArray
}.toMap
case None => Map.empty
}
def externalPartColumns: Map[String, Array[String]] =
Option(join.onlineExternalParts) match {
case Some(parts) =>
parts.toScala.map { part =>
val keys = Option(part.source.getKeySchema)
.map(_.params.toScala.map(_.name))
.getOrElse(Seq.empty)
val values = Option(part.source.getValueSchema)
.map(_.params.toScala.map(_.name))
.getOrElse(Seq.empty)
part.fullName -> (keys ++ values).toArray
}.toMap
case None => Map.empty
}


def derivedColumns: Array[String] =
Option(join.getDerivations) match {
case Some(derivations) =>
derivations.toScala.flatMap { derivation =>
derivation.getName match {
case "*" => None
case _ => Some(derivation.getName)
}
}
.flatMap(_.toSet))
.getOrElse(Seq.empty)
}.toArray
case None => Array.empty
}
Comment on lines +902 to +912

⚠️ Potential issue

Handle potential null names in derivedColumns.

In the derivedColumns method, derivation.getName could be null, leading to a NullPointerException. Ensure that null names are appropriately handled.

Modify the code to filter out null names:

-derivations.toScala.flatMap { derivation =>
-  derivation.getName match {
-    case "*" => None
-    case _   => Some(derivation.getName)
-  }
-}.toArray
+derivations.toScala.flatMap { derivation =>
+  Option(derivation.getName) match {
+    case Some("*") => None
+    case Some(name) => Some(name)
+    case None => None
+  }
+}.toArray
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def derivedColumns: Array[String] =
Option(join.getDerivations) match {
case Some(derivations) =>
derivations.toScala.flatMap { derivation =>
derivation.getName match {
case "*" => None
case _ => Some(derivation.getName)
}
}
.flatMap(_.toSet))
.getOrElse(Seq.empty)
}.toArray
case None => Array.empty
}
def derivedColumns: Array[String] =
Option(join.getDerivations) match {
case Some(derivations) =>
derivations.toScala.flatMap { derivation =>
Option(derivation.getName) match {
case Some("*") => None
case Some(name) => Some(name)
case None => None
}
}.toArray
case None => Array.empty
}


// renamed cols are no longer part of the output
private def renamedColumns: Set[String] =
Option(join.derivations)
.map {
_.toScala.renameOnlyDerivations.map(_.expression).toSet
}
.getOrElse(Set.empty)

def joinPartColumns: Map[String, Array[String]] =
Option(join.getJoinParts) match {
case None => Map.empty
case Some(parts) =>
parts.toScala.map { part =>
val prefix = Option(part.prefix)
val groupByName = part.getGroupBy.getMetaData.cleanName
val partName = (prefix.toSeq :+ groupByName).mkString("_")

val outputColumns = part.getGroupBy.valueColumns
val cols = outputColumns.map { column =>
(prefix.toSeq :+ groupByName :+ column).mkString("_")
}
partName -> cols
}.toMap
}

def outputColumnsByGroup: Map[String, Array[String]] = {
val preDeriveCols = (joinPartColumns ++ externalPartColumns)
val preDerivedWithoutRenamed = preDeriveCols.mapValues(_.filterNot(renamedColumns.contains))
val derivedColumns: Array[String] = Option(join.derivations) match {
case Some(derivations) => derivations.toScala.map { _.getName }.filter(_ == "*").toArray
case None => Array.empty
}
preDerivedWithoutRenamed ++ Map("derivations" -> derivedColumns)
}

def keyColumns: Array[String] = {
val joinPartKeys = join.joinParts.toScala.flatMap(_.groupBy.keyColumns.toScala).toSet
val externalKeys = join.onlineExternalParts.toScala.flatMap(_.source.keyNames).toSet
val bootstrapKeys = join.bootstrapParts.toScala.flatMap(_.keyColumns.toScala).toSet
(joinPartKeys ++ externalKeys ++ bootstrapKeys).toArray
Comment on lines +950 to +953

⚠️ Potential issue

Ensure safe access to collections in keyColumns to prevent NullPointerException.

Accessing join.onlineExternalParts or join.bootstrapParts without null checks can lead to NullPointerException if they are null. Safely handle these potential null values.

Modify the code to include null checks:

-val externalKeys = join.onlineExternalParts.toScala.flatMap(_.source.keyNames).toSet
-val bootstrapKeys = join.bootstrapParts.toScala.flatMap(_.keyColumns.toScala).toSet
+val externalKeys = Option(join.onlineExternalParts)
+  .map(_.toScala.flatMap(_.source.keyNames).toSet)
+  .getOrElse(Set.empty)
+val bootstrapKeys = Option(join.bootstrapParts)
+  .map(_.toScala.flatMap(_.keyColumns.toScala).toSet)
+  .getOrElse(Set.empty)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
val joinPartKeys = join.joinParts.toScala.flatMap(_.groupBy.keyColumns.toScala).toSet
val externalKeys = join.onlineExternalParts.toScala.flatMap(_.source.keyNames).toSet
val bootstrapKeys = join.bootstrapParts.toScala.flatMap(_.keyColumns.toScala).toSet
(joinPartKeys ++ externalKeys ++ bootstrapKeys).toArray
val joinPartKeys = join.joinParts.toScala.flatMap(_.groupBy.keyColumns.toScala).toSet
val externalKeys = Option(join.onlineExternalParts)
.map(_.toScala.flatMap(_.source.keyNames).toSet)
.getOrElse(Set.empty)
val bootstrapKeys = Option(join.bootstrapParts)
.map(_.toScala.flatMap(_.keyColumns.toScala).toSet)
.getOrElse(Set.empty)
(joinPartKeys ++ externalKeys ++ bootstrapKeys).toArray

}

/*
8 changes: 8 additions & 0 deletions api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala
@@ -25,6 +25,7 @@ import ai.chronon.api.thrift.protocol.TSimpleJSONProtocol
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.gson.GsonBuilder
import org.slf4j.Logger
import org.slf4j.LoggerFactory

@@ -48,6 +49,13 @@ object ThriftJsonCodec {
new String(serializer.serialize(obj), Constants.UTF8)
}

@transient private lazy val prettyGson = new GsonBuilder().setPrettyPrinting().create()
def toPrettyJsonStr[T <: TBase[_, _]: Manifest](obj: T): String = {
val raw = toJsonStr(obj)
val je = prettyGson.fromJson(raw, classOf[com.google.gson.JsonElement])
prettyGson.toJson(je)
}
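
As a usage illustration (not part of the diff), any thrift struct can be passed through the new pretty printer; the MetaData construction below is made up for the example:

// Hypothetical usage of the new pretty printer added above.
val md = new MetaData()
md.setName("example_team.drift_demo")
// prints indented, multi-line JSON instead of the single-line output of toJsonStr
println(ThriftJsonCodec.toPrettyJsonStr(md))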

def toJsonList[T <: TBase[_, _]: Manifest](obj: util.List[T]): String = {
if (obj == null) return ""
obj.toScala
88 changes: 83 additions & 5 deletions api/thrift/api.thrift
@@ -234,16 +234,12 @@ enum Cardinality {
+----------------------------------+-------------------+----------------+----------------------------------+
| Hellinger Distance | 0.1 - 0.25 | > 0.25 | Ranges from 0 to 1 |
+----------------------------------+-------------------+----------------+----------------------------------+
| Kolmogorov-Smirnov (K-S) | 0.1 - 0.2 | > 0.2 | Ranges from 0 to 1 |
| Distance | | | |
+----------------------------------+-------------------+----------------+----------------------------------+
| Population Stability Index (PSI) | 0.1 - 0.2 | > 0.2 | Industry standard in some fields |
+----------------------------------+-------------------+----------------+----------------------------------+
**/
enum DriftMetric {
JENSEN_SHANNON = 0,
HELLINGER = 1,
KOLMOGOROV_SMIRNOV = 2,
PSI = 3
}
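
For readers unfamiliar with the default metric, a minimal Scala sketch of Jensen-Shannon divergence over two categorical histograms is shown below. This is an illustration only — it is not the implementation in this PR, and the object and method names are invented:

// Illustrative sketch: Jensen-Shannon divergence between two raw-count histograms.
// Keys missing on one side are treated as zero counts.
object DriftMetricSketch {
  private def toProbs(h: Map[String, Long], keys: Set[String]): Map[String, Double] = {
    val total = math.max(h.values.sum.toDouble, 1.0)
    keys.map(k => k -> h.getOrElse(k, 0L) / total).toMap
  }

  private def kl(p: Map[String, Double], q: Map[String, Double]): Double =
    p.foldLeft(0.0) { case (acc, (k, pk)) =>
      if (pk > 0.0 && q(k) > 0.0) acc + pk * math.log(pk / q(k)) else acc
    }

  // JSD(P, Q) = 0.5 * KL(P || M) + 0.5 * KL(Q || M) with M = (P + Q) / 2.
  // With natural log the value lies in [0, ln 2]; divide by ln 2 to normalize to [0, 1].
  def jensenShannon(before: Map[String, Long], after: Map[String, Long]): Double = {
    val keys = before.keySet ++ after.keySet
    val p = toProbs(before, keys)
    val q = toProbs(after, keys)
    val m = keys.map(k => k -> (p(k) + q(k)) / 2.0).toMap
    (kl(p, m) + kl(q, m)) / 2.0
  }
}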

Expand All @@ -254,7 +250,10 @@ struct TileKey {
4: optional i64 sizeMillis
}

struct TileSummaries {
// summary of distribution & coverage etc for a given (table, column, slice, tileWindow)
// for categorical types, distribution is histogram, otherwise percentiles
// we also handle container types by counting inner value distribution and inner value coverage
struct TileSummary {
1: optional list<double> percentiles
2: optional map<string, i64> histogram
3: optional i64 count
@@ -269,6 +268,72 @@
8: optional list<i32> stringLengthPercentiles
}

struct TileSeriesKey {
1: optional string column // name of the column - avg_txns
2: optional string slice // value of the slice - merchant_category
3: optional string groupName // name of the columnGroup within node, for join - joinPart name, externalPart name etc
4: optional string nodeName // name of the node - join name etc
}

// array of tuples of (TileSummary, timestamp) ==(pivot)==> TileSummarySeries
struct TileSummarySeries {
1: optional list<list<double>> percentiles
2: optional map<string, list<i64>> histogram
3: optional list<i64> count
4: optional list<i64> nullCount

// for container types
5: optional list<i64> innerCount // total number of entries within all containers of this column
6: optional list<i64> innerNullCount
7: optional list<list<i32>> lengthPercentiles

// high cardinality string type
8: optional list<list<i32>> stringLengthPercentiles

200: optional list<i64> timestamps
300: optional TileSeriesKey key
}
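
To make the pivot comment above concrete, here is a simplified sketch (a hypothetical helper, not the PivotUtils code referenced later) of turning timestamped summaries into a column-oriented series, shown for just two fields:

// Illustrative pivot: (summary, timestamp) pairs -> one series per field, in time order.
case class SummaryAtTs(count: Long, nullCount: Long, timestamp: Long)
case class SeriesSketch(counts: Seq[Long], nullCounts: Seq[Long], timestamps: Seq[Long])

def pivotSketch(points: Seq[SummaryAtTs]): SeriesSketch = {
  val sorted = points.sortBy(_.timestamp)
  SeriesSketch(
    counts = sorted.map(_.count),
    nullCounts = sorted.map(_.nullCount),
    timestamps = sorted.map(_.timestamp)
  )
}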

// (DriftMetric + old TileSummary + new TileSummary) = TileDrift
struct TileDrift {

// for continuous values - scalar values or within containers
// (lists - for eg. via last_k or maps for eg. via bucketing)
1: optional double percentileDrift
// for categorical values - scalar values or within containers
2: optional double histogramDrift

// for all types
3: optional double countChangePercent
4: optional double nullRatioChangePercent

// additional tracking for container types
5: optional double innerCountChangePercent // total number of entries within all containers of this column
6: optional double innerNullCountChangePercent
7: optional double lengthPercentilesDrift

// additional tracking for string types
8: optional double stringLengthPercentilesDrift
}
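
As a rough illustration of the "*ChangePercent" fields (assumed semantics — the PR does not show the computation here), the relative change between an older and a newer summary value could look like:

// Illustrative only: percentage change of a summary statistic between two tiles.
def changePercent(oldValue: Long, newValue: Long): Option[Double] =
  if (oldValue == 0) None // undefined when the baseline is zero
  else Some((newValue - oldValue).toDouble / oldValue * 100.0)

// e.g. countChangePercent would be changePercent(oldSummary.count, newSummary.count)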

// PivotUtils.pivot(Array[(Long, TileDrift)]) = TileDriftSeries
// used in front end after this is computed
struct TileDriftSeries {
1: optional list<double> percentileDriftSeries
2: optional list<double> histogramDriftSeries
3: optional list<double> countChangePercentSeries
4: optional list<double> nullRatioChangePercentSeries

5: optional list<double> innerCountChangePercentSeries
6: optional list<double> innerNullCountChangePercentSeries
7: optional list<double> lengthPercentilesDriftSeries
8: optional list<double> stringLengthPercentilesDriftSeries

200: optional list<i64> timestamps

300: optional TileSeriesKey key
}

struct DriftSpec {
// slices is another key to summarize the data with - besides the column & slice
// currently supports only one slice
@@ -279,9 +344,19 @@ struct DriftSpec {
// likes_over_dislikes = IF(dislikes > likes, 1, 0)
// or any other expression that you care about
2: optional map<string, string> derivations

// we measure the unique counts of the columns and decide if they are categorical and numeric
// you can use this to override that decision by setting cardinality hints
3: optional map<string, Cardinality> columnCardinalityHints

4: optional Window tileSize

// the current tile summary will be compared with older summaries using the metric
// if the drift is more than the threshold, we will raise an alert
5: optional list<Window> lookbackWindows

// default drift metric to use
6: optional DriftMetric driftMetric = DriftMetric.JENSEN_SHANNON
}
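
To tie this back to the Builders.scala change earlier in the diff, a hedged construction example follows; the setter names mirror the thrift definition above, while the builder entry point, TimeUnit values, and config name are assumptions:

// Illustrative wiring of a DriftSpec through the new driftSpec parameter on the metadata builder.
val spec = new DriftSpec()
spec.setTileSize(new Window(30, TimeUnit.MINUTES)) // matches DefaultDriftTileSize
spec.setLookbackWindows(java.util.Arrays.asList(new Window(7, TimeUnit.DAYS))) // assumed TimeUnit value
spec.setDriftMetric(DriftMetric.JENSEN_SHANNON)

val md = Builders.MetaData(
  name = "example_team.drift_demo", // hypothetical config name
  driftSpec = spec
)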

struct MetaData {
@@ -315,6 +390,9 @@ struct MetaData {
// Flag to indicate whether join backfill should backfill previous holes.
// Setting to false will only backfill latest single partition
14: optional bool historicalBackfill

// specify how to compute drift
15: optional DriftSpec driftSpec
}


2 changes: 1 addition & 1 deletion build.sbt
@@ -122,7 +122,7 @@ lazy val online = project
"com.datadoghq" % "java-dogstatsd-client" % "4.4.1",
"org.rogach" %% "scallop" % "5.1.0",
"net.jodah" % "typetools" % "0.6.3",
"com.github.ben-manes.caffeine" % "caffeine" % "3.1.8"
"com.github.ben-manes.caffeine" % "caffeine" % "3.1.8",
),
libraryDependencies ++= jackson,
libraryDependencies ++= spark_all.map(_ % "provided"),
@@ -127,7 +127,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
override def multiGet(requests: Seq[KVStore.GetRequest]): Future[Seq[KVStore.GetResponse]] = {
// partition our requests into pure get style requests (where we're missing timestamps and only have key lookup)
// and query requests (we want to query a range based on afterTsMillis -> endTsMillis or now() )
val (getLookups, queryLookups) = requests.partition(r => r.afterTsMillis.isEmpty)
val (getLookups, queryLookups) = requests.partition(r => r.startTsMillis.isEmpty)
val getItemRequestPairs = getLookups.map { req =>
val keyAttributeMap = primaryKeyMap(req.keyBytes)
(req, GetItemRequest.builder.key(keyAttributeMap.asJava).tableName(req.dataset).build)
@@ -325,7 +325,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
val partitionAlias = "#pk"
val timeAlias = "#ts"
val attrNameAliasMap = Map(partitionAlias -> partitionKeyColumn, timeAlias -> sortKeyColumn)
val startTs = request.afterTsMillis.get
val startTs = request.startTsMillis.get

⚠️ Potential issue

Add error handling for startTsMillis.get

The direct use of .get on startTsMillis could throw a NoSuchElementException. Consider adding proper error handling.

Apply this diff to add error handling:

-    val startTs = request.startTsMillis.get
+    val startTs = request.startTsMillis.getOrElse {
+      throw new IllegalArgumentException("startTsMillis is required for time-range queries")
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
val startTs = request.startTsMillis.get
val startTs = request.startTsMillis.getOrElse {
throw new IllegalArgumentException("startTsMillis is required for time-range queries")
}

val endTs = request.endTsMillis.getOrElse(System.currentTimeMillis())
val attrValuesMap =
Map(
@@ -47,7 +47,7 @@ class BigTableKVStoreImpl(projectId: String, instanceId: String) extends KVStore {

val queryTime = System.currentTimeMillis()
// scan from afterTsMillis to now - skip events with future timestamps
request.afterTsMillis.foreach { ts =>
request.startTsMillis.foreach { ts =>
// Bigtable uses microseconds
query.filter(Filters.FILTERS.timestamp().range().startOpen(ts * 1000).endClosed(queryTime))
}
Comment on lines +50 to 53

💡 Codebase verification

Add tests for the timestamp range behavior.

The current implementation changes how timestamps are handled but lacks corresponding tests. Please add tests to cover the new timestamp filtering logic to ensure reliability and prevent regressions.

🔗 Analysis chain

Verify the timestamp range behavior change.

The change from afterTsMillis to startTsMillis represents a semantic shift in how time ranges are handled. The current implementation:

  • Excludes the start timestamp (using startOpen)
  • Includes the end timestamp (using endClosed)
  • Converts milliseconds to microseconds for Bigtable compatibility

Let's verify this change is consistent across the codebase:

Consider adding documentation to clarify the timestamp behavior:

 request.startTsMillis.foreach { ts =>
+  // Convert to microseconds and create a range filter that:
+  // - Excludes events at exactly startTsMillis (startOpen)
+  // - Includes events up to and including queryTime (endClosed)
   query.filter(Filters.FILTERS.timestamp().range().startOpen(ts * 1000).endClosed(queryTime))
 }
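
A minimal test sketch for the suggested coverage (all names and setup here are assumptions; a real spec would exercise the Bigtable emulator fixtures rather than a standalone helper):

// Illustrative ScalaTest for the boundary semantics described above: start is exclusive
// (startOpen), end is inclusive (endClosed), and millis are converted to micros.
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class TimestampRangeSketchSpec extends AnyFlatSpec with Matchers {

  // mirrors the production range check in isolation
  def inRange(eventMicros: Long, startTsMillis: Long, endMicros: Long): Boolean =
    eventMicros > startTsMillis * 1000 && eventMicros <= endMicros

  "the range filter" should "exclude events at exactly startTsMillis and include the end" in {
    val startMillis = 1700000000000L
    val endMicros = (startMillis + 60000L) * 1000
    inRange(startMillis * 1000, startMillis, endMicros) shouldBe false // start is open
    inRange(startMillis * 1000 + 1, startMillis, endMicros) shouldBe true
    inRange(endMicros, startMillis, endMicros) shouldBe true // end is closed
    inRange(endMicros + 1, startMillis, endMicros) shouldBe false
  }
}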
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other timestamp-related filters to ensure consistent behavior
rg -t scala "afterTsMillis|startTsMillis" --no-heading

# Look for related test files to verify coverage
fd -e scala -e java test.*BigTable

Length of output: 1694


Script:

#!/bin/bash
# Search for tests covering timestamp range behavior in BigTableKVStoreImpl.scala
rg -t scala "startTsMillis|endTsMillis" --context 3 cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImplSpec.scala

# Alternatively, search for any BigTable related tests that might cover timestamp filtering
rg -t scala "Filters.FILTERS.timestamp" --context 3 test.*BigTable

Length of output: 395


Script:

#!/bin/bash
# Broad search for any test files related to BigTableKVStoreImpl
rg -t scala "BigTableKVStoreImpl" --files-with-matches

# Search for tests covering timestamp filtering logic across the entire test directory
rg -t scala "Filters.FILTERS.timestamp" --context 3 test/

Length of output: 319
