Skip to content

Commit c3aab2f

Browse files
committed
wip3
1 parent 288c208 commit c3aab2f

File tree

5 files changed

+80
-111
lines changed

5 files changed

+80
-111
lines changed

lib/ex_webrtc/rtp/av1/depayloader.ex

Lines changed: 58 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,16 @@ defmodule ExWebRTC.RTP.Depayloader.AV1 do
1414

1515
alias ExWebRTC.RTP.AV1.{OBU, Payload}
1616

17-
@temporal_delimiter %OBU{
18-
type: 2,
19-
x: 0,
20-
s: 1,
21-
payload: <<>>
22-
}
23-
|> OBU.serialize()
24-
2517
@type t :: %__MODULE__{
2618
current_temporal_unit: [OBU.t()],
2719
current_obu: binary() | nil,
2820
current_timestamp: ExRTP.Packet.uint32() | nil
2921
}
3022

31-
defstruct [current_temporal_unit: [], current_obu: nil, current_timestamp: nil]
23+
defstruct current_temporal_unit: [], current_obu: nil, current_timestamp: nil
3224

33-
# XXX where warnings, where debugs?
34-
# XXX MUST ignore TD OBU, Tile List OBU
3525
@impl true
36-
def new() do
26+
def new do
3727
%__MODULE__{}
3828
end
3929

@@ -45,21 +35,51 @@ defmodule ExWebRTC.RTP.Depayloader.AV1 do
4535
def depayload(depayloader, packet) do
4636
case Payload.parse(packet.payload) do
4737
{:ok, av1_payload} ->
48-
do_depayload2(depayloader, packet, av1_payload)
38+
do_depayload(depayloader, packet, av1_payload)
4939

5040
{:error, reason} ->
5141
Logger.warning("""
5242
Couldn't parse payload, reason: #{reason}. \
5343
Resetting depayloader state. Payload: #{inspect(packet.payload)}.\
5444
""")
5545

56-
{:ok, %{depayloader | current_temporal_unit: [], current_timestamp: nil}}
46+
{:ok, %__MODULE__{}}
47+
end
48+
end
49+
50+
defp do_depayload(depayloader, packet, %Payload{z: z, y: y} = av1_payload) do
51+
{obus, current_obu_fragment, next_obu_fragment} =
52+
av1_payload
53+
|> Payload.depayload_obu_elements()
54+
|> parse_obu_elements(z, y)
55+
56+
# TODO: handle marker, or not (?)
57+
# av1-rtp-spec sec. 4.2.: It is possible for a receiver to receive the last packet of a temporal unit
58+
# without the marker bit being set equal to 1, and a receiver should be able to handle this case.
59+
# at the moment, we're looking at the timestamps only, and it seems to work
60+
#
61+
# TODO: handle the case where depayloader.current_timestamp > packet.timestamp
62+
new_temporal_unit? = depayloader.current_timestamp != packet.timestamp
63+
64+
{depayloader, obus, next_obu_fragment} =
65+
depayloader
66+
|> update_current_obu(current_obu_fragment, new_temporal_unit?)
67+
|> maybe_flush_current_obu(obus, next_obu_fragment, y)
68+
69+
if new_temporal_unit? do
70+
{temporal_unit, depayloader} = flush_temporal_unit(depayloader)
71+
72+
{temporal_unit,
73+
update_temporal_unit(depayloader, obus, next_obu_fragment, packet.timestamp)}
74+
else
75+
{nil, update_temporal_unit(depayloader, obus, next_obu_fragment, packet.timestamp)}
5776
end
5877
end
5978

6079
defp parse_obu_elements(obu_elements, z, y)
6180

6281
defp parse_obu_elements([], _, _) do
82+
# TODO: decide where to use debug logs and where warnings
6383
Logger.debug("AV1 payload contains no valid OBU elements. Dropping packet.")
6484
{[], nil, nil}
6585
end
@@ -85,7 +105,7 @@ defmodule ExWebRTC.RTP.Depayloader.AV1 do
85105
{obus, current_obu_fragment, next_obu_fragment}
86106
end
87107

88-
defp update_current_obu(depayloader, current_obu_fragment, new_frame?)
108+
defp update_current_obu(depayloader, current_obu_fragment, new_temporal_unit?)
89109

90110
defp update_current_obu(depayloader, current_obu_fragment, true) do
91111
if depayloader.current_obu != nil do
@@ -95,7 +115,9 @@ defmodule ExWebRTC.RTP.Depayloader.AV1 do
95115
end
96116

97117
if current_obu_fragment != nil do
98-
Logger.debug("Received ")
118+
Logger.debug(
119+
"Received middle OBU fragment from a new temporal unit without beginning the OBU. Dropping this OBU fragment."
120+
)
99121
end
100122

101123
%{depayloader | current_obu: nil}
@@ -136,7 +158,7 @@ defmodule ExWebRTC.RTP.Depayloader.AV1 do
136158
end
137159

