Skip to content

Commit a4afac3

Browse files
xx01cyxBugenZhao
andauthored
refactor: refine conditional compilation for mem control on different OSs (risingwavelabs#8504)
Co-authored-by: Bugen Zhao <[email protected]>
1 parent 9564db0 commit a4afac3

File tree

4 files changed

+105
-78
lines changed

4 files changed

+105
-78
lines changed

src/compute/src/memory_management/memory_manager.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@
1414

1515
use std::sync::atomic::AtomicU64;
1616
use std::sync::Arc;
17+
use std::time::Duration;
1718

1819
use risingwave_batch::task::BatchManager;
20+
use risingwave_common::util::epoch::Epoch;
1921
use risingwave_stream::executor::monitor::StreamingMetrics;
2022
use risingwave_stream::task::LocalStreamManager;
2123

22-
use super::policy::MemoryControlPolicy;
24+
use super::MemoryControlPolicy;
25+
use crate::memory_management::MemoryControlStats;
2326

2427
/// The minimal memory requirement of computing tasks in megabytes.
2528
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
@@ -29,7 +32,6 @@ pub const SYSTEM_RESERVED_MEMORY_MB: usize = 512;
2932

3033
/// When `enable_managed_cache` is set, compute node will launch a [`GlobalMemoryManager`] to limit
3134
/// the memory usage.
32-
#[cfg_attr(not(target_os = "linux"), expect(dead_code))]
3335
pub struct GlobalMemoryManager {
3436
/// All cached data before the watermark should be evicted.
3537
watermark_epoch: Arc<AtomicU64>,
@@ -74,28 +76,13 @@ impl GlobalMemoryManager {
7476
self.watermark_epoch.clone()
7577
}
7678

77-
// FIXME: remove such limitation after #7180
78-
/// Jemalloc is not supported on Windows, because of tikv-jemalloc's own reasons.
79-
/// See the comments for the macro `enable_jemalloc_on_linux!()`
80-
#[cfg(not(target_os = "linux"))]
81-
#[expect(clippy::unused_async)]
82-
pub async fn run(self: Arc<Self>, _: Arc<BatchManager>, _: Arc<LocalStreamManager>) {}
83-
84-
/// Memory manager will get memory usage from batch and streaming, and do some actions.
85-
/// 1. if batch exceeds, kill running query.
86-
/// 2. if streaming exceeds, evict cache by watermark.
87-
#[cfg(target_os = "linux")]
79+
/// Memory manager will get memory usage statistics from batch and streaming and perform memory
80+
/// control accordingly.
8881
pub async fn run(
8982
self: Arc<Self>,
9083
batch_manager: Arc<BatchManager>,
9184
stream_manager: Arc<LocalStreamManager>,
9285
) {
93-
use std::time::Duration;
94-
95-
use risingwave_common::util::epoch::Epoch;
96-
97-
use crate::memory_management::policy::MemoryControlStats;
98-
9986
let mut tick_interval =
10087
tokio::time::interval(Duration::from_millis(self.barrier_interval_ms as u64));
10188
let mut memory_control_stats = MemoryControlStats {

src/compute/src/memory_management/mod.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,98 @@
1313
// limitations under the License.
1414

1515
pub mod memory_manager;
16+
#[cfg(target_os = "linux")]
1617
pub mod policy;
18+
19+
use std::sync::atomic::AtomicU64;
20+
use std::sync::Arc;
21+
22+
use risingwave_batch::task::BatchManager;
23+
use risingwave_common::error::Result;
24+
use risingwave_stream::task::LocalStreamManager;
25+
26+
use crate::ComputeNodeOpts;
27+
28+
/// `MemoryControlStats` contains the necessary information for memory control, including both batch
29+
/// and streaming.
30+
#[derive(Default)]
31+
pub struct MemoryControlStats {
32+
pub batch_memory_usage: usize,
33+
pub streaming_memory_usage: usize,
34+
pub jemalloc_allocated_mib: usize,
35+
pub lru_watermark_step: u64,
36+
pub lru_watermark_time_ms: u64,
37+
pub lru_physical_now_ms: u64,
38+
}
39+
40+
pub type MemoryControlPolicy = Box<dyn MemoryControl>;
41+
42+
pub trait MemoryControl: Send + Sync {
43+
fn apply(
44+
&self,
45+
total_compute_memory_bytes: usize,
46+
barrier_interval_ms: u32,
47+
prev_memory_stats: MemoryControlStats,
48+
batch_manager: Arc<BatchManager>,
49+
stream_manager: Arc<LocalStreamManager>,
50+
watermark_epoch: Arc<AtomicU64>,
51+
) -> MemoryControlStats;
52+
53+
fn describe(&self, total_compute_memory_bytes: usize) -> String;
54+
}
55+
56+
#[cfg(target_os = "linux")]
57+
pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
58+
use anyhow::anyhow;
59+
60+
use self::policy::{FixedProportionPolicy, StreamingOnlyPolicy};
61+
62+
let input_policy = &opts.memory_control_policy;
63+
if input_policy == FixedProportionPolicy::CONFIG_STR {
64+
Ok(Box::new(FixedProportionPolicy::new(
65+
opts.streaming_memory_proportion,
66+
)?))
67+
} else if input_policy == StreamingOnlyPolicy::CONFIG_STR {
68+
Ok(Box::new(StreamingOnlyPolicy))
69+
} else {
70+
let valid_values = [
71+
FixedProportionPolicy::CONFIG_STR,
72+
StreamingOnlyPolicy::CONFIG_STR,
73+
];
74+
Err(anyhow!(format!(
75+
"invalid memory control policy in configuration: {}, valid values: {:?}",
76+
input_policy, valid_values,
77+
))
78+
.into())
79+
}
80+
}
81+
82+
#[cfg(not(target_os = "linux"))]
83+
pub fn memory_control_policy_from_config(_opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
84+
// We disable memory control on operating systems other than Linux now because jemalloc
85+
// stats do not work well.
86+
tracing::warn!("memory control is only enabled on Linux now");
87+
Ok(Box::new(DummyPolicy))
88+
}
89+
90+
/// `DummyPolicy` is used for operarting systems other than Linux. It does nothing as memory control
91+
/// is disabled on non-Linux OS.
92+
pub struct DummyPolicy;
93+
94+
impl MemoryControl for DummyPolicy {
95+
fn apply(
96+
&self,
97+
_total_compute_memory_bytes: usize,
98+
_barrier_interval_ms: u32,
99+
_prev_memory_stats: MemoryControlStats,
100+
_batch_manager: Arc<BatchManager>,
101+
_stream_manager: Arc<LocalStreamManager>,
102+
_watermark_epoch: Arc<AtomicU64>,
103+
) -> MemoryControlStats {
104+
MemoryControlStats::default()
105+
}
106+
107+
fn describe(&self, _total_compute_memory_bytes: usize) -> String {
108+
"DummyPolicy".to_string()
109+
}
110+
}

src/compute/src/memory_management/policy.rs

Lines changed: 4 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -22,55 +22,7 @@ use risingwave_common::error::Result;
2222
use risingwave_common::util::epoch::Epoch;
2323
use risingwave_stream::task::LocalStreamManager;
2424

25-
use crate::ComputeNodeOpts;
26-
27-
/// `MemoryControlStats` contains the necessary information for memory control, including both batch
28-
/// and streaming.
29-
pub struct MemoryControlStats {
30-
pub batch_memory_usage: usize,
31-
pub streaming_memory_usage: usize,
32-
pub jemalloc_allocated_mib: usize,
33-
pub lru_watermark_step: u64,
34-
pub lru_watermark_time_ms: u64,
35-
pub lru_physical_now_ms: u64,
36-
}
37-
38-
pub type MemoryControlPolicy = Box<dyn MemoryControl>;
39-
40-
pub trait MemoryControl: Send + Sync {
41-
fn apply(
42-
&self,
43-
total_compute_memory_bytes: usize,
44-
barrier_interval_ms: u32,
45-
prev_memory_stats: MemoryControlStats,
46-
batch_manager: Arc<BatchManager>,
47-
stream_manager: Arc<LocalStreamManager>,
48-
watermark_epoch: Arc<AtomicU64>,
49-
) -> MemoryControlStats;
50-
51-
fn describe(&self, total_compute_memory_bytes: usize) -> String;
52-
}
53-
54-
pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
55-
let input_policy = &opts.memory_control_policy;
56-
if input_policy == FixedProportionPolicy::CONFIG_STR {
57-
Ok(Box::new(FixedProportionPolicy::new(
58-
opts.streaming_memory_proportion,
59-
)?))
60-
} else if input_policy == StreamingOnlyPolicy::CONFIG_STR {
61-
Ok(Box::new(StreamingOnlyPolicy {}))
62-
} else {
63-
let valid_values = [
64-
FixedProportionPolicy::CONFIG_STR,
65-
StreamingOnlyPolicy::CONFIG_STR,
66-
];
67-
Err(anyhow!(format!(
68-
"invalid memory control policy in configuration: {}, valid values: {:?}",
69-
input_policy, valid_values,
70-
))
71-
.into())
72-
}
73-
}
25+
use super::{MemoryControl, MemoryControlStats};
7426

7527
/// `FixedProportionPolicy` performs memory control by limiting the memory usage of both batch and
7628
/// streaming to a fixed proportion.
@@ -83,7 +35,7 @@ pub struct FixedProportionPolicy {
8335

8436
impl FixedProportionPolicy {
8537
const BATCH_KILL_QUERY_THRESHOLD: f64 = 0.8;
86-
const CONFIG_STR: &str = "streaming-batch";
38+
pub const CONFIG_STR: &str = "streaming-batch";
8739
const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9;
8840
const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7;
8941

@@ -172,10 +124,10 @@ impl MemoryControl for FixedProportionPolicy {
172124
/// `FixedProportionPolicy` in that it calculates the memory usage based on jemalloc statistics,
173125
/// which actually contains system usage other than computing tasks. This is the default memory
174126
/// control policy.
175-
pub struct StreamingOnlyPolicy {}
127+
pub struct StreamingOnlyPolicy;
176128

177129
impl StreamingOnlyPolicy {
178-
const CONFIG_STR: &str = "streaming-only";
130+
pub const CONFIG_STR: &str = "streaming-only";
179131
const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9;
180132
const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7;
181133
}
@@ -230,7 +182,6 @@ impl MemoryControl for StreamingOnlyPolicy {
230182
}
231183
}
232184

233-
#[cfg(target_os = "linux")]
234185
fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize {
235186
use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats};
236187

@@ -246,11 +197,6 @@ fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize {
246197
})
247198
}
248199

249-
#[cfg(not(target_os = "linux"))]
250-
fn advance_jemalloc_epoch(_prev_jemalloc_allocated_mib: usize) -> usize {
251-
0
252-
}
253-
254200
fn calculate_lru_watermark(
255201
cur_stream_used_memory_bytes: usize,
256202
stream_memory_threshold_graceful: usize,

src/compute/src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
5757
use tokio::sync::oneshot::Sender;
5858
use tokio::task::JoinHandle;
5959

60+
use crate::memory_management::memory_control_policy_from_config;
6061
use crate::memory_management::memory_manager::{
6162
GlobalMemoryManager, MIN_COMPUTE_MEMORY_MB, SYSTEM_RESERVED_MEMORY_MB,
6263
};
63-
use crate::memory_management::policy::memory_control_policy_from_config;
6464
use crate::observer::observer_manager::ComputeObserverNode;
6565
use crate::rpc::service::config_service::ConfigServiceImpl;
6666
use crate::rpc::service::exchange_metrics::ExchangeServiceMetrics;

0 commit comments

Comments
 (0)