-
Notifications
You must be signed in to change notification settings - Fork 0
[WIP] Drift metrics #59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
c45017f
cac3e34
b4f0848
02fadac
b2a1947
40738b4
8c3d8f2
89ca7e5
d02202c
cdeafb2
c2e15b8
c8e5070
7a9eef4
c108a59
616aac1
a29dc9f
9968772
de35bc7
2eaced3
5d37c05
c08d6c6
882dc80
313b47c
957f438
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,7 +37,7 @@ object Constants { | |
val ChrononDynamicTable = "chronon_dynamic_table" | ||
val ChrononOOCTable: String = "chronon_ooc_table" | ||
val ChrononLogTable: String = "chronon_log_table" | ||
val ChrononMetadataKey = "ZIPLINE_METADATA" | ||
val MetadataDataset = "CHRONON_METADATA" | ||
val SchemaPublishEvent = "SCHEMA_PUBLISH_EVENT" | ||
val StatsBatchDataset = "CHRONON_STATS_BATCH" | ||
val ConsistencyMetricsDataset = "CHRONON_CONSISTENCY_METRICS_STATS_BATCH" | ||
|
@@ -62,5 +62,8 @@ object Constants { | |
val LabelViewPropertyFeatureTable: String = "feature_table" | ||
val LabelViewPropertyKeyLabelTable: String = "label_table" | ||
val ChrononRunDs: String = "CHRONON_RUN_DS" | ||
val DriftStatsTable: String = "drift_statistics" | ||
|
||
val TiledSummaryDataset: String = "TILE_SUMMARIES" | ||
|
||
val DefaultDriftTileSize: Window = new Window(30, TimeUnit.MINUTES) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add missing Window import and document the tile size choice Two issues to address:
Add the following import at the top of the file: + import ai.chronon.core.Window Consider adding a comment explaining why 30 minutes was chosen as the default tile size for drift calculations.
|
||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -158,6 +158,15 @@ object Extensions { | |||||||||||||||||||||||||||||||||||||||||||||||||||||
val teamOverride = Try(customJsonLookUp(Constants.TeamOverride).asInstanceOf[String]).toOption | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
teamOverride.getOrElse(metaData.team) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if drift spec is set but tile size is not set, default to 30 minutes | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
def driftTileSize: Option[Window] = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Option(metaData.getDriftSpec) match { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case Some(driftSpec) => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Option(driftSpec.getTileSize).orElse(Some(Constants.DefaultDriftTileSize)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case None => None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
// one per output column - so single window | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -879,24 +888,69 @@ object Extensions { | |||||||||||||||||||||||||||||||||||||||||||||||||||||
partHashes ++ Map(leftSourceKey -> leftHash, join.metaData.bootstrapTable -> bootstrapHash) ++ derivedHashMap | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
External features computed in online env and logged | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
This method will get the external feature column names | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
def getExternalFeatureCols: Seq[String] = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Option(join.onlineExternalParts) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
.map(_.toScala | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
.map { part => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val keys = part.source.getKeySchema.params.toScala | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
.map(_.name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val values = part.source.getValueSchema.params.toScala | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
.map(_.name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
keys ++ values | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
def externalPartColumns: Map[String, Array[String]] = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Option(join.onlineExternalParts) match { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case Some(parts) => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
parts.toScala.map { part => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val keys = part.source.getKeySchema.params.toScala.map(_.name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val values = part.source.getValueSchema.params.toScala.map(_.name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
part.fullName -> (keys ++ values).toArray | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
}.toMap | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case None => Map.empty | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+891
to
+900
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure safe handling of potential When accessing Consider modifying the code to safely handle -parts.toScala.map { part =>
- val keys = part.source.getKeySchema.params.toScala.map(_.name)
- val values = part.source.getValueSchema.params.toScala.map(_.name)
- part.fullName -> (keys ++ values).toArray
+parts.toScala.map { part =>
+ val keys = Option(part.source.getKeySchema)
+ .map(_.params.toScala.map(_.name))
+ .getOrElse(Seq.empty)
+ val values = Option(part.source.getValueSchema)
+ .map(_.params.toScala.map(_.name))
+ .getOrElse(Seq.empty)
+ part.fullName -> (keys ++ values).toArray 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
def derivedColumns: Array[String] = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Option(join.getDerivations) match { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case Some(derivations) => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
derivations.toScala.flatMap { derivation => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
derivation.getName match { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case "*" => None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case _ => Some(derivation.getName) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
.flatMap(_.toSet)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
.getOrElse(Seq.empty) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
}.toArray | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case None => Array.empty | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+902
to
+912
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle potential In the Modify the code to filter out -derivations.toScala.flatMap { derivation =>
- derivation.getName match {
- case "*" => None
- case _ => Some(derivation.getName)
- }
-}.toArray
+derivations.toScala.flatMap { derivation =>
+ Option(derivation.getName) match {
+ case Some("*") => None
+ case Some(name) => Some(name)
+ case None => None
+ }
+}.toArray 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
// renamed cols are no longer part of the output | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
private def renamedColumns: Set[String] = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Option(join.derivations) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
.map { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
_.toScala.renameOnlyDerivations.map(_.expression).toSet | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
.getOrElse(Set.empty) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
def joinPartColumns: Map[String, Array[String]] = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Option(join.getJoinParts) match { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case None => Map.empty | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case Some(parts) => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
parts.toScala.map { part => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val prefix = Option(part.prefix) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val groupByName = part.getGroupBy.getMetaData.cleanName | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val partName = (prefix.toSeq :+ groupByName).mkString("_") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
val outputColumns = part.getGroupBy.valueColumns | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val cols = outputColumns.map { column => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
(prefix.toSeq :+ groupByName :+ column).mkString("_") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
partName -> cols | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
}.toMap | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
def outputColumnsByGroup: Map[String, Array[String]] = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val preDeriveCols = (joinPartColumns ++ externalPartColumns) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val preDerivedWithoutRenamed = preDeriveCols.mapValues(_.filterNot(renamedColumns.contains)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val derivedColumns: Array[String] = Option(join.derivations) match { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case Some(derivations) => derivations.toScala.map { _.getName }.filter(_ == "*").toArray | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
case None => Array.empty | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
preDerivedWithoutRenamed ++ Map("derivations" -> derivedColumns) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
def keyColumns: Array[String] = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val joinPartKeys = join.joinParts.toScala.flatMap(_.groupBy.keyColumns.toScala).toSet | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val externalKeys = join.onlineExternalParts.toScala.flatMap(_.source.keyNames).toSet | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
val bootstrapKeys = join.bootstrapParts.toScala.flatMap(_.keyColumns.toScala).toSet | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
(joinPartKeys ++ externalKeys ++ bootstrapKeys).toArray | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+950
to
+953
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure safe access to collections in Accessing Modify the code to include null checks: -val externalKeys = join.onlineExternalParts.toScala.flatMap(_.source.keyNames).toSet
-val bootstrapKeys = join.bootstrapParts.toScala.flatMap(_.keyColumns.toScala).toSet
+val externalKeys = Option(join.onlineExternalParts)
+ .map(_.toScala.flatMap(_.source.keyNames).toSet)
+ .getOrElse(Set.empty)
+val bootstrapKeys = Option(join.bootstrapParts)
+ .map(_.toScala.flatMap(_.keyColumns.toScala).toSet)
+ .getOrElse(Set.empty) 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -127,7 +127,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore { | |||||||||
override def multiGet(requests: Seq[KVStore.GetRequest]): Future[Seq[KVStore.GetResponse]] = { | ||||||||||
// partition our requests into pure get style requests (where we're missing timestamps and only have key lookup) | ||||||||||
// and query requests (we want to query a range based on afterTsMillis -> endTsMillis or now() ) | ||||||||||
val (getLookups, queryLookups) = requests.partition(r => r.afterTsMillis.isEmpty) | ||||||||||
val (getLookups, queryLookups) = requests.partition(r => r.startTsMillis.isEmpty) | ||||||||||
val getItemRequestPairs = getLookups.map { req => | ||||||||||
val keyAttributeMap = primaryKeyMap(req.keyBytes) | ||||||||||
(req, GetItemRequest.builder.key(keyAttributeMap.asJava).tableName(req.dataset).build) | ||||||||||
|
@@ -325,7 +325,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore { | |||||||||
val partitionAlias = "#pk" | ||||||||||
val timeAlias = "#ts" | ||||||||||
val attrNameAliasMap = Map(partitionAlias -> partitionKeyColumn, timeAlias -> sortKeyColumn) | ||||||||||
val startTs = request.afterTsMillis.get | ||||||||||
val startTs = request.startTsMillis.get | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling for startTsMillis.get The direct use of Apply this diff to add error handling: - val startTs = request.startTsMillis.get
+ val startTs = request.startTsMillis.getOrElse {
+ throw new IllegalArgumentException("startTsMillis is required for time-range queries")
+ } 📝 Committable suggestion
Suggested change
|
||||||||||
val endTs = request.endTsMillis.getOrElse(System.currentTimeMillis()) | ||||||||||
val attrValuesMap = | ||||||||||
Map( | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,7 +47,7 @@ class BigTableKVStoreImpl(projectId: String, instanceId: String) extends KVStore | |
|
||
val queryTime = System.currentTimeMillis() | ||
// scan from afterTsMillis to now - skip events with future timestamps | ||
request.afterTsMillis.foreach { ts => | ||
request.startTsMillis.foreach { ts => | ||
// Bigtable uses microseconds | ||
query.filter(Filters.FILTERS.timestamp().range().startOpen(ts * 1000).endClosed(queryTime)) | ||
} | ||
Comment on lines
+50
to
53
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Add tests for the timestamp range behavior. The current implementation changes how timestamps are handled but lacks corresponding tests. Please add tests to cover the new timestamp filtering logic to ensure reliability and prevent regressions. 🔗 Analysis chainVerify the timestamp range behavior change. The change from
Let's verify this change is consistent across the codebase: Consider adding documentation to clarify the timestamp behavior: request.startTsMillis.foreach { ts =>
+ // Convert to microseconds and create a range filter that:
+ // - Excludes events at exactly startTsMillis (startOpen)
+ // - Includes events up to and including queryTime (endClosed)
query.filter(Filters.FILTERS.timestamp().range().startOpen(ts * 1000).endClosed(queryTime))
} 🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Search for other timestamp-related filters to ensure consistent behavior
rg -t scala "afterTsMillis|startTsMillis" --no-heading
# Look for related test files to verify coverage
fd -e scala -e java test.*BigTable
Length of output: 1694 Script: #!/bin/bash
# Search for tests covering timestamp range behavior in BigTableKVStoreImpl.scala
rg -t scala "startTsMillis|endTsMillis" --context 3 cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImplSpec.scala
# Alternatively, search for any BigTable related tests that might cover timestamp filtering
rg -t scala "Filters.FILTERS.timestamp" --context 3 test.*BigTable
Length of output: 395 Script: #!/bin/bash
# Broad search for any test files related to BigTableKVStoreImpl
rg -t scala "BigTableKVStoreImpl" --files-with-matches
# Search for tests covering timestamp filtering logic across the entire test directory
rg -t scala "Filters.FILTERS.timestamp" --context 3 test/
Length of output: 319 |
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Invalid runner label will break CI
The runner label
ubuntu-8_cores-32_gb
is not a standard GitHub-hosted runner. This will cause the CI workflow to fail.For GitHub-hosted runners with more resources, use:
ubuntu-latest-8-cores
for 8 core runnerubuntu-latest-16-cores
for 16 core runnerApply this diff to use a valid GitHub-hosted runner:
If you're using self-hosted runners:
actionlint.yaml
to specify custom runner labels📝 Committable suggestion
🧰 Tools
🪛 actionlint
52-52: label "ubuntu-8_cores-32_gb" is unknown. available labels are "windows-latest", "windows-latest-8-cores", "windows-2022", "windows-2019", "ubuntu-latest", "ubuntu-latest-4-cores", "ubuntu-latest-8-cores", "ubuntu-latest-16-cores", "ubuntu-24.04", "ubuntu-22.04", "ubuntu-20.04", "macos-latest", "macos-latest-xl", "macos-latest-xlarge", "macos-latest-large", "macos-15-xlarge", "macos-15-large", "macos-15", "macos-14-xl", "macos-14-xlarge", "macos-14-large", "macos-14", "macos-14.0", "macos-13-xl", "macos-13-xlarge", "macos-13-large", "macos-13", "macos-13.0", "macos-12-xl", "macos-12-xlarge", "macos-12-large", "macos-12", "macos-12.0", "self-hosted", "x64", "arm", "arm64", "linux", "macos", "windows". if it is a custom label for self-hosted runner, set list of labels in actionlint.yaml config file
(runner-label)