138160
# Packet contained exactly 1 OBU fragment, current_obu will be continued. Do not flush
139-
# XXX check the nil
161+
# TODO: make sure `nil` is correct here
140162
defp maybe_flush_current_obu(%__MODULE__{current_obu: incomplete_obu} = depayloader, [], nil, 1) do
141163
{depayloader, [], incomplete_obu}
142164
end
@@ -165,100 +187,43 @@ defmodule ExWebRTC.RTP.Depayloader.AV1 do
165187
}
166188
end
167189

168-
# XXX UWAGA NA TIMESTAMPA ŚMIERDZIELA
169-
defp flush_temporal_unit(%__MODULE__{current_temporal_unit: tu} = depayloader) when tu != [] do
170-
# Force s=1 for the low overhead bitstring format
190+
defp flush_temporal_unit(%__MODULE__{current_temporal_unit: tu}) when tu != [] do
191+
# Force s=1 for the low overhead bitstream format
171192
tu_binary =
172193
tu
173194
|> Stream.map(&%OBU{&1 | s: 1})
174195
|> Stream.map(&OBU.serialize/1)
175196
|> Enum.reverse()
176197
|> :erlang.iolist_to_binary()
177198

178-
# if current obu not nil, log
179-
{@temporal_delimiter <> tu_binary,
180-
%{depayloader | current_temporal_unit: [], current_obu: nil, current_timestamp: nil}}
199+
# TODO: is it possible that `current_obu != nil` here?
200+
{OBU.temporal_delimiter() <> tu_binary, %__MODULE__{}}
181201
end
182202

183203
defp flush_temporal_unit(depayloader) do
184-
Logger.debug("WRITEME")
185-
# XXX maybe zero?
204+
Logger.debug("Previous temporal unit is empty, nothing to flush")
186205
{nil, depayloader}
187206
end
188207

189-
defp do_depayload2(depayloader, packet, %Payload{z: z, y: y} = av1_payload) do
190-
{obus, current_obu_fragment, next_obu_fragment} =
191-
av1_payload
192-
|> Payload.depayload_obu_elements()
193-
|> parse_obu_elements(z, y)
194-
195-
# {[A], 0, 0} -> {[A], nil, nil}
196-
# {[A], 0, 1} -> {[], nil, A}
197-
# {[A], 1, 0} -> {[], A, nil}
198-
# {[A], 1, 1} -> {[], A, nil}
199-
#
200-
# {[B, C, D], 0, 0} -> {[B, C, D], nil, nil}
201-
# {[B, C, D], 0, 1} -> {[B, C], nil, D}
202-
# {[B, C, D], 1, 0} -> {[C, D], B, nil}
203-
# {[B, C, D], 1, 1} -> {[C], B, D}
204-
205-
new_temporal_unit? = depayloader.current_timestamp != packet.timestamp
206-
207-
{depayloader, obus, next_obu_fragment} =
208-
depayloader
209-
|> update_current_obu(current_obu_fragment, new_temporal_unit?)
210-
|> maybe_flush_current_obu(obus, next_obu_fragment, y)
211-
212-
# {[A], 0, 0} -> nil
213-
# {[A], 0, 1} -> nil
214-
# {[A], 1, 0} -> if CO != nil, do: CO <> A, else: nil
215-
# {[A], 1, 1} -> if CO != nil, do: CO <> A, else: nil
216-
#
217-
# {[B, C, D], 0, 0} -> nil
218-
# {[B, C, D], 0, 1} -> nil
219-
# {[B, C, D], 1, 0} -> if CO != nil, do: CO <> B, else: nil
220-
# {[B, C, D], 1, 1} -> if CO != nil, do: CO <> B, else: nil
221-
222-
# {[A], 0, 0} -> {nil, [A]}
223-
# {[A], 0, 1} -> {A, []}
224-
# {[A], 1, _}, cat err -> {nil, []}
225-
# {[A], 1, 0}, cat ok -> {nil, [CO]}
226-
# {[A], 1, 1}, cat ok -> {CO, []}
227-
#
228-
# {[B, C, D], 0, 0} -> {nil, [B, C, D]}
229-
# {[B, C, D], 0, 1} -> {D, [B, C]}
230-
# {[B, C, D], 1, 0}, cat err -> {nil, [C, D]}
231-
# {[B, C, D], 1, 1}, cat err -> {D, [C]}
232-
# {[B, C, D], 1, 0}, cat ok -> {nil, [CO, C, D]}
233-
# {[B, C, D], 1, 1}, cat ok -> {D, [CO, C]}
234-
235-
# XXX what to do if we're ready to flush two?
236-
# depayloader
237-
# |> update_temporal_unit(obus, next_obu_fragment)
238-
# |> maybe_flush_temporal_unit(packet)
239-
240-
if new_temporal_unit? do
241-
{temporal_unit, depayloader} = flush_temporal_unit(depayloader)
242-
243-
{temporal_unit,
244-
update_temporal_unit(depayloader, obus, next_obu_fragment, packet.timestamp)}
245-
else
246-
{nil, update_temporal_unit(depayloader, obus, next_obu_fragment, packet.timestamp)}
247-
end
248-
end
249-
250208
defp append_obus([], tu), do: tu
251209

