Skip to content

Commit b6f7905

Browse files
committed
[Feature] Add Redis Scheduler
Adds an experimental redis scheduler that can be used as a distributed state-persistent scheduler backend. This scheduler is optimized to have each worker be its own scheduler or many small schedulers. closes: #359
1 parent ac4ca57 commit b6f7905

File tree

18 files changed

+1362
-32
lines changed

18 files changed

+1362
-32
lines changed

Cargo.lock

Lines changed: 51 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-config/src/schedulers.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,16 @@ pub struct SimpleScheduler {
130130
pub enum ExperimentalSimpleSchedulerBackend {
131131
/// Use an in-memory store for the scheduler.
132132
memory,
133+
/// Use a redis store for the scheduler.
134+
redis(ExperimentalRedisSchedulerBackend),
135+
}
136+
137+
#[derive(Deserialize, Debug, Default)]
138+
#[serde(deny_unknown_fields)]
139+
pub struct ExperimentalRedisSchedulerBackend {
140+
/// A reference to the redis store to use for the scheduler.
141+
/// Note: This MUST resolve to a RedisStore.
142+
pub redis_store: StoreRefName,
133143
}
134144

135145
/// A scheduler that simply forwards requests to an upstream scheduler. This

nativelink-scheduler/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ rust_test_suite(
6060
"tests/action_messages_test.rs",
6161
"tests/cache_lookup_scheduler_test.rs",
6262
"tests/property_modifier_scheduler_test.rs",
63+
"tests/redis_store_awaited_action_db_test.rs",
6364
"tests/simple_scheduler_test.rs",
6465
],
6566
compile_data = [
@@ -79,10 +80,14 @@ rust_test_suite(
7980
"//nativelink-store",
8081
"//nativelink-util",
8182
"@crates//:async-lock",
83+
"@crates//:bytes",
84+
"@crates//:fred",
8285
"@crates//:futures",
8386
"@crates//:mock_instant",
87+
"@crates//:parking_lot",
8488
"@crates//:pretty_assertions",
8589
"@crates//:prost",
90+
"@crates//:serde_json",
8691
"@crates//:tokio",
8792
"@crates//:tokio-stream",
8893
"@crates//:uuid",

nativelink-scheduler/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,6 @@ static_assertions = "1.1.0"
3535
[dev-dependencies]
3636
nativelink-macro = { path = "../nativelink-macro" }
3737
pretty_assertions = { version = "1.4.0", features = ["std"] }
38+
fred = { version = "9.1.2", default-features = false, features = [
39+
"mocks",
40+
] }

nativelink-scheduler/src/awaited_action_db/awaited_action.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
2828
/// The version of the awaited action.
2929
/// This number will always increment by one each time
3030
/// the action is updated.
31-
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
31+
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
3232
struct AwaitedActionVersion(u64);
3333

3434
impl MetricsComponent for AwaitedActionVersion {
@@ -80,7 +80,7 @@ pub struct AwaitedAction {
8080
}
8181

8282
impl AwaitedAction {
83-
pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>) -> Self {
83+
pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self {
8484
let stage = ActionStage::Queued;
8585
let sort_key = AwaitedActionSortKey::new_with_unique_key(
8686
action_info.priority,
@@ -102,7 +102,7 @@ impl AwaitedAction {
102102
operation_id,
103103
sort_key,
104104
attempts: 0,
105-
last_worker_updated_timestamp: SystemTime::now(),
105+
last_worker_updated_timestamp: now,
106106
worker_id: None,
107107
state,
108108
}
@@ -120,19 +120,19 @@ impl AwaitedAction {
120120
self.version = AwaitedActionVersion(self.version.0 + 1);
121121
}
122122

123-
pub(crate) fn action_info(&self) -> &Arc<ActionInfo> {
123+
pub fn action_info(&self) -> &Arc<ActionInfo> {
124124
&self.action_info
125125
}
126126

127-
pub(crate) fn operation_id(&self) -> &OperationId {
127+
pub fn operation_id(&self) -> &OperationId {
128128
&self.operation_id
129129
}
130130

131131
pub(crate) fn sort_key(&self) -> AwaitedActionSortKey {
132132
self.sort_key
133133
}
134134

135-
pub(crate) fn state(&self) -> &Arc<ActionState> {
135+
pub fn state(&self) -> &Arc<ActionState> {
136136
&self.state
137137
}
138138

@@ -158,7 +158,7 @@ impl AwaitedAction {
158158

159159
/// Sets the current state of the action and notifies subscribers.
160160
/// Returns true if the state was set, false if there are no subscribers.
161-
pub(crate) fn set_state(&mut self, mut state: Arc<ActionState>, now: Option<SystemTime>) {
161+
pub fn set_state(&mut self, mut state: Arc<ActionState>, now: Option<SystemTime>) {
162162
std::mem::swap(&mut self.state, &mut state);
163163
if let Some(now) = now {
164164
self.keep_alive(now);

nativelink-scheduler/src/default_scheduler_factory.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ use std::time::SystemTime;
1717

1818
use nativelink_config::schedulers::{ExperimentalSimpleSchedulerBackend, SchedulerConfig};
1919
use nativelink_config::stores::EvictionPolicy;
20-
use nativelink_error::{Error, ResultExt};
20+
use nativelink_error::{make_input_err, Error, ResultExt};
21+
use nativelink_store::redis_store::RedisStore;
2122
use nativelink_store::store_manager::StoreManager;
2223
use nativelink_util::instant_wrapper::InstantWrapper;
2324
use nativelink_util::operation_state_manager::ClientStateManager;
@@ -28,6 +29,7 @@ use crate::grpc_scheduler::GrpcScheduler;
2829
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
2930
use crate::property_modifier_scheduler::PropertyModifierScheduler;
3031
use crate::simple_scheduler::SimpleScheduler;
32+
use crate::store_awaited_action_db::StoreAwaitedActionDb;
3133
use crate::worker_scheduler::WorkerScheduler;
3234

3335
/// Default timeout for recently completed actions in seconds.
@@ -51,7 +53,9 @@ fn inner_scheduler_factory(
5153
store_manager: &StoreManager,
5254
) -> Result<SchedulerFactoryResults, Error> {
5355
let scheduler: SchedulerFactoryResults = match scheduler_type_cfg {
54-
SchedulerConfig::simple(config) => simple_scheduler_factory(config)?,
56+
SchedulerConfig::simple(config) => {
57+
simple_scheduler_factory(config, store_manager, SystemTime::now)?
58+
}
5559
SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config)?)), None),
5660
SchedulerConfig::cache_lookup(config) => {
5761
let ac_store = store_manager
@@ -83,6 +87,8 @@ fn inner_scheduler_factory(
8387

8488
fn simple_scheduler_factory(
8589
config: &nativelink_config::schedulers::SimpleScheduler,
90+
store_manager: &StoreManager,
91+
now_fn: fn() -> SystemTime,
8692
) -> Result<SchedulerFactoryResults, Error> {
8793
match config
8894
.experimental_backend
@@ -100,6 +106,36 @@ fn simple_scheduler_factory(
100106
SimpleScheduler::new(config, awaited_action_db, task_change_notify);
101107
Ok((Some(action_scheduler), Some(worker_scheduler)))
102108
}
109+
ExperimentalSimpleSchedulerBackend::redis(redis_config) => {
110+
let store = store_manager
111+
.get_store(redis_config.redis_store.as_ref())
112+
.err_tip(|| {
113+
format!(
114+
"'redis_store': '{}' does not exist",
115+
redis_config.redis_store
116+
)
117+
})?;
118+
let task_change_notify = Arc::new(Notify::new());
119+
let store = store
120+
.into_inner()
121+
.as_any_arc()
122+
.downcast::<RedisStore>()
123+
.map_err(|_| {
124+
make_input_err!(
125+
"Could not downcast to redis store in RedisAwaitedActionDb::new"
126+
)
127+
})?;
128+
let awaited_action_db = StoreAwaitedActionDb::new(
129+
store,
130+
task_change_notify.clone(),
131+
now_fn,
132+
Default::default,
133+
)
134+
.err_tip(|| "In state_manager_factory::redis_state_manager")?;
135+
let (action_scheduler, worker_scheduler) =
136+
SimpleScheduler::new(config, awaited_action_db, task_change_notify);
137+
Ok((Some(action_scheduler), Some(worker_scheduler)))
138+
}
103139
}
104140
}
105141

nativelink-scheduler/src/memory_awaited_action_db.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,8 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
697697
ActionUniqueQualifier::Uncachable(_unique_key) => None,
698698
};
699699
let operation_id = OperationId::default();
700-
let awaited_action = AwaitedAction::new(operation_id.clone(), action_info);
700+
let awaited_action =
701+
AwaitedAction::new(operation_id.clone(), action_info, (self.now_fn)().now());
701702
debug_assert!(
702703
ActionStage::Queued == awaited_action.state().stage,
703704
"Expected action to be queued"

nativelink-scheduler/src/store_awaited_action_db.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -296,17 +296,19 @@ impl SchedulerStoreDataProvider for UpdateClientIdToOperationId {
296296
}
297297

298298
#[derive(MetricsComponent)]
299-
pub struct StoreAwaitedActionDb<S: SchedulerStore> {
299+
pub struct StoreAwaitedActionDb<S: SchedulerStore, F: Fn() -> OperationId> {
300300
store: Arc<S>,
301301
now_fn: fn() -> SystemTime,
302+
operation_id_creator: F,
302303
_pull_task_change_subscriber_spawn: JoinHandleDropGuard<()>,
303304
}
304305

305-
impl<S: SchedulerStore> StoreAwaitedActionDb<S> {
306+
impl<S: SchedulerStore, F: Fn() -> OperationId> StoreAwaitedActionDb<S, F> {
306307
pub fn new(
307308
store: Arc<S>,
308309
task_change_publisher: Arc<Notify>,
309310
now_fn: fn() -> SystemTime,
311+
operation_id_creator: F,
310312
) -> Result<Self, Error> {
311313
let mut subscription = store
312314
.subscription_manager()
@@ -340,6 +342,7 @@ impl<S: SchedulerStore> StoreAwaitedActionDb<S> {
340342
Ok(Self {
341343
store,
342344
now_fn,
345+
operation_id_creator,
343346
_pull_task_change_subscriber_spawn: pull_task_change_subscriber,
344347
})
345348
}
@@ -409,7 +412,9 @@ impl<S: SchedulerStore> StoreAwaitedActionDb<S> {
409412
}
410413
}
411414

412-
impl<S: SchedulerStore> AwaitedActionDb for StoreAwaitedActionDb<S> {
415+
impl<S: SchedulerStore, F: Fn() -> OperationId + Send + Sync + Unpin + 'static> AwaitedActionDb
416+
for StoreAwaitedActionDb<S, F>
417+
{
413418
type Subscriber = OperationSubscriber<S>;
414419

415420
async fn get_awaited_action_by_id(
@@ -466,8 +471,9 @@ impl<S: SchedulerStore> AwaitedActionDb for StoreAwaitedActionDb<S> {
466471
return Ok(sub);
467472
}
468473

469-
let new_operation_id = OperationId::default();
470-
let awaited_action = AwaitedAction::new(new_operation_id.clone(), action_info);
474+
let new_operation_id = (self.operation_id_creator)();
475+
let awaited_action =
476+
AwaitedAction::new(new_operation_id.clone(), action_info, (self.now_fn)());
471477
debug_assert!(
472478
ActionStage::Queued == awaited_action.state().stage,
473479
"Expected action to be queued"

0 commit comments

Comments
 (0)