Skip to content

Commit f8f8000

Browse files
authored
Switch out old APIs (#259)
## Summary [Ticket](https://app.asana.com/0/0/1209184028451140) This PR includes both frontend and backend updates. Since @nikhil-zlai and I reviewed the high-level changes today, it might be helpful for @sean-zlai and me to go over them during our sync tomorrow. There are some key points I'd like to highlight, but I wanted to get this up early for review. **Reminder:** @sean-zlai will handle merging the new drift/summary functionality into the observability frontend. ### **Frontend Changes (@sean-zlai)** - Added new endpoints for Conf, Confs, search, drift, summary, etc., currently called on the `/thrift` route for demonstration. The old endpoints can be removed once we wire in the new ones. - Created `LogicalNodeTable.svelte` to display all `ConfTypes`, including empty states. - Updated search functionality to work for all `ConfTypes`, though only `risk.user_transactions.txn_join` is enabled for now. ### **Backend Changes (@nikhil-zlai)** - Implemented new endpoints using `RouteHandlerWrapper`. - Removed old endpoints (except `/timeseries`, which is still needed for charts). - Separated drift and summary into two distinct endpoints. - Introduced: - `DriftHandler` (handles drift and summaries). - `ConfHandler` (handles Conf, Confs, and Conf search). - Added filtering by column to `DriftStore`. - Improved and added various tests. - Developed a new serializer that integrates well with `RouteHandlerWrapper`. Let me know if you have any questions! ## Checklist - [x] Added Unit Tests - [x] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes - **New Features** - Added configuration management capabilities across joins, models, group bys, and staging queries. - Introduced drift analysis and summary tracking for configurations. - Enhanced API endpoints for retrieving and searching configurations. 
- New `LogicalNodeTable` component for displaying configuration data. - Added new structures and enumerations for handling configuration requests in the API. - **Improvements** - Refactored API response handling to use JSON serialization. - Improved null value handling in data processing with predefined constants. - Updated type definitions for better type safety. - Streamlined configuration retrieval and search mechanisms. - Enhanced handling of search results by categorizing entity types. - **Bug Fixes** - Resolved issues with null value representation in long and double iterators. - Improved error handling in API request processing. - **Breaking Changes** - Removed previous model and join metadata endpoints. - Updated API method signatures for configuration retrieval. - Eliminated deprecated response types related to joins and models. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent ea6cfb5 commit f8f8000

File tree

9 files changed

+189
-21
lines changed

9 files changed

+189
-21
lines changed

api/src/main/scala/ai/chronon/api/Constants.scala

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,7 @@ object Constants {
7777
val extensionsToIgnore: Array[String] = Array(".class", ".csv", ".java", ".scala", ".py", ".DS_Store")
7878
val foldersToIgnore: Array[String] = Array(".git")
7979

80-
// import base64
81-
// text_bytes = "chronon".encode('utf-8')
82-
// base64_str = base64.b64encode(text_bytes)
83-
// int.from_bytes(base64.b64decode(base64_str), "big")
84-
//
85-
// output: 27980863399423854
86-
87-
val magicNullDouble: java.lang.Double = -27980863399423854.0
80+
// A negative integer within the safe range for both long and double in JavaScript, Java, Scala, Python
81+
val magicNullLong: java.lang.Long = -1234567890L
82+
val magicNullDouble: java.lang.Double = -1234567890.0
8883
}

api/src/test/scala/ai/chronon/api/test/TileSeriesSerializationTest.scala

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ import ai.chronon.api.Constants
44
import ai.chronon.api.ScalaJavaConversions.JListOps
55
import ai.chronon.api.ThriftJsonCodec
66
import ai.chronon.observability.TileDriftSeries
7+
import ai.chronon.observability.TileSummarySeries
78
import org.scalatest.flatspec.AnyFlatSpec
89
import org.scalatest.matchers.should.Matchers
910

1011
import java.lang.{Double => JDouble}
12+
import java.lang.{Long => JLong}
13+
import scala.jdk.CollectionConverters.asScalaBufferConverter
1114

1215
class TileSeriesSerializationTest extends AnyFlatSpec with Matchers {
1316

@@ -27,8 +30,49 @@ class TileSeriesSerializationTest extends AnyFlatSpec with Matchers {
2730

2831
val jsonStr = ThriftJsonCodec.toJsonStr(tileDriftSeries)
2932

30-
jsonStr should be ("""{"percentileDriftSeries":[0.1,-2.7980863399423856E16,-2.7980863399423856E16,-2.7980863399423856E16,0.5]}""")
33+
jsonStr should be (s"""{"percentileDriftSeries":[0.1,${Constants.magicNullDouble},${Constants.magicNullDouble},${Constants.magicNullDouble},0.5]}""")
3134
}
3235

36+
it should "deserialize double values correctly" in {
37+
val json = s"""{"percentileDriftSeries":[0.1,${Constants.magicNullDouble},${Constants.magicNullDouble},${Constants.magicNullDouble},0.5]}"""
38+
39+
val series = ThriftJsonCodec.fromJsonStr[TileDriftSeries](json, true, classOf[TileDriftSeries])(manifest[TileDriftSeries])
40+
41+
val drifts = series.getPercentileDriftSeries.asScala.toList
42+
drifts.size should be (5)
43+
drifts(0) should be (0.1)
44+
drifts(1) should be (Constants.magicNullDouble)
45+
drifts(2) should be (Constants.magicNullDouble)
46+
drifts(3) should be (Constants.magicNullDouble)
47+
drifts(4) should be (0.5)
48+
}
49+
50+
"TileSummarySeries" should "serialize with nulls and special long values" in {
51+
val tileSummarySeries = new TileSummarySeries()
52+
53+
val counts: Seq[JLong] = Seq(100L, null, Long.MaxValue, Constants.magicNullLong, 500L)
54+
.map(v => if (v == null) Constants.magicNullLong else v.asInstanceOf[JLong])
55+
56+
val countsList: java.util.List[JLong] = counts.toJava
57+
tileSummarySeries.setCount(countsList)
58+
59+
val jsonStr = ThriftJsonCodec.toJsonStr(tileSummarySeries)
60+
61+
jsonStr should be (s"""{"count":[100,${Constants.magicNullLong},9223372036854775807,${Constants.magicNullLong},500]}""")
62+
}
63+
64+
it should "deserialize long values correctly" in {
65+
val json = s"""{"count":[100,${Constants.magicNullLong},9223372036854775807,${Constants.magicNullLong},500]}"""
66+
67+
val series = ThriftJsonCodec.fromJsonStr[TileSummarySeries](json, true, classOf[TileSummarySeries])(manifest[TileSummarySeries])
68+
69+
val counts = series.getCount.asScala.toList
70+
counts.size should be (5)
71+
counts(0) should be (100L)
72+
counts(1) should be (Constants.magicNullLong)
73+
counts(2) should be (Long.MaxValue)
74+
counts(3) should be (Constants.magicNullLong)
75+
counts(4) should be (500L)
76+
}
3377

3478
}

api/thrift/api.thrift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,4 +432,4 @@ struct Model {
432432
3: optional TDataType outputSchema
433433
4: optional Source source
434434
5: optional map<string, string> modelParams
435-
}
435+
}

