Skip to content

Commit 0495e10

Browse files
authored
Merge branch 'main' into tchow/more-refactor-2
2 parents 0c5783a + 3e74ec6 commit 0495e10

File tree

4 files changed

+170
-193
lines changed

4 files changed

+170
-193
lines changed

.github/workflows/test_scala_2_12_non_spark.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ on:
1212
- 'hub/**'
1313
- 'orchestration/**'
1414
- 'service/**'
15+
- 'service_commons/**'
1516
- 'cloud_aws/**'
1617
- 'cloud_gcp/**'
1718
- '.github/workflows/test_scala_2_12_non_spark.yaml'
@@ -26,6 +27,7 @@ on:
2627
- 'hub/**'
2728
- 'orchestration/**'
2829
- 'service/**'
30+
- 'service_commons/**'
2931
- 'cloud_aws/**'
3032
- 'cloud_gcp/**'
3133
- '.github/workflows/test_scala_2_12_non_spark.yaml'
@@ -96,6 +98,13 @@ jobs:
9698
--google_credentials=bazel-cache-key.json \
9799
//service:tests
98100
101+
- name: Run service_commons tests
102+
run: |
103+
bazel test \
104+
--remote_cache=https://storage.googleapis.com/zipline-bazel-cache \
105+
--google_credentials=bazel-cache-key.json \
106+
//service_commons:tests
107+
99108
- name: Run orchestrator tests
100109
run: |
101110
bazel test \

service_commons/src/main/java/ai/chronon/service/RouteHandlerWrapper.java

Lines changed: 86 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,21 @@
44
import ai.chronon.api.thrift.protocol.TBinaryProtocol;
55
import ai.chronon.api.thrift.protocol.TSimpleJSONProtocol;
66
import ai.chronon.api.thrift.transport.TTransportException;
7+
import com.fasterxml.jackson.databind.DeserializationFeature;
8+
import com.fasterxml.jackson.databind.ObjectMapper;
79
import io.vertx.core.Handler;
10+
import io.vertx.core.json.Json;
811
import io.vertx.core.json.JsonObject;
912
import io.vertx.ext.web.RoutingContext;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
1015

16+
import java.io.PrintWriter;
17+
import java.io.StringWriter;
1118
import java.lang.reflect.InvocationTargetException;
12-
import java.lang.reflect.Method;
13-
import java.lang.reflect.ParameterizedType;
14-
import java.lang.reflect.Type;
15-
import java.util.*;
16-
import java.util.concurrent.ConcurrentHashMap;
19+
import java.util.Base64;
20+
import java.util.Map;
1721
import java.util.function.Function;
18-
import java.util.stream.Collectors;
19-
import org.slf4j.Logger;
20-
import org.slf4j.LoggerFactory;
2122

2223
/**
2324
* Wrapper class for creating Route handlers that map parameters to an Input object and transform it to Output
@@ -58,7 +59,28 @@ public static <T extends TBase> T deserializeTBinaryBase64(String base64Data, Cl
5859
return tb;
5960
}
6061

61-
private static final Map<Class<?>, Map<String, Method>> SETTER_CACHE = new ConcurrentHashMap<>();
62+
/**
63+
* Combines path parameters, query parameters, and JSON body into a single JSON object.
64+
* Returns the JSON object as a string.
65+
*/
66+
public static String combinedParamJson(RoutingContext ctx) {
67+
JsonObject params = ctx.body().asJsonObject();
68+
if (params == null) {
69+
params = new JsonObject();
70+
}
71+
72+
// Add path parameters
73+
for (Map.Entry<String, String> entry : ctx.pathParams().entrySet()) {
74+
params.put(entry.getKey(), entry.getValue());
75+
}
76+
77+
// Add query parameters
78+
for (Map.Entry<String, String> entry : ctx.queryParams().entries()) {
79+
params.put(entry.getKey(), entry.getValue());
80+
}
81+
82+
return params.encodePrettily();
83+
}
6284

