Skip to content

Commit da2ef02

Browse files
[Feature] Support max_by/min_by in window op (#54961)
Signed-off-by: before-Sunrise <[email protected]>
1 parent 9446659 commit da2ef02

File tree

3 files changed

+116
-0
lines changed

3 files changed

+116
-0
lines changed

be/src/exprs/agg/maxmin_by.h

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,42 @@ class MaxMinByAggregateFunction final
590590
}
591591
}
592592

593+
// Writes the aggregated result held in `state` into `dst`, once for every
// index in [start, end). `ctx` is not used by this implementation.
//
// - When the state reports no result, a default entry is appended per row.
// - Otherwise the serialized result buffer is deserialized into the data
//   column per row; if `dst` is nullable, a matching not-null marker is
//   appended to its null column for each row as well.
void get_values(FunctionContext* ctx, ConstAggDataPtr __restrict state, Column* dst, size_t start,
                size_t end) const override {
    const auto& agg_state = this->data(state);

    // How "no result" is detected depends on the state flavour: the
    // not-filter-nulls state exposes an explicit flag, the other one is
    // checked through an empty result buffer.
    bool emit_default = false;
    if constexpr (State::not_filter_nulls_flag) {
        emit_default = agg_state.null_result;
    } else {
        emit_default = agg_state.buffer_result.empty();
    }

    if (emit_default) {
        if constexpr (State::not_filter_nulls_flag) {
            // Only the explicit-null path asserts that the destination is nullable.
            DCHECK(dst->is_nullable());
        }
        for (size_t row = start; row < end; ++row) {
            dst->append_default();
        }
        return;
    }

    // A nullable destination needs one not-null marker per emitted row.
    if (dst->is_nullable()) {
        auto* nullable_dst = down_cast<NullableColumn*>(dst);
        for (size_t row = start; row < end; ++row) {
            nullable_dst->null_column()->append(DATUM_NOT_NULL);
        }
    }

    // The serialized result does not change between rows, so fetch it once.
    const auto* serialized_value = agg_state.buffer_result.data();
    for (size_t row = start; row < end; ++row) {
        ColumnHelper::get_data_column(dst)->deserialize_and_append(serialized_value);
    }
}
628+
593629
// Returns the fixed identifier string of this aggregate function.
std::string get_name() const override {
    return std::string("maxmin_by");
}
594630
};
595631

@@ -793,6 +829,42 @@ class MaxMinByAggregateFunction<LT, State, OP, RunTimeCppType<LT>, StringLTGuard
793829
}
794830
}
795831

832+
// Emits the aggregated result stored in `state` into `dst` for each index in
// [start, end). `ctx` is not used by this implementation.
//
// Rows receive a default entry when the state holds no result; otherwise the
// serialized result buffer is deserialized into the data column for each row,
// preceded by not-null markers in the null column when `dst` is nullable.
void get_values(FunctionContext* ctx, ConstAggDataPtr __restrict state, Column* dst, size_t start,
                size_t end) const override {
    const auto& agg_state = this->data(state);

    // The not-filter-nulls state carries an explicit null flag; for the other
    // state an empty result buffer selects the default path.
    bool emit_default = false;
    if constexpr (State::not_filter_nulls_flag) {
        emit_default = agg_state.null_result;
    } else {
        emit_default = agg_state.buffer_result.empty();
    }

    if (emit_default) {
        if constexpr (State::not_filter_nulls_flag) {
            // The nullable-destination assertion applies to the explicit-null path only.
            DCHECK(dst->is_nullable());
        }
        for (size_t row = start; row < end; ++row) {
            dst->append_default();
        }
        return;
    }

    // Mark every emitted row as not-null when the destination tracks nulls.
    if (dst->is_nullable()) {
        auto* nullable_dst = down_cast<NullableColumn*>(dst);
        for (size_t row = start; row < end; ++row) {
            nullable_dst->null_column()->append(DATUM_NOT_NULL);
        }
    }

    // Same serialized bytes for every row; read the pointer once up front.
    const auto* serialized_value = agg_state.buffer_result.data();
    for (size_t row = start; row < end; ++row) {
        ColumnHelper::get_data_column(dst)->deserialize_and_append(serialized_value);
    }
}
867+
796868
// Returns the fixed identifier string of this aggregate function.
std::string get_name() const override {
    return std::string("maxmin_by");
}
797869
};
798870