api/thrift/hub.thrift

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,39 @@ struct Submission {
115115
20: optional i64 finishedTs
116116
21: optional DateRange dateRange
117117
}
118+
119+
enum ConfType{
120+
STAGING_QUERY = 1
121+
GROUP_BY = 2
122+
JOIN = 3
123+
MODEL = 4
124+
}
125+
126+
struct ConfRequest {
127+
1: optional string confName
128+
2: optional ConfType confType
129+
130+
// set either branch or version - if neither is set, we will pull the conf for the main branch
131+
3: optional string branch
132+
4: optional string version
133+
}
134+
135+
/**
136+
* lists all confs of the specified type
137+
*/
138+
struct ConfListRequest {
139+
1: optional ConfType confType
140+
141+
// if not specified we will pull conf list for main branch
142+
2: optional string branch
143+
}
144+
145+
/**
146+
* Response for listing configurations of a specific type
147+
*/
148+
struct ConfListResponse {
149+
1: optional list<api.Join> joins
150+
2: optional list<api.GroupBy> groupBys
151+
3: optional list<api.Model> models
152+
4: optional list<api.StagingQuery> stagingQueries
153+
}

api/thrift/observability.thrift

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,24 @@ struct DriftSpec {
139139

140140
// default drift metric to use
141141
6: optional DriftMetric driftMetric = DriftMetric.JENSEN_SHANNON
142-
}
142+
}
143+
144+
struct JoinDriftRequest {
145+
1: required string name
146+
2: required i64 startTs
147+
3: required i64 endTs
148+
6: optional string offset // Format: "24h" or "7d"
149+
7: optional DriftMetric algorithm
150+
8: optional string columnName
151+
}
152+
153+
struct JoinDriftResponse {
154+
1: required list<TileDriftSeries> driftSeries
155+
}
156+
157+
struct JoinSummaryRequest {
158+
1: required string name
159+
2: required i64 startTs
160+
3: required i64 endTs
161+
8: required string columnName
162+
}

