| 
 | 1 | +use std::any::TypeId;  | 
 | 2 | + | 
 | 3 | +use opentelemetry::{trace::TraceContextExt as _, Key, KeyValue, Value};  | 
 | 4 | +use tracing::{span, Event, Subscriber};  | 
 | 5 | +use tracing_subscriber::{  | 
 | 6 | +    layer::{Context, Filter},  | 
 | 7 | +    registry::LookupSpan,  | 
 | 8 | +    Layer,  | 
 | 9 | +};  | 
 | 10 | + | 
 | 11 | +use crate::{OtelData, OtelDataState};  | 
 | 12 | + | 
 | 13 | +use super::{OpenTelemetryLayer, SPAN_EVENT_COUNT_FIELD};  | 
 | 14 | + | 
 | 15 | +/// A layer wrapping a [`OpenTelemetryLayer`], discarding all events filtered out by a given  | 
 | 16 | +/// [`Filter`].  | 
 | 17 | +///  | 
 | 18 | +/// Only events that are not filtered out will be saved as events on the span. All events, including  | 
 | 19 | +/// those filtered out, will be counted and the total will be provided in the  | 
 | 20 | +/// `otel.tracing_event_count` field of the exported span.  | 
 | 21 | +///  | 
 | 22 | +/// This is useful when there is large volume of logs outputted by the application and it would be  | 
 | 23 | +/// too expensive to export all of them as span events, but it is still desirable to have  | 
 | 24 | +/// information whether there is more information in logs for the given span.  | 
 | 25 | +pub struct FilteredOpenTelemetryLayer<S, T, F> {  | 
 | 26 | +    inner: OpenTelemetryLayer<S, T>,  | 
 | 27 | +    filter: F,  | 
 | 28 | +}  | 
 | 29 | + | 
 | 30 | +impl<S, T, F> FilteredOpenTelemetryLayer<S, T, F> {  | 
 | 31 | +    pub fn map_inner<Mapper, S2, T2>(self, mapper: Mapper) -> FilteredOpenTelemetryLayer<S2, T2, F>  | 
 | 32 | +    where  | 
 | 33 | +        Mapper: FnOnce(OpenTelemetryLayer<S, T>) -> OpenTelemetryLayer<S2, T2>,  | 
 | 34 | +        F: Filter<S>,  | 
 | 35 | +    {  | 
 | 36 | +        FilteredOpenTelemetryLayer {  | 
 | 37 | +            inner: mapper(self.inner),  | 
 | 38 | +            filter: self.filter,  | 
 | 39 | +        }  | 
 | 40 | +    }  | 
 | 41 | + | 
 | 42 | +    pub fn with_filter<F2>(self, filter: F2) -> FilteredOpenTelemetryLayer<S, T, F2>  | 
 | 43 | +    where  | 
 | 44 | +        F2: Filter<S>,  | 
 | 45 | +    {  | 
 | 46 | +        FilteredOpenTelemetryLayer {  | 
 | 47 | +            inner: self.inner,  | 
 | 48 | +            filter,  | 
 | 49 | +        }  | 
 | 50 | +    }  | 
 | 51 | + | 
 | 52 | +    pub(crate) fn new(inner: OpenTelemetryLayer<S, T>, filter: F) -> Self  | 
 | 53 | +    where  | 
 | 54 | +        S: Subscriber + for<'span> LookupSpan<'span>,  | 
 | 55 | +        F: Filter<S>,  | 
 | 56 | +    {  | 
 | 57 | +        Self { inner, filter }  | 
 | 58 | +    }  | 
 | 59 | +}  | 
 | 60 | + | 
 | 61 | +struct EventCount(u32);  | 
 | 62 | + | 
 | 63 | +impl<S, T, F> Layer<S> for FilteredOpenTelemetryLayer<S, T, F>  | 
 | 64 | +where  | 
 | 65 | +    S: Subscriber + for<'lookup> LookupSpan<'lookup>,  | 
 | 66 | +    OpenTelemetryLayer<S, T>: Layer<S>,  | 
 | 67 | +    F: Filter<S> + 'static,  | 
 | 68 | +{  | 
 | 69 | +    fn on_layer(&mut self, subscriber: &mut S) {  | 
 | 70 | +        self.inner.on_layer(subscriber);  | 
 | 71 | +    }  | 
 | 72 | + | 
 | 73 | +    fn register_callsite(  | 
 | 74 | +        &self,  | 
 | 75 | +        metadata: &'static tracing::Metadata<'static>,  | 
 | 76 | +    ) -> tracing_core::Interest {  | 
 | 77 | +        self.inner.register_callsite(metadata)  | 
 | 78 | +    }  | 
 | 79 | + | 
 | 80 | +    fn enabled(&self, metadata: &tracing::Metadata<'_>, ctx: Context<'_, S>) -> bool {  | 
 | 81 | +        self.inner.enabled(metadata, ctx)  | 
 | 82 | +    }  | 
 | 83 | + | 
 | 84 | +    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {  | 
 | 85 | +        self.inner.on_new_span(attrs, id, ctx);  | 
 | 86 | +    }  | 
 | 87 | + | 
 | 88 | +    fn on_record(&self, span: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {  | 
 | 89 | +        self.inner.on_record(span, values, ctx);  | 
 | 90 | +    }  | 
 | 91 | + | 
 | 92 | +    fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {  | 
 | 93 | +        self.inner.on_follows_from(span, follows, ctx);  | 
 | 94 | +    }  | 
 | 95 | + | 
 | 96 | +    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {  | 
 | 97 | +        let Some(span) = event.parent().and_then(|id| ctx.span(id)).or_else(|| {  | 
 | 98 | +            event  | 
 | 99 | +                .is_contextual()  | 
 | 100 | +                .then(|| ctx.lookup_current())  | 
 | 101 | +                .flatten()  | 
 | 102 | +        }) else {  | 
 | 103 | +            return;  | 
 | 104 | +        };  | 
 | 105 | + | 
 | 106 | +        {  | 
 | 107 | +            let mut extensions = span.extensions_mut();  | 
 | 108 | + | 
 | 109 | +            if let Some(count) = extensions.get_mut::<EventCount>() {  | 
 | 110 | +                count.0 += 1;  | 
 | 111 | +            } else {  | 
 | 112 | +                extensions.insert(EventCount(1));  | 
 | 113 | +            }  | 
 | 114 | +        }  | 
 | 115 | + | 
 | 116 | +        drop(span);  | 
 | 117 | + | 
 | 118 | +        if self.filter.enabled(event.metadata(), &ctx) {  | 
 | 119 | +            self.inner.on_event(event, ctx);  | 
 | 120 | +        }  | 
 | 121 | +    }  | 
 | 122 | + | 
 | 123 | +    fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {  | 
 | 124 | +        self.inner.on_enter(id, ctx);  | 
 | 125 | +    }  | 
 | 126 | + | 
 | 127 | +    fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {  | 
 | 128 | +        self.inner.on_exit(id, ctx);  | 
 | 129 | +    }  | 
 | 130 | + | 
 | 131 | +    fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {  | 
 | 132 | +        let span = ctx.span(&id).expect("Span not found, this is a bug");  | 
 | 133 | +        let mut extensions = span.extensions_mut();  | 
 | 134 | + | 
 | 135 | +        let count = extensions.remove::<EventCount>().map_or(0, |count| count.0);  | 
 | 136 | +        if let Some(OtelData { state, end_time: _ }) = extensions.get_mut::<OtelData>() {  | 
 | 137 | +            let key_value = KeyValue::new(  | 
 | 138 | +                Key::from_static_str(SPAN_EVENT_COUNT_FIELD),  | 
 | 139 | +                Value::I64(i64::from(count)),  | 
 | 140 | +            );  | 
 | 141 | +            match state {  | 
 | 142 | +                OtelDataState::Builder {  | 
 | 143 | +                    builder,  | 
 | 144 | +                    parent_cx: _,  | 
 | 145 | +                } => {  | 
 | 146 | +                    builder.attributes.get_or_insert(Vec::new()).push(key_value);  | 
 | 147 | +                }  | 
 | 148 | +                OtelDataState::Context { current_cx } => {  | 
 | 149 | +                    let span = current_cx.span();  | 
 | 150 | +                    span.set_attribute(key_value);  | 
 | 151 | +                }  | 
 | 152 | +            }  | 
 | 153 | +        }  | 
 | 154 | + | 
 | 155 | +        drop(extensions);  | 
 | 156 | +        drop(span);  | 
 | 157 | + | 
 | 158 | +        self.inner.on_close(id, ctx);  | 
 | 159 | +    }  | 
 | 160 | + | 
 | 161 | +    fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {  | 
 | 162 | +        self.inner.on_id_change(old, new, ctx);  | 
 | 163 | +    }  | 
 | 164 | + | 
 | 165 | +    /// SAFETY: this is sound as long as the inner implementation is sound.  | 
 | 166 | +    unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {  | 
 | 167 | +        if id == TypeId::of::<Self>() {  | 
 | 168 | +            Some(self as *const _ as *const ())  | 
 | 169 | +        } else {  | 
 | 170 | +            unsafe { self.inner.downcast_raw(id) }  | 
 | 171 | +        }  | 
 | 172 | +    }  | 
 | 173 | + | 
 | 174 | +    // `and_then`, `with_subscriber`, and `with_filter` are not implemented on purpose. Other  | 
 | 175 | +    // methods should probably be implemented manually if there are new provided methods.  | 
 | 176 | +}  | 
0 commit comments