test/sql/test_max_min_by_not_filter_nulls_with_nulls/R/test_max_min_by_not_filter_nulls_with_nulls

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,4 +352,30 @@ select (sum(murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(a,0))+murmur_h
352352
select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(a,0))+murmur_hash3_32(ifnull(b,0)))) as fingerprint from (select c0,max_by(c2,concat(coalesce(c2,'NULL'),c3)) over(partition by c1) a,min_by(c2,concat(coalesce(c2,'NULL'),c3)) over(partition by c1) b from t0) as t;
353353
-- result:
354354
25406211869
355+
-- !result
356+
-- name: test_max_min_by_support_window
357+
CREATE TABLE exam (
358+
subject_id INT,
359+
subject STRING,
360+
exam_result INT
361+
) DISTRIBUTED BY HASH(`subject_id`) PROPERTIES ("replication_num" = "1");
362+
-- result:
363+
-- !result
364+
insert into exam values
365+
(1,'math',90),
366+
(2,'english',70),
367+
(3,'physics',95),
368+
(4,'chemistry',85),
369+
(5,'music',95),
370+
(6,'biology',null);
371+
-- result:
372+
-- !result
373+
SELECT max_by(subject, exam_result) over(partition by subject_id) FROM exam;
374+
-- result:
375+
english
376+
None
377+
physics
378+
chemistry
379+
music
380+
math
355381
-- !result

test/sql/test_max_min_by_not_filter_nulls_with_nulls/T/test_max_min_by_not_filter_nulls_with_nulls

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,21 @@ select (sum(murmur_hash3_32(ifnull(__c_0,0))+murmur_hash3_32(ifnull(a,0))+murmu
112112
select (sum(murmur_hash3_32(ifnull(__c_0,0))+murmur_hash3_32(ifnull(a,0))+murmur_hash3_32(ifnull(b,0)))) as fingerprint from (select (count(DISTINCT c1)) as __c_0 ,max_by(c2,concat(coalesce(c2,'NULL'),c3)) a,min_by(c2,concat(coalesce(c2,'NULL'),c3)) b from t0) as t;
113113
select (sum(murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(a,0))+murmur_hash3_32(ifnull(b,0)))) as fingerprint from (select c2,max_by(c0,coalesce(c0,0)*1000+c1) over(partition by c2) a,min_by(c0,coalesce(c0,0)*1000+c1) over(partition by c2) b from t0) as t;
114114
select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(a,0))+murmur_hash3_32(ifnull(b,0)))) as fingerprint from (select c0,max_by(c2,concat(coalesce(c2,'NULL'),c3)) over(partition by c1) a,min_by(c2,concat(coalesce(c2,'NULL'),c3)) over(partition by c1) b from t0) as t;
115+
116+
-- name: test_max_min_by_support_window
117+
CREATE TABLE exam (
118+
subject_id INT,
119+
subject STRING,
120+
exam_result INT
121+
) DISTRIBUTED BY HASH(`subject_id`) PROPERTIES ("replication_num" = "1");
122+
123+
124+
insert into exam values
125+
(1,'math',90),
126+
(2,'english',70),
127+
(3,'physics',95),
128+
(4,'chemistry',85),
129+
(5,'music',95),
130+
(6,'biology',null);
131+
132+
SELECT max_by(subject, exam_result) over(partition by subject_id) FROM exam;

0 commit comments

Comments
 (0)