@@ -139,13 +139,37 @@ def _limit(op: ops.Limit, *, table, n, offset, **_):
139
139
def _aggregation (
140
140
op : ops .Aggregation , * , table , metrics , by , having , predicates , sort_keys , ** _
141
141
):
142
- selections = (by + metrics ) or (STAR ,)
142
+ if by :
143
+ # datafusion doesn't support count distinct aggregations alongside
144
+ # computed grouping keys so create a projection of the key and all
145
+ # existing columns first, followed by the usual group by
146
+ #
147
+ # analogous to a user calling mutate -> group_by
148
+ by_names = frozenset (b .alias_or_name for b in by )
149
+ cols = [
150
+ sg .column (
151
+ name ,
152
+ table = sg .to_identifier (table .alias_or_name , quoted = True ),
153
+ quoted = True ,
154
+ )
155
+ for name in op .table .schema .keys () - by_names
156
+ ]
157
+ table = sg .select (* cols , * by ).from_ (table ).subquery ()
158
+
159
+ # datafusion lower cases all column names internally unless quoted so
160
+ # quoted=True is required here for correctness
161
+ by_names_quoted = tuple (
162
+ sg .column (b .alias_or_name , table = getattr (b , "table" , None ), quoted = True )
163
+ for b in by
164
+ )
165
+ selections = by_names_quoted + metrics
166
+ else :
167
+ selections = metrics or (STAR ,)
168
+
143
169
sel = sg .select (* selections ).from_ (table )
144
170
145
171
if by :
146
- sel = sel .group_by (
147
- * (key .this if isinstance (key , sg .exp .Alias ) else key for key in by )
148
- )
172
+ sel = sel .group_by (* by_names_quoted )
149
173
150
174
if predicates :
151
175
sel = sel .where (* predicates )
0 commit comments