Skip to content

Commit 37fee1e

Browse files
committed
add tg discovery support
1 parent c58cd79 commit 37fee1e

File tree

7 files changed

+199
-69
lines changed

7 files changed

+199
-69
lines changed

TODO.md

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
# TODOs
1+
# Features not supported yet
2+
1. Authenticating callIds using STIR
3+
2. Trunking usecase (customer trunk group registration flows)
4+
3. Authorization via OAuth
5+
6+
# Code TODOs
27
1. combine grpc server with ws and h3 server
38
2. Support bi-directional media
49
3. See if the media latency can be improved
@@ -7,3 +12,4 @@
712
6. Occasional portaudio crashes
813
7. Define media and control framing
914
8. tests/tests/tests
15+

api/ript-api.go

+30-33
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,16 @@
11
package api
22

3-
import "crypto/sha1"
4-
53
// API definitions for ript
64
// TODO: Use RAML/Swagger for auto generating the code
75

86

9-
type HandlerRequest struct {
10-
HandlerId string `json:"handler-id"`
11-
Advertisement string `json:"advertisement"`
12-
}
13-
14-
type HandlerResponse struct {
15-
Uri string `json:"uri"`
16-
}
17-
187
const (
19-
ContentPacket PacketType = 1
20-
ContentRequestPacket PacketType = 2
21-
RegisterHandlerPacket PacketType = 3
8+
TrunkGroupDiscoveryPacket PacketType = 1
9+
RegisterHandlerPacket PacketType = 2
10+
ContentPacket PacketType = 3
2211
)
2312

