Skip to content

Commit 4cefdbe

Browse files
committed
Merge branch 'v1.2-histrionicus' into querying-dropped
2 parents 34ba716 + 24d18d0 commit 4cefdbe

File tree

60 files changed

+2698
-223
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+2698
-223
lines changed
88 Bytes
Binary file not shown.

data/csv/afl/4172/case_4.csv

239 Bytes
Binary file not shown.

data/csv/bad_csv_file_2047.csv

Lines changed: 2054 additions & 0 deletions
Large diffs are not rendered by default.

extension/core_functions/aggregate/distributive/string_agg.cpp

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,41 +44,33 @@ struct StringAggFunction {
4444
if (!state.dataptr) {
4545
finalize_data.ReturnNull();
4646
} else {
47-
target = StringVector::AddString(finalize_data.result, state.dataptr, state.size);
48-
}
49-
}
50-
51-
template <class STATE>
52-
static void Destroy(STATE &state, AggregateInputData &aggr_input_data) {
53-
if (state.dataptr) {
54-
delete[] state.dataptr;
47+
target = string_t(state.dataptr, state.size);
5548
}
5649
}
5750

5851
static bool IgnoreNull() {
5952
return true;
6053
}
6154

62-
static inline void PerformOperation(StringAggState &state, const char *str, const char *sep, idx_t str_size,
63-
idx_t sep_size) {
55+
static inline void PerformOperation(StringAggState &state, ArenaAllocator &allocator, const char *str,
56+
const char *sep, idx_t str_size, idx_t sep_size) {
6457
if (!state.dataptr) {
6558
// first iteration: allocate space for the string and copy it into the state
6659
state.alloc_size = MaxValue<idx_t>(8, NextPowerOfTwo(str_size));
67-
state.dataptr = new char[state.alloc_size];
60+
state.dataptr = char_ptr_cast(allocator.Allocate(state.alloc_size));
6861
state.size = str_size;
6962
memcpy(state.dataptr, str, str_size);
7063
} else {
7164
// subsequent iteration: first check if we have space to place the string and separator
7265
idx_t required_size = state.size + str_size + sep_size;
7366
if (required_size > state.alloc_size) {
7467
// no space! allocate extra space
68+
const auto old_size = state.alloc_size;
7569
while (state.alloc_size < required_size) {
7670
state.alloc_size *= 2;
7771
}
78-
auto new_data = new char[state.alloc_size];
79-
memcpy(new_data, state.dataptr, state.size);
80-
delete[] state.dataptr;
81-
state.dataptr = new_data;
72+
state.dataptr =
73+
char_ptr_cast(allocator.Reallocate(data_ptr_cast(state.dataptr), old_size, state.alloc_size));
8274
}
8375
// copy the separator
8476
memcpy(state.dataptr + state.size, sep, sep_size);
@@ -89,14 +81,15 @@ struct StringAggFunction {
8981
}
9082
}
9183

92-
static inline void PerformOperation(StringAggState &state, string_t str, optional_ptr<FunctionData> data_p) {
84+
static inline void PerformOperation(StringAggState &state, ArenaAllocator &allocator, string_t str,
85+
optional_ptr<FunctionData> data_p) {
9386
auto &data = data_p->Cast<StringAggBindData>();
94-
PerformOperation(state, str.GetData(), data.sep.c_str(), str.GetSize(), data.sep.size());
87+
PerformOperation(state, allocator, str.GetData(), data.sep.c_str(), str.GetSize(), data.sep.size());
9588
}
9689

9790
template <class INPUT_TYPE, class STATE, class OP>
9891
static void Operation(STATE &state, const INPUT_TYPE &input, AggregateUnaryInput &unary_input) {
99-
PerformOperation(state, input, unary_input.input.bind_data);
92+
PerformOperation(state, unary_input.input.allocator, input, unary_input.input.bind_data);
10093
}
10194

10295
template <class INPUT_TYPE, class STATE, class OP>
@@ -113,8 +106,8 @@ struct StringAggFunction {
113106
// source is not set: skip combining
114107
return;
115108
}
116-
PerformOperation(target, string_t(source.dataptr, UnsafeNumericCast<uint32_t>(source.size)),
117-
aggr_input_data.bind_data);
109+
PerformOperation(target, aggr_input_data.allocator,
110+
string_t(source.dataptr, UnsafeNumericCast<uint32_t>(source.size)), aggr_input_data.bind_data);
118111
}
119112
};
120113

@@ -162,8 +155,7 @@ AggregateFunctionSet StringAggFun::GetFunctions() {
162155
AggregateFunction::UnaryScatterUpdate<StringAggState, string_t, StringAggFunction>,
163156
AggregateFunction::StateCombine<StringAggState, StringAggFunction>,
164157
AggregateFunction::StateFinalize<StringAggState, string_t, StringAggFunction>,
165-
AggregateFunction::UnaryUpdate<StringAggState, string_t, StringAggFunction>, StringAggBind,
166-
AggregateFunction::StateDestroy<StringAggState, StringAggFunction>);
158+
AggregateFunction::UnaryUpdate<StringAggState, string_t, StringAggFunction>, StringAggBind);
167159
string_agg_param.serialize = StringAggSerialize;
168160
string_agg_param.deserialize = StringAggDeserialize;
169161
string_agg.AddFunction(string_agg_param);

