Skip to content

Commit 3046981

Browse files
authored
subscriber: implement FIlter for reload::Subscriber (#2159)
## Motivation The `reload` subscriber doesn't (and can't) implement downcasting correctly, which breaks certain subscribers like the opentelemetry one. ## Solution Most uses of the `reload` module (including mine) are just to change the filter. Therefore, this PR implements `Filter` for `reload::Subscriber` to allow users to not need to wrap the whole layer trait. Another advantage of this is that the common-case critical sections are shortened.
1 parent 758df19 commit 3046981

File tree

2 files changed

+155
-23
lines changed

2 files changed

+155
-23
lines changed

tracing-subscriber/src/reload.rs

Lines changed: 96 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,25 @@
11
//! Wrapper for a `Collect` or `Subscribe` to allow it to be dynamically reloaded.
22
//!
3-
//! This module provides a type implementing [`Subscribe`] which wraps another type implementing
4-
//! the [`Subscribe`] trait, allowing the wrapped type to be replaced with another
3+
//! This module provides a type implementing [`Subscribe`] and [`Filter`]
4+
//! which wraps another type implementing the corresponding trait. This
5+
//! allows the wrapped type to be replaced with another
56
//! instance of that type at runtime.
67
//!
7-
//! This can be used in cases where a subset of `Collect` functionality
8+
//! This can be used in cases where a subset of `Collect` or `Filter` functionality
89
//! should be dynamically reconfigured, such as when filtering directives may
910
//! change at runtime. Note that this subscriber introduces a (relatively small)
1011
//! amount of overhead, and should thus only be used as needed.
1112
//!
13+
//! ## Note
14+
//!
15+
//! //! The [`Subscribe`] implementation is unable to implement downcasting functionality,
16+
//! so certain `Subscribers` will fail to reload if wrapped in a `reload::Subscriber`.
17+
//!
18+
//! If you only want to be able to dynamically change the
19+
//! `Filter` on your layer, prefer wrapping that `Filter` in the `reload::Subscriber`.
20+
//!
1221
//! [`Subscribe`]: crate::Subscribe
22+
//! [`Filter`]: crate::subscribe::Filter
1323
use crate::subscribe;
1424
use crate::sync::RwLock;
1525

@@ -23,7 +33,10 @@ use tracing_core::{
2333
span, Event, Metadata,
2434
};
2535

26-
/// Wraps a `Collect` or `Subscribe`, allowing it to be reloaded dynamically at runtime.
36+
/// Wraps a `Filter` or `Subscribe`, allowing it to be reloaded dynamically at runtime.
37+
///
38+
/// [`Filter`]: crate::subscribe::Filter
39+
/// [`Subscribe`]: crate::Subscribe
2740
#[derive(Debug)]
2841
pub struct Subscriber<S> {
2942
// TODO(eliza): this once used a `crossbeam_util::ShardedRwLock`. We may
@@ -119,10 +132,67 @@ where
119132
}
120133
}
121134