252210
defp append_obus([obu_binary | rest], tu) do
253-
case OBU.parse(obu_binary) do
254-
{:ok, obu, rest_of_binary} ->
255-
if rest_of_binary != <<>>, do: Logger.debug("WRITEME")
256-
257-
append_obus(rest, [obu | tu])
258-
211+
with {:ok, obu, rest_of_binary} <- OBU.parse(obu_binary),
212+
true <- OBU.should_be_transmitted?(obu) do
213+
if rest_of_binary != <<>>,
214+
do:
215+
Logger.debug(
216+
"OBU binary contains additional data after the decoded OBU, dropping the additional data"
217+
)
218+
219+
append_obus(rest, [obu | tu])
220+
else
259221
{:error, :invalid_av1_bitstream} ->
260-
Logger.debug("WRITEME")
222+
Logger.debug("Unable to parse OBU from binary data, dropping")
223+
append_obus(rest, tu)
261224

225+
false ->
226+
Logger.debug("Dropping temporal delimiter/tile list OBU")
262227
append_obus(rest, tu)
263228
end
264229
end

lib/ex_webrtc/rtp/av1/obu.ex

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ defmodule ExWebRTC.RTP.AV1.OBU do
2424

2525
@obu_sequence_header 1
2626
@obu_temporal_delimiter 2
27+
@obu_tile_list 8
2728
@obu_padding 15
2829

2930
@type t :: %__MODULE__{
@@ -145,4 +146,19 @@ defmodule ExWebRTC.RTP.AV1.OBU do
145146
end
146147

147148
def disable_dropping_in_decoder_if_applicable(obu), do: obu
149+
150+
@spec temporal_delimiter :: binary()
151+
def temporal_delimiter,
152+
do: %__MODULE__{type: @obu_temporal_delimiter, x: 0, s: 1, payload: <<>>} |> serialize()
153+
154+
@doc """
155+
Determines whether the OBU should be removed when transmitting, and must be ignored when receiving
156+
in accordance with av1-rtp-spec sec. 5.
157+
"""
158+
@spec should_be_transmitted?(t()) :: boolean()
159+
def should_be_transmitted?(obu)
160+
161+
def should_be_transmitted?(%__MODULE__{type: @obu_temporal_delimiter}), do: false
162+
def should_be_transmitted?(%__MODULE__{type: @obu_tile_list}), do: false
163+
def should_be_transmitted?(_obu), do: true
148164
end

lib/ex_webrtc/rtp/av1/payload.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ defmodule ExWebRTC.RTP.AV1.Payload do
112112

113113
defp parse_obu_elements(<<>>, count, obu_elements) do
114114
if count > 0,
115-
do: Logger.warning("Invalid AV1 RTP payload: expected #{count} more OBU elements.")
115+
do: Logger.debug("Invalid AV1 RTP aggregation header: expected #{count} more OBU elements.")
116116

117117
Enum.reverse(obu_elements)
118118
end

lib/ex_webrtc/rtp/av1/payloader.ex

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ defmodule ExWebRTC.RTP.Payloader.AV1 do
1616
alias ExWebRTC.Utils
1717

1818
@obu_sequence_header 1
19-
@obu_temporal_delimiter 2
2019

2120
@aggregation_header_size_bytes 1
2221

@@ -36,25 +35,14 @@ defmodule ExWebRTC.RTP.Payloader.AV1 do
3635
def payload(payloader, temporal_unit) when temporal_unit != <<>> do
3736
# In AV1, a temporal unit consists of all OBUs associated with a specific time instant.
3837
# Temporal units always start with a temporal delimiter OBU. They may contain multiple AV1 frames.
39-
# av1-rtp-spec sec. 5: The temporal delimiter OBU should be removed when transmitting.
40-
obus =
41-
case parse_obus(temporal_unit) do
42-
[%OBU{type: @obu_temporal_delimiter} | next_obus] ->
43-
next_obus
44-
45-
_ ->
46-
Logger.warning("""
47-
Invalid AV1 temporal unit: does not start with temporal delimiter OBU. \
48-
Dropping temporal unit.\
49-
""")
50-
51-
[]
52-
end
5338

5439
# With the current implementation, each RTP packet will contain one OBU element.
5540
# This element can be an entire OBU, or a fragment of an OBU bigger than max_payload_size.
5641
rtp_packets =
57-
Stream.flat_map(obus, fn obu ->
42+
temporal_unit
43+
|> parse_obus()
44+
|> Stream.filter(&OBU.should_be_transmitted?/1)
45+
|> Stream.flat_map(fn obu ->
5846
n_bit = Utils.to_int(obu.type == @obu_sequence_header)
5947

6048
obu

lib/ex_webrtc/rtp/vp8/depayloader.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ defmodule ExWebRTC.RTP.Depayloader.VP8 do
3838
Resetting depayloader state. Payload: #{inspect(packet.payload)}.\
3939
""")
4040

41-
{:ok, %{depayloader | current_frame: nil, current_timestamp: nil}}
41+
{nil, %{depayloader | current_frame: nil, current_timestamp: nil}}
4242
end
4343
end
4444

0 commit comments

Comments
 (0)