Skip to content

Commit 622de09

Browse files
authored
fix(athena): implement proper support for inserting data (#10770)
1 parent 2c5b670 commit 622de09

File tree

3 files changed

+21
-25
lines changed

3 files changed

+21
-25
lines changed

ibis/backends/athena/__init__.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import contextlib
66
import getpass
77
import os
8+
import re
89
import sys
910
import tempfile
1011
from pathlib import Path
@@ -159,11 +160,7 @@ def create_table(
159160
if location is None:
160161
location = f"{self._s3_staging_dir}/{name}"
161162

162-
property_list = [
163-
sge.ExternalProperty(),
164-
sge.FileFormatProperty(this=compiler.v[stored_as]),
165-
sge.LocationProperty(this=sge.convert(location)),
166-
]
163+
property_list = []
167164

168165
for k, v in (properties or {}).items():
169166
name = sg.to_identifier(k)
@@ -196,6 +193,9 @@ def create_table(
196193
).from_(compiler.to_sqlglot(table).subquery())
197194
else:
198195
select = None
196+
property_list.append(sge.ExternalProperty())
197+
property_list.append(sge.FileFormatProperty(this=compiler.v[stored_as]))
198+
property_list.append(sge.LocationProperty(this=sge.convert(location)))
199199

200200
create_stmt = sge.Create(
201201
kind="TABLE",
@@ -287,8 +287,20 @@ def get_schema(
287287
def _safe_raw_sql(self, query, *args, unload: bool = True, **kwargs):
288288
with contextlib.suppress(AttributeError):
289289
query = query.sql(self.dialect)
290-
with self.con.cursor(unload=unload) as cur:
291-
yield cur.execute(query, *args, **kwargs)
290+
try:
291+
with self.con.cursor(unload=unload) as cur:
292+
yield cur.execute(query, *args, **kwargs)
293+
except pyathena.error.OperationalError as e:
294+
# apparently unload=True and can just nope out and not tell you
295+
# why, but unload=False is "fine"
296+
#
297+
# if the error isn't this opaque "internal" error, then we raise the original
298+
# exception, otherwise try to execute the query again with unload=False
299+
if unload and re.search("ErrorCode: INTERNAL_ERROR_QUERY_ENGINE", str(e)):
300+
with self.con.cursor(unload=False) as cur:
301+
yield cur.execute(query, *args, **kwargs)
302+
else:
303+
raise
292304

293305
def list_catalogs(self, like: str | None = None) -> list[str]:
294306
response = self.con.client.list_data_catalogs()

ibis/backends/tests/errors.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,3 @@
186186
from pyathena.error import OperationalError as PyAthenaOperationalError
187187
except ImportError:
188188
PyAthenaDatabaseError = PyAthenaOperationalError = None
189-
190-
191-
try:
192-
from botocore.errorfactory import (
193-
InvalidRequestException as BotoInvalidRequestException,
194-
)
195-
except ImportError:
196-
BotoInvalidRequestException = None

ibis/backends/tests/test_client.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import ibis.expr.operations as ops
2727
from ibis.backends.conftest import ALL_BACKENDS
2828
from ibis.backends.tests.errors import (
29-
BotoInvalidRequestException,
3029
DatabricksServerOperationError,
3130
ExaQueryError,
3231
ImpalaHiveServer2Error,
@@ -97,11 +96,6 @@ def _create_temp_table_with_schema(backend, con, temp_table_name, schema, data=N
9796
ids=["no_schema", "dict_schema", "tuples", "schema"],
9897
)
9998
@pytest.mark.notimpl(["druid"])
100-
@pytest.mark.notimpl(
101-
["athena"],
102-
raises=BotoInvalidRequestException,
103-
reason="create table requires a location",
104-
)
10599
@pytest.mark.notimpl(
106100
["flink"],
107101
reason="Flink backend supports creating only TEMPORARY VIEW for in-memory data.",
@@ -952,6 +946,7 @@ def test_self_join_memory_table(backend, con, monkeypatch):
952946
"sqlite",
953947
"trino",
954948
"databricks",
949+
"athena",
955950
]
956951
)
957952
],
@@ -982,6 +977,7 @@ def test_self_join_memory_table(backend, con, monkeypatch):
982977
"sqlite",
983978
"trino",
984979
"databricks",
980+
"athena",
985981
],
986982
raises=com.UnsupportedOperationError,
987983
reason="we don't materialize datasets to avoid perf footguns",
@@ -1033,7 +1029,6 @@ def test_self_join_memory_table(backend, con, monkeypatch):
10331029
],
10341030
)
10351031
@pytest.mark.notimpl(["druid"])
1036-
@pytest.mark.notimpl(["athena"], raises=BotoInvalidRequestException)
10371032
@pytest.mark.notimpl(
10381033
["flink"],
10391034
reason="Flink backend supports creating only TEMPORARY VIEW for in-memory data.",
@@ -1417,7 +1412,6 @@ def create_and_destroy_db(con):
14171412
reason="unclear whether Flink supports cross catalog/database inserts",
14181413
raises=Py4JJavaError,
14191414
)
1420-
@pytest.mark.notimpl(["athena"])
14211415
def test_insert_with_database_specified(con_create_database):
14221416
con = con_create_database
14231417

@@ -1604,7 +1598,6 @@ def test_schema_with_caching(alltypes):
16041598
["druid"], raises=NotImplementedError, reason="doesn't support create_table"
16051599
)
16061600
@pytest.mark.notyet(["polars"], reason="Doesn't support insert")
1607-
@pytest.mark.notyet(["athena"])
16081601
@pytest.mark.notyet(
16091602
["datafusion"], reason="Doesn't support table creation from records"
16101603
)
@@ -1698,7 +1691,6 @@ def test_no_accidental_cross_database_table_load(con_create_database):
16981691

16991692

17001693
@pytest.mark.notyet(["druid"], reason="can't create tables")
1701-
@pytest.mark.notimpl(["athena"], reason="can't create tables correctly in some cases")
17021694
@pytest.mark.notyet(
17031695
["flink"], reason="can't create non-temporary tables from in-memory data"
17041696
)

0 commit comments

Comments
 (0)