Skip to content

Commit 8dd9f83

Browse files
committed
feat: server shutdown signal
1 parent 69041c4 commit 8dd9f83

File tree

8 files changed

+170
-27
lines changed

8 files changed

+170
-27
lines changed

.cspell.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
11
version: "0.2"
22
words:
3+
- actix
4+
- addrs
5+
- mptcp
6+
- nonblocking
7+
- oneshot
38
- rustup

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

actix-server/CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## Unreleased
44

5+
- Add `ServerBuilder::shutdown_signal()` method.
56
- Minimum supported Rust version (MSRV) is now 1.74.
67

78
## 2.5.1

actix-server/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ actix-rt = "2.8"
4444
bytes = "1"
4545
futures-util = { version = "0.3.17", default-features = false, features = ["sink", "async-await-macro"] }
4646
pretty_env_logger = "0.5"
47-
tokio = { version = "1.44.2", features = ["io-util", "rt-multi-thread", "macros", "fs"] }
47+
static_assertions = "1"
48+
tokio = { version = "1.44.2", features = ["io-util", "rt-multi-thread", "macros", "fs", "time"] }
49+
tokio-util = "0.7"
4850

4951
[lints]
5052
workspace = true
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//! Demonstrates use of the `ServerBuilder::shutdown_signal` method using `tokio-util`s
2+
//! `CancellationToken` helper using a nonsensical timer. In practice, this cancellation token would
3+
//! be wired throughout your application and typically triggered by OS signals elsewhere.
4+
5+
use std::{io, time::Duration};
6+
7+
use actix_rt::net::TcpStream;
8+
use actix_server::Server;
9+
use actix_service::fn_service;
10+
use tokio_util::sync::CancellationToken;
11+
12+
async fn run(stop_signal: CancellationToken) -> io::Result<()> {
13+
pretty_env_logger::formatted_timed_builder()
14+
.parse_env(pretty_env_logger::env_logger::Env::default().default_filter_or("info"));
15+
16+
let addr = ("127.0.0.1", 8080);
17+
tracing::info!("starting server on port: {}", &addr.0);
18+
19+
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
20+
// let (tx, mut rx) = tokio::sync::broadcast::channel::<()>(1);
21+
22+
Server::build()
23+
.bind("shutdown-signal", addr, || {
24+
fn_service(|_stream: TcpStream| async { Ok::<_, io::Error>(()) })
25+
})?
26+
// .shutdown_signal(stop_signal.cancelled_owned())
27+
.shutdown_signal(async move {
28+
rx.await;
29+
// rx.recv().await;
30+
})
31+
.run()
32+
.await
33+
}
34+
35+
#[tokio::main]
36+
async fn main() -> io::Result<()> {
37+
let stop_signal = CancellationToken::new();
38+
39+
tokio::spawn({
40+
let stop_signal = stop_signal.clone();
41+
async move {
42+
tokio::time::sleep(Duration::from_secs(10)).await;
43+
stop_signal.cancel();
44+
}
45+
});
46+
47+
run(stop_signal).await?;
48+
Ok(())
49+
}

actix-server/src/builder.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use std::{io, num::NonZeroUsize, time::Duration};
1+
use std::{future::Future, io, num::NonZeroUsize, time::Duration};
22

33
use actix_rt::net::TcpStream;
4+
use futures_core::future::BoxFuture;
45
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
56

