From e0ffbda31b1d4a00e2aaffe66a7f4b44779a8812 Mon Sep 17 00:00:00 2001 From: maddeleine <59030281+maddeleine@users.noreply.github.com> Date: Mon, 23 Sep 2024 12:14:30 -0700 Subject: [PATCH] feat(s2n-quic-events): Adds events for s2n-quic-dc (#2321) --- dc/s2n-quic-dc/src/event/events.rs | 10 + dc/s2n-quic-dc/src/event/generated.rs | 537 ++++++++++++++++++++++++ quic/s2n-quic-events/src/main.rs | 569 +++++++++++++++++--------- quic/s2n-quic-events/src/parser.rs | 59 +-- 4 files changed, 965 insertions(+), 210 deletions(-) create mode 100644 dc/s2n-quic-dc/src/event/events.rs create mode 100644 dc/s2n-quic-dc/src/event/generated.rs diff --git a/dc/s2n-quic-dc/src/event/events.rs b/dc/s2n-quic-dc/src/event/events.rs new file mode 100644 index 0000000000..1e31524484 --- /dev/null +++ b/dc/s2n-quic-dc/src/event/events.rs @@ -0,0 +1,10 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +#[event("transport:frame_sent")] +/// Frame was sent +struct FrameSent { + packet_header: PacketHeader, + path_id: u64, + frame: Frame, +} diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs new file mode 100644 index 0000000000..7639587eea --- /dev/null +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -0,0 +1,537 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +// DO NOT MODIFY THIS FILE +// This file was generated with the `s2n-quic-events` crate and any required +// changes should be made there. + +use super::*; +pub mod api { + #![doc = r" This module contains events that are emitted to the [`Subscriber`](crate::event::Subscriber)"] + use super::*; + pub use traits::Subscriber; + #[derive(Clone, Debug)] + #[non_exhaustive] + #[doc = " Frame was sent"] + pub struct FrameSent { + pub packet_header: PacketHeader, + pub path_id: u64, + pub frame: Frame, + } + impl Event for FrameSent { + const NAME: &'static str = "transport:frame_sent"; + } +} +#[cfg(feature = "event-tracing")] +pub mod tracing { + #![doc = r" This module contains event integration with [`tracing`](https://docs.rs/tracing)"] + use super::api; + #[doc = r" Emits events with [`tracing`](https://docs.rs/tracing)"] + #[derive(Clone, Debug)] + pub struct Subscriber { + client: tracing::Span, + server: tracing::Span, + } + impl Default for Subscriber { + fn default() -> Self { + let root = tracing :: span ! (target : "s2n_quic_dc" , tracing :: Level :: DEBUG , "s2n_quic_dc"); + let client = + tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "client"); + let server = + tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "server"); + Self { client, server } + } + } + impl super::Subscriber for Subscriber { + type ConnectionContext = tracing::Span; + fn create_connection_context( + &self, + meta: &api::ConnectionMeta, + _info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + let parent = match meta.endpoint_type { + api::EndpointType::Client {} => self.client.id(), + api::EndpointType::Server {} => self.server.id(), + }; + tracing :: span ! (target : "s2n_quic_dc" , parent : parent , tracing :: Level :: DEBUG , "conn" , id = meta . id) + } + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + _meta: &api::ConnectionMeta, + event: &api::FrameSent, + ) { + let id = context.id(); + let api::FrameSent { + packet_header, + path_id, + frame, + } = event; + tracing :: event ! (target : "frame_sent" , parent : id , tracing :: Level :: DEBUG , packet_header = tracing :: field :: debug (packet_header) , path_id = tracing :: field :: debug (path_id) , frame = tracing :: field :: debug (frame)); + } + } +} +pub mod builder { + use super::*; + #[derive(Clone, Debug)] + #[doc = " Frame was sent"] + pub struct FrameSent { + pub packet_header: PacketHeader, + pub path_id: u64, + pub frame: Frame, + } + impl IntoEvent for FrameSent { + #[inline] + fn into_event(self) -> api::FrameSent { + let FrameSent { + packet_header, + path_id, + frame, + } = self; + api::FrameSent { + packet_header: packet_header.into_event(), + path_id: path_id.into_event(), + frame: frame.into_event(), + } + } + } +} +pub use traits::*; +mod traits { + use super::*; + use crate::query; + use api::*; + use core::fmt; + #[doc = r" Provides metadata related to an event"] + pub trait Meta: fmt::Debug { + #[doc = r" Returns whether the local endpoint is a Client or Server"] + fn endpoint_type(&self) -> &EndpointType; + #[doc = r" A context from which the event is being emitted"] + #[doc = r""] + #[doc = r" An event can occur in the context of an Endpoint or Connection"] + fn subject(&self) -> Subject; + #[doc = r" The time the event occurred"] + fn timestamp(&self) -> &crate::event::Timestamp; + } + impl Meta for ConnectionMeta { + fn endpoint_type(&self) -> &EndpointType { + &self.endpoint_type + } + fn subject(&self) -> Subject { + Subject::Connection { id: self.id } + } + fn timestamp(&self) -> &crate::event::Timestamp { + &self.timestamp + } + } + impl Meta for EndpointMeta { + fn endpoint_type(&self) -> &EndpointType { + &self.endpoint_type + } + fn subject(&self) -> Subject { + Subject::Endpoint {} + } + fn timestamp(&self) -> &crate::event::Timestamp { + &self.timestamp + } + } + #[doc = r" Allows for events to be subscribed to"] + pub trait Subscriber: 'static + Send { + #[doc = r" An application provided type associated with each connection."] + #[doc = r""] + #[doc = r" The context provides a mechanism for applications to provide a custom type"] + #[doc = r" and update it on each event, e.g. computing statistics. Each event"] + #[doc = r" invocation (e.g. [`Subscriber::on_packet_sent`]) also provides mutable"] + #[doc = r" access to the context `&mut ConnectionContext` and allows for updating the"] + #[doc = r" context."] + #[doc = r""] + #[doc = r" ```no_run"] + #[doc = r" # mod s2n_quic { pub mod provider { pub mod event {"] + #[doc = r" # pub use s2n_quic_core::event::{api as events, api::ConnectionInfo, api::ConnectionMeta, Subscriber};"] + #[doc = r" # }}}"] + #[doc = r" use s2n_quic::provider::event::{"] + #[doc = r" ConnectionInfo, ConnectionMeta, Subscriber, events::PacketSent"] + #[doc = r" };"] + #[doc = r""] + #[doc = r" pub struct MyEventSubscriber;"] + #[doc = r""] + #[doc = r" pub struct MyEventContext {"] + #[doc = r" packet_sent: u64,"] + #[doc = r" }"] + #[doc = r""] + #[doc = r" impl Subscriber for MyEventSubscriber {"] + #[doc = r" type ConnectionContext = MyEventContext;"] + #[doc = r""] + #[doc = r" fn create_connection_context("] + #[doc = r" &mut self, _meta: &ConnectionMeta,"] + #[doc = r" _info: &ConnectionInfo,"] + #[doc = r" ) -> Self::ConnectionContext {"] + #[doc = r" MyEventContext { packet_sent: 0 }"] + #[doc = r" }"] + #[doc = r""] + #[doc = r" fn on_packet_sent("] + #[doc = r" &mut self,"] + #[doc = r" context: &mut Self::ConnectionContext,"] + #[doc = r" _meta: &ConnectionMeta,"] + #[doc = r" _event: &PacketSent,"] + #[doc = r" ) {"] + #[doc = r" context.packet_sent += 1;"] + #[doc = r" }"] + #[doc = r" }"] + #[doc = r" ```"] + type ConnectionContext: 'static + Send; + #[doc = r" Creates a context to be passed to each connection-related event"] + fn create_connection_context( + &self, + meta: &ConnectionMeta, + info: &ConnectionInfo, + ) -> Self::ConnectionContext; + #[doc = "Called when the `FrameSent` event is triggered"] + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &FrameSent, + ) { + let _ = context; + let _ = meta; + let _ = event; + } + #[doc = r" Called for each event that relates to the endpoint and all connections"] + #[inline] + fn on_event(&self, meta: &M, event: &E) { + let _ = meta; + let _ = event; + } + #[doc = r" Called for each event that relates to a connection"] + #[inline] + fn on_connection_event( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &E, + ) { + let _ = context; + let _ = meta; + let _ = event; + } + #[doc = r" Used for querying the `Subscriber::ConnectionContext` on a Subscriber"] + #[inline] + fn query( + context: &Self::ConnectionContext, + query: &mut dyn query::Query, + ) -> query::ControlFlow { + query.execute(context) + } + } + #[doc = r" Subscriber is implemented for a 2-element tuple to make it easy to compose multiple"] + #[doc = r" subscribers."] + impl Subscriber for (A, B) + where + A: Subscriber, + B: Subscriber, + { + type ConnectionContext = (A::ConnectionContext, B::ConnectionContext); + #[inline] + fn create_connection_context( + &self, + meta: &ConnectionMeta, + info: &ConnectionInfo, + ) -> Self::ConnectionContext { + ( + self.0.create_connection_context(meta, info), + self.1.create_connection_context(meta, info), + ) + } + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &FrameSent, + ) { + (self.0).on_frame_sent(&context.0, meta, event); + (self.1).on_frame_sent(&context.1, meta, event); + } + #[inline] + fn on_event(&self, meta: &M, event: &E) { + self.0.on_event(meta, event); + self.1.on_event(meta, event); + } + #[inline] + fn on_connection_event( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &E, + ) { + self.0.on_connection_event(&context.0, meta, event); + self.1.on_connection_event(&context.1, meta, event); + } + #[inline] + fn query( + context: &Self::ConnectionContext, + query: &mut dyn query::Query, + ) -> query::ControlFlow { + query + .execute(context) + .and_then(|| A::query(&context.0, query)) + .and_then(|| B::query(&context.1, query)) + } + } + pub trait EndpointPublisher { + #[doc = r" Returns the QUIC version, if any"] + fn quic_version(&self) -> Option; + } + pub struct EndpointPublisherSubscriber<'a, Sub: Subscriber> { + meta: EndpointMeta, + quic_version: Option, + subscriber: &'a Sub, + } + impl<'a, Sub: Subscriber> fmt::Debug for EndpointPublisherSubscriber<'a, Sub> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ConnectionPublisherSubscriber") + .field("meta", &self.meta) + .field("quic_version", &self.quic_version) + .finish() + } + } + impl<'a, Sub: Subscriber> EndpointPublisherSubscriber<'a, Sub> { + #[inline] + pub fn new( + meta: builder::EndpointMeta, + quic_version: Option, + subscriber: &'a Sub, + ) -> Self { + Self { + meta: meta.into_event(), + quic_version, + subscriber, + } + } + } + impl<'a, Sub: Subscriber> EndpointPublisher for EndpointPublisherSubscriber<'a, Sub> { + #[inline] + fn quic_version(&self) -> Option { + self.quic_version + } + } + pub trait ConnectionPublisher { + #[doc = "Publishes a `FrameSent` event to the publisher's subscriber"] + fn on_frame_sent(&self, event: builder::FrameSent); + #[doc = r" Returns the QUIC version negotiated for the current connection, if any"] + fn quic_version(&self) -> u32; + #[doc = r" Returns the [`Subject`] for the current publisher"] + fn subject(&self) -> Subject; + } + pub struct ConnectionPublisherSubscriber<'a, Sub: Subscriber> { + meta: ConnectionMeta, + quic_version: u32, + subscriber: &'a Sub, + context: &'a Sub::ConnectionContext, + } + impl<'a, Sub: Subscriber> fmt::Debug for ConnectionPublisherSubscriber<'a, Sub> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ConnectionPublisherSubscriber") + .field("meta", &self.meta) + .field("quic_version", &self.quic_version) + .finish() + } + } + impl<'a, Sub: Subscriber> ConnectionPublisherSubscriber<'a, Sub> { + #[inline] + pub fn new( + meta: builder::ConnectionMeta, + quic_version: u32, + subscriber: &'a Sub, + context: &'a Sub::ConnectionContext, + ) -> Self { + Self { + meta: meta.into_event(), + quic_version, + subscriber, + context, + } + } + } + impl<'a, Sub: Subscriber> ConnectionPublisher for ConnectionPublisherSubscriber<'a, Sub> { + #[inline] + fn on_frame_sent(&self, event: builder::FrameSent) { + let event = event.into_event(); + self.subscriber + .on_frame_sent(self.context, &self.meta, &event); + self.subscriber + .on_connection_event(self.context, &self.meta, &event); + self.subscriber.on_event(&self.meta, &event); + } + #[inline] + fn quic_version(&self) -> u32 { + self.quic_version + } + #[inline] + fn subject(&self) -> api::Subject { + self.meta.subject() + } + } +} +#[cfg(any(test, feature = "testing"))] +pub mod testing { + use super::*; + use core::sync::atomic::{AtomicU32, Ordering}; + use std::sync::{Arc, Mutex}; + #[derive(Clone, Debug)] + pub struct Subscriber { + location: Option, + output: Arc>>, + pub frame_sent: Arc, + } + impl Drop for Subscriber { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + if let Some(location) = self.location.as_ref() { + location.snapshot(&self.output.lock().unwrap()); + } + } + } + impl Subscriber { + #[doc = r" Creates a subscriber with snapshot assertions enabled"] + #[track_caller] + pub fn snapshot() -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Location::try_new(); + sub + } + #[doc = r" Creates a subscriber with snapshot assertions disabled"] + pub fn no_snapshot() -> Self { + Self { + location: None, + output: Default::default(), + frame_sent: Arc::new(AtomicU32::new(0)), + } + } + } + impl super::Subscriber for Subscriber { + type ConnectionContext = (); + fn create_connection_context( + &self, + _meta: &api::ConnectionMeta, + _info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + } + fn on_frame_sent( + &self, + _context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::FrameSent, + ) { + self.frame_sent.fetch_add(1, Ordering::SeqCst); + if self.location.is_some() { + self.output + .lock() + .unwrap() + .push(format!("{meta:?} {event:?}")); + } + } + } + #[derive(Clone, Debug)] + pub struct Publisher { + location: Option, + output: Arc>>, + pub frame_sent: Arc, + } + impl Publisher { + #[doc = r" Creates a publisher with snapshot assertions enabled"] + #[track_caller] + pub fn snapshot() -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Location::try_new(); + sub + } + #[doc = r" Creates a publisher with snapshot assertions disabled"] + pub fn no_snapshot() -> Self { + Self { + location: None, + output: Default::default(), + frame_sent: Arc::new(AtomicU32::new(0)), + } + } + } + impl super::EndpointPublisher for Publisher { + fn quic_version(&self) -> Option { + Some(1) + } + } + impl super::ConnectionPublisher for Publisher { + fn on_frame_sent(&self, event: builder::FrameSent) { + self.frame_sent.fetch_add(1, Ordering::SeqCst); + let event = event.into_event(); + if self.location.is_some() { + self.output.lock().unwrap().push(format!("{event:?}")); + } + } + fn quic_version(&self) -> u32 { + 1 + } + fn subject(&self) -> api::Subject { + api::Subject::Connection { id: 0 } + } + } + impl Drop for Publisher { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + if let Some(location) = self.location.as_ref() { + location.snapshot(&self.output.lock().unwrap()); + } + } + } + #[derive(Clone, Debug)] + struct Location(&'static core::panic::Location<'static>); + impl Location { + #[track_caller] + fn try_new() -> Option { + let thread = std::thread::current(); + if thread.name().map_or(false, |name| name != "main") { + Some(Self(core::panic::Location::caller())) + } else { + None + } + } + fn snapshot(&self, output: &[String]) { + if cfg!(miri) { + return; + } + use std::path::{Component, Path}; + let value = output.join("\n"); + let thread = std::thread::current(); + let function_name = thread.name().unwrap(); + let test_path = Path::new(self.0.file().trim_end_matches(".rs")); + let module_path = test_path + .components() + .filter_map(|comp| match comp { + Component::Normal(comp) => comp.to_str(), + _ => Some("_"), + }) + .chain(Some("events")) + .collect::>() + .join("::"); + let current_dir = std::env::current_dir().unwrap(); + insta::_macro_support::assert_snapshot( + insta::_macro_support::AutoName.into(), + &value, + current_dir.to_str().unwrap(), + function_name, + &module_path, + self.0.file(), + self.0.line(), + "", + ) + .unwrap() + } + } +} diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index 21d4742e7a..97387e2167 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -9,6 +9,279 @@ type Result = core::result::Result; mod parser; +#[derive(Debug, Default)] +enum OutputMode { + Ref, + #[default] + Mut, +} + +impl OutputMode { + fn receiver(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!(mut), + } + } + fn counter_type(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(Arc), + OutputMode::Mut => quote!(u32), + } + } + + fn counter_init(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(Arc::new(AtomicU32::new(0))), + OutputMode::Mut => quote!(0), + } + } + + fn counter_increment(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(.fetch_add(1, Ordering::SeqCst)), + OutputMode::Mut => quote!(+= 1), + } + } + + fn lock(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(.lock().unwrap()), + OutputMode::Mut => quote!(), + } + } + + fn imports(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!( + use core::sync::atomic::{AtomicU32, Ordering}; + use std::sync::{Arc, Mutex}; + ), + OutputMode::Mut => quote!(), + } + } + + fn testing_output_type(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(Arc>>), + OutputMode::Mut => quote!(Vec), + } + } + + fn target_crate(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!("s2n_quic_dc"), + OutputMode::Mut => quote!("s2n_quic"), + } + } + + fn query_mut(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!( + /// Used for querying and mutating the `Subscriber::ConnectionContext` on a Subscriber + #[inline] + fn query_mut( + context: &mut Self::ConnectionContext, + query: &mut dyn query::QueryMut, + ) -> query::ControlFlow { + query.execute_mut(context) + } + ), + } + } + + fn query_mut_tuple(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!( + #[inline] + fn query_mut( + context: &mut Self::ConnectionContext, + query: &mut dyn query::QueryMut, + ) -> query::ControlFlow { + query + .execute_mut(context) + .and_then(|| A::query_mut(&mut context.0, query)) + .and_then(|| B::query_mut(&mut context.1, query)) + } + ), + } + } + + fn supervisor(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!( + pub mod supervisor { + //! This module contains the `supervisor::Outcome` and `supervisor::Context` for use + //! when implementing [`Subscriber::supervisor_timeout`](crate::event::Subscriber::supervisor_timeout) and + //! [`Subscriber::on_supervisor_timeout`](crate::event::Subscriber::on_supervisor_timeout) + //! on a Subscriber. + + use crate::{ + application, + event::{builder::SocketAddress, IntoEvent}, + }; + + #[non_exhaustive] + #[derive(Clone, Debug, Eq, PartialEq)] + pub enum Outcome { + /// Allow the connection to remain open + Continue, + + /// Close the connection and notify the peer + Close { error_code: application::Error }, + + /// Close the connection without notifying the peer + ImmediateClose { reason: &'static str }, + } + + impl Default for Outcome { + fn default() -> Self { + Self::Continue + } + } + + #[non_exhaustive] + #[derive(Debug)] + pub struct Context<'a> { + /// Number of handshakes that have begun but not completed + pub inflight_handshakes: usize, + + /// Number of open connections + pub connection_count: usize, + + /// The address of the peer + pub remote_address: SocketAddress<'a>, + + /// True if the connection is in the handshake state, false otherwise + pub is_handshaking: bool, + } + + impl<'a> Context<'a> { + pub fn new( + inflight_handshakes: usize, + connection_count: usize, + remote_address: &'a crate::inet::SocketAddress, + is_handshaking: bool, + ) -> Self { + Self { + inflight_handshakes, + connection_count, + remote_address: remote_address.into_event(), + is_handshaking, + } + } + } + } + ), + } + } + + fn supervisor_timeout(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!( + /// The period at which `on_supervisor_timeout` is called + /// + /// If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout` + /// across all `event::Subscriber`s will be used. + /// + /// If the `supervisor_timeout()` is `None` across all `event::Subscriber`s, connection supervision + /// will cease for the remaining lifetime of the connection and `on_supervisor_timeout` will no longer + /// be called. + /// + /// It is recommended to avoid setting this value less than ~100ms, as short durations + /// may lead to higher CPU utilization. + #[allow(unused_variables)] + fn supervisor_timeout( + &mut self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> Option { + None + } + + /// Called for each `supervisor_timeout` to determine any action to take on the connection based on the `supervisor::Outcome` + /// + /// If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout` + /// across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called + /// earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation. + #[allow(unused_variables)] + fn on_supervisor_timeout( + &mut self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> supervisor::Outcome { + supervisor::Outcome::default() + } + ), + } + } + + fn supervisor_timeout_tuple(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!( + #[inline] + fn supervisor_timeout( + &mut self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> Option { + let timeout_a = self + .0 + .supervisor_timeout(&mut conn_context.0, meta, context); + let timeout_b = self + .1 + .supervisor_timeout(&mut conn_context.1, meta, context); + match (timeout_a, timeout_b) { + (None, None) => None, + (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), + (Some(a), Some(b)) => Some(a.min(b)), + } + } + + #[inline] + fn on_supervisor_timeout( + &mut self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> supervisor::Outcome { + let outcome_a = + self.0 + .on_supervisor_timeout(&mut conn_context.0, meta, context); + let outcome_b = + self.1 + .on_supervisor_timeout(&mut conn_context.1, meta, context); + match (outcome_a, outcome_b) { + (supervisor::Outcome::ImmediateClose { reason }, _) + | (_, supervisor::Outcome::ImmediateClose { reason }) => { + supervisor::Outcome::ImmediateClose { reason } + } + (supervisor::Outcome::Close { error_code }, _) + | (_, supervisor::Outcome::Close { error_code }) => { + supervisor::Outcome::Close { error_code } + } + _ => supervisor::Outcome::Continue, + } + } + ), + } + } +} + +impl ToTokens for OutputMode { + fn to_tokens(&self, tokens: &mut TokenStream) { + tokens.extend(self.receiver()); + } +} + #[derive(Debug, Default)] struct Output { pub subscriber: TokenStream, @@ -26,6 +299,7 @@ struct Output { pub endpoint_publisher_testing: TokenStream, pub connection_publisher_testing: TokenStream, pub extra: TokenStream, + pub mode: OutputMode, } impl ToTokens for Output { @@ -46,8 +320,19 @@ impl ToTokens for Output { endpoint_publisher_testing, connection_publisher_testing, extra, + mode, } = self; + let imports = self.mode.imports(); + let testing_output_type = self.mode.testing_output_type(); + let lock = self.mode.lock(); + let target_crate = self.mode.target_crate(); + let supervisor = self.mode.supervisor(); + let supervisor_timeout = self.mode.supervisor_timeout(); + let supervisor_timeout_tuple = self.mode.supervisor_timeout_tuple(); + let query_mut = self.mode.query_mut(); + let query_mut_tuple = self.mode.query_mut_tuple(); + tokens.extend(quote!( use super::*; @@ -76,7 +361,7 @@ impl ToTokens for Output { impl Default for Subscriber { fn default() -> Self { - let root = tracing::span!(target: "s2n_quic", tracing::Level::DEBUG, "s2n_quic"); + let root = tracing::span!(target: #target_crate, tracing::Level::DEBUG, #target_crate); let client = tracing::span!(parent: root.id(), tracing::Level::DEBUG, "client"); let server = tracing::span!(parent: root.id(), tracing::Level::DEBUG, "server"); @@ -90,7 +375,7 @@ impl ToTokens for Output { impl super::Subscriber for Subscriber { type ConnectionContext = tracing::Span; - fn create_connection_context(&mut self, meta: &api::ConnectionMeta, _info: &api::ConnectionInfo) -> Self::ConnectionContext { + fn create_connection_context(&#mode self, meta: &api::ConnectionMeta, _info: &api::ConnectionInfo) -> Self::ConnectionContext { let parent = match meta.endpoint_type { api::EndpointType::Client {} => { self.client.id() @@ -99,7 +384,7 @@ impl ToTokens for Output { self.server.id() } }; - tracing::span!(target: "s2n_quic", parent: parent, tracing::Level::DEBUG, "conn", id = meta.id) + tracing::span!(target: #target_crate, parent: parent, tracing::Level::DEBUG, "conn", id = meta.id) } #tracing_subscriber @@ -112,68 +397,7 @@ impl ToTokens for Output { #builders } - pub mod supervisor { - //! This module contains the `supervisor::Outcome` and `supervisor::Context` for use - //! when implementing [`Subscriber::supervisor_timeout`](crate::event::Subscriber::supervisor_timeout) and - //! [`Subscriber::on_supervisor_timeout`](crate::event::Subscriber::on_supervisor_timeout) - //! on a Subscriber. - - use crate::{ - application, - event::{builder::SocketAddress, IntoEvent}, - }; - - #[non_exhaustive] - #[derive(Clone, Debug, Eq, PartialEq)] - pub enum Outcome { - /// Allow the connection to remain open - Continue, - - /// Close the connection and notify the peer - Close {error_code: application::Error}, - - /// Close the connection without notifying the peer - ImmediateClose {reason: &'static str}, - } - - impl Default for Outcome { - fn default() -> Self { - Self::Continue - } - } - - #[non_exhaustive] - #[derive(Debug)] - pub struct Context<'a> { - /// Number of handshakes that have begun but not completed - pub inflight_handshakes: usize, - - /// Number of open connections - pub connection_count: usize, - - /// The address of the peer - pub remote_address: SocketAddress<'a>, - - /// True if the connection is in the handshake state, false otherwise - pub is_handshaking: bool, - } - - impl<'a> Context<'a> { - pub fn new( - inflight_handshakes: usize, - connection_count: usize, - remote_address: &'a crate::inet::SocketAddress, - is_handshaking: bool, - ) -> Self { - Self { - inflight_handshakes, - connection_count, - remote_address: remote_address.into_event(), - is_handshaking, - } - } - } - } + #supervisor pub use traits::*; mod traits { @@ -272,46 +496,22 @@ impl ToTokens for Output { type ConnectionContext: 'static + Send; /// Creates a context to be passed to each connection-related event - fn create_connection_context(&mut self, meta: &ConnectionMeta, info: &ConnectionInfo) -> Self::ConnectionContext; + fn create_connection_context(&#mode self, meta: &ConnectionMeta, info: &ConnectionInfo) -> Self::ConnectionContext; - /// The period at which `on_supervisor_timeout` is called - /// - /// If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout` - /// across all `event::Subscriber`s will be used. - /// - /// If the `supervisor_timeout()` is `None` across all `event::Subscriber`s, connection supervision - /// will cease for the remaining lifetime of the connection and `on_supervisor_timeout` will no longer - /// be called. - /// - /// It is recommended to avoid setting this value less than ~100ms, as short durations - /// may lead to higher CPU utilization. - #[allow(unused_variables)] - fn supervisor_timeout(&mut self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { - None - } - - /// Called for each `supervisor_timeout` to determine any action to take on the connection based on the `supervisor::Outcome` - /// - /// If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout` - /// across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called - /// earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation. - #[allow(unused_variables)] - fn on_supervisor_timeout(&mut self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { - supervisor::Outcome::default() - } + #supervisor_timeout #subscriber /// Called for each event that relates to the endpoint and all connections #[inline] - fn on_event(&mut self, meta: &M, event: &E) { + fn on_event(&#mode self, meta: &M, event: &E) { let _ = meta; let _ = event; } /// Called for each event that relates to a connection #[inline] - fn on_connection_event(&mut self, context: &mut Self::ConnectionContext, meta: &ConnectionMeta, event: &E) { + fn on_connection_event(&#mode self, context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, event: &E) { let _ = context; let _ = meta; let _ = event; @@ -323,11 +523,7 @@ impl ToTokens for Output { query.execute(context) } - /// Used for querying and mutating the `Subscriber::ConnectionContext` on a Subscriber - #[inline] - fn query_mut(context: &mut Self::ConnectionContext, query: &mut dyn query::QueryMut) -> query::ControlFlow { - query.execute_mut(context) - } + #query_mut } /// Subscriber is implemented for a 2-element tuple to make it easy to compose multiple @@ -340,44 +536,24 @@ impl ToTokens for Output { type ConnectionContext = (A::ConnectionContext, B::ConnectionContext); #[inline] - fn create_connection_context(&mut self, meta: &ConnectionMeta, info: &ConnectionInfo) -> Self::ConnectionContext { + fn create_connection_context(&#mode self, meta: &ConnectionMeta, info: &ConnectionInfo) -> Self::ConnectionContext { (self.0.create_connection_context(meta, info), self.1.create_connection_context(meta, info)) } - #[inline] - fn supervisor_timeout(&mut self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { - let timeout_a = self.0.supervisor_timeout(&mut conn_context.0, meta, context); - let timeout_b = self.1.supervisor_timeout(&mut conn_context.1, meta, context); - match (timeout_a, timeout_b) { - (None, None) => None, - (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), - (Some(a), Some(b)) => Some(a.min(b)), - } - } - - #[inline] - fn on_supervisor_timeout(&mut self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { - let outcome_a = self.0.on_supervisor_timeout(&mut conn_context.0, meta, context); - let outcome_b = self.1.on_supervisor_timeout(&mut conn_context.1, meta, context); - match (outcome_a, outcome_b) { - (supervisor::Outcome::ImmediateClose { reason }, _) | (_, supervisor::Outcome::ImmediateClose { reason }) => supervisor::Outcome::ImmediateClose { reason }, - (supervisor::Outcome::Close { error_code }, _) | (_, supervisor::Outcome::Close { error_code }) => supervisor::Outcome::Close { error_code }, - _ => supervisor::Outcome::Continue, - } - } + #supervisor_timeout_tuple #tuple_subscriber #[inline] - fn on_event(&mut self, meta: &M, event: &E) { + fn on_event(&#mode self, meta: &M, event: &E) { self.0.on_event(meta, event); self.1.on_event(meta, event); } #[inline] - fn on_connection_event(&mut self, context: &mut Self::ConnectionContext, meta: &ConnectionMeta, event: &E) { - self.0.on_connection_event(&mut context.0, meta, event); - self.1.on_connection_event(&mut context.1, meta, event); + fn on_connection_event(&#mode self, context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, event: &E) { + self.0.on_connection_event(&#mode context.0, meta, event); + self.1.on_connection_event(&#mode context.1, meta, event); } #[inline] @@ -387,12 +563,7 @@ impl ToTokens for Output { .and_then(|| B::query(&context.1, query)) } - #[inline] - fn query_mut(context: &mut Self::ConnectionContext, query: &mut dyn query::QueryMut) -> query::ControlFlow { - query.execute_mut(context) - .and_then(|| A::query_mut(&mut context.0, query)) - .and_then(|| B::query_mut(&mut context.1, query)) - } + #query_mut_tuple } pub trait EndpointPublisher { @@ -405,7 +576,7 @@ impl ToTokens for Output { pub struct EndpointPublisherSubscriber<'a, Sub: Subscriber> { meta: EndpointMeta, quic_version: Option, - subscriber: &'a mut Sub, + subscriber: &'a #mode Sub, } impl<'a, Sub: Subscriber> fmt::Debug for EndpointPublisherSubscriber<'a, Sub> { @@ -422,7 +593,7 @@ impl ToTokens for Output { pub fn new( meta: builder::EndpointMeta, quic_version: Option, - subscriber: &'a mut Sub, + subscriber: &'a #mode Sub, ) -> Self { Self { meta: meta.into_event(), @@ -454,8 +625,8 @@ impl ToTokens for Output { pub struct ConnectionPublisherSubscriber<'a, Sub: Subscriber> { meta: ConnectionMeta, quic_version: u32, - subscriber: &'a mut Sub, - context: &'a mut Sub::ConnectionContext, + subscriber: &'a #mode Sub, + context: &'a #mode Sub::ConnectionContext, } impl<'a, Sub: Subscriber> fmt::Debug for ConnectionPublisherSubscriber<'a, Sub> { @@ -472,8 +643,8 @@ impl ToTokens for Output { pub fn new( meta: builder::ConnectionMeta, quic_version: u32, - subscriber: &'a mut Sub, - context: &'a mut Sub::ConnectionContext + subscriber: &'a #mode Sub, + context: &'a #mode Sub::ConnectionContext ) -> Self { Self { meta: meta.into_event(), @@ -502,11 +673,11 @@ impl ToTokens for Output { #[cfg(any(test, feature = "testing"))] pub mod testing { use super::*; - + #imports #[derive(Clone, Debug)] pub struct Subscriber { location: Option, - output: Vec, + output: #testing_output_type, #testing_fields } @@ -518,7 +689,7 @@ impl ToTokens for Output { } if let Some(location) = self.location.as_ref() { - location.snapshot(&self.output); + location.snapshot(&self.output #lock); } } } @@ -545,7 +716,7 @@ impl ToTokens for Output { impl super::Subscriber for Subscriber { type ConnectionContext = (); - fn create_connection_context(&mut self, _meta: &api::ConnectionMeta, _info: &api::ConnectionInfo) -> Self::ConnectionContext {} + fn create_connection_context(&#mode self, _meta: &api::ConnectionMeta, _info: &api::ConnectionInfo) -> Self::ConnectionContext {} #subscriber_testing } @@ -553,7 +724,7 @@ impl ToTokens for Output { #[derive(Clone, Debug)] pub struct Publisher { location: Option, - output: Vec, + output: #testing_output_type, #testing_fields } @@ -604,7 +775,7 @@ impl ToTokens for Output { } if let Some(location) = self.location.as_ref() { - location.snapshot(&self.output); + location.snapshot(&self.output #lock); } } } @@ -669,50 +840,80 @@ impl ToTokens for Output { } } -fn main() -> Result<()> { - let mut files = vec![]; - - for path in glob::glob(concat!(env!("CARGO_MANIFEST_DIR"), "/events/**/*.rs"))? { - let path = path?; - let file = std::fs::read_to_string(path)?; - files.push(parser::parse(&file).unwrap()); - } - - let mut output = Output::default(); - - for file in &files { - file.to_tokens(&mut output); - } - - let generated = concat!( - env!("CARGO_MANIFEST_DIR"), - "/../s2n-quic-core/src/event/generated.rs" - ); - - let mut o = std::fs::File::create(generated)?; +struct EventInfo<'a> { + input_path: &'a str, + output_path: &'a str, + output_mode: OutputMode, +} - macro_rules! put { - ($($arg:tt)*) => {{ - use std::io::Write; - writeln!(o, $($arg)*)?; - }} +fn main() -> Result<()> { + let event_paths = [ + EventInfo { + input_path: concat!( + env!("CARGO_MANIFEST_DIR"), + "/../../dc/s2n-quic-dc/src/event/events.rs" + ), + output_path: concat!( + env!("CARGO_MANIFEST_DIR"), + "/../../dc/s2n-quic-dc/src/event/generated.rs" + ), + output_mode: OutputMode::Ref, + }, + EventInfo { + input_path: concat!(env!("CARGO_MANIFEST_DIR"), "/events/**/*.rs"), + output_path: concat!( + env!("CARGO_MANIFEST_DIR"), + "/../s2n-quic-core/src/event/generated.rs" + ), + output_mode: OutputMode::Mut, + }, + ]; + + for event_info in event_paths { + let mut files = vec![]; + + for path in glob::glob(event_info.input_path)? { + let path = path?; + let file = std::fs::read_to_string(path)?; + files.push(parser::parse(&file).unwrap()); + } + + let mut output = Output { + mode: event_info.output_mode, + ..Default::default() + }; + + for file in &files { + file.to_tokens(&mut output); + } + + let generated = event_info.output_path; + + let mut o = std::fs::File::create(generated)?; + + macro_rules! put { + ($($arg:tt)*) => {{ + use std::io::Write; + writeln!(o, $($arg)*)?; + }} + } + + put!("// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved."); + put!("// SPDX-License-Identifier: Apache-2.0"); + put!(); + put!("// DO NOT MODIFY THIS FILE"); + put!("// This file was generated with the `s2n-quic-events` crate and any required"); + put!("// changes should be made there."); + put!(); + put!("{}", output.to_token_stream()); + + let status = std::process::Command::new("rustfmt") + .arg(generated) + .spawn()? + .wait()?; + + assert!(status.success()); } - put!("// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved."); - put!("// SPDX-License-Identifier: Apache-2.0"); - put!(); - put!("// DO NOT MODIFY THIS FILE"); - put!("// This file was generated with the `s2n-quic-events` crate and any required"); - put!("// changes should be made there."); - put!(); - put!("{}", output.to_token_stream()); - - let status = std::process::Command::new("rustfmt") - .arg(generated) - .spawn()? - .wait()?; - - assert!(status.success()); - Ok(()) } diff --git a/quic/s2n-quic-events/src/parser.rs b/quic/s2n-quic-events/src/parser.rs index fdd12b2df4..948bee45b5 100644 --- a/quic/s2n-quic-events/src/parser.rs +++ b/quic/s2n-quic-events/src/parser.rs @@ -148,14 +148,21 @@ impl Struct { let publisher_doc = format!("Publishes a `{ident_str}` event to the publisher's subscriber"); + let counter_type = output.mode.counter_type(); + let counter_init = output.mode.counter_init(); + // add a counter for testing structs output.testing_fields.extend(quote!( - pub #counter: u32, + pub #counter: #counter_type, )); output.testing_fields_init.extend(quote!( - #counter: 0, + #counter: #counter_init, )); + let receiver = output.mode.receiver(); + let counter_increment = output.mode.counter_increment(); + let lock = output.mode.lock(); + match attrs.subject { Subject::Endpoint => { output.subscriber.extend(quote!( @@ -163,7 +170,7 @@ impl Struct { #[inline] #deprecated #allow_deprecated - fn #function(&mut self, meta: &EndpointMeta, event: &#ident) { + fn #function(&#receiver self, meta: &EndpointMeta, event: &#ident) { let _ = meta; let _ = event; } @@ -172,7 +179,7 @@ impl Struct { output.tuple_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, meta: &EndpointMeta, event: &#ident) { + fn #function(&#receiver self, meta: &EndpointMeta, event: &#ident) { (self.0).#function(meta, event); (self.1).#function(meta, event); } @@ -181,7 +188,7 @@ impl Struct { output.tracing_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, meta: &api::EndpointMeta, event: &api::#ident) { + fn #function(&#receiver self, meta: &api::EndpointMeta, event: &api::#ident) { let parent = match meta.endpoint_type { api::EndpointType::Client {} => { self.client.id() @@ -197,13 +204,13 @@ impl Struct { output.endpoint_publisher.extend(quote!( #[doc = #publisher_doc] - fn #function(&mut self, event: builder::#ident); + fn #function(&#receiver self, event: builder::#ident); )); output.endpoint_publisher_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, event: builder::#ident) { + fn #function(&#receiver self, event: builder::#ident) { let event = event.into_event(); self.subscriber.#function(&self.meta, &event); self.subscriber.on_event(&self.meta, &event); @@ -212,18 +219,18 @@ impl Struct { output.subscriber_testing.extend(quote!( #allow_deprecated - fn #function(&mut self, meta: &api::EndpointMeta, event: &api::#ident) { - self.#counter += 1; - self.output.push(format!("{meta:?} {event:?}")); + fn #function(&#receiver self, meta: &api::EndpointMeta, event: &api::#ident) { + self.#counter #counter_increment; + self.output #lock.push(format!("{meta:?} {event:?}")); } )); output.endpoint_publisher_testing.extend(quote!( #allow_deprecated - fn #function(&mut self, event: builder::#ident) { - self.#counter += 1; + fn #function(&#receiver self, event: builder::#ident) { + self.#counter #counter_increment; let event = event.into_event(); - self.output.push(format!("{event:?}")); + self.output #lock.push(format!("{event:?}")); } )); } @@ -233,7 +240,7 @@ impl Struct { #[inline] #deprecated #allow_deprecated - fn #function(&mut self, context: &mut Self::ConnectionContext, meta: &ConnectionMeta, event: &#ident) { + fn #function(&#receiver self, context: &#receiver Self::ConnectionContext, meta: &ConnectionMeta, event: &#ident) { let _ = context; let _ = meta; let _ = event; @@ -243,16 +250,16 @@ impl Struct { output.tuple_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, context: &mut Self::ConnectionContext, meta: &ConnectionMeta, event: &#ident) { - (self.0).#function(&mut context.0, meta, event); - (self.1).#function(&mut context.1, meta, event); + fn #function(&#receiver self, context: &#receiver Self::ConnectionContext, meta: &ConnectionMeta, event: &#ident) { + (self.0).#function(&#receiver context.0, meta, event); + (self.1).#function(&#receiver context.1, meta, event); } )); output.tracing_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, context: &mut Self::ConnectionContext, _meta: &api::ConnectionMeta, event: &api::#ident) { + fn #function(&#receiver self, context: &#receiver Self::ConnectionContext, _meta: &api::ConnectionMeta, event: &api::#ident) { let id = context.id(); let api::#ident { #(#destructure_fields),* } = event; tracing::event!(target: #snake, parent: id, tracing::Level::DEBUG, #(#destructure_fields = tracing::field::debug(#destructure_fields)),*); @@ -261,13 +268,13 @@ impl Struct { output.connection_publisher.extend(quote!( #[doc = #publisher_doc] - fn #function(&mut self, event: builder::#ident); + fn #function(&#receiver self, event: builder::#ident); )); output.connection_publisher_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, event: builder::#ident) { + fn #function(&#receiver self, event: builder::#ident) { let event = event.into_event(); self.subscriber.#function(self.context, &self.meta, &event); self.subscriber.on_connection_event(self.context, &self.meta, &event); @@ -277,21 +284,21 @@ impl Struct { output.subscriber_testing.extend(quote!( #allow_deprecated - fn #function(&mut self, _context: &mut Self::ConnectionContext, meta: &api::ConnectionMeta, event: &api::#ident) { - self.#counter += 1; + fn #function(&#receiver self, _context: &#receiver Self::ConnectionContext, meta: &api::ConnectionMeta, event: &api::#ident) { + self.#counter #counter_increment; if self.location.is_some() { - self.output.push(format!("{meta:?} {event:?}")); + self.output #lock.push(format!("{meta:?} {event:?}")); } } )); output.connection_publisher_testing.extend(quote!( #allow_deprecated - fn #function(&mut self, event: builder::#ident) { - self.#counter += 1; + fn #function(&#receiver self, event: builder::#ident) { + self.#counter #counter_increment; let event = event.into_event(); if self.location.is_some() { - self.output.push(format!("{event:?}")); + self.output #lock.push(format!("{event:?}")); } } ));