Skip to content

Commit d042402

Browse files
zhenzhongxugforsyth
authored andcommitted
fix(flink): use lazy import to prevent premature loading of pyflink during gen_matrix
1 parent 33e1a31 commit d042402

File tree

7 files changed

+53
-79
lines changed

7 files changed

+53
-79
lines changed

.github/workflows/ibis-docs-lint.yml

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,6 @@ jobs:
186186
- name: checkout
187187
uses: actions/checkout@v4
188188

189-
- name: install pyflink
190-
run: nix develop --ignore-environment --keep HOME -c pip install apache-flink
191-
192189
- name: run doctest
193190
# keep HOME because duckdb (which we use for doctests) wants to use
194191
# that for extensions
@@ -203,21 +200,6 @@ jobs:
203200
- name: check that all frozen computations were done before push
204201
run: git diff --exit-code --stat
205202

206-
- name: ls links 1
207-
run: ls /home/runner/work/ibis/ibis/docs/_output/backends/
208-
209-
- name: ls links 2
210-
run: ls /home/runner/work/ibis/ibis/docs/_output/
211-
212-
- name: ls links 3
213-
run: ls /home/runner/work/ibis/ibis/docs/
214-
215-
- name: support
216-
run: nix develop --ignore-environment -c python ./gen_matrix.py
217-
218-
- name: redirect links
219-
run: python ./gen_redirects.py
220-
221203
- name: verify internal links
222204
run: nix develop --ignore-environment '.#links' -c just checklinks --offline --no-progress
223205

docs/support_matrix.qmd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ The changes will show up in the dev docs when your PR is merged!
3636
## Raw Data
3737

3838
```{python}
39-
#| echo: true
39+
#| echo: false
4040
!python ../gen_matrix.py
4141
```
4242

gen_matrix.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ def main():
4545
"docs", "backends", "raw_support_matrix.csv"
4646
).open(mode="w") as f:
4747
df.to_csv(f, index_label="FullOperation")
48-
print(f"CSV output path: {f.name}") # noqa: T201
4948

5049

5150
if __name__ == "__main__":

