Skip to content

Commit 4f61c89

Browse files
committed
Address review comments
1 parent 7225da0 commit 4f61c89

File tree

11 files changed

+103
-85
lines changed

11 files changed

+103
-85
lines changed

.github/workflows/test_scala_no_spark.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,8 @@ jobs:
6666
run: |
6767
export SBT_OPTS="-Xmx8G -Xms2G"
6868
sbt "++ 2.12.18 hub/test"
69+
70+
- name: Run service tests
71+
run: |
72+
export SBT_OPTS="-Xmx8G -Xms2G"
73+
sbt "++ 2.12.18 service/test"

service/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ This command will write out a file in the target/scala-2.12 sub-directory.
2828

2929
We can now use this to start up the feature service:
3030
```bash
31-
~/workspace/chronon $ java -jar service/target/scala-2.12/service-*.jar run ai.chronon.service.WebServiceVerticle \
31+
~/workspace/chronon $ java -jar service/target/scala-2.12/service-*.jar run ai.chronon.service.FetcherVerticle \
3232
-Dserver.port=9000 -conf service/src/main/resources/example_config.json
3333
...
3434
14:39:26.626 [vert.x-eventloop-thread-1] INFO a.chronon.service.WebServiceVerticle - HTTP server started on port 9000
@@ -46,7 +46,7 @@ Some examples to curl the webservice:
4646
```bash
4747
$ curl 'http://localhost:9000/ping'
4848
$ curl 'http://localhost:9000/config'
49-
$ curl -X POST 'http://localhost:9000/v1/features/join/quickstart%2Ftraining_set.v2' -H 'Content-Type: application/json' -d '[{"user_id": "5"}]'
49+
$ curl -X POST 'http://localhost:9000/v1/fetch/join/quickstart%2Ftraining_set.v2' -H 'Content-Type: application/json' -d '[{"user_id": "5"}]'
5050
```
5151

5252
## Metrics
@@ -77,9 +77,9 @@ StatsD Metric: ai.zipline.join.fetch.group_by_request.count 1|c|#null,null,accur
7777

7878
## Features Lookup Response Structure
7979

80-
The /v1/features/join and /v1/features/groupby endpoints are bulkGet endpoints (against a single GroupBy or Join). Users can request multiple lookups, for example:
80+
The /v1/fetch/join and /v1/fetch/groupby endpoints are bulkGet endpoints (against a single GroupBy or Join). Users can request multiple lookups, for example:
8181
```bash
82-
$ curl -X POST 'http://localhost:9000/v1/features/join/quickstart%2Ftraining_set.v2' -H 'Content-Type: application/json' -d '[{"user_id": "5"}, {"user_id": "7"}]'
82+
$ curl -X POST 'http://localhost:9000/v1/fetch/join/quickstart%2Ftraining_set.v2' -H 'Content-Type: application/json' -d '[{"user_id": "5"}, {"user_id": "7"}]'
8383
```
8484

8585
The response status is 4xx (in case of errors parsing the incoming json request payload), 5xx (internal error like the KV store being unreachable) or 200 (some / all successful lookups).

service/src/main/java/ai/chronon/service/ApiProvider.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@ public class ApiProvider {
2323
private static final Logger logger = LoggerFactory.getLogger(ApiProvider.class);
2424

2525
public static Api buildApi(ConfigStore configStore) throws Exception {
26-
Optional<String> maybeJarPath = configStore.getOnlineJar();
27-
Optional<String> maybeClass = configStore.getOnlineClass();
28-
if (!(maybeJarPath.isPresent() && maybeClass.isPresent())) {
29-
throw new IllegalArgumentException("Both 'online.jar' and 'online.class' configs must be set.");
30-
}
26+
configStore.validateOnlineApiConfig();
27+
28+
// we've already validated and confirmed these are present
29+
String jarPath = configStore.getOnlineJar().get();
30+
String className = configStore.getOnlineClass().get();
3131

32-
String jarPath = maybeJarPath.get();
33-
String className = maybeClass.get();
3432
File jarFile = new File(jarPath);
3533
if (!jarFile.exists()) {
3634
throw new IllegalArgumentException("JAR file does not exist: " + jarPath);

service/src/main/java/ai/chronon/service/ConfigStore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ public Optional<String> getOnlineClass() {
6464
return Optional.ofNullable(jsonConfig.getString(ONLINE_CLASS));
6565
}
6666

67+
public void validateOnlineApiConfig() {
68+
if (!(getOnlineJar().isPresent() && getOnlineClass().isPresent())) {
69+
throw new IllegalArgumentException("Both 'online.jar' and 'online.class' configs must be set.");
70+
}
71+
}
72+
6773
public Map<String, String> getOnlineApiProps() {
6874
JsonObject apiProps = jsonConfig.getJsonObject(ONLINE_API_PROPS);
6975
if (apiProps == null) {

service/src/main/java/ai/chronon/service/WebServiceVerticle.java renamed to service/src/main/java/ai/chronon/service/FetcherVerticle.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package ai.chronon.service;
22

33
import ai.chronon.online.Api;
4-
import ai.chronon.service.handlers.FeaturesRouter;
4+
import ai.chronon.service.handlers.FetcherRouter;
55
import io.vertx.core.AbstractVerticle;
66
import io.vertx.core.Promise;
77
import io.vertx.core.http.HttpServer;
@@ -11,12 +11,12 @@
1111
import org.slf4j.LoggerFactory;
1212

1313
/**
14-
* Entry point for the Chronon webservice. We wire up our API routes and configure and launch our HTTP service here.
14+
* Entry point for the Chronon fetcher endpoints. We wire up our API routes and configure and launch our HTTP service here.
1515
* We choose to use just 1 verticle for now as it allows us to keep things simple and we don't need to scale /
1616
* independently deploy different endpoint routes.
1717
*/
18-
public class WebServiceVerticle extends AbstractVerticle {
19-
private static final Logger logger = LoggerFactory.getLogger(WebServiceVerticle.class);
18+
public class FetcherVerticle extends AbstractVerticle {
19+
private static final Logger logger = LoggerFactory.getLogger(FetcherVerticle.class);
2020

2121
private HttpServer server;
2222

@@ -32,7 +32,7 @@ protected void startHttpServer(int port, String configJsonString, Api api, Promi
3232
// Define routes
3333

3434
// Set up sub-routes for the various feature retrieval apis
35-
router.route("/v1/features/*").subRouter(FeaturesRouter.createFeaturesRoutes(vertx, api));
35+
router.route("/v1/fetch/*").subRouter(FetcherRouter.createFetchRoutes(vertx, api));
3636

3737
// Health check route
3838
router.get("/ping").handler(ctx -> {

service/src/main/java/ai/chronon/service/handlers/FeaturesRouter.java

Lines changed: 0 additions & 24 deletions
This file was deleted.

service/src/main/java/ai/chronon/service/handlers/FeaturesHandler.java renamed to service/src/main/java/ai/chronon/service/handlers/FetchHandler.java

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import java.util.List;
2020
import java.util.Map;
2121
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.BiFunction;
23+
import java.util.function.Function;
2224
import java.util.stream.Collectors;
2325

2426
import static ai.chronon.service.model.GetFeaturesResponse.Result.Status.Failure;
2527
import static ai.chronon.service.model.GetFeaturesResponse.Result.Status.Success;
2628

2729
/**
28-
* Concrete implementation of the GetFeatures endpoints. Supports loading groupBys and joins.
30+
* Concrete implementation of the Chronon fetcher endpoints. Supports loading groupBys and joins.
2931
* Some notes on this:
3032
* We currently support bulkGet lookups against a single groupBy / join. Attempts to lookup n different GroupBys / Joins
3133
* need to be split up into n different requests.
@@ -36,39 +38,23 @@
3638
* As an example:
3739
* { results: [ {"status": "Success", "features": ...}, {"status": "Failure", "error": ...} ] }
3840
*/
39-
public class FeaturesHandler implements Handler<RoutingContext> {
41+
public class FetchHandler implements Handler<RoutingContext> {
4042

41-
public enum EntityType {
42-
GroupBy,
43-
Join
44-
}
45-
46-
// PoJo to simplify transforming responses from the Fetcher to the final results form we return
47-
private static class EntityKeyToValues {
48-
public EntityKeyToValues(Map<String, Object> keys, JTry<Map<String, Object>> values) {
49-
this.entityKeys = keys;
50-
this.features = values;
51-
}
52-
53-
public Map<String, Object> entityKeys;
54-
public JTry<Map<String, Object>> features;
55-
}
56-
57-
private static final Logger logger = LoggerFactory.getLogger(FeaturesHandler.class);
43+
private static final Logger logger = LoggerFactory.getLogger(FetchHandler.class);
5844
private static final ObjectMapper objectMapper = new ObjectMapper();
5945

60-
private final EntityType entityType;
6146
private final JavaFetcher fetcher;
47+
private final BiFunction<JavaFetcher, List<JavaRequest>, CompletableFuture<List<JavaResponse>>> fetchFunction;
6248

63-
public FeaturesHandler(EntityType entityType, JavaFetcher fetcher) {
64-
this.entityType = entityType;
49+
public FetchHandler(JavaFetcher fetcher, BiFunction<JavaFetcher, List<JavaRequest>, CompletableFuture<List<JavaResponse>>> fetchFunction) {
6550
this.fetcher = fetcher;
51+
this.fetchFunction = fetchFunction;
6652
}
6753

6854
@Override
6955
public void handle(RoutingContext ctx) {
7056
String entityName = ctx.pathParam("name");
71-
logger.debug("Retrieving {} - {}", entityType.name(), entityName);
57+
logger.debug("Retrieving {}", entityName);
7258
JTry<List<JavaRequest>> maybeRequest = parseJavaRequest(entityName, ctx.body());
7359
if (! maybeRequest.isSuccess()) {
7460
logger.error("Unable to parse request body", maybeRequest.getException());
@@ -81,27 +67,19 @@ public void handle(RoutingContext ctx) {
8167
}
8268

8369
List<JavaRequest> requests = maybeRequest.getValue();
84-
CompletableFuture<List<JavaResponse>> resultsJavaFuture =
85-
entityType.equals(EntityType.GroupBy) ? fetcher.fetchGroupBys(requests) : fetcher.fetchJoin(requests);
70+
CompletableFuture<List<JavaResponse>> resultsJavaFuture = fetchFunction.apply(fetcher, requests);
8671
// wrap the Java future we get in a Vert.x Future to not block the worker thread
87-
Future<List<EntityKeyToValues>> maybeFeatureResponses =
72+
Future<List<GetFeaturesResponse.Result>> maybeFeatureResponses =
8873
Future.fromCompletionStage(resultsJavaFuture)
89-
.map(result -> result.stream().map(FeaturesHandler::responseToPoJo)
74+
.map(result -> result.stream().map(FetchHandler::responseToPoJo)
9075
.collect(Collectors.toList()));
9176

9277
maybeFeatureResponses.onSuccess(resultList -> {
9378
// as this is a bulkGet request, we might have some successful and some failed responses
9479
// we return the responses in the same order as they come in and mark them as successful / failed based
9580
// on the lookups
9681
GetFeaturesResponse.Builder responseBuilder = GetFeaturesResponse.builder();
97-
List<GetFeaturesResponse.Result> results = resultList.stream().map(resultsPojo -> {
98-
if (resultsPojo.features.isSuccess()) {
99-
return GetFeaturesResponse.Result.builder().status(Success).entityKeys(resultsPojo.entityKeys).features(resultsPojo.features.getValue()).build();
100-
} else {
101-
return GetFeaturesResponse.Result.builder().status(Failure).entityKeys(resultsPojo.entityKeys).error(resultsPojo.features.getException().getMessage()).build();
102-
}
103-
}).collect(Collectors.toList());
104-
responseBuilder.results(results);
82+
responseBuilder.results(resultList);
10583

10684
ctx.response()
10785
.setStatusCode(200)
@@ -118,8 +96,22 @@ public void handle(RoutingContext ctx) {
11896
});
11997
}
12098

121-
public static EntityKeyToValues responseToPoJo(JavaResponse response) {
122-
return new EntityKeyToValues(response.request.keys, response.values);
99+
public static GetFeaturesResponse.Result responseToPoJo(JavaResponse response) {
100+
if (response.values.isSuccess()) {
101+
return GetFeaturesResponse.Result
102+
.builder()
103+
.status(Success)
104+
.entityKeys(response.request.keys)
105+
.features(response.values.getValue())
106+
.build();
107+
} else {
108+
return GetFeaturesResponse.Result
109+
.builder()
110+
.status(Failure)
111+
.entityKeys(response.request.keys)
112+
.error(response.values.getException().getMessage())
113+
.build();
114+
}
123115
}
124116

125117
public static JTry<List<JavaRequest>> parseJavaRequest(String name, RequestBody body) {
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package ai.chronon.service.handlers;
2+
3+
import ai.chronon.online.*;
4+
import io.vertx.core.Vertx;
5+
import io.vertx.ext.web.Router;
6+
import io.vertx.ext.web.handler.BodyHandler;
7+
8+
import java.util.List;
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.function.BiFunction;
11+
12+
// Configures the routes for our get features endpoints
13+
// We support bulkGets of groupBys and bulkGets of joins
14+
public class FetcherRouter {
15+
16+
public static class GroupByFetcherFunction implements BiFunction<JavaFetcher, List<JavaRequest>, CompletableFuture<List<JavaResponse>>> {
17+
@Override
18+
public CompletableFuture<List<JavaResponse>> apply(JavaFetcher fetcher, List<JavaRequest> requests) {
19+
return fetcher.fetchGroupBys(requests);
20+
}
21+
}
22+
23+
public static class JoinFetcherFunction implements BiFunction<JavaFetcher, List<JavaRequest>, CompletableFuture<List<JavaResponse>>> {
24+
@Override
25+
public CompletableFuture<List<JavaResponse>> apply(JavaFetcher fetcher, List<JavaRequest> requests) {
26+
return fetcher.fetchJoin(requests);
27+
}
28+
}
29+
30+
public static Router createFetchRoutes(Vertx vertx, Api api) {
31+
Router router = Router.router(vertx);
32+
router.route().handler(BodyHandler.create());
33+
JavaFetcher fetcher = api.buildJavaFetcher("feature-service", false);
34+
35+
router.post("/groupby/:name").handler(new FetchHandler(fetcher, new GroupByFetcherFunction()));
36+
router.post("/join/:name").handler(new FetchHandler(fetcher, new JoinFetcherFunction()));
37+
38+
return router;
39+
}
40+
}

service/src/main/java/ai/chronon/service/model/GetFeaturesResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import java.util.Map;
88

99
/**
10-
* PoJo capturing the response we return back as part of /v1/features/groupby and /v1/features/join endpoints
10+
* PoJo capturing the response we return back as part of /v1/fetch/groupby and /v1/fetch/join endpoints
1111
* when the individual bulkGet lookups were either all successful or partially successful.
1212
*/
1313
@JsonInclude(JsonInclude.Include.NON_NULL)

service/src/test/java/ai/chronon/service/handlers/FeaturesHandlerJsonSerDeTest.java renamed to service/src/test/java/ai/chronon/service/handlers/FetchHandlerJsonSerDeTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import static org.mockito.Mockito.mock;
1212
import static org.mockito.Mockito.when;
1313

14-
public class FeaturesHandlerJsonSerDeTest {
14+
public class FetchHandlerJsonSerDeTest {
1515

1616
@Test
1717
public void testParsingOfSimpleJavaRequests() {
@@ -20,7 +20,7 @@ public void testParsingOfSimpleJavaRequests() {
2020
when(mockRequestBody.asString()).thenReturn(mockRequest);
2121

2222
String groupByName = "my_groupby.1";
23-
JTry<List<JavaRequest>> maybeRequest = FeaturesHandler.parseJavaRequest(groupByName, mockRequestBody);
23+
JTry<List<JavaRequest>> maybeRequest = FetchHandler.parseJavaRequest(groupByName, mockRequestBody);
2424
assertTrue(maybeRequest.isSuccess());
2525
List<JavaRequest> reqs = maybeRequest.getValue();
2626
assertEquals(1, reqs.size());
@@ -38,7 +38,7 @@ public void testParsingInvalidRequest() {
3838
when(mockRequestBody.asString()).thenReturn(mockRequest);
3939

4040
String groupByName = "my_groupby.1";
41-
JTry<List<JavaRequest>> maybeRequest = FeaturesHandler.parseJavaRequest(groupByName, mockRequestBody);
41+
JTry<List<JavaRequest>> maybeRequest = FetchHandler.parseJavaRequest(groupByName, mockRequestBody);
4242
assertFalse(maybeRequest.isSuccess());
4343
assertNotNull(maybeRequest.getException());
4444
}
@@ -50,7 +50,7 @@ public void testParsingOneValidAndInvalidRequest() {
5050
when(mockRequestBody.asString()).thenReturn(mockRequest);
5151

5252
String groupByName = "my_groupby.1";
53-
JTry<List<JavaRequest>> maybeRequest = FeaturesHandler.parseJavaRequest(groupByName, mockRequestBody);
53+
JTry<List<JavaRequest>> maybeRequest = FetchHandler.parseJavaRequest(groupByName, mockRequestBody);
5454
assertFalse(maybeRequest.isSuccess());
5555
assertNotNull(maybeRequest.getException());
5656
}

service/src/test/java/ai/chronon/service/handlers/FeaturesHandlerTest.java renamed to service/src/test/java/ai/chronon/service/handlers/FetchHandlerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import static org.mockito.Mockito.when;
3232

3333
@RunWith(VertxUnitRunner.class)
34-
public class FeaturesHandlerTest {
34+
public class FetchHandlerTest {
3535

3636
@Mock
3737
private JavaFetcher mockFetcher;
@@ -45,7 +45,7 @@ public class FeaturesHandlerTest {
4545
@Mock
4646
RequestBody requestBody;
4747

48-
private FeaturesHandler handler;
48+
private FetchHandler handler;
4949
private Vertx vertx;
5050

5151
private static final String TEST_GROUP_BY = "test_groupby.v1";
@@ -54,7 +54,8 @@ public class FeaturesHandlerTest {
5454
public void setUp(TestContext context) {
5555
MockitoAnnotations.openMocks(this);
5656
vertx = Vertx.vertx();
57-
handler = new FeaturesHandler(FeaturesHandler.EntityType.Join, mockFetcher);
57+
58+
handler = new FetchHandler(mockFetcher, new FetcherRouter.JoinFetcherFunction());
5859

5960
// Set up common routing context behavior
6061
when(routingContext.response()).thenReturn(response);

0 commit comments

Comments
 (0)