Skip to content

Commit 25a9855

Browse files
authored
Wait for an ACK for inbound calls. (#445)
1 parent 0d79eff commit 25a9855

File tree

5 files changed

+40
-11
lines changed

5 files changed

+40
-11
lines changed

pkg/sip/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ func (c *Client) onBye(req *sip.Request, tx sip.ServerTransaction) bool {
293293
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusCallTransactionDoesNotExists, "Call does not exist", nil))
294294
return false
295295
}
296-
call.log.Infow("BYE")
296+
call.log.Infow("BYE from remote")
297297
go func(call *outboundCall) {
298298
call.cc.AcceptBye(req, tx)
299299
call.CloseWithReason(CallHangup, "bye", livekit.DisconnectReason_CLIENT_INITIATED)

pkg/sip/inbound.go

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ const (
5555
// audioBridgeMaxDelay delays sending audio for certain time, unless RTP packet is received.
5656
// This is done because of audio cutoff at the beginning of calls observed in the wild.
5757
audioBridgeMaxDelay = 1 * time.Second
58+
59+
inviteOkAckTimeout = 5 * time.Second
5860
)
5961

6062
// hashPassword creates a SHA256 hash of the password for logging purposes
@@ -364,21 +366,36 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
364366
}
365367

366368
func (s *Server) onOptions(log *slog.Logger, req *sip.Request, tx sip.ServerTransaction) {
367-
_ = tx.Respond(sip.NewResponseFromRequest(req, 200, "OK", nil))
369+
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusOK, "OK", nil))
370+
}
371+
372+
func (s *Server) onAck(log *slog.Logger, req *sip.Request, tx sip.ServerTransaction) {
373+
tag, err := getFromTag(req)
374+
if err != nil {
375+
return
376+
}
377+
s.cmu.RLock()
378+
c := s.activeCalls[tag]
379+
s.cmu.RUnlock()
380+
if c == nil {
381+
return
382+
}
383+
c.log.Infow("ACK from remote")
384+
c.cc.AcceptAck(req, tx)
368385
}
369386

370387
func (s *Server) onBye(log *slog.Logger, req *sip.Request, tx sip.ServerTransaction) {
371388
tag, err := getFromTag(req)
372389
if err != nil {
373-
_ = tx.Respond(sip.NewResponseFromRequest(req, 400, "", nil))
390+
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "", nil))
374391
return
375392
}
376393

377394
s.cmu.RLock()
378395
c := s.activeCalls[tag]
379396
s.cmu.RUnlock()
380397
if c != nil {
381-
c.log.Infow("BYE")
398+
c.log.Infow("BYE from remote")
382399
c.cc.AcceptBye(req, tx)
383400
_ = c.Close()
384401
return
@@ -417,7 +434,7 @@ func (s *Server) OnNoRoute(log *slog.Logger, req *sip.Request, tx sip.ServerTran
417434
func (s *Server) onNotify(log *slog.Logger, req *sip.Request, tx sip.ServerTransaction) {
418435
tag, err := getFromTag(req)
419436
if err != nil {
420-
_ = tx.Respond(sip.NewResponseFromRequest(req, 400, "", nil))
437+
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "", nil))
421438
return
422439
}
423440

@@ -597,7 +614,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
597614
}
598615
c.log.Infow("Accepting the call", "headers", headers)
599616
if err := c.cc.Accept(ctx, answerData, headers); err != nil {
600-
c.log.Errorw("Cannot respond to INVITE", err)
617+
c.log.Errorw("Cannot accept the call", err)
601618
return false, err
602619
}
603620
c.media.EnableTimeout(true)
@@ -1168,6 +1185,7 @@ type sipInbound struct {
11681185
nextRequestCSeq uint32
11691186
referCseq uint32
11701187
ringing chan struct{}
1188+
acked core.Fuse
11711189
setHeaders setHeadersFunc
11721190
}
11731191

@@ -1343,7 +1361,7 @@ func (c *sipInbound) Accept(ctx context.Context, sdpData []byte, headers map[str
13431361
if c.inviteTx == nil {
13441362
return errors.New("call already rejected")
13451363
}
1346-
r := sip.NewResponseFromRequest(c.invite, 200, "OK", sdpData)
1364+
r := sip.NewResponseFromRequest(c.invite, sip.StatusOK, "OK", sdpData)
13471365

13481366
// This will effectively redirect future SIP requests to this server instance (if host address is not LB).
13491367
r.AppendHeader(c.contact)
@@ -1358,11 +1376,23 @@ func (c *sipInbound) Accept(ctx context.Context, sdpData []byte, headers map[str
13581376
if err := c.inviteTx.Respond(r); err != nil {
13591377
return err
13601378
}
1379+
ackCtx, ackCancel := context.WithTimeout(ctx, inviteOkAckTimeout)
1380+
defer ackCancel()
1381+
select {
1382+
case <-ackCtx.Done():
1383+
return errors.New("no ACK received for 200 OK")
1384+
case <-c.inviteTx.Acks():
1385+
case <-c.acked.Watch():
1386+
}
13611387
c.inviteOk = r
13621388
c.inviteTx = nil // accepted
13631389
return nil
13641390
}
13651391

1392+
func (c *sipInbound) AcceptAck(req *sip.Request, tx sip.ServerTransaction) {
1393+
c.acked.Break()
1394+
}
1395+
13661396
func (c *sipInbound) AcceptBye(req *sip.Request, tx sip.ServerTransaction) {
13671397
_ = tx.Respond(sip.NewResponseFromRequest(req, 200, "OK", nil))
13681398
c.mu.Lock()

pkg/sip/server.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,13 +267,12 @@ func (s *Server) Start(agent *sipgo.UserAgent, sc *ServiceConfig, tlsConf *tls.C
267267

268268
s.sipSrv.OnOptions(s.onOptions)
269269
s.sipSrv.OnInvite(s.onInvite)
270+
s.sipSrv.OnAck(s.onAck)
270271
s.sipSrv.OnBye(s.onBye)
271272
s.sipSrv.OnNotify(s.onNotify)
272273
s.sipSrv.OnNoRoute(s.OnNoRoute)
273274
s.sipUnhandled = unhandled
274275

275-
// Ignore ACKs
276-
s.sipSrv.OnAck(func(log *slog.Logger, req *sip.Request, tx sip.ServerTransaction) {})
277276
listenIP := s.conf.ListenIP
278277
if listenIP == "" {
279278
listenIP = "0.0.0.0"

pkg/siptest/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func NewClient(id string, conf ClientConfig) (*Client, error) {
8787
conf.Log.Debug("setting local address", "ip", localIP)
8888
}
8989
if conf.Port == 0 {
90-
conf.Port = 5060 + uint16(rand.Intn(100))
90+
conf.Port = 5060 + uint16(rand.Intn(1000))
9191
}
9292
if conf.Number == "" {
9393
conf.Number = "1000"

test/integration/sip_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func runSIPServer(t testing.TB, lk *LiveKit) *SIPServer {
5757
if err != nil {
5858
t.Fatal(err)
5959
}
60-
sipPort := 5060 + rand.Intn(100)
60+
sipPort := 5060 + rand.Intn(1000)
6161
local, err := config.GetLocalIP()
6262
require.NoError(t, err)
6363
conf := &config.Config{

0 commit comments

Comments
 (0)