Skip to content

Commit f64e089

Browse files
lnfjptlongbinlai
andauthored
feat(interactive): Support cancel in interactive (#3310)
<!-- Thanks for your contribution! please review https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before opening an issue. --> ## What do these changes do? <!-- Please give a short brief about these changes. --> ## Related issue number <!-- Are there any issues opened that will be resolved by merging this change? --> Fixes #3309 --------- Co-authored-by: Longbin Lai <[email protected]>
1 parent d61c7c7 commit f64e089

File tree

6 files changed

+48
-4
lines changed

6 files changed

+48
-4
lines changed

interactive_engine/executor/engine/pegasus/common/src/channel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl<T: Send> Clone for MessageSender<T> {
136136
impl<T: Send> Drop for MessageSender<T> {
137137
fn drop(&mut self) {
138138
if !self.is_closed.get() {
139-
warn!("dropping an unclosed 'MessageSender' id = {}", self.id);
139+
trace!("dropping an unclosed 'MessageSender' id = {}", self.id);
140140
self.poison();
141141
self.close();
142142
}

interactive_engine/executor/engine/pegasus/pegasus/src/data_plane/intra_thread.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl<T: Send> Push<T> for ThreadPush<T> {
8282
impl<T> Drop for ThreadPush<T> {
8383
fn drop(&mut self) {
8484
if Arc::strong_count(&self.exhaust) == 2 && !self.exhaust.load(Ordering::SeqCst) {
85-
warn_worker!("{:?}: drop 'ThreadPush' without close;", self.id);
85+
trace_worker!("{:?}: drop 'ThreadPush' without close;", self.id);
8686
// if cfg!(debug_assertions) {
8787
// let bt = backtrace::Backtrace::new();
8888
// error_worker!("caused by:\n{:?}", bt);

interactive_engine/executor/engine/pegasus/pegasus/src/errors/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,23 @@ impl From<std::io::Error> for StartupError {
340340
}
341341
}
342342

343+
#[derive(Debug)]
344+
pub enum CancelError {
345+
JobNotFoundError(u64),
346+
}
347+
348+
impl Display for CancelError {
349+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
350+
match self {
351+
CancelError::JobNotFoundError(e) => {
352+
write!(f, "fail to find job, job id: {};", e)
353+
}
354+
}
355+
}
356+
}
357+
358+
impl Error for CancelError {}
359+
343360
#[macro_export]
344361
macro_rules! throw_io_error {
345362
() => {{

interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,10 @@ pub mod stream;
5151
pub mod utils;
5252
mod worker;
5353

54-
use std::collections::HashSet;
54+
use std::collections::{HashMap, HashSet};
5555
use std::fmt::Debug;
5656
use std::net::SocketAddr;
57+
use std::sync::atomic::{AtomicBool, Ordering};
5758

5859
pub use config::{read_from, Configuration, JobConf, ServerConf};
5960
pub use data::Data;
@@ -65,14 +66,15 @@ pub use worker::Worker;
6566
pub use worker_id::{get_current_worker, get_current_worker_checked, set_current_worker, WorkerId};
6667

6768
use crate::api::Source;
68-
pub use crate::errors::{BuildJobError, JobSubmitError, SpawnJobError, StartupError};
69+
pub use crate::errors::{BuildJobError, CancelError, JobSubmitError, SpawnJobError, StartupError};
6970
use crate::resource::PartitionedResource;
7071
use crate::result::{ResultSink, ResultStream};
7172
use crate::worker_id::WorkerIdIter;
7273

7374
lazy_static! {
7475
static ref SERVER_ID: Mutex<Option<u64>> = Mutex::new(None);
7576
static ref SERVERS: RwLock<Vec<u64>> = RwLock::new(vec![]);
77+
static ref JOB_CANCEL_MAP: RwLock<HashMap<u64, Arc<AtomicBool>>> = RwLock::new(HashMap::new());
7678
pub static ref PROFILE_TIME_FLAG: bool = configure_with_default!(bool, "PROFILE_TIME_FLAG", false);
7779
pub static ref PROFILE_COMM_FLAG: bool = configure_with_default!(bool, "PROFILE_COMM_FLAG", false);
7880
}
@@ -261,6 +263,9 @@ where
261263
F: FnMut(&mut Worker<DI, DO>) -> Result<(), BuildJobError>,
262264
{
263265
init_env();
266+
let cancel_hook = sink.get_cancel_hook().clone();
267+
let mut lock = JOB_CANCEL_MAP.write().expect("lock poisoned");
268+
lock.insert(conf.job_id, cancel_hook);
264269
let peer_guard = Arc::new(AtomicUsize::new(0));
265270
let conf = Arc::new(conf);
266271
let workers = allocate_local_worker(&conf)?;
@@ -293,6 +298,16 @@ where
293298
}
294299
}
295300

301+
pub fn cancel_job(job_id: u64) -> Result<(), CancelError> {
302+
let mut hook = JOB_CANCEL_MAP.write().expect("lock poisoned");
303+
if let Some(cancel_hook) = hook.get_mut(&job_id) {
304+
cancel_hook.store(true, Ordering::SeqCst);
305+
} else {
306+
return Err(CancelError::JobNotFoundError(job_id));
307+
}
308+
Ok(())
309+
}
310+
296311
#[inline]
297312
fn allocate_local_worker(conf: &Arc<JobConf>) -> Result<Option<WorkerIdIter>, BuildJobError> {
298313
let server_conf = conf.servers();

interactive_engine/executor/engine/pegasus/server/proto/job_service.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,17 @@ message JobResponse {
6262
bytes resp = 2;
6363
}
6464

65+
message CancelRequest {
66+
uint64 job_id = 1;
67+
}
68+
6569
service JobService {
6670

6771
rpc AddLibrary(BinaryResource) returns(Empty) {}
6872

6973
rpc RemoveLibrary(Name) returns(Empty) {}
7074

75+
rpc Cancel(CancelRequest) returns(Empty) {}
76+
7177
rpc Submit(JobRequest) returns(stream JobResponse) {}
7278
}

interactive_engine/executor/engine/pegasus/server/src/rpc.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,12 @@ where
157157

158158
type SubmitStream = UnboundedReceiverStream<Result<pb::JobResponse, Status>>;
159159

160+
async fn cancel(&self, req: Request<pb::CancelRequest>) -> Result<Response<Empty>, Status> {
161+
let pb::CancelRequest { job_id } = req.into_inner();
162+
pegasus::cancel_job(job_id);
163+
Ok(Response::new(Empty {}))
164+
}
165+
160166
async fn submit(&self, req: Request<pb::JobRequest>) -> Result<Response<Self::SubmitStream>, Status> {
161167
debug!("accept new request from {:?};", req.remote_addr());
162168
let pb::JobRequest { conf, source, plan, resource } = req.into_inner();

0 commit comments

Comments
 (0)