diff --git a/apps/desktop/src-tauri/src/camera_legacy.rs b/apps/desktop/src-tauri/src/camera_legacy.rs index ee99b50fb1..ac980a7d1e 100644 --- a/apps/desktop/src-tauri/src/camera_legacy.rs +++ b/apps/desktop/src-tauri/src/camera_legacy.rs @@ -64,6 +64,8 @@ pub async fn create_camera_preview_ws() -> (Sender, u16, Cance width: frame.width(), height: frame.height(), stride: frame.stride(0) as u32, + frame_number: 0, + target_time_ns: 0, created_at: Instant::now(), }) .ok(); diff --git a/apps/desktop/src-tauri/src/editor_window.rs b/apps/desktop/src-tauri/src/editor_window.rs index 66ea67b975..97e2b439a1 100644 --- a/apps/desktop/src-tauri/src/editor_window.rs +++ b/apps/desktop/src-tauri/src/editor_window.rs @@ -1,40 +1,15 @@ use std::{collections::HashMap, ops::Deref, path::PathBuf, sync::Arc, time::Instant}; use tauri::{AppHandle, Manager, Runtime, Window, ipc::CommandArg}; -use tokio::sync::{RwLock, watch}; +use tokio::sync::{Mutex, RwLock, mpsc}; use tokio_util::sync::CancellationToken; use tracing::debug; -use cap_rendering::RenderedFrame; - use crate::{ create_editor_instance_impl, - frame_ws::{WSFrame, create_watch_frame_ws}, + frame_ws::{WSFrame, create_mpsc_frame_ws}, }; -fn strip_frame_padding(frame: RenderedFrame) -> Result<(Vec<u8>, u32), &'static str> { - let expected_stride = frame - .width - .checked_mul(4) - .ok_or("overflow computing expected_stride")?; - - if frame.padded_bytes_per_row == expected_stride { - Ok((frame.data, expected_stride)) - } else { - let capacity = expected_stride - .checked_mul(frame.height) - .ok_or("overflow computing buffer capacity")?; - let mut stripped = Vec::with_capacity(capacity as usize); - for row in 0..frame.height { - let start = row - .checked_mul(frame.padded_bytes_per_row) - .ok_or("overflow computing row start")?
as usize; - let end = start + expected_stride as usize; - stripped.extend_from_slice(&frame.data[start..end]); - } - - Ok((stripped, expected_stride)) - } -} +const FRAME_CHANNEL_BUFFER: usize = 8; pub struct EditorInstance { inner: Arc, @@ -49,25 +24,30 @@ type PendingReceiver = tokio::sync::watch::Receiver<Option<PendingResult>>; pub struct PendingEditorInstances(Arc<RwLock<HashMap<PathBuf, PendingReceiver>>>); async fn do_prewarm(app: AppHandle, path: PathBuf) -> PendingResult { - let (frame_tx, frame_rx) = watch::channel(None); + let (frame_tx, frame_rx) = mpsc::channel(FRAME_CHANNEL_BUFFER); + let frame_rx = Arc::new(Mutex::new(frame_rx)); - let (ws_port, ws_shutdown_token) = create_watch_frame_ws(frame_rx).await; + let (ws_port, ws_shutdown_token) = create_mpsc_frame_ws(frame_rx).await; let inner = create_editor_instance_impl( &app, path, Box::new(move |frame| { let width = frame.width; let height = frame.height; - if let Ok((data, stride)) = strip_frame_padding(frame) - && let Err(e) = frame_tx.send(Some(WSFrame { - data, - width, - height, - stride, - created_at: Instant::now(), - })) - { - debug!("Frame receiver dropped during prewarm: {e}"); + let stride = frame.padded_bytes_per_row; + let frame_number = frame.frame_number; + let target_time_ns = frame.target_time_ns; + let data = frame.data; + if let Err(e) = frame_tx.try_send(WSFrame { + data, + width, + height, + stride, + frame_number, + target_time_ns, + created_at: Instant::now(), + }) { + debug!("Frame channel full or closed during prewarm: {e}"); } }), ) @@ -211,25 +191,30 @@ impl EditorInstances { } } - let (frame_tx, frame_rx) = watch::channel(None); + let (frame_tx, frame_rx) = mpsc::channel(FRAME_CHANNEL_BUFFER); + let frame_rx = Arc::new(Mutex::new(frame_rx)); - let (ws_port, ws_shutdown_token) = create_watch_frame_ws(frame_rx).await; + let (ws_port, ws_shutdown_token) = create_mpsc_frame_ws(frame_rx).await; let instance = create_editor_instance_impl( window.app_handle(), path, Box::new(move |frame| { let width = frame.width; let height = frame.height; - if let Ok((data, stride)) = strip_frame_padding(frame) - && let Err(e) = frame_tx.send(Some(WSFrame { - data, - width, - height, - stride, - created_at: Instant::now(), - })) - { - debug!("Frame receiver dropped in get_or_create: {e}"); + let stride = frame.padded_bytes_per_row; + let frame_number = frame.frame_number; + let target_time_ns = frame.target_time_ns; + let data = frame.data; + if let Err(e) = frame_tx.try_send(WSFrame { + data, + width, + height, + stride, + frame_number, + target_time_ns, + created_at: Instant::now(), + }) { + debug!("Frame channel full or closed in get_or_create: {e}"); } }), ) diff --git a/apps/desktop/src-tauri/src/frame_ws.rs b/apps/desktop/src-tauri/src/frame_ws.rs index 93146003f1..df0c542bd4 100644 --- a/apps/desktop/src-tauri/src/frame_ws.rs +++ b/apps/desktop/src-tauri/src/frame_ws.rs @@ -1,11 +1,22 @@ +use std::sync::Arc; use std::time::Instant; -use tokio::sync::{broadcast, watch}; +use tokio::sync::{Mutex, broadcast, mpsc, watch}; use tokio_util::sync::CancellationToken; -fn pack_frame_data(mut data: Vec<u8>, stride: u32, height: u32, width: u32) -> Vec<u8> { +fn pack_frame_data( + mut data: Vec<u8>, + stride: u32, + height: u32, + width: u32, + frame_number: u32, + target_time_ns: u64, +) -> Vec<u8> { + data.reserve_exact(24); data.extend_from_slice(&stride.to_le_bytes()); data.extend_from_slice(&height.to_le_bytes()); data.extend_from_slice(&width.to_le_bytes()); + data.extend_from_slice(&frame_number.to_le_bytes()); + data.extend_from_slice(&target_time_ns.to_le_bytes()); data } @@ -15,6 +26,8 @@
pub struct WSFrame { pub width: u32, pub height: u32, pub stride: u32, + pub frame_number: u32, + pub target_time_ns: u64, #[allow(dead_code)] pub created_at: Instant, } @@ -49,7 +62,14 @@ pub async fn create_watch_frame_ws( { let frame_opt = camera_rx.borrow().clone(); if let Some(frame) = frame_opt { - let packed = pack_frame_data(frame.data, frame.stride, frame.height, frame.width); + let packed = pack_frame_data( + frame.data, + frame.stride, + frame.height, + frame.width, + frame.frame_number, + frame.target_time_ns, + ); if let Err(e) = socket.send(Message::Binary(packed)).await { tracing::error!("Failed to send initial frame to socket: {:?}", e); @@ -82,7 +102,14 @@ pub async fn create_watch_frame_ws( } let frame_opt = camera_rx.borrow().clone(); if let Some(frame) = frame_opt { - let packed = pack_frame_data(frame.data, frame.stride, frame.height, frame.width); + let packed = pack_frame_data( + frame.data, + frame.stride, + frame.height, + frame.width, + frame.frame_number, + frame.target_time_ns, + ); if let Err(e) = socket.send(Message::Binary(packed)).await { tracing::error!("Failed to send frame to socket: {:?}", e); @@ -121,6 +148,103 @@ pub async fn create_watch_frame_ws( (port, cancel_token_child) } +pub async fn create_mpsc_frame_ws( + frame_rx: Arc<Mutex<mpsc::Receiver<WSFrame>>>, +) -> (u16, CancellationToken) { + use axum::{ + extract::{ + State, + ws::{Message, WebSocket, WebSocketUpgrade}, + }, + response::IntoResponse, + routing::get, + }; + + type RouterState = Arc<Mutex<mpsc::Receiver<WSFrame>>>; + + #[axum::debug_handler] + async fn ws_handler( + ws: WebSocketUpgrade, + State(state): State<RouterState>, + ) -> impl IntoResponse { + ws.on_upgrade(move |socket| handle_socket(socket, state)) + } + + async fn handle_socket(mut socket: WebSocket, frame_rx: RouterState) { + tracing::info!("Socket connection established (mpsc)"); + let now = std::time::Instant::now(); + + loop { + let frame_opt = { + let mut rx = frame_rx.lock().await; + tokio::select! { + biased; + msg = socket.recv() => { + match msg { + Some(Ok(Message::Close(_))) | None => { + tracing::info!("WebSocket closed"); + break; + } + Some(Ok(_)) => { + continue; + } + Some(Err(e)) => { + tracing::error!("WebSocket error: {:?}", e); + break; + } + } + }, + frame = rx.recv() => frame, + } + }; + + let Some(frame) = frame_opt else { + tracing::info!("Frame channel closed"); + break; + }; + + let packed = pack_frame_data( + frame.data, + frame.stride, + frame.height, + frame.width, + frame.frame_number, + frame.target_time_ns, + ); + + if let Err(e) = socket.send(Message::Binary(packed)).await { + tracing::error!("Failed to send frame to socket: {:?}", e); + break; + } + } + + let elapsed = now.elapsed(); + tracing::info!("Websocket closing after {elapsed:.2?}"); + } + + let router = axum::Router::new() + .route("/", get(ws_handler)) + .with_state(frame_rx); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + tracing::info!("WebSocket server (mpsc) listening on port {}", port); + + let cancel_token = CancellationToken::new(); + let cancel_token_child = cancel_token.child_token(); + tokio::spawn(async move { + let server = axum::serve(listener, router.into_make_service()); + tokio::select!
{ + _ = server => {}, + _ = cancel_token.cancelled() => { + tracing::info!("WebSocket server shutting down"); + } + } + }); + + (port, cancel_token_child) +} + pub async fn create_frame_ws(frame_tx: broadcast::Sender<WSFrame>) -> (u16, CancellationToken) { use axum::{ extract::{ @@ -167,7 +291,14 @@ pub async fn create_frame_ws(frame_tx: broadcast::Sender<WSFrame>) -> (u16, Canc incoming_frame = camera_rx.recv() => { match incoming_frame { Ok(frame) => { - let packed = pack_frame_data(frame.data, frame.stride, frame.height, frame.width); + let packed = pack_frame_data( + frame.data, + frame.stride, + frame.height, + frame.width, + frame.frame_number, + frame.target_time_ns, + ); if let Err(e) = socket.send(Message::Binary(packed)).await { tracing::error!("Failed to send frame to socket: {:?}", e); diff --git a/apps/desktop/src-tauri/src/screenshot_editor.rs b/apps/desktop/src-tauri/src/screenshot_editor.rs index 58f7a5f8f6..90bffe7b60 100644 --- a/apps/desktop/src-tauri/src/screenshot_editor.rs +++ b/apps/desktop/src-tauri/src/screenshot_editor.rs @@ -360,6 +360,8 @@ impl ScreenshotEditorInstances { width: frame.width, height: frame.height, stride: frame.padded_bytes_per_row, + frame_number: frame.frame_number, + target_time_ns: frame.target_time_ns, created_at: Instant::now(), })); } diff --git a/apps/desktop/src/routes/editor/Editor.tsx b/apps/desktop/src/routes/editor/Editor.tsx index 4e6d98121c..b02cd55672 100644 --- a/apps/desktop/src/routes/editor/Editor.tsx +++ b/apps/desktop/src/routes/editor/Editor.tsx @@ -96,6 +96,7 @@ function Inner() { setEditorState, previewResolutionBase, dialog, + canvasControls, } = useEditorContext(); const isExportMode = () => { @@ -208,6 +209,7 @@ function Inner() { const updateConfigAndRender = throttle(async (time: number) => { const config = serializeProjectConfiguration(project); await commands.updateProjectConfigInMemory(config); + canvasControls()?.resetFrameState(); renderFrame(time); }, 1000 / FPS); createEffect( diff --git a/apps/desktop/src/routes/editor/ExportPage.tsx b/apps/desktop/src/routes/editor/ExportPage.tsx index b905656a61..56c0f724dd 100644 --- a/apps/desktop/src/routes/editor/ExportPage.tsx +++ b/apps/desktop/src/routes/editor/ExportPage.tsx @@ -1,5 +1,4 @@ import { Button } from "@cap/ui-solid"; -import { RadioGroup as KRadioGroup } from "@kobalte/core/radio-group"; import { debounce } from "@solid-primitives/scheduled"; import { makePersisted } from "@solid-primitives/storage"; import { createMutation } from "@tanstack/solid-query"; @@ -13,7 +12,6 @@ import { createEffect, createSignal, For, - type JSX, Match, mergeProps, on, @@ -41,7 +39,7 @@ import { } from "~/utils/tauri"; import { type RenderState, useEditorContext } from "./context"; import { RESOLUTION_OPTIONS } from "./Header"; -import { Dialog, Field, Slider } from "./ui"; +import { Dialog, Field } from "./ui"; class SilentError extends Error {} export const COMPRESSION_OPTIONS: Array<{ { label: "Potato", value: "Potato", bpp: 0.04 }, ]; -const BPP_TO_COMPRESSION: Record<string, string> = { - "0.3": "Minimal", - "0.15": "Social", - "0.08": "Web", - "0.04": "Potato", -}; - const COMPRESSION_TO_BPP: Record<string, number> = { Minimal: 0.3, Social: 0.15, @@ -585,13 +576,6 @@ export function ExportPage() { }, })); - const qualityLabel = () => { - const option = COMPRESSION_OPTIONS.find( - (opt) => opt.value === settings.compression, - ); - return option?.label ??
"Minimal"; - }; - const formatDuration = (seconds: number) => { const hours = Math.floor(seconds / 3600); const minutes = Math.floor((seconds % 3600) / 60); @@ -756,46 +740,47 @@ export function ExportPage() {
}> - { - setSettings( - produce((newSettings) => { - newSettings.exportTo = value as ExportToOption; - if (value === "link" && settings.format === "Gif") { - newSettings.format = "Mp4"; - } - }), - ); - }} - > +
{(option) => { const Icon = option.icon; + const isSelected = () => settings.exportTo === option.value; return ( - { + setSettings( + produce((newSettings) => { + newSettings.exportTo = + option.value as ExportToOption; + if ( + option.value === "link" && + settings.format === "Gif" + ) { + newSettings.format = "Mp4"; + } + }), + ); + }} > - - - - -
- - {option.label} - - - {option.description} - -
-
-
+ + {option.label} + ); }}
- +
+ ); return disabledReason() ? ( @@ -910,7 +900,7 @@ export function ExportPage() { name="Resolution" icon={} > -
+
{(option) => ( - + )}
- } - value={ - - {settings.fps} FPS - - } - > -
+ }> +
{(option) => ( - + )}
@@ -975,33 +964,35 @@ export function ExportPage() { } - value={ - {qualityLabel()} - } > - opt.value === settings.compression, - ), - ]} - minValue={0} - maxValue={COMPRESSION_OPTIONS.length - 1} - step={1} - onChange={([v]) => { - if (v === undefined) return; - const option = - COMPRESSION_OPTIONS[COMPRESSION_OPTIONS.length - 1 - v]; - if (option) { - setPreviewLoading(true); - setCompressionBpp(option.bpp); - setSettings("compression", option.value); - } - }} - history={{ pause: () => () => {} }} - /> +
+ + {(option) => ( + + )} + +
+
+ Smaller file + Larger file +
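Note: every WebSocket frame in this diff shares one wire format, produced by pack_frame_data in frame_ws.rs and consumed below in frame-worker.ts and socket.ts: the pixel rows are followed by a 24-byte little-endian footer holding stride (u32), height (u32), width (u32), frame_number (u32), and target_time_ns (u64). A minimal TypeScript sketch of a footer reader under those assumptions; FrameFooter and readFrameFooter are illustrative names, not identifiers from this patch:

interface FrameFooter {
  strideBytes: number; // padded bytes per pixel row
  height: number;
  width: number;
  frameNumber: number;
  targetTimeNs: bigint; // presentation time in nanoseconds
}

function readFrameFooter(buffer: ArrayBuffer): FrameFooter | null {
  if (buffer.byteLength < 24) return null;
  // The footer occupies the last 24 bytes; all fields are little-endian.
  const view = new DataView(buffer, buffer.byteLength - 24, 24);
  return {
    strideBytes: view.getUint32(0, true),
    height: view.getUint32(4, true),
    width: view.getUint32(8, true),
    frameNumber: view.getUint32(12, true),
    targetTimeNs: view.getBigUint64(16, true),
  };
}

The pixel payload is then the slice [0, strideBytes * height) of the buffer, which may still contain per-row padding that the stride-correction path below strips out.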
diff --git a/apps/desktop/src/utils/frame-worker.ts b/apps/desktop/src/utils/frame-worker.ts index 5b61309b6f..ae93be6930 100644 --- a/apps/desktop/src/utils/frame-worker.ts +++ b/apps/desktop/src/utils/frame-worker.ts @@ -32,6 +32,10 @@ interface CleanupMessage { type: "cleanup"; } +interface ResetFrameStateMessage { + type: "reset-frame-state"; +} + interface ReadyMessage { type: "ready"; } @@ -84,13 +88,20 @@ type IncomingMessage = | InitCanvasMessage | ResizeMessage | InitSharedBufferMessage - | CleanupMessage; + | CleanupMessage + | ResetFrameStateMessage; + +interface FrameTiming { + frameNumber: number; + targetTimeNs: bigint; +} interface PendingFrameCanvas2D { mode: "canvas2d"; imageData: ImageData; width: number; height: number; + timing: FrameTiming; } interface PendingFrameWebGPU { @@ -98,6 +109,9 @@ interface PendingFrameWebGPU { data: Uint8ClampedArray; width: number; height: number; + strideBytes: number; + timing: FrameTiming; + releaseCallback?: () => void; } type PendingFrame = PendingFrameCanvas2D | PendingFrameWebGPU; @@ -125,19 +139,265 @@ let lastRawFrameData: Uint8ClampedArray | null = null; let lastRawFrameWidth = 0; let lastRawFrameHeight = 0; -let webgpuFrameBuffer: Uint8ClampedArray | null = null; -let webgpuFrameBufferSize = 0; - let frameDropCount = 0; let lastFrameDropLogTime = 0; let consumer: Consumer | null = null; let useSharedBuffer = false; +let sharedReadBuffer: Uint8Array | null = null; +let sharedReadBufferSize = 0; -let pendingRenderFrame: PendingFrame | null = null; +const FRAME_QUEUE_SIZE = 5; +let frameQueue: PendingFrame[] = []; let _rafId: number | null = null; let rafRunning = false; +let playbackStartTime: number | null = null; +let playbackStartTargetTimeNs: bigint | null = null; +let lastRenderedFrameNumber = -1; + +const EARLY_THRESHOLD_MS = 16; +const LATE_THRESHOLD_MS = 40; + +function tryPollSharedBuffer(): boolean { + if (!consumer || !useSharedBuffer) return false; + + if (renderMode !== "webgpu") { + if (!sharedReadBuffer || sharedReadBufferSize < consumer.getSlotSize()) { + sharedReadBuffer = new Uint8Array(consumer.getSlotSize()); + sharedReadBufferSize = sharedReadBuffer.byteLength; + } + + const size = consumer.readInto(sharedReadBuffer, 0); + if (size != null && size > 0) { + queueFrameFromBytes(sharedReadBuffer.subarray(0, size)); + return true; + } + } + return false; +} + +function parseFrameMetadata(bytes: Uint8Array): { + width: number; + height: number; + strideBytes: number; + frameNumber: number; + targetTimeNs: bigint; + availableLength: number; +} | null { + if (bytes.byteLength < 24) return null; + + const metadataOffset = bytes.byteOffset + bytes.byteLength - 24; + const meta = new DataView(bytes.buffer, metadataOffset, 24); + const strideBytes = meta.getUint32(0, true); + const height = meta.getUint32(4, true); + const width = meta.getUint32(8, true); + const frameNumber = meta.getUint32(12, true); + const targetTimeNs = meta.getBigUint64(16, true); + + if (!width || !height) return null; + + const expectedRowBytes = width * 4; + const availableLength = strideBytes * height; + + if ( + strideBytes === 0 || + strideBytes < expectedRowBytes || + bytes.byteLength - 24 < availableLength + ) { + return null; + } + + return { + width, + height, + strideBytes, + frameNumber, + targetTimeNs, + availableLength, + }; +} + +function renderBorrowedWebGPU(bytes: Uint8Array, release: () => void): boolean { + if (renderMode !== "webgpu" || !webgpuRenderer) { + release(); + return false; + } + + const meta = 
parseFrameMetadata(bytes); + if (!meta) { + release(); + return false; + } + + const { + width, + height, + strideBytes, + frameNumber, + targetTimeNs, + availableLength, + } = meta; + + const frameData = new Uint8ClampedArray( + bytes.buffer, + bytes.byteOffset, + bytes.byteLength - 24, + ).subarray(0, availableLength); + + const isSeek = + lastRenderedFrameNumber >= 0 && + (frameNumber < lastRenderedFrameNumber || + frameNumber > lastRenderedFrameNumber + 30); + + if ( + playbackStartTime === null || + playbackStartTargetTimeNs === null || + isSeek + ) { + playbackStartTime = performance.now(); + playbackStartTargetTimeNs = targetTimeNs; + } + + lastRenderedFrameNumber = frameNumber; + + renderFrameWebGPU(webgpuRenderer, frameData, width, height, strideBytes); + release(); + + self.postMessage({ + type: "frame-rendered", + width, + height, + } satisfies FrameRenderedMessage); + + return true; +} + +function drainAndRenderLatestSharedWebGPU(maxDrain: number): boolean { + if (!consumer || !useSharedBuffer || consumer.isShutdown()) return false; + if (renderMode !== "webgpu" || !webgpuRenderer) return false; + + let latest: { bytes: Uint8Array; release: () => void } | null = null; + + for (let i = 0; i < maxDrain; i += 1) { + const borrowed = consumer.borrow(0); + if (!borrowed) break; + + if (latest) { + latest.release(); + } + latest = { bytes: borrowed.data, release: borrowed.release }; + } + + if (!latest) return false; + + return renderBorrowedWebGPU(latest.bytes, latest.release); +} + +function queueFrameFromBytes( + bytes: Uint8Array, + releaseCallback?: () => void, +): void { + const meta = parseFrameMetadata(bytes); + if (!meta) { + releaseCallback?.(); + return; + } + + const { + width, + height, + strideBytes, + frameNumber, + targetTimeNs, + availableLength, + } = meta; + const timing: FrameTiming = { frameNumber, targetTimeNs }; + const expectedRowBytes = width * 4; + + const frameData = new Uint8ClampedArray( + bytes.buffer, + bytes.byteOffset, + bytes.byteLength - 24, + ); + + if (renderMode === "webgpu" && webgpuRenderer) { + for (const queued of frameQueue) { + if (queued.mode === "webgpu" && queued.releaseCallback) { + queued.releaseCallback(); + } + } + frameQueue = frameQueue.filter((f) => f.mode !== "webgpu"); + + frameQueue.push({ + mode: "webgpu", + data: frameData.subarray(0, availableLength), + width, + height, + strideBytes, + timing, + releaseCallback, + }); + } else { + const expectedLength = expectedRowBytes * height; + let processedFrameData: Uint8ClampedArray; + + if (strideBytes === expectedRowBytes) { + processedFrameData = frameData.subarray(0, expectedLength); + } else { + if (!strideBuffer || strideBufferSize < expectedLength) { + strideBuffer = new Uint8ClampedArray(expectedLength); + strideBufferSize = expectedLength; + } + for (let row = 0; row < height; row += 1) { + const srcStart = row * strideBytes; + const destStart = row * expectedRowBytes; + strideBuffer.set( + frameData.subarray(srcStart, srcStart + expectedRowBytes), + destStart, + ); + } + processedFrameData = strideBuffer.subarray(0, expectedLength); + } + + if (!lastRawFrameData || lastRawFrameData.length < expectedLength) { + lastRawFrameData = new Uint8ClampedArray(expectedLength); + } + lastRawFrameData.set(processedFrameData); + lastRawFrameWidth = width; + lastRawFrameHeight = height; + + if (!cachedImageData || cachedWidth !== width || cachedHeight !== height) { + cachedImageData = new ImageData(width, height); + cachedWidth = width; + cachedHeight = height; + } + 
cachedImageData.data.set(processedFrameData); + lastImageData = cachedImageData; + + releaseCallback?.(); + + while (frameQueue.length >= FRAME_QUEUE_SIZE) { + frameQueue.shift(); + frameDropCount++; + } + + frameQueue.push({ + mode: "canvas2d", + imageData: cachedImageData, + width, + height, + timing, + }); + } + + self.postMessage({ + type: "frame-queued", + width, + height, + } satisfies FrameQueuedMessage); +} + function renderLoop() { _rafId = null; @@ -151,12 +411,109 @@ function renderLoop() { return; } - const frame = pendingRenderFrame; - if (frame) { - pendingRenderFrame = null; + if (useSharedBuffer && consumer && !consumer.isShutdown()) { + if (renderMode === "webgpu" && webgpuRenderer) { + const rendered = drainAndRenderLatestSharedWebGPU(8); + if (rendered) { + _rafId = requestAnimationFrame(renderLoop); + return; + } + } + + let polled = 0; + while (polled < 4 && tryPollSharedBuffer()) { + polled++; + } + } + + const now = performance.now(); + let frameToRender: PendingFrame | null = null; + let frameIndex = -1; + let fallbackFrame: PendingFrame | null = null; + let fallbackIndex = -1; + let fallbackDiffMs = Number.MAX_VALUE; + + for (let i = 0; i < frameQueue.length; i++) { + const frame = frameQueue[i]; + const frameNum = frame.timing.frameNumber; + + const isSeek = + playbackStartTime !== null && + lastRenderedFrameNumber >= 0 && + (frameNum < lastRenderedFrameNumber || + frameNum > lastRenderedFrameNumber + 30); + + if (playbackStartTime === null || isSeek) { + playbackStartTime = now; + playbackStartTargetTimeNs = frame.timing.targetTimeNs; + } + + if (frame.timing.targetTimeNs > 0n) { + const elapsedMs = now - playbackStartTime; + const startTarget = + playbackStartTargetTimeNs ?? frame.timing.targetTimeNs; + const adjustedTargetNs = frame.timing.targetTimeNs - startTarget; + const targetMs = Number(adjustedTargetNs / 1_000_000n); + const diffMs = targetMs - elapsedMs; + + const absDiff = Math.abs(diffMs); + if (absDiff < fallbackDiffMs) { + fallbackDiffMs = absDiff; + fallbackFrame = frame; + fallbackIndex = i; + } + + if (diffMs < -LATE_THRESHOLD_MS && !isSeek) { + if (frame.mode === "webgpu" && frame.releaseCallback) { + frame.releaseCallback(); + } + frameQueue.splice(i, 1); + i--; + frameDropCount++; + const logNow = performance.now(); + if (logNow - lastFrameDropLogTime > 1000) { + if (frameDropCount > 0) { + console.warn( + `[frame-worker] Dropped ${frameDropCount} frames in the last second`, + ); + } + frameDropCount = 0; + lastFrameDropLogTime = logNow; + } + continue; + } + } + + if ( + frameToRender === null || + frame.timing.frameNumber > frameToRender.timing.frameNumber + ) { + frameToRender = frame; + frameIndex = i; + } + } + + if (frameToRender === null && fallbackFrame !== null) { + frameToRender = fallbackFrame; + frameIndex = fallbackIndex; + } + + if (frameToRender !== null && frameIndex >= 0) { + frameQueue.splice(frameIndex, 1); + const frame = frameToRender; + lastRenderedFrameNumber = frame.timing.frameNumber; if (frame.mode === "webgpu" && webgpuRenderer) { - renderFrameWebGPU(webgpuRenderer, frame.data, frame.width, frame.height); + renderFrameWebGPU( + webgpuRenderer, + frame.data, + frame.width, + frame.height, + frame.strideBytes, + ); + if (frame.releaseCallback) { + frame.releaseCallback(); + } } else if (frame.mode === "canvas2d" && offscreenCanvas && offscreenCtx) { if ( offscreenCanvas.width !== frame.width || @@ -175,7 +532,15 @@ function renderLoop() { } satisfies FrameRenderedMessage); } - _rafId = 
requestAnimationFrame(renderLoop); + const shouldContinue = + frameQueue.length > 0 || + (useSharedBuffer && consumer && !consumer.isShutdown()); + + if (shouldContinue) { + _rafId = requestAnimationFrame(renderLoop); + } else { + rafRunning = false; + } } function startRenderLoop() { @@ -202,6 +567,12 @@ function stopRenderLoop() { function cleanup() { stopRenderLoop(); + for (const frame of frameQueue) { + if (frame.mode === "webgpu" && frame.releaseCallback) { + frame.releaseCallback(); + } + } + frameQueue = []; if (webgpuRenderer) { disposeWebGPU(webgpuRenderer); webgpuRenderer = null; @@ -210,7 +581,8 @@ offscreenCtx = null; consumer = null; useSharedBuffer = false; - pendingRenderFrame = null; + sharedReadBuffer = null; + sharedReadBufferSize = 0; lastImageData = null; cachedImageData = null; cachedWidth = 0; @@ -220,10 +592,11 @@ lastRawFrameData = null; lastRawFrameWidth = 0; lastRawFrameHeight = 0; - webgpuFrameBuffer = null; - webgpuFrameBufferSize = 0; frameDropCount = 0; lastFrameDropLogTime = 0; + playbackStartTime = null; + playbackStartTargetTimeNs = null; + lastRenderedFrameNumber = -1; } function initWorker() { @@ -236,7 +609,7 @@ } if (useSharedBuffer && consumer) { - pollSharedBuffer(); + startRenderLoop(); } } @@ -298,6 +671,7 @@ async function initCanvas(canvas: OffscreenCanvas): Promise<void> { lastRawFrameData, lastRawFrameWidth, lastRawFrameHeight, + lastRawFrameWidth * 4, ); self.postMessage({ type: "frame-rendered", @@ -337,23 +711,34 @@ async function initCanvas(canvas: OffscreenCanvas): Promise<void> { type DecodeResult = FrameQueuedMessage | DecodedFrame | ErrorMessage; -async function processFrame(buffer: ArrayBuffer): Promise<DecodeResult> { - const data = new Uint8Array(buffer); - if (data.length < 12) { +function processFrameBytesSync( + bytes: Uint8Array, + releaseCallback?: () => void, +): DecodeResult { + if (bytes.byteLength < 24) { + releaseCallback?.(); return { type: "error", message: "Received frame too small to contain metadata", }; } - const metadataOffset = data.length - 12; - const meta = new DataView(buffer, metadataOffset, 12); + const metadataOffset = bytes.byteOffset + bytes.byteLength - 24; + const meta = new DataView(bytes.buffer, metadataOffset, 24); const strideBytes = meta.getUint32(0, true); const height = meta.getUint32(4, true); const width = meta.getUint32(8, true); - const frameData = new Uint8ClampedArray(buffer, 0, metadataOffset); + const frameNumber = meta.getUint32(12, true); + const targetTimeNs = meta.getBigUint64(16, true); + const timing: FrameTiming = { frameNumber, targetTimeNs }; + const frameData = new Uint8ClampedArray( + bytes.buffer, + bytes.byteOffset, + bytes.byteLength - 24, + ); if (!width || !height) { + releaseCallback?.(); return { type: "error", message: `Received invalid frame dimensions: ${width}x${height}`, @@ -369,12 +754,35 @@ strideBytes < expectedRowBytes || frameData.length < availableLength ) { + releaseCallback?.(); return { type: "error", message: `Received invalid frame stride: ${strideBytes}, expected: ${expectedRowBytes}`, }; } + if (renderMode === "webgpu" && webgpuRenderer) { + while (frameQueue.length >= FRAME_QUEUE_SIZE) { + const dropped = frameQueue.shift(); + if (dropped?.mode === "webgpu" && dropped.releaseCallback) { + dropped.releaseCallback(); + } + frameDropCount++; + } + + frameQueue.push({ + mode: "webgpu", + data: frameData.subarray(0, availableLength), + width, + height, +
strideBytes, + timing, + releaseCallback, + }); + startRenderLoop(); + return { type: "frame-queued", width, height }; + } + let processedFrameData: Uint8ClampedArray; if (strideBytes === expectedRowBytes) { processedFrameData = frameData.subarray(0, expectedLength); @@ -394,36 +802,6 @@ processedFrameData = strideBuffer.subarray(0, expectedLength); } - if (renderMode === "webgpu" && webgpuRenderer) { - if (pendingRenderFrame !== null) { - frameDropCount++; - const now = performance.now(); - if (now - lastFrameDropLogTime > 1000) { - if (frameDropCount > 0) { - console.warn( - `[frame-worker] Dropped ${frameDropCount} frames in the last second`, - ); - } - frameDropCount = 0; - lastFrameDropLogTime = now; - } - } - - if (!webgpuFrameBuffer || webgpuFrameBufferSize < expectedLength) { - webgpuFrameBuffer = new Uint8ClampedArray(expectedLength); - webgpuFrameBufferSize = expectedLength; - } - webgpuFrameBuffer.set(processedFrameData); - - pendingRenderFrame = { - mode: "webgpu", - data: webgpuFrameBuffer, - width, - height, - }; - return { type: "frame-queued", width, height }; - } - if (!lastRawFrameData || lastRawFrameData.length < expectedLength) { lastRawFrameData = new Uint8ClampedArray(expectedLength); } @@ -439,51 +817,26 @@ cachedImageData.data.set(processedFrameData); lastImageData = cachedImageData; - if (offscreenCanvas && offscreenCtx) { - pendingRenderFrame = { - mode: "canvas2d", - imageData: cachedImageData, - width, - height, - }; + releaseCallback?.(); - return { type: "frame-queued", width, height }; + while (frameQueue.length >= FRAME_QUEUE_SIZE) { + frameQueue.shift(); + frameDropCount++; } - try { - const bitmap = await createImageBitmap(cachedImageData); - return { - type: "decoded", - bitmap, - width, - height, - }; - } catch (e) { - return { - type: "error", - message: `Failed to create ImageBitmap: ${e}`, - }; - } -} - -async function pollSharedBuffer(): Promise<void> { - if (!consumer || !useSharedBuffer) return; + frameQueue.push({ + mode: "canvas2d", + imageData: cachedImageData, + width, + height, + timing, + }); - const buffer = consumer.read(50); - if (buffer) { - const result = await processFrame(buffer); - if (result.type === "decoded") { - self.postMessage(result, { transfer: [result.bitmap] }); - } else if (result.type === "frame-queued") { - self.postMessage(result); - } else if (result.type === "error") { - self.postMessage(result); - } + if (offscreenCanvas && offscreenCtx) { + startRenderLoop(); } - if (!consumer.isShutdown()) { - setTimeout(pollSharedBuffer, 0); - } + return { type: "frame-queued", width, height }; } self.onmessage = async (e: MessageEvent<IncomingMessage>) => { @@ -492,12 +845,27 @@ return; } + if (e.data.type === "reset-frame-state") { + lastRenderedFrameNumber = -1; + playbackStartTime = null; + playbackStartTargetTimeNs = null; + for (const frame of frameQueue) { + if (frame.mode === "webgpu" && frame.releaseCallback) { + frame.releaseCallback(); + } + } + frameQueue = []; + return; + } + if (e.data.type === "init-shared-buffer") { consumer = createConsumer(e.data.buffer); useSharedBuffer = true; + sharedReadBuffer = null; + sharedReadBufferSize = 0; if (workerReady) { - pollSharedBuffer(); + startRenderLoop(); } return; } @@ -536,10 +904,8 @@ } if (e.data.type === "frame") { - const result = await processFrame(e.data.buffer); - if (result.type
=== "decoded") { - self.postMessage(result, { transfer: [result.bitmap] }); - } else if (result.type === "frame-queued") { + const result = processFrameBytesSync(new Uint8Array(e.data.buffer)); + if (result.type === "frame-queued") { self.postMessage(result); } else if (result.type === "error") { self.postMessage(result); diff --git a/apps/desktop/src/utils/shared-frame-buffer.ts b/apps/desktop/src/utils/shared-frame-buffer.ts index 7b0f82737f..24a4637242 100644 --- a/apps/desktop/src/utils/shared-frame-buffer.ts +++ b/apps/desktop/src/utils/shared-frame-buffer.ts @@ -18,7 +18,7 @@ const CONTROL_SLOT_SIZE = 4; const CONTROL_METADATA_OFFSET = 5; const CONTROL_DATA_OFFSET = 6; const CONTROL_VERSION = 7; -const CONTROL_READER_ACTIVE = 8; +const _CONTROL_READER_ACTIVE = 8; const META_FRAME_SIZE = 0; const META_FRAME_NUMBER = 1; @@ -233,9 +233,18 @@ export function createProducer(init: SharedFrameBufferInit): Producer { }; } +export interface BorrowedFrame { + data: Uint8Array; + frameSize: number; + release(): void; +} + export interface Consumer { read(timeoutMs?: number): ArrayBuffer | null; + readInto(target: Uint8Array, timeoutMs?: number): number | null; + borrow(timeoutMs?: number): BorrowedFrame | null; isShutdown(): boolean; + getSlotSize(): number; } function advanceReadIndexCAS( @@ -243,8 +252,10 @@ function advanceReadIndexCAS( readIdx: number, nextIdx: number, ): void { + const MAX_ADVANCE_RETRIES = 16; let expectedIdx = readIdx; - while (true) { + + for (let attempt = 0; attempt < MAX_ADVANCE_RETRIES; attempt++) { const exchanged = Atomics.compareExchange( controlView, CONTROL_READ_INDEX, @@ -373,8 +384,196 @@ export function createConsumer(buffer: SharedArrayBuffer): Consumer { return null; }, + readInto(target: Uint8Array, timeoutMs: number = 100): number | null { + const MAX_CAS_RETRIES = 3; + + for (let attempt = 0; attempt < MAX_CAS_RETRIES; attempt++) { + const shutdownFlag = Atomics.load(controlView, CONTROL_SHUTDOWN); + if (shutdownFlag) { + return null; + } + + const readIdx = Atomics.load(controlView, CONTROL_READ_INDEX); + const slotMetaIdx = + (metadataOffset + readIdx * METADATA_ENTRY_SIZE) / 4; + + let state = Atomics.load(metadataView, slotMetaIdx + META_SLOT_STATE); + + if (state !== SLOT_STATE.READY) { + const waitResult = Atomics.wait( + metadataView, + slotMetaIdx + META_SLOT_STATE, + state, + timeoutMs, + ); + if (waitResult === "timed-out") { + return null; + } + + const shutdownCheck = Atomics.load(controlView, CONTROL_SHUTDOWN); + if (shutdownCheck) { + return null; + } + + state = Atomics.load(metadataView, slotMetaIdx + META_SLOT_STATE); + if (state !== SLOT_STATE.READY) { + continue; + } + } + + const exchangedState = Atomics.compareExchange( + metadataView, + slotMetaIdx + META_SLOT_STATE, + SLOT_STATE.READY, + SLOT_STATE.READING, + ); + if (exchangedState !== SLOT_STATE.READY) { + continue; + } + + const frameSize = Atomics.load( + metadataView, + slotMetaIdx + META_FRAME_SIZE, + ); + const slotDataOffset = dataOffset + readIdx * slotSize; + + if ( + !Number.isInteger(frameSize) || + frameSize < 0 || + frameSize > slotSize || + slotDataOffset < 0 || + slotDataOffset + frameSize > buffer.byteLength || + frameSize > target.byteLength + ) { + Atomics.store( + metadataView, + slotMetaIdx + META_SLOT_STATE, + SLOT_STATE.EMPTY, + ); + const nextIdx = (readIdx + 1) % slotCount; + advanceReadIndexCAS(controlView, readIdx, nextIdx); + return null; + } + + target.set(new Uint8Array(buffer, slotDataOffset, frameSize), 0); + + Atomics.store( + metadataView, + 
slotMetaIdx + META_SLOT_STATE, + SLOT_STATE.EMPTY, + ); + + const nextIdx = (readIdx + 1) % slotCount; + advanceReadIndexCAS(controlView, readIdx, nextIdx); + + return frameSize; + } + + return null; + }, + + borrow(timeoutMs: number = 100): BorrowedFrame | null { + const MAX_CAS_RETRIES = 3; + + for (let attempt = 0; attempt < MAX_CAS_RETRIES; attempt++) { + const shutdownFlag = Atomics.load(controlView, CONTROL_SHUTDOWN); + if (shutdownFlag) { + return null; + } + + const readIdx = Atomics.load(controlView, CONTROL_READ_INDEX); + const slotMetaIdx = + (metadataOffset + readIdx * METADATA_ENTRY_SIZE) / 4; + + let state = Atomics.load(metadataView, slotMetaIdx + META_SLOT_STATE); + + if (state !== SLOT_STATE.READY) { + const waitResult = Atomics.wait( + metadataView, + slotMetaIdx + META_SLOT_STATE, + state, + timeoutMs, + ); + if (waitResult === "timed-out") { + return null; + } + + const shutdownCheck = Atomics.load(controlView, CONTROL_SHUTDOWN); + if (shutdownCheck) { + return null; + } + + state = Atomics.load(metadataView, slotMetaIdx + META_SLOT_STATE); + if (state !== SLOT_STATE.READY) { + continue; + } + } + + const exchangedState = Atomics.compareExchange( + metadataView, + slotMetaIdx + META_SLOT_STATE, + SLOT_STATE.READY, + SLOT_STATE.READING, + ); + if (exchangedState !== SLOT_STATE.READY) { + continue; + } + + const frameSize = Atomics.load( + metadataView, + slotMetaIdx + META_FRAME_SIZE, + ); + const slotDataOffset = dataOffset + readIdx * slotSize; + + if ( + !Number.isInteger(frameSize) || + frameSize < 0 || + frameSize > slotSize || + slotDataOffset < 0 || + slotDataOffset + frameSize > buffer.byteLength + ) { + Atomics.store( + metadataView, + slotMetaIdx + META_SLOT_STATE, + SLOT_STATE.EMPTY, + ); + const nextIdx = (readIdx + 1) % slotCount; + advanceReadIndexCAS(controlView, readIdx, nextIdx); + return null; + } + + const data = new Uint8Array(buffer, slotDataOffset, frameSize); + + let released = false; + const capturedReadIdx = readIdx; + const capturedSlotMetaIdx = slotMetaIdx; + + const release = () => { + if (released) return; + released = true; + + Atomics.store( + metadataView, + capturedSlotMetaIdx + META_SLOT_STATE, + SLOT_STATE.EMPTY, + ); + + const nextIdx = (capturedReadIdx + 1) % slotCount; + advanceReadIndexCAS(controlView, capturedReadIdx, nextIdx); + }; + + return { data, frameSize, release }; + } + + return null; + }, + isShutdown(): boolean { return Atomics.load(controlView, CONTROL_SHUTDOWN) === 1; }, + + getSlotSize(): number { + return slotSize; + }, }; } diff --git a/apps/desktop/src/utils/socket.ts b/apps/desktop/src/utils/socket.ts index 4484b5b043..5bf9c6ad47 100644 --- a/apps/desktop/src/utils/socket.ts +++ b/apps/desktop/src/utils/socket.ts @@ -8,11 +8,13 @@ import { type Producer, type SharedFrameBufferConfig, } from "./shared-frame-buffer"; +import type { StrideCorrectionResponse } from "./stride-correction-worker"; +import StrideCorrectionWorker from "./stride-correction-worker?worker"; const SAB_SUPPORTED = isSharedArrayBufferSupported(); const FRAME_BUFFER_CONFIG: SharedFrameBufferConfig = { - slotCount: 4, - slotSize: 8 * 1024 * 1024, + slotCount: 6, + slotSize: 16 * 1024 * 1024, }; export type FrameData = { @@ -26,6 +28,7 @@ export type CanvasControls = { resizeCanvas: (width: number, height: number) => void; hasRenderedFrame: () => boolean; initDirectCanvas: (canvas: HTMLCanvasElement) => void; + resetFrameState: () => void; }; interface ReadyMessage { @@ -110,6 +113,15 @@ export function createImageDataWS( let directCanvas: 
HTMLCanvasElement | null = null; let directCtx: CanvasRenderingContext2D | null = null; + let strideWorker: Worker | null = null; + + let cachedDirectImageData: ImageData | null = null; + let cachedDirectWidth = 0; + let cachedDirectHeight = 0; + + let cachedStrideImageData: ImageData | null = null; + let cachedStrideWidth = 0; + let cachedStrideHeight = 0; function cleanup() { if (isCleanedUp) return; @@ -123,10 +135,23 @@ worker.onmessage = null; worker.terminate(); + if (strideWorker) { + strideWorker.onmessage = null; + strideWorker.terminate(); + strideWorker = null; + } + pendingFrame = null; nextFrame = null; isProcessing = false; + cachedDirectImageData = null; + cachedDirectWidth = 0; + cachedDirectHeight = 0; + cachedStrideImageData = null; + cachedStrideWidth = 0; + cachedStrideHeight = 0; + setIsConnected(false); } @@ -141,6 +166,37 @@ initDirectCanvas: (canvas: HTMLCanvasElement) => { directCanvas = canvas; directCtx = canvas.getContext("2d", { alpha: false }); + strideWorker = new StrideCorrectionWorker(); + strideWorker.onmessage = (e: MessageEvent<StrideCorrectionResponse>) => { + if (e.data.type !== "corrected" || !directCanvas || !directCtx) return; + + const { buffer, width, height } = e.data; + if (directCanvas.width !== width || directCanvas.height !== height) { + directCanvas.width = width; + directCanvas.height = height; + } + + const frameData = new Uint8ClampedArray(buffer); + if ( + !cachedStrideImageData || + cachedStrideWidth !== width || + cachedStrideHeight !== height + ) { + cachedStrideImageData = new ImageData(width, height); + cachedStrideWidth = width; + cachedStrideHeight = height; + } + cachedStrideImageData.data.set(frameData); + directCtx.putImageData(cachedStrideImageData, 0, 0); + + if (!hasRenderedFrame()) { + setHasRenderedFrame(true); + } + onmessage({ width, height }); + }; + }, + resetFrameState: () => { + worker.postMessage({ type: "reset-frame-state" }); }, }; @@ -166,6 +222,8 @@ } if (e.data.type === "frame-rendered") { + const { width, height } = e.data; + onmessage({ width, height }); if (!hasRenderedFrame()) { setHasRenderedFrame(true); } @@ -225,53 +283,60 @@ ws.onmessage = (event) => { const buffer = event.data as ArrayBuffer; - if (directCanvas && directCtx) { - const data = new Uint8Array(buffer); - if (data.length >= 12) { - const metadataOffset = data.length - 12; - const meta = new DataView(buffer, metadataOffset, 12); + if (directCanvas && directCtx && strideWorker) { + if (buffer.byteLength >= 24) { + const metadataOffset = buffer.byteLength - 24; + const meta = new DataView(buffer, metadataOffset, 24); const strideBytes = meta.getUint32(0, true); const height = meta.getUint32(4, true); const width = meta.getUint32(8, true); if (width > 0 && height > 0) { const expectedRowBytes = width * 4; - let frameData: Uint8ClampedArray; if (strideBytes === expectedRowBytes) { - frameData = new Uint8ClampedArray( + const frameData = new Uint8ClampedArray( buffer, 0, expectedRowBytes * height, ); - } else { - frameData = new Uint8ClampedArray(expectedRowBytes * height); - for (let row = 0; row < height; row++) { - const srcStart = row * strideBytes; - const destStart = row * expectedRowBytes; - frameData.set( - new Uint8ClampedArray(buffer, srcStart, expectedRowBytes), - destStart, - ); - } - } - if (directCanvas.width !== width || directCanvas.height !== height) { - directCanvas.width = width; - directCanvas.height = height; - }
+ if ( + directCanvas.width !== width || + directCanvas.height !== height + ) { + directCanvas.width = width; + directCanvas.height = height; + } - const imageData = new ImageData( - new Uint8ClampedArray(frameData.buffer), - width, - height, - ); - directCtx.putImageData(imageData, 0, 0); + if ( + !cachedDirectImageData || + cachedDirectWidth !== width || + cachedDirectHeight !== height + ) { + cachedDirectImageData = new ImageData(width, height); + cachedDirectWidth = width; + cachedDirectHeight = height; + } + cachedDirectImageData.data.set(frameData); + directCtx.putImageData(cachedDirectImageData, 0, 0); - if (!hasRenderedFrame()) { - setHasRenderedFrame(true); + if (!hasRenderedFrame()) { + setHasRenderedFrame(true); + } + onmessage({ width, height }); + } else { + strideWorker.postMessage( + { + type: "correct-stride", + buffer, + strideBytes, + width, + height, + }, + [buffer], + ); } - onmessage({ width, height }); } } return; diff --git a/apps/desktop/src/utils/stride-correction-worker.ts b/apps/desktop/src/utils/stride-correction-worker.ts new file mode 100644 index 0000000000..b98a355bd3 --- /dev/null +++ b/apps/desktop/src/utils/stride-correction-worker.ts @@ -0,0 +1,60 @@ +interface StrideCorrectionRequest { + type: "correct-stride"; + buffer: ArrayBuffer; + strideBytes: number; + width: number; + height: number; +} + +interface StrideCorrectionResponse { + type: "corrected"; + buffer: ArrayBuffer; + width: number; + height: number; +} + +interface ErrorResponse { + type: "error"; + message: string; +} + +let correctionBuffer: Uint8ClampedArray | null = null; +let correctionBufferSize = 0; + +self.onmessage = (e: MessageEvent<StrideCorrectionRequest>) => { + if (e.data.type !== "correct-stride") return; + + const { buffer, strideBytes, width, height } = e.data; + const expectedRowBytes = width * 4; + const expectedLength = expectedRowBytes * height; + + if (!correctionBuffer || correctionBufferSize < expectedLength) { + correctionBuffer = new Uint8ClampedArray(expectedLength); + correctionBufferSize = expectedLength; + } + + const srcData = new Uint8ClampedArray(buffer); + for (let row = 0; row < height; row++) { + const srcStart = row * strideBytes; + const destStart = row * expectedRowBytes; + correctionBuffer.set( + srcData.subarray(srcStart, srcStart + expectedRowBytes), + destStart, + ); + } + + const result = correctionBuffer.slice(0, expectedLength); + const response: StrideCorrectionResponse = { + type: "corrected", + buffer: result.buffer, + width, + height, + }; + self.postMessage(response, { transfer: [result.buffer] }); +}; + +export type { + StrideCorrectionRequest, + StrideCorrectionResponse, + ErrorResponse, }; diff --git a/apps/desktop/src/utils/webgpu-renderer.ts b/apps/desktop/src/utils/webgpu-renderer.ts index b97d3ba359..306fed630f 100644 --- a/apps/desktop/src/utils/webgpu-renderer.ts +++ b/apps/desktop/src/utils/webgpu-renderer.ts @@ -153,6 +153,7 @@ export function renderFrameWebGPU( data: Uint8ClampedArray, width: number, height: number, + bytesPerRow: number = width * 4, ): void { const { device, context, pipeline, sampler, bindGroupLayout, canvas } = renderer; @@ -184,7 +185,7 @@ return; } - const requiredBytes = width * height * 4; + const requiredBytes = bytesPerRow * height; if (data.byteLength < requiredBytes) { console.error( `WebGPU renderFrame: buffer too small.
Expected at least ${requiredBytes} bytes, got ${data.byteLength}`, ); @@ -198,7 +199,7 @@ device.queue.writeTexture( { texture: renderer.frameTexture }, textureData.buffer as unknown as GPUAllowSharedBufferSource, - { bytesPerRow: width * 4, rowsPerImage: height }, + { bytesPerRow, rowsPerImage: height }, { width, height }, ); diff --git a/crates/rendering/src/decoder/ffmpeg.rs b/crates/rendering/src/decoder/ffmpeg.rs index b49ee06110..4186ad676b 100644 --- a/crates/rendering/src/decoder/ffmpeg.rs +++ b/crates/rendering/src/decoder/ffmpeg.rs @@ -33,16 +33,18 @@ struct ProcessedFrame { impl ProcessedFrame { fn to_decoded_frame(&self) -> DecodedFrame { match self.format { - PixelFormat::Rgba => DecodedFrame::new((*self.data).clone(), self.width, self.height), - PixelFormat::Nv12 => DecodedFrame::new_nv12( - (*self.data).clone(), + PixelFormat::Rgba => { + DecodedFrame::new_with_arc(Arc::clone(&self.data), self.width, self.height) + } + PixelFormat::Nv12 => DecodedFrame::new_nv12_with_arc( + Arc::clone(&self.data), self.width, self.height, self.y_stride, self.uv_stride, ), - PixelFormat::Yuv420p => DecodedFrame::new_yuv420p( - (*self.data).clone(), + PixelFormat::Yuv420p => DecodedFrame::new_yuv420p_with_arc( + Arc::clone(&self.data), self.width, self.height, self.y_stride, diff --git a/crates/rendering/src/frame_pipeline.rs b/crates/rendering/src/frame_pipeline.rs index 49084728ee..290e85bd3b 100644 --- a/crates/rendering/src/frame_pipeline.rs +++ b/crates/rendering/src/frame_pipeline.rs @@ -10,6 +10,8 @@ pub struct PendingReadback { padded_bytes_per_row: u32, width: u32, height: u32, + frame_number: u32, + frame_rate: u32, } impl PendingReadback { @@ -44,16 +46,22 @@ let buffer_slice = self.buffer.slice(..); let data = buffer_slice.get_mapped_range(); - let data_vec = data.to_vec(); + let mut data_vec = Vec::with_capacity(data.len() + 24); + data_vec.extend_from_slice(&data); drop(data); self.buffer.unmap(); + let target_time_ns = + (self.frame_number as u64 * 1_000_000_000) / self.frame_rate.max(1) as u64; + Ok(RenderedFrame { data: data_vec, padded_bytes_per_row: self.padded_bytes_per_row, width: self.width, height: self.height, + frame_number: self.frame_number, + target_time_ns, }) } } @@ -162,6 +170,8 @@ impl PipelinedGpuReadback { padded_bytes_per_row, width: uniforms.output_size.0, height: uniforms.output_size.1, + frame_number: uniforms.frame_number, + frame_rate: uniforms.frame_rate, }); Ok(()) @@ -309,6 +319,8 @@ pub struct RenderedFrame { pub width: u32, pub height: u32, pub padded_bytes_per_row: u32, + pub frame_number: u32, + pub target_time_ns: u64, } // impl FramePipelineEncoder { diff --git a/crates/rendering/src/lib.rs b/crates/rendering/src/lib.rs index 48f2b20889..8458d5c5ed 100644 --- a/crates/rendering/src/lib.rs +++ b/crates/rendering/src/lib.rs @@ -464,6 +464,7 @@ pub struct ProjectUniforms { pub output_size: (u32, u32), pub cursor_size: f32, pub frame_rate: u32, + pub frame_number: u32, display: CompositeVideoFrameUniforms, camera: Option<CompositeVideoFrameUniforms>, camera_only: Option<CompositeVideoFrameUniforms>, @@ -1557,6 +1558,7 @@ impl ProjectUniforms { scene, interpolated_cursor, frame_rate: fps, + frame_number, prev_cursor: prev_interpolated_cursor, display_parent_motion_px: display_motion_parent, motion_blur_amount: user_motion_blur,
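Note: target_time_ns is derived in frame_pipeline.rs as frame_number * 1_000_000_000 / frame_rate, and renderLoop in frame-worker.ts re-bases it against a playback anchor (playbackStartTime, playbackStartTargetTimeNs) before deciding whether a queued frame is too stale to present. A condensed TypeScript sketch of that drop test; isTooLate and its parameters are illustrative names, while the 40 ms constant mirrors LATE_THRESHOLD_MS from the worker:

const LATE_THRESHOLD_MS = 40;

function isTooLate(
  nowMs: number, // performance.now() at this render-loop tick
  playbackStartMs: number, // wall-clock time of the playback anchor
  startTargetNs: bigint, // target_time_ns of the anchor frame
  frameTargetNs: bigint, // target_time_ns of the candidate frame
): boolean {
  const elapsedMs = nowMs - playbackStartMs;
  // Re-base the frame's absolute target against the anchor, then
  // compare against how much wall-clock time has actually elapsed.
  const targetMs = Number((frameTargetNs - startTargetNs) / 1_000_000n);
  return targetMs - elapsedMs < -LATE_THRESHOLD_MS;
}

Frames failing this test are released back to the shared buffer and counted as drops; among the survivors, renderLoop presents the newest frame (highest frame_number), keeping the closest-to-target frame as a fallback. The anchor is re-established whenever the frame_number jumps backward or skips ahead by more than 30 frames, which is how seeks avoid being misclassified as late frames.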