
Commit 055d4ad

akuzm and erimatnor authored
Refactor vectorized grouping to prepare for hash grouping (#7408)
This PR prepares for #7341. It has various assorted refactorings and cosmetic changes:

* Various cosmetic things I don't know where to put.
* The definitions of aggregate functions and grouping columns in the vector agg node are now typed arrays and not lists.
* The aggregate function implementations always work with at most one filter bitmap. This reduces the amount of code and will help to support aggregate FILTER clauses.
* Parts of the aggregate function implementations are restructured and renamed in a way that will make it easier to support hash grouping.
* EXPLAIN output is added for the vector agg node that mentions the grouping policy being used.

No functional changes are expected except for the EXPLAIN output.

Disable-check: force-changelog-file

---------

Signed-off-by: Alexander Kuzmenkov <[email protected]>
Co-authored-by: Erik Nordström <[email protected]>
1 parent 2251c0e commit 055d4ad

34 files changed, +3635 −6003 lines

src/guc.c

+18
@@ -182,6 +182,9 @@ char *ts_current_timestamp_mock = NULL;
 int ts_guc_debug_toast_tuple_target = 128;
 
 #ifdef TS_DEBUG
+
+bool ts_guc_debug_have_int128;
+
 static const struct config_enum_entry debug_require_options[] = { { "allow", DRO_Allow, false },
 																   { "forbid", DRO_Forbid, false },
 																   { "require", DRO_Require, false },
@@ -1065,6 +1068,21 @@ _guc_init(void)
 							 /* assign_hook= */ NULL,
 							 /* show_hook= */ NULL);
 
+	DefineCustomBoolVariable(/* name= */ MAKE_EXTOPTION("debug_have_int128"),
+							 /* short_desc= */ "whether we have int128 support",
+							 /* long_desc= */ "this is for debugging purposes",
+							 /* valueAddr= */ &ts_guc_debug_have_int128,
+#ifdef HAVE_INT128
+							 /* bootValue= */ true,
+#else
+							 /* bootValue= */ false,
+#endif
+							 /* context= */ PGC_INTERNAL,
+							 /* flags= */ 0,
+							 /* check_hook= */ NULL,
+							 /* assign_hook= */ NULL,
+							 /* show_hook= */ NULL);
+
 	DefineCustomEnumVariable(/* name= */ MAKE_EXTOPTION("debug_require_vector_agg"),
 							 /* short_desc= */
 							 "ensure that vectorized aggregation is used or not",

tsl/src/compression/algorithms/deltadelta_impl.c

+6-6
@@ -44,20 +44,20 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, Memory
 	 * Pad the number of elements to multiple of 64 bytes if needed, so that we
 	 * can work in 64-byte blocks.
 	 */
+#define INNER_LOOP_SIZE_LOG2 3
+#define INNER_LOOP_SIZE (1 << INNER_LOOP_SIZE_LOG2)
 	const uint32 n_total = has_nulls ? nulls.num_elements : num_deltas;
-	const uint32 n_total_padded =
-		((n_total * sizeof(ELEMENT_TYPE) + 63) / 64) * 64 / sizeof(ELEMENT_TYPE);
+	const uint32 n_total_padded = pad_to_multiple(INNER_LOOP_SIZE, n_total);
 	const uint32 n_notnull = num_deltas;
-	const uint32 n_notnull_padded =
-		((n_notnull * sizeof(ELEMENT_TYPE) + 63) / 64) * 64 / sizeof(ELEMENT_TYPE);
+	const uint32 n_notnull_padded = pad_to_multiple(INNER_LOOP_SIZE, n_notnull);
 	Assert(n_total_padded >= n_total);
 	Assert(n_notnull_padded >= n_notnull);
 	Assert(n_total >= n_notnull);
 	Assert(n_total <= GLOBAL_MAX_ROWS_PER_COMPRESSION);
 
 	/*
 	 * We need additional padding at the end of buffer, because the code that
-	 * converts the elements to postres Datum always reads in 8 bytes.
+	 * converts the elements to postgres Datum always reads in 8 bytes.
 	 */
 	const int buffer_bytes = n_total_padded * sizeof(ELEMENT_TYPE) + 8;
 	ELEMENT_TYPE *restrict decompressed_values = MemoryContextAlloc(dest_mctx, buffer_bytes);
@@ -75,7 +75,6 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, Memory
 	 * Also tried zig-zag decoding in a separate loop, seems to be slightly
 	 * slower, around the noise threshold.
 	 */
-#define INNER_LOOP_SIZE 8
 	Assert(n_notnull_padded % INNER_LOOP_SIZE == 0);
 	for (uint32 outer = 0; outer < n_notnull_padded; outer += INNER_LOOP_SIZE)
 	{
@@ -86,6 +85,7 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, Memory
 			decompressed_values[outer + inner] = current_element;
 		}
 	}
+#undef INNER_LOOP_SIZE_LOG2
 #undef INNER_LOOP_SIZE
 
 	uint64 *restrict validity_bitmap = NULL;
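
For reference, pad_to_multiple() (defined in arrow_c_data_interface.h, shown further down) rounds its second argument up to the next multiple of the first, so the decompression loop can always run in whole INNER_LOOP_SIZE blocks. A standalone sketch with made-up element counts:

#include <assert.h>
#include <stdint.h>

/* Same arithmetic as pad_to_multiple() in arrow_c_data_interface.h. */
static inline uint64_t
pad_to_multiple(uint64_t pad_to, uint64_t source_value)
{
	return ((source_value + pad_to - 1) / pad_to) * pad_to;
}

int
main(void)
{
	/* With INNER_LOOP_SIZE = 8, a batch of 1001 deltas is padded to 1008. */
	assert(pad_to_multiple(8, 1001) == 1008);
	/* Counts already on the boundary stay unchanged. */
	assert(pad_to_multiple(8, 1000) == 1000);
	return 0;
}

The previous expression padded the byte count to a multiple of 64 and converted back to elements, which for element types narrower than 8 bytes produced more padding than the 8-element blocks the loop actually needs.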

tsl/src/compression/arrow_c_data_interface.h

+31-3
@@ -176,21 +176,49 @@ arrow_set_row_validity(uint64 *bitmap, size_t row_number, bool value)
 	const size_t qword_index = row_number / 64;
 	const size_t bit_index = row_number % 64;
 	const uint64 mask = 1ull << bit_index;
+	const uint64 new_bit = (value ? 1ull : 0ull) << bit_index;
 
-	bitmap[qword_index] = (bitmap[qword_index] & ~mask) | ((-(uint64) value) & mask);
+	bitmap[qword_index] = (bitmap[qword_index] & ~mask) | new_bit;
 
 	Assert(arrow_row_is_valid(bitmap, row_number) == value);
 }
 
-/* Increase the `source_value` to be an even multiple of `pad_to`. */
+/*
+ * AND two optional arrow validity bitmaps into the given storage.
+ */
+static inline const uint64 *
+arrow_combine_validity(size_t num_words, uint64 *restrict storage, const uint64 *filter1,
+					   const uint64 *filter2)
+{
+	if (filter1 == NULL)
+	{
+		return filter2;
+	}
+
+	if (filter2 == NULL)
+	{
+		return filter1;
+	}
+
+	for (size_t i = 0; i < num_words; i++)
+	{
+		storage[i] = filter1[i] & filter2[i];
+	}
+
+	return storage;
+}
+
+/*
+ * Increase the `source_value` to be an even multiple of `pad_to`.
+ */
 static inline uint64
 pad_to_multiple(uint64 pad_to, uint64 source_value)
 {
 	return ((source_value + pad_to - 1) / pad_to) * pad_to;
 }
 
 static inline size_t
-arrow_num_valid(uint64 *bitmap, size_t total_rows)
+arrow_num_valid(const uint64 *bitmap, size_t total_rows)
 {
 	if (bitmap == NULL)
 	{
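
This helper is what lets the aggregate implementations work with at most one filter bitmap, as the commit message notes: a column's validity bitmap and a vectorized filter are folded into a single bitmap up front. A self-contained sketch of the intended usage (the helper is restated so the snippet compiles on its own; the bit patterns are made-up examples):

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t uint64;

/* Same logic as arrow_combine_validity() above. */
static const uint64 *
arrow_combine_validity(size_t num_words, uint64 *restrict storage, const uint64 *filter1,
					   const uint64 *filter2)
{
	if (filter1 == NULL)
		return filter2;
	if (filter2 == NULL)
		return filter1;
	for (size_t i = 0; i < num_words; i++)
		storage[i] = filter1[i] & filter2[i];
	return storage;
}

int
main(void)
{
	/* 64 rows: the column is non-null in rows 0..31, the filter passes rows 16..47. */
	uint64 validity[1] = { 0x00000000FFFFFFFFull };
	uint64 filter[1] = { 0x0000FFFFFFFF0000ull };
	uint64 storage[1];

	/* Only rows 16..31 survive: non-null and passing the filter. */
	const uint64 *combined = arrow_combine_validity(1, storage, validity, filter);
	printf("%016llx\n", (unsigned long long) combined[0]); /* prints 00000000ffff0000 */

	/* If either bitmap is missing, the other is returned directly, with no copy. */
	const uint64 *same_as_validity = arrow_combine_validity(1, storage, validity, NULL);
	(void) same_as_validity;
	return 0;
}

The NULL shortcuts also mean the common cases, a column with no NULLs or a query with no vectorized filter, cost nothing.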

tsl/src/nodes/decompress_chunk/compressed_batch.c

+3
@@ -265,6 +265,8 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
 		column_values->decompression_type = value_bytes;
 		column_values->buffers[0] = arrow->buffers[0];
 		column_values->buffers[1] = arrow->buffers[1];
+		column_values->buffers[2] = NULL;
+		column_values->buffers[3] = NULL;
 	}
 	else
 	{
@@ -290,6 +292,7 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
 			column_values->buffers[0] = arrow->buffers[0];
 			column_values->buffers[1] = arrow->buffers[1];
 			column_values->buffers[2] = arrow->buffers[2];
+			column_values->buffers[3] = NULL;
 		}
 		else
 		{
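
For orientation: in the Arrow C data interface, buffers[0] is the validity bitmap, buffers[1] holds the values for fixed-width types, and variable-length types additionally use buffers[2] for the data bytes (buffers[1] then holds offsets). Explicitly zeroing the remaining slots of the buffers array here presumably lets later consumers, such as the upcoming hash grouping code, rely on unused buffer pointers being NULL rather than stale memory.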

tsl/src/nodes/decompress_chunk/planner.c

+8-11
@@ -889,8 +889,12 @@ static void
 find_vectorized_quals(DecompressionMapContext *context, DecompressChunkPath *path, List *qual_list,
 					  List **vectorized, List **nonvectorized)
 {
-	ListCell *lc;
+	VectorQualInfo vqi = {
+		.vector_attrs = build_vector_attrs_array(context->uncompressed_attno_info, path->info),
+		.rti = path->info->chunk_rel->relid,
+	};
 
+	ListCell *lc;
 	foreach (lc, qual_list)
 	{
 		Node *source_qual = lfirst(lc);
@@ -903,14 +907,7 @@ find_vectorized_quals(DecompressionMapContext *context, DecompressChunkPath *pat
 		 */
 		Node *transformed_comparison =
 			(Node *) ts_transform_cross_datatype_comparison((Expr *) source_qual);
-		VectorQualInfoDecompressChunk vqidc = {
-			.vqinfo = {
-				.vector_attrs = build_vector_attrs_array(context->uncompressed_attno_info, path->info),
-				.rti = path->info->chunk_rel->relid,
-			},
-			.colinfo = context->uncompressed_attno_info,
-		};
-		Node *vectorized_qual = vector_qual_make(transformed_comparison, &vqidc.vqinfo);
+		Node *vectorized_qual = vector_qual_make(transformed_comparison, &vqi);
 		if (vectorized_qual)
 		{
 			*vectorized = lappend(*vectorized, vectorized_qual);
@@ -919,9 +916,9 @@ find_vectorized_quals(DecompressionMapContext *context, DecompressChunkPath *pat
 		{
 			*nonvectorized = lappend(*nonvectorized, source_qual);
 		}
-
-		pfree(vqidc.vqinfo.vector_attrs);
 	}
+
+	pfree(vqi.vector_attrs);
 }
 
 /*

tsl/src/nodes/decompress_chunk/vector_quals.h

+8-4
@@ -18,12 +18,16 @@
  */
 typedef struct VectorQualInfo
 {
-	/* The range-table index of the relation to compute vectorized quals
-	 * for */
+	/*
+	 * The range-table index of the relation to compute vectorized quals
+	 * for.
+	 */
 	Index rti;
 
-	/* AttrNumber-indexed array indicating whether an attribute/column is a
-	 * vectorizable type */
+	/*
+	 * Array indexed by uncompressed attno indicating whether an
+	 * attribute/column is a vectorizable type.
+	 */
 	bool *vector_attrs;
 } VectorQualInfo;
 

tsl/src/nodes/vector_agg/exec.c

+60-15
@@ -47,7 +47,8 @@ get_input_offset(DecompressChunkState *decompress_state, Var *var)
 	Assert(value_column_description->type == COMPRESSED_COLUMN ||
 		   value_column_description->type == SEGMENTBY_COLUMN);
 
-	return value_column_description - dcontext->compressed_chunk_columns;
+	const int index = value_column_description - dcontext->compressed_chunk_columns;
+	return index;
 }
 
 static void
@@ -76,21 +77,60 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
 	 */
 	List *aggregated_tlist =
 		castNode(CustomScan, vector_agg_state->custom.ss.ps.plan)->custom_scan_tlist;
-	const int naggs = list_length(aggregated_tlist);
-	for (int i = 0; i < naggs; i++)
+	const int tlist_length = list_length(aggregated_tlist);
+
+	/*
+	 * First, count how many grouping columns and aggregate functions we have.
+	 */
+	int agg_functions_counter = 0;
+	int grouping_column_counter = 0;
+	for (int i = 0; i < tlist_length; i++)
+	{
+		TargetEntry *tlentry = list_nth_node(TargetEntry, aggregated_tlist, i);
+		if (IsA(tlentry->expr, Aggref))
+		{
+			agg_functions_counter++;
+		}
+		else
+		{
+			/* This is a grouping column. */
+			Assert(IsA(tlentry->expr, Var));
+			grouping_column_counter++;
+		}
+	}
+	Assert(agg_functions_counter + grouping_column_counter == tlist_length);
+
+	/*
+	 * Allocate the storage for definitions of aggregate function and grouping
+	 * columns.
+	 */
+	vector_agg_state->num_agg_defs = agg_functions_counter;
+	vector_agg_state->agg_defs =
+		palloc0(sizeof(*vector_agg_state->agg_defs) * vector_agg_state->num_agg_defs);
+
+	vector_agg_state->num_grouping_columns = grouping_column_counter;
+	vector_agg_state->grouping_columns = palloc0(sizeof(*vector_agg_state->grouping_columns) *
												 vector_agg_state->num_grouping_columns);
+
+	/*
+	 * Loop through the aggregated targetlist again and fill the definitions.
+	 */
+	agg_functions_counter = 0;
+	grouping_column_counter = 0;
+	for (int i = 0; i < tlist_length; i++)
 	{
-		TargetEntry *tlentry = (TargetEntry *) list_nth(aggregated_tlist, i);
+		TargetEntry *tlentry = list_nth_node(TargetEntry, aggregated_tlist, i);
 		if (IsA(tlentry->expr, Aggref))
 		{
 			/* This is an aggregate function. */
-			VectorAggDef *def = palloc0(sizeof(VectorAggDef));
-			vector_agg_state->agg_defs = lappend(vector_agg_state->agg_defs, def);
+			VectorAggDef *def = &vector_agg_state->agg_defs[agg_functions_counter++];
 			def->output_offset = i;
 
 			Aggref *aggref = castNode(Aggref, tlentry->expr);
+
 			VectorAggFunctions *func = get_vector_aggregate(aggref->aggfnoid);
 			Assert(func != NULL);
-			def->func = func;
+			def->func = *func;
 
 			if (list_length(aggref->args) > 0)
 			{
@@ -112,21 +152,22 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
 			/* This is a grouping column. */
 			Assert(IsA(tlentry->expr, Var));
 
-			GroupingColumn *col = palloc0(sizeof(GroupingColumn));
-			vector_agg_state->output_grouping_columns =
-				lappend(vector_agg_state->output_grouping_columns, col);
+			GroupingColumn *col = &vector_agg_state->grouping_columns[grouping_column_counter++];
 			col->output_offset = i;
 
 			Var *var = castNode(Var, tlentry->expr);
 			col->input_offset = get_input_offset(decompress_state, var);
 		}
 	}
 
-	List *grouping_column_offsets = linitial(cscan->custom_private);
+	/*
+	 * Currently the only grouping policy we use is per-batch grouping.
+	 */
 	vector_agg_state->grouping =
-		create_grouping_policy_batch(vector_agg_state->agg_defs,
-									 vector_agg_state->output_grouping_columns,
-									 /* partial_per_batch = */ grouping_column_offsets != NIL);
+		create_grouping_policy_batch(vector_agg_state->num_agg_defs,
									 vector_agg_state->agg_defs,
+									 vector_agg_state->num_grouping_columns,
+									 vector_agg_state->grouping_columns);
 }
 
 static void
@@ -272,7 +313,11 @@ vector_agg_exec(CustomScanState *node)
 static void
 vector_agg_explain(CustomScanState *node, List *ancestors, ExplainState *es)
 {
-	/* No additional output is needed. */
+	VectorAggState *state = (VectorAggState *) node;
+	if (es->verbose || es->format != EXPLAIN_FORMAT_TEXT)
+	{
+		ExplainPropertyText("Grouping Policy", state->grouping->gp_explain(state->grouping), es);
+	}
 }
 
 static struct CustomExecMethods exec_methods = {
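
The new EXPLAIN output above relies on each grouping policy reporting a human-readable name through a gp_explain callback. A minimal sketch of that contract, assuming a hypothetical struct layout and a placeholder string (the real GroupingPolicy definition lives in grouping_policy.h and may differ):

/* Sketch only: illustrative GroupingPolicy contract, not the actual definition. */
typedef struct GroupingPolicy GroupingPolicy;

struct GroupingPolicy
{
	/* Returns the policy name that EXPLAIN prints as "Grouping Policy". */
	const char *(*gp_explain)(GroupingPolicy *policy);
	/* ...aggregation callbacks elided... */
};

static const char *
batch_policy_explain(GroupingPolicy *policy)
{
	(void) policy;
	return "per compressed batch"; /* placeholder string, not the real output */
}

static GroupingPolicy batch_grouping_policy = { .gp_explain = batch_policy_explain };

With verbose text output (or any non-text format), the node then adds a line of the form "Grouping Policy: <policy name>" to the plan.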

tsl/src/nodes/vector_agg/exec.h

+7-5
@@ -13,14 +13,14 @@
 #include "function/functions.h"
 #include "grouping_policy.h"
 
-typedef struct
+typedef struct VectorAggDef
 {
-	VectorAggFunctions *func;
+	VectorAggFunctions func;
 	int input_offset;
 	int output_offset;
 } VectorAggDef;
 
-typedef struct
+typedef struct GroupingColumn
 {
 	int input_offset;
 	int output_offset;
@@ -30,9 +30,11 @@ typedef struct
 {
 	CustomScanState custom;
 
-	List *agg_defs;
+	int num_agg_defs;
+	VectorAggDef *agg_defs;
 
-	List *output_grouping_columns;
+	int num_grouping_columns;
+	GroupingColumn *grouping_columns;
 
 	/*
 	 * We can't call the underlying scan after it has ended, or it will be
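
Because the definitions are now typed arrays with explicit counts rather than Lists, consumers can walk them with plain counted loops instead of foreach/lfirst. A sketch, assuming the declarations from this header are in scope (the function name is made up for illustration):

static void
walk_vector_agg_definitions(VectorAggState *state)
{
	for (int i = 0; i < state->num_agg_defs; i++)
	{
		VectorAggDef *def = &state->agg_defs[i];
		/* def->func is now stored by value, so no extra pointer chase here. */
		(void) def;
	}

	for (int i = 0; i < state->num_grouping_columns; i++)
	{
		GroupingColumn *col = &state->grouping_columns[i];
		(void) col;
	}
}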

tsl/src/nodes/vector_agg/function/agg_const_helper.c renamed to tsl/src/nodes/vector_agg/function/agg_scalar_helper.c

+11-5
@@ -11,14 +11,20 @@
  * implementation otherwise.
  */
 static void
-FUNCTION_NAME(const)(void *agg_state, Datum constvalue, bool constisnull, int n,
-					 MemoryContext agg_extra_mctx)
+FUNCTION_NAME(scalar)(void *agg_state, Datum constvalue, bool constisnull, int n,
+					  MemoryContext agg_extra_mctx)
 {
-	const uint64 valid = constisnull ? 0 : 1;
-	const CTYPE value = valid ? DATUM_TO_CTYPE(constvalue) : 0;
+	if (constisnull)
+	{
+		return;
+	}
+
+	const CTYPE value = DATUM_TO_CTYPE(constvalue);
 
+	MemoryContext old = MemoryContextSwitchTo(agg_extra_mctx);
 	for (int i = 0; i < n; i++)
 	{
-		FUNCTION_NAME(vector_impl)(agg_state, 1, &value, &valid, NULL, agg_extra_mctx);
+		FUNCTION_NAME(one)(agg_state, value);
 	}
+	MemoryContextSwitchTo(old);
 }
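
For readers unfamiliar with these templates: FUNCTION_NAME, CTYPE, and DATUM_TO_CTYPE are macros that stamp out one copy of this helper per aggregate and input type, and the scalar variant applies a single Datum that stands in for a whole batch (for example a segmentby or constant column) n times. A simplified, non-template analogue for a hypothetical int4 sum aggregate (the names and state layout are invented for illustration):

#include <stdbool.h>
#include <stdint.h>

typedef struct
{
	int64_t result;
} Int4SumState;

/* Counterpart of FUNCTION_NAME(one): fold a single value into the state. */
static void
int4_sum_one(void *agg_state, int32_t value)
{
	((Int4SumState *) agg_state)->result += value;
}

/* Counterpart of FUNCTION_NAME(scalar): apply one scalar value n times. */
static void
int4_sum_scalar(void *agg_state, int32_t value, bool is_null, int n)
{
	if (is_null)
	{
		return;
	}
	for (int i = 0; i < n; i++)
	{
		int4_sum_one(agg_state, value);
	}
}

The real helper additionally switches into agg_extra_mctx around the loop so that any allocations made by the per-value function land in the aggregate's own memory context.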
