Skip to content

Commit 0c9791f

Browse files
zhenzhongxugforsyth
authored andcommitted
fix(flink): fix recreating table/view issue on flink backend
1 parent 9006642 commit 0c9791f

File tree

2 files changed

+73
-19
lines changed

2 files changed

+73
-19
lines changed

ibis/backends/flink/__init__.py

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
DropTable,
2020
InsertSelect,
2121
)
22+
from ibis.backends.flink.utils import ibis_schema_to_flink_schema
2223

2324
if TYPE_CHECKING:
2425
from collections.abc import Mapping
@@ -145,6 +146,34 @@ def list_tables(
145146
# but executing the SQL string directly yields a `TableResult` object
146147
return self._filter_with_like(tables, like)
147148

149+
def list_views(
150+
self,
151+
like: str | None = None,
152+
temporary: bool = True,
153+
) -> list[str]:
154+
"""Return the list of view names.
155+
156+
Return the list of view names in the specified database and catalog.
157+
or the default one if no database/catalog is specified.
158+
159+
Parameters
160+
----------
161+
like : str, optional
162+
A pattern in Python's regex format.
163+
temporary : bool, optional
164+
Whether to list temporary views or permanent views.
165+
166+
Returns
167+
-------
168+
list[str]
169+
The list of the view names that match the pattern `like`.
170+
"""
171+
if temporary:
172+
views = self._table_env.list_temporary_views()
173+
else:
174+
views = self._table_env.list_views()
175+
return self._filter_with_like(views, like)
176+
148177
def _fully_qualified_name(
149178
self,
150179
name: str,
@@ -319,26 +348,29 @@ def create_table(
319348
if obj is None and schema is None:
320349
raise exc.IbisError("The schema or obj parameter is required")
321350

322-
if overwrite:
323-
self.drop_table(name=name, catalog=catalog, database=database, force=True)
351+
# in-memory data is created as views in `pyflink`
352+
elif obj is not None:
353+
if isinstance(obj, pa.Table):
354+
obj = obj.to_pandas()
355+
if isinstance(obj, pd.DataFrame):
356+
table = self._table_env.from_pandas(
357+
obj, ibis_schema_to_flink_schema(schema)
358+
)
359+
if isinstance(obj, ir.Table):
360+
table = obj
361+
return self.create_view(name, table, database=database, overwrite=overwrite)
324362

325-
if isinstance(obj, pa.Table):
326-
obj = obj.to_pandas()
327-
if isinstance(obj, pd.DataFrame):
328-
qualified_name = self._fully_qualified_name(name, database, catalog)
329-
table = self._table_env.from_pandas(obj)
330-
# in-memory data is created as views in `pyflink`
331-
# TODO(chloeh13q): alternatively, we can do CREATE TABLE and then INSERT
332-
# INTO ... VALUES to keep things consistent
333-
self._table_env.create_temporary_view(qualified_name, table)
334-
if isinstance(obj, ir.Table):
335-
# TODO(chloeh13q): implement CREATE TABLE for expressions
336-
raise NotImplementedError
337-
if schema is not None:
363+
# external data is created as tables in `pyflink`
364+
else:
338365
if not tbl_properties:
339366
raise exc.IbisError(
340367
"tbl_properties is required when creating table with schema"
341368
)
369+
if overwrite:
370+
self.drop_table(
371+
name=name, catalog=catalog, database=database, force=True
372+
)
373+
342374
statement = CreateTableFromConnector(
343375
table_name=name,
344376
schema=schema,
@@ -349,8 +381,7 @@ def create_table(
349381
catalog=catalog,
350382
)
351383
self._exec_sql(statement.compile())
352-
353-
return self.table(name, database=database)
384+
return self.table(name, database=database)
354385

355386
def drop_table(
356387
self,
@@ -391,6 +422,7 @@ def create_view(
391422
obj: ir.Table,
392423
*,
393424
database: str | None = None,
425+
catalog: str | None = None,
394426
overwrite: bool = False,
395427
) -> ir.Table:
396428
"""Create a new view from an expression.
@@ -404,6 +436,8 @@ def create_view(
404436
database
405437
Name of the database where the view will be created, if not
406438
provided the database's default is used.
439+
catalog
440+
Name of the catalog where the table exists, if not the default.
407441
overwrite
408442
Whether to clobber an existing view with the same name.
409443
@@ -412,7 +446,15 @@ def create_view(
412446
Table
413447
The view that was created.
414448
"""
415-
raise NotImplementedError
449+
if obj is None:
450+
raise exc.IbisError("The obj parameter is required")
451+
452+
if overwrite and self.list_views(name):
453+
self.drop_view(name=name, catalog=catalog, database=database, force=True)
454+
455+
qualified_name = self._fully_qualified_name(name, database, catalog)
456+
self._table_env.create_temporary_view(qualified_name, obj)
457+
return self.table(name, database=database)
416458

417459
def drop_view(
418460
self,

ibis/backends/flink/utils.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
from abc import ABC, abstractmethod
66
from collections import defaultdict
77

8-
from pyflink.table.types import DataTypes
8+
from pyflink.table.types import DataTypes, RowType
99

1010
import ibis.expr.datatypes as dt
1111
import ibis.expr.operations as ops
12+
import ibis.expr.schema as sch
1213
from ibis.common.temporal import IntervalUnit
1314
from ibis.util import convert_unit
1415

@@ -326,3 +327,14 @@ def translate_literal(op: ops.Literal) -> str:
326327
return f"ARRAY{list(value)}"
327328

328329
raise NotImplementedError(f"No translation rule for {dtype}")
330+
331+
332+
def ibis_schema_to_flink_schema(schema: sch.Schema) -> RowType:
333+
if schema is None:
334+
return None
335+
return DataTypes.ROW(
336+
[
337+
DataTypes.FIELD(key, _to_pyflink_types[type(value)])
338+
for key, value in schema.fields.items()
339+
]
340+
)

0 commit comments

Comments
 (0)