Skip to content

Commit 6d8afd0

Browse files
authored
fix: remove references to custom json, float essential apis to top (#492)
## Summary custom json is not supposed to be relied on in scala code at all - it is purely meant for users to attach THEIR metadata to objects. ## Checklist - [x] Added Unit Tests - [x] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced new configuration flags that improve control over tiling behavior and streaming error handling. - Expanded metadata support to include additional output partition columns, enhancing data staging and query processing. - **Documentation** - Updated instructions for enabling tiled reads to reflect the new configuration parameter. - **Tests** - Enhanced test coverage to validate the new serving and tiling configurations. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent fa29dd4 commit 6d8afd0

File tree

17 files changed

+229
-128
lines changed

17 files changed

+229
-128
lines changed

api/py/ai/chronon/cli/compile/parse_teams.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ def _merge_mode_maps(
123123
result.backfill = _merge_maps(result.common, result.backfill)
124124
result.upload = _merge_maps(result.common, result.upload)
125125
result.streaming = _merge_maps(result.common, result.streaming)
126+
result.serving = _merge_maps(result.common, result.serving)
126127
result.common = None
127128
continue
128129

@@ -135,5 +136,6 @@ def _merge_mode_maps(
135136
result.streaming = _merge_maps(
136137
result.streaming, mode_map.common, mode_map.streaming
137138
)
139+
result.serving = _merge_maps(result.serving, mode_map.common, mode_map.serving)
138140

139141
return result

api/src/main/scala/ai/chronon/api/Builders.scala

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -264,14 +264,15 @@ object Builders {
264264
online: Boolean = false,
265265
production: Boolean = false,
266266
customJson: String = null,
267-
dependencies: Seq[String] = null,
268267
namespace: String = null,
269268
team: String = null,
270269
samplePercent: Double = 100,
271270
consistencySamplePercent: Double = 5,
272271
tableProperties: Map[String, String] = Map.empty,
273272
historicalBackfill: Boolean = true,
274-
driftSpec: DriftSpec = null
273+
driftSpec: DriftSpec = null,
274+
additionalOutputPartitionColumns: Seq[String] = Seq.empty,
275+
executionInfo: ExecutionInfo = null
275276
): MetaData = {
276277
val result = new MetaData()
277278
result.setName(name)
@@ -287,9 +288,7 @@ object Builders {
287288
}
288289

289290
result.setTeam(effectiveTeam)
290-
val executionInfo = new ExecutionInfo()
291-
.setHistoricalBackfill(historicalBackfill)
292-
result.setExecutionInfo(executionInfo)
291+
293292
if (samplePercent > 0)
294293
result.setSamplePercent(samplePercent)
295294
if (consistencySamplePercent > 0)
@@ -298,6 +297,19 @@ object Builders {
298297
result.setTableProperties(tableProperties.toJava)
299298
if (driftSpec != null)
300299
result.setDriftSpec(driftSpec)
300+
301+
if (executionInfo != null) {
302+
result.setExecutionInfo(executionInfo.setHistoricalBackfill(historicalBackfill))
303+
} else {
304+
result.setExecutionInfo(
305+
new ExecutionInfo()
306+
.setHistoricalBackfill(historicalBackfill))
307+
}
308+
309+
if (additionalOutputPartitionColumns.nonEmpty) {
310+
result.setAdditionalOutputPartitionColumns(additionalOutputPartitionColumns.toJava)
311+
}
312+
301313
result
302314
}
303315
}

api/src/main/scala/ai/chronon/api/Extensions.scala

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -154,20 +154,6 @@ object Extensions {
154154
@deprecated("Use `name` instead.")
155155
def nameToFilePath: String = metaData.name.replaceFirst("\\.", "/")
156156

157-
// helper function to extract values from customJson
158-
def customJsonLookUp(key: String): Any = {
159-
if (metaData.customJson == null) return null
160-
val mapper = new ObjectMapper()
161-
val typeRef = new TypeReference[java.util.HashMap[String, Object]]() {}
162-
val jMap: java.util.Map[String, Object] = mapper.readValue(metaData.customJson, typeRef)
163-
jMap.toScala.get(key).orNull
164-
}
165-
166-
def owningTeam: String = {
167-
val teamOverride = Try(customJsonLookUp(Constants.TeamOverride).asInstanceOf[String]).toOption
168-
teamOverride.getOrElse(metaData.team)
169-
}
170-
171157
// if drift spec is set but tile size is not set, default to 30 minutes
172158
def driftTileSize: Option[Window] = {
173159
Option(metaData.getDriftSpec) match {
@@ -460,13 +446,6 @@ object Extensions {
460446
}
461447
}
462448

463-
// Check if tiling is enabled for a given GroupBy. Defaults to false if the 'enable_tiling' flag isn't set.
464-
def isTilingEnabled: Boolean =
465-
groupBy.getMetaData.customJsonLookUp("enable_tiling") match {
466-
case s: Boolean => s
467-
case _ => false
468-
}
469-
470449
def semanticHash: String = {
471450
val newGroupBy = groupBy.deepCopy()
472451
newGroupBy.unsetMetaData()
@@ -600,6 +579,22 @@ object Extensions {
600579
QueryParts(allSelects, wheres)
601580
}
602581

582+
def servingFlagValue(flag: String): Option[String] = {
583+
for (
584+
execInfo <- Option(groupBy.metaData.executionInfo);
585+
conf <- Option(execInfo.conf);
586+
servingConf <- Option(conf.serving);
587+
value <- Option(servingConf.get(flag))
588+
) {
589+
return Some(value)
590+
}
591+
None
592+
}
593+
594+
def tilingFlag: Boolean = servingFlagValue("tiling").exists(_.toLowerCase() == "true")
595+
596+
def dontThrowOnDecodeFailFlag: Boolean = servingFlagValue("decode.throw_on_fail").exists(_.toLowerCase() == "false")
597+
603598
// build left streaming query for join source runner
604599
def buildLeftStreamingQuery(query: Query, defaultFieldNames: Seq[String]): String = {
605600
val queryParts = groupBy.buildQueryParts(query)

api/src/test/scala/ai/chronon/api/test/ExtensionsTest.scala

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,11 @@
1616

1717
package ai.chronon.api.test
1818

19-
import ai.chronon.api.Accuracy
20-
import ai.chronon.api.Builders
21-
import ai.chronon.api.Constants
2219
import ai.chronon.api.Extensions._
23-
import ai.chronon.api.GroupBy
2420
import ai.chronon.api.ScalaJavaConversions._
25-
import org.junit.Assert.assertEquals
26-
import org.junit.Assert.assertFalse
27-
import org.junit.Assert.assertTrue
28-
import org.mockito.Mockito.spy
29-
import org.mockito.Mockito.when
21+
import ai.chronon.api.{Accuracy, Builders, ConfigProperties, Constants, ExecutionInfo, GroupBy}
22+
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
23+
import org.mockito.Mockito.{spy, when}
3024
import org.scalatest.flatspec.AnyFlatSpec
3125

3226
import java.util.Arrays
@@ -41,24 +35,6 @@ class ExtensionsTest extends AnyFlatSpec {
4135
)
4236
}
4337

44-
it should "owning team" in {
45-
val metadata =
46-
Builders.MetaData(
47-
customJson = "{\"check_consistency\": true, \"lag\": 0, \"team_override\": \"ml_infra\"}",
48-
team = "chronon"
49-
)
50-
51-
assertEquals(
52-
"ml_infra",
53-
metadata.owningTeam
54-
)
55-
56-
assertEquals(
57-
"chronon",
58-
metadata.team
59-
)
60-
}
61-
6238
it should "row identifier" in {
6339
val labelPart = Builders.LabelPart();
6440
val res = labelPart.rowIdentifier(Arrays.asList("yoyo", "yujia"), "ds")
@@ -142,17 +118,29 @@ class ExtensionsTest extends AnyFlatSpec {
142118
}
143119

144120
it should "is tiling enabled" in {
145-
def buildGroupByWithCustomJson(customJson: String = null): GroupBy =
121+
def buildGroupByWithServingFlags(flags: Map[String, String] = null): GroupByOps = {
122+
123+
val execInfo: ExecutionInfo = if (flags != null) {
124+
new ExecutionInfo()
125+
.setConf(new ConfigProperties().setServing(flags.toJava))
126+
} else {
127+
null
128+
}
129+
146130
Builders.GroupBy(
147-
metaData = Builders.MetaData(name = "featureGroupName", customJson = customJson)
131+
metaData = Builders.MetaData(name = "featureGroupName", executionInfo = execInfo)
148132
)
149133

134+
}
135+
150136
// serving flags not set defaults to false
151-
assertFalse(buildGroupByWithCustomJson().isTilingEnabled)
152-
assertFalse(buildGroupByWithCustomJson("{}").isTilingEnabled)
137+
assertFalse(buildGroupByWithServingFlags().tilingFlag)
138+
assertFalse(buildGroupByWithServingFlags(Map.empty).tilingFlag)
139+
140+
val trueGb = buildGroupByWithServingFlags(Map("tiling" -> "true"))
141+
assertTrue(trueGb.tilingFlag)
142+
assertFalse(buildGroupByWithServingFlags(Map("tiling" -> "false")).tilingFlag)
143+
assertFalse(buildGroupByWithServingFlags(Map("tiling" -> "invalid")).tilingFlag)
153144

154-
assertTrue(buildGroupByWithCustomJson("{\"enable_tiling\": true}").isTilingEnabled)
155-
assertFalse(buildGroupByWithCustomJson("{\"enable_tiling\": false}").isTilingEnabled)
156-
assertFalse(buildGroupByWithCustomJson("{\"enable_tiling\": \"string instead of bool\"}").isTilingEnabled)
157145
}
158146
}

api/thrift/api.thrift

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -249,13 +249,19 @@ struct MetaData {
249249

250250
4: optional string outputNamespace
251251

252-
5: optional map<string, string> tableProperties
252+
/**
253+
* By default we will just partition the output by the date column - set via "spark.chronon.partition.column"
254+
* With this we will partition the output with the specified additional columns
255+
**/
256+
5: optional list<string> additionalOutputPartitionColumns
257+
258+
6: optional map<string, string> tableProperties
253259

254260
// tag_key -> tag_value - tags allow for repository wide querying, deprecations etc
255261
// this is object level tag - applies to all columns produced by the object - GroupBy, Join, Model etc
256-
6: optional map<string, string> tags
262+
20: optional map<string, string> tags
257263
// column -> tag_key -> tag_value
258-
7: optional map<string, map<string, string>> columnTags
264+
21: optional map<string, map<string, string>> columnTags
259265

260266
// marking this as true means that the conf can be served online
261267
// once marked online, a conf cannot be changed - compiling the conf won't be allowed
@@ -286,8 +292,6 @@ struct MetaData {
286292
204: optional common.ExecutionInfo executionInfo
287293
}
288294

289-
290-
291295
// Equivalent to a FeatureSet in chronon terms
292296
struct GroupBy {
293297
1: optional MetaData metaData

api/thrift/common.thrift

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,42 @@ struct DateRange {
2525
2: string endDate
2626
}
2727

28+
/**
29+
* env vars for different modes of execution - with "common" applying to all modes
30+
* the submitter will set these env vars prior to launching the job
31+
*
32+
* these env vars are layered in order of priority
33+
* 1. company file defaults specified in teams.py - in the "common" team
34+
* 2. team wide defaults that apply to all objects in the team folder
35+
 * 3. object specific defaults - applies only to the objects that declare them
36+
*
37+
* All the maps from the above three places are merged to create final env var
38+
**/
2839
struct EnvironmentVariables {
2940
1: optional map<string, string> common
3041
2: optional map<string, string> backfill
3142
3: optional map<string, string> upload
3243
4: optional map<string, string> streaming
44+
5: optional map<string, string> serving
3345
}
3446

47+
/**
48+
* job config for different modes of execution - with "common" applying to all modes
49+
* usually these are spark or flink conf params like "spark.executor.memory" etc
50+
*
51+
* these confs are layered in order of priority
52+
* 1. company file defaults specified in teams.py - in the "common" team
53+
* 2. team wide defaults that apply to all objects in the team folder
54+
 * 3. object specific defaults - applies only to the objects that declare them
55+
*
56+
* All the maps from the above three places are merged to create final conf map
57+
**/
3558
struct ConfigProperties {
3659
1: optional map<string, string> common
3760
2: optional map<string, string> backfill
3861
3: optional map<string, string> upload
3962
4: optional map<string, string> streaming
63+
5: optional map<string, string> serving
4064
}
4165

4266
struct TableDependency {

docs/source/Tiled_Architecture.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,4 @@ the [Chronon on Flink documentation](setup/Flink.md) for instructions. As part o
8080
modify your KV store implementation to know how to write and fetch tiles.
8181

8282
Once the Flink app is set up and writing tiles to your datastore, the final step is to enable tiled reads in the
83-
Fetcher. Just add `enable_tiling=true` to
84-
the [customJson](https://github.com/airbnb/chronon/blob/48b789dd2c216c62bbf1d74fbf4e779f23db541f/api/py/ai/chronon/group_by.py#L561)
85-
of any GroupBy definition.
83+
Fetcher. Just add `tiling=true` to `metaData.executionInfo.conf.serving` of any GroupBy definition.

flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import ai.chronon.api.Constants
55
import ai.chronon.api.DataType
66
import ai.chronon.api.GroupBy
77
import ai.chronon.api.Row
8-
import ai.chronon.api.ScalaJavaConversions.ListOps
8+
import ai.chronon.api.ScalaJavaConversions.{IteratorOps, ListOps}
99
import ai.chronon.flink.types.TimestampedIR
1010
import ai.chronon.flink.types.TimestampedTile
1111
import ai.chronon.online.TileCodec
@@ -42,6 +42,23 @@ class FlinkRowAggregationFunction(
4242
private val valueColumns: Array[String] = inputSchema.map(_._1).toArray // column order matters
4343
private val timeColumnAlias: String = Constants.TimeColumn
4444

45+
private val isMutation: Boolean = {
46+
Option(groupBy.getSources).exists(
47+
_.iterator().toScala
48+
.exists(source => source.isSetEntities && source.getEntities.isSetMutationTopic)
49+
)
50+
}
51+
52+
private val reversalIndex = {
53+
val result = inputSchema.indexWhere(_._1 == Constants.ReversalColumn)
54+
55+
if (isMutation)
56+
require(result >= 0,
57+
s"Please specify source.query.reversal_column for CDC sources, only found, ${inputSchema.map(_._1)}")
58+
59+
result
60+
}
61+
4562
/*
4663
* Initialize the transient rowAggregator.
4764
* Running this method is an idempotent operation:
@@ -60,6 +77,7 @@ class FlinkRowAggregationFunction(
6077
element: Map[String, Any],
6178
accumulatorIr: TimestampedIR
6279
): TimestampedIR = {
80+
6381
// Most times, the time column is a Long, but it could be a Double.
6482
val tsMills = Try(element(timeColumnAlias).asInstanceOf[Long])
6583
.getOrElse(element(timeColumnAlias).asInstanceOf[Double].toLong)
@@ -79,7 +97,14 @@ class FlinkRowAggregationFunction(
7997
)
8098

8199
val partialAggregates = Try {
82-
rowAggregator.update(accumulatorIr.ir, row)
100+
val isDelete = isMutation && row.getAs[Boolean](reversalIndex)
101+
102+
if (isDelete) {
103+
rowAggregator.delete(accumulatorIr.ir, row)
104+
} else {
105+
rowAggregator.update(accumulatorIr.ir, row)
106+
}
107+
83108
}
84109

85110
partialAggregates match {

online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ class GroupByServingInfoParsed(val groupByServingInfo: GroupByServingInfo, parti
8989
// Start tiling specific variables
9090

9191
lazy val tiledCodec: TileCodec = new TileCodec(groupBy, valueChrononSchema.fields.map(sf => (sf.name, sf.fieldType)))
92-
lazy val isTilingEnabled: Boolean = groupByOps.isTilingEnabled
9392

9493
// End tiling specific variables
9594

online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,12 @@ object MetadataEndPoint {
2525
val ConfByKeyEndPointName = "CHRONON_METADATA"
2626
val NameByTeamEndPointName = "CHRONON_ENTITY_BY_TEAM"
2727

28-
private def getTeamFromMetadata(metaData: MetaData): String = {
29-
val team = metaData.team
30-
if (metaData.customJson != null && metaData.customJson.nonEmpty) {
31-
implicit val formats = DefaultFormats
32-
val customJson = parse(metaData.customJson)
33-
val teamFromJson: String = (customJson \ "team_override").extractOpt[String].getOrElse("")
34-
if (teamFromJson.nonEmpty) teamFromJson else team
35-
} else team
36-
}
37-
3828
private def parseTeam[Conf <: TBase[_, _]: Manifest: ClassTag](conf: Conf): String = {
3929
conf match {
40-
case join: Join => "joins/" + getTeamFromMetadata(join.metaData)
41-
case groupBy: GroupBy => "group_bys/" + getTeamFromMetadata(groupBy.metaData)
42-
case stagingQuery: StagingQuery => "staging_queries/" + getTeamFromMetadata(stagingQuery.metaData)
43-
case model: Model => "models/" + getTeamFromMetadata(model.metaData)
30+
case join: Join => "joins/" + join.metaData.team
31+
case groupBy: GroupBy => "group_bys/" + groupBy.metaData.team
32+
case stagingQuery: StagingQuery => "staging_queries/" + stagingQuery.metaData.team
33+
case model: Model => "models/" + model.metaData.team
4434
case _ =>
4535
logger.error(s"Failed to parse team from $conf")
4636
throw new Exception(s"Failed to parse team from $conf")

0 commit comments

Comments
 (0)