Skip to content

Commit 8d0e869

Browse files
stdrcBugenZhao
andauthored
refactor(common): unify order-related types (risingwavelabs#8449)
Signed-off-by: Richard Chien <[email protected]> Co-authored-by: Bugen Zhao <[email protected]>
1 parent 4b008ac commit 8d0e869

File tree

105 files changed

+1076
-1289
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

105 files changed

+1076
-1289
lines changed

src/batch/benches/sort.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criteri
1818
use risingwave_batch::executor::{BoxedExecutor, SortExecutor};
1919
use risingwave_common::enable_jemalloc_on_linux;
2020
use risingwave_common::types::DataType;
21-
use risingwave_common::util::sort_util::{OrderPair, OrderType};
21+
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
2222
use tokio::runtime::Runtime;
2323
use utils::{create_input, execute_executor};
2424

@@ -30,9 +30,9 @@ fn create_order_by_executor(
3030
single_column: bool,
3131
) -> BoxedExecutor {
3232
const CHUNK_SIZE: usize = 1024;
33-
let (child, order_pairs) = if single_column {
33+
let (child, column_orders) = if single_column {
3434
let input = create_input(&[DataType::Int64], chunk_size, chunk_num);
35-
(input, vec![OrderPair::new(0, OrderType::Ascending)])
35+
(input, vec![ColumnOrder::new(0, OrderType::ascending())])
3636
} else {
3737
let input = create_input(
3838
&[
@@ -47,16 +47,16 @@ fn create_order_by_executor(
4747
(
4848
input,
4949
vec![
50-
OrderPair::new(0, OrderType::Ascending),
51-
OrderPair::new(1, OrderType::Descending),
52-
OrderPair::new(2, OrderType::Ascending),
50+
ColumnOrder::new(0, OrderType::ascending()),
51+
ColumnOrder::new(1, OrderType::descending()),
52+
ColumnOrder::new(2, OrderType::ascending()),
5353
],
5454
)
5555
};
5656

5757
Box::new(SortExecutor::new(
5858
child,
59-
order_pairs,
59+
column_orders,
6060
"SortExecutor".into(),
6161
CHUNK_SIZE,
6262
))

src/batch/benches/top_n.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criteri
1818
use risingwave_batch::executor::{BoxedExecutor, TopNExecutor};
1919
use risingwave_common::enable_jemalloc_on_linux;
2020
use risingwave_common::types::DataType;
21-
use risingwave_common::util::sort_util::{OrderPair, OrderType};
21+
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
2222
use tokio::runtime::Runtime;
2323
use utils::{create_input, execute_executor};
2424

@@ -32,9 +32,9 @@ fn create_top_n_executor(
3232
limit: usize,
3333
) -> BoxedExecutor {
3434
const CHUNK_SIZE: usize = 1024;
35-
let (child, order_pairs) = if single_column {
35+
let (child, column_orders) = if single_column {
3636
let input = create_input(&[DataType::Int64], chunk_size, chunk_num);
37-
(input, vec![OrderPair::new(0, OrderType::Ascending)])
37+
(input, vec![ColumnOrder::new(0, OrderType::ascending())])
3838
} else {
3939
let input = create_input(
4040
&[
@@ -49,16 +49,16 @@ fn create_top_n_executor(
4949
(
5050
input,
5151
vec![
52-
OrderPair::new(0, OrderType::Ascending),
53-
OrderPair::new(1, OrderType::Descending),
54-
OrderPair::new(2, OrderType::Ascending),
52+
ColumnOrder::new(0, OrderType::ascending()),
53+
ColumnOrder::new(1, OrderType::descending()),
54+
ColumnOrder::new(2, OrderType::ascending()),
5555
],
5656
)
5757
};
5858

5959
Box::new(TopNExecutor::new(
6060
child,
61-
order_pairs,
61+
column_orders,
6262
offset,
6363
limit,
6464
false,

src/batch/src/executor/group_top_n.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use risingwave_common::types::DataType;
2727
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
2828
use risingwave_common::util::encoding_for_comparison::encode_chunk;
2929
use risingwave_common::util::iter_util::ZipEqFast;
30-
use risingwave_common::util::sort_util::OrderPair;
30+
use risingwave_common::util::sort_util::ColumnOrder;
3131
use risingwave_pb::batch_plan::plan_node::NodeBody;
3232

3333
use super::top_n::{HeapElem, TopNHeap};
@@ -41,7 +41,7 @@ use crate::task::BatchTaskContext;
4141
/// For each group, use a N-heap to store the smallest N rows.
4242
pub struct GroupTopNExecutor<K: HashKey> {
4343
child: BoxedExecutor,
44-
order_pairs: Vec<OrderPair>,
44+
column_orders: Vec<ColumnOrder>,
4545
offset: usize,
4646
limit: usize,
4747
group_key: Vec<usize>,
@@ -54,7 +54,7 @@ pub struct GroupTopNExecutor<K: HashKey> {
5454

5555
pub struct GroupTopNExecutorBuilder {
5656
child: BoxedExecutor,
57-
order_pairs: Vec<OrderPair>,
57+
column_orders: Vec<ColumnOrder>,
5858
offset: usize,
5959
limit: usize,
6060
group_key: Vec<usize>,
@@ -70,7 +70,7 @@ impl HashKeyDispatcher for GroupTopNExecutorBuilder {
7070
fn dispatch_impl<K: HashKey>(self) -> Self::Output {
7171
Box::new(GroupTopNExecutor::<K>::new(
7272
self.child,
73-
self.order_pairs,
73+
self.column_orders,
7474
self.offset,
7575
self.limit,
7676
self.with_ties,
@@ -98,10 +98,10 @@ impl BoxedExecutorBuilder for GroupTopNExecutorBuilder {
9898
NodeBody::GroupTopN
9999
)?;
100100

101-
let order_pairs = top_n_node
101+
let column_orders = top_n_node
102102
.column_orders
103103
.iter()
104-
.map(OrderPair::from_protobuf)
104+
.map(ColumnOrder::from_protobuf)
105105
.collect();
106106

107107
let group_key = top_n_node
@@ -117,7 +117,7 @@ impl BoxedExecutorBuilder for GroupTopNExecutorBuilder {
117117

118118
let builder = Self {
119119
child,
120-
order_pairs,
120+
column_orders,
121121
offset: top_n_node.get_offset() as usize,
122122
limit: top_n_node.get_limit() as usize,
123123
group_key,
@@ -135,7 +135,7 @@ impl<K: HashKey> GroupTopNExecutor<K> {
135135
#[expect(clippy::too_many_arguments)]
136136
pub fn new(
137137
child: BoxedExecutor,
138-
order_pairs: Vec<OrderPair>,
138+
column_orders: Vec<ColumnOrder>,
139139
offset: usize,
140140
limit: usize,
141141
with_ties: bool,
@@ -146,7 +146,7 @@ impl<K: HashKey> GroupTopNExecutor<K> {
146146
let schema = child.schema().clone();
147147
Self {
148148
child,
149-
order_pairs,
149+
column_orders,
150150
offset,
151151
limit,
152152
with_ties,
@@ -186,7 +186,7 @@ impl<K: HashKey> GroupTopNExecutor<K> {
186186
let chunk = Arc::new(chunk?.compact());
187187
let keys = K::build(self.group_key.as_slice(), &chunk)?;
188188

189-
for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.order_pairs)
189+
for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.column_orders)
190190
.into_iter()
191191
.zip_eq_fast(keys.into_iter())
192192
.enumerate()
@@ -256,19 +256,19 @@ mod tests {
256256
5 2 2
257257
",
258258
));
259-
let order_pairs = vec![
260-
OrderPair {
261-
column_idx: 1,
262-
order_type: OrderType::Ascending,
259+
let column_orders = vec![
260+
ColumnOrder {
261+
column_index: 1,
262+
order_type: OrderType::ascending(),
263263
},
264-
OrderPair {
265-
column_idx: 0,
266-
order_type: OrderType::Ascending,
264+
ColumnOrder {
265+
column_index: 0,
266+
order_type: OrderType::ascending(),
267267
},
268268
];
269269
let top_n_executor = (GroupTopNExecutorBuilder {
270270
child: Box::new(mock_executor),
271-
order_pairs,
271+
column_orders,
272272
offset: 1,
273273
limit: 3,
274274
with_ties: false,

src/batch/src/executor/join/distributed_lookup_join.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
186186
let order_types: Vec<OrderType> = table_desc
187187
.pk
188188
.iter()
189-
.map(|order| OrderType::from_protobuf(&order.get_order_type().unwrap().direction()))
189+
.map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
190190
.collect();
191191

192192
let pk_indices = table_desc

src/batch/src/executor/join/local_lookup_join.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ mod tests {
465465
use risingwave_common::hash::HashKeyDispatcher;
466466
use risingwave_common::types::{DataType, ScalarImpl};
467467
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
468-
use risingwave_common::util::sort_util::{OrderPair, OrderType};
468+
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
469469
use risingwave_expr::expr::{
470470
new_binary_expr, BoxedExpression, InputRefExpression, LiteralExpression,
471471
};
@@ -557,20 +557,20 @@ mod tests {
557557
}
558558

559559
fn create_order_by_executor(child: BoxedExecutor) -> BoxedExecutor {
560-
let order_pairs = vec![
561-
OrderPair {
562-
column_idx: 0,
563-
order_type: OrderType::Ascending,
560+
let column_orders = vec![
561+
ColumnOrder {
562+
column_index: 0,
563+
order_type: OrderType::ascending(),
564564
},
565-
OrderPair {
566-
column_idx: 1,
567-
order_type: OrderType::Ascending,
565+
ColumnOrder {
566+
column_index: 1,
567+
order_type: OrderType::ascending(),
568568
},
569569
];
570570

571571
Box::new(SortExecutor::new(
572572
child,
573-
order_pairs,
573+
column_orders,
574574
"SortExecutor".into(),
575575
CHUNK_SIZE,
576576
))

src/batch/src/executor/merge_sort_exchange.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use risingwave_common::array::DataChunk;
2020
use risingwave_common::catalog::{Field, Schema};
2121
use risingwave_common::error::{Result, RwError};
2222
use risingwave_common::types::ToOwnedDatum;
23-
use risingwave_common::util::sort_util::{HeapElem, OrderPair};
23+
use risingwave_common::util::sort_util::{ColumnOrder, HeapElem};
2424
use risingwave_pb::batch_plan::plan_node::NodeBody;
2525
use risingwave_pb::batch_plan::ExchangeSource as ProstExchangeSource;
2626

@@ -39,7 +39,7 @@ pub struct MergeSortExchangeExecutorImpl<CS, C> {
3939
context: C,
4040
/// keeps one data chunk of each source if any
4141
source_inputs: Vec<Option<DataChunk>>,
42-
order_pairs: Arc<Vec<OrderPair>>,
42+
column_orders: Arc<Vec<ColumnOrder>>,
4343
min_heap: BinaryHeap<HeapElem>,
4444
proto_sources: Vec<ProstExchangeSource>,
4545
sources: Vec<ExchangeSourceImpl>, // impl
@@ -76,7 +76,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> MergeSortExchangeEx
7676
assert!(source_idx < self.source_inputs.len());
7777
let chunk_ref = self.source_inputs[source_idx].as_ref().unwrap();
7878
self.min_heap.push(HeapElem {
79-
order_pairs: self.order_pairs.clone(),
79+
column_orders: self.column_orders.clone(),
8080
chunk: chunk_ref.clone(),
8181
chunk_idx: source_idx,
8282
elem_idx: row_idx,
@@ -191,12 +191,12 @@ impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder {
191191
NodeBody::MergeSortExchange
192192
)?;
193193

194-
let order_pairs = sort_merge_node
194+
let column_orders = sort_merge_node
195195
.column_orders
196196
.iter()
197-
.map(OrderPair::from_protobuf)
197+
.map(ColumnOrder::from_protobuf)
198198
.collect();
199-
let order_pairs = Arc::new(order_pairs);
199+
let column_orders = Arc::new(column_orders);
200200

201201
let exchange_node = sort_merge_node.get_exchange()?;
202202
let proto_sources: Vec<ProstExchangeSource> = exchange_node.get_sources().to_vec();
@@ -213,7 +213,7 @@ impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder {
213213
Ok(Box::new(MergeSortExchangeExecutor::<C> {
214214
context: source.context().clone(),
215215
source_inputs: vec![None; num_sources],
216-
order_pairs,
216+
column_orders,
217217
min_heap: BinaryHeap::new(),
218218
proto_sources,
219219
sources: vec![],
@@ -260,9 +260,9 @@ mod tests {
260260
proto_sources.push(ProstExchangeSource::default());
261261
source_creators.push(fake_create_source.clone());
262262
}
263-
let order_pairs = Arc::new(vec![OrderPair {
264-
column_idx: 0,
265-
order_type: OrderType::Ascending,
263+
let column_orders = Arc::new(vec![ColumnOrder {
264+
column_index: 0,
265+
order_type: OrderType::ascending(),
266266
}]);
267267

268268
let executor = Box::new(MergeSortExchangeExecutorImpl::<
@@ -271,7 +271,7 @@ mod tests {
271271
> {
272272
context: ComputeNodeContext::for_test(),
273273
source_inputs: vec![None; proto_sources.len()],
274-
order_pairs,
274+
column_orders,
275275
min_heap: BinaryHeap::new(),
276276
proto_sources,
277277
sources: vec![],

0 commit comments

Comments
 (0)