Skip to content

Commit ae35b82

Browse files
authored
Add list & join schema fetcher APIs (#431)
## Summary Add a couple of APIs to help with our clients' ***REMOVED*** integration. One is to list out all online joins and the second is to retrieve the join schema details for a given Join. As part of wiring up list support, I tweaked a couple of properties like the list pagination key / list call limit to make things consistent between DynamoDB and BigTable. For the BT implementation we issue a range query under the 'joins/' prefix. Subsequent calls (in case of pagination) continue off this range (verified this via unit tests and also basic sanity checks on our clients). APIs added are: * /v1/joins -> Return the list of online joins * /v1/join/schema/join-name -> Return a payload consisting of {"joinName": "..", "keySchema": "avro schema", "valueSchema": "avro schema", "schemaHash": "hash"} Tested by dropping the docker container and confirming things on our clients' side: ``` $ curl http://localhost:9000/v1/joins {"joinNames":["search.ranking.v1_web_zipline_cdc_and_beacon_external" ...} ``` And ``` curl http://localhost:9000/v1/join/schema/search.ranking.v1_web_zipline_cdc_and_beacon_external { big payload } ``` ## Checklist - [X] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced new API endpoints that let users list available joins and retrieve detailed join schema information. - Added enhanced configuration options to support complex join workflows. - New test cases for validating join listing and schema retrieval functionalities. - Added new constants for pagination and entity type handling. - **Improvements** - Standardized pagination and entity handling across cloud integrations, ensuring a consistent and reliable data listing experience. - Enhanced error handling and response formatting for join-related requests. 
- Expanded testing capabilities with additional dependencies and resource inclusion. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 01a853a commit ae35b82

File tree

24 files changed

+1208
-39
lines changed

24 files changed

+1208
-39
lines changed

api/src/main/scala/ai/chronon/api/Constants.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,14 @@ object Constants {
8585
val GroupByKeyword = "group_bys"
8686
val StagingQueryKeyword = "staging_queries"
8787
val ModelKeyword = "models"
88+
89+
// KV store related constants
90+
// continuation key to help with list pagination
91+
val ContinuationKey: String = "continuation-key"
92+
93+
// Limit of max number of entries to return in a list call
94+
val ListLimit: String = "limit"
95+
96+
// List entity type
97+
val ListEntityType: String = "entity_type"
8898
}

cloud_aws/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ scala_library(
2424

2525
test_deps = [
2626
":cloud_aws_lib",
27+
"//api:lib",
2728
"//online:lib",
2829
maven_artifact("software.amazon.awssdk:dynamodb"),
2930
maven_artifact("software.amazon.awssdk:regions"),

cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
paour clientsage ai.chronon.integrations.aws
22

33
import ai.chronon.api.Constants
4+
import ai.chronon.api.Constants.{ContinuationKey, ListLimit}
45
import ai.chronon.api.ScalaJavaConversions._
56
import ai.chronon.online.KVStore
67
import ai.chronon.online.KVStore.GetResponse
@@ -36,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap
3637
import scala.concurrent.Future
3738
import scala.util.Success
3839
import scala.util.Try
39-
4040
import scala.collection.Seq
4141

4242
object DynamoDBKVStoreConstants {
@@ -49,12 +49,6 @@ object DynamoDBKVStoreConstants {
4949
// Optional field that indicates if this table is meant to be time sorted in Dynamo or not
5050
val isTimedSorted = "is-time-sorted"
5151

52-
// Limit of max number of entries to return in a list call
53-
val listLimit = "limit"
54-
55-
// continuation key to help with list pagination
56-
val continuationKey = "continuation-key"
57-
5852
// Name of the partition key column to use
5953
val partitionKeyColumn = "keyBytes"
6054

@@ -172,13 +166,13 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
172166
}
173167

174168
override def list(request: ListRequest): Future[ListResponse] = {
175-
val listLimit = request.props.get(DynamoDBKVStoreConstants.listLimit) match {
169+
val listLimit = request.props.get(ListLimit) match {
176170
case Some(value: Int) => value
177171
case Some(value: String) => value.toInt
178172
case _ => 100
179173
}
180174

181-
val maybeExclusiveStartKey = request.props.get(continuationKey)
175+
val maybeExclusiveStartKey = request.props.get(ContinuationKey)
182176
val maybeExclusiveStartKeyAttribute = maybeExclusiveStartKey.map { k =>
183177
AttributeValue.builder.b(SdkBytes.fromByteArray(k.asInstanceOf[Array[Byte]])).build
184178
}
@@ -199,7 +193,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore {
199193
case Success(scanResponse) if scanResponse.hasLastEvaluatedKey =>
200194
val lastEvalKey = scanResponse.lastEvaluatedKey().toScala.get(partitionKeyColumn)
201195
lastEvalKey match {
202-
case Some(av) => ListResponse(request, resultElements, Map(continuationKey -> av.b().asByteArray()))
196+
case Some(av) => ListResponse(request, resultElements, Map(ContinuationKey -> av.b().asByteArray()))
203197
case _ => noPagesLeftResponse
204198
}
205199
case _ => noPagesLeftResponse

cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
paour clientsage ai.chronon.integrations.aws
22

3+
import ai.chronon.api.Constants.{ContinuationKey, ListLimit}
34
import ai.chronon.online.KVStore._
45
import com.amazonaws.services.dynamodbv2.local.main.ServerRunner
56
import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer
@@ -124,16 +125,16 @@ class DynamoDBKVStoreTest extends AnyFlatSpec with BeforeAndAfterAll {
124125
putResults.foreach(r => r shouldBe true)
125126

126127
// call list - first call is only for 10 elements
127-
val listReq1 = ListRequest(dataset, Map(listLimit -> 10))
128+
val listReq1 = ListRequest(dataset, Map(ListLimit -> 10))
128129
val listResults1 = Await.result(kvStore.list(listReq1), 1.minute)
129-
listResults1.resultProps.contains(continuationKey) shouldBe true
130+
listResults1.resultProps.contains(ContinuationKey) shouldBe true
130131
validateExpectedListResponse(listResults1.values, 10)
131132

132133
// call list - with continuation key
133134
val listReq2 =
134-
ListRequest(dataset, Map(listLimit -> 100, continuationKey -> listResults1.resultProps(continuationKey)))
135+
ListRequest(dataset, Map(ListLimit -> 100, ContinuationKey -> listResults1.resultProps(ContinuationKey)))
135136
val listResults2 = Await.result(kvStore.list(listReq2), 1.minute)
136-
listResults2.resultProps.contains(continuationKey) shouldBe false
137+
listResults2.resultProps.contains(ContinuationKey) shouldBe false
137138
validateExpectedListResponse(listResults2.values, 100)
138139
}
139140

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
paour clientsage ai.chronon.integrations.cloud_gcp
22

3+
import ai.chronon.api.Constants.{ContinuationKey, ListEntityType, ListLimit}
34
import ai.chronon.api.Extensions.GroupByOps
45
import ai.chronon.api.Extensions.StringOps
56
import ai.chronon.api.Extensions.WindowOps
@@ -211,13 +212,14 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
211212
override def list(request: ListRequest): Future[ListResponse] = {
212213
logger.info(s"Performing list for ${request.dataset}")
213214

214-
val listLimit = request.props.get(BigTableKVStore.listLimit) match {
215+
val listLimit = request.props.get(ListLimit) match {
215216
case Some(value: Int) => value
216217
case Some(value: String) => value.toInt
217218
case _ => defaultListLimit
218219
}
219220

220-
val maybeStartKey = request.props.get(continuationKey)
221+
val maybeListEntityType = request.props.get(ListEntityType)
222+
val maybeStartKey = request.props.get(ContinuationKey)
221223

222224
val query = Query
223225
.create(mapDatasetToTable(request.dataset))
@@ -227,9 +229,15 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
227229
.filter(Filters.FILTERS.limit().cellsPerRow(1))
228230
.limit(listLimit)
229231

230-
// if we got a start row key, lets wire it up
231-
maybeStartKey.foreach { startKey =>
232-
query.range(ByteStringRange.unbounded().startOpen(ByteString.copyFrom(startKey.asInstanceOf[Array[Byte]])))
232+
(maybeStartKey, maybeListEntityType) match {
233+
case (Some(startKey), _) =>
234+
// we have a start key, so we use it to pick up from where we left off
235+
query.range(ByteStringRange.unbounded().startOpen(ByteString.copyFrom(startKey.asInstanceOf[Array[Byte]])))
236+
case (None, Some(listEntityType)) =>
237+
val startRowKey = buildRowKey(s"$listEntityType/".getBytes(Charset.forName("UTF-8")), request.dataset)
238+
query.range(ByteStringRange.unbounded().startOpen(ByteString.copyFrom(startRowKey)))
239+
case _ =>
240+
logger.info("No start key or list entity type provided. Starting from the beginning")
233241
}
234242

235243
val startTs = System.currentTimeMillis()
@@ -253,7 +261,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
253261
if (listValues.size < listLimit) {
254262
Map.empty // last page, we're done
255263
} else
256-
Map(continuationKey -> listValues.last.keyBytes)
264+
Map(ContinuationKey -> listValues.last.keyBytes)
257265

258266
ListResponse(request, Success(listValues), propsMap)
259267

@@ -410,12 +418,6 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
410418

411419
object BigTableKVStore {
412420

413-
// continuation key to help with list pagination
414-
val continuationKey: String = "continuationKey"
415-
416-
// Limit of max number of entries to return in a list call
417-
val listLimit: String = "limit"
418-
419421
// Default list limit
420422
val defaultListLimit: Int = 100
421423

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
paour clientsage ai.chronon.integrations.cloud_gcp
22

3+
import ai.chronon.api.Constants.{ContinuationKey, GroupByKeyword, JoinKeyword, ListEntityType, ListLimit}
34
import ai.chronon.api.TilingUtils
45
import ai.chronon.online.KVStore.GetRequest
56
import ai.chronon.online.KVStore.GetResponse
@@ -176,21 +177,21 @@ class BigTableKVStoreTest extends AnyFlatSpec with BeforeAndAfter {
176177

177178
// let's try and read these
178179
val limit = 10
179-
val listReq1 = ListRequest(dataset, Map(listLimit -> limit))
180+
val listReq1 = ListRequest(dataset, Map(ListLimit -> limit))
180181

181182
val listResult1 = Await.result(kvStore.list(listReq1), 1.second)
182183
listResult1.values.isSuccess shouldBe true
183-
listResult1.resultProps.contains(BigTableKVStore.continuationKey) shouldBe true
184+
listResult1.resultProps.contains(ContinuationKey) shouldBe true
184185
val listValues1 = listResult1.values.get
185186
listValues1.size shouldBe limit
186187

187188
// another call, bigger limit
188189
val limit2 = 1000
189-
val continuationKey = listResult1.resultProps(BigTableKVStore.continuationKey)
190-
val listReq2 = ListRequest(dataset, Map(listLimit -> limit2, BigTableKVStore.continuationKey -> continuationKey))
190+
val continuationKey = listResult1.resultProps(ContinuationKey)
191+
val listReq2 = ListRequest(dataset, Map(ListLimit -> limit2, ContinuationKey -> continuationKey))
191192
val listResult2 = Await.result(kvStore.list(listReq2), 1.second)
192193
listResult2.values.isSuccess shouldBe true
193-
listResult2.resultProps.contains(BigTableKVStore.continuationKey) shouldBe false
194+
listResult2.resultProps.contains(ContinuationKey) shouldBe false
194195
val listValues2 = listResult2.values.get
195196
listValues2.size shouldBe (putReqs.size - limit)
196197

@@ -201,6 +202,53 @@ class BigTableKVStoreTest extends AnyFlatSpec with BeforeAndAfter {
201202
.toSet
202203
}
203204

205+
it should "list entity types with pagination" in {
206+
val dataset = "metadata"
207+
val kvStore = new BigTableKVStoreImpl(dataClient, adminClient)
208+
kvStore.create(dataset)
209+
210+
val putGrpByReqs = (0 until 50).map { i =>
211+
val key = s"$GroupByKeyword/gbkey-$i"
212+
val value = s"""{"name": "name-$i", "age": $i}"""
213+
PutRequest(key.getBytes, value.getBytes, dataset, None)
214+
}
215+
216+
val putJoinReqs = (0 until 50).map { i =>
217+
val key = s"$JoinKeyword/joinkey-$i"
218+
val value = s"""{"name": "name-$i", "age": $i}"""
219+
PutRequest(key.getBytes, value.getBytes, dataset, None)
220+
}
221+
222+
val putResults = Await.result(kvStore.multiPut(putGrpByReqs ++ putJoinReqs), 1.second)
223+
putResults.foreach(r => r shouldBe true)
224+
225+
// let's try and read just the joins
226+
val limit = 10
227+
val listReq1 = ListRequest(dataset, Map(ListLimit -> limit, ListEntityType -> JoinKeyword))
228+
229+
val listResult1 = Await.result(kvStore.list(listReq1), 1.second)
230+
listResult1.values.isSuccess shouldBe true
231+
listResult1.resultProps.contains(ContinuationKey) shouldBe true
232+
val listValues1 = listResult1.values.get
233+
listValues1.size shouldBe limit
234+
235+
// another call, bigger limit
236+
val limit2 = 1000
237+
val continuationKey = listResult1.resultProps(ContinuationKey)
238+
val listReq2 = ListRequest(dataset, Map(ListLimit -> limit2, ContinuationKey -> continuationKey))
239+
val listResult2 = Await.result(kvStore.list(listReq2), 1.second)
240+
listResult2.values.isSuccess shouldBe true
241+
listResult2.resultProps.contains(ContinuationKey) shouldBe false
242+
val listValues2 = listResult2.values.get
243+
listValues2.size shouldBe (putJoinReqs.size - limit)
244+
245+
// lets collect all the keys and confirm we got everything
246+
val allKeys = (listValues1 ++ listValues2).map(v => new String(v.keyBytes, StandardCharsets.UTF_8))
247+
allKeys.toSet shouldBe putJoinReqs
248+
.map(r => new String(buildRowKey(r.keyBytes, r.dataset), StandardCharsets.UTF_8))
249+
.toSet
250+
}
251+
204252
it should "multiput failures" in {
205253
val moour clientsDataClient = moour clients[BigtableDataClient](withSettings().moour clientsMaker("moour clients-maker-inline"))
206254
val moour clientsAdminClient = moour clients[BigtableTableAdminClient]

distribution/publish_gcp_docker_images.sh renamed to distribution/publish_docker_images.sh

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ cd $CHRONON_ROOT_DIR
3030
echo "Building jars"
3131

3232
bazel build //cloud_gcp:cloud_gcp_lib_deploy.jar
33+
bazel build //cloud_aws:cloud_aws_lib_deploy.jar
3334
bazel build //service:service_assembly_deploy.jar
3435

3536
CLOUD_GCP_JAR="$CHRONON_ROOT_DIR/bazel-bin/cloud_gcp/cloud_gcp_lib_deploy.jar"
37+
CLOUD_AWS_JAR="$CHRONON_ROOT_DIR/bazel-bin/cloud_aws/cloud_aws_lib_deploy.jar"
3638
SERVICE_JAR="$CHRONON_ROOT_DIR/bazel-bin/service/service_assembly_deploy.jar"
3739

3840
if [ ! -f "$CLOUD_GCP_JAR" ]; then
@@ -45,6 +47,11 @@ if [ ! -f "$SERVICE_JAR" ]; then
4547
exit 1
4648
fi
4749

50+
if [ ! -f "$CLOUD_AWS_JAR" ]; then
51+
echo "$CLOUD_AWS_JAR not found"
52+
exit 1
53+
fi
54+
4855
# We copy to build output as the docker build can't access bazel-bin (as it's a symlink)
4956
echo "Copying jars to build_output"
5057
mkdir -p build_output

online/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ test_deps = [
7070
scala_library(
7171
name = "test_lib",
7272
srcs = glob(["src/test/**/*.scala"]),
73+
resources = glob(["src/test/resources/**/*"]),
7374
format = select({
7475
"//tools/config:scala_2_13": False, # Disable for 2.13
7576
"//conditions:default": True, # Enable for other versions
@@ -81,6 +82,7 @@ scala_library(
8182
scala_test_suite(
8283
name = "tests",
8384
srcs = glob(["src/test/**/*.scala"]),
85+
resources = glob(["src/test/resources/**/*"]),
8486
jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
8587
visibility = ["//visibility:public"],
8688
deps = test_deps + [":test_lib"],

online/src/main/java/ai/chronon/online/JavaFetcher.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
paour clientsage ai.chronon.online;
1818

19+
import ai.chronon.api.ScalaJavaConversions;
1920
import ai.chronon.online.fetcher.Fetcher;
2021
import ai.chronon.online.fetcher.FetcherResponseWithTs;
2122
import scala.collection.Iterator;
@@ -25,6 +26,7 @@
2526
import scala.compat.java8.FutureConverters;
2627
import scala.concurrent.Future;
2728
import scala.concurrent.ExecutionContext;
29+
import scala.util.Try;
2830

2931
import java.util.ArrayList;
3032
import java.util.List;
@@ -170,6 +172,18 @@ public CompletableFuture<List<JavaResponse>> fetchJoin(List<JavaRequest> request
170172
return convertResponsesWithTs(scalaResponses, false, startTs);
171173
}
172174

175+
public CompletableFuture<List<String>> listJoins(boolean isOnline) {
176+
// Get responses from the fetcher
177+
Future<Seq<String>> scalaResponses = this.fetcher.metadataStore().listJoins(isOnline);
178+
// convert to Java friendly types
179+
return FutureConverters.toJava(scalaResponses).toCompletableFuture().thenApply(ScalaJavaConversions::toJava);
180+
}
181+
182+
public JTry<JavaJoinSchemaResponse> fetchJoinSchema(String joinName) {
183+
Try<Fetcher.JoinSchemaResponse> scalaResponse = this.fetcher.fetchJoinSchema(joinName);
184+
return JTry.fromScala(scalaResponse).map(JavaJoinSchemaResponse::new);
185+
}
186+
173187
private void instrument(List<String> requestNames, boolean isGroupBy, String metricName, Long startTs) {
174188
long endTs = System.currentTimeMillis();
175189
for (String s : requestNames) {
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
paour clientsage ai.chronon.online;
2+
3+
import ai.chronon.online.fetcher.Fetcher;
4+
5+
public class JavaJoinSchemaResponse {
6+
public String joinName;
7+
public String keySchema;
8+
public String valueSchema;
9+
public String schemaHash;
10+
11+
public JavaJoinSchemaResponse(String joinName, String keySchema, String valueSchema, String schemaHash) {
12+
this.joinName = joinName;
13+
this.keySchema = keySchema;
14+
this.valueSchema = valueSchema;
15+
this.schemaHash = schemaHash;
16+
}
17+
18+
public JavaJoinSchemaResponse(Fetcher.JoinSchemaResponse scalaResponse){
19+
this.joinName = scalaResponse.joinName();
20+
this.keySchema = scalaResponse.keySchema();
21+
this.valueSchema = scalaResponse.valueSchema();
22+
this.schemaHash = scalaResponse.schemaHash();
23+
}
24+
25+
public Fetcher.JoinSchemaResponse toScala() {
26+
return new Fetcher.JoinSchemaResponse(
27+
joinName,
28+
keySchema,
29+
valueSchema,
30+
schemaHash);
31+
}
32+
}

online/src/main/scala/ai/chronon/online/JoinCodec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ import com.google.gson.Gson
3434
case class JoinCodec(conf: JoinOps,
3535
keySchema: StructType,
3636
baseValueSchema: StructType,
37-
keyCodec: serde.AvroCodec,
38-
baseValueCodec: serde.AvroCodec)
37+
keyCodec: AvroCodec,
38+
baseValueCodec: AvroCodec)
3939
extends Serializable {
4040

4141
@transient lazy val valueSchema: StructType = {
@@ -89,7 +89,7 @@ case class JoinCodec(conf: JoinOps,
8989

9090
object JoinCodec {
9191

92-
def buildLoggingSchema(joinName: String, keyCodec: serde.AvroCodec, valueCodec: serde.AvroCodec): String = {
92+
def buildLoggingSchema(joinName: String, keyCodec: AvroCodec, valueCodec: AvroCodec): String = {
9393
val schemaMap = Map(
9494
"join_name" -> joinName,
9595
"key_schema" -> keyCodec.schemaStr,

online/src/main/scala/ai/chronon/online/Metrics.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ object Metrics {
2828
type Environment = String
2929
val MetaDataFetching = "metadata.fetch"
3030
val JoinFetching = "join.fetch"
31+
val JoinSchemaFetching = "join.schema.fetch"
3132
val GroupByFetching = "group_by.fetch"
3233
val GroupByUpload = "group_by.upload"
3334
val GroupByStreaming = "group_by.streaming"

0 commit comments

Comments
 (0)