Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 3 additions & 21 deletions src/rtsp/factory.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use gstreamer::ClockTime;
use std::{collections::HashMap, time::Duration};
use std::time::Duration;

use anyhow::{anyhow, Context, Result};
use gstreamer::{prelude::*, Bin, Caps, Element, ElementFactory, FlowError, GhostPad};
Expand Down Expand Up @@ -245,13 +245,11 @@ pub(super) async fn make_factory(
std::thread::spawn(move || {
let mut aud_ts = 0u32;
let mut vid_ts = 0u32;
let mut pools = Default::default();

log::trace!("{name}::{stream}: Sending buffered frames");
for buffered in buffer.drain(..) {
send_to_sources(
buffered,
&mut pools,
&vid_src,
&aud_src,
&mut vid_ts,
Expand All @@ -264,7 +262,6 @@ pub(super) async fn make_factory(
while let Some(data) = media_rx.blocking_recv() {
let r = send_to_sources(
data,
&mut pools,
&vid_src,
&aud_src,
&mut vid_ts,
Expand Down Expand Up @@ -301,7 +298,6 @@ pub(super) async fn make_factory(

fn send_to_sources(
data: BcMedia,
pools: &mut HashMap<usize, gstreamer::BufferPool>,
vid_src: &Option<AppSrc>,
aud_src: &Option<AppSrc>,
vid_ts: &mut u32,
Expand All @@ -318,7 +314,6 @@ fn send_to_sources(
aud_src,
aac.data,
Duration::from_micros(*aud_ts as u64),
pools,
)?;
}
*aud_ts += duration;
Expand All @@ -333,7 +328,6 @@ fn send_to_sources(
aud_src,
adpcm.data,
Duration::from_micros(*aud_ts as u64),
pools,
)?;
}
*aud_ts += duration;
Expand All @@ -342,7 +336,7 @@ fn send_to_sources(
| BcMedia::Pframe(BcMediaPframe { data, .. }) => {
if let Some(vid_src) = vid_src.as_ref() {
log::trace!("Sending VID: {:?}", Duration::from_micros(*vid_ts as u64));
send_to_appsrc(vid_src, data, Duration::from_micros(*vid_ts as u64), pools)?;
send_to_appsrc(vid_src, data, Duration::from_micros(*vid_ts as u64))?;
}
const MICROSECONDS: u32 = 1000000;
*vid_ts += MICROSECONDS / stream_config.fps;
Expand All @@ -356,7 +350,6 @@ fn send_to_appsrc(
appsrc: &AppSrc,
data: Vec<u8>,
mut ts: Duration,
pools: &mut HashMap<usize, gstreamer::BufferPool>,
) -> AnyResult<()> {
check_live(appsrc)?; // Stop if appsrc is dropped

Expand All @@ -383,20 +376,9 @@ fn send_to_appsrc(
let buf = {
let msg_size = data.len();

// Get or create a pool of this len
let pool = pools.entry(msg_size).or_insert_with_key(|size| {
let pool = gstreamer::BufferPool::new();
let mut pool_config = pool.config();
// Set a max buffers to ensure we don't grow in memory endlessly
pool_config.set_params(None, (*size) as u32, 8, 32);
pool.set_config(pool_config).unwrap();
pool.set_active(true).unwrap();
pool
});

// Get a buffer from the pool and then copy in the data
let gst_buf = {
let mut new_buf = pool.acquire_buffer(None).unwrap();
let mut new_buf = gstreamer::Buffer::with_size(msg_size).unwrap();
let gst_buf_mut = new_buf.get_mut().unwrap();
let time = ClockTime::from_useconds(ts.as_micros() as u64);
gst_buf_mut.set_dts(time);
Expand Down