Skip to content

Commit e6ace81

Browse files
committed
bug(tracing): register_callsite doesn't get called
This branch contains code to reproduce an issue in which a single global subscriber that takes a "long time" in `register_callsite` causes the second of two threads emitting the same event to receive `event` without ever having received `register_callsite`. It also contains an attempted fix in `DefaultCallsite::register`: when another thread is already registering the same callsite, the caller now spins until that registration is complete. This fix complements the one implemented in #2938; the two fixes address different problems that manifest in the same way (`event` being called without `register_callsite` having been called first). The issue tracking this problem is #2743.
1 parent cdc3212 commit e6ace81

File tree

6 files changed

+339
-33
lines changed

6 files changed

+339
-33
lines changed

tracing-core/src/callsite.rs

Lines changed: 79 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -307,46 +307,75 @@ impl DefaultCallsite {
307307
// This only happens once (or if the cached interest value was corrupted).
308308
#[cold]
309309
pub fn register(&'static self) -> Interest {
310-
// Attempt to advance the registration state to `REGISTERING`...
311-
match self.registration.compare_exchange(
312-
Self::UNREGISTERED,
313-
Self::REGISTERING,
314-
Ordering::AcqRel,
315-
Ordering::Acquire,
316-
) {
317-
Ok(_) => {
318-
// Okay, we advanced the state, try to register the callsite.
319-
CALLSITES.push_default(self);
320-
rebuild_callsite_interest(self, &DISPATCHERS.rebuilder());
321-
self.registration.store(Self::REGISTERED, Ordering::Release);
322-
}
323-
// Great, the callsite is already registered! Just load its
324-
// previous cached interest.
325-
Err(Self::REGISTERED) => {}
326-
// Someone else is registering...
327-
Err(_state) => {
328-
debug_assert_eq!(
329-
_state,
330-
Self::REGISTERING,
331-
"weird callsite registration state"
310+
let mut count = 0;
311+
loop {
312+
// Attempt to advance the registration state to `REGISTERING`...
313+
let prev_state = self.registration.compare_exchange(
314+
Self::UNREGISTERED,
315+
Self::REGISTERING,
316+
Ordering::AcqRel,
317+
Ordering::Acquire,
318+
);
319+
if count == 0 {
320+
println!(
321+
"{thread:?}: DefaultCallsite::register: prev_state: {prev_state:?}",
322+
thread = std::thread::current().name()
332323
);
333-
// Just hit `enabled` this time.
334-
return Interest::sometimes();
324+
}
325+
count += 1;
326+
match prev_state {
327+
Ok(_) => {
328+
// Okay, we advanced the state, try to register the callsite.
329+
CALLSITES.push_default(self);
330+
rebuild_callsite_interest(self, &DISPATCHERS.rebuilder());
331+
self.registration.store(Self::REGISTERED, Ordering::Release);
332+
break;
333+
}
334+
// Great, the callsite is already registered! Just load its
335+
// previous cached interest.
336+
Err(Self::REGISTERED) => break,
337+
// Someone else is registering...
338+
Err(_state) => {
339+
debug_assert_eq!(
340+
_state,
341+
Self::REGISTERING,
342+
"weird callsite registration state: {_state}"
343+
);
344+
// The callsite is being registered. We have to wait until
345+
// registration is finished, otherwise
346+
continue;
347+
}
335348
}
336349
}
337350

351+
println!(
352+
"{thread:?}: DefaultCallsite::register: loop count: {count}",
353+
thread = std::thread::current().name()
354+
);
338355
match self.interest.load(Ordering::Relaxed) {
339356
Self::INTEREST_NEVER => Interest::never(),
340357
Self::INTEREST_ALWAYS => Interest::always(),
341-
_ => Interest::sometimes(),
358+
Self::INTEREST_SOMETIMES => Interest::sometimes(),
359+
other_interest => {
360+
println!(
361+
"{thread:?} DefaultCallsite::register: other_interest {other_interest}",
362+
thread = std::thread::current().name()
363+
);
364+
Interest::sometimes()
365+
}
342366
}
343367
}
344368

345369
/// Returns the callsite's cached `Interest`, or registers it for the
346370
/// first time if it has not yet been registered.
347371
#[inline]
348372
pub fn interest(&'static self) -> Interest {
349-
match self.interest.load(Ordering::Relaxed) {
373+
let interest = self.interest.load(Ordering::Relaxed);
374+
println!(
375+
"{thread:?} interest: {interest}",
376+
thread = std::thread::current().name()
377+
);
378+
match interest {
350379
Self::INTEREST_NEVER => Interest::never(),
351380
Self::INTEREST_SOMETIMES => Interest::sometimes(),
352381
Self::INTEREST_ALWAYS => Interest::always(),
@@ -406,6 +435,10 @@ impl Callsites {
406435
///
407436
/// This also re-computes the max level hint.
408437
fn rebuild_interest(&self, dispatchers: dispatchers::Rebuilder<'_>) {
438+
println!(
439+
"{thread:?}: rebuild_interest: dispatchers: {dispatchers:?}",
440+
thread = std::thread::current().name()
441+
);
409442
let mut max_level = LevelFilter::OFF;
410443
dispatchers.for_each(|dispatch| {
411444
// If the subscriber did not provide a max level hint, assume
@@ -493,7 +526,10 @@ fn rebuild_callsite_interest(
493526
dispatchers: &dispatchers::Rebuilder<'_>,
494527
) {
495528
let meta = callsite.metadata();
496-
529+
println!(
530+
"{thread:?}: rebuild_callsite_interest: dispatchers: {dispatchers:?}",
531+
thread = std::thread::current().name()
532+
);
497533
let mut interest = None;
498534
dispatchers.for_each(|dispatch| {
499535
let this_interest = dispatch.register_callsite(meta);
@@ -534,6 +570,22 @@ mod dispatchers {
534570
Write(RwLockWriteGuard<'a, Vec<dispatcher::Registrar>>),
535571
}
536572

573+
impl<'a> std::fmt::Debug for Rebuilder<'a> {
574+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
575+
match self {
576+
Self::JustOne => write!(f, "JustOne"),
577+
Self::Read(arg0) => f
578+
.debug_tuple("Read")
579+
.field(&arg0.len() as &dyn std::fmt::Debug)
580+
.finish(),
581+
Self::Write(arg0) => f
582+
.debug_tuple("Write")
583+
.field(&arg0.len() as &dyn std::fmt::Debug)
584+
.finish(),
585+
}
586+
}
587+
}
588+
537589
impl Dispatchers {
538590
pub(super) const fn new() -> Self {
539591
Self {
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use std::{
2+
ptr,
3+
sync::atomic::{AtomicPtr, Ordering},
4+
thread,
5+
time::Duration,
6+
};
7+
8+
use tracing_core::{
9+
callsite::{Callsite as _, DefaultCallsite},
10+
dispatcher,
11+
field::{FieldSet, Value},
12+
span, Event, Kind, Level, Metadata, Subscriber,
13+
};
14+
15+
struct TestSubscriber {
16+
sleep: Duration,
17+
callsite: AtomicPtr<Metadata<'static>>,
18+
}
19+
20+
impl TestSubscriber {
21+
fn new(sleep_micros: u64) -> Self {
22+
Self {
23+
sleep: Duration::from_micros(sleep_micros),
24+
callsite: AtomicPtr::new(ptr::null_mut()),
25+
}
26+
}
27+
}
28+
29+
impl Subscriber for TestSubscriber {
30+
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest {
31+
println!(
32+
"{thread:?}: TestSubscriber::register_callsite: start",
33+
thread = std::thread::current().name()
34+
);
35+
if !self.sleep.is_zero() {
36+
thread::sleep(self.sleep);
37+
}
38+
39+
self.callsite
40+
.store(metadata as *const _ as *mut _, Ordering::SeqCst);
41+
42+
println!(
43+
"{thread:?}: TestSubscriber::register_callsite: end",
44+
thread = std::thread::current().name()
45+
);
46+
tracing_core::Interest::always()
47+
}
48+
49+
fn event(&self, event: &tracing_core::Event<'_>) {
50+
println!(
51+
"{thread:?}: TestSubscriber::register_callsite: start",
52+
thread = std::thread::current().name()
53+
);
54+
let stored_callsite = self.callsite.load(Ordering::SeqCst);
55+
let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _;
56+
57+
// This assert is the actual test.
58+
assert_eq!(
59+
stored_callsite, event_callsite,
60+
"stored callsite: {stored_callsite:#?} does not match event \
61+
callsite: {event_callsite:#?}. Was `event` called before \
62+
`register_callsite`?"
63+
);
64+
}
65+
66+
fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
67+
true
68+
}
69+
fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id {
70+
span::Id::from_u64(0)
71+
}
72+
fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {}
73+
fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {}
74+
fn enter(&self, _span: &tracing_core::span::Id) {}
75+
fn exit(&self, _span: &tracing_core::span::Id) {}
76+
}
77+
78+
fn emit_event() {
79+
let thread = thread::current();
80+
static CALLSITE: DefaultCallsite = {
81+
// The values of the metadata are unimportant
82+
static META: Metadata<'static> = Metadata::new(
83+
"event ",
84+
"module::path",
85+
Level::INFO,
86+
None,
87+
None,
88+
None,
89+
FieldSet::new(&["message"], tracing_core::callsite::Identifier(&CALLSITE)),
90+
Kind::EVENT,
91+
);
92+
DefaultCallsite::new(&META)
93+
};
94+
let _interest = CALLSITE.interest();
95+
96+
let meta = CALLSITE.metadata();
97+
let field = meta.fields().field("message").unwrap();
98+
let message = format!("event-from-{idx}", idx = thread.name().unwrap_or("unnamed"));
99+
let values = [(&field, Some(&message as &dyn Value))];
100+
let value_set = CALLSITE.metadata().fields().value_set(&values);
101+
102+
Event::dispatch(meta, &value_set);
103+
}
104+
105+
/// Regression test for a missing `register_callsite` call (#2743).
///
/// The race: the only (global) subscriber receives `event` without first
/// receiving `register_callsite`. It is provoked by making the first call to
/// `register_callsite` slow, so that a second thread registering the same
/// callsite finds a registration already in progress and leaves
/// `DefaultCallsite::register` before that registration has completed.
///
/// Because the test depends on the interaction of multiple dispatchers in
/// different threads, it needs to be in a test file by itself.
#[test]
fn event_before_register() {
    let register_delay_micros = 1000;
    dispatcher::set_global_default(TestSubscriber::new(register_delay_micros).into()).unwrap();

    // Spawn both named threads before joining either, so they run
    // `emit_event` concurrently.
    let handles = ["thread-1", "thread-2"].map(|name| {
        thread::Builder::new()
            .name(name.into())
            .spawn(emit_event)
            .unwrap()
    });

    for handle in handles {
        handle.join().expect("failed to join thread");
    }
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use std::sync::{
2+
Arc,
3+
atomic::{AtomicBool, Ordering::SeqCst},
4+
};
5+
6+
use tracing::{Event, Metadata};
7+
use tracing_subscriber::{
8+
layer::{Context, SubscriberExt},
9+
registry::LookupSpan,
10+
util::SubscriberInitExt,
11+
};
12+
13+
#[derive(Clone, Debug, Default)]
14+
struct MyLayer {
15+
callsite_registered: Arc<AtomicBool>,
16+
}
17+
18+
impl<S> tracing_subscriber::Layer<S> for MyLayer
19+
where
20+
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
21+
{
22+
fn register_callsite(&self, _m: &'static Metadata<'static>) -> tracing::subscriber::Interest {
23+
std::thread::sleep(std::time::Duration::from_millis(100)); // Simulate some work
24+
self.callsite_registered.store(true, SeqCst);
25+
tracing::subscriber::Interest::always()
26+
}
27+
28+
fn on_event(&self, _event: &Event<'_>, _ctx: Context<'_, S>) {
29+
assert!(self.callsite_registered.load(SeqCst));
30+
}
31+
}
32+
33+
/// Emits the same `info!` event from 16 threads at once. `MyLayer::on_event`
/// panics if any of them is delivered before `register_callsite` has set the
/// layer's flag.
#[test]
fn missed_register_callsite() {
    let my_layer = MyLayer::default();
    // The installed clone shares its flag with `my_layer` through the `Arc`.
    tracing_subscriber::registry().with(my_layer.clone()).init();

    // `thread::scope` joins every spawned thread before it returns.
    std::thread::scope(|s| {
        for i in 0..16 {
            s.spawn(move || tracing::info!("Thread {} started", i));
        }
    });

    // Check the final state explicitly instead of dumping it with `dbg!`.
    assert!(
        my_layer.callsite_registered.load(SeqCst),
        "`register_callsite` was never called on the layer"
    );
}

tracing/Cargo.toml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ tracing-attributes = { path = "../tracing-attributes", version = "0.1.28", optio
3131
pin-project-lite = "0.2.9"
3232

3333
[dev-dependencies]
34+
backtrace = "0.3.71"
3435
criterion = { version = "0.3.6", default-features = false }
3536
futures = { version = "0.3.21", default-features = false }
3637
log = "0.4.17"
@@ -42,17 +43,17 @@ wasm-bindgen-test = "0.3.38"
4243
[features]
4344
default = ["std", "attributes"]
4445

45-
max_level_off = []
46+
max_level_off = []
4647
max_level_error = []
47-
max_level_warn = []
48-
max_level_info = []
48+
max_level_warn = []
49+
max_level_info = []
4950
max_level_debug = []
5051
max_level_trace = []
5152

52-
release_max_level_off = []
53+
release_max_level_off = []
5354
release_max_level_error = []
54-
release_max_level_warn = []
55-
release_max_level_info = []
55+
release_max_level_warn = []
56+
release_max_level_info = []
5657
release_max_level_debug = []
5758
release_max_level_trace = []
5859

0 commit comments

Comments
 (0)