online/src/main/scala/ai/chronon/online/stats/DriftStore.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,14 @@ class DriftStore(kvStore: KVStore,
8484
val tileKeyMap = tileKeysForJoin(joinConf, None, columnPrefix)
8585
val requestContextMap: Map[GetRequest, SummaryRequestContext] = tileKeyMap.flatMap {
8686
case (group, keys) =>
87-
keys.map { key =>
88-
val keyBytes = serializer.serialize(key)
89-
val get = GetRequest(keyBytes, summaryDataset, startTsMillis = startMs, endTsMillis = endMs)
90-
get -> SummaryRequestContext(get, key, group)
91-
}
87+
// Only create requests for keys whose column exactly equals columnPrefix; when columnPrefix is empty, all keys are kept
88+
keys
89+
.filter(key => columnPrefix.forall(prefix => key.getColumn == prefix))
90+
.map { key =>
91+
val keyBytes = serializer.serialize(key)
92+
val get = GetRequest(keyBytes, summaryDataset, startTsMillis = startMs, endTsMillis = endMs)
93+
get -> SummaryRequestContext(get, key, group)
94+
}
9295
}
9396

9497
val responseFuture = kvStore.multiGet(requestContextMap.keys.toSeq)

online/src/main/scala/ai/chronon/online/stats/PivotUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ object PivotUtils {
9595
if (isSetFunc(summary)) {
9696
JLong.valueOf(extract(summary))
9797
} else {
98-
null
98+
Constants.magicNullLong
9999
}
100100
}
101101
}

online/src/test/scala/ai/chronon/online/test/stats/PivotUtilsTest.scala

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class PivotUtilsTest extends AnyFlatSpec with Matchers {
113113
(ts3, 3000L)
114114
))
115115

116-
result.getCount.asScala shouldEqual List(100L, null, 300L)
116+
result.getCount.asScala.toList shouldEqual List(100L, Constants.magicNullLong, 300L)
117117
}
118118