extension/core_functions/aggregate/nested/list.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ static void ListFinalize(Vector &states_vector, AggregateInputData &aggr_input_d
116116

117117
// first iterate over all entries and set up the list entries, and get the newly required total length
118118
for (idx_t i = 0; i < count; i++) {
119-
120119
auto &state = *states[states_data.sel->get_index(i)];
121120
const auto rid = i + offset;
122121
result_data[rid].offset = total_len;

extension/core_functions/scalar/list/list_aggregates.cpp

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,17 @@
1515

1616
namespace duckdb {
1717

18-
// FIXME: use a local state for each thread to increase performance?
18+
struct ListAggregatesLocalState : public FunctionLocalState {
19+
explicit ListAggregatesLocalState(Allocator &allocator) : arena_allocator(allocator) {
20+
}
21+
22+
ArenaAllocator arena_allocator;
23+
};
24+
25+
unique_ptr<FunctionLocalState> ListAggregatesInitLocalState(ExpressionState &state, const BoundFunctionExpression &expr,
26+
FunctionData *bind_data) {
27+
return make_uniq<ListAggregatesLocalState>(BufferAllocator::Get(state.GetContext()));
28+
}
1929
// FIXME: benchmark the use of simple_update against using update (if applicable)
2030

2131
static unique_ptr<FunctionData> ListAggregatesBindFailure(ScalarFunction &bound_function) {
@@ -207,7 +217,8 @@ static void ListAggregatesFunction(DataChunk &args, ExpressionState &state, Vect
207217
auto &func_expr = state.expr.Cast<BoundFunctionExpression>();
208218
auto &info = func_expr.bind_info->Cast<ListAggregatesBindData>();
209219
auto &aggr = info.aggr_expr->Cast<BoundAggregateExpression>();
210-
ArenaAllocator allocator(Allocator::DefaultAllocator());
220+
auto &allocator = ExecuteFunctionState::GetFunctionState(state)->Cast<ListAggregatesLocalState>().arena_allocator;
221+
allocator.Reset();
211222
AggregateInputData aggr_input_data(aggr.bind_info.get(), allocator);
212223

213224
D_ASSERT(aggr.function.update);
@@ -511,8 +522,9 @@ static unique_ptr<FunctionData> ListUniqueBind(ClientContext &context, ScalarFun
511522
}
512523

513524
ScalarFunction ListAggregateFun::GetFunction() {
514-
auto result = ScalarFunction({LogicalType::LIST(LogicalType::ANY), LogicalType::VARCHAR}, LogicalType::ANY,
515-
ListAggregateFunction, ListAggregateBind);
525+
auto result =
526+
ScalarFunction({LogicalType::LIST(LogicalType::ANY), LogicalType::VARCHAR}, LogicalType::ANY,
527+
ListAggregateFunction, ListAggregateBind, nullptr, nullptr, ListAggregatesInitLocalState);
516528
BaseScalarFunction::SetReturnsError(result);
517529
result.null_handling = FunctionNullHandling::SPECIAL_HANDLING;
518530
result.varargs = LogicalType::ANY;
@@ -523,12 +535,12 @@ ScalarFunction ListAggregateFun::GetFunction() {
523535

524536
ScalarFunction ListDistinctFun::GetFunction() {
525537
return ScalarFunction({LogicalType::LIST(LogicalType::ANY)}, LogicalType::LIST(LogicalType::ANY),
526-
ListDistinctFunction, ListDistinctBind);
538+
ListDistinctFunction, ListDistinctBind, nullptr, nullptr, ListAggregatesInitLocalState);
527539
}
528540

529541
ScalarFunction ListUniqueFun::GetFunction() {
530542
return ScalarFunction({LogicalType::LIST(LogicalType::ANY)}, LogicalType::UBIGINT, ListUniqueFunction,
531-
ListUniqueBind);
543+
ListUniqueBind, nullptr, nullptr, ListAggregatesInitLocalState);
532544
}
533545

534546
} // namespace duckdb

extension/json/buffered_json_reader.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ void JSONFileHandle::ReadAtPosition(char *pointer, idx_t size, idx_t position, b
9292
auto &handle = override_handle ? *override_handle.get() : *file_handle.get();
9393
if (can_seek) {
9494
handle.Read(pointer, size, position);
95-
} else if (sample_run) { // Cache the buffer
95+
} else if (file_handle->IsPipe()) { // Cache the buffer
9696
handle.Read(pointer, size, position);
9797

9898
cached_buffers.emplace_back(allocator.Allocate(size));
@@ -128,7 +128,7 @@ bool JSONFileHandle::Read(char *pointer, idx_t &read_size, idx_t requested_size,
128128
if (can_seek) {
129129
read_size = ReadInternal(pointer, requested_size);
130130
read_position += read_size;
131-
} else if (sample_run) { // Cache the buffer
131+
} else if (file_handle->IsPipe()) { // Cache the buffer
132132
read_size = ReadInternal(pointer, requested_size);
133133
if (read_size > 0) {
134134
cached_buffers.emplace_back(allocator.Allocate(read_size));

extension/json/json_extension.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,17 @@
1717
namespace duckdb {
1818

1919
static DefaultMacro json_macros[] = {
20-
{DEFAULT_SCHEMA, "json_group_array", {"x", nullptr}, {{nullptr, nullptr}}, "to_json(list(x))"},
20+
{DEFAULT_SCHEMA,
21+
"json_group_array",
22+
{"x", nullptr},
23+
{{nullptr, nullptr}},
24+
"CAST('[' || string_agg(CASE WHEN x IS NULL THEN 'null'::JSON ELSE to_json(x) END, ',') || ']' AS JSON)"},
2125
{DEFAULT_SCHEMA,
2226
"json_group_object",
23-
{"name", "value", nullptr},
27+
{"n", "v", nullptr},
2428
{{nullptr, nullptr}},
25-
"to_json(map(list(name), list(value)))"},
29+
"CAST('{' || string_agg(to_json(n::VARCHAR) || ':' || CASE WHEN v IS NULL THEN 'null'::JSON ELSE to_json(v) END, "
30+
"',') || '}' AS JSON)"},
2631
{DEFAULT_SCHEMA,
2732
"json_group_structure",
2833
{"x", nullptr},

extension/parquet/column_reader.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,8 @@ void ColumnReader::PreparePageV2(PageHeader &page_hdr) {
319319

320320
auto compressed_bytes = page_hdr.compressed_page_size - uncompressed_bytes;
321321

322-
AllocateCompressed(compressed_bytes);
322+
ResizeableBuffer compressed_buffer;
323+
compressed_buffer.resize(GetAllocator(), compressed_bytes);
323324
reader.ReadData(*protocol, compressed_buffer.ptr, compressed_bytes);
324325

325326
DecompressInternal(chunk->meta_data.codec, compressed_buffer.ptr, compressed_bytes, block->ptr + uncompressed_bytes,
@@ -334,10 +335,6 @@ void ColumnReader::AllocateBlock(idx_t size) {
334335
}
335336
}
336337

337-
void ColumnReader::AllocateCompressed(idx_t size) {
338-
compressed_buffer.resize(GetAllocator(), size);
339-
}
340-
341338
void ColumnReader::PreparePage(PageHeader &page_hdr) {
342339
AllocateBlock(page_hdr.uncompressed_page_size + 1);
343340
if (chunk->meta_data.codec == CompressionCodec::UNCOMPRESSED) {
@@ -348,7 +345,8 @@ void ColumnReader::PreparePage(PageHeader &page_hdr) {
348345
return;
349346
}
350347

351-
AllocateCompressed(page_hdr.compressed_page_size + 1);
348+
ResizeableBuffer compressed_buffer;
349+
compressed_buffer.resize(GetAllocator(), page_hdr.compressed_page_size + 1);
352350
reader.ReadData(*protocol, compressed_buffer.ptr, page_hdr.compressed_page_size);
353351

354352
DecompressInternal(chunk->meta_data.codec, compressed_buffer.ptr, page_hdr.compressed_page_size, block->ptr,

extension/parquet/column_writer.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ class BasicColumnWriter : public ColumnWriter {
388388
virtual unique_ptr<ColumnWriterStatistics> InitializeStatsState();
389389

390390
//! Initialize the writer for a specific page. Only used for scalar types.
391-
virtual unique_ptr<ColumnWriterPageState> InitializePageState(BasicColumnWriterState &state);
391+
virtual unique_ptr<ColumnWriterPageState> InitializePageState(BasicColumnWriterState &state, idx_t page_idx);
392392

393393
//! Flushes the writer for a specific page. Only used for scalar types.
394394
virtual void FlushPageState(WriteStream &temp_writer, ColumnWriterPageState *state);
@@ -427,7 +427,8 @@ void BasicColumnWriter::RegisterToRowGroup(duckdb_parquet::RowGroup &row_group)
427427
row_group.columns.push_back(std::move(column_chunk));
428428
}
429429

430-
unique_ptr<ColumnWriterPageState> BasicColumnWriter::InitializePageState(BasicColumnWriterState &state) {
430+
unique_ptr<ColumnWriterPageState> BasicColumnWriter::InitializePageState(BasicColumnWriterState &state,
431+
idx_t page_idx) {
431432
return nullptr;
432433
}
433434

@@ -502,7 +503,7 @@ void BasicColumnWriter::BeginWrite(ColumnWriterState &state_p) {
502503
MaxValue<idx_t>(NextPowerOfTwo(page_info.estimated_page_size), MemoryStream::DEFAULT_INITIAL_CAPACITY));
503504
write_info.write_count = page_info.empty_count;
504505
write_info.max_write_count = page_info.row_count;
505-
write_info.page_state = InitializePageState(state);
506+
write_info.page_state = InitializePageState(state, page_idx);
506507

507508
write_info.compressed_size = 0;
508509
write_info.compressed_data = nullptr;
@@ -1232,11 +1233,11 @@ class StandardColumnWriter : public BasicColumnWriter {
12321233
return std::move(result);
12331234
}
12341235

1235-
unique_ptr<ColumnWriterPageState> InitializePageState(BasicColumnWriterState &state_p) override {
1236+
unique_ptr<ColumnWriterPageState> InitializePageState(BasicColumnWriterState &state_p, idx_t page_idx) override {
12361237
auto &state = state_p.Cast<StandardColumnWriterState<SRC>>();
1237-
1238-
auto result = make_uniq<StandardWriterPageState<SRC, TGT>>(state.total_value_count, state.total_string_size,
1239-
state.encoding, state.dictionary);
1238+
const auto &page_info = state_p.page_info[page_idx];
1239+
auto result = make_uniq<StandardWriterPageState<SRC, TGT>>(
1240+
page_info.row_count - page_info.empty_count, state.total_string_size, state.encoding, state.dictionary);
12401241
return std::move(result);
12411242
}
12421243

@@ -1586,7 +1587,7 @@ class BooleanColumnWriter : public BasicColumnWriter {
15861587
}
15871588
}
15881589

1589-
unique_ptr<ColumnWriterPageState> InitializePageState(BasicColumnWriterState &state) override {
1590+
unique_ptr<ColumnWriterPageState> InitializePageState(BasicColumnWriterState &state, idx_t page_idx) override {
15901591
return make_uniq<BooleanWriterPageState>();
15911592
}
15921593

@@ -1828,7 +1829,7 @@ class EnumColumnWriter : public BasicColumnWriter {
18281829
}
18291830
}
18301831

1831-
unique_ptr<ColumnWriterPageState> InitializePageState(BasicColumnWriterState &state) override {
1832+
unique_ptr<ColumnWriterPageState> InitializePageState(BasicColumnWriterState &state, idx_t page_idx) override {
18321833
return make_uniq<EnumWriterPageState>(bit_width);
18331834
}
18341835

extension/parquet/include/column_reader.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ class ColumnReader {
160160

161161
private:
162162
void AllocateBlock(idx_t size);
163-
void AllocateCompressed(idx_t size);
164163
void PrepareRead(parquet_filter_t &filter);
165164
void PreparePage(PageHeader &page_hdr);
166165
void PrepareDataPage(PageHeader &page_hdr);
@@ -178,7 +177,6 @@ class ColumnReader {
178177

179178
shared_ptr<ResizeableBuffer> block;
180179

181-
ResizeableBuffer compressed_buffer;
182180
ResizeableBuffer offset_buffer;
183181

184182
unique_ptr<RleBpDecoder> dict_decoder;

extension/parquet/include/parquet_bss_encoder.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ class BssEncoder {
3030
}
3131

3232
void FinishWrite(WriteStream &writer) {
33-
D_ASSERT(count == total_value_count);
3433
writer.WriteData(buffer.get(), total_value_count * bit_width);
3534
}
3635

extension/parquet/include/parquet_dlba_encoder.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,8 @@ class DlbaEncoder {
3333
}
3434

3535
void FinishWrite(WriteStream &writer) {
36-
D_ASSERT(stream->GetPosition() == total_string_size);
3736
dbp_encoder.FinishWrite(writer);
38-
writer.WriteData(buffer.get(), total_string_size);
37+
writer.WriteData(buffer.get(), stream->GetPosition());
3938
}
4039

4140
private:

src/common/compressed_file_system.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ void CompressedFile::Initialize(bool write) {
3131
stream_data.out_buff_start = stream_data.out_buff.get();
3232
stream_data.out_buff_end = stream_data.out_buff.get();
3333

34+
current_position = 0;
35+
3436
stream_wrapper = compressed_fs.CreateStream();
3537
stream_wrapper->Initialize(*this, write);
3638
}

src/execution/index/art/iterator.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@ bool Iterator::Scan(const ARTKey &upper_bound, const idx_t max_count, unsafe_vec
4646
bool has_next;
4747
do {
4848
// An empty upper bound indicates that no upper bound exists.
49-
if (!upper_bound.Empty() && status == GateStatus::GATE_NOT_SET) {
50-
if (current_key.GreaterThan(upper_bound, equal, nested_depth)) {
51-
return true;
49+
if (!upper_bound.Empty()) {
50+
if (status == GateStatus::GATE_NOT_SET || entered_nested_leaf) {
51+
if (current_key.GreaterThan(upper_bound, equal, nested_depth)) {
52+
return true;
53+
}
5254
}
5355
}
5456

@@ -86,6 +88,7 @@ bool Iterator::Scan(const ARTKey &upper_bound, const idx_t max_count, unsafe_vec
8688
throw InternalException("Invalid leaf type for index scan.");
8789
}
8890

91+
entered_nested_leaf = false;
8992
has_next = Next();
9093
} while (has_next);
9194
return true;
@@ -104,6 +107,7 @@ void Iterator::FindMinimum(const Node &node) {
104107
if (node.GetGateStatus() == GateStatus::GATE_SET) {
105108
D_ASSERT(status == GateStatus::GATE_NOT_SET);
106109
status = GateStatus::GATE_SET;
110+
entered_nested_leaf = true;
107111
nested_depth = 0;
108112
}
109113

src/execution/operator/aggregate/physical_window.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,11 @@ class WindowLocalSourceState : public LocalSourceState {
575575

576576
explicit WindowLocalSourceState(WindowGlobalSourceState &gsource);
577577

578+
void ReleaseLocalStates() {
579+
auto &local_states = window_hash_group->thread_states.at(task->thread_idx);
580+
local_states.clear();
581+
}
582+
578583
//! Does the task have more work to do?
579584
bool TaskFinished() const {
580585
return !task || task->begin_idx == task->end_idx;
@@ -792,6 +797,12 @@ void WindowGlobalSourceState::FinishTask(TaskPtr task) {
792797
}
793798

794799
bool WindowLocalSourceState::TryAssignTask() {
800+
D_ASSERT(TaskFinished());
801+
if (task && task->stage == WindowGroupStage::GETDATA) {
802+
// If this state completed the last block in the previous iteration,
803+
// release out local state memory.
804+
ReleaseLocalStates();
805+
}
795806
// Because downstream operators may be using our internal buffers,
796807
// we can't "finish" a task until we are about to get the next one.
797808

@@ -888,10 +899,6 @@ void WindowLocalSourceState::GetData(DataChunk &result) {
888899
++task->begin_idx;
889900
}
890901

891-
// If that was the last block, release out local state memory.
892-
if (TaskFinished()) {
893-
local_states.clear();
894-
}
895902
result.Verify();
896903
}
897904

0 commit comments

Comments
 (0)