Skip to content

Refactor vectorized grouping to prepare for hash grouping #7408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions tsl/src/compression/algorithms/deltadelta_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, Memory
* Pad the number of elements to multiple of 64 bytes if needed, so that we
* can work in 64-byte blocks.
*/
#define INNER_LOOP_SIZE_LOG2 3
#define INNER_LOOP_SIZE (1 << INNER_LOOP_SIZE_LOG2)
const uint32 n_total = has_nulls ? nulls.num_elements : num_deltas;
const uint32 n_total_padded =
((n_total * sizeof(ELEMENT_TYPE) + 63) / 64) * 64 / sizeof(ELEMENT_TYPE);
const uint32 n_total_padded = pad_to_multiple(INNER_LOOP_SIZE, n_total);
const uint32 n_notnull = num_deltas;
const uint32 n_notnull_padded =
((n_notnull * sizeof(ELEMENT_TYPE) + 63) / 64) * 64 / sizeof(ELEMENT_TYPE);
const uint32 n_notnull_padded = pad_to_multiple(INNER_LOOP_SIZE, n_notnull);
Assert(n_total_padded >= n_total);
Assert(n_notnull_padded >= n_notnull);
Assert(n_total >= n_notnull);
Assert(n_total <= GLOBAL_MAX_ROWS_PER_COMPRESSION);

/*
* We need additional padding at the end of buffer, because the code that
* converts the elements to postres Datum always reads in 8 bytes.
* converts the elements to postgres Datum always reads in 8 bytes.
*/
const int buffer_bytes = n_total_padded * sizeof(ELEMENT_TYPE) + 8;
ELEMENT_TYPE *restrict decompressed_values = MemoryContextAlloc(dest_mctx, buffer_bytes);
Expand All @@ -75,7 +75,6 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, Memory
* Also tried zig-zag decoding in a separate loop, seems to be slightly
* slower, around the noise threshold.
*/
#define INNER_LOOP_SIZE 8
Assert(n_notnull_padded % INNER_LOOP_SIZE == 0);
for (uint32 outer = 0; outer < n_notnull_padded; outer += INNER_LOOP_SIZE)
{
Expand All @@ -86,6 +85,7 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, Memory
decompressed_values[outer + inner] = current_element;
}
}
#undef INNER_LOOP_SIZE_LOG2
#undef INNER_LOOP_SIZE

uint64 *restrict validity_bitmap = NULL;
Expand Down
27 changes: 26 additions & 1 deletion tsl/src/compression/arrow_c_data_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,31 @@ arrow_set_row_validity(uint64 *bitmap, size_t row_number, bool value)
Assert(arrow_row_is_valid(bitmap, row_number) == value);
}

/*
* AND two optional arrow validity bitmaps into the given storage.
*/
/*
 * Combine two optional Arrow validity bitmaps by bitwise AND, writing the
 * conjunction into the caller-provided storage.
 *
 * Either bitmap may be NULL, which means "all rows valid"; in that case the
 * other bitmap (possibly also NULL) is returned as-is and storage is left
 * untouched. Only when both are present is the word-wise AND computed into
 * storage, which is then returned.
 */
static inline const uint64 *
arrow_combine_validity(size_t num_words, uint64 *restrict storage, const uint64 *filter1,
					   const uint64 *filter2)
{
	/* A missing bitmap filters nothing out, so the other one is the result. */
	if (filter2 == NULL)
	{
		return filter1;
	}

	if (filter1 == NULL)
	{
		return filter2;
	}

	/* Both bitmaps present: AND them word by word into the given storage. */
	for (size_t word = 0; word < num_words; word++)
	{
		storage[word] = filter1[word] & filter2[word];
	}

	return storage;
}

/* Increase the `source_value` to be an even multiple of `pad_to`. */
static inline uint64
pad_to_multiple(uint64 pad_to, uint64 source_value)
Expand All @@ -190,7 +215,7 @@ pad_to_multiple(uint64 pad_to, uint64 source_value)
}

