From 5d08eae788916c4c19a5d69e94a093f376263810 Mon Sep 17 00:00:00 2001 From: Brayden Strong <bstrong621@gmail.com> Date: Tue, 6 May 2025 23:50:52 -0500 Subject: [PATCH] draft --- src/vmm/src/devices/virtio/net/tap.rs | 13 +- src/vmm/src/devices/virtio/rng/device.rs | 111 ++++---- src/vmm/src/devices/virtio/rng/metrics.rs | 228 +++++++++++++++-- src/vmm/src/devices/virtio/vsock/device.rs | 16 +- .../src/devices/virtio/vsock/event_handler.rs | 21 +- src/vmm/src/devices/virtio/vsock/metrics.rs | 238 +++++++++++++++++- 6 files changed, 530 insertions(+), 97 deletions(-) diff --git a/src/vmm/src/devices/virtio/net/tap.rs b/src/vmm/src/devices/virtio/net/tap.rs index c516705af31..a18460e1159 100644 --- a/src/vmm/src/devices/virtio/net/tap.rs +++ b/src/vmm/src/devices/virtio/net/tap.rs @@ -241,10 +241,17 @@ pub mod tests { generated::ifreq__bindgen_ty_1::default().ifrn_name.len() }); - // Empty name - The tap should be named "tap0" by default + // Empty name - The tap should be named by the kernel (e.g., "tap0", "tap1", etc.) let tap = Tap::open_named("").unwrap(); - assert_eq!(b"tap0\0\0\0\0\0\0\0\0\0\0\0\0", &tap.if_name); - assert_eq!("tap0", tap.if_name_as_str()); + let tap_name_str = tap.if_name_as_str(); + + // Check that it starts with "tap" and the remainder is numeric. 
+ assert!( + Regex::new(r"^tap\d+$").unwrap().is_match(tap_name_str), + "Generated tap name '{}' does not match expected pattern", + tap_name_str + ); + // Test using '%d' to have the kernel assign an unused name, // and that we correctly copy back that generated name diff --git a/src/vmm/src/devices/virtio/rng/device.rs b/src/vmm/src/devices/virtio/rng/device.rs index 97ac8676e0a..08ef31567ec 100644 --- a/src/vmm/src/devices/virtio/rng/device.rs +++ b/src/vmm/src/devices/virtio/rng/device.rs @@ -9,7 +9,7 @@ use aws_lc_rs::rand; use vm_memory::GuestMemoryError; use vmm_sys_util::eventfd::EventFd; -use super::metrics::METRICS; +use super::metrics::EntropyMetricsPerDevice; use super::{RNG_NUM_QUEUES, RNG_QUEUE}; use crate::devices::DeviceError; use crate::devices::virtio::device::{DeviceState, IrqTrigger, IrqType, VirtioDevice}; @@ -113,6 +113,7 @@ impl Entropy { } fn handle_one(&mut self) -> Result<u32, EntropyError> { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); // If guest provided us with an empty buffer just return directly if self.buffer.is_empty() { return Ok(0); @@ -120,7 +121,7 @@ impl Entropy { let mut rand_bytes = vec![0; self.buffer.len() as usize]; rand::fill(&mut rand_bytes).inspect_err(|_| { - METRICS.host_rng_fails.inc(); + global.host_rng_fails.inc(); })?; // It is ok to unwrap here. We are writing `iovec.len()` bytes at offset 0. @@ -129,12 +130,13 @@ impl Entropy { } fn process_entropy_queue(&mut self) { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); let mut used_any = false; while let Some(desc) = self.queues[RNG_QUEUE].pop() { // This is safe since we checked in the event handler that the device is activated. 
let mem = self.device_state.mem().unwrap(); let index = desc.index; - METRICS.entropy_event_count.inc(); + global.entropy_event_count.inc(); // SAFETY: This descriptor chain points to a single `DescriptorChain` memory buffer, // no other `IoVecBufferMut` object points to the same `DescriptorChain` at the same @@ -151,20 +153,20 @@ impl Entropy { // to handle once we do have budget. if !self.rate_limit_request(u64::from(self.buffer.len())) { debug!("entropy: throttling entropy queue"); - METRICS.entropy_rate_limiter_throttled.inc(); + global.entropy_rate_limiter_throttled.inc(); self.queues[RNG_QUEUE].undo_pop(); break; } self.handle_one().unwrap_or_else(|err| { error!("entropy: {err}"); - METRICS.entropy_event_fails.inc(); + global.entropy_event_fails.inc(); 0 }) } Err(err) => { error!("entropy: Could not parse descriptor chain: {err}"); - METRICS.entropy_event_fails.inc(); + global.entropy_event_fails.inc(); 0 } }; @@ -172,12 +174,12 @@ impl Entropy { match self.queues[RNG_QUEUE].add_used(index, bytes) { Ok(_) => { used_any = true; - METRICS.entropy_bytes.add(bytes.into()); + global.entropy_bytes.add(bytes.into()); } Err(err) => { error!("entropy: Could not add used descriptor to queue: {err}"); Self::rate_limit_replenish_request(&mut self.rate_limiter, bytes.into()); - METRICS.entropy_event_fails.inc(); + global.entropy_event_fails.inc(); // If we are not able to add a buffer to the used queue, something // is probably seriously wrong, so just stop processing additional // buffers @@ -189,25 +191,26 @@ impl Entropy { if used_any { self.signal_used_queue().unwrap_or_else(|err| { error!("entropy: {err:?}"); - METRICS.entropy_event_fails.inc() + global.entropy_event_fails.inc() }); } } pub(crate) fn process_entropy_queue_event(&mut self) { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); if let Err(err) = self.queue_events[RNG_QUEUE].read() { error!("Failed to read entropy queue event: {err}"); - METRICS.entropy_event_fails.inc(); + 
global.entropy_event_fails.inc(); } else if !self.rate_limiter.is_blocked() { // We are not throttled, handle the entropy queue self.process_entropy_queue(); } else { - METRICS.rate_limiter_event_count.inc(); + global.rate_limiter_event_count.inc(); } } pub(crate) fn process_rate_limiter_event(&mut self) { - METRICS.rate_limiter_event_count.inc(); + EntropyMetricsPerDevice::alloc("global".to_string()).rate_limiter_event_count.inc(); match self.rate_limiter.event_handler() { Ok(_) => { // There might be enough budget now to process entropy requests. @@ -215,7 +218,7 @@ impl Entropy { } Err(err) => { error!("entropy: Failed to handle rate-limiter event: {err:?}"); - METRICS.entropy_event_fails.inc(); + EntropyMetricsPerDevice::alloc("global".to_string()).entropy_event_fails.inc(); } } } @@ -291,13 +294,15 @@ impl VirtioDevice for Entropy { } fn activate(&mut self, mem: GuestMemoryMmap) -> Result<(), ActivateError> { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); + for q in self.queues.iter_mut() { q.initialize(&mem) .map_err(ActivateError::QueueMemoryError)?; } self.activate_event.write(1).map_err(|_| { - METRICS.activate_fails.inc(); + global.activate_fails.inc(); ActivateError::EventFd })?; self.device_state = DeviceState::Activated(mem); @@ -454,6 +459,7 @@ mod tests { #[test] fn test_entropy_event() { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); let mem = create_virtio_mem(); let mut th = VirtioTestHelper::<Entropy>::new(&mem, default_entropy()); @@ -462,29 +468,29 @@ mod tests { // Add a read-only descriptor (this should fail) th.add_desc_chain(RNG_QUEUE, 0, &[(0, 64, 0)]); - let entropy_event_fails = METRICS.entropy_event_fails.count(); - let entropy_event_count = METRICS.entropy_event_count.count(); - let entropy_bytes = METRICS.entropy_bytes.count(); - let host_rng_fails = METRICS.host_rng_fails.count(); + let entropy_event_fails = global.entropy_event_fails.count(); + let entropy_event_count = global.entropy_event_count.count(); + let entropy_bytes = global.entropy_bytes.count(); + let host_rng_fails = 
global.host_rng_fails.count(); assert_eq!(th.emulate_for_msec(100).unwrap(), 1); - assert_eq!(METRICS.entropy_event_fails.count(), entropy_event_fails + 1); - assert_eq!(METRICS.entropy_event_count.count(), entropy_event_count + 1); - assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes); - assert_eq!(METRICS.host_rng_fails.count(), host_rng_fails); + assert_eq!(global.entropy_event_fails.count(), entropy_event_fails + 1); + assert_eq!(global.entropy_event_count.count(), entropy_event_count + 1); + assert_eq!(global.entropy_bytes.count(), entropy_bytes); + assert_eq!(global.host_rng_fails.count(), host_rng_fails); // Add two good descriptors th.add_desc_chain(RNG_QUEUE, 0, &[(1, 10, VIRTQ_DESC_F_WRITE)]); th.add_desc_chain(RNG_QUEUE, 100, &[(2, 20, VIRTQ_DESC_F_WRITE)]); - let entropy_event_fails = METRICS.entropy_event_fails.count(); - let entropy_event_count = METRICS.entropy_event_count.count(); - let entropy_bytes = METRICS.entropy_bytes.count(); - let host_rng_fails = METRICS.host_rng_fails.count(); + let entropy_event_fails = global.entropy_event_fails.count(); + let entropy_event_count = global.entropy_event_count.count(); + let entropy_bytes = global.entropy_bytes.count(); + let host_rng_fails = global.host_rng_fails.count(); assert_eq!(th.emulate_for_msec(100).unwrap(), 1); - assert_eq!(METRICS.entropy_event_fails.count(), entropy_event_fails); - assert_eq!(METRICS.entropy_event_count.count(), entropy_event_count + 2); - assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 30); - assert_eq!(METRICS.host_rng_fails.count(), host_rng_fails); + assert_eq!(global.entropy_event_fails.count(), entropy_event_fails); + assert_eq!(global.entropy_event_count.count(), entropy_event_count + 2); + assert_eq!(global.entropy_bytes.count(), entropy_bytes + 30); + assert_eq!(global.host_rng_fails.count(), host_rng_fails); th.add_desc_chain( RNG_QUEUE, @@ -496,19 +502,20 @@ mod tests { ], ); - let entropy_event_fails = METRICS.entropy_event_fails.count(); - let 
entropy_event_count = METRICS.entropy_event_count.count(); - let entropy_bytes = METRICS.entropy_bytes.count(); - let host_rng_fails = METRICS.host_rng_fails.count(); + let entropy_event_fails = global.entropy_event_fails.count(); + let entropy_event_count = global.entropy_event_count.count(); + let entropy_bytes = global.entropy_bytes.count(); + let host_rng_fails = global.host_rng_fails.count(); assert_eq!(th.emulate_for_msec(100).unwrap(), 1); - assert_eq!(METRICS.entropy_event_fails.count(), entropy_event_fails); - assert_eq!(METRICS.entropy_event_count.count(), entropy_event_count + 1); - assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 512); - assert_eq!(METRICS.host_rng_fails.count(), host_rng_fails); + assert_eq!(global.entropy_event_fails.count(), entropy_event_fails); + assert_eq!(global.entropy_event_count.count(), entropy_event_count + 1); + assert_eq!(global.entropy_bytes.count(), entropy_bytes + 512); + assert_eq!(global.host_rng_fails.count(), host_rng_fails); } #[test] fn test_bad_rate_limiter_event() { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); let mem = create_virtio_mem(); let mut th = VirtioTestHelper::<Entropy>::new(&mem, default_entropy()); @@ -516,7 +523,7 @@ mod tests { let mut dev = th.device(); check_metric_after_block!( - &METRICS.entropy_event_fails, + &global.entropy_event_fails, 1, dev.process_rate_limiter_event() ); @@ -524,6 +531,7 @@ mod tests { #[test] fn test_bandwidth_rate_limiter() { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); let mem = create_virtio_mem(); // Rate Limiter with 4000 bytes / sec allowance and no initial burst allowance let device = Entropy::new(RateLimiter::new(4000, 0, 1000, 0, 0, 0).unwrap()).unwrap(); @@ -535,7 +543,7 @@ mod tests { // buffer should be processed normally th.add_desc_chain(RNG_QUEUE, 0, &[(0, 4000, VIRTQ_DESC_F_WRITE)]); check_metric_after_block!( - METRICS.entropy_bytes, + global.entropy_bytes, 4000, 
th.device().process_entropy_queue() ); @@ -551,12 +559,12 @@ mod tests { th.add_desc_chain(RNG_QUEUE, 0, &[(0, 4000, VIRTQ_DESC_F_WRITE)]); th.add_desc_chain(RNG_QUEUE, 1, &[(1, 1000, VIRTQ_DESC_F_WRITE)]); check_metric_after_block!( - METRICS.entropy_bytes, + global.entropy_bytes, 4000, th.device().process_entropy_queue() ); check_metric_after_block!( - METRICS.entropy_rate_limiter_throttled, + global.entropy_rate_limiter_throttled, 1, th.device().process_entropy_queue() ); @@ -565,12 +573,13 @@ mod tests { // 250 msec should give enough time for replenishing 1000 bytes worth of tokens. // Give it an extra 100 ms just to be sure the timer event reaches us from the kernel. std::thread::sleep(Duration::from_millis(350)); - check_metric_after_block!(METRICS.entropy_bytes, 1000, th.emulate_for_msec(100)); + check_metric_after_block!(global.entropy_bytes, 1000, th.emulate_for_msec(100)); assert!(!th.device().rate_limiter().is_blocked()); } #[test] fn test_ops_rate_limiter() { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); let mem = create_virtio_mem(); // Rate Limiter with unlimited bandwidth and allowance for 1 operation every 100 msec, // (10 ops/sec), without initial burst. @@ -583,7 +592,7 @@ mod tests { // so this should succeed. 
th.add_desc_chain(RNG_QUEUE, 0, &[(0, 4000, VIRTQ_DESC_F_WRITE)]); check_metric_after_block!( - METRICS.entropy_bytes, + global.entropy_bytes, 4000, th.device().process_entropy_queue() ); @@ -593,30 +602,30 @@ mod tests { std::thread::sleep(Duration::from_millis(1000)); // First one should succeed - let entropy_bytes = METRICS.entropy_bytes.count(); + let entropy_bytes = global.entropy_bytes.count(); th.add_desc_chain(RNG_QUEUE, 0, &[(0, 64, VIRTQ_DESC_F_WRITE)]); - check_metric_after_block!(METRICS.entropy_bytes, 64, th.emulate_for_msec(100)); - assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 64); + check_metric_after_block!(global.entropy_bytes, 64, th.emulate_for_msec(100)); + assert_eq!(global.entropy_bytes.count(), entropy_bytes + 64); // The rate limiter is not blocked yet. assert!(!th.device().rate_limiter().is_blocked()); // But immediately asking another operation should block it because we have 1 op every 100 // msec. th.add_desc_chain(RNG_QUEUE, 0, &[(0, 64, VIRTQ_DESC_F_WRITE)]); check_metric_after_block!( - METRICS.entropy_rate_limiter_throttled, + global.entropy_rate_limiter_throttled, 1, th.emulate_for_msec(50) ); // Entropy bytes count should not have increased. 
- assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 64); + assert_eq!(global.entropy_bytes.count(), entropy_bytes + 64); // After 100 msec (plus 50 msec for ensuring the event reaches us from the kernel), the // timer of the rate limiter should fire saying that there's now more tokens available check_metric_after_block!( - METRICS.rate_limiter_event_count, + global.rate_limiter_event_count, 1, th.emulate_for_msec(150) ); // The rate limiter event should have processed the pending buffer as well - assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 128); + assert_eq!(global.entropy_bytes.count(), entropy_bytes + 128); } } diff --git a/src/vmm/src/devices/virtio/rng/metrics.rs b/src/vmm/src/devices/virtio/rng/metrics.rs index e02f5ce8cc4..d0af144fb6e 100644 --- a/src/vmm/src/devices/virtio/rng/metrics.rs +++ b/src/vmm/src/devices/virtio/rng/metrics.rs @@ -38,16 +38,63 @@ use serde::{Serialize, Serializer}; use crate::logger::SharedIncMetric; -/// Stores aggregated entropy metrics -pub(super) static METRICS: EntropyDeviceMetrics = EntropyDeviceMetrics::new(); +use std::sync::{Arc, RwLock}; +use std::collections::BTreeMap; -/// Called by METRICS.flush(), this function facilitates serialization of entropy device metrics. +/// This function facilitates aggregation and serialization of +/// per device entropy metrics. 
(Can also handle singular) pub fn flush_metrics<S: Serializer>(serializer: S) -> Result<S::Ok, S::Error> { - let mut seq = serializer.serialize_map(Some(1))?; - seq.serialize_entry("entropy", &METRICS)?; + let entropy_metrics = METRICS.read().unwrap(); + let metrics_len = entropy_metrics.metrics.len(); + // +1 to accommodate the aggregate entropy metrics + let mut seq = serializer.serialize_map(Some(1 + metrics_len))?; + + let mut entropy_aggregated: EntropyDeviceMetrics = EntropyDeviceMetrics::new(); + + for (name, metrics) in entropy_metrics.metrics.iter() { + let devn = format!("entropy_{}", name); + // serialization will flush the metrics so aggregate before it. + let m: &EntropyDeviceMetrics = metrics; + entropy_aggregated.aggregate(m); + seq.serialize_entry(&devn, m)?; + } + seq.serialize_entry("entropy", &entropy_aggregated)?; seq.end() } +pub struct EntropyMetricsPerDevice { + pub metrics: BTreeMap<String, Arc<EntropyDeviceMetrics>> +} + +impl EntropyMetricsPerDevice { + /// Allocate `EntropyDeviceMetrics` for the device having + /// id `iface_id`. Also, allocate only if it doesn't + /// exist to avoid overwriting previously allocated data. + /// lock is always initialized so it is safe to unwrap + /// the lock without a check. 
+ pub fn alloc(iface_id: String) -> Arc<EntropyDeviceMetrics> { + Arc::clone( + METRICS + .write() + .unwrap() + .metrics + .entry(iface_id) + .or_insert_with(|| Arc::new(EntropyDeviceMetrics::new())), + ) + } +} + +// Lazily built: populating the map is not possible in a `const` initializer. +static METRICS: std::sync::LazyLock<RwLock<EntropyMetricsPerDevice>> = + std::sync::LazyLock::new(|| { + let mut tree = BTreeMap::new(); + tree.insert( + "global".to_string(), + Arc::new(EntropyDeviceMetrics::new()), + ); + RwLock::new(EntropyMetricsPerDevice { metrics: tree }) + }); + #[derive(Debug, Serialize)] pub(super) struct EntropyDeviceMetrics { /// Number of device activation failures @@ -86,15 +133,166 @@ pub mod tests { use crate::logger::IncMetric; #[test] - fn test_entropy_dev_metrics() { - let entropy_metrics: EntropyDeviceMetrics = EntropyDeviceMetrics::new(); - let entropy_metrics_local: String = serde_json::to_string(&entropy_metrics).unwrap(); - // the 1st serialize flushes the metrics and resets values to 0 so that - // we can compare the values with local metrics. - serde_json::to_string(&METRICS).unwrap(); - let entropy_metrics_global: String = serde_json::to_string(&METRICS).unwrap(); - assert_eq!(entropy_metrics_local, entropy_metrics_global); - entropy_metrics.entropy_event_count.inc(); - assert_eq!(entropy_metrics.entropy_event_count.count(), 1); + fn test_rng_dev_metrics() { + drop(METRICS.read().unwrap()); + drop(METRICS.write().unwrap()); + + for i in 0..5 { + let devn: String = format!("entropy{}", i); + EntropyMetricsPerDevice::alloc(devn.clone()); + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .activate_fails + .inc(); + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .entropy_bytes + .add(10); + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .host_rng_fails + .add(5); + } + + for i in 0..5 { + let devn: String = format!("entropy{}", i); + assert!( + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .activate_fails + .count() + >= 1 + ); + assert!( + METRICS + .read() + .unwrap() + 
.metrics + .get(&devn) + .unwrap() + .entropy_bytes + .count() + >= 10 + ); + assert_eq!( + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .host_rng_fails + .count(), + 5 + ); + } + } + + #[test] + fn test_single_rng_metrics() { + // Use entropy0 so that we can check thread safety with + // `test_rng_dev_metrics`, which also uses the same name. + let devn = "entropy0"; + + drop(METRICS.read().unwrap()); + drop(METRICS.write().unwrap()); + + EntropyMetricsPerDevice::alloc(String::from(devn)); + METRICS.read().unwrap().metrics.get(devn).unwrap(); + + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .inc(); + assert!( + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .count() + > 0, + "{}", + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .count() + ); + // we expect only 2 tests (this and test_rng_dev_metrics) + // to update activate_fails count for entropy0. + assert!( + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .count() + <= 2, + "{}", + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .count() + ); + + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .inc(); + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .entropy_bytes + .add(5); + assert!( + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .entropy_bytes + .count() + >= 5 + ); } } diff --git a/src/vmm/src/devices/virtio/vsock/device.rs b/src/vmm/src/devices/virtio/vsock/device.rs index aa114f6cccb..3dfd7cc3236 100644 --- a/src/vmm/src/devices/virtio/vsock/device.rs +++ b/src/vmm/src/devices/virtio/vsock/device.rs @@ -34,7 +34,7 @@ use crate::devices::virtio::device::{DeviceState, IrqTrigger, IrqType, VirtioDev use crate::devices::virtio::generated::virtio_config::{VIRTIO_F_IN_ORDER, VIRTIO_F_VERSION_1}; use 
crate::devices::virtio::queue::Queue as VirtQueue; use crate::devices::virtio::vsock::VsockError; -use crate::devices::virtio::vsock::metrics::METRICS; +use crate::devices::virtio::vsock::metrics::VsockMetricsPerDevice; use crate::logger::IncMetric; use crate::utils::byte_order; use crate::vstate::memory::{Bytes, GuestMemoryMmap}; @@ -241,11 +241,12 @@ where // connections and the guest_cid configuration field is fetched again. Existing listen sockets // remain but their CID is updated to reflect the current guest_cid. pub fn send_transport_reset_event(&mut self) -> Result<(), DeviceError> { + let global = VsockMetricsPerDevice::alloc("global".to_string()); // This is safe since we checked in the caller function that the device is activated. let mem = self.device_state.mem().unwrap(); let head = self.queues[EVQ_INDEX].pop().ok_or_else(|| { - METRICS.ev_queue_event_fails.inc(); + global.ev_queue_event_fails.inc(); DeviceError::VsockError(VsockError::EmptyQueue) })?; @@ -301,6 +302,7 @@ where } fn read_config(&self, offset: u64, data: &mut [u8]) { + let global = VsockMetricsPerDevice::alloc("global".to_string()); match offset { 0 if data.len() == 8 => byte_order::write_le_u64(data, self.cid()), 0 if data.len() == 4 => { @@ -310,7 +312,7 @@ where byte_order::write_le_u32(data, ((self.cid() >> 32) & 0xffff_ffff) as u32) } _ => { - METRICS.cfg_fails.inc(); + global.cfg_fails.inc(); warn!( "vsock: virtio-vsock received invalid read request of {} bytes at offset {}", data.len(), @@ -321,7 +323,8 @@ where } fn write_config(&mut self, offset: u64, data: &[u8]) { - METRICS.cfg_fails.inc(); + let global = VsockMetricsPerDevice::alloc("global".to_string()); + global.cfg_fails.inc(); warn!( "vsock: guest driver attempted to write device config (offset={:#x}, len={:#x})", offset, @@ -330,13 +333,14 @@ where } fn activate(&mut self, mem: GuestMemoryMmap) -> Result<(), ActivateError> { + let global = VsockMetricsPerDevice::alloc("global".to_string()); for q in 
self.queues.iter_mut() { q.initialize(&mem) .map_err(ActivateError::QueueMemoryError)?; } if self.queues.len() != defs::VSOCK_NUM_QUEUES { - METRICS.activate_fails.inc(); + global.activate_fails.inc(); return Err(ActivateError::QueueMismatch { expected: defs::VSOCK_NUM_QUEUES, got: self.queues.len(), @@ -344,7 +348,7 @@ where } if self.activate_evt.write(1).is_err() { - METRICS.activate_fails.inc(); + global.activate_fails.inc(); return Err(ActivateError::EventFd); } diff --git a/src/vmm/src/devices/virtio/vsock/event_handler.rs b/src/vmm/src/devices/virtio/vsock/event_handler.rs index 632148546e5..7ec0e3aa1c3 100755 --- a/src/vmm/src/devices/virtio/vsock/event_handler.rs +++ b/src/vmm/src/devices/virtio/vsock/event_handler.rs @@ -33,7 +33,7 @@ use vmm_sys_util::epoll::EventSet; use super::VsockBackend; use super::device::{EVQ_INDEX, RXQ_INDEX, TXQ_INDEX, Vsock}; use crate::devices::virtio::device::VirtioDevice; -use crate::devices::virtio::vsock::metrics::METRICS; +use crate::devices::virtio::vsock::metrics::VsockMetricsPerDevice; use crate::logger::IncMetric; impl<B> Vsock<B> @@ -47,37 +47,39 @@ where const PROCESS_NOTIFY_BACKEND: u32 = 4; pub fn handle_rxq_event(&mut self, evset: EventSet) -> bool { + let global = VsockMetricsPerDevice::alloc("global".to_string()); if evset != EventSet::IN { warn!("vsock: rxq unexpected event {:?}", evset); - METRICS.rx_queue_event_fails.inc(); + global.rx_queue_event_fails.inc(); return false; } let mut raise_irq = false; if let Err(err) = self.queue_events[RXQ_INDEX].read() { error!("Failed to get vsock rx queue event: {:?}", err); - METRICS.rx_queue_event_fails.inc(); + global.rx_queue_event_fails.inc(); } else if self.backend.has_pending_rx() { raise_irq |= self.process_rx(); - METRICS.rx_queue_event_count.inc(); + global.rx_queue_event_count.inc(); } raise_irq } pub fn handle_txq_event(&mut self, evset: EventSet) -> bool { + let global = VsockMetricsPerDevice::alloc("global".to_string()); if evset != EventSet::IN { 
warn!("vsock: txq unexpected event {:?}", evset); - METRICS.tx_queue_event_fails.inc(); + global.tx_queue_event_fails.inc(); return false; } let mut raise_irq = false; if let Err(err) = self.queue_events[TXQ_INDEX].read() { error!("Failed to get vsock tx queue event: {:?}", err); - METRICS.tx_queue_event_fails.inc(); + global.tx_queue_event_fails.inc(); } else { raise_irq |= self.process_tx(); - METRICS.tx_queue_event_count.inc(); + global.tx_queue_event_count.inc(); // The backend may have queued up responses to the packets we sent during // TX queue processing. If that happened, we need to fetch those responses // and place them into RX buffers. @@ -89,15 +91,16 @@ where } pub fn handle_evq_event(&mut self, evset: EventSet) -> bool { + let global = VsockMetricsPerDevice::alloc("global".to_string()); if evset != EventSet::IN { warn!("vsock: evq unexpected event {:?}", evset); - METRICS.ev_queue_event_fails.inc(); + global.ev_queue_event_fails.inc(); return false; } if let Err(err) = self.queue_events[EVQ_INDEX].read() { error!("Failed to consume vsock evq event: {:?}", err); - METRICS.ev_queue_event_fails.inc(); + global.ev_queue_event_fails.inc(); } false } diff --git a/src/vmm/src/devices/virtio/vsock/metrics.rs b/src/vmm/src/devices/virtio/vsock/metrics.rs index d626d5dca34..f3bdde2d353 100644 --- a/src/vmm/src/devices/virtio/vsock/metrics.rs +++ b/src/vmm/src/devices/virtio/vsock/metrics.rs @@ -41,16 +41,66 @@ use serde::{Serialize, Serializer}; use crate::logger::SharedIncMetric; +use std::sync::{Arc, RwLock}; +use std::collections::BTreeMap; + /// Stores aggregate metrics of all Vsock connections/actions -pub(super) static METRICS: VsockDeviceMetrics = VsockDeviceMetrics::new(); +// pub(super) static METRICS: VsockDeviceMetrics = VsockDeviceMetrics::new(); -/// Called by METRICS.flush(), this function facilitates serialization of vsock device metrics. +/// This function facilitates aggregation and serialization of +/// per device vsock metrics. 
(Can also handle singular) pub fn flush_metrics<S: Serializer>(serializer: S) -> Result<S::Ok, S::Error> { - let mut seq = serializer.serialize_map(Some(1))?; - seq.serialize_entry("vsock", &METRICS)?; + let vsock_metrics = METRICS.read().unwrap(); + let metrics_len = vsock_metrics.metrics.len(); + // +1 to accommodate the aggregate vsock metrics + let mut seq = serializer.serialize_map(Some(1 + metrics_len))?; + + let mut vsock_aggregated: VsockDeviceMetrics = VsockDeviceMetrics::new(); + + for (name, metrics) in vsock_metrics.metrics.iter() { + let devn = format!("vsock_{}", name); + // serialization will flush the metrics so aggregate before it. + let m: &VsockDeviceMetrics = metrics; + vsock_aggregated.aggregate(m); + seq.serialize_entry(&devn, m)?; + } + seq.serialize_entry("vsock", &vsock_aggregated)?; seq.end() } +pub struct VsockMetricsPerDevice { + pub metrics: BTreeMap<String, Arc<VsockDeviceMetrics>> +} + +impl VsockMetricsPerDevice { + /// Allocate `VsockDeviceMetrics` for the device having + /// id `iface_id`. Also, allocate only if it doesn't + /// exist to avoid overwriting previously allocated data. + /// lock is always initialized so it is safe to unwrap + /// the lock without a check. + pub fn alloc(iface_id: String) -> Arc<VsockDeviceMetrics> { + Arc::clone( + METRICS + .write() + .unwrap() + .metrics + .entry(iface_id) + .or_insert_with(|| Arc::new(VsockDeviceMetrics::new())), + ) + } +} + +// Lazily built: populating the map is not possible in a `const` initializer. +static METRICS: std::sync::LazyLock<RwLock<VsockMetricsPerDevice>> = + std::sync::LazyLock::new(|| { + let mut tree = BTreeMap::new(); + tree.insert( + "global".to_string(), + Arc::new(VsockDeviceMetrics::new()), + ); + RwLock::new(VsockMetricsPerDevice { metrics: tree }) + }); + /// Vsock-related metrics. #[derive(Debug, Serialize)] pub(super) struct VsockDeviceMetrics { @@ -130,16 +180,178 @@ pub mod tests { use super::*; use crate::logger::IncMetric; + // Simple test to test ability to handle different devices based on some id + // Mimics the behavior and test of per-device structure in network devices. 
#[test] fn test_vsock_dev_metrics() { - let vsock_metrics: VsockDeviceMetrics = VsockDeviceMetrics::new(); - let vsock_metrics_local: String = serde_json::to_string(&vsock_metrics).unwrap(); - // the 1st serialize flushes the metrics and resets values to 0 so that - // we can compare the values with local metrics. - serde_json::to_string(&METRICS).unwrap(); - let vsock_metrics_global: String = serde_json::to_string(&METRICS).unwrap(); - assert_eq!(vsock_metrics_local, vsock_metrics_global); - vsock_metrics.conns_added.inc(); - assert_eq!(vsock_metrics.conns_added.count(), 1); + drop(METRICS.read().unwrap()); + drop(METRICS.write().unwrap()); + + for i in 0..3 { + let devn: String = format!("vsock{}", i); + VsockMetricsPerDevice::alloc(devn.clone()); + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .conns_added + .inc(); + } + METRICS + .read() + .unwrap() + .metrics + .get("vsock1") + .unwrap() + .conns_added + .add(5); + METRICS + .read() + .unwrap() + .metrics + .get("vsock2") + .unwrap() + .activate_fails + .inc(); + + let json_output = serde_json::to_string(&*METRICS.read().unwrap()).unwrap(); + + // Optional: print JSON to visually verify structure + println!("{}", json_output); + + let parsed: serde_json::Value = serde_json::from_str(&json_output).unwrap(); + let a_count = parsed["vsock_vsock0"]["conns_added"]["count"].as_u64().unwrap(); + let b_count = parsed["vsock_vsock1"]["conns_added"]["count"].as_u64().unwrap(); + let c_count = parsed["vsock_vsock2"]["conns_added"]["count"].as_u64().unwrap(); + let a_count_2 = parsed["vsock_vsock0"]["activate_fails"]["count"].as_u64().unwrap(); + let c_count_2 = parsed["vsock_vsock2"]["activate_fails"]["count"].as_u64().unwrap(); + let aggregated = parsed["vsock"]["conns_added"]["count"].as_u64().unwrap(); + + assert_eq!(a_count, 1); + assert_eq!(b_count, 6); + assert_eq!(c_count, 1); + assert_eq!(a_count_2, 0); + assert_eq!(c_count_2, 1); + assert_eq!(aggregated, 8); + + 
drop(METRICS.read().unwrap()); + assert_eq!(METRICS + .read() + .unwrap() + .metrics + .get("vsock0") + .unwrap() + .conns_added + .count(), 0); + assert_eq!(METRICS + .read() + .unwrap() + .metrics + .get("vsock1") + .unwrap() + .conns_added + .count(), 0); + + METRICS + .read() + .unwrap() + .metrics + .get("vsock0") + .unwrap() + .activate_fails + .inc(); + + METRICS + .read() + .unwrap() + .metrics + .get("vsock0") + .unwrap() + .rx_bytes_count + .inc(); + + } + + // Device meant to test capability of retrieving and maintaining + // a default vsock for the tree, the default represents the global value. + // Also copies thread safety test from net devices. + #[test] + fn test_vsock_default() { + // Use vsock0 so that we can check thread safety with other tests. + let devn = "vsock0"; + + // Drop any existing read/write lock to avoid deadlocks or stale locks. + drop(METRICS.read().unwrap()); + drop(METRICS.write().unwrap()); + + // Allocate metrics for the device. + VsockMetricsPerDevice::alloc(String::from(devn)); + assert!(METRICS.read().unwrap().metrics.get(devn).is_some()); + + // Increment a field (e.g. activate_fails) to ensure it's being tracked. + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .inc(); + + let count = METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .count(); + assert!( + count > 0, + "Expected activate_fails count > 0 but got {}", + count + ); + + // Ensure only up to 2 tests increment this (if sharing across tests). + assert!( + count <= 2, + "Expected activate_fails count <= 2 but got {}", + count + ); + + // Add more metric changes and assert correctness. 
+ METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .inc(); + + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .rx_bytes_count + .add(5); + + let rx_count = METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .rx_bytes_count + .count(); + assert!( + rx_count >= 5, + "Expected rx_bytes_count >= 5 but got {}", + rx_count + ); } }