Skip to content

Commit 684963f

Browse files
committed
add code docs
1 parent 5389f16 commit 684963f

File tree

6 files changed

+195
-16
lines changed

6 files changed

+195
-16
lines changed

src/udf/java/src/main/java/com/risingwave/functions/FunctionWrapper.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,40 @@
1111
import java.util.Iterator;
1212
import java.util.function.Function;
1313

14+
/**
15+
* Base class for a batch-processing user-defined function.
16+
*/
1417
abstract class UserDefinedFunctionBatch {
1518
protected Schema inputSchema;
1619
protected Schema outputSchema;
1720
protected BufferAllocator allocator;
1821

19-
public Schema getInputSchema() {
22+
/**
23+
* Get the input schema of the function.
24+
*/
25+
Schema getInputSchema() {
2026
return inputSchema;
2127
}
2228

23-
public Schema getOutputSchema() {
29+
/**
30+
* Get the output schema of the function.
31+
*/
32+
Schema getOutputSchema() {
2433
return outputSchema;
2534
}
2635

36+
/**
37+
* Evaluate the function by processing a batch of input data.
38+
*
39+
* @param batch the input data batch to process
40+
* @return an iterator over the output data batches
41+
*/
2742
abstract Iterator<VectorSchemaRoot> evalBatch(VectorSchemaRoot batch);
2843
}
2944

45+
/**
46+
* Batch-processing wrapper over a user-defined scalar function.
47+
*/
3048
class ScalarFunctionBatch extends UserDefinedFunctionBatch {
3149
ScalarFunction function;
3250
Method method;
@@ -66,6 +84,9 @@ Iterator<VectorSchemaRoot> evalBatch(VectorSchemaRoot batch) {
6684

6785
}
6886

87+
/**
88+
* Batch-processing wrapper over a user-defined table function.
89+
*/
6990
class TableFunctionBatch extends UserDefinedFunctionBatch {
7091
TableFunction<?> function;
7192
Method method;
@@ -131,8 +152,14 @@ Iterator<VectorSchemaRoot> evalBatch(VectorSchemaRoot batch) {
131152
}
132153
}
133154

155+
/**
156+
* Utility class for reflection.
157+
*/
134158
class Reflection {
135-
static Method getEvalMethod(Object obj) {
159+
/**
160+
* Get the method named <code>eval</code>.
161+
*/
162+
static Method getEvalMethod(UserDefinedFunction obj) {
136163
var methods = new ArrayList<Method>();
137164
for (Method method : obj.getClass().getDeclaredMethods()) {
138165
if (method.getName().equals("eval")) {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,46 @@
11
package com.risingwave.functions;
22

3+
/**
4+
* Base class for a user-defined scalar function. A user-defined scalar function
5+
* maps zero, one, or multiple scalar values to a new scalar value.
6+
*
7+
* <p>
8+
* The behavior of a {@link ScalarFunction} can be defined by implementing a
9+
* custom evaluation method. An evaluation method must be declared publicly and
10+
* named <code>eval</code>. Multiple overloaded methods named <code>eval</code>
11+
* are not supported yet.
12+
*
13+
* <p>
14+
* By default, input and output data types are automatically extracted using
15+
* reflection.
16+
*
17+
* <p>
18+
* The following examples show how to specify a scalar function:
19+
*
20+
* <pre>
21+
* {@code
22+
* // a function that accepts two INT arguments and computes a sum
23+
* class SumFunction extends ScalarFunction {
24+
* public Integer eval(Integer a, Integer b) {
25+
* return a + b;
26+
* }
27+
* }
28+
*
29+
* // a function that returns a struct type
30+
* class StructFunction extends ScalarFunction {
31+
* public static class KeyValue {
32+
* public String key;
33+
* public int value;
34+
* }
35+
*
36+
* public KeyValue eval(int a) {
37+
* KeyValue kv = new KeyValue();
38+
* kv.key = String.valueOf(a);
39+
* kv.value = a;
40+
* return kv;
41+
* }
42+
* }
43+
* }</pre>
44+
*/
345
public abstract class ScalarFunction extends UserDefinedFunction {
446
}

src/udf/java/src/main/java/com/risingwave/functions/TableFunction.java

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,77 @@
33
import java.util.ArrayList;
44
import java.util.List;
55

6+
/**
7+
* Base class for a user-defined table function. A user-defined table function
8+
* maps zero, one, or multiple scalar values to zero, one, or multiple rows (or
9+
* structured types). If an output record consists of only one field, the
10+
* structured record can be omitted, and a scalar value can be emitted that will
11+
* be implicitly wrapped into a row by the runtime.
12+
*
13+
* <p>
14+
* The behavior of a {@link TableFunction} can be defined by implementing a
15+
* custom evaluation method. An evaluation method must be declared publicly, not
16+
* static, and named <code>eval</code>. Multiple overloaded methods named
17+
* <code>eval</code> are not supported yet.
18+
*
19+
* <p>
20+
* By default, input and output data types are automatically extracted using
21+
* reflection. This includes the generic argument {@code T} of the class for
22+
* determining an output data type. Input arguments are derived from the
23+
* {@code eval()} method.
24+
*
25+
* <p>
26+
* The following examples show how to specify a table function:
27+
*
28+
* <pre>
29+
* {@code
30+
* // a function that accepts an INT argument and emits the range from 0 to the
31+
* // given number.
32+
* class Series extends TableFunction<Integer> {
33+
* public void eval(int n) {
34+
* for (int i = 0; i < n; i++) {
35+
* collect(i);
36+
* }
37+
* }
38+
* }
39+
*
40+
* // a function that accepts a String argument and emits the words of the
41+
* // given string.
42+
* class Split extends TableFunction<Split.Row> {
43+
* public static class Row {
44+
* public String word;
45+
* public int length;
46+
* }
47+
*
48+
* public void eval(String str) {
49+
* for (var s : str.split(" ")) {
50+
* Row row = new Row();
51+
* row.word = s;
52+
* row.length = s.length();
53+
* collect(row);
54+
* }
55+
* }
56+
* }
57+
* }</pre>
58+
*/
659
public abstract class TableFunction<T> extends UserDefinedFunction {
760

8-
// Collector used to emit rows.
61+
/** Collector used to emit rows. */
962
private transient List<Object> rows = new ArrayList<>();
1063

11-
// Takes all emitted rows.
12-
public final Object[] take() {
64+
/** Takes all emitted rows. */
65+
final Object[] take() {
1366
var result = this.rows.toArray();
1467
this.rows.clear();
1568
return result;
1669
}
1770

18-
// Returns the number of emitted rows.
19-
public final int size() {
71+
/** Returns the number of emitted rows. */
72+
final int size() {
2073
return this.rows.size();
2174
}
2275

23-
// Emits an (implicit or explicit) output row.
76+
/** Emits an output row. */
2477
protected final void collect(T row) {
2578
this.rows.add(row);
2679
}

src/udf/java/src/main/java/com/risingwave/functions/TypeUtils.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
import java.util.stream.Collectors;
1717

1818
class TypeUtils {
19-
public static Field stringToField(String typeStr) {
19+
/**
20+
* Convert a type name string (case-insensitive) to an Arrow field.
21+
*/
22+
static Field stringToField(String typeStr) {
2023
typeStr = typeStr.toUpperCase();
2124
if (typeStr.equals("BOOLEAN") || typeStr.equals("BOOL")) {
2225
return Field.nullable("", new ArrowType.Bool());
@@ -61,7 +64,14 @@ public static Field stringToField(String typeStr) {
6164
}
6265
}
6366

64-
public static Field classToField(Class<?> param, String name) {
67+
/**
68+
* Convert a Java class to an Arrow field.
69+
*
70+
* @param param The Java class.
71+
* @param name The name of the field.
72+
* @return The Arrow field.
73+
*/
74+
static Field classToField(Class<?> param, String name) {
6575
if (param == Boolean.class || param == boolean.class) {
6676
return Field.nullable(name, new ArrowType.Bool());
6777
} else if (param == Short.class || param == short.class) {
@@ -95,20 +105,29 @@ public static Field classToField(Class<?> param, String name) {
95105
}
96106
}
97107

98-
public static Schema methodToInputSchema(Method method) {
108+
/**
109+
* Get the input schema from a Java method.
110+
*/
111+
static Schema methodToInputSchema(Method method) {
99112
var fields = new ArrayList<Field>();
100113
for (var param : method.getParameters()) {
101114
fields.add(classToField(param.getType(), param.getName()));
102115
}
103116
return new Schema(fields);
104117
}
105118

106-
public static Schema methodToOutputSchema(Method method) {
119+
/**
120+
* Get the output schema of a scalar function from a Java method.
121+
*/
122+
static Schema methodToOutputSchema(Method method) {
107123
var type = method.getReturnType();
108124
return new Schema(Arrays.asList(classToField(type, "")));
109125
}
110126

111-
public static Schema tableFunctionToOutputSchema(Class<?> type) {
127+
/**
128+
* Get the output schema of a table function from a Java class.
129+
*/
130+
static Schema tableFunctionToOutputSchema(Class<?> type) {
112131
var parameterizedType = (ParameterizedType) type.getGenericSuperclass();
113132
var typeArguments = parameterizedType.getActualTypeArguments();
114133
type = (Class<?>) typeArguments[0];
@@ -117,12 +136,18 @@ public static Schema tableFunctionToOutputSchema(Class<?> type) {
117136
return new Schema(Arrays.asList(row_index, classToField(type, "")));
118137
}
119138

120-
public static FieldVector createVector(Field field, BufferAllocator allocator, Object[] values) {
139+
/**
140+
* Create an Arrow vector from an array of values.
141+
*/
142+
static FieldVector createVector(Field field, BufferAllocator allocator, Object[] values) {
121143
var vector = field.createVector(allocator);
122144
fillVector(vector, values);
123145
return vector;
124146
}
125147

148+
/**
149+
* Fill an Arrow vector with an array of values.
150+
*/
126151
static void fillVector(FieldVector fieldVector, Object[] values) {
127152
if (fieldVector instanceof SmallIntVector) {
128153
var vector = (SmallIntVector) fieldVector;
@@ -218,7 +243,10 @@ static void fillVector(FieldVector fieldVector, Object[] values) {
218243
fieldVector.setValueCount(values.length);
219244
}
220245

221-
// Returns a function that converts the object to the correct type.
246+
/**
247+
* Return a function that converts an object read from the input array to the
248+
* correct type.
249+
*/
222250
static Function<Object, Object> processFunc(Field field) {
223251
if (field.getType() instanceof ArrowType.Utf8) {
224252
// object is org.apache.arrow.vector.util.Text

src/udf/java/src/main/java/com/risingwave/functions/UdfServer.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
import org.slf4j.Logger;
1717
import org.slf4j.LoggerFactory;
1818

19+
/**
20+
* A server that exposes user-defined functions over Apache Arrow Flight.
21+
*/
1922
public class UdfServer implements AutoCloseable {
2023

2124
private FlightServer server;
@@ -32,24 +35,44 @@ public UdfServer(String host, int port) {
3235
this.producer).build();
3336
}
3437

38+
/**
39+
* Add a user-defined function to the server.
40+
*
41+
* @param name the name of the function
42+
* @param udf the function to add
43+
* @throws IllegalArgumentException if a function with the same name already
44+
* exists
45+
*/
3546
public void addFunction(String name, UserDefinedFunction udf) throws IllegalArgumentException {
3647
logger.info("added function: " + name);
3748
this.producer.addFunction(name, udf);
3849
}
3950

51+
/**
52+
* Start the server.
53+
*/
4054
public void start() throws IOException {
4155
this.server.start();
4256
logger.info("listening on " + this.server.getLocation().toSocketAddress());
4357
}
4458

59+
/**
60+
* Get the port the server is listening on.
61+
*/
4562
public int getPort() {
4663
return this.server.getPort();
4764
}
4865

66+
/**
67+
* Wait for the server to terminate.
68+
*/
4969
public void awaitTermination() throws InterruptedException {
5070
this.server.awaitTermination();
5171
}
5272

73+
/**
74+
* Close the server.
75+
*/
5376
public void close() throws InterruptedException {
5477
this.server.close();
5578
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11
package com.risingwave.functions;
22

3+
/**
4+
* Base class for all user-defined functions.
5+
*
6+
* @see ScalarFunction
7+
* @see TableFunction
8+
*/
39
public abstract class UserDefinedFunction {
410
}

0 commit comments

Comments
 (0)