Skip to content

Commit 1413de9

Browse files
zhenzhongxugforsyth
authored andcommitted
fix(flink): add test cases for recreate table
1 parent 0c9791f commit 1413de9

File tree

2 files changed

+172
-27
lines changed

2 files changed

+172
-27
lines changed

ibis/backends/flink/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ def drop_table(
419419
def create_view(
420420
self,
421421
name: str,
422-
obj: ir.Table,
422+
obj: pd.DataFrame | ir.Table | None = None,
423423
*,
424424
database: str | None = None,
425425
catalog: str | None = None,
@@ -449,6 +449,10 @@ def create_view(
449449
if obj is None:
450450
raise exc.IbisError("The obj parameter is required")
451451

452+
if isinstance(obj, ir.Table):
453+
# TODO(chloeh13q): implement CREATE VIEW for expressions
454+
raise NotImplementedError
455+
452456
if overwrite and self.list_views(name):
453457
self.drop_view(name=name, catalog=catalog, database=database, force=True)
454458

ibis/backends/flink/tests/test_ddl.py

Lines changed: 167 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,34 @@
1717
except ImportError:
1818
Py4JJavaError = None
1919

20+
_awards_players_schema = sch.Schema(
21+
{
22+
"playerID": dt.string,
23+
"awardID": dt.string,
24+
"yearID": dt.int32,
25+
"lgID": dt.string,
26+
"tie": dt.string,
27+
"notes": dt.string,
28+
}
29+
)
30+
31+
_functiona_alltypes_schema = sch.Schema(
32+
{
33+
"id": dt.int32,
34+
"bool_col": dt.bool,
35+
"smallint_col": dt.int16,
36+
"int_col": dt.int32,
37+
"bigint_col": dt.int64,
38+
"float_col": dt.float32,
39+
"double_col": dt.float64,
40+
"date_string_col": dt.string,
41+
"string_col": dt.string,
42+
"timestamp_col": dt.timestamp(scale=3),
43+
"year": dt.int32,
44+
"month": dt.int32,
45+
}
46+
)
47+
2048

2149
@pytest.fixture(autouse=True)
2250
def reset_con(con):
@@ -28,36 +56,12 @@ def reset_con(con):
2856

2957
@pytest.fixture
3058
def awards_players_schema():
31-
return sch.Schema(
32-
{
33-
"playerID": dt.string,
34-
"awardID": dt.string,
35-
"yearID": dt.int32,
36-
"lgID": dt.string,
37-
"tie": dt.string,
38-
"notes": dt.string,
39-
}
40-
)
59+
return _awards_players_schema
4160

4261

4362
@pytest.fixture
4463
def functiona_alltypes_schema():
45-
return sch.Schema(
46-
{
47-
"id": dt.int32,
48-
"bool_col": dt.bool,
49-
"smallint_col": dt.int16,
50-
"int_col": dt.int32,
51-
"bigint_col": dt.int64,
52-
"float_col": dt.float32,
53-
"double_col": dt.float64,
54-
"date_string_col": dt.string,
55-
"string_col": dt.string,
56-
"timestamp_col": dt.timestamp(scale=3),
57-
"year": dt.int32,
58-
"month": dt.int32,
59-
}
60-
)
64+
return _functiona_alltypes_schema
6165

6266

6367
@pytest.fixture
@@ -117,6 +121,143 @@ def test_create_table(con, awards_players_schema, temp_table, csv_source_configs
117121
assert temp_table not in con.list_tables()
118122

119123

124+
def test_recreate_table_from_schema(
125+
con, awards_players_schema, temp_table, csv_source_configs
126+
):
127+
# create table once
128+
new_table = con.create_table(
129+
temp_table,
130+
schema=awards_players_schema,
131+
tbl_properties=csv_source_configs("awards_players"),
132+
)
133+
assert temp_table in con.list_tables()
134+
assert new_table.schema() == awards_players_schema
135+
136+
# create the same table a second time should fail
137+
with pytest.raises(
138+
Py4JJavaError,
139+
match="org.apache.flink.table.catalog.exceptions.TableAlreadyExistException",
140+
):
141+
new_table = con.create_table(
142+
temp_table,
143+
schema=awards_players_schema,
144+
tbl_properties=csv_source_configs("awards_players"),
145+
overwrite=False,
146+
)
147+
148+
149+
def test_force_recreate_table_from_schema(
150+
con, awards_players_schema, temp_table, csv_source_configs
151+
):
152+
# create table once
153+
new_table = con.create_table(
154+
temp_table,
155+
schema=awards_players_schema,
156+
tbl_properties=csv_source_configs("awards_players"),
157+
)
158+
assert temp_table in con.list_tables()
159+
assert new_table.schema() == awards_players_schema
160+
161+
# force creating the same twice a second time
162+
new_table = con.create_table(
163+
temp_table,
164+
schema=awards_players_schema,
165+
tbl_properties=csv_source_configs("awards_players"),
166+
overwrite=True,
167+
)
168+
assert temp_table in con.list_tables()
169+
assert new_table.schema() == awards_players_schema
170+
171+
172+
@pytest.mark.parametrize(
173+
"employee_df",
174+
[
175+
pd.DataFrame(
176+
[("fred flintstone", "award", 2002, "lg_id", "tie", "this is a note")]
177+
)
178+
],
179+
)
180+
@pytest.mark.parametrize(
181+
"schema_props", [(None, None), (_awards_players_schema, "awards_players")]
182+
)
183+
def test_recreate_in_mem_table(
184+
con, employee_df, schema_props, temp_table, csv_source_configs
185+
):
186+
# create table once
187+
schema = schema_props[0]
188+
if schema_props[1] is not None:
189+
tbl_properties = csv_source_configs(schema_props[1])
190+
else:
191+
tbl_properties = None
192+
193+
new_table = con.create_table(
194+
name=temp_table,
195+
obj=employee_df,
196+
schema=schema,
197+
tbl_properties=tbl_properties,
198+
)
199+
assert temp_table in con.list_tables()
200+
if schema is not None:
201+
assert new_table.schema() == schema
202+
203+
# create the same table a second time should fail
204+
with pytest.raises(
205+
Py4JJavaError,
206+
match="An error occurred while calling o8.createTemporaryView",
207+
):
208+
new_table = con.create_table(
209+
name=temp_table,
210+
obj=employee_df,
211+
schema=schema,
212+
tbl_properties=tbl_properties,
213+
overwrite=False,
214+
)
215+
216+
217+
@pytest.mark.parametrize(
218+
"employee_df",
219+
[
220+
pd.DataFrame(
221+
[("fred flintstone", "award", 2002, "lg_id", "tie", "this is a note")]
222+
)
223+
],
224+
)
225+
@pytest.mark.parametrize(
226+
"schema_props", [(None, None), (_awards_players_schema, "awards_players")]
227+
)
228+
def test_force_recreate_in_mem_table(
229+
con, employee_df, schema_props, temp_table, csv_source_configs
230+
):
231+
# create table once
232+
schema = schema_props[0]
233+
if schema_props[1] is not None:
234+
tbl_properties = csv_source_configs(schema_props[1])
235+
else:
236+
tbl_properties = None
237+
238+
new_table = con.create_table(
239+
name=temp_table,
240+
obj=employee_df,
241+
schema=schema,
242+
tbl_properties=tbl_properties,
243+
)
244+
assert temp_table in con.list_tables()
245+
if schema is not None:
246+
assert new_table.schema() == schema
247+
248+
# force recreate the same table a second time should succeed
249+
new_table = con.create_table(
250+
name=temp_table,
251+
obj=employee_df,
252+
schema=schema,
253+
tbl_properties=tbl_properties,
254+
overwrite=True,
255+
)
256+
assert temp_table in con.list_tables()
257+
if schema is not None:
258+
assert new_table.schema() == schema
259+
260+
120261
def test_create_source_table_with_watermark(
121262
con, functiona_alltypes_schema, temp_table, csv_source_configs
122263
):

0 commit comments

Comments
 (0)