13+
// types of content carried by the Content Paclet
2414
const (
2515
ContentFilterMediaForward ContentFilter = 1
2616
ContentFilterMediaReverse ContentFilter = 2
@@ -30,36 +20,19 @@ type FaceName string
3020
type PacketType byte
3121
type ContentFilter byte
3222
type DeliveryAddress string
33-
type ContentHash []byte
3423

3524
type ContentMessage struct {
3625
To DeliveryAddress
3726
Id int32
3827
Content []byte
3928
}
4029

41-
func (msg ContentMessage) Hash() ContentHash {
42-
h := sha1.New()
43-
h.Write([]byte(msg.Content))
44-
return ContentHash(h.Sum(nil))
45-
}
46-
47-
type ContentRequestMessage struct {
48-
To DeliveryAddress
49-
Id int32
50-
}
51-
52-
type RegisterHandlerMessage struct {
53-
HandlerRequest HandlerRequest
54-
HandlerResponse HandlerResponse
55-
}
56-
5730
type Packet struct {
5831
Type PacketType
5932
Filter ContentFilter
6033
Content ContentMessage
61-
ContentRequest ContentRequestMessage
6234
RegisterHandler RegisterHandlerMessage
35+
TrunkGroupsInfo TrunkGroupsInfoMessage
6336
}
6437

6538
type PacketEvent struct {
@@ -73,7 +46,7 @@ type PacketEvent struct {
7346

7447
type Advertisement string
7548

76-
/// Handler
49+
/// Handler Definition
7750
type HandlerInfo struct {
7851
Id string
7952
Advertisement Advertisement
@@ -87,4 +60,28 @@ func (h HandlerInfo) matchCaps(other HandlerInfo) bool {
8760
return true
8861
}
8962
return false
63+
}
64+
65+
type HandlerRequest struct {
66+
HandlerId string `json:"handler-id"`
67+
Advertisement string `json:"advertisement"`
68+
}
69+
70+
type HandlerResponse struct {
71+
Uri string `json:"uri"`
72+
}
73+
74+
type RegisterHandlerMessage struct {
75+
HandlerRequest HandlerRequest
76+
HandlerResponse HandlerResponse
77+
}
78+
79+
//// Trunk
80+
81+
type TrunkGroup struct {
82+
Uri string
83+
}
84+
85+
type TrunkGroupsInfoMessage struct {
86+
TrunkGroups map[string][]TrunkGroup
9087
}

ript_client/main.go

+45-5
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,28 @@ import (
1313
"syscall"
1414
)
1515

16+
// info about the provider
17+
type riptProviderInfo struct {
18+
baseUrl string
19+
trunkGroups map[string][]api.TrunkGroup
20+
}
21+
22+
func (p *riptProviderInfo) getTrunkGroupUri() string {
23+
// pick the outbound url as default
24+
tg := p.trunkGroups["outbound"][0]
25+
return tg.Uri
26+
}
27+
1628
type riptClient struct {
1729
client ript_net.Face
18-
stopChan chan bool
30+
stopChan chan bool
1931
doneChan chan bool
2032
recvChan chan api.PacketEvent
2133
// ript semantics
2234
handlerInfo api.HandlerInfo
35+
providerInfo *riptProviderInfo
2336
}
2437

25-
2638
func (c *riptClient) registerHandler() {
2739
pkt := api.Packet{
2840
Type: api.RegisterHandlerPacket,
@@ -49,6 +61,26 @@ func (c *riptClient) registerHandler() {
4961
log.Printf("registerHandler: handlerInfo with uri: [%v]", c.handlerInfo)
5062
}
5163

64+
func (c *riptClient) retrieveTrunkGroups() {
65+
pkt := api.Packet{
66+
Type: api.TrunkGroupDiscoveryPacket,
67+
}
68+
69+
err := c.client.Send(pkt)
70+
if err != nil {
71+
log.Fatalf("retrieveTrunkGroups: error [%v]", err)
72+
panic(err)
73+
}
74+
75+
// await response
76+
select {
77+
case response := <- c.recvChan:
78+
c.providerInfo.trunkGroups = response.Packet.TrunkGroupsInfo.TrunkGroups
79+
}
80+
81+
log.Printf("trukGroupDiscovery: tgs [%v]", c.providerInfo.trunkGroups)
82+
}
83+
5284
func (c *riptClient) recordContent(client ript_net.Face) {
5385
defer func() {
5486
c.doneChan <- true
@@ -127,7 +159,7 @@ func (c *riptClient) stop() {
127159
}
128160

129161

130-
func NewRIPTClient(client ript_net.Face) *riptClient {
162+
func NewRIPTClient(client ript_net.Face, providerInfo *riptProviderInfo) *riptClient {
131163
hId, err := uuid.NewUUID()
132164
ad := "1 in: opus; PCMU; PCMA; 2 out: opus; PCMU; PCMA;"
133165
if err != nil {
@@ -143,6 +175,7 @@ func NewRIPTClient(client ript_net.Face) *riptClient {
143175
Id: hId.String(),
144176
Advertisement: api.Advertisement(ad),
145177
},
178+
providerInfo: providerInfo,
146179
}
147180

148181
ript.client.SetReceiveChan(ript.recvChan)
@@ -178,8 +211,11 @@ func main() {
178211

179212
var client ript_net.Face
180213
var err error
214+
provider := &riptProviderInfo{
215+
baseUrl: baseUri,
216+
}
181217
if xport == "h3" {
182-
client = NewQuicClientFace()
218+
client = NewQuicClientFace(provider)
183219
} else if xport == "ws" {
184220
client, err = ript_net.NewWebSocketClientFace("ws://localhost:8080/")
185221
if err != nil {
@@ -190,8 +226,12 @@ func main() {
190226
return
191227
}
192228

193-
riptClient := NewRIPTClient(client)
229+
riptClient := NewRIPTClient(client, provider)
230+
231+
// 1. retrieve trunk groups
232+
riptClient.retrieveTrunkGroups()
194233

234+
// 2. register this handler
195235
riptClient.registerHandler()
196236

197237
if mode == "push" {

ript_client/quic_client.go

+26-4
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@ import (
1818
)
1919

2020
const (
21-
registerHandlerUrl = "https://localhost:6121/.well-known/ript/v1/providerTgs/trunk123/handlers"
21+
baseUri = "https://localhost:6121"
22+
trunkDiscoveryUrl = baseUri+"/.well-known/ript/v1/providertgs"
23+
registerHandlerUrl = baseUri+"/.well-known/ript/v1/providertgs/trunk123/handlers"
2224

2325
mediaPushUrl = "https://localhost:6121/media/forward"
2426
mediaPullUrl = "https://localhost:6121/media/reverse"
2527
)
2628

2729
type QuicClientFace struct {
2830
client *http.Client
31+
serverInfo *riptProviderInfo
2932
name api.FaceName
3033
recvChan chan api.PacketEvent
3134
haveRecv bool
@@ -35,7 +38,7 @@ type QuicClientFace struct {
3538
inboundContentId int32
3639
}
3740

38-
func NewQuicClientFace() *QuicClientFace {
41+
func NewQuicClientFace(serverInfo *riptProviderInfo) *QuicClientFace {
3942
pool, err := x509.SystemCertPool()
4043
if err != nil {
4144
fmt.Printf("cert pool creation error")
@@ -77,11 +80,11 @@ func NewQuicClientFace() *QuicClientFace {
7780

7881
return &QuicClientFace {
7982
client: client,
83+
serverInfo: serverInfo,
8084
haveRecv: false,
8185
haveClosed: false,
8286
closeChan: make(chan error, 1),
8387
inboundContentId: -1,
84-
8588
}
8689
}
8790

@@ -151,7 +154,8 @@ func (c *QuicClientFace) Send(pkt api.Packet) error {
151154
}
152155

153156
case api.RegisterHandlerPacket:
154-
res, err = c.client.Post(registerHandlerUrl, "application/json; charset=utf-8", buf)
157+
url := c.serverInfo.baseUrl + c.serverInfo.getTrunkGroupUri() + "/handlers"
158+
res, err = c.client.Post(url, "application/json; charset=utf-8", buf)
155159
if err != nil || res.StatusCode != 200 {
156160
break
157161
}
@@ -163,6 +167,24 @@ func (c *QuicClientFace) Send(pkt api.Packet) error {
163167

164168
log.Printf("HandlerRegistration response [%v]", res)
165169

170+
// forward the packet for further processing
171+
c.recvChan <- api.PacketEvent{
172+
Packet: responsePacket,
173+
}
174+
case api.TrunkGroupDiscoveryPacket:
175+
fmt.Printf("trunkDiscovery url [%s]", trunkDiscoveryUrl)
176+
res, err = c.client.Get(trunkDiscoveryUrl)
177+
if err != nil || res.StatusCode != 200 {
178+
break
179+
}
180+
181+
responsePacket, err = httpResponseToRiptPacket(res)
182+
if err != nil {
183+
break
184+
}
185+
186+
log.Printf("TrunkGroupDiscoveryy response [%v]", res)
187+
166188
// forward the packet for further processing
167189
c.recvChan <- api.PacketEvent{
168190
Packet: responsePacket,

ript_net/quic.go

+50-6
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ type QuicFace struct {
2323
haveRecv bool
2424
// inbound face to router for processsing
2525
recvChan chan api.PacketEvent
26-
// app/router to face for outbound transport
26+
// app/router to face for outbound transport/handlers
2727
contentChan chan api.Packet
28+
// channel for trunk discovery
29+
tgDiscChan chan api.Packet
2830
closeChan chan error
2931
closed bool
3032
name string
@@ -45,8 +47,14 @@ func (f *QuicFace) Name() api.FaceName {
4547

4648

4749
func (f *QuicFace) Send(pkt api.Packet) error {
48-
log.Printf("send: passing on the content [%d] to content chan, face [%s]", pkt.Content.Id, f.name)
49-
f.contentChan <- pkt
50+
switch pkt.Type {
51+
case api.TrunkGroupDiscoveryPacket:
52+
log.Printf("send: passing on the content to trunk discovery chan, face [%s]", f.name)
53+
f.tgDiscChan <- pkt
54+
default:
55+
log.Printf("send: passing on the content to general contnet chan, face [%s]", f.name)
56+
f.contentChan <- pkt
57+
}
5058
return nil
5159
}
5260

@@ -73,6 +81,7 @@ func NewQuicFace(name string) *QuicFace {
7381
haveRecv: false,
7482
closeChan: make(chan error, 1),
7583
contentChan: make(chan api.Packet, 1),
84+
tgDiscChan: make(chan api.Packet, 1),
7685
closed: false,
7786
name: name,
7887
}
@@ -185,6 +194,33 @@ func HandlerRegistration(face *QuicFace, writer http.ResponseWriter, request *ht
185194
}
186195
}
187196

197+
198+
func HandleTgDiscovery(face *QuicFace, writer http.ResponseWriter, request *http.Request) {
199+
// query service for list of trunk groups available
200+
face.recvChan <- api.PacketEvent{
201+
Sender: face.Name(),
202+
Packet: api.Packet{
203+
Type: api.TrunkGroupDiscoveryPacket,
204+
},
205+
}
206+
207+
// await response or timeout
208+
select {
209+
case <-time.After(2 * time.Second):
210+
log.Errorf("HandleTgDiscovery: no content received .. ")
211+
writer.WriteHeader(404)
212+
return
213+
case resPkt := <- face.tgDiscChan:
214+
log.Printf("HandleTgDiscovery [%s] got content [%v]", face.Name(), resPkt)
215+
enc, err := json.Marshal(resPkt)
216+
if err != nil {
217+
writer.WriteHeader(400)
218+
return
219+
}
220+
writer.Write(enc)
221+
}
222+
}
223+
188224
// Mux handler for routing various h3 endpoints
189225
func setupHandler(server *QuicFaceServer) http.Handler {
190226
router := mux.NewRouter()
@@ -232,11 +268,19 @@ func setupHandler(server *QuicFaceServer) http.Handler {
232268
}
233269

234270

235-
// todo: tg discovery
236-
//mux.HandleFunc("/.well-known/ript/v1/providerTgs", tgDiscFn)
271+
tgDiscFn := func(w http.ResponseWriter, r *http.Request) {
272+
log.Printf("trunk group discovery from [%v]", r.RemoteAddr)
273+
// get the face
274+
face := server.faceMap[r.RemoteAddr]
275+
HandleTgDiscovery(face, w, r)
276+
}
277+
278+
279+
fmt.Printf("trunkDiscoveryUrl [%s]", baseUrl+"/providerTgs")
280+
router.HandleFunc(baseUrl+"/providertgs", tgDiscFn)
237281

238282
// handler registrations
239-
router.HandleFunc("/.well-known/ript/v1/providerTgs/{trunkGroupId}/handlers",
283+
router.HandleFunc("/.well-known/ript/v1/providertgs/{trunkGroupId}/handlers",
240284
regHandlerFn).Methods(http.MethodPost)
241285

242286
router.HandleFunc("/media/join", joinFn)

0 commit comments

Comments
 (0)