Backport service changes to zipline fork #70

Merged: 7 commits, Dec 9, 2024

5 changes: 5 additions & 0 deletions .github/workflows/test_scala_no_spark.yaml
@@ -66,3 +66,8 @@ jobs:
run: |
export SBT_OPTS="-Xmx8G -Xms2G"
sbt "++ 2.12.18 hub/test"

- name: Run service tests
run: |
export SBT_OPTS="-Xmx8G -Xms2G"
sbt "++ 2.12.18 service/test"
58 changes: 55 additions & 3 deletions build.sbt
@@ -30,6 +30,8 @@ lazy val jackson_2_15 = "2.15.2"
lazy val avro_1_11 = "1.11.2"
lazy val circeVersion = "0.14.9"
lazy val deltaVersion = "3.2.0"
lazy val slf4jApiVersion = "2.0.12"
lazy val logbackClassicVersion = "1.5.6"
Contributor: Let's do log4j2 everywhere.

Contributor Author: I'm ok with log4j2, though I do want to ensure the settings we've included in the logback.xml are mimicked correctly in the log4j2 props, as many of them are useful for prod safety and perf (async logging, log rotation on file sizes, etc.). If we want to swap, I'll propose doing this in a follow-up so as not to drag this PR out.


// skip tests on assembly - uncomment if builds become slow
// ThisBuild / assembly / test := {}
@@ -53,7 +55,7 @@
lazy val supportedVersions = List(scala_2_12) // List(scala211, scala212, scala213)

lazy val root = (project in file("."))
.aggregate(api, aggregator, online, spark, flink, cloud_gcp, cloud_aws, hub)
.aggregate(api, aggregator, online, spark, flink, cloud_gcp, cloud_aws, hub, service)
.settings(name := "chronon")

val spark_sql = Seq(
@@ -278,11 +280,61 @@ lazy val hub = (project in file("hub"))
),
// Ensure consistent versions of logging libraries
dependencyOverrides ++= Seq(
"org.slf4j" % "slf4j-api" % "1.7.36",
"ch.qos.logback" % "logback-classic" % "1.2.13"
"org.slf4j" % "slf4j-api" % slf4jApiVersion,
"ch.qos.logback" % "logback-classic" % logbackClassicVersion
)
)

lazy val service = (project in file("service"))
.dependsOn(online)
.settings(
Comment on lines +288 to +290
🛠️ Refactor suggestion

Add cross-compilation settings

The service module should specify cross-compilation settings like other modules to ensure compatibility across Scala versions.

lazy val service = (project in file("service"))
  .dependsOn(online)
  .settings(
+   crossScalaVersions := supportedVersions,
    assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

Committable suggestion skipped: line range outside the PR's diff.

assembly / assemblyJarName := s"${name.value}-${version.value}.jar",
coderabbitai bot (Nov 26, 2024):

🛠️ Refactor suggestion

Remove duplicate assemblyJarName setting

The assemblyJarName is defined twice in the service project settings.

 .settings(
   assembly / assemblyJarName := s"${name.value}-${version.value}.jar",
   assembly / artifact := {
     val art = (assembly / artifact).value
     art.withClassifier(Some("assembly"))
   },
   addArtifact(assembly / artifact, assembly),
   libraryDependencies ++= Seq(
     // ... dependencies ...
   ),
-  assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

Also applies to: 305-305

Contributor: good observation - what happens if we don't specify this?

coderabbitai bot:
@nikhil-zlai, if we don't specify assembly / assemblyJarName, the sbt-assembly plugin will use its default naming convention for the assembled JAR file, which is typically assembly.jar. This means the generated JAR won't include the project name or version in its filename.

Not specifying a custom assemblyJarName can lead to confusion or potential overwriting of JAR files if multiple projects are assembled, as their artifacts might have the same default name. By specifying:

assembly / assemblyJarName := s"${name.value}-${version.value}.jar"

we ensure that the assembled JAR file includes both the project name and version, making it uniquely identifiable and preventing naming conflicts. This helps maintain clear and consistent artifact naming across different builds.

assembly / artifact := {
val art = (assembly / artifact).value
art.withClassifier(Some("assembly"))
},
addArtifact(assembly / artifact, assembly),
libraryDependencies ++= Seq(
"io.vertx" % "vertx-core" % "4.5.10",
"io.vertx" % "vertx-web" % "4.5.10",
"io.vertx" % "vertx-config" % "4.5.10",
"ch.qos.logback" % "logback-classic" % logbackClassicVersion,
"org.slf4j" % "slf4j-api" % slf4jApiVersion,
Collaborator: Since we're at the point of committing to web frameworks, could we make a call on the logging implementation we're using project-wide? If we're using logback for app logging, we might as well do the same for Spark etc.

Collaborator: Although I'd slightly prefer log4j2 (for the sole reason of not wanting to write XML 😬).

Contributor Author: I don't mind swapping to log4j2 if we can mimic all the settings we have in the logback.xml. I like the settings to limit file sizes, roll over, and write to log + stdout async. I don't really think it's the end of the world, though, if we have different logging configs for Spark jobs & services. The considerations are fairly different, so it's fine if they're separate, I think.

Collaborator @tchow-zlai (Dec 5, 2024): As long as we don't combine classpaths between hub and spark in the final version of this, it's 👍 with me.

Contributor @nikhil-zlai (Dec 7, 2024): Let's do log4j2 everywhere - preferably with mostly the same settings/pattern file in all environments. Hunting these logging-lib classpath inconsistencies is gnarly. Also, fewer things for folks to learn.

I like logback's functionality and log4j2's syntax. I think Spark likes log4j2 as well - it's their recommended logging lib.

Collaborator @tchow-zlai (Dec 9, 2024): Spark uses slf4j, so it's technically compatible with either - although if we were doing logback for Spark, we'd have to be sure to do the correct classpath sanitization in the Spark builds. I do like log4j2's syntax. They do have size-based appending: https://logging.apache.org/log4j/2.x/manual/appenders/rolling-file.html

Contributor Author: Let's address this in a follow-up - I don't mind swapping to log4j2 since you folks seem to be in favor of it. I don't want to work through the testing of the configs etc. again as part of the critical path of this PR. It took a little iterating, and I don't think it's worth re-spending the time immediately; I / someone can pick this up async.

"com.typesafe" % "config" % "1.4.3",
// force netty versions -> without this we conflict with the versions pulled in from
// our online module's spark deps which causes the web-app to not serve up content
"io.netty" % "netty-all" % "4.1.111.Final",
⚠️ Potential issue

Update Netty version

The current Netty version might have security vulnerabilities.

-"io.netty" % "netty-all" % "4.1.111.Final",
+"io.netty" % "netty-all" % "4.1.112.Final",

// wire up metrics using micro meter and statsd
"io.vertx" % "vertx-micrometer-metrics" % "4.5.10",
"io.micrometer" % "micrometer-registry-statsd" % "1.13.6",
"junit" % "junit" % "4.13.2" % Test,
"com.novocode" % "junit-interface" % "0.11" % Test,
"org.mockito" % "mockito-core" % "5.12.0" % Test,
"io.vertx" % "vertx-unit" % "4.5.10" % Test,
),
// Assembly settings
assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

// Main class configuration
// We use a custom launcher to help us wire up our statsd metrics
Compile / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),
assembly / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),

// Merge strategy for assembly
assembly / assemblyMergeStrategy := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("META-INF", xs @ _*) => MergeStrategy.first
case PathList("javax", "activation", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "logging", xs @ _*) => MergeStrategy.first
case PathList("org", "slf4j", xs @ _*) => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
}
)

ThisBuild / assemblyMergeStrategy := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("META-INF", _*) => MergeStrategy.filterDistinctLines
5 changes: 4 additions & 1 deletion online/src/main/scala/ai/chronon/online/Metrics.scala
@@ -127,6 +127,9 @@ object Metrics {
)
}

// Host can also be a Unix socket like: unix:///opt/datadog-agent/run/dogstatsd.sock
// In the unix socket case port is configured to be 0
val statsHost: String = System.getProperty("ai.chronon.metrics.host", "localhost")
val statsPort: Int = System.getProperty("ai.chronon.metrics.port", "8125").toInt
val tagCache: TTLCache[Context, String] = new TTLCache[Context, String](
{ ctx => ctx.toTags.reverse.mkString(",") },
@@ -135,7 +138,7 @@
)

private val statsClient: NonBlockingStatsDClient =
-new NonBlockingStatsDClientBuilder().prefix("ai.zipline").hostname("localhost").port(statsPort).build()
+new NonBlockingStatsDClientBuilder().prefix("ai.zipline").hostname(statsHost).port(statsPort).build()

}
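
A usage sketch for overriding these coordinates at launch (deployment values here are illustrative; the property keys come from the getProperty calls above, and the run command mirrors the service README):

```bash
# UDP statsd on a non-default host
java -Dai.chronon.metrics.host=statsd.my-infra.internal \
     -Dai.chronon.metrics.port=8125 \
     -jar service/target/scala-2.12/service-*.jar run ai.chronon.service.FetcherVerticle \
     -conf service/src/main/resources/example_config.json

# Unix domain socket: port 0 selects UDS, per the comment above
java -Dai.chronon.metrics.host=unix:///opt/datadog-agent/run/dogstatsd.sock \
     -Dai.chronon.metrics.port=0 \
     -jar service/target/scala-2.12/service-*.jar run ai.chronon.service.FetcherVerticle \
     -conf service/src/main/resources/example_config.json
```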

112 changes: 112 additions & 0 deletions service/README.md
@@ -0,0 +1,112 @@
# Chronon Feature Fetching Service

The feature service module consists of code to bring up a service that provides a thin shim around the Fetcher code. It is meant to aid Chronon adopters who either need a quick way to get a feature serving layer up and running, or who work in a largely non-JVM organization and need a way to retrieve features.

## Core Technology

The Chronon Feature Service is built on top of the [Vert.x](https://vertx.io/) JVM framework. Vert.x is a high-performance web framework that supports HTTP- and gRPC-based services.

## Running locally

To build the service sub-module:
```bash
~/workspace/chronon $ sbt "project service" clean assembly
```

To test out the service, you also need to build a concrete instantiation of the [Api](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Api.scala#L187).
We can leverage the [quickstart Mongo API](https://github.com/airbnb/chronon/tree/main/quickstart/mongo-online-impl) for this:
```bash
~/workspace/chronon $ cd quickstart/mongo-online-impl
~/workspace/chronon/quickstart/mongo-online-impl $ sbt assembly
...
[success] Total time: 1 s, completed Nov 6, 2024, 2:35:26 PM
```
This command writes out an assembly jar in the target/scala-2.12 sub-directory.

We can now use this to start up the feature service:
```bash
~/workspace/chronon $ java -jar service/target/scala-2.12/service-*.jar run ai.chronon.service.FetcherVerticle \
-Dserver.port=9000 -conf service/src/main/resources/example_config.json
...
14:39:26.626 [vert.x-eventloop-thread-1] INFO a.chronon.service.WebServiceVerticle - HTTP server started on port 9000
14:39:26.627 [vert.x-eventloop-thread-0] INFO i.v.c.i.l.c.VertxIsolatedDeployer - Succeeded in deploying verticle
```

A few things to call out so you can customize:
- Choose your port (this is where you'll hit your webservice with traffic)
- Update the example_config.json (specifically, confirm that the path to the mongo-online-impl assembly jar matches your setup)

Comment on lines +38 to +41
🛠️ Refactor suggestion

Document configuration schema

The example_config.json is referenced but its schema is not documented. Consider:

  1. Adding the configuration schema
  2. Documenting all available configuration options
  3. Providing example configurations for different scenarios

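For orientation, here is a minimal sketch of what such a config might contain. The keys (online.jar, online.class, online.api.props) are the ones the service's ConfigStore/ApiProvider read later in this PR; the values are purely illustrative:

```json
{
  "online.jar": "quickstart/mongo-online-impl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar",
  "online.class": "ai.chronon.quickstart.online.ChrononMongoOnlineImpl",
  "online.api.props": {
    "host": "localhost",
    "port": "27017"
  }
}
```
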
If you'd like some real data to query from the feature service, make sure to run through the relevant steps of the
[Quickstart - Online Flows](https://chronon.ai/getting_started/Tutorial.html#online-flows) tutorial.

Some examples to curl the webservice:
```bash
$ curl 'http://localhost:9000/ping'
$ curl 'http://localhost:9000/config'
$ curl -X POST 'http://localhost:9000/v1/fetch/join/quickstart%2Ftraining_set.v2' -H 'Content-Type: application/json' -d '[{"user_id": "5"}]'
```
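
(The join name in the URL is percent-encoded: quickstart%2Ftraining_set.v2 decodes to quickstart/training_set.v2.)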

## Metrics

The Vert.x feature service relies on the same statsd host / port coordinates as the rest of the Chronon project -
[Metrics](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Metrics.scala#L135). When configured correctly,
the service will emit metrics captured by [Vert.x](https://vertx.io/docs/vertx-micrometer-metrics/java/#_http_client), JVM metrics, as well as metrics
captured by the existing Chronon Fetcher code.

To view these metrics for your locally running feature service:
- Install the [statsd-logger](https://github.com/jimf/statsd-logger) npm module (`npm install -g statsd-logger`)
- Run the command: `statsd-logger`

Now you should see metrics of the format:
```bash
$ statsd-logger
Server listening on 0.0.0.0:8125
StatsD Metric: jvm.buffer.memory.used 12605920|g|#statistic:value,id:direct
StatsD Metric: jvm.threads.states 0|g|#statistic:value,state:blocked
StatsD Metric: jvm.memory.used 8234008|g|#statistic:value,area:nonheap,id:Compressed Class Space
StatsD Metric: jvm.threads.states 19|g|#statistic:value,state:runnable
StatsD Metric: system.load.average.1m 1.504883|g|#statistic:value
StatsD Metric: vertx.http.server.active.requests 0|g|#statistic:value,method:GET,path:/ping
StatsD Metric: ai.zipline.join.fetch.join_request.count 1|c|#null,null,null,null,environment:join.fetch,owner:quickstart,team:quickstart,production:false,join:quickstart_training_set_v2
StatsD Metric: ai.zipline.join.fetch.group_by_request.count 1|c|#null,null,accuracy:SNAPSHOT,environment:join.fetch,owner:quickstart,team:quickstart,production:false,group_by:quickstart_purchases_v1,join:quickstart_training_set_v2
...
```

## Features Lookup Response Structure

The /v1/fetch/join and /v1/fetch/groupby endpoints are bulkGet endpoints (against a single GroupBy or Join). Users can request multiple lookups, for example:
```bash
$ curl -X POST 'http://localhost:9000/v1/fetch/join/quickstart%2Ftraining_set.v2' -H 'Content-Type: application/json' -d '[{"user_id": "5"}, {"user_id": "7"}]'
```

The response status is 4xx (errors parsing the incoming JSON request payload), 5xx (internal errors, e.g. the KV store being unreachable), or 200 (some or all lookups succeeded).
In case of the 200 response, the payload looks like the example shown below:
```json
{
"results": [
{
"status": "Success",
"entityKeys": {
"user_id": "5"
},
"features": {
"A": 12,
"B": 24
}
},
{
"status": "Success",
"entityKeys": {
"user_id": "7"
},
"features": {
"A": 36,
"B": 48,
}
}
]
}
```
59 changes: 59 additions & 0 deletions service/src/main/java/ai/chronon/service/ApiProvider.java
@@ -0,0 +1,59 @@
package ai.chronon.service;

import ai.chronon.online.Api;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.util.ScalaVersionSpecificCollectionsConverter;

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.Optional;

/**
* Responsible for loading the relevant concrete Chronon Api implementation and providing that
* for use in the Web service code. We follow similar semantics as the Driver to configure this:
* online.jar - Jar that contains the implementation of the Api
* online.class - Name of the Api class
* online.api.props - Structure that contains fields that are loaded and passed to the Api implementation
* during instantiation to configure it (e.g. connection params)
*/
public class ApiProvider {
private static final Logger logger = LoggerFactory.getLogger(ApiProvider.class);

public static Api buildApi(ConfigStore configStore) throws Exception {
configStore.validateOnlineApiConfig();

// we've already validated and confirmed these are present
String jarPath = configStore.getOnlineJar().get();
String className = configStore.getOnlineClass().get();

File jarFile = new File(jarPath);
if (!jarFile.exists()) {
throw new IllegalArgumentException("JAR file does not exist: " + jarPath);
}

logger.info("Loading API implementation from JAR: {}, class: {}", jarPath, className);

// Create class loader for the API JAR
URL jarUrl = jarFile.toURI().toURL();
URLClassLoader apiClassLoader = new URLClassLoader(
new URL[]{jarUrl},
ApiProvider.class.getClassLoader()
);

// Load and instantiate the API implementation
Class<?> apiClass = Class.forName(className, true, apiClassLoader);
Contributor: could this be null?

Contributor Author: shouldn't be null, as we create an Optional.ofNullable in the cfg store and check that the optional is present, else throw an ex. It could be empty / misconfigured, in which case we'd throw an error at startup.

if (!Api.class.isAssignableFrom(apiClass)) {
throw new IllegalArgumentException(
"Class " + className + " does not extend the Api abstract class"
);
}
Comment on lines +47 to +52
🛠️ Refactor suggestion

Handle exceptions when instantiating the API class

When calling newInstance, several exceptions can be thrown (e.g., InstantiationException, IllegalAccessException, InvocationTargetException). Ensure that these exceptions are properly handled or propagated with meaningful messages to aid in debugging.

Wrap the instantiation in a try-catch block and provide informative error messages:

try {
    Constructor<?> constructor = apiClass.getConstructor(scala.collection.immutable.Map.class);
    return (Api) constructor.newInstance(scalaPropsMap);
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
    throw new RuntimeException("Failed to instantiate API class: " + className, e);
}


Map<String, String> propsMap = configStore.getOnlineApiProps();
scala.collection.immutable.Map<String, String> scalaPropsMap = ScalaVersionSpecificCollectionsConverter.convertJavaMapToScala(propsMap);

return (Api) apiClass.getConstructors()[0].newInstance(scalaPropsMap);
}
}
61 changes: 61 additions & 0 deletions service/src/main/java/ai/chronon/service/ChrononServiceLauncher.java
@@ -0,0 +1,61 @@
package ai.chronon.service;

import ai.chronon.online.Metrics;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.statsd.StatsdConfig;
import io.micrometer.statsd.StatsdMeterRegistry;
import io.vertx.core.Launcher;
import io.vertx.core.VertxOptions;
import io.vertx.micrometer.Label;
import io.vertx.micrometer.MicrometerMetricsFactory;
import io.vertx.micrometer.MicrometerMetricsOptions;

import java.util.HashMap;
import java.util.Map;

/**
* Custom launcher to help configure the Chronon vertx feature service
* to handle things like setting up a statsd metrics registry.
* We use statsd here to be consistent with the rest of our project (e.g. fetcher code).
* This allows us to send Vertx webservice metrics along with fetcher related metrics to allow users
* to debug performance issues and set alerts etc.
*/
public class ChrononServiceLauncher extends Launcher {

@Override
public void beforeStartingVertx(VertxOptions options) {

StatsdConfig config = new StatsdConfig() {
private final String statsdHost = Metrics.Context$.MODULE$.statsHost();
private final String statsdPort = String.valueOf(Metrics.Context$.MODULE$.statsPort());

final Map<String, String> statsProps = new HashMap<String, String>() {{
put(prefix() + "." + "port", statsdPort);
put(prefix() + "." + "host", statsdHost);
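// Port 0 signals a Unix-domain-socket host (see the statsHost docs in Metrics.Context), so pick the matching protocol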
put(prefix() + "." + "protocol", Integer.parseInt(statsdPort) == 0 ? "UDS_DATAGRAM" : "UDP");
}};

@Override
public String get(String key) {
return statsProps.get(key);
}
};
Comment on lines +29 to +43
⚠️ Potential issue

Add input validation and improve error handling.

The StatsdConfig implementation has several areas for improvement:

  1. Missing validation for statsdHost value
  2. No null checks for Metrics.Context values
  3. Protocol selection logic needs verification

Consider applying these improvements:

 private final String statsdHost = Metrics.Context$.MODULE$.statsHost();
 private final String statsdPort = String.valueOf(Metrics.Context$.MODULE$.statsPort());
+
+if (statsdHost == null || statsdHost.trim().isEmpty()) {
+    throw new IllegalStateException("StatsD host configuration is missing");
+}
+
+// Use an immutable map builder pattern
+private final Map<String, String> statsProps = Map.of(
+    prefix() + "." + "port", statsdPort,
+    prefix() + "." + "host", statsdHost,
+    prefix() + "." + "protocol", determineProtocol(statsdPort)
+);
+
+private String determineProtocol(String port) {
+    try {
+        return Integer.parseInt(port) == 0 ? "UDS_DATAGRAM" : "UDP";
+    } catch (NumberFormatException e) {
+        throw new IllegalStateException("Invalid StatsD port configuration", e);
+    }
+}

Committable suggestion skipped: line range outside the PR's diff.


MeterRegistry registry = new StatsdMeterRegistry(config, Clock.SYSTEM);
MicrometerMetricsFactory metricsFactory = new MicrometerMetricsFactory(registry);

// Configure metrics via statsd
MicrometerMetricsOptions metricsOptions = new MicrometerMetricsOptions()
.setEnabled(true)
.setJvmMetricsEnabled(true)
.setFactory(metricsFactory)
.addLabels(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH);

options.setMetricsOptions(metricsOptions);
}

public static void main(String[] args) {
new ChrononServiceLauncher().dispatch(args);
}
}
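
Since build.sbt points both Compile / mainClass and assembly / mainClass at this launcher, the assembled service jar starts through it just like a stock Vert.x application; the README's run command (repeated here) goes through this entry point:

```bash
java -jar service/target/scala-2.12/service-*.jar run ai.chronon.service.FetcherVerticle \
    -Dserver.port=9000 -conf service/src/main/resources/example_config.json
```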