@@ -16,17 +16,21 @@ package rtp
1616
1717import (
1818 "net"
19+ "net/netip"
1920 "sync"
2021 "sync/atomic"
2122 "time"
2223
2324 "github.com/frostbyte73/core"
2425 "github.com/pion/rtp"
26+
27+ "github.com/livekit/protocol/logger"
2528)
2629
2730var _ Writer = (* Conn )(nil )
2831
2932type ConnConfig struct {
33+ Log logger.Logger
3034 MediaTimeoutInitial time.Duration
3135 MediaTimeout time.Duration
3236 TimeoutCallback func ()
@@ -40,14 +44,18 @@ func NewConnWith(conn UDPConn, conf *ConnConfig) *Conn {
4044 if conf == nil {
4145 conf = & ConnConfig {}
4246 }
47+ if conf .Log == nil {
48+ conf .Log = logger .GetLogger ()
49+ }
4350 if conf .MediaTimeoutInitial <= 0 {
4451 conf .MediaTimeoutInitial = 30 * time .Second
4552 }
4653 if conf .MediaTimeout <= 0 {
4754 conf .MediaTimeout = 15 * time .Second
4855 }
4956 c := & Conn {
50- readBuf : make ([]byte , 1500 ), // MTU
57+ log : conf .Log ,
58+ readBuf : make ([]byte , MTUSize + 1 ), // larger buffer to detect overflow
5159 received : make (chan struct {}),
5260 conn : conn ,
5361 timeout : conf .MediaTimeout ,
@@ -67,7 +75,9 @@ type UDPConn interface {
6775 Close () error
6876}
6977
78+ // Deprecated: use MediaPort instead
7079type Conn struct {
80+ log logger.Logger
7181 wmu sync.Mutex
7282 conn UDPConn
7383 closed core.Fuse
@@ -131,9 +141,11 @@ func (c *Conn) Listen(portMin, portMax int, listenAddr string) error {
131141 if listenAddr == "" {
132142 listenAddr = "0.0.0.0"
133143 }
134-
135- var err error
136- c .conn , err = ListenUDPPortRange (portMin , portMax , net .ParseIP (listenAddr ))
144+ ip , err := netip .ParseAddr (listenAddr )
145+ if err != nil {
146+ return err
147+ }
148+ c .conn , err = ListenUDPPortRange (portMin , portMax , ip )
137149 if err != nil {
138150 return err
139151 }
@@ -150,13 +162,21 @@ func (c *Conn) ListenAndServe(portMin, portMax int, listenAddr string) error {
150162
151163func (c * Conn ) readLoop () {
152164 conn , buf := c .conn , c .readBuf
165+ overflow := false
153166 var p rtp.Packet
154167 for {
155168 n , srcAddr , err := conn .ReadFromUDP (buf )
156169 if err != nil {
157170 return
158171 }
159172 c .dest .Store (srcAddr )
173+ if n > MTUSize {
174+ if ! overflow {
175+ overflow = true
176+ c .log .Errorw ("RTP packet is larger than MTU limit" , nil )
177+ }
178+ continue // ignore partial messages
179+ }
160180
161181 p = rtp.Packet {}
162182 if err := p .Unmarshal (buf [:n ]); err != nil {
@@ -167,24 +187,23 @@ func (c *Conn) readLoop() {
167187 close (c .received )
168188 }
169189 if h := c .onRTP .Load (); h != nil {
170- _ = (* h ).HandleRTP (& p )
190+ _ = (* h ).HandleRTP (& p . Header , p . Payload )
171191 }
172192 }
173193}
174194
175- func (c * Conn ) WriteRTP (p * rtp.Packet ) error {
195+ func (c * Conn ) WriteRTP (h * rtp.Header , payload [] byte ) ( int , error ) {
176196 addr := c .dest .Load ()
177197 if addr == nil {
178- return nil
198+ return 0 , nil
179199 }
180- data , err := p .Marshal ()
200+ data , err := ( & rtp. Packet { Header : * h , Payload : payload }) .Marshal ()
181201 if err != nil {
182- return err
202+ return 0 , err
183203 }
184204 c .wmu .Lock ()
185205 defer c .wmu .Unlock ()
186- _ , err = c .conn .WriteToUDP (data , addr )
187- return err
206+ return c .conn .WriteToUDP (data , addr )
188207}
189208
190209func (c * Conn ) ReadRTP () (* rtp.Packet , * net.UDPAddr , error ) {
0 commit comments