Skip to content
48 changes: 48 additions & 0 deletions tokio-trace/tests/subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#[macro_use]
extern crate tokio_trace;
use tokio_trace::{
span,
subscriber::{with_default, Interest, Subscriber},
Event, Level, Metadata,
};

#[test]
fn event_macros_dont_infinite_loop() {
// This test ensures that an event macro within a subscriber
// won't cause an infinite loop of events.
struct TestSubscriber;
impl Subscriber for TestSubscriber {
fn register_callsite(&self, _: &Metadata) -> Interest {
// Always return sometimes so that `enabled` will be called
// (which can loop).
Interest::sometimes()
}

fn enabled(&self, meta: &Metadata) -> bool {
assert!(meta.fields().iter().any(|f| f.name() == "foo"));
event!(Level::TRACE, bar = false);
true
}

fn new_span(&self, _: &span::Attributes) -> span::Id {
span::Id::from_u64(0xAAAA)
}

fn record(&self, _: &span::Id, _: &span::Record) {}

fn record_follows_from(&self, _: &span::Id, _: &span::Id) {}

fn event(&self, event: &Event) {
assert!(event.metadata().fields().iter().any(|f| f.name() == "foo"));
event!(Level::TRACE, baz = false);
}

fn enter(&self, _: &span::Id) {}

fn exit(&self, _: &span::Id) {}
}

with_default(TestSubscriber, || {
event!(Level::TRACE, foo = false);
})
}
224 changes: 199 additions & 25 deletions tokio-trace/tokio-trace-core/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Dispatches trace events to `Subscriber`s.
//! Dispatches trace events to `Subscriber`s.c
use {
callsite, span,
subscriber::{self, Subscriber},
Expand All @@ -7,7 +7,7 @@ use {

use std::{
any::Any,
cell::RefCell,
cell::{Cell, RefCell},
fmt,
sync::{Arc, Weak},
};
Expand All @@ -21,9 +21,30 @@ pub struct Dispatch {
}

thread_local! {
static CURRENT_DISPATCH: RefCell<Dispatch> = RefCell::new(Dispatch::none());
static CURRENT_STATE: State = State {
default: RefCell::new(Dispatch::none()),
can_enter: Cell::new(true),
};
}

/// The dispatch state of a thread.
struct State {
/// This thread's current default dispatcher.
default: RefCell<Dispatch>,
/// Whether or not we can currently begin dispatching a trace event.
///
/// This is set to `false` when functions such as `enter`, `exit`, `event`,
/// and `new_span` are called on this thread's default dispatcher, to
/// prevent further trace events triggered inside those functions from
/// creating an infinite recursion. When we finish handling a dispatch, this
/// is set back to `true`.
can_enter: Cell<bool>,
}

/// A guard that resets the current default dispatcher to the prior
/// default dispatcher when dropped.
struct ResetGuard(Option<Dispatch>);

/// Sets this dispatch as the default for the duration of a closure.
///
/// The default dispatcher is used when creating a new [span] or
Expand All @@ -35,41 +56,52 @@ thread_local! {
/// [`Subscriber`]: ../subscriber/trait.Subscriber.html
/// [`Event`]: ../event/struct.Event.html
pub fn with_default<T>(dispatcher: &Dispatch, f: impl FnOnce() -> T) -> T {
// A drop guard that resets CURRENT_DISPATCH to the prior dispatcher.
// Using this (rather than simply resetting after calling `f`) ensures
// that we always reset to the prior dispatcher even if `f` panics.
struct ResetGuard(Option<Dispatch>);
impl Drop for ResetGuard {
fn drop(&mut self) {
if let Some(dispatch) = self.0.take() {
let _ = CURRENT_DISPATCH.try_with(|current| {
*current.borrow_mut() = dispatch;
});
}
}
}

let dispatcher = dispatcher.clone();
let prior = CURRENT_DISPATCH.try_with(|current| current.replace(dispatcher));
let _guard = ResetGuard(prior.ok());
// When this guard is dropped, the default dispatcher will be reset to the
// prior default. Using this (rather than simply resetting after calling
// `f`) ensures that we always reset to the prior dispatcher even if `f`
// panics.
let _guard = State::set_default(dispatcher.clone());
f()
}
/// Executes a closure with a reference to this thread's current [dispatcher].
///
/// Note that calls to `get_default` should not be nested; if this function is
/// called while inside of another `get_default`, that closure will be provided
/// with `Dispatch::none` rather than the previously set dispatcher.
///
/// [dispatcher]: ../dispatcher/struct.Dispatch.html
pub fn get_default<T, F>(mut f: F) -> T
where
F: FnMut(&Dispatch) -> T,
{
CURRENT_DISPATCH
.try_with(|current| f(&*current.borrow()))
// While this guard is active, additional calls to subscriber functions on
// the default dispatcher will not be able to access the dispatch context.
// Dropping the guard will allow the dispatch context to be re-entered.
struct Entered<'a>(&'a Cell<bool>);
impl<'a> Drop for Entered<'a> {
#[inline]
fn drop(&mut self) {
self.0.set(true);
}
}

CURRENT_STATE
.try_with(|state| {
if state.can_enter.replace(false) {
let _guard = Entered(&state.can_enter);
f(&state.default.borrow())
} else {
f(&Dispatch::none())
}
})
.unwrap_or_else(|_| f(&Dispatch::none()))
}

pub(crate) struct Registrar(Weak<Subscriber + Send + Sync>);

impl Dispatch {
/// Returns a new `Dispatch` that discards events and spans.
#[inline]
pub fn none() -> Self {
Dispatch {
subscriber: Arc::new(NoSubscriber),
Expand Down Expand Up @@ -173,7 +205,7 @@ impl Dispatch {
self.subscriber.event(event)
}

/// Records that a span has been entered.
/// Records that a span has been can_enter.
///
/// This calls the [`enter`] function on the [`Subscriber`] that this
/// `Dispatch` forwards to.
Expand All @@ -182,7 +214,7 @@ impl Dispatch {
/// [`event`]: ../subscriber/trait.Subscriber.html#method.event
#[inline]
pub fn enter(&self, span: &span::Id) {
self.subscriber.enter(span)
self.subscriber.enter(span);
}

/// Records that a span has been exited.
Expand All @@ -191,7 +223,7 @@ impl Dispatch {
/// that this `Dispatch` forwards to.
#[inline]
pub fn exit(&self, span: &span::Id) {
self.subscriber.exit(span)
self.subscriber.exit(span);
}

/// Notifies the subscriber that a [span ID] has been cloned.
Expand Down Expand Up @@ -295,9 +327,50 @@ impl Registrar {
}
}

// ===== impl State =====

impl State {
/// Replaces the current default dispatcher on this thread with the provided
/// dispatcher.Any
///
/// Dropping the returned `ResetGuard` will reset the default dispatcher to
/// the previous value.
#[inline]
fn set_default(new_dispatch: Dispatch) -> ResetGuard {
let prior = CURRENT_STATE
.try_with(|state| {
state.can_enter.set(true);
state.default.replace(new_dispatch)
})
.ok();
ResetGuard(prior)
}
}

// ===== impl ResetGuard =====

impl Drop for ResetGuard {
#[inline]
fn drop(&mut self) {
if let Some(dispatch) = self.0.take() {
let _ = CURRENT_STATE.try_with(|state| {
*state.default.borrow_mut() = dispatch;
});
}
}
}

#[cfg(test)]
mod test {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use {
callsite::Callsite,
metadata::{Kind, Level, Metadata},
span,
subscriber::{Interest, Subscriber},
Event,
};

#[test]
fn dispatch_is() {
Expand All @@ -310,4 +383,105 @@ mod test {
let dispatcher = Dispatch::new(NoSubscriber);
assert!(dispatcher.downcast_ref::<NoSubscriber>().is_some());
}

struct TestCallsite;
static TEST_CALLSITE: TestCallsite = TestCallsite;
static TEST_META: Metadata<'static> = metadata! {
name: "test",
target: module_path!(),
level: Level::DEBUG,
fields: &[],
callsite: &TEST_CALLSITE,
kind: Kind::EVENT
};

impl Callsite for TestCallsite {
fn set_interest(&self, _: Interest) {}
fn metadata(&self) -> &Metadata {
&TEST_META
}
}

#[test]
fn events_dont_infinite_loop() {
// This test ensures that an event triggered within a subscriber
// won't cause an infinite loop of events.
struct TestSubscriber;
impl Subscriber for TestSubscriber {
fn enabled(&self, _: &Metadata) -> bool {
true
}

fn new_span(&self, _: &span::Attributes) -> span::Id {
span::Id::from_u64(0xAAAA)
}

fn record(&self, _: &span::Id, _: &span::Record) {}

fn record_follows_from(&self, _: &span::Id, _: &span::Id) {}

fn event(&self, _: &Event) {
static EVENTS: AtomicUsize = AtomicUsize::new(0);
assert_eq!(
EVENTS.fetch_add(1, Ordering::Relaxed),
0,
"event method called twice!"
);
Event::dispatch(&TEST_META, &TEST_META.fields().value_set(&[]))
}

fn enter(&self, _: &span::Id) {}

fn exit(&self, _: &span::Id) {}
}

with_default(&Dispatch::new(TestSubscriber), || {
Event::dispatch(&TEST_META, &TEST_META.fields().value_set(&[]))
})
}

#[test]
fn spans_dont_infinite_loop() {
// This test ensures that a span created within a subscriber
// won't cause an infinite loop of new spans.

fn mk_span() {
get_default(|current| {
current.new_span(&span::Attributes::new(
&TEST_META,
&TEST_META.fields().value_set(&[]),
))
});
}

struct TestSubscriber;
impl Subscriber for TestSubscriber {
fn enabled(&self, _: &Metadata) -> bool {
true
}

fn new_span(&self, _: &span::Attributes) -> span::Id {
static NEW_SPANS: AtomicUsize = AtomicUsize::new(0);
assert_eq!(
NEW_SPANS.fetch_add(1, Ordering::Relaxed),
0,
"new_span method called twice!"
);
mk_span();
span::Id::from_u64(0xAAAA)
}

fn record(&self, _: &span::Id, _: &span::Record) {}

fn record_follows_from(&self, _: &span::Id, _: &span::Id) {}

fn event(&self, _: &Event) {}

fn enter(&self, _: &span::Id) {}

fn exit(&self, _: &span::Id) {}
}

with_default(&Dispatch::new(TestSubscriber), || mk_span())
}
}