67
use crate::{
@@ -39,6 +40,7 @@ pub struct ServerBuilder {
3940
pub(crate) mptcp: MpTcp,
4041
pub(crate) exit: bool,
4142
pub(crate) listen_os_signals: bool,
43+
pub(crate) shutdown_signal: Option<BoxFuture<'static, ()>>,
4244
pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
4345
pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>,
4446
pub(crate) worker_config: ServerWorkerConfig,
@@ -64,6 +66,7 @@ impl ServerBuilder {
6466
mptcp: MpTcp::Disabled,
6567
exit: false,
6668
listen_os_signals: true,
69+
shutdown_signal: None,
6770
cmd_tx,
6871
cmd_rx,
6972
worker_config: ServerWorkerConfig::default(),
@@ -170,6 +173,41 @@ impl ServerBuilder {
170173
self
171174
}
172175

176+
/// Specify shutdown signal from a future.
177+
///
178+
/// Using this method will prevent OS signal handlers being set up.
179+
///
180+
/// Typically, a `CancellationToken` or `oneshot` / `broadcast` channel will be used.
181+
///
182+
/// # Examples
183+
///
184+
/// ```
185+
/// # use std::io;
186+
/// # use tokio::net::TcpStream;
187+
/// # use actix_server::Server;
188+
/// # async fn run() -> io::Result<()> {
189+
/// use actix_service::fn_service;
190+
/// use tokio_util::sync::CancellationToken;
191+
///
192+
/// let stop_signal = CancellationToken::new();
193+
///
194+
/// Server::build()
195+
/// .bind("shutdown-signal", "127.0.0.1:12345", || {
196+
/// fn_service(|_stream: TcpStream| async { Ok::<_, io::Error>(()) })
197+
/// })?
198+
/// .shutdown_signal(stop_signal.cancelled_owned())
199+
/// .run()
200+
/// .await
201+
/// # }
202+
/// ```
203+
pub fn shutdown_signal<Fut>(mut self, shutdown_signal: Fut) -> Self
204+
where
205+
Fut: Future<Output = ()> + Send + 'static,
206+
{
207+
self.shutdown_signal = Some(Box::pin(shutdown_signal));
208+
self
209+
}
210+
173211
/// Timeout for graceful workers shutdown in seconds.
174212
///
175213
/// After receiving a stop signal, workers have this much time to finish serving requests.

actix-server/src/server.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::{
1818
builder::ServerBuilder,
1919
join_all::join_all,
2020
service::InternalServiceFactory,
21-
signals::{SignalKind, Signals},
21+
signals::{OsSignals, SignalKind, StopSignal},
2222
waker_queue::{WakerInterest, WakerQueue},
2323
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
2424
ServerHandle,
@@ -210,7 +210,12 @@ impl ServerInner {
210210
let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?;
211211

212212
let mux = ServerEventMultiplexer {
213-
signal_fut: (builder.listen_os_signals).then(Signals::new),
213+
signal_fut: builder.shutdown_signal.map(StopSignal::Cancel).or_else(|| {
214+
builder
215+
.listen_os_signals
216+
.then(OsSignals::new)
217+
.map(StopSignal::Os)
218+
}),
214219
cmd_rx: builder.cmd_rx,
215220
};
216221

@@ -315,7 +320,16 @@ impl ServerInner {
315320

316321
fn map_signal(signal: SignalKind) -> ServerCommand {
317322
match signal {
318-
SignalKind::Int => {
323+
SignalKind::Cancel => {
324+
info!("Cancellation token/channel received; starting graceful shutdown");
325+
ServerCommand::Stop {
326+
graceful: true,
327+
completion: None,
328+
force_system_stop: true,
329+
}
330+
}
331+
332+
SignalKind::OsInt => {
319333
info!("SIGINT received; starting forced shutdown");
320334
ServerCommand::Stop {
321335
graceful: false,
@@ -324,7 +338,7 @@ impl ServerInner {
324338
}
325339
}
326340

327-
SignalKind::Term => {
341+
SignalKind::OsTerm => {
328342
info!("SIGTERM received; starting graceful shutdown");
329343
ServerCommand::Stop {
330344
graceful: true,
@@ -333,7 +347,7 @@ impl ServerInner {
333347
}
334348
}
335349

336-
SignalKind::Quit => {
350+
SignalKind::OsQuit => {
337351
info!("SIGQUIT received; starting forced shutdown");
338352
ServerCommand::Stop {
339353
graceful: false,
@@ -347,7 +361,7 @@ impl ServerInner {
347361

348362
struct ServerEventMultiplexer {
349363
cmd_rx: UnboundedReceiver<ServerCommand>,
350-
signal_fut: Option<Signals>,
364+
signal_fut: Option<StopSignal>,
351365
}
352366

353367
impl Stream for ServerEventMultiplexer {

actix-server/src/signals.rs

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,79 @@
11
use std::{
22
fmt,
33
future::Future,
4-
pin::Pin,
4+
pin::{pin, Pin},
55
task::{Context, Poll},
66
};
77

8+
use futures_core::future::BoxFuture;
89
use tracing::trace;
910

1011
/// Types of process signals.
1112
// #[allow(dead_code)]
1213
#[derive(Debug, Clone, Copy, PartialEq)]
1314
#[allow(dead_code)] // variants are never constructed on non-unix
1415
pub(crate) enum SignalKind {
15-
/// `SIGINT`
16-
Int,
16+
/// Cancellation token or channel.
17+
Cancel,
1718

18-
/// `SIGTERM`
19-
Term,
19+
/// OS `SIGINT`.
20+
OsInt,
2021

21-
/// `SIGQUIT`
22-
Quit,
22+
/// OS `SIGTERM`.
23+
OsTerm,
24+
25+
/// OS `SIGQUIT`.
26+
OsQuit,
2327
}
2428

2529
impl fmt::Display for SignalKind {
2630
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2731
f.write_str(match self {
28-
SignalKind::Int => "SIGINT",
29-
SignalKind::Term => "SIGTERM",
30-
SignalKind::Quit => "SIGQUIT",
32+
SignalKind::Cancel => "Cancellation token or channel",
33+
SignalKind::OsInt => "SIGINT",
34+
SignalKind::OsTerm => "SIGTERM",
35+
SignalKind::OsQuit => "SIGQUIT",
3136
})
3237
}
3338
}
3439

40+
pub(crate) enum StopSignal {
41+
/// OS signal handling is configured.
42+
Os(OsSignals),
43+
44+
/// Cancellation token or channel.
45+
Cancel(BoxFuture<'static, ()>),
46+
}
47+
48+
impl Future for StopSignal {
49+
type Output = SignalKind;
50+
51+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52+
match self.get_mut() {
53+
StopSignal::Os(os_signals) => pin!(os_signals).poll(cx),
54+
StopSignal::Cancel(cancel) => pin!(cancel).poll(cx).map(|()| SignalKind::Cancel),
55+
}
56+
}
57+
}
58+
3559
/// Process signal listener.
36-
pub(crate) struct Signals {
60+
#[derive(Debug)]
61+
pub(crate) struct OsSignals {
3762
#[cfg(not(unix))]
3863
signals: futures_core::future::BoxFuture<'static, std::io::Result<()>>,
3964

4065
#[cfg(unix)]
4166
signals: Vec<(SignalKind, actix_rt::signal::unix::Signal)>,
4267
}
4368

44-
impl Signals {
69+
impl OsSignals {
4570
/// Constructs an OS signal listening future.
4671
pub(crate) fn new() -> Self {
4772
trace!("setting up OS signal listener");
4873

4974
#[cfg(not(unix))]
5075
{
51-
Signals {
76+
OsSignals {
5277
signals: Box::pin(actix_rt::signal::ctrl_c()),
5378
}
5479
}
@@ -58,9 +83,9 @@ impl Signals {
5883
use actix_rt::signal::unix;
5984

6085
let sig_map = [
61-
(unix::SignalKind::interrupt(), SignalKind::Int),
62-
(unix::SignalKind::terminate(), SignalKind::Term),
63-
(unix::SignalKind::quit(), SignalKind::Quit),
86+
(unix::SignalKind::interrupt(), SignalKind::OsInt),
87+
(unix::SignalKind::terminate(), SignalKind::OsTerm),
88+
(unix::SignalKind::quit(), SignalKind::OsQuit),
6489
];
6590

6691
let signals = sig_map
@@ -79,18 +104,18 @@ impl Signals {
79104
})
80105
.collect::<Vec<_>>();
81106

82-
Signals { signals }
107+
OsSignals { signals }
83108
}
84109
}
85110
}
86111

87-
impl Future for Signals {
112+
impl Future for OsSignals {
88113
type Output = SignalKind;
89114

90115
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91116
#[cfg(not(unix))]
92117
{
93-
self.signals.as_mut().poll(cx).map(|_| SignalKind::Int)
118+
self.signals.as_mut().poll(cx).map(|_| SignalKind::OsInt)
94119
}
95120

96121
#[cfg(unix)]
@@ -106,3 +131,10 @@ impl Future for Signals {
106131
}
107132
}
108133
}
134+
135+
#[cfg(test)]
136+
mod tests {
137+
use super::*;
138+
139+
static_assertions::assert_impl_all!(StopSignal: Send, Unpin);
140+
}

0 commit comments

Comments
 (0)