Skip to content

Commit 606dccb

Browse files
committed
chore: try using the entire table path for flink inserts
1 parent e1c0e72 commit 606dccb

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

ibis/backends/flink/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -906,18 +906,22 @@ def insert(
906906
)
907907
return self.raw_sql(statement.compile())
908908

909+
identifier = sg.table(
910+
table_name, db=database, catalog=catalog, quoted=self.compiler.quoted
911+
).sql(self.dialect)
912+
909913
if isinstance(obj, pa.Table):
910914
obj = obj.to_pandas()
911915
if isinstance(obj, dict):
912916
obj = pd.DataFrame.from_dict(obj)
913917
if isinstance(obj, pd.DataFrame):
914918
table = self._table_env.from_pandas(obj)
915-
return table.execute_insert(table_name, overwrite=overwrite)
919+
return table.execute_insert(identifier, overwrite=overwrite)
916920

917921
if isinstance(obj, list):
918922
# pyflink infers datatypes, which may sometimes result in incompatible types
919923
table = self._table_env.from_elements(obj)
920-
return table.execute_insert(table_name, overwrite=overwrite)
924+
return table.execute_insert(identifier, overwrite=overwrite)
921925

922926
raise ValueError(
923927
"No operation is being performed. Either the obj parameter "

0 commit comments

Comments
 (0)