Skip to content

Commit fcd9025

Browse files
authored
Introduced RemoteRuntime wrapper in the agent (#3227)
* Introduced RemoteRuntime type * Fix * Moved all BG tasks setup to a submodule * Fixed docs * Blackbox text improved * Some more fixes * More fixes * Removed blackbox test * Task setup fix * CR suggestions * MaybeRemoteRuntime -> BgTaskRuntime
1 parent 38ec566 commit fcd9025

22 files changed

+770
-866
lines changed

Cargo.lock

-7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reworked agent's threading model to avoid spawning excessive threads.

mirrord/agent/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,3 @@ rcgen.workspace = true
8282
reqwest.workspace = true
8383
rstest.workspace = true
8484
tempfile.workspace = true
85-
test_bin = "0.4"

mirrord/agent/src/cli.rs

-2
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,6 @@ pub enum Mode {
9898
},
9999
#[default]
100100
Targetless,
101-
#[clap(hide = true)]
102-
BlackboxTest,
103101
}
104102

105103
impl Mode {

mirrord/agent/src/dns.rs

+55-37
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use std::{future, io, path::PathBuf, sync::atomic::Ordering, time::Duration};
1+
use std::{
2+
collections::HashMap, future, io, path::PathBuf, sync::atomic::Ordering, time::Duration,
3+
};
24

35
use futures::{stream::FuturesOrdered, StreamExt};
46
use hickory_resolver::{
@@ -24,16 +26,12 @@ use tokio::{
2426
mpsc::{Receiver, Sender},
2527
oneshot,
2628
},
27-
task::JoinSet,
29+
task::{Id, JoinSet},
2830
};
2931
use tokio_util::sync::CancellationToken;
3032
use tracing::{warn, Level};
3133

32-
use crate::{
33-
error::{AgentError, AgentResult},
34-
metrics::DNS_REQUEST_COUNT,
35-
watched_task::TaskStatus,
36-
};
34+
use crate::{error::AgentResult, metrics::DNS_REQUEST_COUNT, util::remote_runtime::BgTaskStatus};
3735

3836
#[derive(Debug)]
3937
pub(crate) enum ClientGetAddrInfoRequest {
@@ -54,7 +52,7 @@ impl ClientGetAddrInfoRequest {
5452
#[derive(Debug)]
5553
pub(crate) struct DnsCommand {
5654
request: ClientGetAddrInfoRequest,
57-
response_tx: oneshot::Sender<Result<DnsLookup, InternalLookupError>>,
55+
response_tx: oneshot::Sender<Result<DnsLookup, ResolveErrorKindInternal>>,
5856
}
5957

6058
/// Background task for resolving hostnames to IP addresses.
@@ -80,12 +78,11 @@ pub(crate) struct DnsWorker {
8078
/// Background tasks that handle the DNS requests.
8179
///
8280
/// Each of these builds a new [`TokioAsyncResolver`] and performs one lookup.
83-
tasks: JoinSet<()>,
81+
tasks: JoinSet<Result<DnsLookup, InternalLookupError>>,
82+
response_txs: HashMap<Id, oneshot::Sender<Result<DnsLookup, ResolveErrorKindInternal>>>,
8483
}
8584

8685
impl DnsWorker {
87-
pub const TASK_NAME: &'static str = "DNS worker";
88-
8986
/// Creates a new instance of this worker.
9087
/// To run this worker, call [`Self::run`].
9188
///
@@ -124,6 +121,7 @@ impl DnsWorker {
124121
attempts,
125122
support_ipv6,
126123
tasks: Default::default(),
124+
response_txs: Default::default(),
127125
}
128126
}
129127

@@ -203,34 +201,51 @@ impl DnsWorker {
203201
let attempts = self.attempts;
204202
let support_ipv6 = self.support_ipv6;
205203

206-
let lookup_future = async move {
207-
let result = Self::do_lookup(
208-
etc_path,
209-
message.request.into_v2(),
210-
attempts,
211-
timeout,
212-
support_ipv6,
213-
)
214-
.await;
215-
216-
let _ = message.response_tx.send(result);
217-
};
204+
let handle = self.tasks.spawn(Self::do_lookup(
205+
etc_path,
206+
message.request.into_v2(),
207+
attempts,
208+
timeout,
209+
support_ipv6,
210+
));
211+
self.response_txs.insert(handle.id(), message.response_tx);
218212

219213
DNS_REQUEST_COUNT.fetch_add(1, Ordering::Relaxed);
220-
self.tasks.spawn(lookup_future);
221214
}
222215

223-
pub(crate) async fn run(mut self, cancellation_token: CancellationToken) -> AgentResult<()> {
216+
pub(crate) async fn run(mut self, cancellation_token: CancellationToken) {
224217
loop {
225218
tokio::select! {
226-
_ = cancellation_token.cancelled() => break Ok(()),
219+
_ = cancellation_token.cancelled() => break,
227220

228-
Some(..) = self.tasks.join_next() => {
221+
Some(result) = self.tasks.join_next_with_id() => {
229222
DNS_REQUEST_COUNT.fetch_sub(1, Ordering::Relaxed);
223+
let (id, result) = match result {
224+
Ok((id, result)) => (
225+
id,
226+
result.map_err(Into::into),
227+
),
228+
Err(error) => {
229+
(
230+
error.id(),
231+
Err(ResolveErrorKindInternal::Message("DNS task panicked".into()))
232+
)
233+
}
234+
};
235+
236+
let response_tx = self.response_txs.remove(&id);
237+
match response_tx {
238+
Some(response_tx) => {
239+
let _ = response_tx.send(result);
240+
}
241+
None => {
242+
warn!(?id, "Received a DNS result with no matching response channel");
243+
}
244+
}
230245
}
231246

232247
message = self.request_rx.recv() => match message {
233-
None => break Ok(()),
248+
None => break,
234249
Some(message) => self.handle_message(message),
235250
},
236251
}
@@ -246,15 +261,15 @@ impl Drop for DnsWorker {
246261
}
247262

248263
pub(crate) struct DnsApi {
249-
task_status: TaskStatus,
264+
task_status: BgTaskStatus,
250265
request_tx: Sender<DnsCommand>,
251266
/// [`DnsWorker`] processes all requests concurrently, so we use a combination of [`oneshot`]
252267
/// channels and [`FuturesOrdered`] to preserve order of responses.
253-
responses: FuturesOrdered<oneshot::Receiver<Result<DnsLookup, InternalLookupError>>>,
268+
responses: FuturesOrdered<oneshot::Receiver<Result<DnsLookup, ResolveErrorKindInternal>>>,
254269
}
255270

256271
impl DnsApi {
257-
pub(crate) fn new(task_status: TaskStatus, task_sender: Sender<DnsCommand>) -> Self {
272+
pub(crate) fn new(task_status: BgTaskStatus, task_sender: Sender<DnsCommand>) -> Self {
258273
Self {
259274
task_status,
260275
request_tx: task_sender,
@@ -276,7 +291,7 @@ impl DnsApi {
276291
response_tx,
277292
};
278293
if self.request_tx.send(command).await.is_err() {
279-
return Err(self.task_status.unwrap_err().await);
294+
return Err(self.task_status.wait_assert_running().await);
280295
}
281296

282297
self.responses.push_back(response_rx);
@@ -294,11 +309,14 @@ impl DnsApi {
294309
return future::pending().await;
295310
};
296311

297-
let response = response
298-
.map_err(|_| AgentError::DnsTaskPanic)?
299-
.map_err(|error| ResponseError::DnsLookup(DnsLookupError { kind: error.into() }));
300-
301-
Ok(GetAddrInfoResponse(response))
312+
match response {
313+
Ok(response) => {
314+
Ok(GetAddrInfoResponse(response.map_err(|kind| {
315+
ResponseError::DnsLookup(DnsLookupError { kind })
316+
})))
317+
}
318+
Err(..) => Err(self.task_status.wait_assert_running().await),
319+
}
302320
}
303321
}
304322

0 commit comments

Comments
 (0)