Commit f983bfa

zhenzhongxu authored and gforsyth committed
fix(flink): implement TypeMapper and SchemaMapper for Flink backend
1 parent 1413de9 commit f983bfa

File tree: 4 files changed, +100 −42 lines

ibis/backends/flink/__init__.py

Lines changed: 2 additions & 2 deletions

@@ -12,14 +12,14 @@
 from ibis.backends.base import BaseBackend, CanCreateDatabase
 from ibis.backends.base.sql.ddl import fully_qualified_re, is_fully_qualified
 from ibis.backends.flink.compiler.core import FlinkCompiler
+from ibis.backends.flink.datatypes import FlinkRowSchema
 from ibis.backends.flink.ddl import (
     CreateDatabase,
     CreateTableFromConnector,
     DropDatabase,
     DropTable,
     InsertSelect,
 )
-from ibis.backends.flink.utils import ibis_schema_to_flink_schema

 if TYPE_CHECKING:
     from collections.abc import Mapping

@@ -354,7 +354,7 @@ def create_table(
             obj = obj.to_pandas()
         if isinstance(obj, pd.DataFrame):
             table = self._table_env.from_pandas(
-                obj, ibis_schema_to_flink_schema(schema)
+                obj, FlinkRowSchema.from_ibis(schema)
             )
         if isinstance(obj, ir.Table):
             table = obj
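For context, a minimal sketch of the code path this hunk touches: converting a pandas DataFrame with a schema that is now mapped by FlinkRowSchema rather than the removed ibis_schema_to_flink_schema helper. The TableEnvironment setup and the example data are illustrative, not part of the commit.

import pandas as pd
from pyflink.table import EnvironmentSettings, TableEnvironment

import ibis
from ibis.backends.flink.datatypes import FlinkRowSchema

# Hypothetical environment; inside the backend this is self._table_env.
table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
schema = ibis.schema({"id": "int64", "name": "string"})

# What create_table now does for a DataFrame input: the ibis schema becomes a
# pyflink ROW type before the conversion.
table = table_env.from_pandas(df, FlinkRowSchema.from_ibis(schema))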

ibis/backends/flink/datatypes.py

Lines changed: 93 additions & 0 deletions

@@ -0,0 +1,93 @@
+from __future__ import annotations
+
+import pyflink.table.types as fl
+
+import ibis.expr.datatypes as dt
+import ibis.expr.schema as sch
+from ibis.formats import SchemaMapper, TypeMapper
+
+
+class FlinkRowSchema(SchemaMapper):
+    @classmethod
+    def from_ibis(cls, schema: sch.Schema | None) -> list[fl.RowType]:
+        if schema is None:
+            return None
+
+        return fl.DataTypes.ROW(
+            [
+                fl.DataTypes.FIELD(k, FlinkType.from_ibis(v))
+                for k, v in schema.fields.items()
+            ]
+        )
+
+
+class FlinkType(TypeMapper):
+    @classmethod
+    def to_ibis(cls, typ: fl.DataType, nullable=True) -> dt.DataType:
+        """Convert a flink type to an ibis type."""
+        if typ == fl.DataTypes.STRING():
+            return dt.String(nullable=nullable)
+        elif typ == fl.DataTypes.BOOLEAN():
+            return dt.Boolean(nullable=nullable)
+        elif typ == fl.DataTypes.BYTES():
+            return dt.Binary(nullable=nullable)
+        elif typ == fl.DataTypes.TINYINT():
+            return dt.Int8(nullable=nullable)
+        elif typ == fl.DataTypes.SMALLINT():
+            return dt.Int16(nullable=nullable)
+        elif typ == fl.DataTypes.INT():
+            return dt.Int32(nullable=nullable)
+        elif typ == fl.DataTypes.BIGINT():
+            return dt.Int64(nullable=nullable)
+        elif typ == fl.DataTypes.FLOAT():
+            return dt.Float32(nullable=nullable)
+        elif typ == fl.DataTypes.DOUBLE():
+            return dt.Float64(nullable=nullable)
+        elif typ == fl.DataTypes.DATE():
+            return dt.Date(nullable=nullable)
+        elif typ == fl.DataTypes.TIME():
+            return dt.Time(nullable=nullable)
+        elif typ == fl.DataTypes.TIMESTAMP():
+            return dt.Timestamp(nullable=nullable)
+        else:
+            return super().to_ibis(typ, nullable=nullable)
+
+    @classmethod
+    def from_ibis(cls, dtype: dt.DataType) -> fl.DataType:
+        """Convert an ibis type to a flink type."""
+        if dtype.is_string():
+            return fl.DataTypes.STRING()
+        elif dtype.is_boolean():
+            return fl.DataTypes.BOOLEAN()
+        elif dtype.is_binary():
+            return fl.DataTypes.BYTES()
+        elif dtype.is_int8():
+            return fl.DataTypes.TINYINT()
+        elif dtype.is_int16():
+            return fl.DataTypes.SMALLINT()
+        elif dtype.is_int32():
+            return fl.DataTypes.INT()
+        elif dtype.is_int64():
+            return fl.DataTypes.BIGINT()
+        elif dtype.is_uint8():
+            return fl.DataTypes.TINYINT()
+        elif dtype.is_uint16():
+            return fl.DataTypes.SMALLINT()
+        elif dtype.is_uint32():
+            return fl.DataTypes.INT()
+        elif dtype.is_uint64():
+            return fl.DataTypes.BIGINT()
+        elif dtype.is_float16():
+            return fl.DataTypes.FLOAT()
+        elif dtype.is_float32():
+            return fl.DataTypes.FLOAT()
+        elif dtype.is_float64():
+            return fl.DataTypes.DOUBLE()
+        elif dtype.is_date():
+            return fl.DataTypes.DATE()
+        elif dtype.is_time():
+            return fl.DataTypes.TIME()
+        elif dtype.is_timestamp():
+            return fl.DataTypes.TIMESTAMP()
+        else:
+            return super().from_ibis(dtype)
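Both mappers are classmethod-only, so they can be exercised directly. A minimal sketch, not part of the commit (the dtype choice is arbitrary):

import ibis.expr.datatypes as dt
from ibis.backends.flink.datatypes import FlinkType

# Round trip a single column type: ibis dtype -> Flink DataType -> ibis dtype.
flink_int = FlinkType.from_ibis(dt.Int32())  # fl.DataTypes.INT()
ibis_int = FlinkType.to_ibis(flink_int)      # dt.Int32(nullable=True)

Types not covered by either branch fall through to super(), i.e. the base TypeMapper, instead of the KeyError that the old dict lookup would have produced.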

ibis/backends/flink/registry.py

Lines changed: 2 additions & 3 deletions

@@ -9,6 +9,7 @@
     operation_registry as base_operation_registry,
 )
 from ibis.backends.base.sql.registry.main import varargs
+from ibis.backends.flink.datatypes import FlinkType
 from ibis.common.temporal import TimestampUnit

 if TYPE_CHECKING:

@@ -221,8 +222,6 @@ def _window(translator: ExprTranslator, op: ops.Node) -> str:


 def _clip(translator: ExprTranslator, op: ops.Node) -> str:
-    from ibis.backends.flink.utils import _to_pyflink_types
-
     arg = translator.translate(op.arg)

     if op.upper is not None:

@@ -233,7 +232,7 @@ def _clip(translator: ExprTranslator, op: ops.Node) -> str:
         lower = translator.translate(op.lower)
         arg = f"IF({arg} < {lower} AND {arg} IS NOT NULL, {lower}, {arg})"

-    return f"CAST({arg} AS {_to_pyflink_types[type(op.dtype)]!s})"
+    return f"CAST({arg} AS {FlinkType.from_ibis(op.dtype)!s})"


 def _floor_divide(translator: ExprTranslator, op: ops.Node) -> str:
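The translator change only affects where the type name comes from; the generated SQL keeps the same shape. A rough sketch of the new return line (the column name and dtype are made up, and the exact rendering of the Flink type depends on pyflink's str()):

import ibis.expr.datatypes as dt
from ibis.backends.flink.datatypes import FlinkType

# Mirrors the new return statement in _clip: the Flink type is resolved
# through the mapper instead of the removed _to_pyflink_types dict.
arg = "IF(t0.x > 10 AND t0.x IS NOT NULL, 10, t0.x)"  # hypothetical translated arg
sql = f"CAST({arg} AS {FlinkType.from_ibis(dt.float64)!s})"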

ibis/backends/flink/utils.py

Lines changed: 3 additions & 37 deletions

@@ -5,11 +5,9 @@
 from abc import ABC, abstractmethod
 from collections import defaultdict

-from pyflink.table.types import DataTypes, RowType
-
 import ibis.expr.datatypes as dt
 import ibis.expr.operations as ops
-import ibis.expr.schema as sch
+from ibis.backends.flink.datatypes import FlinkType
 from ibis.common.temporal import IntervalUnit
 from ibis.util import convert_unit


@@ -247,35 +245,14 @@ def _translate_interval(value, dtype):
     return interval.format_as_string()


-_to_pyflink_types = {
-    dt.String: DataTypes.STRING(),
-    dt.Boolean: DataTypes.BOOLEAN(),
-    dt.Binary: DataTypes.BYTES(),
-    dt.Int8: DataTypes.TINYINT(),
-    dt.Int16: DataTypes.SMALLINT(),
-    dt.Int32: DataTypes.INT(),
-    dt.Int64: DataTypes.BIGINT(),
-    dt.UInt8: DataTypes.TINYINT(),
-    dt.UInt16: DataTypes.SMALLINT(),
-    dt.UInt32: DataTypes.INT(),
-    dt.UInt64: DataTypes.BIGINT(),
-    dt.Float16: DataTypes.FLOAT(),
-    dt.Float32: DataTypes.FLOAT(),
-    dt.Float64: DataTypes.DOUBLE(),
-    dt.Date: DataTypes.DATE(),
-    dt.Time: DataTypes.TIME(),
-    dt.Timestamp: DataTypes.TIMESTAMP(),
-}
-
-
 def translate_literal(op: ops.Literal) -> str:
     value = op.value
     dtype = op.dtype

     if value is None:
         if dtype.is_null():
             return "NULL"
-        return f"CAST(NULL AS {_to_pyflink_types[type(dtype)]!s})"
+        return f"CAST(NULL AS {FlinkType.from_ibis(dtype)!s})"

     if dtype.is_boolean():
         # TODO(chloeh13q): Flink supports a third boolean called "UNKNOWN"

@@ -305,7 +282,7 @@ def translate_literal(op: ops.Literal) -> str:
                 raise ValueError("The precision can be up to 38 in Flink")

             return f"CAST({value} AS DECIMAL({precision}, {scale}))"
-        return f"CAST({value} AS {_to_pyflink_types[type(dtype)]!s})"
+        return f"CAST({value} AS {FlinkType.from_ibis(dtype)!s})"
     elif dtype.is_timestamp():
         # TODO(chloeh13q): support timestamp with local timezone
         if isinstance(value, datetime.datetime):

@@ -327,14 +304,3 @@ def translate_literal(op: ops.Literal) -> str:
         return f"ARRAY{list(value)}"

     raise NotImplementedError(f"No translation rule for {dtype}")
-
-
-def ibis_schema_to_flink_schema(schema: sch.Schema) -> RowType:
-    if schema is None:
-        return None
-    return DataTypes.ROW(
-        [
-            DataTypes.FIELD(key, _to_pyflink_types[type(value)])
-            for key, value in schema.fields.items()
-        ]
-    )
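For reference, the two helpers removed from this file map onto the new datatypes module roughly as follows. A sketch, not part of the commit; the dtype is chosen arbitrarily:

import ibis.expr.datatypes as dt
from ibis.backends.flink.datatypes import FlinkRowSchema, FlinkType

# Old per-dtype lookup table -> type mapper:
#   _to_pyflink_types[type(dtype)]       -> FlinkType.from_ibis(dtype)
# Old schema helper (deleted in this file) -> schema mapper, still None-safe:
#   ibis_schema_to_flink_schema(schema)  -> FlinkRowSchema.from_ibis(schema)
flink_ts = FlinkType.from_ibis(dt.timestamp)  # what translate_literal now resolves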
