diff --git a/.claude/settings.local.json b/.claude/settings.local.json index b83de1a96a..0f864d7abe 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -1,58 +1,58 @@ { - "permissions": { - "allow": [ - "Bash(pnpm typecheck:*)", - "Bash(pnpm lint:*)", - "Bash(pnpm build:*)", - "Bash(cargo check:*)", - "Bash(cargo fmt:*)", - "Bash(pnpm format:*)", - "Bash(pnpm exec biome check:*)", - "Bash(grep:*)", - "Bash(cargo metadata:*)", - "Bash(ffprobe:*)", - "Bash(ls:*)", - "Bash(find:*)", - "Bash(cat:*)", - "WebFetch(domain:raw.githubusercontent.com)", - "WebFetch(domain:api.github.com)", - "Bash(cargo doc:*)", - "Bash(cargo clippy:*)", - "Bash(python3:*)", - "Bash(cargo run:*)", - "WebSearch", - "Bash(xargs ls:*)", - "WebFetch(domain:ffmpeg.org)", - "Bash(git log:*)", - "Bash(tree:*)", - "Bash(tail:*)", - "Bash(pnpm typecheck:desktop:*)", - "Bash(pnpm exec tsc:*)", - "Bash(pnpm biome check:*)", - "Bash(pnpm --dir apps/desktop exec tsc:*)", - "Bash(xxd:*)", - "Bash(git checkout:*)", - "WebFetch(domain:www.npmjs.com)", - "Bash(pnpm install:*)", - "Bash(pnpm --dir apps/desktop exec biome check:*)", - "Bash(pnpm --dir apps/desktop exec biome format:*)", - "Bash(echo:*)", - "Bash(pnpm exec biome:*)", - "Bash(rustfmt:*)", - "Bash(cargo tree:*)", - "WebFetch(domain:github.com)", - "WebFetch(domain:docs.rs)", - "WebFetch(domain:gix.github.io)", - "Bash(cargo clean:*)", - "Bash(cargo test:*)", - "Bash(powershell -Command \"[System.Environment]::OSVersion.Version.ToString()\")", - "Bash(cargo build:*)", - "Bash(gh api:*)", - "Bash(curl:*)", - "Bash(node -e:*)", - "Bash(findstr:*)" - ], - "deny": [], - "ask": [] - } + "permissions": { + "allow": [ + "Bash(pnpm typecheck:*)", + "Bash(pnpm lint:*)", + "Bash(pnpm build:*)", + "Bash(cargo check:*)", + "Bash(cargo fmt:*)", + "Bash(pnpm format:*)", + "Bash(pnpm exec biome check:*)", + "Bash(grep:*)", + "Bash(cargo metadata:*)", + "Bash(ffprobe:*)", + "Bash(ls:*)", + "Bash(find:*)", + "Bash(cat:*)", + "WebFetch(domain:raw.githubusercontent.com)", + "WebFetch(domain:api.github.com)", + "Bash(cargo doc:*)", + "Bash(cargo clippy:*)", + "Bash(python3:*)", + "Bash(cargo run:*)", + "WebSearch", + "Bash(xargs ls:*)", + "WebFetch(domain:ffmpeg.org)", + "Bash(git log:*)", + "Bash(tree:*)", + "Bash(tail:*)", + "Bash(pnpm typecheck:desktop:*)", + "Bash(pnpm exec tsc:*)", + "Bash(pnpm biome check:*)", + "Bash(pnpm --dir apps/desktop exec tsc:*)", + "Bash(xxd:*)", + "Bash(git checkout:*)", + "WebFetch(domain:www.npmjs.com)", + "Bash(pnpm install:*)", + "Bash(pnpm --dir apps/desktop exec biome check:*)", + "Bash(pnpm --dir apps/desktop exec biome format:*)", + "Bash(echo:*)", + "Bash(pnpm exec biome:*)", + "Bash(rustfmt:*)", + "Bash(cargo tree:*)", + "WebFetch(domain:github.com)", + "WebFetch(domain:docs.rs)", + "WebFetch(domain:gix.github.io)", + "Bash(ffmpeg:*)", + "Bash(DYLD_LIBRARY_PATH=/opt/homebrew/lib:$DYLD_LIBRARY_PATH ./target/release/examples/memory-leak-detector:*)", + "Bash(ln:*)", + "Bash(./target/release/examples/memory-leak-detector:*)", + "Bash(cargo build:*)", + "Bash(footprint:*)", + "Bash(RUST_LOG=info,cap_recording=debug ./target/release/examples/memory-leak-detector:*)", + "Bash(git rm:*)" + ], + "deny": [], + "ask": [] + } } diff --git a/.gitignore b/.gitignore index 62ede0e17e..d2086b210a 100644 --- a/.gitignore +++ b/.gitignore @@ -51,3 +51,4 @@ tauri.windows.conf.json .cursor .env*.local .docs/ +.claude/ diff --git a/apps/desktop/src/routes/(window-chrome)/settings/feedback.tsx b/apps/desktop/src/routes/(window-chrome)/settings/feedback.tsx index ae984036cf..806b7ad120 100644 --- a/apps/desktop/src/routes/(window-chrome)/settings/feedback.tsx +++ b/apps/desktop/src/routes/(window-chrome)/settings/feedback.tsx @@ -132,7 +132,7 @@ export default function FeedbackTab() { > {(diag) => (
- + {(ver) => (

Operating System

@@ -143,45 +143,21 @@ export default function FeedbackTab() { )} - - {(gpu) => ( -
-

Graphics

-

- {gpu().description} ({gpu().vendor},{" "} - {gpu().dedicatedVideoMemoryMb} MB VRAM) -

-
- )} -
-

Capture Support

- Graphics Capture:{" "} - {diag().graphicsCaptureSupported + Screen Capture:{" "} + {diag().screenCaptureSupported ? "Supported" : "Not Supported"} - - D3D11 Video:{" "} - {diag().d3D11VideoProcessorAvailable - ? "Available" - : "Unavailable"} -
diff --git a/apps/desktop/src/utils/tauri.ts b/apps/desktop/src/utils/tauri.ts index f78bc7845a..77c8a8a94e 100644 --- a/apps/desktop/src/utils/tauri.ts +++ b/apps/desktop/src/utils/tauri.ts @@ -358,7 +358,6 @@ uploadProgressEvent: "upload-progress-event" /** user-defined types **/ -export type AllGpusInfo = { gpus: GpuInfoDiag[]; primaryGpuIndex: number | null; isMultiGpuSystem: boolean; hasDiscreteGpu: boolean } export type Annotation = { id: string; type: AnnotationType; x: number; y: number; width: number; height: number; strokeColor: string; strokeWidth: number; fillColor: string; opacity: number; rotation: number; text: string | null; maskType?: MaskType | null; maskLevel?: number | null } export type AnnotationType = "arrow" | "circle" | "rectangle" | "text" | "mask" export type AppTheme = "system" | "light" | "dark" @@ -424,7 +423,6 @@ quality: number | null; * Whether to prioritize speed over quality (default: false) */ fast: boolean | null } -export type GpuInfoDiag = { vendor: string; description: string; dedicatedVideoMemoryMb: number; adapterIndex: number; isSoftwareAdapter: boolean; isBasicRenderDriver: boolean; supportsHardwareEncoding: boolean } export type HapticPattern = "alignment" | "levelChange" | "generic" export type HapticPerformanceTime = "default" | "now" | "drawCompleted" export type Hotkey = { code: string; meta: boolean; ctrl: boolean; alt: boolean; shift: boolean } @@ -437,6 +435,7 @@ export type JsonValue = [T] export type LogicalBounds = { position: LogicalPosition; size: LogicalSize } export type LogicalPosition = { x: number; y: number } export type LogicalSize = { width: number; height: number } +export type MacOSVersionInfo = { displayName: string } export type MainWindowRecordingStartBehaviour = "close" | "minimise" export type MaskKeyframes = { position?: MaskVectorKeyframe[]; size?: MaskVectorKeyframe[]; intensity?: MaskScalarKeyframe[] } export type MaskKind = "sensitive" | "highlight" @@ -479,7 +478,6 @@ export type RecordingStatus = "pending" | "recording" export type RecordingStopped = null export type RecordingTargetMode = "display" | "window" | "area" export type RenderFrameEvent = { frame_number: number; fps: number; resolution_base: XY } -export type RenderingStatus = { isUsingSoftwareRendering: boolean; isUsingBasicRenderDriver: boolean; hardwareEncodingAvailable: boolean; warningMessage: string | null } export type RequestOpenRecordingPicker = { target_mode: RecordingTargetMode | null } export type RequestOpenSettings = { page: string } export type RequestScreenCapturePrewarm = { force?: boolean } @@ -500,7 +498,7 @@ export type StartRecordingInputs = { capture_target: ScreenCaptureTarget; captur export type StereoMode = "stereo" | "monoL" | "monoR" export type StudioRecordingMeta = { segment: SingleSegment } | { inner: MultipleSegments } export type StudioRecordingStatus = { status: "InProgress" } | { status: "NeedsRemux" } | { status: "Failed"; error: string } | { status: "Complete" } -export type SystemDiagnostics = { windowsVersion: WindowsVersionInfo | null; gpuInfo: GpuInfoDiag | null; allGpus: AllGpusInfo | null; renderingStatus: RenderingStatus; availableEncoders: string[]; graphicsCaptureSupported: boolean; d3D11VideoProcessorAvailable: boolean } +export type SystemDiagnostics = { macosVersion: MacOSVersionInfo | null; availableEncoders: string[]; screenCaptureSupported: boolean } export type TargetUnderCursor = { display_id: DisplayId | null; window: WindowUnderCursor | null } export type TextSegment = { start: number; end: number; enabled?: boolean; content?: string; center?: XY; size?: XY; fontFamily?: string; fontSize?: number; fontWeight?: number; italic?: boolean; color?: string; fadeDuration?: number } export type TimelineConfiguration = { segments: TimelineSegment[]; zoomSegments: ZoomSegment[]; sceneSegments?: SceneSegment[]; maskSegments?: MaskSegment[]; textSegments?: TextSegment[] } @@ -517,7 +515,6 @@ export type VideoUploadInfo = { id: string; link: string; config: S3UploadMeta } export type WindowExclusion = { bundleIdentifier?: string | null; ownerName?: string | null; windowTitle?: string | null } export type WindowId = string export type WindowUnderCursor = { id: WindowId; app_name: string; bounds: LogicalBounds } -export type WindowsVersionInfo = { major: number; minor: number; build: number; displayName: string; meetsRequirements: boolean; isWindows11: boolean } export type XY = { x: T; y: T } export type ZoomMode = "auto" | { manual: { x: number; y: number } } export type ZoomSegment = { start: number; end: number; amount: number; mode: ZoomMode } diff --git a/crates/camera-ffmpeg/src/macos.rs b/crates/camera-ffmpeg/src/macos.rs index b3649f67d9..eb8ee25cc3 100644 --- a/crates/camera-ffmpeg/src/macos.rs +++ b/crates/camera-ffmpeg/src/macos.rs @@ -20,6 +20,8 @@ pub enum AsFFmpegError { SwscaleFallbackFailed { format: String, reason: String }, #[error("{0}")] Native(#[from] cidre::os::Error), + #[error("No image buffer available")] + NoImageBuffer, } struct FourccInfo { @@ -53,17 +55,39 @@ static FALLBACK_WARNING_LOGGED: AtomicBool = AtomicBool::new(false); impl CapturedFrameExt for CapturedFrame { fn as_ffmpeg(&self) -> Result { - let native = self.native().clone(); - - let width = native.image_buf().width(); - let height = native.image_buf().height(); + let native = self.native(); + + let mut image_buf = native.image_buf().ok_or(AsFFmpegError::NoImageBuffer)?; + + let width = image_buf.width(); + let height = image_buf.height(); + let plane0_stride = image_buf.plane_bytes_per_row(0); + let plane1_stride = image_buf.plane_bytes_per_row(1); + let plane_count = image_buf.plane_count(); + let plane_info: [(usize, usize, usize); 3] = [ + ( + image_buf.plane_bytes_per_row(0), + image_buf.plane_height(0), + image_buf.plane_width(0), + ), + ( + image_buf.plane_bytes_per_row(1), + image_buf.plane_height(1), + image_buf.plane_width(1), + ), + ( + image_buf.plane_bytes_per_row(2), + image_buf.plane_height(2), + image_buf.plane_width(2), + ), + ]; let format_desc = native.sample_buf().format_desc().unwrap(); - let mut this = native.image_buf().clone(); - - let bytes_lock = - ImageBufExt::base_addr_lock(this.as_mut(), cv::pixel_buffer::LockFlags::READ_ONLY)?; + let bytes_lock = ImageBufExt::base_addr_lock( + image_buf.as_mut(), + cv::pixel_buffer::LockFlags::READ_ONLY, + )?; let res = match cidre::four_cc_to_str(&mut format_desc.media_sub_type().to_be_bytes()) { "2vuy" => { @@ -73,7 +97,7 @@ impl CapturedFrameExt for CapturedFrame { height as u32, ); - let src_stride = native.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = ff_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); @@ -96,7 +120,7 @@ impl CapturedFrameExt for CapturedFrame { height as u32, ); - let src_stride = native.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = ff_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); @@ -110,7 +134,7 @@ impl CapturedFrameExt for CapturedFrame { dest_row.copy_from_slice(src_row); } - let src_stride = native.image_buf().plane_bytes_per_row(1); + let src_stride = plane1_stride; let dest_stride = ff_frame.stride(1); let src_bytes = bytes_lock.plane_data(1); @@ -133,7 +157,7 @@ impl CapturedFrameExt for CapturedFrame { height as u32, ); - let src_stride = native.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = ff_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); @@ -156,7 +180,7 @@ impl CapturedFrameExt for CapturedFrame { height as u32, ); - let src_stride = native.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = ff_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); @@ -179,7 +203,7 @@ impl CapturedFrameExt for CapturedFrame { height as u32, ); - let src_stride = native.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = ff_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); @@ -202,7 +226,7 @@ impl CapturedFrameExt for CapturedFrame { height as u32, ); - let src_stride = native.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = ff_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); @@ -225,7 +249,7 @@ impl CapturedFrameExt for CapturedFrame { height as u32, ); - let src_stride = native.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = ff_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); @@ -248,7 +272,7 @@ impl CapturedFrameExt for CapturedFrame { height as u32, ); - let src_stride = native.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = ff_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); @@ -265,7 +289,6 @@ impl CapturedFrameExt for CapturedFrame { ff_frame } "y420" => { - let plane_count = native.image_buf().plane_count(); if plane_count < 3 { return Err(AsFFmpegError::InsufficientPlaneCount { format: "y420".to_string(), @@ -280,15 +303,13 @@ impl CapturedFrameExt for CapturedFrame { height as u32, ); - for plane in 0..3 { - let src_stride = native.image_buf().plane_bytes_per_row(plane); + for (plane, &(src_stride, plane_height, row_width)) in plane_info.iter().enumerate() + { let dest_stride = ff_frame.stride(plane); - let plane_height = native.image_buf().plane_height(plane); let src_bytes = bytes_lock.plane_data(plane); let dest_bytes = &mut ff_frame.data_mut(plane); - let row_width = native.image_buf().plane_width(plane); for y in 0..plane_height { let src_row = &src_bytes[y * src_stride..y * src_stride + row_width]; let dest_row = @@ -306,7 +327,7 @@ impl CapturedFrameExt for CapturedFrame { height as u32, ); - let src_stride = native.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = ff_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); @@ -334,7 +355,7 @@ impl CapturedFrameExt for CapturedFrame { let mut src_frame = ffmpeg::frame::Video::new(info.pixel, width as u32, height as u32); - let src_stride = native.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = src_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); let dest_bytes = &mut src_frame.data_mut(0); diff --git a/crates/camera/src/macos.rs b/crates/camera/src/macos.rs index cb2035030c..6a8902445d 100644 --- a/crates/camera/src/macos.rs +++ b/crates/camera/src/macos.rs @@ -99,15 +99,13 @@ pub(super) fn start_capturing_impl( let queue = dispatch::Queue::new(); let delegate = CallbackOutputDelegate::with(CallbackOutputDelegateInner::new(Box::new(move |data| { - let Some(image_buf) = data.sample_buf.image_buf() else { + if data.sample_buf.image_buf().is_none() { return; }; callback(CapturedFrame { - native: NativeCapturedFrame(image_buf.retained(), data.sample_buf.retained()), - // reference_time: Instant::now(), + native: NativeCapturedFrame(data.sample_buf.retained()), timestamp: data.timestamp, - // capture_begin_time: Some(data.capture_begin_time), }); }))); @@ -202,14 +200,14 @@ impl Debug for AVFoundationError { } #[derive(Debug, Clone)] -pub struct NativeCapturedFrame(arc::R, arc::R); +pub struct NativeCapturedFrame(arc::R); impl NativeCapturedFrame { - pub fn image_buf(&self) -> &arc::R { - &self.0 + pub fn image_buf(&self) -> Option> { + self.0.image_buf().map(|b| b.retained()) } pub fn sample_buf(&self) -> &arc::R { - &self.1 + &self.0 } } diff --git a/crates/enc-avfoundation/src/mp4.rs b/crates/enc-avfoundation/src/mp4.rs index 2a7510b0a7..e0be1560cc 100644 --- a/crates/enc-avfoundation/src/mp4.rs +++ b/crates/enc-avfoundation/src/mp4.rs @@ -1,7 +1,7 @@ use cap_media_info::{AudioInfo, VideoInfo}; use cidre::{cm::SampleTimingInfo, objc::Obj, *}; use ffmpeg::frame; -use std::{ops::Sub, path::PathBuf, time::Duration}; +use std::{path::PathBuf, time::Duration}; use tracing::*; // before pausing at all, subtract 0. @@ -16,12 +16,11 @@ pub struct MP4Encoder { asset_writer: arc::R, video_input: arc::R, audio_input: Option>, - most_recent_frame: Option<(arc::R, Duration)>, + last_frame_timestamp: Option, pause_timestamp: Option, timestamp_offset: Duration, is_writing: bool, is_paused: bool, - // elapsed_duration: cm::Time, video_frames_appended: usize, audio_frames_appended: usize, last_timestamp: Option, @@ -53,10 +52,14 @@ pub enum QueueFrameError { AppendError(arc::R), #[error("Failed")] Failed, + #[error("WriterFailed/{0}")] + WriterFailed(arc::R), #[error("Construct/{0}")] Construct(cidre::os::Error), #[error("NotReadyForMore")] NotReadyForMore, + #[error("NoEncoder")] + NoEncoder, } #[derive(thiserror::Error, Debug)] @@ -200,7 +203,7 @@ impl MP4Encoder { audio_input, asset_writer, video_input, - most_recent_frame: None, + last_frame_timestamp: None, pause_timestamp: None, timestamp_offset: Duration::ZERO, is_writing: false, @@ -234,7 +237,7 @@ impl MP4Encoder { .start_session_at_src_time(cm::Time::new(timestamp.as_millis() as i64, 1_000)); } - self.most_recent_frame = Some((frame.clone(), timestamp)); + self.last_frame_timestamp = Some(timestamp); if let Some(pause_timestamp) = self.pause_timestamp && let Some(gap) = timestamp.checked_sub(pause_timestamp) @@ -272,9 +275,10 @@ impl MP4Encoder { let mut timing = frame.timing_info(0).unwrap(); timing.pts = cm::Time::new(pts_duration.as_millis() as i64, 1_000); - let frame = frame.copy_with_new_timing(&[timing]).unwrap(); + let new_frame = frame.copy_with_new_timing(&[timing]).unwrap(); + drop(frame); - append_sample_buf(&mut self.video_input, &self.asset_writer, &frame)?; + append_sample_buf(&mut self.video_input, &self.asset_writer, &new_frame)?; self.video_frames_appended += 1; self.last_timestamp = Some(timestamp); @@ -294,7 +298,7 @@ impl MP4Encoder { } let Some(audio_input) = &mut self.audio_input else { - return Err(QueueFrameError::Failed); + return Err(QueueFrameError::NoEncoder); }; if let Some(pause_timestamp) = self.pause_timestamp @@ -449,6 +453,23 @@ impl MP4Encoder { } pub fn finish(&mut self, timestamp: Option) -> Result<(), FinishError> { + let writer = self.finish_start(timestamp)?; + wait_for_writer_finished(&writer)?; + info!("Finished writing"); + Ok(()) + } + + pub fn finish_nowait( + &mut self, + timestamp: Option, + ) -> Result, FinishError> { + self.finish_start(timestamp) + } + + fn finish_start( + &mut self, + timestamp: Option, + ) -> Result, FinishError> { if !self.is_writing { return Err(FinishError::NotWriting); } @@ -462,7 +483,7 @@ impl MP4Encoder { }); } - let Some(mut most_recent_frame) = self.most_recent_frame.take() else { + let Some(last_frame_ts) = self.last_frame_timestamp.take() else { warn!("Encoder attempted to finish with no frame"); return Err(FinishError::NoFrames); }; @@ -470,25 +491,14 @@ impl MP4Encoder { self.is_paused = false; self.pause_timestamp = None; - // We extend the video to the provided timestamp if possible - if let Some(timestamp) = finish_timestamp - && let Some(diff) = timestamp.checked_sub(most_recent_frame.1) - && diff > Duration::from_millis(500) - { - match self.queue_video_frame(most_recent_frame.0.clone(), timestamp) { - Ok(()) => { - most_recent_frame = (most_recent_frame.0, timestamp); - } - Err(e) => { - error!("Failed to queue final video frame: {e}"); - } - } - } + let end_timestamp = finish_timestamp.unwrap_or(last_frame_ts); self.is_writing = false; self.asset_writer.end_session_at_src_time(cm::Time::new( - most_recent_frame.1.sub(self.timestamp_offset).as_millis() as i64, + end_timestamp + .saturating_sub(self.timestamp_offset) + .as_millis() as i64, 1000, )); self.video_input.mark_as_finished(); @@ -501,11 +511,7 @@ impl MP4Encoder { debug!("Appended {} video frames", self.video_frames_appended); debug!("Appended {} audio frames", self.audio_frames_appended); - wait_for_writer_finished(&self.asset_writer).map_err(|_| FinishError::Failed)?; - - info!("Finished writing"); - - Ok(()) + Ok(self.asset_writer.clone()) } } @@ -644,7 +650,10 @@ fn append_sample_buf( Ok(true) => {} Ok(false) => { if writer.status() == av::asset::writer::Status::Failed { - return Err(QueueFrameError::Failed); + return Err(match writer.error() { + Some(err) => QueueFrameError::WriterFailed(err), + None => QueueFrameError::Failed, + }); } if writer.status() == av::asset::writer::Status::Writing { return Err(QueueFrameError::NotReadyForMore); @@ -656,13 +665,50 @@ fn append_sample_buf( Ok(()) } -fn wait_for_writer_finished(writer: &av::AssetWriter) -> Result<(), ()> { +const WRITER_FINISH_TIMEOUT: Duration = Duration::from_secs(10); +const WRITER_POLL_INTERVAL: Duration = Duration::from_millis(10); +const WRITER_LOG_INTERVAL: Duration = Duration::from_secs(2); + +pub fn wait_for_writer_finished(writer: &av::AssetWriter) -> Result<(), FinishError> { use av::asset::writer::Status; + use std::time::Instant; + + let start = Instant::now(); + let mut last_log = start; + loop { - match writer.status() { - Status::Completed | Status::Cancelled => return Ok(()), - Status::Failed | Status::Unknown => return Err(()), - Status::Writing => std::thread::sleep(Duration::from_millis(2)), + let status = writer.status(); + let elapsed = start.elapsed(); + + match status { + Status::Completed | Status::Cancelled => { + if elapsed > Duration::from_millis(100) { + info!("Writer finished after {:?}", elapsed); + } + return Ok(()); + } + Status::Failed | Status::Unknown => { + if let Some(err) = writer.error() { + error!("Writer failed with error: {:?}", err); + } + return Err(FinishError::Failed); + } + Status::Writing => { + if elapsed >= WRITER_FINISH_TIMEOUT { + error!( + "Writer timeout after {:?} - still in Writing state", + elapsed + ); + return Err(FinishError::Failed); + } + + if last_log.elapsed() >= WRITER_LOG_INTERVAL { + warn!("Writer still finalizing after {:?}...", elapsed); + last_log = Instant::now(); + } + + std::thread::sleep(WRITER_POLL_INTERVAL); + } } } } diff --git a/crates/enc-avfoundation/src/segmented.rs b/crates/enc-avfoundation/src/segmented.rs index c1053a960b..1c0cff38af 100644 --- a/crates/enc-avfoundation/src/segmented.rs +++ b/crates/enc-avfoundation/src/segmented.rs @@ -1,4 +1,4 @@ -use crate::{FinishError, InitError, MP4Encoder, QueueFrameError}; +use crate::{FinishError, InitError, MP4Encoder, QueueFrameError, wait_for_writer_finished}; use cap_media_info::{AudioInfo, VideoInfo}; use cidre::arc; use ffmpeg::frame; @@ -8,6 +8,7 @@ use std::{ path::{Path, PathBuf}, time::Duration, }; +use tracing::warn; fn atomic_write_json(path: &Path, data: &T) -> std::io::Result<()> { let temp_path = path.with_extension("json.tmp"); @@ -55,6 +56,7 @@ pub struct SegmentInfo { pub index: u32, pub duration: Duration, pub file_size: Option, + pub is_failed: bool, } #[derive(Serialize)] @@ -65,6 +67,8 @@ struct FragmentEntry { is_complete: bool, #[serde(skip_serializing_if = "Option::is_none")] file_size: Option, + #[serde(skip_serializing_if = "std::ops::Not::not")] + is_failed: bool, } const MANIFEST_VERSION: u32 = 2; @@ -127,7 +131,7 @@ impl SegmentedMP4Encoder { if let Some(encoder) = &mut self.current_encoder { encoder.queue_video_frame(frame, timestamp) } else { - Err(QueueFrameError::Failed) + Err(QueueFrameError::NoEncoder) } } @@ -139,7 +143,7 @@ impl SegmentedMP4Encoder { if let Some(encoder) = &mut self.current_encoder { encoder.queue_audio_frame(frame, timestamp) } else { - Err(QueueFrameError::Failed) + Err(QueueFrameError::NoEncoder) } } @@ -147,13 +151,30 @@ impl SegmentedMP4Encoder { let segment_start = self.segment_start_time.unwrap_or(Duration::ZERO); let segment_duration = timestamp.saturating_sub(segment_start); let completed_segment_path = self.current_segment_path(); + let current_index = self.current_index; if let Some(mut encoder) = self.current_encoder.take() { - if let Err(e) = encoder.finish(Some(timestamp)) { - tracing::warn!("Failed to finish encoder during rotation: {e}"); - } - - sync_file(&completed_segment_path); + let finish_failed = match encoder.finish_nowait(Some(timestamp)) { + Ok(writer) => { + let path_for_sync = completed_segment_path.clone(); + std::thread::spawn(move || { + if let Err(e) = wait_for_writer_finished(&writer) { + warn!( + "Background writer finalization failed for segment {current_index}: {e}" + ); + } + sync_file(&path_for_sync); + }); + false + } + Err(e) => { + tracing::error!( + "Failed to finish encoder during rotation for segment {}: {e}", + current_index + ); + true + } + }; let file_size = std::fs::metadata(&completed_segment_path) .ok() @@ -161,12 +182,20 @@ impl SegmentedMP4Encoder { self.completed_segments.push(SegmentInfo { path: completed_segment_path, - index: self.current_index, + index: current_index, duration: segment_duration, file_size, + is_failed: finish_failed, }); self.write_manifest(); + + if finish_failed { + tracing::warn!( + "Segment {} marked as failed in manifest, continuing with new segment", + current_index + ); + } } self.current_index += 1; @@ -180,7 +209,13 @@ impl SegmentedMP4Encoder { self.audio_config, self.output_height, ) - .map_err(|_| QueueFrameError::Failed)?, + .map_err(|e| { + tracing::error!( + "Failed to create new encoder for segment {}: {e}", + self.current_index + ); + QueueFrameError::Failed + })?, ); self.write_in_progress_manifest(); @@ -208,8 +243,9 @@ impl SegmentedMP4Encoder { .into_owned(), index: s.index, duration: s.duration.as_secs_f64(), - is_complete: true, + is_complete: !s.is_failed, file_size: s.file_size, + is_failed: s.is_failed, }) .collect(), total_duration: None, @@ -238,8 +274,9 @@ impl SegmentedMP4Encoder { .into_owned(), index: s.index, duration: s.duration.as_secs_f64(), - is_complete: true, + is_complete: !s.is_failed, file_size: s.file_size, + is_failed: s.is_failed, }) .collect(); @@ -254,6 +291,7 @@ impl SegmentedMP4Encoder { duration: 0.0, is_complete: false, file_size: None, + is_failed: false, }); let manifest = Manifest { @@ -287,22 +325,50 @@ impl SegmentedMP4Encoder { pub fn finish(&mut self, timestamp: Option) -> Result<(), FinishError> { let segment_path = self.current_segment_path(); let segment_start = self.segment_start_time; + let current_index = self.current_index; if let Some(mut encoder) = self.current_encoder.take() { - encoder.finish(timestamp)?; - - sync_file(&segment_path); - - if let Some(start) = segment_start { - let final_duration = timestamp.unwrap_or(start).saturating_sub(start); - let file_size = std::fs::metadata(&segment_path).ok().map(|m| m.len()); - - self.completed_segments.push(SegmentInfo { - path: segment_path, - index: self.current_index, - duration: final_duration, - file_size, - }); + match encoder.finish_nowait(timestamp) { + Ok(writer) => { + let path_for_sync = segment_path.clone(); + std::thread::spawn(move || { + if let Err(e) = wait_for_writer_finished(&writer) { + warn!( + "Background writer finalization failed for segment {current_index}: {e}" + ); + } + sync_file(&path_for_sync); + }); + + if let Some(start) = segment_start { + let final_duration = timestamp.unwrap_or(start).saturating_sub(start); + let file_size = std::fs::metadata(&segment_path).ok().map(|m| m.len()); + + self.completed_segments.push(SegmentInfo { + path: segment_path, + index: current_index, + duration: final_duration, + file_size, + is_failed: false, + }); + } + } + Err(e) => { + tracing::error!("Failed to finish final segment {current_index}: {e}"); + + if let Some(start) = segment_start { + let final_duration = timestamp.unwrap_or(start).saturating_sub(start); + let file_size = std::fs::metadata(&segment_path).ok().map(|m| m.len()); + + self.completed_segments.push(SegmentInfo { + path: segment_path, + index: current_index, + duration: final_duration, + file_size, + is_failed: true, + }); + } + } } } @@ -313,6 +379,17 @@ impl SegmentedMP4Encoder { fn finalize_manifest(&self) { let total_duration: Duration = self.completed_segments.iter().map(|s| s.duration).sum(); + let has_failed_segments = self.completed_segments.iter().any(|s| s.is_failed); + + if has_failed_segments { + tracing::warn!( + "Recording completed with {} failed segment(s)", + self.completed_segments + .iter() + .filter(|s| s.is_failed) + .count() + ); + } let manifest = Manifest { version: MANIFEST_VERSION, @@ -328,8 +405,9 @@ impl SegmentedMP4Encoder { .into_owned(), index: s.index, duration: s.duration.as_secs_f64(), - is_complete: true, + is_complete: !s.is_failed, file_size: s.file_size, + is_failed: s.is_failed, }) .collect(), total_duration: Some(total_duration.as_secs_f64()), diff --git a/crates/enc-ffmpeg/src/lib.rs b/crates/enc-ffmpeg/src/lib.rs index 2dd64f560d..d57097346c 100644 --- a/crates/enc-ffmpeg/src/lib.rs +++ b/crates/enc-ffmpeg/src/lib.rs @@ -13,3 +13,6 @@ pub mod remux; pub mod segmented_audio { pub use crate::mux::segmented_audio::*; } +pub mod fragmented_mp4 { + pub use crate::mux::fragmented_mp4::*; +} diff --git a/crates/enc-ffmpeg/src/mux/fragmented_mp4.rs b/crates/enc-ffmpeg/src/mux/fragmented_mp4.rs new file mode 100644 index 0000000000..20f1b034ef --- /dev/null +++ b/crates/enc-ffmpeg/src/mux/fragmented_mp4.rs @@ -0,0 +1,170 @@ +use cap_media_info::RawVideoFormat; +use ffmpeg::{format, frame}; +use std::{path::PathBuf, time::Duration}; +use tracing::*; + +use crate::{ + audio::AudioEncoder, + h264, + video::h264::{H264Encoder, H264EncoderError}, +}; + +pub struct FragmentedMP4File { + output: format::context::Output, + video: H264Encoder, + audio: Option>, + is_finished: bool, + has_frames: bool, +} + +#[derive(thiserror::Error, Debug)] +pub enum InitError { + #[error("{0:?}")] + Ffmpeg(ffmpeg::Error), + #[error("Video/{0}")] + VideoInit(H264EncoderError), + #[error("Audio/{0}")] + AudioInit(Box), + #[error("Failed to create output directory: {0}")] + CreateDirectory(std::io::Error), +} + +#[derive(thiserror::Error, Debug)] +pub enum FinishError { + #[error("Already finished")] + AlreadyFinished, + #[error("{0}")] + WriteTrailerFailed(ffmpeg::Error), +} + +pub struct FinishResult { + pub video_finish: Result<(), ffmpeg::Error>, + pub audio_finish: Result<(), ffmpeg::Error>, +} + +impl FragmentedMP4File { + pub fn init( + mut output_path: PathBuf, + video: impl FnOnce(&mut format::context::Output) -> Result, + audio: impl FnOnce( + &mut format::context::Output, + ) + -> Option, Box>>, + ) -> Result { + output_path.set_extension("mp4"); + + if let Some(parent) = output_path.parent() { + std::fs::create_dir_all(parent).map_err(InitError::CreateDirectory)?; + } + + let mut output = format::output_as(&output_path, "mp4").map_err(InitError::Ffmpeg)?; + + unsafe { + let opts = output.as_mut_ptr(); + let key = std::ffi::CString::new("movflags").unwrap(); + let value = + std::ffi::CString::new("frag_keyframe+empty_moov+default_base_moof").unwrap(); + ffmpeg::ffi::av_opt_set((*opts).priv_data, key.as_ptr(), value.as_ptr(), 0); + } + + trace!("Preparing encoders for fragmented mp4 file"); + + let video = video(&mut output).map_err(InitError::VideoInit)?; + let audio = audio(&mut output) + .transpose() + .map_err(InitError::AudioInit)?; + + info!("Prepared encoders for fragmented mp4 file"); + + output.write_header().map_err(InitError::Ffmpeg)?; + + Ok(Self { + output, + video, + audio, + is_finished: false, + has_frames: false, + }) + } + + pub fn video_format() -> RawVideoFormat { + RawVideoFormat::Yuv420p + } + + pub fn queue_video_frame( + &mut self, + frame: frame::Video, + timestamp: Duration, + ) -> Result<(), h264::QueueFrameError> { + if self.is_finished { + return Ok(()); + } + + self.has_frames = true; + self.video.queue_frame(frame, timestamp, &mut self.output) + } + + pub fn queue_audio_frame(&mut self, frame: frame::Audio) { + if self.is_finished { + return; + } + + let Some(audio) = &mut self.audio else { + return; + }; + + self.has_frames = true; + audio.send_frame(frame, &mut self.output); + } + + pub fn finish(&mut self) -> Result { + if self.is_finished { + return Err(FinishError::AlreadyFinished); + } + + self.is_finished = true; + + tracing::info!("FragmentedMP4File: Finishing encoding"); + + let video_finish = self.video.flush(&mut self.output).inspect_err(|e| { + error!("Failed to finish video encoder: {e:#}"); + }); + + let audio_finish = self + .audio + .as_mut() + .map(|enc| { + tracing::info!("FragmentedMP4File: Flushing audio encoder"); + enc.flush(&mut self.output).inspect_err(|e| { + error!("Failed to finish audio encoder: {e:#}"); + }) + }) + .unwrap_or(Ok(())); + + tracing::info!("FragmentedMP4File: Writing trailer"); + self.output + .write_trailer() + .map_err(FinishError::WriteTrailerFailed)?; + + Ok(FinishResult { + video_finish, + audio_finish, + }) + } + + pub fn video(&self) -> &H264Encoder { + &self.video + } + + pub fn video_mut(&mut self) -> &mut H264Encoder { + &mut self.video + } +} + +impl Drop for FragmentedMP4File { + fn drop(&mut self) { + if let Err(e) = self.finish() { + error!("Failed to finish FragmentedMP4File in Drop: {e}"); + } + } +} diff --git a/crates/enc-ffmpeg/src/mux/mod.rs b/crates/enc-ffmpeg/src/mux/mod.rs index bb986be77c..8ed8362d1b 100644 --- a/crates/enc-ffmpeg/src/mux/mod.rs +++ b/crates/enc-ffmpeg/src/mux/mod.rs @@ -1,4 +1,5 @@ pub mod fragmented_audio; +pub mod fragmented_mp4; pub mod mp4; pub mod ogg; pub mod segmented_audio; diff --git a/crates/recording/examples/memory-leak-detector.rs b/crates/recording/examples/memory-leak-detector.rs new file mode 100644 index 0000000000..0e77765732 --- /dev/null +++ b/crates/recording/examples/memory-leak-detector.rs @@ -0,0 +1,528 @@ +use cap_recording::{ + CameraFeed, MicrophoneFeed, + feeds::{ + camera::{self, DeviceOrModelID}, + microphone, + }, + screen_capture::ScreenCaptureTarget, +}; +use kameo::Actor; +use scap_targets::Display; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; +use tracing::{info, warn}; + +const DEFAULT_DURATION_SECS: u64 = 120; + +#[cfg(target_os = "macos")] +fn get_memory_usage() -> Option { + use std::process::Command; + + let pid = std::process::id(); + + let ps_output = Command::new("ps") + .args(["-o", "rss=,vsz=", "-p", &pid.to_string()]) + .output() + .ok()?; + + let stdout = String::from_utf8_lossy(&ps_output.stdout); + let parts: Vec<&str> = stdout.split_whitespace().collect(); + + let (rss_mb, vsz_mb) = if parts.len() >= 2 { + let rss_kb: u64 = parts[0].parse().ok()?; + let vsz_kb: u64 = parts[1].parse().ok()?; + (rss_kb as f64 / 1024.0, vsz_kb as f64 / 1024.0) + } else { + return None; + }; + + let (footprint_mb, dirty_mb) = Command::new("footprint") + .arg(pid.to_string()) + .output() + .ok() + .filter(|o| o.status.success()) + .and_then(|output| { + let stdout = String::from_utf8_lossy(&output.stdout); + parse_footprint_values(&stdout) + }) + .unwrap_or((None, None)); + + Some(MemoryStats { + resident_mb: rss_mb, + virtual_mb: vsz_mb, + footprint_mb, + dirty_mb, + compressed_mb: None, + }) +} + +#[cfg(target_os = "macos")] +fn parse_footprint_values(output: &str) -> Option<(Option, Option)> { + let mut footprint_kb: Option = None; + let mut dirty_kb: Option = None; + + for line in output.lines() { + if line.contains("phys_footprint:") { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 2 { + footprint_kb = parse_size_kb(parts[1]); + } + } else if line.contains("TOTAL") && dirty_kb.is_none() { + let parts: Vec<&str> = line.split_whitespace().collect(); + if !parts.is_empty() { + dirty_kb = parse_size_kb(parts[0]); + } + } + } + + Some(( + footprint_kb.map(|v| v / 1024.0), + dirty_kb.map(|v| v / 1024.0), + )) +} + +#[cfg(target_os = "macos")] +fn parse_size_kb(s: &str) -> Option { + let s = s.trim(); + if s.ends_with("KB") || s.ends_with("kb") { + s.trim_end_matches("KB") + .trim_end_matches("kb") + .trim() + .parse() + .ok() + } else if s.ends_with("MB") || s.ends_with("mb") { + s.trim_end_matches("MB") + .trim_end_matches("mb") + .trim() + .parse::() + .ok() + .map(|v| v * 1024.0) + } else if s.ends_with("GB") || s.ends_with("gb") { + s.trim_end_matches("GB") + .trim_end_matches("gb") + .trim() + .parse::() + .ok() + .map(|v| v * 1024.0 * 1024.0) + } else if s.ends_with('B') || s.ends_with('b') { + s.trim_end_matches('B') + .trim_end_matches('b') + .trim() + .parse::() + .ok() + .map(|v| v / 1024.0) + } else { + s.parse().ok() + } +} + +#[cfg(not(target_os = "macos"))] +fn get_memory_usage() -> Option { + None +} + +#[derive(Debug, Clone, Copy)] +struct MemoryStats { + resident_mb: f64, + virtual_mb: f64, + footprint_mb: Option, + #[allow(dead_code)] + dirty_mb: Option, + #[allow(dead_code)] + compressed_mb: Option, +} + +impl MemoryStats { + fn primary_metric(&self) -> f64 { + self.resident_mb + } + + fn metric_name() -> &'static str { + "RSS" + } +} + +struct MemoryTracker { + samples: Vec<(Duration, MemoryStats)>, + start: Instant, + baseline: Option, +} + +impl MemoryTracker { + fn new() -> Self { + Self { + samples: Vec::new(), + start: Instant::now(), + baseline: get_memory_usage(), + } + } + + fn sample(&mut self) { + if let Some(stats) = get_memory_usage() { + self.samples.push((self.start.elapsed(), stats)); + } + } + + fn print_report(&self) { + println!("\n=== Memory Usage Report ===\n"); + + if let Some(baseline) = self.baseline { + println!( + "Baseline: {:.1} MB {} (Footprint: {:.1} MB)", + baseline.primary_metric(), + MemoryStats::metric_name(), + baseline.footprint_mb.unwrap_or(0.0) + ); + } + + if self.samples.len() < 2 { + println!("Not enough samples to analyze"); + return; + } + + let first = &self.samples[0]; + let last = &self.samples[self.samples.len() - 1]; + + let duration_secs = last.0.as_secs_f64() - first.0.as_secs_f64(); + let memory_growth = last.1.primary_metric() - first.1.primary_metric(); + let growth_rate = if duration_secs > 0.0 { + memory_growth / duration_secs + } else { + 0.0 + }; + + println!("\nMemory Timeline:"); + println!( + "{:>8} {:>12} {:>12} {:>12} {:>12}", + "Time(s)", "RSS(MB)", "Delta", "Footprint", "VSZ(MB)" + ); + println!("{:-<70}", ""); + + let mut prev_memory = first.1.primary_metric(); + for (time, stats) in &self.samples { + let current = stats.primary_metric(); + let delta = current - prev_memory; + let delta_str = if delta.abs() > 0.5 { + format!("{delta:+.1}") + } else { + "~0".to_string() + }; + println!( + "{:>8.1} {:>12.1} {:>12} {:>12.1} {:>12.1}", + time.as_secs_f64(), + current, + delta_str, + stats.footprint_mb.unwrap_or(0.0), + stats.virtual_mb + ); + prev_memory = current; + } + + println!("\n=== Summary ==="); + println!("Duration: {duration_secs:.1}s"); + println!("Start RSS: {:.1} MB", first.1.primary_metric()); + println!("End RSS: {:.1} MB", last.1.primary_metric()); + println!("Total growth: {memory_growth:.1} MB"); + println!( + "Growth rate: {:.2} MB/s ({:.1} MB/10s)", + growth_rate, + growth_rate * 10.0 + ); + + if growth_rate > 20.0 { + println!( + "\n*** SEVERE MEMORY LEAK: Growth rate > 20 MB/s ({:.0} MB/10s) ***", + growth_rate * 10.0 + ); + } else if growth_rate > 5.0 { + println!( + "\n*** MEMORY LEAK DETECTED: Growth rate > 5 MB/s ({:.0} MB/10s) ***", + growth_rate * 10.0 + ); + } else if growth_rate > 1.0 { + println!( + "\n*** POTENTIAL LEAK: Growth rate > 1 MB/s ({:.1} MB/10s) ***", + growth_rate * 10.0 + ); + } else { + println!("\n[OK] Memory appears stable (< 1 MB/s growth)"); + } + + println!( + "\nPeak RSS: {:.1} MB", + self.samples + .iter() + .map(|(_, s)| s.primary_metric()) + .fold(0.0_f64, |a, b| a.max(b)) + ); + } +} + +async fn run_memory_test( + duration_secs: u64, + include_camera: bool, + include_mic: bool, + fragmented: bool, +) -> Result<(), Box> { + println!("=== Cap Memory Leak Detector ===\n"); + println!("Configuration:"); + println!(" Duration: {duration_secs}s"); + println!(" Camera: {include_camera}"); + println!(" Microphone: {include_mic}"); + println!(" Fragmented MP4: {fragmented}"); + println!(); + + let mut memory_tracker = MemoryTracker::new(); + memory_tracker.sample(); + + let dir = tempfile::tempdir()?; + info!("Recording to: {}", dir.path().display()); + + let mut builder = cap_recording::studio_recording::Actor::builder( + dir.path().into(), + ScreenCaptureTarget::Display { + id: Display::primary().id(), + }, + ) + .with_fragmented(fragmented) + .with_system_audio(true); + + if include_camera { + if let Some(camera_info) = cap_camera::list_cameras().next() { + println!("Using camera: {}", camera_info.display_name()); + + let feed = CameraFeed::spawn(CameraFeed::default()); + + feed.ask(camera::SetInput { + id: DeviceOrModelID::from_info(&camera_info), + }) + .await? + .await?; + + tokio::time::sleep(Duration::from_millis(500)).await; + + let lock = feed.ask(camera::Lock).await?; + builder = builder.with_camera_feed(Arc::new(lock)); + } else { + warn!("No camera found"); + } + } + + if include_mic { + if let Some((mic_name, _, _)) = MicrophoneFeed::default_device() { + println!("Using microphone: {mic_name}"); + + let error_sender = flume::unbounded().0; + let mic_feed = MicrophoneFeed::spawn(MicrophoneFeed::new(error_sender)); + + mic_feed + .ask(microphone::SetInput { + label: mic_name.clone(), + }) + .await? + .await?; + + tokio::time::sleep(Duration::from_millis(500)).await; + + let mic_lock = mic_feed.ask(microphone::Lock).await?; + builder = builder.with_mic_feed(Arc::new(mic_lock)); + } else { + warn!("No microphone found"); + } + } + + memory_tracker.sample(); + println!("\nStarting recording..."); + let start = Instant::now(); + + let handle = builder + .build( + #[cfg(target_os = "macos")] + cidre::sc::ShareableContent::current().await?, + ) + .await?; + + let sample_interval = Duration::from_secs(5); + let mut next_sample = start + sample_interval; + + while start.elapsed() < Duration::from_secs(duration_secs) { + tokio::time::sleep(Duration::from_millis(100)).await; + + if Instant::now() >= next_sample { + memory_tracker.sample(); + let current = get_memory_usage(); + if let Some(stats) = current { + println!( + "[{:>5.1}s] RSS: {:.1} MB, Footprint: {:.1} MB, VSZ: {:.1} MB", + start.elapsed().as_secs_f64(), + stats.resident_mb, + stats.footprint_mb.unwrap_or(0.0), + stats.virtual_mb + ); + } + next_sample = Instant::now() + sample_interval; + } + } + + println!("\nStopping recording..."); + memory_tracker.sample(); + + let stop_start = Instant::now(); + let result = handle.stop().await?; + let stop_duration = stop_start.elapsed(); + + memory_tracker.sample(); + + println!("Stop took: {stop_duration:?}"); + println!("Output path: {}", result.project_path.display()); + + memory_tracker.print_report(); + + std::mem::forget(dir); + + Ok(()) +} + +async fn run_camera_only_test(duration_secs: u64) -> Result<(), Box> { + println!("=== Camera Only Test (no encoding) ===\n"); + + let mut memory_tracker = MemoryTracker::new(); + memory_tracker.sample(); + + if let Some(camera_info) = cap_camera::list_cameras().next() { + println!("Testing camera: {}", camera_info.display_name()); + + let feed = CameraFeed::spawn(CameraFeed::default()); + + let (frame_tx, frame_rx) = flume::bounded::(128); + + feed.ask(camera::AddNativeSender(frame_tx)).await?; + + feed.ask(camera::SetInput { + id: DeviceOrModelID::from_info(&camera_info), + }) + .await? + .await?; + + let start = Instant::now(); + let sample_interval = Duration::from_secs(5); + let mut next_sample = start + sample_interval; + let mut frame_count = 0u64; + + while start.elapsed() < Duration::from_secs(duration_secs) { + match frame_rx.try_recv() { + Ok(_frame) => { + frame_count += 1; + } + Err(flume::TryRecvError::Empty) => { + tokio::time::sleep(Duration::from_millis(1)).await; + } + Err(flume::TryRecvError::Disconnected) => break, + } + + if Instant::now() >= next_sample { + memory_tracker.sample(); + let current = get_memory_usage(); + let queue_len = frame_rx.len(); + if let Some(stats) = current { + println!( + "[{:>5.1}s] RSS: {:.1} MB, Footprint: {:.1} MB, Frames: {}, Queue: {}", + start.elapsed().as_secs_f64(), + stats.resident_mb, + stats.footprint_mb.unwrap_or(0.0), + frame_count, + queue_len + ); + } + next_sample = Instant::now() + sample_interval; + } + } + + feed.ask(camera::RemoveInput).await?; + } else { + println!("No camera found"); + } + + memory_tracker.print_report(); + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + unsafe { std::env::set_var("RUST_LOG", "info,cap_recording=debug") }; + tracing_subscriber::fmt::init(); + + let args: Vec = std::env::args().collect(); + + let duration = args + .iter() + .position(|a| a == "--duration") + .and_then(|i| args.get(i + 1)) + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_DURATION_SECS); + + let mode = args + .iter() + .position(|a| a == "--mode") + .and_then(|i| args.get(i + 1).map(|s| s.as_str())) + .unwrap_or("full"); + + let include_camera = !args.contains(&"--no-camera".to_string()); + let include_mic = !args.contains(&"--no-mic".to_string()); + let fragmented = !args.contains(&"--no-fragmented".to_string()); + + match mode { + "full" => { + run_memory_test(duration, include_camera, include_mic, fragmented).await?; + } + "screen-only" => { + run_memory_test(duration, false, false, fragmented).await?; + } + "no-fragmented" => { + run_memory_test(duration, include_camera, include_mic, false).await?; + } + "camera-only" => { + run_camera_only_test(duration).await?; + } + "compare" => { + println!("=== Comparison Test ===\n"); + println!("First: Testing WITHOUT fragmented MP4...\n"); + run_memory_test(60, include_camera, include_mic, false).await?; + + println!("\n\n====================================\n"); + println!("Second: Testing WITH fragmented MP4...\n"); + run_memory_test(60, include_camera, include_mic, true).await?; + } + _ => { + println!("Cap Memory Leak Detector"); + println!(); + println!("Usage: memory-leak-detector [OPTIONS]"); + println!(); + println!("Options:"); + println!(" --duration Test duration (default: {DEFAULT_DURATION_SECS})"); + println!(" --mode Test mode:"); + println!(" full Full recording pipeline with fragmented MP4 (default)"); + println!(" screen-only Screen recording only (no camera/mic)"); + println!(" no-fragmented Full recording without fragmented MP4"); + println!(" camera-only Camera feed only (no encoding)"); + println!(" compare Run both fragmented and non-fragmented for comparison"); + println!(" --no-camera Disable camera"); + println!(" --no-mic Disable microphone"); + println!(" --no-fragmented Disable fragmented MP4 encoding"); + println!(); + println!("Examples:"); + println!(" # Test full pipeline with camera, mic, fragmented MP4 for 2 minutes"); + println!(" cargo run --example memory-leak-detector -- --duration 120"); + println!(); + println!(" # Test screen-only to isolate the leak"); + println!(" cargo run --example memory-leak-detector -- --mode screen-only"); + println!(); + println!(" # Compare fragmented vs non-fragmented"); + println!(" cargo run --example memory-leak-detector -- --mode compare"); + } + } + + Ok(()) +} diff --git a/crates/recording/src/capture_pipeline.rs b/crates/recording/src/capture_pipeline.rs index 7f53cf99a2..1caa3afe8e 100644 --- a/crates/recording/src/capture_pipeline.rs +++ b/crates/recording/src/capture_pipeline.rs @@ -6,9 +6,7 @@ use crate::{ }; #[cfg(target_os = "macos")] -use crate::output_pipeline::{ - FragmentedAVFoundationMp4Muxer, FragmentedAVFoundationMp4MuxerConfig, -}; +use crate::output_pipeline::{MacOSSegmentedMuxer, MacOSSegmentedMuxerConfig}; use anyhow::anyhow; use cap_timestamp::Timestamps; use std::{path::PathBuf, sync::Arc}; @@ -88,9 +86,7 @@ impl MakeCapturePipeline for screen_capture::CMSampleBufferCapture { OutputPipeline::builder(fragments_dir) .with_video::(screen_capture) .with_timestamps(start_time) - .build::( - FragmentedAVFoundationMp4MuxerConfig::default(), - ) + .build::(MacOSSegmentedMuxerConfig::default()) .await } else { OutputPipeline::builder(output_path.clone()) diff --git a/crates/recording/src/feeds/camera.rs b/crates/recording/src/feeds/camera.rs index 5d06df067b..93f7c72165 100644 --- a/crates/recording/src/feeds/camera.rs +++ b/crates/recording/src/feeds/camera.rs @@ -477,7 +477,7 @@ async fn setup_camera( let _ = native_recipient .tell(NewNativeFrame(NativeCameraFrame { - sample_buf: frame.native().sample_buf().retained(), + sample_buf: frame.native().sample_buf().clone(), timestamp, })) .try_send(); diff --git a/crates/recording/src/output_pipeline/fragmented.rs b/crates/recording/src/output_pipeline/fragmented.rs index 4ced9f28ca..a5dd3d933a 100644 --- a/crates/recording/src/output_pipeline/fragmented.rs +++ b/crates/recording/src/output_pipeline/fragmented.rs @@ -44,7 +44,7 @@ impl Default for FragmentedAVFoundationMp4MuxerConfig { #[cfg(target_os = "macos")] impl FragmentedAVFoundationMp4Muxer { - const MAX_QUEUE_RETRIES: u32 = 500; + const MAX_QUEUE_RETRIES: u32 = 1500; } #[cfg(target_os = "macos")] @@ -190,7 +190,7 @@ impl Default for FragmentedAVFoundationCameraMuxerConfig { #[cfg(target_os = "macos")] impl FragmentedAVFoundationCameraMuxer { - const MAX_QUEUE_RETRIES: u32 = 500; + const MAX_QUEUE_RETRIES: u32 = 1500; } #[cfg(target_os = "macos")] diff --git a/crates/recording/src/output_pipeline/macos.rs b/crates/recording/src/output_pipeline/macos.rs index 936b730098..f1d12fa91e 100644 --- a/crates/recording/src/output_pipeline/macos.rs +++ b/crates/recording/src/output_pipeline/macos.rs @@ -35,7 +35,7 @@ pub struct AVFoundationMp4Muxer( ); impl AVFoundationMp4Muxer { - const MAX_QUEUE_RETRIES: u32 = 500; + const MAX_QUEUE_RETRIES: u32 = 1500; } #[derive(Default)] @@ -146,7 +146,7 @@ pub struct AVFoundationCameraMuxer( ); impl AVFoundationCameraMuxer { - const MAX_QUEUE_RETRIES: u32 = 500; + const MAX_QUEUE_RETRIES: u32 = 1500; } #[derive(Default)] diff --git a/crates/recording/src/output_pipeline/macos_segmented_ffmpeg.rs b/crates/recording/src/output_pipeline/macos_segmented_ffmpeg.rs new file mode 100644 index 0000000000..ad128644d9 --- /dev/null +++ b/crates/recording/src/output_pipeline/macos_segmented_ffmpeg.rs @@ -0,0 +1,746 @@ +use crate::{AudioFrame, AudioMuxer, Muxer, TaskPool, VideoMuxer, fragmentation, screen_capture}; +use anyhow::{Context, anyhow}; +use cap_media_info::{AudioInfo, VideoInfo}; +use serde::Serialize; +use std::{ + path::PathBuf, + sync::{ + Arc, Mutex, + atomic::{AtomicBool, Ordering}, + mpsc::{SyncSender, sync_channel}, + }, + thread::JoinHandle, + time::Duration, +}; +use tracing::*; + +#[derive(Debug, Clone)] +pub struct SegmentInfo { + pub path: PathBuf, + pub index: u32, + pub duration: Duration, + pub file_size: Option, +} + +#[derive(Serialize)] +struct FragmentEntry { + path: String, + index: u32, + duration: f64, + is_complete: bool, + #[serde(skip_serializing_if = "Option::is_none")] + file_size: Option, +} + +const MANIFEST_VERSION: u32 = 2; + +#[derive(Serialize)] +struct Manifest { + version: u32, + fragments: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + total_duration: Option, + is_complete: bool, +} + +struct SegmentState { + video_tx: SyncSender, Duration)>>, + output: Arc>, + encoder_handle: Option>>, +} + +struct PauseTracker { + flag: Arc, + paused_at: Option, + offset: Duration, +} + +struct FrameDropTracker { + count: u32, + last_warning: std::time::Instant, +} + +impl FrameDropTracker { + fn new() -> Self { + Self { + count: 0, + last_warning: std::time::Instant::now(), + } + } + + fn record_drop(&mut self) { + self.count += 1; + if self.count >= 30 && self.last_warning.elapsed() > Duration::from_secs(5) { + warn!( + "Dropped {} screen frames due to encoder backpressure", + self.count + ); + self.count = 0; + self.last_warning = std::time::Instant::now(); + } + } + + fn reset(&mut self) { + if self.count > 0 { + trace!("Frame drop count at segment boundary: {}", self.count); + } + self.count = 0; + } +} + +impl PauseTracker { + fn new(flag: Arc) -> Self { + Self { + flag, + paused_at: None, + offset: Duration::ZERO, + } + } + + fn adjust(&mut self, timestamp: Duration) -> anyhow::Result> { + if self.flag.load(Ordering::Relaxed) { + if self.paused_at.is_none() { + self.paused_at = Some(timestamp); + } + return Ok(None); + } + + if let Some(start) = self.paused_at.take() { + let delta = timestamp.checked_sub(start).ok_or_else(|| { + anyhow!( + "Frame timestamp went backward during unpause (resume={start:?}, current={timestamp:?})" + ) + })?; + + self.offset = self.offset.checked_add(delta).ok_or_else(|| { + anyhow!( + "Pause offset overflow (offset={:?}, delta={delta:?})", + self.offset + ) + })?; + } + + let adjusted = timestamp.checked_sub(self.offset).ok_or_else(|| { + anyhow!( + "Adjusted timestamp underflow (timestamp={timestamp:?}, offset={:?})", + self.offset + ) + })?; + + Ok(Some(adjusted)) + } +} + +pub struct MacOSSegmentedMuxer { + base_path: PathBuf, + segment_duration: Duration, + current_index: u32, + segment_start_time: Option, + completed_segments: Vec, + pending_segments: Arc>>, + + current_state: Option, + + video_config: VideoInfo, + + pause: PauseTracker, + frame_drops: FrameDropTracker, +} + +pub struct MacOSSegmentedMuxerConfig { + pub segment_duration: Duration, +} + +impl Default for MacOSSegmentedMuxerConfig { + fn default() -> Self { + Self { + segment_duration: Duration::from_secs(3), + } + } +} + +impl Muxer for MacOSSegmentedMuxer { + type Config = MacOSSegmentedMuxerConfig; + + async fn setup( + config: Self::Config, + output_path: PathBuf, + video_config: Option, + _audio_config: Option, + pause_flag: Arc, + _tasks: &mut TaskPool, + ) -> anyhow::Result + where + Self: Sized, + { + let video_config = + video_config.ok_or_else(|| anyhow!("invariant: video config expected"))?; + + std::fs::create_dir_all(&output_path) + .with_context(|| format!("Failed to create segments directory: {output_path:?}"))?; + + Ok(Self { + base_path: output_path, + segment_duration: config.segment_duration, + current_index: 0, + segment_start_time: None, + completed_segments: Vec::new(), + pending_segments: Arc::new(Mutex::new(Vec::new())), + current_state: None, + video_config, + pause: PauseTracker::new(pause_flag), + frame_drops: FrameDropTracker::new(), + }) + } + + fn stop(&mut self) { + if let Some(state) = &self.current_state + && let Err(e) = state.video_tx.send(None) + { + trace!("Screen encoder channel already closed during stop: {e}"); + } + } + + fn finish(&mut self, timestamp: Duration) -> anyhow::Result> { + self.collect_pending_segments(); + + let segment_path = self.current_segment_path(); + let segment_start = self.segment_start_time; + let current_index = self.current_index; + + if let Some(mut state) = self.current_state.take() { + if let Err(e) = state.video_tx.send(None) { + trace!("Screen encoder channel already closed during finish: {e}"); + } + + if let Some(handle) = state.encoder_handle.take() { + let timeout = Duration::from_secs(5); + let start = std::time::Instant::now(); + loop { + if handle.is_finished() { + if let Err(panic_payload) = handle.join() { + warn!( + "Screen encoder thread panicked during finish: {:?}", + panic_payload + ); + } + break; + } + if start.elapsed() > timeout { + warn!( + "Screen encoder thread did not finish within {:?}, abandoning", + timeout + ); + break; + } + std::thread::sleep(Duration::from_millis(50)); + } + } + + if let Ok(mut output) = state.output.lock() + && let Err(e) = output.write_trailer() + { + warn!("Failed to write trailer for segment {current_index}: {e}"); + } + + fragmentation::sync_file(&segment_path); + + if let Some(start) = segment_start { + let final_duration = timestamp.saturating_sub(start); + let file_size = std::fs::metadata(&segment_path).ok().map(|m| m.len()); + + self.completed_segments.push(SegmentInfo { + path: segment_path, + index: current_index, + duration: final_duration, + file_size, + }); + } + } + + self.finalize_manifest(); + + Ok(Ok(())) + } +} + +impl MacOSSegmentedMuxer { + fn current_segment_path(&self) -> PathBuf { + self.base_path + .join(format!("fragment_{:03}.mp4", self.current_index)) + } + + fn write_manifest(&self) { + let manifest = Manifest { + version: MANIFEST_VERSION, + fragments: self + .completed_segments + .iter() + .map(|s| FragmentEntry { + path: s + .path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: s.index, + duration: s.duration.as_secs_f64(), + is_complete: true, + file_size: s.file_size, + }) + .collect(), + total_duration: None, + is_complete: false, + }; + + let manifest_path = self.base_path.join("manifest.json"); + if let Err(e) = fragmentation::atomic_write_json(&manifest_path, &manifest) { + warn!( + "Failed to write manifest to {}: {e}", + manifest_path.display() + ); + } + } + + fn finalize_manifest(&self) { + let total_duration: Duration = self.completed_segments.iter().map(|s| s.duration).sum(); + + let manifest = Manifest { + version: MANIFEST_VERSION, + fragments: self + .completed_segments + .iter() + .map(|s| FragmentEntry { + path: s + .path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: s.index, + duration: s.duration.as_secs_f64(), + is_complete: true, + file_size: s.file_size, + }) + .collect(), + total_duration: Some(total_duration.as_secs_f64()), + is_complete: true, + }; + + let manifest_path = self.base_path.join("manifest.json"); + if let Err(e) = fragmentation::atomic_write_json(&manifest_path, &manifest) { + warn!( + "Failed to write final manifest to {}: {e}", + manifest_path.display() + ); + } + } + + fn collect_pending_segments(&mut self) { + if let Ok(mut pending) = self.pending_segments.lock() { + for segment in pending.drain(..) { + self.completed_segments.push(segment); + } + self.completed_segments.sort_by_key(|s| s.index); + } + } + + fn create_segment(&mut self) -> anyhow::Result<()> { + let segment_path = self.current_segment_path(); + + let (video_tx, video_rx) = + sync_channel::, Duration)>>(8); + let (ready_tx, ready_rx) = sync_channel::>(1); + let output = ffmpeg::format::output(&segment_path)?; + let output = Arc::new(Mutex::new(output)); + + let video_config = self.video_config; + let output_clone = output.clone(); + + let encoder_handle = std::thread::Builder::new() + .name(format!("segment-encoder-{}", self.current_index)) + .spawn(move || { + let encoder = (|| { + let mut output_guard = match output_clone.lock() { + Ok(guard) => guard, + Err(poisoned) => { + return Err(anyhow!( + "MacOSSegmentedEncoder: failed to lock output mutex: {}", + poisoned + )); + } + }; + + cap_enc_ffmpeg::h264::H264Encoder::builder(video_config) + .build(&mut output_guard) + .map_err(|e| anyhow!("MacOSSegmentedEncoder/{e}")) + })(); + + let mut encoder = match encoder { + Ok(encoder) => { + if ready_tx.send(Ok(())).is_err() { + error!("Failed to send ready signal - receiver dropped"); + return Ok(()); + } + encoder + } + Err(e) => { + error!("Encoder setup failed: {:#}", e); + if let Err(send_err) = ready_tx.send(Err(anyhow!("{e}"))) { + error!("failed to send ready_tx error: {send_err}"); + } + return Err(anyhow!("{e}")); + } + }; + + let mut first_timestamp: Option = None; + + while let Ok(Some((sample_buf, timestamp))) = video_rx.recv() { + let Ok(mut output) = output_clone.lock() else { + continue; + }; + + let relative = if let Some(first) = first_timestamp { + timestamp.checked_sub(first).unwrap_or(Duration::ZERO) + } else { + first_timestamp = Some(timestamp); + Duration::ZERO + }; + + let frame = sample_buf_to_ffmpeg_frame(&sample_buf); + + match frame { + Ok(frame) => { + if let Err(e) = encoder.queue_frame(frame, relative, &mut output) { + warn!("Failed to encode frame: {e}"); + } + } + Err(e) => { + warn!("Failed to convert frame: {e:?}"); + } + } + } + + if let Ok(mut output) = output_clone.lock() + && let Err(e) = encoder.flush(&mut output) + { + warn!("Failed to flush encoder: {e}"); + } + + drop(encoder); + + Ok(()) + })?; + + ready_rx + .recv() + .map_err(|_| anyhow!("Encoder thread ended unexpectedly"))??; + + output + .lock() + .map_err(|_| anyhow!("output mutex poisoned when writing header"))? + .write_header()?; + + self.current_state = Some(SegmentState { + video_tx, + output, + encoder_handle: Some(encoder_handle), + }); + + Ok(()) + } + + fn rotate_segment(&mut self, timestamp: Duration) -> anyhow::Result<()> { + self.collect_pending_segments(); + + let segment_start = self.segment_start_time.unwrap_or(Duration::ZERO); + let segment_duration = timestamp.saturating_sub(segment_start); + let completed_segment_path = self.current_segment_path(); + let current_index = self.current_index; + + if let Some(mut state) = self.current_state.take() { + if let Err(e) = state.video_tx.send(None) { + trace!("Screen encoder channel already closed during rotation: {e}"); + } + + let output = state.output.clone(); + let encoder_handle = state.encoder_handle.take(); + let path_for_sync = completed_segment_path.clone(); + let pending_segments = self.pending_segments.clone(); + + std::thread::spawn(move || { + if let Some(handle) = encoder_handle { + let timeout = Duration::from_secs(5); + let start = std::time::Instant::now(); + loop { + if handle.is_finished() { + if let Err(panic_payload) = handle.join() { + warn!( + "Screen encoder thread panicked during rotation: {:?}", + panic_payload + ); + } + break; + } + if start.elapsed() > timeout { + warn!( + "Screen encoder thread did not finish within {:?} during rotation, abandoning", + timeout + ); + break; + } + std::thread::sleep(Duration::from_millis(50)); + } + } + + if let Ok(mut output) = output.lock() + && let Err(e) = output.write_trailer() + { + warn!("Failed to write trailer for segment {current_index}: {e}"); + } + + fragmentation::sync_file(&path_for_sync); + + let file_size = std::fs::metadata(&path_for_sync).ok().map(|m| m.len()); + + if let Ok(mut pending) = pending_segments.lock() { + pending.push(SegmentInfo { + path: path_for_sync, + index: current_index, + duration: segment_duration, + file_size, + }); + } + }); + + self.write_manifest(); + } + + self.frame_drops.reset(); + self.current_index += 1; + self.segment_start_time = Some(timestamp); + + self.create_segment()?; + self.write_in_progress_manifest(); + + info!( + "Rotated to segment {} at {:?}", + self.current_index, timestamp + ); + + Ok(()) + } + + fn write_in_progress_manifest(&self) { + let mut fragments: Vec = self + .completed_segments + .iter() + .map(|s| FragmentEntry { + path: s + .path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: s.index, + duration: s.duration.as_secs_f64(), + is_complete: true, + file_size: s.file_size, + }) + .collect(); + + fragments.push(FragmentEntry { + path: self + .current_segment_path() + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: self.current_index, + duration: 0.0, + is_complete: false, + file_size: None, + }); + + let manifest = Manifest { + version: MANIFEST_VERSION, + fragments, + total_duration: None, + is_complete: false, + }; + + let manifest_path = self.base_path.join("manifest.json"); + if let Err(e) = fragmentation::atomic_write_json(&manifest_path, &manifest) { + warn!( + "Failed to write in-progress manifest to {}: {e}", + manifest_path.display() + ); + } + } +} + +impl VideoMuxer for MacOSSegmentedMuxer { + type VideoFrame = screen_capture::VideoFrame; + + fn send_video_frame( + &mut self, + frame: Self::VideoFrame, + timestamp: Duration, + ) -> anyhow::Result<()> { + let Some(adjusted_timestamp) = self.pause.adjust(timestamp)? else { + return Ok(()); + }; + + if self.current_state.is_none() { + self.segment_start_time = Some(adjusted_timestamp); + self.create_segment()?; + self.write_in_progress_manifest(); + } + + if self.segment_start_time.is_none() { + self.segment_start_time = Some(adjusted_timestamp); + } + + let segment_elapsed = + adjusted_timestamp.saturating_sub(self.segment_start_time.unwrap_or(Duration::ZERO)); + + if segment_elapsed >= self.segment_duration { + self.rotate_segment(adjusted_timestamp)?; + } + + if let Some(state) = &self.current_state + && let Err(e) = state + .video_tx + .try_send(Some((frame.sample_buf, adjusted_timestamp))) + { + match e { + std::sync::mpsc::TrySendError::Full(_) => { + self.frame_drops.record_drop(); + } + std::sync::mpsc::TrySendError::Disconnected(_) => { + trace!("Screen encoder channel disconnected"); + } + } + } + + Ok(()) + } +} + +impl AudioMuxer for MacOSSegmentedMuxer { + fn send_audio_frame(&mut self, _frame: AudioFrame, _timestamp: Duration) -> anyhow::Result<()> { + Ok(()) + } +} + +fn sample_buf_to_ffmpeg_frame( + sample_buf: &cidre::cm::SampleBuf, +) -> Result { + use cidre::cv::{self, pixel_buffer::LockFlags}; + + let Some(image_buf_ref) = sample_buf.image_buf() else { + return Err(SampleBufConversionError::NoImageBuffer); + }; + let mut image_buf = image_buf_ref.retained(); + + let width = image_buf.width(); + let height = image_buf.height(); + let pixel_format = image_buf.pixel_format(); + let plane0_stride = image_buf.plane_bytes_per_row(0); + let plane1_stride = image_buf.plane_bytes_per_row(1); + + let bytes_lock = BaseAddrLockGuard::lock(image_buf.as_mut(), LockFlags::READ_ONLY) + .map_err(SampleBufConversionError::BaseAddrLock)?; + + Ok(match pixel_format { + cv::PixelFormat::_420V => { + let mut ff_frame = + ffmpeg::frame::Video::new(ffmpeg::format::Pixel::NV12, width as u32, height as u32); + + let src_stride = plane0_stride; + let dest_stride = ff_frame.stride(0); + + let src_bytes = bytes_lock.plane_data(0); + let dest_bytes = &mut ff_frame.data_mut(0); + + for y in 0..height { + let row_width = width; + let src_row = &src_bytes[y * src_stride..y * src_stride + row_width]; + let dest_row = &mut dest_bytes[y * dest_stride..y * dest_stride + row_width]; + + dest_row.copy_from_slice(src_row); + } + + let src_stride = plane1_stride; + let dest_stride = ff_frame.stride(1); + + let src_bytes = bytes_lock.plane_data(1); + let dest_bytes = &mut ff_frame.data_mut(1); + + for y in 0..height / 2 { + let row_width = width; + let src_row = &src_bytes[y * src_stride..y * src_stride + row_width]; + let dest_row = &mut dest_bytes[y * dest_stride..y * dest_stride + row_width]; + + dest_row.copy_from_slice(src_row); + } + + ff_frame + } + cv::PixelFormat::_32_BGRA => { + let mut ff_frame = + ffmpeg::frame::Video::new(ffmpeg::format::Pixel::BGRA, width as u32, height as u32); + + let src_stride = plane0_stride; + let dest_stride = ff_frame.stride(0); + + let src_bytes = bytes_lock.plane_data(0); + let dest_bytes = &mut ff_frame.data_mut(0); + + for y in 0..height { + let row_width = width * 4; + let src_row = &src_bytes[y * src_stride..y * src_stride + row_width]; + let dest_row = &mut dest_bytes[y * dest_stride..y * dest_stride + row_width]; + + dest_row.copy_from_slice(src_row); + } + + ff_frame + } + format => return Err(SampleBufConversionError::UnsupportedFormat(format)), + }) +} + +#[derive(Debug)] +pub enum SampleBufConversionError { + UnsupportedFormat(cidre::cv::PixelFormat), + BaseAddrLock(cidre::os::Error), + NoImageBuffer, +} + +struct BaseAddrLockGuard<'a>( + &'a mut cidre::cv::ImageBuf, + cidre::cv::pixel_buffer::LockFlags, +); + +impl<'a> BaseAddrLockGuard<'a> { + fn lock( + image_buf: &'a mut cidre::cv::ImageBuf, + flags: cidre::cv::pixel_buffer::LockFlags, + ) -> cidre::os::Result { + unsafe { image_buf.lock_base_addr(flags) }.result()?; + Ok(Self(image_buf, flags)) + } + + fn plane_data(&self, index: usize) -> &[u8] { + let base_addr = self.0.plane_base_address(index); + let plane_size = self.0.plane_bytes_per_row(index); + unsafe { std::slice::from_raw_parts(base_addr, plane_size * self.0.plane_height(index)) } + } +} + +impl Drop for BaseAddrLockGuard<'_> { + fn drop(&mut self) { + let _ = unsafe { self.0.unlock_lock_base_addr(self.1) }; + } +} diff --git a/crates/recording/src/output_pipeline/mod.rs b/crates/recording/src/output_pipeline/mod.rs index 8087f6a400..bf1df859ef 100644 --- a/crates/recording/src/output_pipeline/mod.rs +++ b/crates/recording/src/output_pipeline/mod.rs @@ -3,12 +3,16 @@ mod core; pub mod ffmpeg; #[cfg(target_os = "macos")] mod fragmented; +#[cfg(target_os = "macos")] +mod macos_segmented_ffmpeg; pub use async_camera::*; pub use core::*; pub use ffmpeg::*; #[cfg(target_os = "macos")] pub use fragmented::*; +#[cfg(target_os = "macos")] +pub use macos_segmented_ffmpeg::*; #[cfg(target_os = "macos")] mod macos; diff --git a/crates/recording/src/sources/native_camera.rs b/crates/recording/src/sources/native_camera.rs index 128b015bfe..4a93001cb8 100644 --- a/crates/recording/src/sources/native_camera.rs +++ b/crates/recording/src/sources/native_camera.rs @@ -29,7 +29,7 @@ impl VideoSource for NativeCamera { where Self: Sized, { - let (tx, rx) = flume::bounded(256); + let (tx, rx) = flume::bounded(8); feed_lock .ask(camera::AddNativeSender(tx)) diff --git a/crates/recording/src/sources/screen_capture/macos.rs b/crates/recording/src/sources/screen_capture/macos.rs index 2f0566a949..51916b5e25 100644 --- a/crates/recording/src/sources/screen_capture/macos.rs +++ b/crates/recording/src/sources/screen_capture/macos.rs @@ -174,7 +174,10 @@ impl ScreenCaptureConfig { match &frame { scap_screencapturekit::Frame::Screen(frame) => { - if frame.image_buf().height() == 0 || frame.image_buf().width() == 0 { + let Some(image_buf) = frame.image_buf() else { + return; + }; + if image_buf.height() == 0 || image_buf.width() == 0 { return; } diff --git a/crates/scap-ffmpeg/src/screencapturekit.rs b/crates/scap-ffmpeg/src/screencapturekit.rs index c3095b603c..7c84d11e74 100644 --- a/crates/scap-ffmpeg/src/screencapturekit.rs +++ b/crates/scap-ffmpeg/src/screencapturekit.rs @@ -7,20 +7,27 @@ use cidre::{ pub enum AsFFmpegError { UnsupportedFormat(cv::PixelFormat), BaseAddrLock(os::Error), + NoImageBuffer, } impl super::AsFFmpeg for scap_screencapturekit::VideoFrame { fn as_ffmpeg(&self) -> Result { - let mut image_buf = self.image_buf().retained(); + let Some(image_buf_ref) = self.image_buf() else { + return Err(AsFFmpegError::NoImageBuffer); + }; + let mut image_buf = image_buf_ref.retained(); let width = image_buf.width(); let height = image_buf.height(); + let pixel_format = image_buf.pixel_format(); + let plane0_stride = image_buf.plane_bytes_per_row(0); + let plane1_stride = image_buf.plane_bytes_per_row(1); let bytes_lock = ImageBufExt::base_addr_lock(image_buf.as_mut(), cv::pixel_buffer::LockFlags::READ_ONLY) .map_err(AsFFmpegError::BaseAddrLock)?; - Ok(match self.image_buf().pixel_format() { + Ok(match pixel_format { cv::PixelFormat::_420V => { let mut ff_frame = ffmpeg::frame::Video::new( ffmpeg::format::Pixel::NV12, @@ -28,7 +35,7 @@ impl super::AsFFmpeg for scap_screencapturekit::VideoFrame { height as u32, ); - let src_stride = self.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = ff_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); @@ -42,7 +49,7 @@ impl super::AsFFmpeg for scap_screencapturekit::VideoFrame { dest_row.copy_from_slice(src_row); } - let src_stride = self.image_buf().plane_bytes_per_row(1); + let src_stride = plane1_stride; let dest_stride = ff_frame.stride(1); let src_bytes = bytes_lock.plane_data(1); @@ -65,7 +72,7 @@ impl super::AsFFmpeg for scap_screencapturekit::VideoFrame { height as u32, ); - let src_stride = self.image_buf().plane_bytes_per_row(0); + let src_stride = plane0_stride; let dest_stride = ff_frame.stride(0); let src_bytes = bytes_lock.plane_data(0); diff --git a/crates/scap-screencapturekit/src/capture.rs b/crates/scap-screencapturekit/src/capture.rs index 5eb35c1812..3bf9e177e5 100644 --- a/crates/scap-screencapturekit/src/capture.rs +++ b/crates/scap-screencapturekit/src/capture.rs @@ -25,14 +25,11 @@ impl sc::stream::OutputImpl for CapturerCallbacks { let sample_buf = sample_buf.retained(); let frame = match kind { sc::OutputType::Screen => { - let Some(image_buf) = sample_buf.image_buf().map(|v| v.retained()) else { + if sample_buf.image_buf().is_none() { return; - }; + } - Frame::Screen(VideoFrame { - sample_buf, - image_buf, - }) + Frame::Screen(VideoFrame { sample_buf }) } sc::OutputType::Audio => Frame::Audio(AudioFrame { sample_buf }), sc::OutputType::Mic => Frame::Mic(AudioFrame { sample_buf }), @@ -97,7 +94,6 @@ impl Capturer { pub struct VideoFrame { sample_buf: arc::R, - image_buf: arc::R, } unsafe impl Send for VideoFrame {} @@ -111,12 +107,8 @@ impl VideoFrame { self.sample_buf.as_mut() } - pub fn image_buf(&self) -> &cv::ImageBuf { - self.image_buf.as_ref() - } - - pub fn image_buf_mut(&mut self) -> &mut cv::ImageBuf { - self.image_buf.as_mut() + pub fn image_buf(&self) -> Option<&cv::ImageBuf> { + self.sample_buf.image_buf() } }