static inline size_t
arrow_num_valid(uint64 *bitmap, size_t total_rows)
arrow_num_valid(const uint64 *bitmap, size_t total_rows)
{
if (bitmap == NULL)
{
Expand Down
19 changes: 8 additions & 11 deletions tsl/src/nodes/decompress_chunk/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -887,8 +887,12 @@ static void
find_vectorized_quals(DecompressionMapContext *context, DecompressChunkPath *path, List *qual_list,
List **vectorized, List **nonvectorized)
{
ListCell *lc;
VectorQualInfo vqi = {
.vector_attrs = build_vector_attrs_array(context->uncompressed_attno_info, path->info),
.rti = path->info->chunk_rel->relid,
};

ListCell *lc;
foreach (lc, qual_list)
{
Node *source_qual = lfirst(lc);
Expand All @@ -901,14 +905,7 @@ find_vectorized_quals(DecompressionMapContext *context, DecompressChunkPath *pat
*/
Node *transformed_comparison =
(Node *) ts_transform_cross_datatype_comparison((Expr *) source_qual);
VectorQualInfoDecompressChunk vqidc = {
.vqinfo = {
.vector_attrs = build_vector_attrs_array(context->uncompressed_attno_info, path->info),
.rti = path->info->chunk_rel->relid,
},
.colinfo = context->uncompressed_attno_info,
};
Node *vectorized_qual = vector_qual_make(transformed_comparison, &vqidc.vqinfo);
Node *vectorized_qual = vector_qual_make(transformed_comparison, &vqi);
if (vectorized_qual)
{
*vectorized = lappend(*vectorized, vectorized_qual);
Expand All @@ -917,9 +914,9 @@ find_vectorized_quals(DecompressionMapContext *context, DecompressChunkPath *pat
{
*nonvectorized = lappend(*nonvectorized, source_qual);
}

pfree(vqidc.vqinfo.vector_attrs);
}

pfree(vqi.vector_attrs);
}

/*
Expand Down
73 changes: 59 additions & 14 deletions tsl/src/nodes/vector_agg/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ get_input_offset(DecompressChunkState *decompress_state, Var *var)
Assert(value_column_description->type == COMPRESSED_COLUMN ||
value_column_description->type == SEGMENTBY_COLUMN);

return value_column_description - dcontext->compressed_chunk_columns;
const int index = value_column_description - dcontext->compressed_chunk_columns;
return index;
}

static void
Expand Down Expand Up @@ -76,21 +77,60 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
*/
List *aggregated_tlist =
castNode(CustomScan, vector_agg_state->custom.ss.ps.plan)->custom_scan_tlist;
const int naggs = list_length(aggregated_tlist);
for (int i = 0; i < naggs; i++)
const int tlist_length = list_length(aggregated_tlist);

/*
* First, count how many grouping columns and aggregate functions we have.
*/
int agg_functions_counter = 0;
int grouping_column_counter = 0;
for (int i = 0; i < tlist_length; i++)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason for not using a foreach loop here? I don't see the index i being used for anything except getting the list elements.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to cosmetically match the same loop below where i is required.

{
TargetEntry *tlentry = (TargetEntry *) list_nth(aggregated_tlist, i);
if (IsA(tlentry->expr, Aggref))
{
agg_functions_counter++;
}
else
{
/* This is a grouping column. */
Assert(IsA(tlentry->expr, Var));
grouping_column_counter++;
}
}
Assert(agg_functions_counter + grouping_column_counter == tlist_length);

/*
* Allocate the storage for definitions of aggregate function and grouping
* columns.
*/
vector_agg_state->num_agg_defs = agg_functions_counter;
vector_agg_state->agg_defs =
palloc0(sizeof(*vector_agg_state->agg_defs) * vector_agg_state->num_agg_defs);

vector_agg_state->num_grouping_columns = grouping_column_counter;
vector_agg_state->grouping_columns = palloc0(sizeof(*vector_agg_state->grouping_columns) *
vector_agg_state->num_grouping_columns);

/*
* Loop through the aggregated targetlist again and fill the definitions.
*/
agg_functions_counter = 0;
grouping_column_counter = 0;
for (int i = 0; i < tlist_length; i++)
{
TargetEntry *tlentry = (TargetEntry *) list_nth(aggregated_tlist, i);
if (IsA(tlentry->expr, Aggref))
{
/* This is an aggregate function. */
VectorAggDef *def = palloc0(sizeof(VectorAggDef));
vector_agg_state->agg_defs = lappend(vector_agg_state->agg_defs, def);
VectorAggDef *def = &vector_agg_state->agg_defs[agg_functions_counter++];
def->output_offset = i;

Aggref *aggref = castNode(Aggref, tlentry->expr);

VectorAggFunctions *func = get_vector_aggregate(aggref->aggfnoid);
Assert(func != NULL);
def->func = func;
def->func = *func;

if (list_length(aggref->args) > 0)
{
Expand All @@ -112,21 +152,22 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
/* This is a grouping column. */
Assert(IsA(tlentry->expr, Var));

GroupingColumn *col = palloc0(sizeof(GroupingColumn));
vector_agg_state->output_grouping_columns =
lappend(vector_agg_state->output_grouping_columns, col);
GroupingColumn *col = &vector_agg_state->grouping_columns[grouping_column_counter++];
col->output_offset = i;

Var *var = castNode(Var, tlentry->expr);
col->input_offset = get_input_offset(decompress_state, var);
}
}

List *grouping_column_offsets = linitial(cscan->custom_private);
/*
* Currently the only grouping policy we use is per-batch grouping.
*/
vector_agg_state->grouping =
create_grouping_policy_batch(vector_agg_state->agg_defs,
vector_agg_state->output_grouping_columns,
/* partial_per_batch = */ grouping_column_offsets != NIL);
create_grouping_policy_batch(vector_agg_state->num_agg_defs,
vector_agg_state->agg_defs,
vector_agg_state->num_grouping_columns,
vector_agg_state->grouping_columns);
}

static void
Expand Down Expand Up @@ -272,7 +313,11 @@ vector_agg_exec(CustomScanState *node)
static void
vector_agg_explain(CustomScanState *node, List *ancestors, ExplainState *es)
{
/* No additional output is needed. */
VectorAggState *state = (VectorAggState *) node;
if (es->verbose || es->format != EXPLAIN_FORMAT_TEXT)
{
ExplainPropertyText("Grouping Policy", state->grouping->gp_explain(state->grouping), es);
}
}

static struct CustomExecMethods exec_methods = {
Expand Down
12 changes: 7 additions & 5 deletions tsl/src/nodes/vector_agg/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
#include "function/functions.h"
#include "grouping_policy.h"

typedef struct
typedef struct VectorAggDef
{
VectorAggFunctions *func;
VectorAggFunctions func;
int input_offset;
int output_offset;
} VectorAggDef;

typedef struct
typedef struct GroupingColumn
{
int input_offset;
int output_offset;
Expand All @@ -30,9 +30,11 @@ typedef struct
{
CustomScanState custom;

List *agg_defs;
int num_agg_defs;
VectorAggDef *agg_defs;

List *output_grouping_columns;
int num_grouping_columns;
GroupingColumn *grouping_columns;

/*
* We can't call the underlying scan after it has ended, or it will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
* implementation otherwise.
*/
static void
FUNCTION_NAME(const)(void *agg_state, Datum constvalue, bool constisnull, int n,
MemoryContext agg_extra_mctx)
FUNCTION_NAME(scalar)(void *agg_state, Datum constvalue, bool constisnull, int n,
MemoryContext agg_extra_mctx)
{
const uint64 valid = constisnull ? 0 : 1;
const CTYPE value = valid ? DATUM_TO_CTYPE(constvalue) : 0;
if (constisnull)
{
return;
}

const CTYPE value = DATUM_TO_CTYPE(constvalue);

MemoryContext old = MemoryContextSwitchTo(agg_extra_mctx);
for (int i = 0; i < n; i++)
{
FUNCTION_NAME(vector_impl)(agg_state, 1, &value, &valid, NULL, agg_extra_mctx);
FUNCTION_NAME(one)(agg_state, value);
}
MemoryContextSwitchTo(old);
}
40 changes: 9 additions & 31 deletions tsl/src/nodes/vector_agg/function/agg_vector_validity_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,62 +11,40 @@
*/

static pg_attribute_always_inline void
FUNCTION_NAME(vector_impl_arrow)(void *agg_state, const ArrowArray *vector, const uint64 *valid1,
const uint64 *valid2, MemoryContext agg_extra_mctx)
FUNCTION_NAME(vector_impl_arrow)(void *agg_state, const ArrowArray *vector, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
const int n = vector->length;
const CTYPE *values = vector->buffers[1];
FUNCTION_NAME(vector_impl)(agg_state, n, values, valid1, valid2, agg_extra_mctx);
FUNCTION_NAME(vector_impl)(agg_state, n, values, filter, agg_extra_mctx);
}

static pg_noinline void
FUNCTION_NAME(vector_all_valid)(void *agg_state, const ArrowArray *vector,
MemoryContext agg_extra_mctx)
{
FUNCTION_NAME(vector_impl_arrow)(agg_state, vector, NULL, NULL, agg_extra_mctx);
FUNCTION_NAME(vector_impl_arrow)(agg_state, vector, NULL, agg_extra_mctx);
}

static pg_noinline void
FUNCTION_NAME(vector_one_validity)(void *agg_state, const ArrowArray *vector, const uint64 *valid,
FUNCTION_NAME(vector_one_validity)(void *agg_state, const ArrowArray *vector, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
FUNCTION_NAME(vector_impl_arrow)(agg_state, vector, valid, NULL, agg_extra_mctx);
}

static pg_noinline void
FUNCTION_NAME(vector_two_validity)(void *agg_state, const ArrowArray *vector, const uint64 *valid1,
const uint64 *valid2, MemoryContext agg_extra_mctx)
{
FUNCTION_NAME(vector_impl_arrow)(agg_state, vector, valid1, valid2, agg_extra_mctx);
FUNCTION_NAME(vector_impl_arrow)(agg_state, vector, filter, agg_extra_mctx);
}

static void
FUNCTION_NAME(vector)(void *agg_state, const ArrowArray *vector, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
const uint64 *row_validity = vector->buffers[0];

if (row_validity == NULL && filter == NULL)
if (filter == NULL)
{
/* All rows are valid and we don't have to check any validity bitmaps. */
FUNCTION_NAME(vector_all_valid)(agg_state, vector, agg_extra_mctx);
}
else if (row_validity != NULL && filter == NULL)
{
/* Have to check only one bitmap -- row validity bitmap. */
FUNCTION_NAME(vector_one_validity)(agg_state, vector, row_validity, agg_extra_mctx);
}
else if (filter != NULL && row_validity == NULL)
{
/* Have to check only one bitmap -- results of the vectorized filter. */
FUNCTION_NAME(vector_one_validity)(agg_state, vector, filter, agg_extra_mctx);
}
else
{
/*
* Have to check both the row validity bitmap and the results of the
* vectorized filter.
*/
FUNCTION_NAME(vector_two_validity)(agg_state, vector, row_validity, filter, agg_extra_mctx);
/* Have to check only one combined validity bitmap. */
FUNCTION_NAME(vector_one_validity)(agg_state, vector, filter, agg_extra_mctx);
}
}
Loading
Loading