Skip to content

Commit 075d50a

Browse files
authored
feat(frontend): support BATCH_PARALLELISM (risingwavelabs#8552)
1 parent bd9d156 commit 075d50a

File tree

7 files changed

+70
-1
lines changed

7 files changed

+70
-1
lines changed

e2e_test/batch/basic/join.slt.part

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,25 @@ select * from t1 join t2 using(v1) join t3 using(v2);
3232
----
3333
2 1 3 3
3434

35+
statement ok
36+
set batch_parallelism = 1;
37+
38+
query IIIIII
39+
select * from t1 join t2 using(v1) join t3 using(v2);
40+
----
41+
2 1 3 3
42+
43+
statement ok
44+
set batch_parallelism = 1000;
45+
46+
query IIIIII
47+
select * from t1 join t2 using(v1) join t3 using(v2);
48+
----
49+
2 1 3 3
50+
51+
statement ok
52+
set batch_parallelism = 0;
53+
3554
statement ok
3655
create index i1 on t1(v1) include(v2);
3756

src/common/src/session_config/mod.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod search_path;
1717
mod transaction_isolation_level;
1818
mod visibility_mode;
1919

20+
use std::num::NonZeroU64;
2021
use std::ops::Deref;
2122

2223
use chrono_tz::Tz;
@@ -33,7 +34,7 @@ use crate::util::epoch::Epoch;
3334

3435
// This is a hack, &'static str is not allowed as a const generics argument.
3536
// TODO: refine this using the adt_const_params feature.
36-
const CONFIG_KEYS: [&str; 20] = [
37+
const CONFIG_KEYS: [&str; 21] = [
3738
"RW_IMPLICIT_FLUSH",
3839
"CREATE_COMPACTION_GROUP_FOR_MV",
3940
"QUERY_MODE",
@@ -54,6 +55,7 @@ const CONFIG_KEYS: [&str; 20] = [
5455
"RW_FORCE_TWO_PHASE_AGG",
5556
"RW_ENABLE_SHARE_PLAN",
5657
"INTERVALSTYLE",
58+
"BATCH_PARALLELISM",
5759
];
5860

5961
// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
@@ -78,6 +80,7 @@ const ENABLE_TWO_PHASE_AGG: usize = 16;
7880
const FORCE_TWO_PHASE_AGG: usize = 17;
7981
const RW_ENABLE_SHARE_PLAN: usize = 18;
8082
const INTERVAL_STYLE: usize = 19;
83+
const BATCH_PARALLELISM: usize = 20;
8184

8285
trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
8386
fn entry_name() -> &'static str;
@@ -278,6 +281,7 @@ type EnableTwoPhaseAgg = ConfigBool<ENABLE_TWO_PHASE_AGG, true>;
278281
type ForceTwoPhaseAgg = ConfigBool<FORCE_TWO_PHASE_AGG, false>;
279282
type EnableSharePlan = ConfigBool<RW_ENABLE_SHARE_PLAN, true>;
280283
type IntervalStyle = ConfigString<INTERVAL_STYLE>;
284+
type BatchParallelism = ConfigU64<BATCH_PARALLELISM, 0>;
281285

282286
#[derive(Derivative)]
283287
#[derivative(Default)]
@@ -354,6 +358,8 @@ pub struct ConfigMap {
354358

355359
/// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
356360
interval_style: IntervalStyle,
361+
362+
batch_parallelism: BatchParallelism,
357363
}
358364

359365
impl ConfigMap {
@@ -410,6 +416,8 @@ impl ConfigMap {
410416
self.enable_share_plan = val.as_slice().try_into()?;
411417
} else if key.eq_ignore_ascii_case(IntervalStyle::entry_name()) {
412418
self.interval_style = val.as_slice().try_into()?;
419+
} else if key.eq_ignore_ascii_case(BatchParallelism::entry_name()) {
420+
self.batch_parallelism = val.as_slice().try_into()?;
413421
} else {
414422
return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
415423
}
@@ -458,6 +466,8 @@ impl ConfigMap {
458466
Ok(self.enable_share_plan.to_string())
459467
} else if key.eq_ignore_ascii_case(IntervalStyle::entry_name()) {
460468
Ok(self.interval_style.to_string())
469+
} else if key.eq_ignore_ascii_case(BatchParallelism::entry_name()) {
470+
Ok(self.batch_parallelism.to_string())
461471
} else {
462472
Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
463473
}
@@ -560,6 +570,11 @@ impl ConfigMap {
560570
setting : self.interval_style.to_string(),
561571
description : String::from("It is typically set by an application upon connection to the server.")
562572
},
573+
VariableInfo{
574+
name : BatchParallelism::entry_name().to_lowercase(),
575+
setting : self.batch_parallelism.to_string(),
576+
description: String::from("Sets the parallelism for batch. If 0, use default value.")
577+
},
563578
]
564579
}
565580

@@ -648,4 +663,11 @@ impl ConfigMap {
648663
pub fn get_interval_style(&self) -> &str {
649664
&self.interval_style
650665
}
666+
667+
pub fn get_batch_parallelism(&self) -> Option<NonZeroU64> {
668+
if self.batch_parallelism.0 != 0 {
669+
return Some(NonZeroU64::new(self.batch_parallelism.0).unwrap());
670+
}
671+
None
672+
}
651673
}

src/frontend/src/handler/explain.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ pub async fn handle_explain(
145145
plan_fragmenter = Some(BatchPlanFragmenter::new(
146146
session.env().worker_node_manager_ref(),
147147
session.env().catalog_reader().clone(),
148+
session.config().get_batch_parallelism(),
148149
plan,
149150
)?);
150151
}

src/frontend/src/handler/query.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ pub async fn handle_query(
162162
let plan_fragmenter = BatchPlanFragmenter::new(
163163
session.env().worker_node_manager_ref(),
164164
session.env().catalog_reader().clone(),
165+
session.config().get_batch_parallelism(),
165166
plan,
166167
)?;
167168
context.append_notice(&mut notice);

src/frontend/src/scheduler/distributed/query.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,7 @@ pub(crate) mod tests {
660660
let fragmenter = BatchPlanFragmenter::new(
661661
worker_node_manager,
662662
catalog_reader,
663+
None,
663664
batch_exchange_node.clone(),
664665
)
665666
.unwrap();

src/frontend/src/scheduler/plan_fragmenter.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::cmp::min;
1516
use std::collections::{HashMap, HashSet};
1617
use std::fmt::{Debug, Formatter};
18+
use std::num::NonZeroU64;
1719
use std::sync::Arc;
1820

1921
use anyhow::anyhow;
@@ -120,6 +122,12 @@ pub struct BatchPlanFragmenter {
120122
worker_node_manager: WorkerNodeManagerRef,
121123
catalog_reader: CatalogReader,
122124

125+
/// if batch_parallelism is None, it means no limit, we will use the available nodes count as
126+
/// parallelism.
127+
/// if batch_parallelism is Some(num), we will use the min(num, the available
128+
/// nodes count) as parallelism.
129+
batch_parallelism: Option<NonZeroU64>,
130+
123131
stage_graph_builder: Option<StageGraphBuilder>,
124132
stage_graph: Option<StageGraph>,
125133
}
@@ -136,6 +144,7 @@ impl BatchPlanFragmenter {
136144
pub fn new(
137145
worker_node_manager: WorkerNodeManagerRef,
138146
catalog_reader: CatalogReader,
147+
batch_parallelism: Option<NonZeroU64>,
139148
batch_node: PlanRef,
140149
) -> SchedulerResult<Self> {
141150
let mut plan_fragmenter = Self {
@@ -144,6 +153,7 @@ impl BatchPlanFragmenter {
144153
next_stage_id: 0,
145154
worker_node_manager,
146155
catalog_reader,
156+
batch_parallelism,
147157
stage_graph: None,
148158
};
149159
plan_fragmenter.split_into_stage(batch_node)?;
@@ -751,6 +761,11 @@ impl BatchPlanFragmenter {
751761
lookup_join_parallelism
752762
} else if source_info.is_some() {
753763
0
764+
} else if let Some(num) = self.batch_parallelism {
765+
min(
766+
num.get() as usize,
767+
self.worker_node_manager.schedule_unit_count(),
768+
)
754769
} else {
755770
self.worker_node_manager.worker_node_count()
756771
}

src/frontend/src/scheduler/worker_node_manager.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,16 @@ impl WorkerNodeManager {
105105
self.inner.read().unwrap().worker_nodes.len()
106106
}
107107

108+
pub fn schedule_unit_count(&self) -> usize {
109+
self.inner
110+
.read()
111+
.unwrap()
112+
.worker_nodes
113+
.iter()
114+
.map(|node| node.parallel_units.len())
115+
.sum()
116+
}
117+
108118
/// If parallel unit ids is empty, the scheduler may fail to schedule any task and stuck at
109119
/// schedule next stage. If we do not return error in this case, needs more complex control
110120
/// logic above. Report in this function makes the schedule root fail reason more clear.

0 commit comments

Comments
 (0)