Skip to content

Commit cd5e881

Browse files
committed
feat(bigquery): move bigquery backend back into the main repo
1 parent 370830b commit cd5e881

File tree

287 files changed

+6794
-553
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

287 files changed

+6794
-553
lines changed

.github/workflows/ibis-snowflake.yml renamed to .github/workflows/ibis-backends-cloud.yml

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# vim: filetype=yaml
2-
name: Snowflake Backend
2+
name: Cloud Backends
33

44
on:
55
push:
@@ -33,6 +33,8 @@ jobs:
3333
backend:
3434
- name: snowflake
3535
title: Snowflake
36+
- name: bigquery
37+
title: BigQuery
3638
steps:
3739
- name: checkout
3840
uses: actions/checkout@v3
@@ -46,6 +48,7 @@ jobs:
4648
- run: python -m pip install --upgrade pip 'poetry>=1.2'
4749

4850
- name: set a compatible pyarrow version
51+
if: ${{ matrix.backend.name == 'snowflake' }}
4952
run: poetry add pyarrow@'>=8.0.0,<8.1.0' --optional
5053

5154
- name: install ibis
@@ -58,11 +61,20 @@ jobs:
5861
- name: download backend data
5962
run: just download-data
6063

61-
- name: "run parallel tests: ${{ matrix.backend.name }}"
62-
run: just ci-check -m ${{ matrix.backend.name }} --numprocesses auto --dist=loadgroup
64+
- uses: google-github-actions/auth@v1
65+
if: ${{ matrix.backend.name == 'bigquery' }}
66+
with:
67+
credentials_json: ${{ secrets.GCP_CREDENTIALS }}
68+
69+
- name: setup snowflake credentials
70+
if: ${{ matrix.backend.name == 'snowflake' }}
71+
run: echo "SNOWFLAKE_URL=${SNOWFLAKE_URL}" >> "$GITHUB_ENV"
6372
env:
6473
SNOWFLAKE_URL: ${{ secrets.SNOWFLAKE_URL }}
6574

75+
- name: "run parallel tests: ${{ matrix.backend.name }}"
76+
run: just ci-check -m ${{ matrix.backend.name }} --numprocesses auto --dist=loadgroup
77+
6678
- name: upload code coverage
6779
if: success()
6880
uses: codecov/codecov-action@v3

ibis/backends/base/sql/alchemy/__init__.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,16 +142,11 @@ def _to_geodataframe(df, schema):
142142
return df
143143

144144
def fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
145-
146145
import pandas as pd
147146

148-
df = pd.DataFrame.from_records(
149-
cursor,
150-
columns=cursor.keys(),
151-
coerce_float=True,
152-
)
147+
df = pd.DataFrame.from_records(cursor, columns=schema.names, coerce_float=True)
153148
df = schema.apply_to(df)
154-
if len(df) and geospatial_supported:
149+
if not df.empty and geospatial_supported:
155150
return self._to_geodataframe(df, schema)
156151
return df
157152

ibis/backends/base/sql/alchemy/registry.py

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -332,14 +332,6 @@ def _window(t, op):
332332

333333
window_op = arg
334334

335-
_require_order_by = (
336-
ops.DenseRank,
337-
ops.MinRank,
338-
ops.NTile,
339-
ops.PercentRank,
340-
ops.CumeDist,
341-
)
342-
343335
if isinstance(window_op, ops.CumulativeOp):
344336
arg = _cumulative_to_window(t, arg, window).op()
345337
return t.translate(arg)
@@ -352,29 +344,18 @@ def _window(t, op):
352344

353345
# Some analytic functions need to have the expression of interest in
354346
# the ORDER BY part of the window clause
355-
if isinstance(window_op, _require_order_by) and not window._order_by:
347+
if isinstance(window_op, t._require_order_by) and not window._order_by:
356348
order_by = t.translate(window_op.args[0])
357349
else:
358350
order_by = [t.translate(arg) for arg in window._order_by]
359351

360352
partition_by = [t.translate(arg) for arg in window._group_by]
361353