6385
/**
6486
* Creates a RoutingContext handler that maps parameters to an Input object and transforms it to Output
@@ -74,167 +96,85 @@ public static <I, O> Handler<RoutingContext> createHandler(Function<I, O> transf
7496

7597
return ctx -> {
7698
try {
77-
// Create map with path parameters
78-
Map<String, String> params = new HashMap<>(ctx.pathParams());
99+
String encodedParams = combinedParamJson(ctx);
79100

80-
// Add query parameters
81-
for (Map.Entry<String, String> entry : ctx.queryParams().entries()) {
82-
params.put(entry.getKey(), entry.getValue());
83-
}
101+
ObjectMapper mapper = new ObjectMapper();
102+
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
84103

85-
I input = createInputFromParams(params, inputClass);
104+
I input = mapper.readValue(encodedParams, inputClass);
86105
O output = transformer.apply(input);
87106

88107
String responseFormat = ctx.request().getHeader(RESPONSE_CONTENT_TYPE_HEADER);
89108
if (responseFormat == null || responseFormat.equals("application/json")) {
90-
try {
91-
TSerializer serializer = new TSerializer(new TSimpleJSONProtocol.Factory());
92-
String jsonString = serializer.toString((TBase)output);
93-
ctx.response()
94-
.setStatusCode(200)
95-
.putHeader("content-type", JSON_TYPE_VALUE)
96-
.end(jsonString);
97-
} catch (TException e) {
98-
LOGGER.error("Failed to serialize response", e);
99-
throw new RuntimeException(e);
100-
} catch (Exception e) {
101-
LOGGER.error("Unexpected error during serialization", e);
102-
throw new RuntimeException(e);
103-
}
109+
String outputJson = outputToJson(ctx, output);
110+
ctx.response()
111+
.setStatusCode(200)
112+
.putHeader("content-type", JSON_TYPE_VALUE)
113+
.end(outputJson);
104114
} else {
105-
if (!responseFormat.equals(TBINARY_B64_TYPE_VALUE)) {
106-
throw new IllegalArgumentException(String.format("Unsupported response-content-type: %s. Supported values are: %s and %s", responseFormat, JSON_TYPE_VALUE, TBINARY_B64_TYPE_VALUE));
107-
}
108-
109-
// Verify output is a Thrift object before casting
110-
if (!(output instanceof TBase)) {
111-
throw new IllegalArgumentException("Output must be a Thrift object for binary serialization");
112-
}
113-
TBase<?, TFieldIdEnum> tb = (TBase<?, TFieldIdEnum>) output;
114-
// Serialize output to Thrift BinaryProtocol
115-
byte[] serializedOutput = binarySerializer.get().serialize(tb);
116-
String responseBase64 = base64Encoder.get().encodeToString(serializedOutput);
117-
115+
String responseBase64 = convertToTBinaryB64(responseFormat, output);
118116
ctx.response().setStatusCode(200).putHeader("content-type", TBINARY_B64_TYPE_VALUE).end(responseBase64);
119117
}
118+
120119
} catch (IllegalArgumentException ex) {
121120
LOGGER.error("Incorrect arguments passed for handler creation", ex);
122-
ctx.response().setStatusCode(400).putHeader("content-type", "application/json").end(new JsonObject().put("error", ex.getMessage()).encode());
121+
ctx.response()
122+
.setStatusCode(400)
123+
.putHeader("content-type", "application/json")
124+
.end(toErrorPayload(ex));
123125
} catch (Exception ex) {
124126
LOGGER.error("Internal error occurred during handler creation", ex);
125-
ctx.response().setStatusCode(500).putHeader("content-type", "application/json").end(new JsonObject().put("error", ex.getMessage()).encode());
127+
ctx.response()
128+
.setStatusCode(500)
129+
.putHeader("content-type", "application/json")
130+
.end(toErrorPayload(ex));
126131
}
127132
};
128133
}
129134

130-
public static <I> I createInputFromParams(Map<String, String> params, Class<I> inputClass) throws Exception {
131-
// Create new instance using no-args constructor
132-
I input = inputClass.getDeclaredConstructor().newInstance();
133-
134-
135-
Map<String, Method> setters = SETTER_CACHE.computeIfAbsent(inputClass, cls ->
136-
Arrays.stream(cls.getMethods())
137-
.filter(RouteHandlerWrapper::isSetter)
138-
.collect(Collectors.toMap(RouteHandlerWrapper::getFieldNameFromSetter, method -> method))
139-
);
140-
141-
// Find and invoke setters for matching parameters
142-
for (Map.Entry<String, String> param : params.entrySet()) {
143-
Method setter = setters.get(param.getKey());
144-
if (setter != null) {
145-
String paramValue = param.getValue();
146-
147-
if (paramValue != null) {
148-
Type paramType = setter.getGenericParameterTypes()[0];
149-
Object convertedValue = convertValue(paramValue, paramType);
150-
setter.invoke(input, convertedValue);
151-
}
152-
}
135+
private static <O> String convertToTBinaryB64(String responseFormat, O output) throws TException {
136+
if (!responseFormat.equals(TBINARY_B64_TYPE_VALUE)) {
137+
throw new IllegalArgumentException(String.format("Unsupported response-content-type: %s. Supported values are: %s and %s", responseFormat, JSON_TYPE_VALUE, TBINARY_B64_TYPE_VALUE));
153138
}
154139

155-
return input;
156-
}
157-
158-
private static boolean isSetter(Method method) {
159-
return method.getName().startsWith("set") && !method.getName().endsWith("IsSet") && method.getParameterCount() == 1 && (method.getReturnType() == void.class || method.getReturnType() == method.getDeclaringClass());
160-
}
161-
162-
private static String getFieldNameFromSetter(Method method) {
163-
String methodName = method.getName();
164-
String fieldName = methodName.substring(3); // Remove "set"
165-
return fieldName.substring(0, 1).toLowerCase() + fieldName.substring(1);
166-
}
167-
168-
private static Object convertValue(String value, Type targetType) { // Changed parameter to Type
169-
// Handle Class types
170-
if (targetType instanceof Class) {
171-
Class<?> targetClass = (Class<?>) targetType;
172-
173-
if (targetClass == String.class) {
174-
return value;
175-
} else if (targetClass == Integer.class || targetClass == int.class) {
176-
return Integer.parseInt(value);
177-
} else if (targetClass == Long.class || targetClass == long.class) {
178-
return Long.parseLong(value);
179-
} else if (targetClass == Double.class || targetClass == double.class) {
180-
return Double.parseDouble(value);
181-
} else if (targetClass == Boolean.class || targetClass == boolean.class) {
182-
return Boolean.parseBoolean(value);
183-
} else if (targetClass == Float.class || targetClass == float.class) {
184-
return Float.parseFloat(value);
185-
} else if (targetClass.isEnum()) {
186-
try {
187-
// Try custom fromString method first
188-
Method fromString = targetClass.getMethod("fromString", String.class);
189-
Object result = fromString.invoke(null, value);
190-
if (value != null && result == null) {
191-
throw new IllegalArgumentException(String.format("Invalid enum value %s for type %s", value, targetClass.getSimpleName()));
192-
}
193-
return result;
194-
} catch (NoSuchMethodException e) {
195-
// Fall back to standard enum valueOf
196-
return Enum.valueOf(targetClass.asSubclass(Enum.class), value.toUpperCase());
197-
} catch (Exception e) {
198-
throw new IllegalArgumentException(String.format("Error converting %s to enum type %s : %s", value, targetClass.getSimpleName(), e.getMessage()));
199-
}
200-
}
140+
// Verify output is a Thrift object before casting
141+
if (!(output instanceof TBase)) {
142+
throw new IllegalArgumentException("Output must be a Thrift object for binary serialization");
201143
}
144+
TBase<?, TFieldIdEnum> tb = (TBase<?, TFieldIdEnum>) output;
145+
// Serialize output to Thrift BinaryProtocol
146+
byte[] serializedOutput = binarySerializer.get().serialize(tb);
147+
String responseBase64 = base64Encoder.get().encodeToString(serializedOutput);
148+
return responseBase64;
149+
}
202150

203-
// Handle parameterized types (List, Map)
204-
if (targetType instanceof ParameterizedType) {
205-
ParameterizedType parameterizedType = (ParameterizedType) targetType;
206-
Class<?> rawType = (Class<?>) parameterizedType.getRawType();
207-
208-
// Handle List types
209-
if (List.class.isAssignableFrom(rawType)) {
210-
Type elementType = parameterizedType.getActualTypeArguments()[0];
211-
return Arrays
212-
.stream(value.split(","))
213-
.map(v -> convertValue(v.trim(), elementType))
214-
.collect(Collectors.toList());
215-
}
216-
217-
// Handle Map types
218-
if (Map.class.isAssignableFrom(rawType)) {
219-
Type keyType = parameterizedType.getActualTypeArguments()[0];
220-
Type valueType = parameterizedType.getActualTypeArguments()[1];
221-
return Arrays
222-
.stream(value.split(","))
223-
.map(entry -> entry.split(":"))
224-
.filter(kv -> {
225-
if (kv.length != 2) {
226-
throw new IllegalArgumentException("Invalid map entry format. Expected 'key:value' but got: " + String.join(":", kv));
227-
}
228-
return true;
229-
})
230-
.collect(Collectors.toMap(
231-
kv -> convertValue(kv[0].trim(), keyType),
232-
kv -> convertValue(kv[1].trim(), valueType)
233-
));
151+
private static <O> String outputToJson(RoutingContext ctx, O output) {
152+
try {
153+
String jsonString;
154+
if (output instanceof TBase) {
155+
// For Thrift objects, use TSerializer
156+
TSerializer serializer = new TSerializer(new TSimpleJSONProtocol.Factory());
157+
jsonString = serializer.toString((TBase) output);
158+
} else {
159+
// For regular Java objects, use Vertx's JSON support
160+
JsonObject jsonObject = new JsonObject(Json.encode(output));
161+
jsonString = jsonObject.encode();
234162
}
163+
return jsonString;
164+
} catch (TException e) {
165+
LOGGER.error("Failed to serialize response", e);
166+
throw new RuntimeException(e);
167+
} catch (Exception e) {
168+
LOGGER.error("Unexpected error during serialization", e);
169+
throw new RuntimeException(e);
235170
}
171+
}
236172

237-
throw new IllegalArgumentException("Unsupported type: " + targetType.getTypeName());
173+
public static String toErrorPayload(Throwable throwable) {
174+
StringWriter sw = new StringWriter();
175+
PrintWriter pw = new PrintWriter(sw, true);
176+
throwable.printStackTrace(pw);
177+
return new JsonObject().put("error", sw.getBuffer().toString()).encode();
238178
}
239179
}
240180

0 commit comments

Comments
 (0)