ibis/backends/flink/__init__.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from ibis.backends.base import BaseBackend, CanCreateDatabase
1313
from ibis.backends.base.sql.ddl import fully_qualified_re, is_fully_qualified
1414
from ibis.backends.flink.compiler.core import FlinkCompiler
15-
from ibis.backends.flink.datatypes import FlinkRowSchema
1615
from ibis.backends.flink.ddl import (
1716
CreateDatabase,
1817
CreateTableFromConnector,
@@ -146,15 +145,14 @@ def list_tables(
146145
# but executing the SQL string directly yields a `TableResult` object
147146
return self._filter_with_like(tables, like)
148147

149-
def list_views(
148+
def _list_views(
150149
self,
151150
like: str | None = None,
152151
temporary: bool = True,
153152
) -> list[str]:
154153
"""Return the list of view names.
155154
156-
Return the list of view names in the specified database and catalog.
157-
or the default one if no database/catalog is specified.
155+
Return the list of view names.
158156
159157
Parameters
160158
----------
@@ -311,12 +309,11 @@ def create_table(
311309
name
312310
Name of the new table.
313311
obj
314-
An Ibis table expression or pandas table that will be used to
315-
extract the schema and the data of the new table. If not provided,
316-
`schema` must be given.
312+
An Ibis table expression, pandas DataFrame, or PyArrow Table that will
313+
be used to extract the schema and the data of the new table. An
314+
optional `schema` can be used to override the schema.
317315
schema
318-
The schema for the new table. Only one of `schema` or `obj` can be
319-
provided.
316+
The schema for the new table. Required if `obj` is not provided.
320317
database
321318
Name of the database where the table will be created, if not the
322319
default.
@@ -344,6 +341,7 @@ def create_table(
344341
import pyarrow_hotfix # noqa: F401
345342

346343
import ibis.expr.types as ir
344+
from ibis.backends.flink.datatypes import FlinkRowSchema
347345

348346
if obj is None and schema is None:
349347
raise exc.IbisError("The schema or obj parameter is required")
@@ -381,7 +379,7 @@ def create_table(
381379
catalog=catalog,
382380
)
383381
self._exec_sql(statement.compile())
384-
return self.table(name, database=database)
382+
return self.table(name, database=database, catalog=catalog)
385383

386384
def drop_table(
387385
self,
@@ -419,7 +417,7 @@ def drop_table(
419417
def create_view(
420418
self,
421419
name: str,
422-
obj: pd.DataFrame | ir.Table | None = None,
420+
obj: pd.DataFrame | ir.Table,
423421
*,
424422
database: str | None = None,
425423
catalog: str | None = None,
@@ -446,19 +444,16 @@ def create_view(
446444
Table
447445
The view that was created.
448446
"""
449-
if obj is None:
450-
raise exc.IbisError("The obj parameter is required")
451-
452447
if isinstance(obj, ir.Table):
453448
# TODO(chloeh13q): implement CREATE VIEW for expressions
454449
raise NotImplementedError
455450

456-
if overwrite and self.list_views(name):
451+
if overwrite and name in self._list_views():
457452
self.drop_view(name=name, catalog=catalog, database=database, force=True)
458453

459454
qualified_name = self._fully_qualified_name(name, database, catalog)
460455
self._table_env.create_temporary_view(qualified_name, obj)
461-
return self.table(name, database=database)
456+
return self.table(name, database=database, catalog=catalog)
462457

463458
def drop_view(
464459
self,

ibis/backends/flink/datatypes.py

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
import pyflink.table.types as fl
3+
from pyflink.table.types import DataType, DataTypes, RowType
44

55
import ibis.expr.datatypes as dt
66
import ibis.expr.schema as sch
@@ -9,86 +9,86 @@
99

1010
class FlinkRowSchema(SchemaMapper):
1111
@classmethod
12-
def from_ibis(cls, schema: sch.Schema | None) -> list[fl.RowType]:
12+
def from_ibis(cls, schema: sch.Schema | None) -> list[RowType]:
1313
if schema is None:
1414
return None
1515

16-
return fl.DataTypes.ROW(
16+
return DataTypes.ROW(
1717
[
18-
fl.DataTypes.FIELD(k, FlinkType.from_ibis(v))
18+
DataTypes.FIELD(k, FlinkType.from_ibis(v))
1919
for k, v in schema.fields.items()
2020
]
2121
)
2222

2323

2424
class FlinkType(TypeMapper):
2525
@classmethod
26-
def to_ibis(cls, typ: fl.DataType, nullable=True) -> dt.DataType:
26+
def to_ibis(cls, typ: DataType, nullable=True) -> dt.DataType:
2727
"""Convert a flink type to an ibis type."""
28-
if typ == fl.DataTypes.STRING():
28+
if typ == DataTypes.STRING():
2929
return dt.String(nullable=nullable)
30-
elif typ == fl.DataTypes.BOOLEAN():
30+
elif typ == DataTypes.BOOLEAN():
3131
return dt.Boolean(nullable=nullable)
32-
elif typ == fl.DataTypes.BYTES():
32+
elif typ == DataTypes.BYTES():
3333
return dt.Binary(nullable=nullable)
34-
elif typ == fl.DataTypes.TINYINT():
34+
elif typ == DataTypes.TINYINT():
3535
return dt.Int8(nullable=nullable)
36-
elif typ == fl.DataTypes.SMALLINT():
36+
elif typ == DataTypes.SMALLINT():
3737
return dt.Int16(nullable=nullable)
38-
elif typ == fl.DataTypes.INT():
38+
elif typ == DataTypes.INT():
3939
return dt.Int32(nullable=nullable)
40-
elif typ == fl.DataTypes.BIGINT():
40+
elif typ == DataTypes.BIGINT():
4141
return dt.Int64(nullable=nullable)
42-
elif typ == fl.DataTypes.FLOAT():
42+
elif typ == DataTypes.FLOAT():
4343
return dt.Float32(nullable=nullable)
44-
elif typ == fl.DataTypes.DOUBLE():
44+
elif typ == DataTypes.DOUBLE():
4545
return dt.Float64(nullable=nullable)
46-
elif typ == fl.DataTypes.DATE():
46+
elif typ == DataTypes.DATE():
4747
return dt.Date(nullable=nullable)
48-
elif typ == fl.DataTypes.TIME():
48+
elif typ == DataTypes.TIME():
4949
return dt.Time(nullable=nullable)
50-
elif typ == fl.DataTypes.TIMESTAMP():
50+
elif typ == DataTypes.TIMESTAMP():
5151
return dt.Timestamp(nullable=nullable)
5252
else:
5353
return super().to_ibis(typ, nullable=nullable)
5454

5555
@classmethod
56-
def from_ibis(cls, dtype: dt.DataType) -> fl.DataType:
56+
def from_ibis(cls, dtype: dt.DataType) -> DataType:
5757
"""Convert an ibis type to a flink type."""
5858
if dtype.is_string():
59-
return fl.DataTypes.STRING()
59+
return DataTypes.STRING(nullable=dtype.nullable)
6060
elif dtype.is_boolean():
61-
return fl.DataTypes.BOOLEAN()
61+
return DataTypes.BOOLEAN(nullable=dtype.nullable)
6262
elif dtype.is_binary():
63-
return fl.DataTypes.BYTES()
63+
return DataTypes.BYTES(nullable=dtype.nullable)
6464
elif dtype.is_int8():
65-
return fl.DataTypes.TINYINT()
65+
return DataTypes.TINYINT(nullable=dtype.nullable)
6666
elif dtype.is_int16():
67-
return fl.DataTypes.SMALLINT()
67+
return DataTypes.SMALLINT(nullable=dtype.nullable)
6868
elif dtype.is_int32():
69-
return fl.DataTypes.INT()
69+
return DataTypes.INT(nullable=dtype.nullable)
7070
elif dtype.is_int64():
71-
return fl.DataTypes.BIGINT()
71+
return DataTypes.BIGINT(nullable=dtype.nullable)
7272
elif dtype.is_uint8():
73-
return fl.DataTypes.TINYINT()
73+
return DataTypes.TINYINT(nullable=dtype.nullable)
7474
elif dtype.is_uint16():
75-
return fl.DataTypes.SMALLINT()
75+
return DataTypes.SMALLINT(nullable=dtype.nullable)
7676
elif dtype.is_uint32():
77-
return fl.DataTypes.INT()
77+
return DataTypes.INT(nullable=dtype.nullable)
7878
elif dtype.is_uint64():
79-
return fl.DataTypes.BIGINT()
79+
return DataTypes.BIGINT(nullable=dtype.nullable)
8080
elif dtype.is_float16():
81-
return fl.DataTypes.FLOAT()
81+
return DataTypes.FLOAT(nullable=dtype.nullable)
8282
elif dtype.is_float32():
83-
return fl.DataTypes.FLOAT()
83+
return DataTypes.FLOAT(nullable=dtype.nullable)
8484
elif dtype.is_float64():
85-
return fl.DataTypes.DOUBLE()
85+
return DataTypes.DOUBLE(nullable=dtype.nullable)
8686
elif dtype.is_date():
87-
return fl.DataTypes.DATE()
87+
return DataTypes.DATE(nullable=dtype.nullable)
8888
elif dtype.is_time():
89-
return fl.DataTypes.TIME()
89+
return DataTypes.TIME(nullable=dtype.nullable)
9090
elif dtype.is_timestamp():
91-
return fl.DataTypes.TIMESTAMP()
91+
return DataTypes.TIMESTAMP(nullable=dtype.nullable)
9292
else:
9393
return super().from_ibis(dtype)
9494

ibis/backends/flink/registry.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
operation_registry as base_operation_registry,
1010
)
1111
from ibis.backends.base.sql.registry.main import varargs
12-
from ibis.backends.flink.datatypes import FlinkType
1312
from ibis.common.temporal import TimestampUnit
1413

1514
if TYPE_CHECKING:
@@ -222,6 +221,8 @@ def _window(translator: ExprTranslator, op: ops.Node) -> str:
222221

223222

224223
def _clip(translator: ExprTranslator, op: ops.Node) -> str:
224+
from ibis.backends.flink.datatypes import FlinkType
225+
225226
arg = translator.translate(op.arg)
226227

227228
if op.upper is not None:

ibis/backends/flink/tests/test_ddl.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,14 @@ def test_force_recreate_table_from_schema(
178178
],
179179
)
180180
@pytest.mark.parametrize(
181-
"schema_props", [(None, None), (_awards_players_schema, "awards_players")]
181+
"schema, table_name", [(None, None), (_awards_players_schema, "awards_players")]
182182
)
183183
def test_recreate_in_mem_table(
184-
con, employee_df, schema_props, temp_table, csv_source_configs
184+
con, employee_df, schema, table_name, temp_table, csv_source_configs
185185
):
186186
# create table once
187-
schema = schema_props[0]
188-
if schema_props[1] is not None:
189-
tbl_properties = csv_source_configs(schema_props[1])
187+
if table_name is not None:
188+
tbl_properties = csv_source_configs(table_name)
190189
else:
191190
tbl_properties = None
192191

@@ -242,7 +241,6 @@ def test_force_recreate_in_mem_table(
242241
tbl_properties=tbl_properties,
243242
)
244243
assert temp_table in con.list_tables()
245-
assert temp_table in con.list_views()
246244
if schema is not None:
247245
assert new_table.schema() == schema
248246

@@ -255,7 +253,6 @@ def test_force_recreate_in_mem_table(
255253
overwrite=True,
256254
)
257255
assert temp_table in con.list_tables()
258-
assert temp_table in con.list_views()
259256
if schema is not None:
260257
assert new_table.schema() == schema
261258

0 commit comments

Comments
 (0)