Skip to content

Commit 01b62ea

Browse files
authored
our clients ***REMOVED*** / ***REMOVED*** shading reverts and other fixes (#544)
## Summary We decided to push the bulk of the shading logic into a script on our clients' side. Shading our BT jars was painful as they were shading things again, so we decided to roll that back. We still have the two forked bazel targets (cloud_gcp_lib and cloud_gcp_embedded_lib) with the embedded one being used by our clients. Also made a couple of other updates: * Update the thrift decoder class to use our vendored thrift -> A couple of follow up AIs on the thrift front would be to: a) [add thrift version guards while publishing locally](https://linear.app/zipline-ai/issue/ZIP-651/add-thrift-version-guards-while-publishing-locally) b) revisit using shading instead of vendoring our thrift (more long term) * Fetcher changes to use flag store to detect tiling / not for now - I have a Flink PR upcoming that yanks out the untiled implementation and can put up a follow up to drop the flag store references to use untiled and default to tiled across the board. ## Checklist - [ ] Added Unit Tests - [X] Covered by existing CI - [X] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Refactor** - Updated dependency imports to use the official versions of Bigtable, YAML, and Thrift libraries, ensuring improved stability and compatibility. - **New Features** - Introduced a tiling configuration check for streaming requests, providing more flexible data handling. - Added a method to check if tiling is enabled in the FetchContext class. - **Chores** - Streamlined build configurations by removing deprecated dependencies and leveraging direct Maven artifacts, enabling clients to manage specific dependency versions. - Introduced a new flag store for managing tiling functionality during tests. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 0c004ea commit 01b62ea

File tree

15 files changed

+53
-73
lines changed

15 files changed

+53
-73
lines changed

api/src/main/scala/ai/chronon/api/Extensions.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,8 +591,6 @@ object Extensions {
591591
None
592592
}
593593

594-
def tilingFlag: Boolean = servingFlagValue("tiling").exists(_.toLowerCase() == "true")
595-
596594
def dontThrowOnDecodeFailFlag: Boolean = servingFlagValue("decode.throw_on_fail").exists(_.toLowerCase() == "false")
597595

598596
// build left streaming query for join source runner

api/src/test/scala/ai/chronon/api/test/ExtensionsTest.scala

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -117,30 +117,4 @@ class ExtensionsTest extends AnyFlatSpec {
117117
assertEquals(4, keys.size)
118118
}
119119

120-
it should "is tiling enabled" in {
121-
def buildGroupByWithServingFlags(flags: Map[String, String] = null): GroupByOps = {
122-
123-
val execInfo: ExecutionInfo = if (flags != null) {
124-
new ExecutionInfo()
125-
.setConf(new ConfigProperties().setServing(flags.toJava))
126-
} else {
127-
null
128-
}
129-
130-
Builders.GroupBy(
131-
metaData = Builders.MetaData(name = "featureGroupName", executionInfo = execInfo)
132-
)
133-
134-
}
135-
136-
// customJson not set defaults to false
137-
assertFalse(buildGroupByWithServingFlags().tilingFlag)
138-
assertFalse(buildGroupByWithServingFlags(Map.empty).tilingFlag)
139-
140-
val trueGb = buildGroupByWithServingFlags(Map("tiling" -> "true"))
141-
assertTrue(trueGb.tilingFlag)
142-
assertFalse(buildGroupByWithServingFlags(Map("tiling" -> "false")).tilingFlag)
143-
assertFalse(buildGroupByWithServingFlags(Map("tiling" -> "invalid")).tilingFlag)
144-
145-
}
146120
}

