Skip to content

Commit 057fabc

Browse files
chloeh13qcpcloud
authored andcommitted
feat(flink): implement nested schema support
1 parent bdde3a4 commit 057fabc

File tree

5 files changed

+44
-12
lines changed

5 files changed

+44
-12
lines changed

ibis/backends/flink/ddl.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
format_partition,
1616
is_fully_qualified,
1717
)
18-
from ibis.backends.base.sql.registry import quote_identifier, type_to_sql_string
18+
from ibis.backends.base.sql.registry import quote_identifier
19+
from ibis.backends.flink.registry import type_to_sql_string
1920

2021
if TYPE_CHECKING:
2122
from ibis.api import Watermark

ibis/backends/flink/registry.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44

55
import ibis.common.exceptions as com
66
import ibis.expr.operations as ops
7-
from ibis.backends.base.sql.registry import aggregate, fixed_arity, helpers, unary
7+
from ibis.backends.base.sql.registry import (
8+
aggregate,
9+
fixed_arity,
10+
helpers,
11+
quote_identifier,
12+
unary,
13+
)
814
from ibis.backends.base.sql.registry import (
915
operation_registry as base_operation_registry,
1016
)
@@ -17,6 +23,12 @@
1723
operation_registry = base_operation_registry.copy()
1824

1925

26+
def type_to_sql_string(tval):
27+
if tval.is_array():
28+
return f"array<{helpers.type_to_sql_string(tval.value_type)}>"
29+
return helpers.type_to_sql_string(tval)
30+
31+
2032
def _not(translator: ExprTranslator, op: ops.Node) -> str:
2133
formatted_arg = translator.translate(op.arg)
2234
if helpers.needs_parens(op.arg):
@@ -61,10 +73,11 @@ def _cast(translator: ExprTranslator, op: ops.generic.Cast) -> str:
6173
return f"CAST({arg_translated} AS date)"
6274
elif to.is_json():
6375
return arg_translated
64-
65-
from ibis.backends.base.sql.registry.main import cast
66-
67-
return cast(translator=translator, op=op)
76+
elif op.arg.dtype.is_temporal() and op.to.is_int64():
77+
return f"1000000 * unix_timestamp({arg_translated})"
78+
else:
79+
sql_type = type_to_sql_string(op.to)
80+
return f"CAST({arg_translated} AS {sql_type})"
6881

6982

7083
def _left_op_right(translator: ExprTranslator, op_node: ops.Node, op_sign: str) -> str:
@@ -96,7 +109,7 @@ def _try_cast(translator: ExprTranslator, op: ops.Node) -> str:
96109
# It's recommended to use UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead.
97110
return f"UNIX_TIMESTAMP(TRY_CAST({arg_formatted} AS STRING))"
98111
else:
99-
sql_type = helpers.type_to_sql_string(op.to)
112+
sql_type = type_to_sql_string(op.to)
100113
return f"TRY_CAST({arg_formatted} AS {sql_type})"
101114

102115

@@ -382,6 +395,11 @@ def _timestamp_from_ymdhms(
382395
return f"CAST({concat_string} AS TIMESTAMP)"
383396

384397

398+
def _struct_field(translator, op):
399+
arg = translator.translate(op.arg)
400+
return f"{arg}.{quote_identifier(op.field, force=True)}"
401+
402+
385403
operation_registry.update(
386404
{
387405
# Unary operations
@@ -444,6 +462,7 @@ def _timestamp_from_ymdhms(
444462
ops.TimestampFromUNIX: _timestamp_from_unix,
445463
ops.TimestampFromYMDHMS: _timestamp_from_ymdhms,
446464
ops.TimestampSub: _timestamp_sub,
465+
ops.StructField: _struct_field,
447466
}
448467
)
449468

ibis/backends/flink/tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111

1212
class TestConf(BackendTest):
13-
supports_structs = False
1413
force_sort = True
1514
deps = "pandas", "pyflink"
1615

@@ -50,13 +49,14 @@ def connect(*, tmpdir, worker_id, **kw: Any):
5049
def _load_data(self, **_: Any) -> None:
5150
import pandas as pd
5251

53-
from ibis.backends.tests.data import json_types
52+
from ibis.backends.tests.data import json_types, struct_types
5453

5554
for table_name in TEST_TABLES:
5655
path = self.data_dir / "parquet" / f"{table_name}.parquet"
5756
self.connection.create_table(table_name, pd.read_parquet(path))
5857

5958
self.connection.create_table("json_t", json_types)
59+
self.connection.create_table("struct", struct_types)
6060

6161

6262
class TestConfForStreaming(TestConf):

ibis/backends/tests/test_struct.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
pytestmark = [
1414
pytest.mark.never(["mysql", "sqlite", "mssql"], reason="No struct support"),
1515
pytest.mark.notyet(["impala"]),
16-
pytest.mark.notimpl(["datafusion", "druid", "oracle", "flink"]),
16+
pytest.mark.notimpl(["datafusion", "druid", "oracle"]),
1717
]
1818

1919

@@ -55,6 +55,9 @@ def test_all_fields(struct, struct_df):
5555

5656
@pytest.mark.notimpl(["postgres"])
5757
@pytest.mark.parametrize("field", ["a", "b", "c"])
58+
@pytest.mark.notyet(
59+
["flink"], reason="flink doesn't support creating struct columns from literals"
60+
)
5861
def test_literal(backend, con, field):
5962
query = _STRUCT_LITERAL[field]
6063
dtype = query.type().to_pandas()
@@ -69,6 +72,9 @@ def test_literal(backend, con, field):
6972
@pytest.mark.notyet(
7073
["clickhouse"], reason="clickhouse doesn't support nullable nested types"
7174
)
75+
@pytest.mark.notyet(
76+
["flink"], reason="flink doesn't support creating struct columns from literals"
77+
)
7278
def test_null_literal(backend, con, field):
7379
query = _NULL_STRUCT_LITERAL[field]
7480
result = pd.Series([con.execute(query)])
@@ -78,6 +84,9 @@ def test_null_literal(backend, con, field):
7884

7985

8086
@pytest.mark.notimpl(["dask", "pandas", "postgres"])
87+
@pytest.mark.notyet(
88+
["flink"], reason="flink doesn't support creating struct columns from literals"
89+
)
8190
def test_struct_column(backend, alltypes, df):
8291
t = alltypes
8392
expr = ibis.struct(dict(a=t.string_col, b=1, c=t.bigint_col)).name("s")
@@ -91,6 +100,9 @@ def test_struct_column(backend, alltypes, df):
91100

92101

93102
@pytest.mark.notimpl(["dask", "pandas", "postgres", "polars"])
103+
@pytest.mark.notyet(
104+
["flink"], reason="flink doesn't support creating struct columns from collect"
105+
)
94106
def test_collect_into_struct(alltypes):
95107
from ibis import _
96108

ibis/backends/tests/test_timecontext.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ def test_context_adjustment_filter_before_window(
123123
@pytest.mark.notimpl(["duckdb", "pyspark"])
124124
@pytest.mark.notimpl(
125125
["flink"],
126-
raises=com.OperationNotDefinedError,
127-
reason="No translation rule for <class 'ibis.expr.operations.structs.StructField'>",
126+
raises=com.UnsupportedOperationError,
127+
reason="Flink engine does not support generic window clause with no order by",
128128
)
129129
def test_context_adjustment_multi_col_udf_non_grouped(
130130
backend, alltypes, context, monkeypatch

0 commit comments

Comments
 (0)