Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use UDPMux for Server reflexive candidates #419

Merged
merged 1 commit into from
Feb 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ type Agent struct {
loggerFactory logging.LoggerFactory
log logging.LeveledLogger

net *vnet.Net
tcpMux TCPMux
udpMux UDPMux
net *vnet.Net
tcpMux TCPMux
udpMux UDPMux
udpMuxSrflx UniversalUDPMux

interfaceFilter func(string) bool

Expand Down Expand Up @@ -319,6 +320,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
a.tcpMux = newInvalidTCPMux()
}
a.udpMux = config.UDPMux
a.udpMuxSrflx = config.UDPMuxSrflx

if a.net == nil {
a.net = vnet.NewNet(nil)
Expand Down Expand Up @@ -892,6 +894,9 @@ func (a *Agent) removeUfragFromMux() {
if a.udpMux != nil {
a.udpMux.RemoveConnByUfrag(a.localUfrag)
}
if a.udpMuxSrflx != nil {
a.udpMuxSrflx.RemoveConnByUfrag(a.localUfrag)
}
}

// Close cleans up the Agent
Expand Down
6 changes: 6 additions & 0 deletions agent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ type AgentConfig struct {
// defer to UDPMux for incoming connections
UDPMux UDPMux

// UDPMuxSrflx is used for multiplexing multiple incoming UDP connections of server reflexive candidates
// on a single port when this is set, the agent ignores PortMin and PortMax configurations and will
// defer to UDPMuxSrflx for incoming connections
// It embeds UDPMux to do the actual connection multiplexing
UDPMuxSrflx UniversalUDPMux

// Proxy Dialer is a dialer that should be implemented by the user based on golang.org/x/net/proxy
// dial interface in order to support corporate proxies
ProxyDialer proxy.Dialer
Expand Down
4 changes: 4 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,8 @@ var (
errICEWriteSTUNMessage = errors.New("the ICE conn can't write STUN messages")
errUDPMuxDisabled = errors.New("UDPMux is not enabled")
errCandidateIPNotFound = errors.New("could not determine local IP for Mux candidate")
errNoXorAddrMapping = errors.New("no address mapping")
errSendSTUNPacket = errors.New("failed to send STUN packet")
errXORMappedAddrTimeout = errors.New("timeout while waiting for XORMappedAddr")
errNotImplemented = errors.New("not implemented yet")
)
68 changes: 67 additions & 1 deletion gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ func (a *Agent) gatherCandidates(ctx context.Context) {
case CandidateTypeServerReflexive:
wg.Add(1)
go func() {
a.gatherCandidatesSrflx(ctx, a.urls, a.networkTypes)
if a.udpMuxSrflx != nil {
a.gatherCandidatesSrflxUDPMux(ctx, a.urls, a.networkTypes)
} else {
a.gatherCandidatesSrflx(ctx, a.urls, a.networkTypes)
}
wg.Done()
}()
if a.extIPMapper != nil && a.extIPMapper.candidateType == CandidateTypeServerReflexive {
Expand Down Expand Up @@ -333,6 +337,68 @@ func (a *Agent) gatherCandidatesSrflxMapped(ctx context.Context, networkTypes []
}
}

func (a *Agent) gatherCandidatesSrflxUDPMux(ctx context.Context, urls []*URL, networkTypes []NetworkType) {
var wg sync.WaitGroup
defer wg.Wait()

for _, networkType := range networkTypes {
if networkType.IsTCP() {
continue
}

for i := range urls {
wg.Add(1)
go func(url URL, network string) {
defer wg.Done()

hostPort := fmt.Sprintf("%s:%d", url.Host, url.Port)
serverAddr, err := a.net.ResolveUDPAddr(network, hostPort)
if err != nil {
a.log.Warnf("failed to resolve stun host: %s: %v", hostPort, err)
return
}

xoraddr, err := a.udpMuxSrflx.GetXORMappedAddr(serverAddr, stunGatherTimeout)
if err != nil {
a.log.Warnf("could not get server reflexive address %s %s: %v\n", network, url, err)
return
}

conn, err := a.udpMuxSrflx.GetConnForURL(a.localUfrag, url.String())
if err != nil {
a.log.Warnf("could not find connection in UDPMuxSrflx %s %s: %v\n", network, url, err)
return
}

ip := xoraddr.IP
port := xoraddr.Port

laddr := conn.LocalAddr().(*net.UDPAddr)
srflxConfig := CandidateServerReflexiveConfig{
Network: network,
Address: ip.String(),
Port: port,
Component: ComponentRTP,
RelAddr: laddr.IP.String(),
RelPort: laddr.Port,
}
c, err := NewCandidateServerReflexive(&srflxConfig)
if err != nil {
closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n", network, ip, port, err))
return
}

if err := a.addCandidate(ctx, c, conn); err != nil {
if closeErr := c.close(); closeErr != nil {
a.log.Warnf("Failed to close candidate: %v", closeErr)
}
a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err)
}
}(*urls[i], networkType.String())
}
}
}

func (a *Agent) gatherCandidatesSrflx(ctx context.Context, urls []*URL, networkTypes []NetworkType) {
var wg sync.WaitGroup
defer wg.Wait()
Expand Down
92 changes: 92 additions & 0 deletions gather_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"reflect"
"sort"
"strconv"
"sync"
"testing"
"time"

"github.com/pion/dtls/v2"
"github.com/pion/dtls/v2/pkg/crypto/selfsign"
"github.com/pion/logging"
"github.com/pion/stun"
"github.com/pion/transport/test"
"github.com/pion/turn/v2"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -484,3 +486,93 @@ func TestTURNProxyDialer(t *testing.T) {

assert.NoError(t, a.Close())
}

// Assert that UniversalUDPMux is used while gathering when configured in the Agent
func TestUniversalUDPMuxUsage(t *testing.T) {
report := test.CheckRoutines(t)
defer report()

lim := test.TimeOut(time.Second * 30)
defer lim.Stop()

conn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}, Port: randomPort(t)})
assert.NoError(t, err)
defer func() {
_ = conn.Close()
}()

udpMuxSrflx := &universalUDPMuxMock{
conn: conn,
}

numSTUNS := 3
urls := []*URL{}
for i := 0; i < numSTUNS; i++ {
urls = append(urls, &URL{
Scheme: SchemeTypeSTUN,
Host: "127.0.0.1",
Port: 3478 + i,
})
}

a, err := NewAgent(&AgentConfig{
NetworkTypes: supportedNetworkTypes(),
Urls: urls,
CandidateTypes: []CandidateType{CandidateTypeServerReflexive},
UDPMuxSrflx: udpMuxSrflx,
})
assert.NoError(t, err)

candidateGathered, candidateGatheredFunc := context.WithCancel(context.Background())
assert.NoError(t, a.OnCandidate(func(c Candidate) {
if c == nil {
candidateGatheredFunc()
return
}
t.Log(c.NetworkType(), c.Priority(), c)
}))
assert.NoError(t, a.GatherCandidates())

<-candidateGathered.Done()

assert.NoError(t, a.Close())
// twice because of 2 STUN servers configured
assert.Equal(t, numSTUNS, udpMuxSrflx.getXORMappedAddrUsedTimes, "expected times that GetXORMappedAddr should be called")
// one for Restart() when agent has been initialized and one time when Close() the agent
assert.Equal(t, 2, udpMuxSrflx.removeConnByUfragTimes, "expected times that RemoveConnByUfrag should be called")
// twice because of 2 STUN servers configured
assert.Equal(t, numSTUNS, udpMuxSrflx.getConnForURLTimes, "expected times that GetConnForURL should be called")
}

type universalUDPMuxMock struct {
UDPMux
getXORMappedAddrUsedTimes int
removeConnByUfragTimes int
getConnForURLTimes int
mu sync.Mutex
conn *net.UDPConn
}

func (m *universalUDPMuxMock) GetRelayedAddr(turnAddr net.Addr, deadline time.Duration) (*net.Addr, error) {
return nil, errNotImplemented
}

func (m *universalUDPMuxMock) GetConnForURL(ufrag string, url string) (net.PacketConn, error) {
m.mu.Lock()
defer m.mu.Unlock()
m.getConnForURLTimes++
return m.conn, nil
}

func (m *universalUDPMuxMock) GetXORMappedAddr(serverAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error) {
m.mu.Lock()
defer m.mu.Unlock()
m.getXORMappedAddrUsedTimes++
return &stun.XORMappedAddress{IP: net.IP{100, 64, 0, 1}, Port: 77878}, nil
}

func (m *universalUDPMuxMock) RemoveConnByUfrag(ufrag string) {
m.mu.Lock()
defer m.mu.Unlock()
m.removeConnByUfragTimes++
}
Loading