122-
impl<S> Subscriber<S> {
123-
/// Wraps the given `Subscribe`, returning a subscriber and a `Handle` that allows
135+
#[cfg(all(feature = "registry", feature = "std"))]
136+
#[cfg_attr(docsrs, doc(cfg(all(feature = "registry", feature = "std"))))]
137+
impl<S, C> crate::subscribe::Filter<C> for Subscriber<S>
138+
where
139+
S: crate::subscribe::Filter<C> + 'static,
140+
C: Collect,
141+
{
142+
#[inline]
143+
fn callsite_enabled(&self, metadata: &'static Metadata<'static>) -> Interest {
144+
try_lock!(self.inner.read(), else return Interest::sometimes()).callsite_enabled(metadata)
145+
}
146+
147+
#[inline]
148+
fn enabled(&self, metadata: &Metadata<'_>, ctx: &subscribe::Context<'_, C>) -> bool {
149+
try_lock!(self.inner.read(), else return false).enabled(metadata, ctx)
150+
}
151+
152+
#[inline]
153+
fn on_new_span(
154+
&self,
155+
attrs: &span::Attributes<'_>,
156+
id: &span::Id,
157+
ctx: subscribe::Context<'_, C>,
158+
) {
159+
try_lock!(self.inner.read()).on_new_span(attrs, id, ctx)
160+
}
161+
162+
#[inline]
163+
fn on_record(
164+
&self,
165+
span: &span::Id,
166+
values: &span::Record<'_>,
167+
ctx: subscribe::Context<'_, C>,
168+
) {
169+
try_lock!(self.inner.read()).on_record(span, values, ctx)
170+
}
171+
172+
#[inline]
173+
fn on_enter(&self, id: &span::Id, ctx: subscribe::Context<'_, C>) {
174+
try_lock!(self.inner.read()).on_enter(id, ctx)
175+
}
176+
177+
#[inline]
178+
fn on_exit(&self, id: &span::Id, ctx: subscribe::Context<'_, C>) {
179+
try_lock!(self.inner.read()).on_exit(id, ctx)
180+
}
181+
182+
#[inline]
183+
fn on_close(&self, id: span::Id, ctx: subscribe::Context<'_, C>) {
184+
try_lock!(self.inner.read()).on_close(id, ctx)
185+
}
186+
}
187+
188+
impl<T> Subscriber<T> {
189+
/// Wraps the given `Subscribe` or `Filter`,
190+
/// returning a subscriber or filter and a `Handle` that allows
124191
/// the inner type to be modified at runtime.
125-
pub fn new(inner: S) -> (Self, Handle<S>) {
192+
///
193+
/// [`Filter`]: crate::subscribe::Filter
194+
/// [`Subscribe`]: crate::Subscribe
195+
pub fn new(inner: T) -> (Self, Handle<T>) {
126196
let this = Self {
127197
inner: Arc::new(RwLock::new(inner)),
128198
};
@@ -131,7 +201,7 @@ impl<S> Subscriber<S> {
131201
}
132202

133203
/// Returns a `Handle` that can be used to reload the wrapped `Subscribe`.
134-
pub fn handle(&self) -> Handle<S> {
204+
pub fn handle(&self) -> Handle<T> {
135205
Handle {
136206
inner: Arc::downgrade(&self.inner),
137207
}
@@ -140,22 +210,25 @@ impl<S> Subscriber<S> {
140210

141211
// ===== impl Handle =====
142212

143-
impl<S> Handle<S> {
144-
/// Replace the current subscriber with the provided `new_subscriber`.
213+
impl<T> Handle<T> {
214+
/// Replace the current subscriber or filter with the provided `new_value`.
215+
///
216+
/// [`Handle::reload`] cannot be used with the [`Filtered`](crate::filter::Filtered)
217+
/// subscriber; use [`Handle::modify`] instead (see [this issue] for additional details).
218+
///
219+
/// However, if the _only_ the [`Filter`](crate::subscribe::Filter) needs to be modified,
220+
/// use `reload::Subscriber` to wrap the `Filter` directly.
145221
///
146-
/// **Warning:** The [`Filtered`](crate::filter::Filtered) type currently can't be changed
147-
/// at runtime via the [`Handle::reload`] method.
148-
/// Use the [`Handle::modify`] method to change the filter instead.
149-
/// (see <https://github.com/tokio-rs/tracing/issues/1629>)
150-
pub fn reload(&self, new_subscriber: impl Into<S>) -> Result<(), Error> {
151-
self.modify(|subscriber| {
152-
*subscriber = new_subscriber.into();
222+
/// [this issue]: https://github.com/tokio-rs/tracing/issues/1629
223+
pub fn reload(&self, new_value: impl Into<T>) -> Result<(), Error> {
224+
self.modify(|object| {
225+
*object = new_value.into();
153226
})
154227
}
155228

156229
/// Invokes a closure with a mutable reference to the current subscriber,
157230
/// allowing it to be modified in place.
158-
pub fn modify(&self, f: impl FnOnce(&mut S)) -> Result<(), Error> {
231+
pub fn modify(&self, f: impl FnOnce(&mut T)) -> Result<(), Error> {
159232
let inner = self.inner.upgrade().ok_or(Error {
160233
kind: ErrorKind::CollectorGone,
161234
})?;
@@ -182,16 +255,16 @@ impl<S> Handle<S> {
182255

183256
/// Returns a clone of the subscriber's current value if it still exists.
184257
/// Otherwise, if the collector has been dropped, returns `None`.
185-
pub fn clone_current(&self) -> Option<S>
258+
pub fn clone_current(&self) -> Option<T>
186259
where
187-
S: Clone,
260+
T: Clone,
188261
{
189-
self.with_current(S::clone).ok()
262+
self.with_current(T::clone).ok()
190263
}
191264

192265
/// Invokes a closure with a borrowed reference to the current subscriber,
193266
/// returning the result (or an error if the collector no longer exists).
194-
pub fn with_current<T>(&self, f: impl FnOnce(&S) -> T) -> Result<T, Error> {
267+
pub fn with_current<T2>(&self, f: impl FnOnce(&T) -> T2) -> Result<T2, Error> {
195268
let inner = self.inner.upgrade().ok_or(Error {
196269
kind: ErrorKind::CollectorGone,
197270
})?;
@@ -200,7 +273,7 @@ impl<S> Handle<S> {
200273
}
201274
}
202275

203-
impl<S> Clone for Handle<S> {
276+
impl<T> Clone for Handle<T> {
204277
fn clone(&self) -> Self {
205278
Handle {
206279
inner: self.inner.clone(),

tracing-subscriber/tests/reload.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,17 @@ impl Collect for NopCollector {
3232
}
3333
}
3434

35+
pub struct NopSubscriber;
36+
impl<S: Collect> tracing_subscriber::Subscribe<S> for NopSubscriber {
37+
fn register_callsite(&self, _m: &Metadata<'_>) -> Interest {
38+
Interest::sometimes()
39+
}
40+
41+
fn enabled(&self, _m: &Metadata<'_>, _: subscribe::Context<'_, S>) -> bool {
42+
true
43+
}
44+
}
45+
3546
#[test]
3647
fn reload_handle() {
3748
static FILTER1_CALLS: AtomicUsize = AtomicUsize::new(0);
@@ -82,3 +93,51 @@ fn reload_handle() {
8293
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 1);
8394
})
8495
}
96+
97+
#[test]
98+
fn reload_filter() {
99+
static FILTER1_CALLS: AtomicUsize = AtomicUsize::new(0);
100+
static FILTER2_CALLS: AtomicUsize = AtomicUsize::new(0);
101+
102+
enum Filter {
103+
One,
104+
Two,
105+
}
106+
107+
impl<S: Collect> tracing_subscriber::subscribe::Filter<S> for Filter {
108+
fn enabled(&self, m: &Metadata<'_>, _: &subscribe::Context<'_, S>) -> bool {
109+
println!("ENABLED: {:?}", m);
110+
match self {
111+
Filter::One => FILTER1_CALLS.fetch_add(1, Ordering::SeqCst),
112+
Filter::Two => FILTER2_CALLS.fetch_add(1, Ordering::SeqCst),
113+
};
114+
true
115+
}
116+
}
117+
fn event() {
118+
tracing::trace!("my event");
119+
}
120+
121+
let (filter, handle) = Subscriber::new(Filter::One);
122+
123+
let dispatcher = tracing_core::dispatch::Dispatch::new(
124+
tracing_subscriber::registry().with(NopSubscriber.with_filter(filter)),
125+
);
126+
127+
tracing_core::dispatch::with_default(&dispatcher, || {
128+
assert_eq!(FILTER1_CALLS.load(Ordering::SeqCst), 0);
129+
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 0);
130+
131+
event();
132+
133+
assert_eq!(FILTER1_CALLS.load(Ordering::SeqCst), 1);
134+
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 0);
135+
136+
handle.reload(Filter::Two).expect("should reload");
137+
138+
event();
139+
140+
assert_eq!(FILTER1_CALLS.load(Ordering::SeqCst), 1);
141+
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 1);
142+
})
143+
}

0 commit comments

Comments
 (0)