Skip to content

Commit 812929b

Browse files
feat(internal telemetry at shutdown): close internal sources after external ones (vectordotdev#17741)
<!-- **Your PR title must conform to the conventional commit spec!** <type>(<scope>)!: <description> * `type` = chore, enhancement, feat, fix, docs * `!` = OPTIONAL: signals a breaking change * `scope` = Optional when `type` is "chore" or "docs", available scopes https://github.com/vectordotdev/vector/blob/master/.github/semantic.yml#L20 * `description` = short description of the change Examples: * enhancement(file source): Add `sort` option to sort discovered files * feat(new source): Initial `statsd` source * fix(file source): Fix a bug discovering new files * chore(external docs): Clarify `batch_size` option --> We would like to close Vector's internal telemetry sources (internal logs, metrics, and traces) as late as possible during shutdown to facilitate debugging. In this PR, we wait until all other sources have shut down before shutting down the internal telemetry sources. This means that shutdown may take a bit longer, but we gain better observability into the shutdown process. Issue: vectordotdev#15912
1 parent c19938c commit 812929b

File tree

5 files changed

+30
-16
lines changed

5 files changed

+30
-16
lines changed

lib/vector-common/src/shutdown.rs

+22-11
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,11 @@ impl ShutdownSignal {
107107
}
108108
}
109109