cloud_gcp/BUILD.bazel

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,6 @@ shared_deps = [
55
"//online:lib",
66
"//spark:lib",
77
"//tools/build_rules/spark:spark-exec",
8-
":shaded_bigtable",
9-
":shaded_bigtable_proto",
10-
":shaded_bigtable_admin_proto",
11-
":shaded_grpc_bigtable",
12-
":shaded_snakeyaml",
138
maven_artifact_with_suffix("org.scala-lang.modules:scala-java8-compat"),
149
maven_artifact_with_suffix("org.json4s:json4s-core"),
1510
maven_artifact_with_suffix("org.json4s:json4s-jackson"),
@@ -18,6 +13,7 @@ shared_deps = [
1813
maven_artifact_with_suffix("org.rogach:scallop"),
1914
maven_artifact("com.google.cloud:google-cloud-core"),
2015
maven_artifact("com.google.cloud:google-cloud-bigquery"),
16+
maven_artifact("com.google.cloud:google-cloud-bigtable"),
2117
maven_artifact("com.google.cloud:google-cloud-pubsub"),
2218
maven_artifact("com.google.cloud:google-cloud-dataproc"),
2319
maven_artifact("com.google.cloud.bigdataoss:gcsio"),
@@ -31,8 +27,10 @@ shared_deps = [
3127
maven_artifact("com.google.api:gax"),
3228
maven_artifact("com.google.guava:guava"),
3329
maven_artifact("com.google.protobuf:protobuf-java"),
30+
maven_artifact("org.yaml:snakeyaml"),
3431
maven_artifact("io.grpc:grpc-netty-shaded"),
3532
maven_artifact("org.slf4j:slf4j-api"),
33+
maven_artifact("ch.qos.reload4j:reload4j"),
3634
maven_artifact("org.threeten:threetenbp"),
3735
maven_artifact("org.apache.kafka:kafka-clients"),
3836
maven_artifact("com.google.cloud.spark:spark-3.5-bigquery"),

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,16 @@ import com.google.cloud.bigquery.Job
2222
import com.google.cloud.bigquery.JobId
2323
import com.google.cloud.bigquery.JobInfo
2424
import com.google.cloud.bigquery.QueryJobConfiguration
25-
import com.google.cloud.shaded_bigtable.admin.v2.BigtableTableAdminClient
26-
import com.google.cloud.shaded_bigtable.admin.v2.models.CreateTableRequest
27-
import com.google.cloud.shaded_bigtable.admin.v2.models.GCRules
28-
import com.google.cloud.shaded_bigtable.data.v2.BigtableDataClient
29-
import com.google.cloud.shaded_bigtable.data.v2.models.Filters
30-
import com.google.cloud.shaded_bigtable.data.v2.models.Query
31-
import com.google.cloud.shaded_bigtable.data.v2.models.Range.ByteStringRange
32-
import com.google.cloud.shaded_bigtable.data.v2.models.Range.TimestampRange
33-
import com.google.cloud.shaded_bigtable.data.v2.models.RowMutation
34-
import com.google.cloud.shaded_bigtable.data.v2.models.{TableId => BTTableId}
25+
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient
26+
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest
27+
import com.google.cloud.bigtable.admin.v2.models.GCRules
28+
import com.google.cloud.bigtable.data.v2.BigtableDataClient
29+
import com.google.cloud.bigtable.data.v2.models.Filters
30+
import com.google.cloud.bigtable.data.v2.models.Query
31+
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange
32+
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange
33+
import com.google.cloud.bigtable.data.v2.models.RowMutation
34+
import com.google.cloud.bigtable.data.v2.models.{TableId => BTTableId}
3535
import com.google.protobuf.ByteString
3636
import org.slf4j.Logger
3737
import org.slf4j.LoggerFactory

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import com.google.api.gax.rpc.ApiException
99
import com.google.cloud.dataproc.v1._
1010
import org.json4s._
1111
import org.json4s.jackson.JsonMethods._
12-
import org.yaml.shaded_snakeyaml.Yaml
12+
import org.yaml.snakeyaml.Yaml
1313

1414
import scala.io.Source
1515

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ import ai.chronon.online.Serde
1111
import ai.chronon.online.serde.AvroSerde
1212
import com.google.api.gax.core.NoCredentialsProvider
1313
import com.google.cloud.bigquery.BigQueryOptions
14-
import com.google.cloud.shaded_bigtable.admin.v2.BigtableTableAdminClient
15-
import com.google.cloud.shaded_bigtable.admin.v2.BigtableTableAdminSettings
16-
import com.google.cloud.shaded_bigtable.data.v2.BigtableDataClient
17-
import com.google.cloud.shaded_bigtable.data.v2.BigtableDataSettings
18-
import com.google.cloud.shaded_bigtable.data.v2.stub.metrics.NoopMetricsProvider
14+
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient
15+
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings
16+
import com.google.cloud.bigtable.data.v2.BigtableDataClient
17+
import com.google.cloud.bigtable.data.v2.BigtableDataSettings
18+
import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider
1919

2020
import java.util
2121

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ import com.google.api.core.ApiFutures
1010
import com.google.api.gax.core.NoCredentialsProvider
1111
import com.google.api.gax.rpc.ServerStreamingCallable
1212
import com.google.api.gax.rpc.UnaryCallable
13-
import com.google.cloud.shaded_bigtable.admin.v2.BigtableTableAdminClient
14-
import com.google.cloud.shaded_bigtable.admin.v2.BigtableTableAdminSettings
15-
import com.google.cloud.shaded_bigtable.data.v2.BigtableDataClient
16-
import com.google.cloud.shaded_bigtable.data.v2.BigtableDataSettings
17-
import com.google.cloud.shaded_bigtable.data.v2.models.Query
18-
import com.google.cloud.shaded_bigtable.data.v2.models.Row
19-
import com.google.cloud.shaded_bigtable.data.v2.models.RowMutation
13+
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient
14+
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings
15+
import com.google.cloud.bigtable.data.v2.BigtableDataClient
16+
import com.google.cloud.bigtable.data.v2.BigtableDataSettings
17+
import com.google.cloud.bigtable.data.v2.models.Query
18+
import com.google.cloud.bigtable.data.v2.models.Row
19+
import com.google.cloud.bigtable.data.v2.models.RowMutation
2020
import com.google.cloud.bigtable.emulator.v2.Emulator
2121
import org.mockito.ArgumentMatchers.any
2222
import org.mockito.Mockito.when

online/src/main/java/ai/chronon/online/ThriftDecoder.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
package ai.chronon.online;
1818

1919
import ai.chronon.api.DataType;
20-
import org.apache.thrift.TBase;
21-
import org.apache.thrift.TFieldIdEnum;
22-
import org.apache.thrift.meta_data.FieldMetaData;
23-
import org.apache.thrift.meta_data.StructMetaData;
24-
import org.apache.thrift.protocol.TType;
20+
import ai.chronon.api.thrift.TBase;
21+
import ai.chronon.api.thrift.TFieldIdEnum;
22+
import ai.chronon.api.thrift.meta_data.FieldMetaData;
23+
import ai.chronon.api.thrift.meta_data.StructMetaData;
24+
import ai.chronon.api.thrift.protocol.TType;
2525

2626
import java.io.Serializable;
2727
import java.nio.ByteBuffer;

online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,10 @@ case class FetchContext(kvStore: KVStore,
1616
def getOrCreateExecutionContext: ExecutionContext = {
1717
Option(executionContextOverride).getOrElse(FlexibleExecutionContext.buildExecutionContext)
1818
}
19+
20+
def isTilingEnabled: Boolean = {
21+
Option(flagStore)
22+
.map(_.isSet(FlagStoreConstants.TILING_ENABLED, Map.empty[String, String].toJava))
23+
.exists(_.asInstanceOf[Boolean])
24+
}
1925
}

online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class GroupByFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
8686
case Accuracy.TEMPORAL =>
8787
// Build a tile key for the streaming request
8888
// When we build support for layering, we can expand this out into a utility that builds n tile keys for n layers
89-
val keyBytes = if (groupByServingInfo.groupByOps.tilingFlag) {
89+
val keyBytes = if (fetchContext.isTilingEnabled) {
9090

9191
val tileKey = TilingUtils.buildTileKey(
9292
groupByServingInfo.groupByOps.streamingDataset,

online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class GroupByResponseHandler(fetchContext: FetchContext, metadataStore: Metadata
112112
val batchIr: FinalBatchIr =
113113
getBatchIrFromBatchResponse(batchResponses, batchBytes, servingInfo, toBatchIr, requestContext.keys)
114114

115-
if (servingInfo.groupByOps.tilingFlag) {
115+
if (fetchContext.isTilingEnabled) {
116116
mergeTiledIrsFromStreaming(requestContext.queryTimeMs, servingInfo, streamingResponses, aggregator, batchIr)
117117
} else {
118118
mergeRawEventsFromStreaming(requestContext.queryTimeMs,

spark/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ scala_library(
1212
"//api:thrift_java",
1313
"//online:lib",
1414
"//tools/build_rules/spark:spark-exec",
15-
":shaded_snakeyaml",
1615
maven_artifact("com.fasterxml.jackson.core:jackson-core"),
1716
maven_artifact("com.fasterxml.jackson.core:jackson-databind"),
1817
maven_artifact_with_suffix("com.fasterxml.jackson.module:jackson-module-scala"),
@@ -34,6 +33,7 @@ scala_library(
3433
maven_artifact("org.apache.datasketches:datasketches-java"),
3534
maven_artifact_with_suffix("org.rogach:scallop"),
3635
maven_artifact("io.netty:netty-all"),
36+
maven_artifact("org.yaml:snakeyaml"),
3737
maven_artifact("io.netty:netty-transport"),
3838
maven_artifact("io.netty:netty-handler"),
3939
maven_artifact("io.netty:netty-buffer"),

spark/src/main/scala/ai/chronon/spark/Driver.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ import org.rogach.scallop.ScallopOption
5252
import org.rogach.scallop.Subcommand
5353
import org.slf4j.Logger
5454
import org.slf4j.LoggerFactory
55-
import org.yaml.shaded_snakeyaml.Yaml
55+
import org.yaml.snakeyaml.Yaml
5656

5757
import java.io.File
5858
import java.nio.file.Files

spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -559,12 +559,9 @@ class FetcherTest extends AnyFlatSpec {
559559
name = "unit_test/fetcher_tiled_gb",
560560
namespace = namespace,
561561
team = "chronon",
562-
customJson = groupByCustomJson.orNull,
563-
executionInfo = new ExecutionInfo()
564-
.setConf(new ConfigProperties().setServing(Map("tiling" -> "true").toJava))
562+
customJson = groupByCustomJson.orNull
565563
)
566564
)
567-
groupBy.tilingFlag shouldBe true
568565

569566
val joinConf = Builders.Join(
570567
left = leftSource,
@@ -586,7 +583,18 @@ class FetcherTest extends AnyFlatSpec {
586583
val kvStoreFunc = () => OnlineUtils.buildInMemoryKVStore("FetcherTest")
587584
val inMemoryKvStore = kvStoreFunc()
588585

586+
val tilingEnabledFlagStore = new FlagStore {
587+
override def isSet(flagName: String, attributes: util.Map[String, String]): lang.Boolean = {
588+
if (flagName == FlagStoreConstants.TILING_ENABLED) {
589+
enableTiling
590+
} else {
591+
false
592+
}
593+
}
594+
}
595+
589596
val mockApi = new MockApi(kvStoreFunc, namespace)
597+
mockApi.setFlagStore(tilingEnabledFlagStore)
590598

591599
val joinedDf = new ai.chronon.spark.Join(joinConf, endDs, tableUtils).computeJoin()
592600
val joinTable = s"$namespace.join_test_expected_${joinConf.metaData.cleanName}"

tools/build_rules/cloud_gcp_embedded/BUILD

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,5 @@ java_binary(
88
maven_artifact("org.yaml:snakeyaml"),
99
# pull out some slf4j-impl dependencies - these can be included at the application deploy target level if needed
1010
maven_artifact("org.apache.logging.log4j:log4j-slf4j2-impl"),
11-
# Exclude grpc deps as clients might want to use their own version
12-
maven_artifact("io.grpc:grpc-core:1.69.0"),
13-
maven_artifact("io.grpc:grpc-inprocess:1.69.0"),
14-
maven_artifact("io.grpc:grpc-netty-shaded"),
1511
],
1612
)

0 commit comments

Comments
 (0)