119119
it should "preserve timestamp order" in {
@@ -313,4 +313,62 @@ class PivotUtilsTest extends AnyFlatSpec with Matchers {
313313
series(0) shouldBe Constants.magicNullDouble
314314
series(1) shouldBe 0.5
315315
}
316+
317+
it should "handle Long.MAX_VALUE and magicNullLong values" in {
318+
val ts1 = new TileSummary()
319+
ts1.setCount(Long.MaxValue)
320+
321+
val ts2 = new TileSummary()
322+
// count is not set, should become magicNullLong
323+
324+
val ts3 = new TileSummary()
325+
ts3.setCount(100L)
326+
327+
val result = pivot(Array(
328+
(ts1, 1000L),
329+
(ts2, 2000L),
330+
(ts3, 3000L)
331+
))
332+
333+
result.getCount.asScala shouldEqual List(Long.MaxValue, Constants.magicNullLong, 100L)
334+
}
335+
336+
it should "handle all null Long values" in {
337+
val ts1 = new TileSummary()
338+
val ts2 = new TileSummary()
339+
val ts3 = new TileSummary()
340+
// no counts set for any summary
341+
342+
val result = pivot(Array(
343+
(ts1, 1000L),
344+
(ts2, 2000L),
345+
(ts3, 3000L)
346+
))
347+
348+
// Since all values are unset, they should all be magicNullLong rather than null
349+
result.getCount.asScala.toList shouldEqual List.fill(3)(Constants.magicNullLong)
350+
}
351+
352+
it should "handle mixed null and non-null Long fields" in {
353+
val ts1 = new TileSummary()
354+
ts1.setCount(100L)
355+
ts1.setNullCount(10L)
356+
357+
val ts2 = new TileSummary()
358+
// count not set
359+
ts2.setNullCount(20L)
360+
361+
val ts3 = new TileSummary()
362+
ts3.setCount(300L)
363+
// nullCount not set
364+
365+
val result = pivot(Array(
366+
(ts1, 1000L),
367+
(ts2, 2000L),
368+
(ts3, 3000L)
369+
))
370+
371+
result.getCount.asScala shouldEqual List(100L, Constants.magicNullLong, 300L)
372+
result.getNullCount.asScala shouldEqual List(10L, 20L, Constants.magicNullLong)
373+
}
316374
}

service_commons/src/main/java/ai/chronon/service/RouteHandlerWrapper.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import ai.chronon.api.thrift.*;
44
import ai.chronon.api.thrift.protocol.TBinaryProtocol;
5+
import ai.chronon.api.thrift.protocol.TSimpleJSONProtocol;
56
import ai.chronon.api.thrift.transport.TTransportException;
67
import io.vertx.core.Handler;
78
import io.vertx.core.json.JsonObject;
@@ -50,7 +51,6 @@ public class RouteHandlerWrapper {
5051
private static final ThreadLocal<Base64.Encoder> base64Encoder = ThreadLocal.withInitial(Base64::getEncoder);
5152
private static final ThreadLocal<Base64.Decoder> base64Decoder = ThreadLocal.withInitial(Base64::getDecoder);
5253

53-
5454
public static <T extends TBase> T deserializeTBinaryBase64(String base64Data, Class<? extends TBase> clazz) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException {
5555
byte[] binaryData = base64Decoder.get().decode(base64Data);
5656
T tb = (T) clazz.getDeclaredConstructor().newInstance();
@@ -87,8 +87,20 @@ public static <I, O> Handler<RoutingContext> createHandler(Function<I, O> transf
8787

8888
String responseFormat = ctx.request().getHeader(RESPONSE_CONTENT_TYPE_HEADER);
8989
if (responseFormat == null || responseFormat.equals("application/json")) {
90-
// Send json response
91-
ctx.response().setStatusCode(200).putHeader("content-type", JSON_TYPE_VALUE).end(JsonObject.mapFrom(output).encode());
90+
try {
91+
TSerializer serializer = new TSerializer(new TSimpleJSONProtocol.Factory());
92+
String jsonString = serializer.toString((TBase)output);
93+
ctx.response()
94+
.setStatusCode(200)
95+
.putHeader("content-type", JSON_TYPE_VALUE)
96+
.end(jsonString);
97+
} catch (TException e) {
98+
LOGGER.error("Failed to serialize response", e);
99+
throw new RuntimeException(e);
100+
} catch (Exception e) {
101+
LOGGER.error("Unexpected error during serialization", e);
102+
throw new RuntimeException(e);
103+
}
92104
} else {
93105
if (!responseFormat.equals(TBINARY_B64_TYPE_VALUE)) {
94106
throw new IllegalArgumentException(String.format("Unsupported response-content-type: %s. Supported values are: %s and %s", responseFormat, JSON_TYPE_VALUE, TBINARY_B64_TYPE_VALUE));

0 commit comments

Comments
 (0)