110+
type IsInternal = bool;
111+
110112
#[derive(Debug, Default)]
111113
pub struct SourceShutdownCoordinator {
112-
shutdown_begun_triggers: HashMap<ComponentKey, Trigger>,
114+
shutdown_begun_triggers: HashMap<ComponentKey, (IsInternal, Trigger)>,
113115
shutdown_force_triggers: HashMap<ComponentKey, Trigger>,
114116
shutdown_complete_tripwires: HashMap<ComponentKey, Tripwire>,
115117
}
@@ -121,13 +123,14 @@ impl SourceShutdownCoordinator {
121123
pub fn register_source(
122124
&mut self,
123125
id: &ComponentKey,
126+
internal: bool,
124127
) -> (ShutdownSignal, impl Future<Output = ()>) {
125128
let (shutdown_begun_trigger, shutdown_begun_tripwire) = Tripwire::new();
126129
let (force_shutdown_trigger, force_shutdown_tripwire) = Tripwire::new();
127130
let (shutdown_complete_trigger, shutdown_complete_tripwire) = Tripwire::new();
128131

129132
self.shutdown_begun_triggers
130-
.insert(id.clone(), shutdown_begun_trigger);
133+
.insert(id.clone(), (internal, shutdown_begun_trigger));
131134
self.shutdown_force_triggers
132135
.insert(id.clone(), force_shutdown_trigger);
133136
self.shutdown_complete_tripwires
@@ -201,13 +204,14 @@ impl SourceShutdownCoordinator {
201204
/// Panics if this coordinator has had its triggers removed (ie
202205
/// has been taken over with `Self::takeover_source`).
203206
pub fn shutdown_all(self, deadline: Option<Instant>) -> impl Future<Output = ()> {
204-
let mut complete_futures = Vec::new();
207+
let mut internal_sources_complete_futures = Vec::new();
208+
let mut external_sources_complete_futures = Vec::new();
205209

206210
let shutdown_begun_triggers = self.shutdown_begun_triggers;
207211
let mut shutdown_complete_tripwires = self.shutdown_complete_tripwires;
208212
let mut shutdown_force_triggers = self.shutdown_force_triggers;
209213

210-
for (id, trigger) in shutdown_begun_triggers {
214+
for (id, (internal, trigger)) in shutdown_begun_triggers {
211215
trigger.cancel();
212216

213217
let shutdown_complete_tripwire =
@@ -229,10 +233,16 @@ impl SourceShutdownCoordinator {
229233
deadline,
230234
);
231235

232-
complete_futures.push(source_complete);
236+
if internal {
237+
internal_sources_complete_futures.push(source_complete);
238+
} else {
239+
external_sources_complete_futures.push(source_complete);
240+
}
233241
}
234242

235-
futures::future::join_all(complete_futures).map(|_| ())
243+
futures::future::join_all(external_sources_complete_futures)
244+
.then(|_| futures::future::join_all(internal_sources_complete_futures))
245+
.map(|_| ())
236246
}
237247

238248
/// Sends the signal to the given source to begin shutting down. Returns a future that resolves
@@ -250,11 +260,12 @@ impl SourceShutdownCoordinator {
250260
id: &ComponentKey,
251261
deadline: Instant,
252262
) -> impl Future<Output = bool> {
253-
let begin_shutdown_trigger = self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| {
254-
panic!(
263+
let (_, begin_shutdown_trigger) =
264+
self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| {
265+
panic!(
255266
"shutdown_begun_trigger for source \"{id}\" not found in the ShutdownCoordinator"
256267
)
257-
});
268+
});
258269
// This is what actually triggers the source to begin shutting down.
259270
begin_shutdown_trigger.cancel();
260271

@@ -336,7 +347,7 @@ mod test {
336347
let mut shutdown = SourceShutdownCoordinator::default();
337348
let id = ComponentKey::from("test");
338349

339-
let (shutdown_signal, _) = shutdown.register_source(&id);
350+
let (shutdown_signal, _) = shutdown.register_source(&id, false);
340351

341352
let deadline = Instant::now() + Duration::from_secs(1);
342353
let shutdown_complete = shutdown.shutdown_source(&id, deadline);
@@ -352,7 +363,7 @@ mod test {
352363
let mut shutdown = SourceShutdownCoordinator::default();
353364
let id = ComponentKey::from("test");
354365

355-
let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id);
366+
let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id, false);
356367

357368
let deadline = Instant::now() + Duration::from_secs(1);
358369
let shutdown_complete = shutdown.shutdown_source(&id, deadline);

src/config/source.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl SourceContext {
143143
out: SourceSender,
144144
) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
145145
let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
146-
let (shutdown_signal, _) = shutdown.register_source(key);
146+
let (shutdown_signal, _) = shutdown.register_source(key, false);
147147
(
148148
Self {
149149
key: key.clone(),

src/sources/socket/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,7 @@ mod test {
872872
source_id: &ComponentKey,
873873
shutdown: &mut SourceShutdownCoordinator,
874874
) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
875-
let (shutdown_signal, _) = shutdown.register_source(source_id);
875+
let (shutdown_signal, _) = shutdown.register_source(source_id, false);
876876
init_udp_inner(sender, source_id, shutdown_signal, None, false).await
877877
}
878878

src/sources/util/framestream.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,7 @@ mod test {
728728
let source_id = ComponentKey::from(source_id);
729729
let socket_path = frame_handler.socket_path();
730730
let mut shutdown = SourceShutdownCoordinator::default();
731-
let (shutdown_signal, _) = shutdown.register_source(&source_id);
731+
let (shutdown_signal, _) = shutdown.register_source(&source_id, false);
732732
let server = build_framestream_unix_source(frame_handler, shutdown_signal, pipeline)
733733
.expect("Failed to build framestream unix source.");
734734

src/topology/builder.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ static TRANSFORM_CONCURRENCY_LIMIT: Lazy<usize> = Lazy::new(|| {
7171
.unwrap_or_else(crate::num_threads)
7272
});
7373

74+
const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
75+
7476
/// Builds only the new pieces, and doesn't check their topology.
7577
pub async fn build_pieces(
7678
config: &super::Config,
@@ -313,8 +315,9 @@ impl<'a> Builder<'a> {
313315

314316
let pipeline = builder.build();
315317

316-
let (shutdown_signal, force_shutdown_tripwire) =
317-
self.shutdown_coordinator.register_source(key);
318+
let (shutdown_signal, force_shutdown_tripwire) = self
319+
.shutdown_coordinator
320+
.register_source(key, INTERNAL_SOURCES.contains(&typetag));
318321

319322
let context = SourceContext {
320323
key: key.clone(),

0 commit comments

Comments
 (0)