Skip to content

fix: remove references to custom json, float essential apis to top #492

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions api/src/main/scala/ai/chronon/api/Builders.scala
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,14 @@ object Builders {
online: Boolean = false,
production: Boolean = false,
customJson: String = null,
dependencies: Seq[String] = null,
namespace: String = null,
team: String = null,
samplePercent: Double = 100,
consistencySamplePercent: Double = 5,
tableProperties: Map[String, String] = Map.empty,
historicalBackfill: Boolean = true,
driftSpec: DriftSpec = null
driftSpec: DriftSpec = null,
additionalOutputPartitionColumns: Seq[String] = Seq.empty
): MetaData = {
val result = new MetaData()
result.setName(name)
Expand All @@ -298,6 +298,11 @@ object Builders {
result.setTableProperties(tableProperties.toJava)
if (driftSpec != null)
result.setDriftSpec(driftSpec)

if (additionalOutputPartitionColumns.nonEmpty) {
result.setAdditionalOutputPartitionColumns(additionalOutputPartitionColumns.toJava)
}

result
}
}
Expand Down
21 changes: 0 additions & 21 deletions api/src/main/scala/ai/chronon/api/Extensions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,6 @@ object Extensions {
@deprecated("Use `name` instead.")
def nameToFilePath: String = metaData.name.replaceFirst("\\.", "/")

// helper function to extract values from customJson
def customJsonLookUp(key: String): Any = {
if (metaData.customJson == null) return null
val mapper = new ObjectMapper()
val typeRef = new TypeReference[java.util.HashMap[String, Object]]() {}
val jMap: java.util.Map[String, Object] = mapper.readValue(metaData.customJson, typeRef)
jMap.toScala.get(key).orNull
}

def owningTeam: String = {
val teamOverride = Try(customJsonLookUp(Constants.TeamOverride).asInstanceOf[String]).toOption
teamOverride.getOrElse(metaData.team)
}

// if drift spec is set but tile size is not set, default to 30 minutes
def driftTileSize: Option[Window] = {
Option(metaData.getDriftSpec) match {
Expand Down Expand Up @@ -460,13 +446,6 @@ object Extensions {
}
}

// Check if tiling is enabled for a given GroupBy. Defaults to false if the 'enable_tiling' flag isn't set.
def isTilingEnabled: Boolean =
groupBy.getMetaData.customJsonLookUp("enable_tiling") match {
case s: Boolean => s
case _ => false
}

def semanticHash: String = {
val newGroupBy = groupBy.deepCopy()
newGroupBy.unsetMetaData()
Expand Down
45 changes: 3 additions & 42 deletions api/src/test/scala/ai/chronon/api/test/ExtensionsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,11 @@

package ai.chronon.api.test

import ai.chronon.api.Accuracy
import ai.chronon.api.Builders
import ai.chronon.api.Constants
import ai.chronon.api.Extensions._
import ai.chronon.api.GroupBy
import ai.chronon.api.ScalaJavaConversions._
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.mockito.Mockito.spy
import org.mockito.Mockito.when
import ai.chronon.api.{Accuracy, Builders, Constants, GroupBy}
import org.junit.Assert.{assertEquals, assertTrue}
import org.mockito.Mockito.{spy, when}
import org.scalatest.flatspec.AnyFlatSpec

import java.util.Arrays
Expand All @@ -41,24 +35,6 @@ class ExtensionsTest extends AnyFlatSpec {
)
}

it should "owning team" in {
val metadata =
Builders.MetaData(
customJson = "{\"check_consistency\": true, \"lag\": 0, \"team_override\": \"ml_infra\"}",
team = "chronon"
)

assertEquals(
"ml_infra",
metadata.owningTeam
)

assertEquals(
"chronon",
metadata.team
)
}

it should "row identifier" in {
val labelPart = Builders.LabelPart();
val res = labelPart.rowIdentifier(Arrays.asList("yoyo", "yujia"), "ds")
Expand Down Expand Up @@ -140,19 +116,4 @@ class ExtensionsTest extends AnyFlatSpec {
assertTrue(keys.contains(Constants.TimeColumn))
assertEquals(4, keys.size)
}

it should "is tiling enabled" in {
def buildGroupByWithCustomJson(customJson: String = null): GroupBy =
Builders.GroupBy(
metaData = Builders.MetaData(name = "featureGroupName", customJson = customJson)
)

// customJson not set defaults to false
assertFalse(buildGroupByWithCustomJson().isTilingEnabled)
assertFalse(buildGroupByWithCustomJson("{}").isTilingEnabled)

assertTrue(buildGroupByWithCustomJson("{\"enable_tiling\": true}").isTilingEnabled)
assertFalse(buildGroupByWithCustomJson("{\"enable_tiling\": false}").isTilingEnabled)
assertFalse(buildGroupByWithCustomJson("{\"enable_tiling\": \"string instead of bool\"}").isTilingEnabled)
}
}
14 changes: 9 additions & 5 deletions api/thrift/api.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,19 @@ struct MetaData {

4: optional string outputNamespace

5: optional map<string, string> tableProperties
/**
* By default we will just partition the output by the date column - set via "spark.chronon.partition.column"
* With this we will partition the output with the specified additional columns
**/
5: optional list<string> additionalOutputPartitionColumns

6: optional map<string, string> tableProperties
Comment on lines +256 to +258
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep tableProperties the same field number as before (5)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought outputPartitionCols are more important than table props. It's safe to change these for now, actually.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we should call out that folks need to recompile their existing configs, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call — we need to release a wheel and cut them over. I was working on it separately. (Basically I am doing the compile for them.)


// tag_key -> tag_value - tags allow for repository wide querying, deprecations etc
// this is object level tag - applies to all columns produced by the object - GroupBy, Join, Model etc
6: optional map<string, string> tags
20: optional map<string, string> tags
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field number here too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spacing in field numbers allows for adding new fields in the right order. So if we find some other thing later, we can add it in the right place instead of at the end.

// column -> tag_key -> tag_value
7: optional map<string, map<string, string>> columnTags
21: optional map<string, map<string, string>> columnTags

// marking this as true means that the conf can be served online
// once marked online, a conf cannot be changed - compiling the conf won't be allowed
Expand Down Expand Up @@ -286,8 +292,6 @@ struct MetaData {
204: optional common.ExecutionInfo executionInfo
}



// Equivalent to a FeatureSet in chronon terms
struct GroupBy {
1: optional MetaData metaData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import ai.chronon.api.Constants
import ai.chronon.api.DataType
import ai.chronon.api.GroupBy
import ai.chronon.api.Row
import ai.chronon.api.ScalaJavaConversions.ListOps
import ai.chronon.api.ScalaJavaConversions.{IteratorOps, ListOps}
import ai.chronon.flink.types.TimestampedIR
import ai.chronon.flink.types.TimestampedTile
import ai.chronon.online.TileCodec
Expand Down Expand Up @@ -42,6 +42,23 @@ class FlinkRowAggregationFunction(
private val valueColumns: Array[String] = inputSchema.map(_._1).toArray // column order matters
private val timeColumnAlias: String = Constants.TimeColumn

private val isMutation: Boolean = {
groupBy.getSources
.iterator()
.toScala
.exists(source => source.isSetEntities && source.getEntities.isSetMutationTopic)
}

private val reversalIndex = {
val result = inputSchema.indexWhere(_._1 == Constants.ReversalColumn)

if (isMutation)
require(result >= 0,
s"Please specify source.query.reversal_column for CDC sources, only found, ${inputSchema.map(_._1)}")

result
}

/*
* Initialize the transient rowAggregator.
* Running this method is an idempotent operation:
Expand All @@ -60,6 +77,7 @@ class FlinkRowAggregationFunction(
element: Map[String, Any],
accumulatorIr: TimestampedIR
): TimestampedIR = {

// Most times, the time column is a Long, but it could be a Double.
val tsMills = Try(element(timeColumnAlias).asInstanceOf[Long])
.getOrElse(element(timeColumnAlias).asInstanceOf[Double].toLong)
Expand All @@ -79,7 +97,14 @@ class FlinkRowAggregationFunction(
)

val partialAggregates = Try {
rowAggregator.update(accumulatorIr.ir, row)
val isDelete = isMutation && row.getAs[Boolean](reversalIndex)

if (isDelete) {
rowAggregator.delete(accumulatorIr.ir, row)
} else {
rowAggregator.update(accumulatorIr.ir, row)
}

}

partialAggregates match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class GroupByServingInfoParsed(val groupByServingInfo: GroupByServingInfo, parti
// Start tiling specific variables

lazy val tiledCodec: TileCodec = new TileCodec(groupBy, valueChrononSchema.fields.map(sf => (sf.name, sf.fieldType)))
lazy val isTilingEnabled: Boolean = groupByOps.isTilingEnabled

// End tiling specific variables

Expand Down
18 changes: 4 additions & 14 deletions online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,12 @@ object MetadataEndPoint {
val ConfByKeyEndPointName = "CHRONON_METADATA"
val NameByTeamEndPointName = "CHRONON_ENTITY_BY_TEAM"

private def getTeamFromMetadata(metaData: MetaData): String = {
val team = metaData.team
if (metaData.customJson != null && metaData.customJson.nonEmpty) {
implicit val formats = DefaultFormats
val customJson = parse(metaData.customJson)
val teamFromJson: String = (customJson \ "team_override").extractOpt[String].getOrElse("")
if (teamFromJson.nonEmpty) teamFromJson else team
} else team
}

private def parseTeam[Conf <: TBase[_, _]: Manifest: ClassTag](conf: Conf): String = {
conf match {
case join: Join => "joins/" + getTeamFromMetadata(join.metaData)
case groupBy: GroupBy => "group_bys/" + getTeamFromMetadata(groupBy.metaData)
case stagingQuery: StagingQuery => "staging_queries/" + getTeamFromMetadata(stagingQuery.metaData)
case model: Model => "models/" + getTeamFromMetadata(model.metaData)
case join: Join => "joins/" + join.metaData.team
case groupBy: GroupBy => "group_bys/" + groupBy.metaData.team
case stagingQuery: StagingQuery => "staging_queries/" + stagingQuery.metaData.team
case model: Model => "models/" + model.metaData.team
case _ =>
logger.error(s"Failed to parse team from $conf")
throw new Exception(s"Failed to parse team from $conf")
Expand Down
6 changes: 3 additions & 3 deletions online/src/main/scala/ai/chronon/online/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object Metrics {
environment = environment,
join = join.metaData.cleanName,
production = join.metaData.isProduction,
team = join.metaData.owningTeam
team = join.metaData.team
)
}

Expand All @@ -108,7 +108,7 @@ object Metrics {
groupBy = groupBy.metaData.cleanName,
production = groupBy.metaData.isProduction,
accuracy = groupBy.inferredAccuracy,
team = groupBy.metaData.owningTeam,
team = groupBy.metaData.team,
join = groupBy.sources.toScala
.find(_.isSetJoinSource)
.map(_.getJoinSource.join.metaData.cleanName)
Expand All @@ -127,7 +127,7 @@ object Metrics {
environment = environment,
groupBy = stagingQuery.metaData.cleanName,
production = stagingQuery.metaData.isProduction,
team = stagingQuery.metaData.owningTeam
team = stagingQuery.metaData.team
)
}

Expand Down
12 changes: 0 additions & 12 deletions online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,6 @@ case class FetchContext(kvStore: KVStore,
disableErrorThrows: Boolean = false,
executionContextOverride: ExecutionContext = null) {

def isTilingEnabled: Boolean = {
Option(flagStore)
.map(_.isSet(FlagStoreConstants.TILING_ENABLED, Map.empty[String, String].toJava))
.exists(_.asInstanceOf[Boolean])
}

def shouldStreamingDecodeThrow(groupByName: String): Boolean = {
Option(flagStore)
.exists(
_.isSet("disable_streaming_decoding_error_throws", Map("group_by_streaming_dataset" -> groupByName).toJava))
}

def getOrCreateExecutionContext: ExecutionContext = {
Option(executionContextOverride).getOrElse(FlexibleExecutionContext.buildExecutionContext)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class GroupByFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
case Accuracy.TEMPORAL =>
// Build a tile key for the streaming request
// When we build support for layering, we can expand this out into a utility that builds n tile keys for n layers
val keyBytes = if (fetchContext.isTilingEnabled) {
val keyBytes = {

val tileKey = TilingUtils.buildTileKey(
groupByServingInfo.groupByOps.streamingDataset,
Expand All @@ -96,8 +96,6 @@ class GroupByFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
)

TilingUtils.serializeTileKey(tileKey)
} else {
streamingKeyBytes
}

Some(
Expand Down
Loading
Loading