Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 87 additions & 59 deletions src/rtsp/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use gstreamer::ClockTime;
use std::{collections::HashMap, time::Duration};

use anyhow::{anyhow, Context, Result};
use gstreamer::{prelude::*, Bin, Caps, Element, ElementFactory, FlowError, GhostPad};
use gstreamer::{prelude::*, Bin, Caps, Element, ElementFactory, GhostPad};
use gstreamer_app::{AppSrc, AppSrcCallbacks, AppStreamType};
use neolink_core::{
bc_protocol::StreamKind,
Expand Down Expand Up @@ -352,11 +352,75 @@ fn send_to_sources(
Ok(())
}

fn bucket_size_for(n: usize) -> Option<usize> {
const MIN_BUCKET: usize = 256;
const MAX_BUCKET: usize = 64 * 1024;
if n == 0 {
return Some(MIN_BUCKET);
}
if n > MAX_BUCKET {
return None;
}
let mut b = n.next_power_of_two();
if b < MIN_BUCKET {
b = MIN_BUCKET;
}
Some(b)
}

fn acquire_pooled_buffer(
pools: &mut std::collections::HashMap<usize, gstreamer::BufferPool>,
data: &[u8],
timestamp: gstreamer::ClockTime,
) -> AnyResult<gstreamer::Buffer> {
let needed = data.len();
if let Some(bucket) = bucket_size_for(needed) {
let pool = pools.entry(bucket).or_insert_with(|| {
let pool = gstreamer::BufferPool::new();
let mut cfg = pool.config();
// caps=None, size=bucket, min=8, max=64
cfg.set_params(None, bucket as u32, 8, 64);
pool.set_config(cfg).expect("pool config failed");
pool.set_active(true).expect("activate pool");
log::info!("New BufferPool (Bucket) allocated: size={bucket}");
pool
});

let mut buf = pool.acquire_buffer(None)?;
{
let buf_ref = buf.get_mut().unwrap();
buf_ref.set_dts(timestamp);
buf_ref.set_pts(timestamp);
{
let mut map = buf_ref.map_writable().unwrap();
map[..needed].copy_from_slice(data);
}
if bucket > needed {
let _ = buf_ref.set_size(needed);
}
}
Ok(buf)
} else {
// Fallback without pooling
let mut buf = gstreamer::Buffer::with_size(needed)
.context("allocate large non-pooled buffer")?;
{
let buf_ref = buf.get_mut().unwrap();
buf_ref.set_dts(timestamp);
buf_ref.set_pts(timestamp);
let mut map = buf_ref.map_writable().unwrap();
map.copy_from_slice(data);
}
Ok(buf)
}
}


fn send_to_appsrc(
appsrc: &AppSrc,
appsrc: &gstreamer_app::AppSrc,
data: Vec<u8>,
mut ts: Duration,
pools: &mut HashMap<usize, gstreamer::BufferPool>,
mut ts: std::time::Duration,
pools: &mut std::collections::HashMap<usize, gstreamer::BufferPool>,
) -> AnyResult<()> {
check_live(appsrc)?; // Stop if appsrc is dropped

Expand All @@ -372,78 +436,42 @@ fn send_to_appsrc(
if matches!(appsrc.current_state(), gstreamer::State::Playing) {
ts = Duration::from_micros(time.useconds());
} else {
// Not playing
// Not playing
return Ok(());
}
} else {
// Clock not up yet
// Clock not up yet
return Ok(());
}
}
let buf = {
let msg_size = data.len();

// Get or create a pool of this len
let pool = pools.entry(msg_size).or_insert_with_key(|size| {
let pool = gstreamer::BufferPool::new();
let mut pool_config = pool.config();
// Set a max buffers to ensure we don't grow in memory endlessly
pool_config.set_params(None, (*size) as u32, 8, 32);
pool.set_config(pool_config).unwrap();
pool.set_active(true).unwrap();
pool
});

// Get a buffer from the pool and then copy in the data
let gst_buf = {
let mut new_buf = pool.acquire_buffer(None).unwrap();
let gst_buf_mut = new_buf.get_mut().unwrap();
let time = ClockTime::from_useconds(ts.as_micros() as u64);
gst_buf_mut.set_dts(time);
gst_buf_mut.set_pts(time);
let mut gst_buf_data = gst_buf_mut.map_writable().unwrap();
gst_buf_data.copy_from_slice(data.as_slice());
drop(gst_buf_data);
new_buf
};
let timestamp = ClockTime::from_useconds(ts.as_micros() as u64);
let buf = acquire_pooled_buffer(pools, &data, timestamp)?;

// Return the new buffer with the data
gst_buf
};

// Push buffer into the appsrc
match appsrc.push_buffer(buf) {
Ok(_) => {
// log::info!(
// "Send {}{} on {}",
// data.data.len(),
// if data.keyframe { " (keyframe)" } else { "" },
// appsrc.name()
// );
Ok(())
}
Err(FlowError::Flushing) => {
// Buffer is full just skip
Ok(_) => {}
Err(gstreamer::FlowError::Flushing) => {
log::info!(
"Buffer full on {} pausing stream until client consumes frames",
appsrc.name()
);
Ok(())
return Ok(());
}
Err(e) => Err(anyhow!("Error in streaming: {e:?}")),
}?;
// Check if we need to pause
if appsrc.current_level_bytes() >= appsrc.max_bytes() * 2 / 3
&& matches!(appsrc.current_state(), gstreamer::State::Paused)
{
appsrc.set_state(gstreamer::State::Playing).unwrap();
} else if appsrc.current_level_bytes() <= appsrc.max_bytes() / 3
&& matches!(appsrc.current_state(), gstreamer::State::Playing)
{
appsrc.set_state(gstreamer::State::Paused).unwrap();
Err(e) => return Err(anyhow::anyhow!("Error in streaming: {e:?}")),
}

// Backpressure-Logic
let level = appsrc.current_level_bytes();
let max = appsrc.max_bytes();
if level >= max * 2 / 3 && matches!(appsrc.current_state(), gstreamer::State::Paused) {
let _ = appsrc.set_state(gstreamer::State::Playing);
} else if level <= max / 3 && matches!(appsrc.current_state(), gstreamer::State::Playing) {
let _ = appsrc.set_state(gstreamer::State::Paused);
}

Ok(())
}

fn check_live(app: &AppSrc) -> Result<()> {
app.bus().ok_or(anyhow!("App source is closed"))?;
app.pads()
Expand Down