Backport service changes to zipline fork #70
Changes from all commits:
- a94366e
- 4c377b5
- 35c18f6
- 7254519
- 7225da0
- 4f61c89
- 5b00465
@@ -30,6 +30,8 @@ lazy val jackson_2_15 = "2.15.2"
 lazy val avro_1_11 = "1.11.2"
 lazy val circeVersion = "0.14.9"
 lazy val deltaVersion = "3.2.0"
+lazy val slf4jApiVersion = "2.0.12"
+lazy val logbackClassicVersion = "1.5.6"

 // skip tests on assembly - uncomment if builds become slow
 // ThisBuild / assembly / test := {}
@@ -53,7 +55,7 @@ inThisBuild(
 lazy val supportedVersions = List(scala_2_12) // List(scala211, scala212, scala213)

 lazy val root = (project in file("."))
-  .aggregate(api, aggregator, online, spark, flink, cloud_gcp, cloud_aws, hub)
+  .aggregate(api, aggregator, online, spark, flink, cloud_gcp, cloud_aws, hub, service)
   .settings(name := "chronon")

 val spark_sql = Seq(
@@ -278,11 +280,61 @@ lazy val hub = (project in file("hub"))
   ),
   // Ensure consistent versions of logging libraries
   dependencyOverrides ++= Seq(
-    "org.slf4j" % "slf4j-api" % "1.7.36",
-    "ch.qos.logback" % "logback-classic" % "1.2.13"
+    "org.slf4j" % "slf4j-api" % slf4jApiVersion,
+    "ch.qos.logback" % "logback-classic" % logbackClassicVersion
   )
 )

lazy val service = (project in file("service"))
  .dependsOn(online)
  .settings(

Review comment (lines +288 to +290): 🛠️ Refactor suggestion — Add cross-compilation settings. The service module should specify cross-compilation settings like other modules to ensure compatibility across Scala versions:

lazy val service = (project in file("service"))
  .dependsOn(online)
  .settings(
+   crossScalaVersions := supportedVersions,
    assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

    assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

Review comment: 🛠️ Refactor suggestion — Remove duplicate assemblyJarName setting. assemblyJarName is defined twice in the service project settings:

.settings(
  assembly / assemblyJarName := s"${name.value}-${version.value}.jar",
  assembly / artifact := {
    val art = (assembly / artifact).value
    art.withClassifier(Some("assembly"))
  },
  addArtifact(assembly / artifact, assembly),
  libraryDependencies ++= Seq(
    // ... dependencies ...
  ),
- assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

Also applies to: 305-305

Reply: good observation - what happens if we don't specify this?

Reply: if we don't specify a custom assembly / assemblyJarName, sbt-assembly falls back to its default JAR name. By setting assembly / assemblyJarName := s"${name.value}-${version.value}.jar" we ensure that the assembled JAR file includes both the project name and version, making it uniquely identifiable and preventing naming conflicts. This helps maintain clear and consistent artifact naming across different builds.
    assembly / artifact := {
      val art = (assembly / artifact).value
      art.withClassifier(Some("assembly"))
    },
    addArtifact(assembly / artifact, assembly),
    libraryDependencies ++= Seq(
      "io.vertx" % "vertx-core" % "4.5.10",
      "io.vertx" % "vertx-web" % "4.5.10",
      "io.vertx" % "vertx-config" % "4.5.10",
      "ch.qos.logback" % "logback-classic" % logbackClassicVersion,
      "org.slf4j" % "slf4j-api" % slf4jApiVersion,

Review comment: Since we're at the point of committing to web frameworks, could we make a call on the logging implementation we're using project-wide? If we're using logback for app logging we might as well do the same for spark etc.

Reply: although I'd slightly prefer log4j2 (for the sole reason of not wanting to write

Reply: I don't mind swapping to log4j2 if we can mimic all the settings we have going in the logback.xml. I like the settings to limit file sizes, roll over, write to log + stdout async. I don't really think it's the end of the world though if we do have different configs for logging for spark jobs & services. The considerations are fairly different so it's fine if they're separate, I think.

Reply: as long as we don't combine classpaths between hub and spark in the final version of this it's 👍 with me

Reply: let's do log4j2 everywhere - preferably with mostly the same settings/pattern file in all environments. Hunting these log lib classpath inconsistencies is gnarly. Also fewer things for folks to learn. I like logback's functionality and log4j2 syntax. I think spark likes log4j2 as well - as their recommended logging lib.

Reply: spark uses slf4j so it's technically compatible with either - although if we were doing logback for spark we'd have to be sure to do the correct classpath sanitization in the spark builds. I do like log4j2's syntax. They do have size-based appending: https://logging.apache.org/log4j/2.x/manual/appenders/rolling-file.html

Reply: Let's address this in a follow-up - I don't mind swapping to log4j2 since you folks seem to be in favor of it. I don't want to work through the testing of the configs etc. again as part of the critical path of this PR. It took a little iterating and I don't think it's worth re-spending the time immediately; I / someone can pick this up async.
      "com.typesafe" % "config" % "1.4.3",
      // force netty versions -> without this we conflict with the versions pulled in from
      // our online module's spark deps which causes the web-app to not serve up content
      "io.netty" % "netty-all" % "4.1.111.Final",

Review comment: Update Netty version. The current Netty version might have security vulnerabilities. 📝 Committable suggestion:

-"io.netty" % "netty-all" % "4.1.111.Final",
+"io.netty" % "netty-all" % "4.1.112.Final",
      // wire up metrics using micro meter and statsd
      "io.vertx" % "vertx-micrometer-metrics" % "4.5.10",
      "io.micrometer" % "micrometer-registry-statsd" % "1.13.6",
      "junit" % "junit" % "4.13.2" % Test,
      "com.novocode" % "junit-interface" % "0.11" % Test,
      "org.mockito" % "mockito-core" % "5.12.0" % Test,
      "io.vertx" % "vertx-unit" % "4.5.10" % Test,
    ),
    // Assembly settings
    assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

    // Main class configuration
    // We use a custom launcher to help us wire up our statsd metrics
    Compile / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),
    assembly / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),

    // Merge strategy for assembly
    assembly / assemblyMergeStrategy := {
      case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
      case PathList("META-INF", xs @ _*) => MergeStrategy.first
      case PathList("javax", "activation", xs @ _*) => MergeStrategy.first
      case PathList("org", "apache", "logging", xs @ _*) => MergeStrategy.first
      case PathList("org", "slf4j", xs @ _*) => MergeStrategy.first
      case "application.conf" => MergeStrategy.concat
      case "reference.conf" => MergeStrategy.concat
      case x =>
        val oldStrategy = (assembly / assemblyMergeStrategy).value
        oldStrategy(x)
    }
  )

ThisBuild / assemblyMergeStrategy := {
  case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
  case PathList("META-INF", _*) => MergeStrategy.filterDistinctLines
@@ -0,0 +1,112 @@
# Chronon Feature Fetching Service

The feature service module consists of code to bring up a service that provides a thin shim around the Fetcher code. This
is meant to aid Chronon adopters who either need a quicker way to get a feature-serving layer up and running, or who need
a way to retrieve features but work in a primarily non-JVM organization.

## Core Technology

The Chronon Feature Service is built on top of the [Vert.x](https://vertx.io/) JVM framework. Vert.x is a high-performance
web framework which supports HTTP and gRPC based services.

## Running locally

To build the service sub-module:
```bash
~/workspace/chronon $ sbt "project service" clean assembly
```

To test out the service, you also need to build a concrete instantiation of the [Api](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Api.scala#L187).
We can leverage the [quickstart Mongo API](https://github.com/airbnb/chronon/tree/main/quickstart/mongo-online-impl) for this:
```bash
~/workspace/chronon $ cd quickstart/mongo-online-impl
~/workspace/chronon/quickstart/mongo-online-impl $ sbt assembly
...
[success] Total time: 1 s, completed Nov 6, 2024, 2:35:26 PM
```
This command will write out a file in the target/scala-2.12 sub-directory.

We can now use this to start up the feature service:
```bash
~/workspace/chronon $ java -jar service/target/scala-2.12/service-*.jar run ai.chronon.service.FetcherVerticle \
  -Dserver.port=9000 -conf service/src/main/resources/example_config.json
...
14:39:26.626 [vert.x-eventloop-thread-1] INFO a.chronon.service.WebServiceVerticle - HTTP server started on port 9000
14:39:26.627 [vert.x-eventloop-thread-0] INFO i.v.c.i.l.c.VertxIsolatedDeployer - Succeeded in deploying verticle
```
A few things to call out so you can customize:
- Choose your port (this is where you'll hit your webservice with traffic)
- Update the example_config.json (specifically, confirm that the path to the mongo-online-impl assembly jar matches your setup)
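
For orientation, a config sketch is shown below. The `online.jar`, `online.class`, and `online.api.props` keys mirror the configuration semantics described in the ApiProvider javadoc in this PR; the exact JSON layout expected by ConfigStore isn't documented here, so treat this as an illustrative sketch with placeholder values rather than the canonical schema:

```json
{
  "online.jar": "quickstart/mongo-online-impl/target/scala-2.12/<your-assembly>.jar",
  "online.class": "<fully.qualified.ApiImplClass>",
  "online.api.props": {
    "host": "localhost",
    "port": "27017"
  }
}
```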

Review comment (lines +38 to +41): 🛠️ Refactor suggestion — Document configuration schema. The example_config.json is referenced but its schema is not documented.

If you'd like some real data to query from the feature service, make sure to run through the relevant steps of the
[Quickstart - Online Flows](https://chronon.ai/getting_started/Tutorial.html#online-flows) tutorial.

Some examples to curl the webservice:
```bash
$ curl 'http://localhost:9000/ping'
$ curl 'http://localhost:9000/config'
$ curl -X POST 'http://localhost:9000/v1/fetch/join/quickstart%2Ftraining_set.v2' -H 'Content-Type: application/json' -d '[{"user_id": "5"}]'
```
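
Note that the join name in the fetch URL is percent-encoded (`quickstart/training_set.v2` becomes `quickstart%2Ftraining_set.v2`). A small sketch of how a client might build that path with Python's standard library, based on the endpoint shape in the curl examples above:

```python
from urllib.parse import quote

def fetch_join_path(join_name: str) -> str:
    """Build the /v1/fetch/join path, percent-encoding the join name so a
    slash in 'quickstart/training_set.v2' survives as one path segment."""
    return "/v1/fetch/join/" + quote(join_name, safe="")

print(fetch_join_path("quickstart/training_set.v2"))
# /v1/fetch/join/quickstart%2Ftraining_set.v2
```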

## Metrics

The Vert.x feature service relies on the same statsd host / port coordinates as the rest of the Chronon project -
[Metrics](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Metrics.scala#L135). When configured correctly,
the service will emit metrics captured by [Vert.x](https://vertx.io/docs/vertx-micrometer-metrics/java/#_http_client), JVM metrics, as well as metrics
captured by existing Chronon Fetcher code.

To view these metrics for your locally running feature service:
- Install the [statsd-logger](https://github.com/jimf/statsd-logger) npm module (`npm install -g statsd-logger`)
- Run the command - `statsd-logger`

Now you should see metrics of the format:
```bash
$ statsd-logger
Server listening on 0.0.0.0:8125
StatsD Metric: jvm.buffer.memory.used 12605920|g|#statistic:value,id:direct
StatsD Metric: jvm.threads.states 0|g|#statistic:value,state:blocked
StatsD Metric: jvm.memory.used 8234008|g|#statistic:value,area:nonheap,id:Compressed Class Space
StatsD Metric: jvm.threads.states 19|g|#statistic:value,state:runnable
StatsD Metric: system.load.average.1m 1.504883|g|#statistic:value
StatsD Metric: vertx.http.server.active.requests 0|g|#statistic:value,method:GET,path:/ping
StatsD Metric: ai.zipline.join.fetch.join_request.count 1|c|#null,null,null,null,environment:join.fetch,owner:quickstart,team:quickstart,production:false,join:quickstart_training_set_v2
StatsD Metric: ai.zipline.join.fetch.group_by_request.count 1|c|#null,null,accuracy:SNAPSHOT,environment:join.fetch,owner:quickstart,team:quickstart,production:false,group_by:quickstart_purchases_v1,join:quickstart_training_set_v2
...
```
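
Each printed line follows the DogStatsD-style shape `name value|type|#tag1:v1,tag2:v2` (with the name/value pair separated by a space in statsd-logger's output). If you want to post-process these logs, a hedged sketch of splitting one such line into its parts — this assumes the printed layout above, not the raw statsd wire format:

```python
def parse_statsd_line(line: str):
    """Split a 'StatsD Metric: name value|type|#tags' log line into
    (name, value, metric_type, tags), assuming statsd-logger's layout."""
    line = line.removeprefix("StatsD Metric: ")
    name_and_value, metric_type, tag_blob = line.split("|", 2)
    name, value = name_and_value.rsplit(" ", 1)
    tags = {}
    for tag in tag_blob.lstrip("#").split(","):
        if ":" in tag:
            key, val = tag.split(":", 1)
            tags[key] = val
    return name, float(value), metric_type, tags

name, value, kind, tags = parse_statsd_line(
    "StatsD Metric: jvm.threads.states 19|g|#statistic:value,state:runnable"
)
# name == "jvm.threads.states", value == 19.0, kind == "g"
```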

## Features Lookup Response Structure

The /v1/fetch/join and /v1/fetch/groupby endpoints are bulkGet endpoints (against a single GroupBy or Join). Users can request multiple lookups, for example:
```bash
$ curl -X POST 'http://localhost:9000/v1/fetch/join/quickstart%2Ftraining_set.v2' -H 'Content-Type: application/json' -d '[{"user_id": "5"}, {"user_id": "7"}]'
```

The response status is 4xx (in case of errors parsing the incoming JSON request payload), 5xx (internal error like the KV store being unreachable) or 200 (some / all successful lookups).
In the case of a 200 response, the payload looks like the example shown below:
```json
{
  "results": [
    {
      "status": "Success",
      "entityKeys": {
        "user_id": "5"
      },
      "features": {
        "A": 12,
        "B": 24
      }
    },
    {
      "status": "Success",
      "entityKeys": {
        "user_id": "7"
      },
      "features": {
        "A": 36,
        "B": 48
      }
    }
  ]
}
```
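
Because a 200 response can cover some-or-all successful lookups, a non-JVM client should check the per-row status rather than relying on the HTTP code alone. A minimal sketch of that handling in Python, using the response shape shown above (only the "Success" status value is confirmed by this doc; treat any other status handling as an assumption):

```python
import json

def successful_features(payload: str):
    """Return (entityKeys, features) pairs for rows whose status is 'Success',
    following the /v1/fetch/join bulkGet response shape documented above."""
    results = json.loads(payload)["results"]
    return [
        (row["entityKeys"], row["features"])
        for row in results
        if row.get("status") == "Success"
    ]

# Sample payload mirroring the documented example response
payload = json.dumps({
    "results": [
        {"status": "Success", "entityKeys": {"user_id": "5"}, "features": {"A": 12, "B": 24}},
        {"status": "Success", "entityKeys": {"user_id": "7"}, "features": {"A": 36, "B": 48}},
    ]
})
rows = successful_features(payload)
```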
@@ -0,0 +1,59 @@
package ai.chronon.service;

import ai.chronon.online.Api;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.util.ScalaVersionSpecificCollectionsConverter;

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.Optional;

/**
 * Responsible for loading the relevant concrete Chronon Api implementation and providing that
 * for use in the Web service code. We follow similar semantics as the Driver to configure this:
 * online.jar - Jar that contains the implementation of the Api
 * online.class - Name of the Api class
 * online.api.props - Structure that contains fields that are loaded and passed to the Api implementation
 * during instantiation to configure it (e.g. connection params)
 */
public class ApiProvider {
    private static final Logger logger = LoggerFactory.getLogger(ApiProvider.class);

    public static Api buildApi(ConfigStore configStore) throws Exception {
        configStore.validateOnlineApiConfig();

        // we've already validated and confirmed these are present
        String jarPath = configStore.getOnlineJar().get();
        String className = configStore.getOnlineClass().get();

        File jarFile = new File(jarPath);
        if (!jarFile.exists()) {
            throw new IllegalArgumentException("JAR file does not exist: " + jarPath);
        }

        logger.info("Loading API implementation from JAR: {}, class: {}", jarPath, className);

        // Create class loader for the API JAR
        URL jarUrl = jarFile.toURI().toURL();
        URLClassLoader apiClassLoader = new URLClassLoader(
                new URL[]{jarUrl},
                ApiProvider.class.getClassLoader()
        );

        // Load and instantiate the API implementation
        Class<?> apiClass = Class.forName(className, true, apiClassLoader);

Review comment: could this be null?

Reply: shouldn't be null as we create an Optional.ofNullable in the cfg store and check the optional is present, else throw an exception. It could be empty / misconfigured, in which case we'd throw an error at startup.

        if (!Api.class.isAssignableFrom(apiClass)) {
            throw new IllegalArgumentException(
                    "Class " + className + " does not extend the Api abstract class"
            );
        }

Review comment (lines +47 to +52): 🛠️ Refactor suggestion — Handle exceptions when instantiating the API class. Wrap the instantiation in a try-catch block and provide informative error messages:

try {
    Constructor<?> constructor = apiClass.getConstructor(scala.collection.immutable.Map.class);
    return (Api) constructor.newInstance(scalaPropsMap);
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
    throw new RuntimeException("Failed to instantiate API class: " + className, e);
}

        Map<String, String> propsMap = configStore.getOnlineApiProps();
        scala.collection.immutable.Map<String, String> scalaPropsMap = ScalaVersionSpecificCollectionsConverter.convertJavaMapToScala(propsMap);

        return (Api) apiClass.getConstructors()[0].newInstance(scalaPropsMap);
    }
}
@@ -0,0 +1,61 @@
package ai.chronon.service;

import ai.chronon.online.Metrics;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.statsd.StatsdConfig;
import io.micrometer.statsd.StatsdMeterRegistry;
import io.vertx.core.Launcher;
import io.vertx.core.VertxOptions;
import io.vertx.micrometer.Label;
import io.vertx.micrometer.MicrometerMetricsFactory;
import io.vertx.micrometer.MicrometerMetricsOptions;

import java.util.HashMap;
import java.util.Map;

/**
 * Custom launcher to help configure the Chronon vertx feature service
 * to handle things like setting up a statsd metrics registry.
 * We use statsd here to be consistent with the rest of our project (e.g. fetcher code).
 * This allows us to send Vertx webservice metrics along with fetcher related metrics to allow users
 * to debug performance issues and set alerts etc.
 */
public class ChrononServiceLauncher extends Launcher {

    @Override
    public void beforeStartingVertx(VertxOptions options) {

        StatsdConfig config = new StatsdConfig() {
            private final String statsdHost = Metrics.Context$.MODULE$.statsHost();
            private final String statsdPort = String.valueOf(Metrics.Context$.MODULE$.statsPort());

            final Map<String, String> statsProps = new HashMap<String, String>() {{
                put(prefix() + "." + "port", statsdPort);
                put(prefix() + "." + "host", statsdHost);
                put(prefix() + "." + "protocol", Integer.parseInt(statsdPort) == 0 ? "UDS_DATAGRAM" : "UDP");
            }};

            @Override
            public String get(String key) {
                return statsProps.get(key);
            }
        };

Review comment (lines +29 to +43): Add input validation and improve error handling. The StatsdConfig implementation could validate the host and guard the port parse:

private final String statsdHost = Metrics.Context$.MODULE$.statsHost();
private final String statsdPort = String.valueOf(Metrics.Context$.MODULE$.statsPort());

+if (statsdHost == null || statsdHost.trim().isEmpty()) {
+    throw new IllegalStateException("StatsD host configuration is missing");
+}
+
+// Use an immutable map builder pattern
+private final Map<String, String> statsProps = Map.of(
+    prefix() + "." + "port", statsdPort,
+    prefix() + "." + "host", statsdHost,
+    prefix() + "." + "protocol", determineProtocol(statsdPort)
+);
+
+private String determineProtocol(String port) {
+    try {
+        return Integer.parseInt(port) == 0 ? "UDS_DATAGRAM" : "UDP";
+    } catch (NumberFormatException e) {
+        throw new IllegalStateException("Invalid StatsD port configuration", e);
+    }
+}

        MeterRegistry registry = new StatsdMeterRegistry(config, Clock.SYSTEM);
        MicrometerMetricsFactory metricsFactory = new MicrometerMetricsFactory(registry);

        // Configure metrics via statsd
        MicrometerMetricsOptions metricsOptions = new MicrometerMetricsOptions()
                .setEnabled(true)
                .setJvmMetricsEnabled(true)
                .setFactory(metricsFactory)
                .addLabels(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH);

        options.setMetricsOptions(metricsOptions);
    }

    public static void main(String[] args) {
        new ChrononServiceLauncher().dispatch(args);
    }
}

Review comment: let's do log4j2 everywhere

Reply: I'm ok with log4j2, though I do want to ensure the settings we've included in the logback.xml are mimicked correctly in the log4j2 props, as many of them are useful for prod safety and perf (async logging, log rotation on file sizes etc). If we want to swap, I'll propose doing this in a follow-up to not drag this PR out.