Skip to content

Commit 018fe9e

Browse files
doc(udf): improve UDF documentation (risingwavelabs#8597)
Signed-off-by: Runji Wang <[email protected]>
1 parent 2eaea50 commit 018fe9e

File tree

5 files changed

+138
-26
lines changed

5 files changed

+138
-26
lines changed

src/udf/README.md

Lines changed: 0 additions & 20 deletions
This file was deleted.

src/udf/python/README.md

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# RisingWave Python API
2+
3+
This library provides a Python API for creating user-defined functions (UDFs) in RisingWave.
4+
5+
Currently, RisingWave supports user-defined functions implemented as external functions.
6+
Users need to define functions using the API provided by this library, and then start a Python process as a UDF server.
7+
RisingWave calls the function remotely by accessing the UDF server at a given address.
8+
9+
## Installation
10+
11+
```sh
12+
pip install risingwave
13+
```
14+
15+
## Usage
16+
17+
Define functions in a Python file:
18+
19+
```python
20+
# udf.py
21+
from risingwave.udf import udf, udtf, UdfServer
22+
23+
# Define a scalar function
24+
@udf(input_types=['INT', 'INT'], result_type='INT')
25+
def gcd(x, y):
26+
while y != 0:
27+
(x, y) = (y, x % y)
28+
return x
29+
30+
# Define a table function
31+
@udtf(input_types='INT', result_types='INT')
32+
def series(n):
33+
for i in range(n):
34+
yield i
35+
36+
# Start a UDF server
37+
if __name__ == '__main__':
38+
server = UdfServer(location="0.0.0.0:8815")
39+
server.add_function(gcd)
40+
server.add_function(series)
41+
server.serve()
42+
```
43+
44+
Start the UDF server:
45+
46+
```sh
47+
python3 udf.py
48+
```
49+
50+
To create functions in RisingWave, use the following syntax:
51+
52+
```sql
53+
create function <name> ( <arg_type>[, ...] )
54+
[ returns <ret_type> | returns table ( <column_name> <column_type> [, ...] ) ]
55+
language python as <name_defined_in_server>
56+
using link '<udf_server_address>';
57+
```
58+
59+
- The `language` parameter must be set to `python`.
60+
- The `as` parameter specifies the function name defined in the UDF server.
61+
- The `link` parameter specifies the address of the UDF server.
62+
63+
For example:
64+
65+
```sql
66+
create function gcd(int, int) returns int
67+
language python as gcd using link 'http://localhost:8815';
68+
69+
create function series(int) returns table (x int)
70+
language python as series using link 'http://localhost:8815';
71+
72+
select gcd(25, 15);
73+
74+
select * from series(10);
75+
```

src/udf/python/example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def series2(n: int) -> Iterator[tuple[int, str]]:
3333

3434

3535
if __name__ == '__main__':
36-
server = UdfServer()
36+
server = UdfServer(location="0.0.0.0:8815")
3737
server.add_function(random_int)
3838
server.add_function(gcd)
3939
server.add_function(gcd3)

src/udf/python/risingwave/udf.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,21 @@ def udf(input_types: Union[List[Union[str, pa.DataType]], Union[str, pa.DataType
119119
result_type: Union[str, pa.DataType],
120120
name: Optional[str] = None,) -> Union[Callable, UserDefinedFunction]:
121121
"""
122-
Annotation for creating a user-defined function.
122+
Annotation for creating a user-defined scalar function.
123+
124+
Parameters:
125+
- input_types: A string, an Arrow data type, or a list of them, specifying the input data types.
126+
- result_type: A string or an Arrow data type that specifies the return value type.
127+
- name: An optional string specifying the function name. If not provided, the original name will be used.
128+
129+
Example:
130+
```
131+
@udf(input_types=['INT', 'INT'], result_type='INT')
132+
def gcd(x, y):
133+
while y != 0:
134+
(x, y) = (y, x % y)
135+
return x
136+
```
123137
"""
124138

125139
return lambda f: UserDefinedScalarFunctionWrapper(f, input_types, result_type, name)
@@ -130,20 +144,42 @@ def udtf(input_types: Union[List[Union[str, pa.DataType]], Union[str, pa.DataTyp
130144
name: Optional[str] = None,) -> Union[Callable, UserDefinedFunction]:
131145
"""
132146
Annotation for creating a user-defined table function.
147+
148+
Parameters:
149+
- input_types: A string, an Arrow data type, or a list of them, specifying the input data types.
150+
- result_types: A list of strings or Arrow data types that specifies the return value types.
151+
- name: An optional string specifying the function name. If not provided, the original name will be used.
152+
153+
Example:
154+
```
155+
@udtf(input_types='INT', result_types='INT')
156+
def series(n):
157+
for i in range(n):
158+
yield i
159+
```
133160
"""
134161

135162
return lambda f: UserDefinedTableFunctionWrapper(f, input_types, result_types, name)
136163

137164

138165
class UdfServer(pa.flight.FlightServerBase):
139166
"""
140-
UDF server based on Apache Arrow Flight protocol.
141-
Reference: https://arrow.apache.org/cookbook/py/flight.html#simple-parquet-storage-service-with-arrow-flight
167+
A server that provides user-defined functions to clients.
168+
169+
Example:
170+
```
171+
server = UdfServer(location="0.0.0.0:8815")
172+
server.add_function(my_udf)
173+
server.serve()
174+
```
142175
"""
176+
# UDF server based on Apache Arrow Flight protocol.
177+
# Reference: https://arrow.apache.org/cookbook/py/flight.html#simple-parquet-storage-service-with-arrow-flight
178+
143179
_functions: Dict[str, UserDefinedFunction]
144180

145-
def __init__(self, location="grpc://0.0.0.0:8815", **kwargs):
146-
super(UdfServer, self).__init__(location, **kwargs)
181+
def __init__(self, location="0.0.0.0:8815", **kwargs):
182+
super(UdfServer, self).__init__('grpc://' + location, **kwargs)
147183
self._functions = {}
148184

149185
def get_flight_info(self, context, descriptor):

src/udf/python/setup.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
"""Packaging script for the ``risingwave`` Python distribution."""
from setuptools import find_packages, setup

# The README doubles as the long description shown on the package index.
# Decode it explicitly as UTF-8 so the build does not depend on the
# platform's default locale encoding.
with open("README.md", "r", encoding="utf-8") as fh:
    long_description = fh.read()

setup(
    name="risingwave",
    version="0.0.2",
    author="RisingWave Labs",
    description="RisingWave Python API",
    long_description=long_description,
    # Tells the package index to render the long description as Markdown.
    long_description_content_type='text/markdown',
    url="https://github.com/risingwavelabs/risingwave",
    packages=find_packages(),
    classifiers=[
        "Programming Language :: Python",
        "License :: OSI Approved :: Apache Software License",
    ],
    python_requires=">=3.10",
    install_requires=['pyarrow'],
)

0 commit comments

Comments
 (0)