From 321daa9d2e4b583f627afc101c4ea68139b47684 Mon Sep 17 00:00:00 2001
From: Wadim Mueller
Date: Wed, 13 Aug 2025 22:14:31 +0200
Subject: [PATCH] feat(rtsp): fix file descriptor exhaustion and memory
 fragmentation

Previously, a new buffer pool was allocated for every received frame-sized
packet. This led to several critical issues:

- Excessive memory consumption: H264 frames vary widely in size, so a
  separate pool was created for nearly every distinct frame size, causing
  memory usage to grow significantly over time.
- File descriptor leaks: each invocation of `gstreamer::BufferPool::new()`
  created a new socketpair, resulting in a steady increase in open file
  descriptors. This can be observed with:

      watch -n1 "ls -l /proc/<pid>/fd | wc -l"

  Over time, this exhausted the available file descriptors.
- Application instability: on devices such as the Lumus cam, memory usage
  rose continuously (over 7-8 GiB after 5 hours), eventually leading to a
  crash.

This commit resolves these issues by reusing buffer pools where possible and
preventing unnecessary allocation of resources.

This allocates slightly more memory per frame than strictly needed, since
buffer sizes are rounded up to the next power of two, but that overhead is
worth it to stabilize the application.

Tested-by: Wadim Mueller
---
 src/rtsp/factory.rs | 146 ++++++++++++++++++++++++++------------------
 1 file changed, 87 insertions(+), 59 deletions(-)
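
Note (illustrative sketch, not part of the change below): assuming the new
`bucket_size_for` helper stays a private function in `src/rtsp/factory.rs`,
its rounding behaviour could be sanity-checked with a small unit test along
these lines:

    // Hypothetical test module, not included in this patch.
    #[cfg(test)]
    mod bucket_size_tests {
        use super::bucket_size_for;

        #[test]
        fn rounds_up_to_power_of_two_within_limits() {
            assert_eq!(bucket_size_for(0), Some(256)); // empty payload -> minimum bucket
            assert_eq!(bucket_size_for(100), Some(256)); // small frames clamp to MIN_BUCKET
            assert_eq!(bucket_size_for(1000), Some(1024)); // rounded up to the next power of two
            assert_eq!(bucket_size_for(64 * 1024), Some(64 * 1024)); // exactly at the cap is still pooled
            assert_eq!(bucket_size_for(64 * 1024 + 1), None); // oversized frames bypass pooling
        }
    }

Rounding up to a power of two bounds the number of distinct pools: with
buckets from 256 B to 64 KiB there are at most nine of them (and therefore at
most nine pool socketpairs), instead of one pool per observed frame size.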

diff --git a/src/rtsp/factory.rs b/src/rtsp/factory.rs
index 9ba058ee..025f884f 100644
--- a/src/rtsp/factory.rs
+++ b/src/rtsp/factory.rs
@@ -2,7 +2,7 @@ use gstreamer::ClockTime;
 use std::{collections::HashMap, time::Duration};
 
 use anyhow::{anyhow, Context, Result};
-use gstreamer::{prelude::*, Bin, Caps, Element, ElementFactory, FlowError, GhostPad};
+use gstreamer::{prelude::*, Bin, Caps, Element, ElementFactory, GhostPad};
 use gstreamer_app::{AppSrc, AppSrcCallbacks, AppStreamType};
 use neolink_core::{
     bc_protocol::StreamKind,
@@ -352,11 +352,75 @@ fn send_to_sources(
     Ok(())
 }
 
+fn bucket_size_for(n: usize) -> Option<usize> {
+    const MIN_BUCKET: usize = 256;
+    const MAX_BUCKET: usize = 64 * 1024;
+    if n == 0 {
+        return Some(MIN_BUCKET);
+    }
+    if n > MAX_BUCKET {
+        return None;
+    }
+    let mut b = n.next_power_of_two();
+    if b < MIN_BUCKET {
+        b = MIN_BUCKET;
+    }
+    Some(b)
+}
+
+fn acquire_pooled_buffer(
+    pools: &mut std::collections::HashMap<usize, gstreamer::BufferPool>,
+    data: &[u8],
+    timestamp: gstreamer::ClockTime,
+) -> AnyResult<gstreamer::Buffer> {
+    let needed = data.len();
+    if let Some(bucket) = bucket_size_for(needed) {
+        let pool = pools.entry(bucket).or_insert_with(|| {
+            let pool = gstreamer::BufferPool::new();
+            let mut cfg = pool.config();
+            // No caps filter; fixed buffer size = bucket, keep 8..=64 buffers in the pool
+            cfg.set_params(None, bucket as u32, 8, 64);
+            pool.set_config(cfg).expect("pool config failed");
+            pool.set_active(true).expect("activate pool");
+            log::info!("New BufferPool (Bucket) allocated: size={bucket}");
+            pool
+        });
+
+        let mut buf = pool.acquire_buffer(None)?;
+        {
+            let buf_ref = buf.get_mut().unwrap();
+            buf_ref.set_dts(timestamp);
+            buf_ref.set_pts(timestamp);
+            {
+                let mut map = buf_ref.map_writable().unwrap();
+                map[..needed].copy_from_slice(data);
+            }
+            if bucket > needed {
+                let _ = buf_ref.set_size(needed); // expose only the real payload size
+            }
+        }
+        Ok(buf)
+    } else {
+        // Frame larger than MAX_BUCKET: fall back to a one-off, non-pooled buffer
+        let mut buf = gstreamer::Buffer::with_size(needed)
+            .context("allocate large non-pooled buffer")?;
+        {
+            let buf_ref = buf.get_mut().unwrap();
+            buf_ref.set_dts(timestamp);
+            buf_ref.set_pts(timestamp);
+            let mut map = buf_ref.map_writable().unwrap();
+            map.copy_from_slice(data);
+        }
+        Ok(buf)
+    }
+}
+
+
 fn send_to_appsrc(
-    appsrc: &AppSrc,
+    appsrc: &gstreamer_app::AppSrc,
     data: Vec<u8>,
-    mut ts: Duration,
-    pools: &mut HashMap<usize, gstreamer::BufferPool>,
+    mut ts: std::time::Duration,
+    pools: &mut std::collections::HashMap<usize, gstreamer::BufferPool>,
 ) -> AnyResult<()> {
     check_live(appsrc)?; // Stop if appsrc is dropped
@@ -372,78 +436,42 @@ fn send_to_appsrc(
             if matches!(appsrc.current_state(), gstreamer::State::Playing) {
                 ts = Duration::from_micros(time.useconds());
             } else {
-                // Not playing
+                // Not playing
                 return Ok(());
             }
         } else {
-            // Clock not up yet
+            // Clock not up yet
             return Ok(());
         }
     }
-    let buf = {
-        let msg_size = data.len();
-        // Get or create a pool of this len
-        let pool = pools.entry(msg_size).or_insert_with_key(|size| {
-            let pool = gstreamer::BufferPool::new();
-            let mut pool_config = pool.config();
-            // Set a max buffers to ensure we don't grow in memory endlessly
-            pool_config.set_params(None, (*size) as u32, 8, 32);
-            pool.set_config(pool_config).unwrap();
-            pool.set_active(true).unwrap();
-            pool
-        });
-
-        // Get a buffer from the pool and then copy in the data
-        let gst_buf = {
-            let mut new_buf = pool.acquire_buffer(None).unwrap();
-            let gst_buf_mut = new_buf.get_mut().unwrap();
-            let time = ClockTime::from_useconds(ts.as_micros() as u64);
-            gst_buf_mut.set_dts(time);
-            gst_buf_mut.set_pts(time);
-            let mut gst_buf_data = gst_buf_mut.map_writable().unwrap();
-            gst_buf_data.copy_from_slice(data.as_slice());
-            drop(gst_buf_data);
-            new_buf
-        };
+    let timestamp = ClockTime::from_useconds(ts.as_micros() as u64);
+    let buf = acquire_pooled_buffer(pools, &data, timestamp)?;
 
-        // Return the new buffer with the data
-        gst_buf
-    };
-
-    // Push buffer into the appsrc
     match appsrc.push_buffer(buf) {
-        Ok(_) => {
-            // log::info!(
-            //     "Send {}{} on {}",
-            //     data.data.len(),
-            //     if data.keyframe { " (keyframe)" } else { "" },
-            //     appsrc.name()
-            // );
-            Ok(())
-        }
-        Err(FlowError::Flushing) => {
-            // Buffer is full just skip
+        Ok(_) => {}
+        Err(gstreamer::FlowError::Flushing) => {
            log::info!(
                "Buffer full on {} pausing stream until client consumes frames",
                appsrc.name()
            );
-            Ok(())
+            return Ok(());
        }
-        Err(e) => Err(anyhow!("Error in streaming: {e:?}")),
-    }?;
-    // Check if we need to pause
-    if appsrc.current_level_bytes() >= appsrc.max_bytes() * 2 / 3
-        && matches!(appsrc.current_state(), gstreamer::State::Paused)
-    {
-        appsrc.set_state(gstreamer::State::Playing).unwrap();
-    } else if appsrc.current_level_bytes() <= appsrc.max_bytes() / 3
-        && matches!(appsrc.current_state(), gstreamer::State::Playing)
-    {
-        appsrc.set_state(gstreamer::State::Paused).unwrap();
+        Err(e) => return Err(anyhow::anyhow!("Error in streaming: {e:?}")),
     }
+
+    // Backpressure logic: resume or pause the appsrc based on its queue fill level
+    let level = appsrc.current_level_bytes();
+    let max = appsrc.max_bytes();
+    if level >= max * 2 / 3 && matches!(appsrc.current_state(), gstreamer::State::Paused) {
+        let _ = appsrc.set_state(gstreamer::State::Playing);
+    } else if level <= max / 3 && matches!(appsrc.current_state(), gstreamer::State::Playing) {
+        let _ = appsrc.set_state(gstreamer::State::Paused);
+    }
+    Ok(())
 }
 
+
 fn check_live(app: &AppSrc) -> Result<()> {
     app.bus().ok_or(anyhow!("App source is closed"))?;
     app.pads()