362-
frame_clause_not_allowed = (
363-
ops.Lag,
364-
ops.Lead,
365-
ops.DenseRank,
366-
ops.MinRank,
367-
ops.NTile,
368-
ops.PercentRank,
369-
ops.CumeDist,
370-
ops.RowNumber,
371-
)
372-
373354
how = {'range': 'range_'}.get(window.how, window.how)
374355
preceding = window.preceding
375356
additional_params = (
376357
{}
377-
if isinstance(window_op, frame_clause_not_allowed)
358+
if t._forbids_frame_clause and isinstance(window_op, t._forbids_frame_clause)
378359
else {
379360
how: (
380361
-preceding if preceding is not None else preceding,

ibis/backends/base/sql/alchemy/translator.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ def subcontext(self):
3333

3434

3535
class AlchemyExprTranslator(ExprTranslator):
36-
3736
_registry = sqlalchemy_operation_registry
3837
_rewrites = ExprTranslator._rewrites.copy()
3938
_type_map = ibis_type_to_sqla
@@ -47,6 +46,14 @@ class AlchemyExprTranslator(ExprTranslator):
4746
native_json_type = True
4847
_always_quote_columns = False
4948

49+
_require_order_by = (
50+
ops.DenseRank,
51+
ops.MinRank,
52+
ops.NTile,
53+
ops.PercentRank,
54+
ops.CumeDist,
55+
)
56+
5057
def name(self, translated, name, force=True):
5158
return translated.label(name)
5259

ibis/backends/base/sql/compiler/translator.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,25 @@ class ExprTranslator:
162162

163163
_registry = operation_registry
164164
_rewrites: dict[ops.Node, Callable] = {}
165+
_forbids_frame_clause = (
166+
ops.DenseRank,
167+
ops.MinRank,
168+
ops.NTile,
169+
ops.PercentRank,
170+
ops.CumeDist,
171+
ops.RowNumber,
172+
)
173+
_require_order_by = (
174+
ops.Lag,
175+
ops.Lead,
176+
ops.DenseRank,
177+
ops.MinRank,
178+
ops.FirstValue,
179+
ops.LastValue,
180+
ops.PercentRank,
181+
ops.CumeDist,
182+
ops.NTile,
183+
)
165184

166185
def __init__(self, node, context, named=False, permit_subquery=False):
167186
self.node = node

ibis/backends/base/sql/registry/window.py

Lines changed: 27 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from operator import add, mul, sub
2-
from typing import Optional, Union
2+
from typing import Optional
33

44
import ibis
55
import ibis.common.exceptions as com
@@ -41,7 +41,7 @@
4141
}
4242

4343

44-
def _replace_interval_with_scalar(expr: Union[ir.Expr, dt.Interval, float]):
44+
def _replace_interval_with_scalar(op: ops.Node):
4545
"""Good old Depth-First Search to identify the Interval and IntervalValue
4646
components of the expression and return a comparable scalar expression.
4747
@@ -54,37 +54,22 @@ def _replace_interval_with_scalar(expr: Union[ir.Expr, dt.Interval, float]):
5454
-------
5555
preceding : float or ir.FloatingScalar, depending upon the expr
5656
"""
57-
if isinstance(expr, ir.Expr):
58-
expr_op = expr.op()
59-
else:
60-
expr_op = None
61-
62-
# TODO: fix this to use to_interval if possible
63-
if not isinstance(expr, (dt.Interval, ir.IntervalValue)):
64-
# Literal expressions have op method but native types do not.
65-
if isinstance(expr_op, ops.Literal):
66-
return expr_op.value
67-
else:
68-
return expr
69-
elif isinstance(expr, dt.Interval):
57+
if isinstance(op, ops.Literal):
58+
unit = getattr(op.output_dtype, "unit", "us")
7059
try:
71-
microseconds = _map_interval_to_microseconds[expr.unit]
72-
return microseconds
60+
micros = _map_interval_to_microseconds[unit]
61+
return op.value * micros
7362
except KeyError:
74-
raise ValueError(
75-
"Expected preceding values of week(), "
76-
+ "day(), hour(), minute(), second(), millisecond(), "
77-
+ f"microseconds(), nanoseconds(); got {expr}"
78-
)
79-
elif expr_op.args and isinstance(expr, ir.IntervalValue):
80-
if len(expr_op.args) > 2:
63+
raise ValueError(f"Unsupported unit {unit!r}")
64+
elif op.args and isinstance(op.output_dtype, dt.Interval):
65+
if len(op.args) > 2:
8166
raise NotImplementedError("'preceding' argument cannot be parsed.")
82-
left_arg = _replace_interval_with_scalar(expr_op.args[0])
83-
right_arg = _replace_interval_with_scalar(expr_op.args[1])
84-
method = _map_interval_op_to_op[type(expr_op)]
67+
left_arg = _replace_interval_with_scalar(op.args[0])
68+
right_arg = _replace_interval_with_scalar(op.args[1])
69+
method = _map_interval_op_to_op[type(op)]
8570
return method(left_arg, right_arg)
8671
else:
87-
raise TypeError(f'expr has unknown type {type(expr).__name__}')
72+
raise TypeError(f'input has unknown type {type(op)}')
8873

8974

9075
def cumulative_to_window(translator, op, window):
@@ -105,20 +90,20 @@ def cumulative_to_window(translator, op, window):
10590

10691
def time_range_to_range_window(_, window):
10792
# Check that ORDER BY column is a single time column:
108-
order_by_vars = [x.op().args[0] for x in window._order_by]
93+
order_by_vars = [x.args[0] for x in window._order_by]
10994
if len(order_by_vars) > 1:
11095
raise com.IbisInputError(
11196
f"Expected 1 order-by variable, got {len(order_by_vars)}"
11297
)
11398

114-
order_var = window._order_by[0].op().args[0]
115-
timestamp_order_var = order_var.cast('int64')
99+
order_var = order_by_vars[0]
100+
timestamp_order_var = ops.Cast(order_var, dt.int64).to_expr()
116101
window = window._replace(order_by=timestamp_order_var, how='range')
117102

118103
# Need to change preceding interval expression to scalars
119104
preceding = window.preceding
120105
if isinstance(preceding, ir.IntervalScalar):
121-
new_preceding = _replace_interval_with_scalar(preceding)
106+
new_preceding = _replace_interval_with_scalar(preceding.op())
122107
window = window._replace(preceding=new_preceding)
123108

124109
return window
@@ -165,22 +150,12 @@ def _foll(f: Optional[int]) -> str:
165150

166151
return f'{prefix} FOLLOWING'
167152

168-
frame_clause_not_allowed = (
169-
ops.Lag,
170-
ops.Lead,
171-
ops.DenseRank,
172-
ops.MinRank,
173-
ops.NTile,
174-
ops.PercentRank,
175-
ops.CumeDist,
176-
ops.RowNumber,
177-
)
178-
179-
if isinstance(op.expr, frame_clause_not_allowed):
153+
if translator._forbids_frame_clause and isinstance(
154+
op.expr, translator._forbids_frame_clause
155+
):
180156
frame = None
181157
elif p is not None and f is not None:
182158
frame = f'{window.how.upper()} BETWEEN {_prec(p)} AND {_foll(f)}'
183-
184159
elif p is not None:
185160
if isinstance(p, tuple):
186161
start, end = p
@@ -200,7 +175,6 @@ def _foll(f: Optional[int]) -> str:
200175
kind = 'ROWS' if f > 0 else 'RANGE'
201176
frame = f'{kind} BETWEEN UNBOUNDED PRECEDING AND {_foll(f)}'
202177
else:
203-
# no-op, default is full sample
204178
frame = None
205179

206180
if frame is not None:
@@ -223,18 +197,6 @@ def _foll(f: Optional[int]) -> str:
223197
def window(translator, op):
224198
arg, window = op.args
225199

226-
_require_order_by = (
227-
ops.Lag,
228-
ops.Lead,
229-
ops.DenseRank,
230-
ops.MinRank,
231-
ops.FirstValue,
232-
ops.LastValue,
233-
ops.PercentRank,
234-
ops.CumeDist,
235-
ops.NTile,
236-
)
237-
238200
_unsupported_reductions = (
239201
ops.ApproxMedian,
240202
ops.GroupConcat,
@@ -252,15 +214,18 @@ def window(translator, op):
252214

253215
# Some analytic functions need to have the expression of interest in
254216
# the ORDER BY part of the window clause
255-
if isinstance(arg, _require_order_by) and not window._order_by:
217+
if isinstance(arg, translator._require_order_by) and not window._order_by:
256218
window = window.order_by(arg.args[0])
257219

258220
# Time ranges need to be converted to microseconds.
259221
# FIXME(kszucs): avoid the expression roundtrip
260222
if window.how == 'range':
261-
order_by_types = [type(x.op().args[0]) for x in window._order_by]
262-
time_range_types = (ir.TimeColumn, ir.DateColumn, ir.TimestampColumn)
263-
if any(col_type in time_range_types for col_type in order_by_types):
223+
time_range_types = (dt.Time, dt.Date, dt.Timestamp)
224+
if any(
225+
isinstance(c.output_dtype, time_range_types)
226+
and c.output_shape.is_columnar()
227+
for c in window._order_by
228+
):
264229
window = time_range_to_range_window(translator, window)
265230

266231
window_formatted = format_window(translator, op, window)

0 commit comments

Comments
 (0)