From 74d8ca7ad261557bd492abb978f09d3450b705a6 Mon Sep 17 00:00:00 2001 From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com> Date: Wed, 19 Feb 2025 13:37:52 +0100 Subject: [PATCH] Extract recorder to separate package --- lib/ex_webrtc/recorder.ex | 196 ---------------------------- lib/ex_webrtc/recorder/converter.ex | 182 -------------------------- 2 files changed, 378 deletions(-) delete mode 100644 lib/ex_webrtc/recorder.ex delete mode 100644 lib/ex_webrtc/recorder/converter.ex diff --git a/lib/ex_webrtc/recorder.ex b/lib/ex_webrtc/recorder.ex deleted file mode 100644 index 930e7248..00000000 --- a/lib/ex_webrtc/recorder.ex +++ /dev/null @@ -1,196 +0,0 @@ -defmodule ExWebRTC.Recorder do - @moduledoc """ - Saves received RTP packets to a file for later processing/analysis. - - Dumps raw RTP packets fed to it in a custom format. Use `Recorder.Converter` to process them. - """ - - use GenServer - - alias ExWebRTC.MediaStreamTrack - - require Logger - - @default_base_dir "./recordings" - - @typedoc """ - Options that can be passed to `start_link/1`. - - * `base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default. - * `on_start` - Callback that will be executed just after the Recorder is (re)started. - It should return the initial list of tracks to be added. - """ - @type option :: - {:base_dir, String.t()} - | {:on_start, (-> [MediaStreamTrack.t()])} - - @type options :: [option()] - - # Necessary to start Recorder under a supervisor using `{Recorder, [recorder_opts, gen_server_opts]}` - @doc false - @spec child_spec(list()) :: Supervisor.child_spec() - def child_spec(args) do - %{ - id: __MODULE__, - start: {__MODULE__, :start_link, args} - } - end - - @doc """ - Starts a new `ExWebRTC.Recorder` process. - - `ExWebRTC.Recorder` is a `GenServer` under the hood, thus this function allows for - passing the generic `t:GenServer.options/0` as an argument. - """ - @spec start(options(), GenServer.options()) :: GenServer.on_start() - def start(recorder_opts \\ [], gen_server_opts \\ []) do - GenServer.start(__MODULE__, recorder_opts, gen_server_opts) - end - - @doc """ - Starts a new `ExWebRTC.Recorder` process. - - Works identically to `start/2`, but links to the calling process. - """ - @spec start_link(options(), GenServer.options()) :: GenServer.on_start() - def start_link(recorder_opts \\ [], gen_server_opts \\ []) do - GenServer.start_link(__MODULE__, recorder_opts, gen_server_opts) - end - - @doc """ - Adds new tracks to the recording. - """ - @spec add_tracks(GenServer.server(), [MediaStreamTrack.t()]) :: :ok - def add_tracks(recorder, tracks) do - GenServer.call(recorder, {:add_tracks, tracks}) - end - - @doc """ - Records a received packet on the given track. - """ - @spec record( - GenServer.server(), - MediaStreamTrack.id(), - MediaStreamTrack.rid() | nil, - ExRTP.Packet.t() - ) :: :ok - def record(recorder, track_id, rid, %ExRTP.Packet{} = packet) do - recv_time = System.monotonic_time(:millisecond) - GenServer.cast(recorder, {:record, track_id, rid, recv_time, packet}) - end - - @impl true - def init(config) do - base_dir = - (config[:base_dir] || @default_base_dir) - |> Path.join(current_datetime()) - |> Path.expand() - - :ok = File.mkdir_p!(base_dir) - Logger.info("Starting recorder. Recordings will be saved under: #{base_dir}") - - state = %{ - base_dir: base_dir, - tracks: %{} - } - - case config[:on_start] do - nil -> - {:ok, state} - - callback -> - {:ok, state, {:continue, {:on_start, callback}}} - end - end - - @impl true - def handle_continue({:on_start, on_start}, state) do - case on_start.() do - [] -> - {:noreply, state} - - tracks -> - state = do_add_tracks(tracks, state) - {:noreply, state} - end - end - - @impl true - def handle_call({:add_tracks, tracks}, _from, state) do - state = do_add_tracks(tracks, state) - {:reply, :ok, state} - end - - @impl true - def handle_cast({:record, track_id, rid, recv_time, packet}, state) - when is_map_key(state.tracks, track_id) do - %{file: file, rid_map: rid_map} = state.tracks[track_id] - - case rid_map do - %{^rid => rid_idx} -> - :ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time)) - - _other -> - Logger.warning(""" - Tried to save packet for unknown rid. Ignoring. Track id: #{inspect(track_id)}, rid: #{inspect(rid)}.\ - """) - end - - {:noreply, state} - end - - @impl true - def handle_cast({:record, track_id, _rid, _recv_time, _packet}, state) do - Logger.warning(""" - Tried to save packet for unknown track id. Ignoring. Track id: #{inspect(track_id)}.\ - """) - - {:noreply, state} - end - - @impl true - def handle_info(_msg, state) do - {:noreply, state} - end - - defp do_add_tracks(tracks, state) do - start_time = DateTime.utc_now() - - tracks = - Map.new(tracks, fn track -> - path = Path.join(state.base_dir, "#{track.id}.rtpx") - file = File.open!(path, [:write]) - rid_map = (track.rids || [nil]) |> Enum.with_index() |> Map.new() - - {track.id, - %{kind: track.kind, rid_map: rid_map, path: path, file: file, start_time: start_time}} - end) - - state = %{state | tracks: Map.merge(state.tracks, tracks)} - report_path = Path.join(state.base_dir, "report.json") - - report = - Map.new(state.tracks, fn {id, track} -> - track = Map.delete(track, :file) - {id, track} - end) - - :ok = File.write!(report_path, Jason.encode!(report)) - - state - end - - defp serialize_packet(packet, rid_idx, recv_time) do - packet = ExRTP.Packet.encode(packet) - packet_size = byte_size(packet) - <> - end - - defp current_datetime() do - {{y, mo, d}, {h, m, s}} = :calendar.local_time() - - # e.g. 20240130-120315 - :io_lib.format("~4..0w~2..0w~2..0w-~2..0w~2..0w~2..0w", [y, mo, d, h, m, s]) - |> to_string() - end -end diff --git a/lib/ex_webrtc/recorder/converter.ex b/lib/ex_webrtc/recorder/converter.ex deleted file mode 100644 index 8a404834..00000000 --- a/lib/ex_webrtc/recorder/converter.ex +++ /dev/null @@ -1,182 +0,0 @@ -defmodule ExWebRTC.Recorder.Converter do - @moduledoc """ - Processes RTP packet files saved by `Recorder`. - - At the moment, `Converter` works only with VP8 video and Opus audio. - """ - - require Logger - - alias ExWebRTC.RTP.JitterBuffer.PacketStore - alias ExWebRTC.RTPCodecParameters - alias ExWebRTC.RTP.Depayloader - alias ExWebRTC.Media.{IVF, Ogg} - - # TODO: Allow changing these values - @ivf_header_opts [ - # <> = "VP80" - fourcc: 808_996_950, - height: 720, - width: 1280, - num_frames: 1024, - timebase_denum: 24, - timebase_num: 1 - ] - - # TODO: Support codecs other than VP8/Opus - @video_codec_params %RTPCodecParameters{ - payload_type: 96, - mime_type: "video/VP8", - clock_rate: 90_000 - } - - @audio_codec_params %RTPCodecParameters{ - payload_type: 111, - mime_type: "audio/opus", - clock_rate: 48_000, - channels: 2 - } - - @default_output_path "./converter_output" - - @doc """ - Convert the saved dumps of tracks in the report to IVF and Ogg files. - """ - @spec convert!(Path.t(), Path.t()) :: :ok | no_return() - def convert!(report_path, output_path \\ @default_output_path) do - report_path = - report_path - |> Path.expand() - |> then( - &if(File.dir?(&1), - do: Path.join(&1, "report.json"), - else: &1 - ) - ) - - output_path = Path.expand(output_path) - File.mkdir_p!(output_path) - - report = - report_path - |> File.read!() - |> Jason.decode!() - - for {id, track} <- report do - %{ - "path" => path, - "kind" => kind, - "rid_map" => rid_map - } = track - - file = File.open!(path) - - packets = - read_packets(file, Map.new(rid_map, fn {_rid, rid_idx} -> {rid_idx, %PacketStore{}} end)) - - case kind do - "video" -> - convert_video_track(id, rid_map, output_path, packets) - - "audio" -> - convert_audio_track(id, output_path, packets |> Map.values() |> hd()) - end - end - - :ok - end - - defp convert_video_track(id, rid_map, output_path, packets) do - for {rid, rid_idx} <- rid_map do - filename = if rid == "nil", do: "#{id}.ivf", else: "#{id}_#{rid}.ivf" - - {:ok, writer} = - output_path - |> Path.join(filename) - |> IVF.Writer.open(@ivf_header_opts) - - {:ok, depayloader} = Depayloader.new(@video_codec_params) - do_convert_video_track(packets[rid_idx], depayloader, writer) - end - end - - defp do_convert_video_track(packets, depayloader, writer, frames_cnt \\ 0) - defp do_convert_video_track([], _depayloader, writer, _frames_cnt), do: IVF.Writer.close(writer) - - defp do_convert_video_track([packet | rest], depayloader, writer, frames_cnt) do - case Depayloader.depayload(depayloader, packet) do - {nil, depayloader} -> - do_convert_video_track(rest, depayloader, writer, frames_cnt) - - {vp8_frame, depayloader} -> - frame = %IVF.Frame{timestamp: frames_cnt, data: vp8_frame} - {:ok, writer} = IVF.Writer.write_frame(writer, frame) - do_convert_video_track(rest, depayloader, writer, frames_cnt + 1) - end - end - - defp convert_audio_track(id, output_path, packets) do - {:ok, writer} = - output_path - |> Path.join("#{id}.ogg") - |> Ogg.Writer.open() - - {:ok, depayloader} = Depayloader.new(@audio_codec_params) - do_convert_audio_track(packets, depayloader, writer) - end - - defp do_convert_audio_track([], _depayloader, writer), do: Ogg.Writer.close(writer) - - defp do_convert_audio_track([packet | rest], depayloader, writer) do - {opus_packet, depayloader} = Depayloader.depayload(depayloader, packet) - {:ok, writer} = Ogg.Writer.write_packet(writer, opus_packet) - do_convert_audio_track(rest, depayloader, writer) - end - - defp read_packets(file, stores) do - case read_packet(file) do - {:ok, rid_idx, packet} -> - stores = Map.update!(stores, rid_idx, &insert_packet_to_store(&1, packet)) - read_packets(file, stores) - - {:error, reason} -> - Logger.warning("Error decoding RTP packet: #{inspect(reason)}") - read_packets(file, stores) - - :eof -> - Map.new(stores, fn {rid_idx, store} -> - {rid_idx, store |> PacketStore.dump() |> Enum.reject(&is_nil/1)} - end) - end - end - - defp read_packet(file) do - with {:ok, <>} <- read_exactly_n_bytes(file, 13), - {:ok, packet_data} <- read_exactly_n_bytes(file, packet_size), - {:ok, packet} <- ExRTP.Packet.decode(packet_data) do - {:ok, rid_idx, packet} - end - end - - defp read_exactly_n_bytes(file, byte_cnt) do - with data when is_binary(data) <- IO.binread(file, byte_cnt), - true <- byte_cnt == byte_size(data) do - {:ok, data} - else - :eof -> :eof - false -> {:error, :not_enough_data} - {:error, _reason} = error -> error - end - end - - defp insert_packet_to_store(store, packet) do - case PacketStore.insert(store, packet) do - {:ok, store} -> - store - - {:error, :late_packet} -> - Logger.warning("Decoded late RTP packet") - store - end - end -end