Skip to content
This repository was archived by the owner on Jul 7, 2024. It is now read-only.

Commit b5c849b

Browse files
committed
srv support
1 parent 3db9ae0 commit b5c849b

File tree

10 files changed

+277
-14
lines changed

10 files changed

+277
-14
lines changed

api/main.go

+14
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,20 @@ type PushS struct {
2020
PubKeys map[string]wgtypes.Key
2121
}
2222

23+
type SRVUpdateQ struct {
24+
CentralToken util.Token
25+
SRVs []SRV
26+
}
27+
28+
type SRVUpdateS struct{}
29+
30+
type SRV struct {
31+
NetworkName string
32+
PeerName string
33+
34+
central.SRV
35+
}
36+
2337
type GenerateQ struct {
2438
CNNs []string
2539
}

central/main.go

+71-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package central
33

44
import (
55
"encoding/json"
6+
"errors"
67
"net"
78
"time"
89

@@ -53,10 +54,12 @@ type Peer struct {
5354
CanForward bool `yaml:"canForward" json:"canForward"`
5455
// CanSee determines whether this Peer can see anything (nil) or specfic peers only (non-nil).
5556
// TODO: when CanSee.Only is blank, this is interpreted as nil → no way to distinguish between seeing nothing and everything
56-
CanSee *CanSee `yaml:"canSee" json:"canSee"`
57+
CanSee *CanSee `yaml:"canSee" json:"canSee"`
58+
AllowedSRVs []SRVAllowance
5759

5860
PubKey wgtypes.Key
5961
ForwardingPeers []string
62+
SRVs []SRV
6063
}
6164

6265
func (p *Peer) String() string {
@@ -86,6 +89,73 @@ func (c *CanSee) Same(c2 *CanSee) bool {
8689
return Same3(c.Only, c2.Only)
8790
}
8891

92+
type SRVAllowable interface {
93+
// AllowedBy returns nil this is allowed by the SRVAllowance, and a non-nil error otherwise.
94+
AllowedBy(SRVAllowance) error
95+
}
96+
97+
func AllowedByAny(sa SRVAllowable, a2s []SRVAllowance) bool {
98+
for _, a2 := range a2s {
99+
err := sa.AllowedBy(a2)
100+
if err == nil {
101+
return true
102+
}
103+
}
104+
return false
105+
}
106+
107+
type SRVAllowance struct {
108+
Service string
109+
ServiceAny bool
110+
Name string
111+
NameAny bool
112+
PriorityMin uint16
113+
PriorityMax uint16
114+
WeightMin uint16
115+
WeightMax uint16
116+
}
117+
118+
func (a SRVAllowance) AllowedBy(a2 SRVAllowance) error {
119+
if !a2.ServiceAny && a.Service != a2.Service {
120+
return errors.New("Service mismatch")
121+
}
122+
if !a2.NameAny && a.Name != a2.Name {
123+
return errors.New("Name mismatch")
124+
}
125+
if !(a.PriorityMin >= a2.PriorityMin && a.PriorityMax <= a2.PriorityMax) {
126+
return errors.New("Priority not in range")
127+
}
128+
if !(a.WeightMin >= a2.WeightMin && a.WeightMax <= a2.WeightMax) {
129+
return errors.New("Weight not in range")
130+
}
131+
return nil
132+
}
133+
134+
type SRV struct {
135+
Service string
136+
Protocol string
137+
Name string
138+
Priority uint16
139+
Weight uint16
140+
Port uint16
141+
}
142+
143+
func (s SRV) AllowedBy(a2 SRVAllowance) error {
144+
if !a2.ServiceAny && s.Service != a2.Service {
145+
return errors.New("Service mismatch")
146+
}
147+
if !a2.NameAny && s.Name != a2.Name {
148+
return errors.New("Name mismatch")
149+
}
150+
if !(a2.PriorityMin <= s.Priority && s.Priority <= a2.PriorityMax) {
151+
return errors.New("Priority not in range")
152+
}
153+
if !(a2.WeightMin <= s.Weight && s.Weight <= a2.WeightMax) {
154+
return errors.New("Weight not in range")
155+
}
156+
return nil
157+
}
158+
89159
// Duration is a encoding-friendly time.Duration.
90160
type Duration time.Duration
91161

cmd/runner-node/main.go

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type config struct {
2323
CS csConfig `yaml:"cs"`
2424
Hokuto hokutoConfig `yaml:"hokuto"`
2525
EndpointOverride string `yaml:"endpointOverride"`
26+
SRVList string `yaml:"srvList"`
2627
}
2728

2829
type hokutoConfig struct {
@@ -130,6 +131,7 @@ func main() {
130131
HokutoExtraParents: c.Hokuto.ExtraParents,
131132
EndpointOverride: c.EndpointOverride,
132133
BackportPath: filepath.Join(os.Getenv("STATE_DIRECTORY"), "node-backport.json"),
134+
SRVList: c.SRVList,
133135
}
134136
if os.Getenv("HOKUTO_ADDR") != "" {
135137
nc.HokutoAddr = os.Getenv("HOKUTO_ADDR")

cs/api.go

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func (c *CentralSource) newHandler() {
1616
h.Handle("ping", c.ping)
1717
h.Handle("sync", c.sync)
1818
h.Handle("azusa", c.azusa)
19+
h.Handle("srvUpdate", c.srvUpdate)
1920
c.handler = h
2021
}
2122

cs/azusa.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,25 @@ func (c *CentralSource) azusa(cl *rpc2.Client, q *api.AzusaQ, s *api.AzusaS) err
2626
}
2727
peer, ok := cn.Peers[q.Networks[cnn].Name]
2828
if !ok {
29-
return fmt.Errorf("net %s no exist :(", cnn)
29+
return fmt.Errorf("net %s peer %s no exist :(", cnn, q.Networks[cnn].Name)
3030
}
3131
peer.Name = q.Networks[cnn].Name
32-
err = checkPeer(ti, cnn, peer)
32+
err = checkPeer(ti, cnn, *peer)
3333
if err != nil {
3434
return err
3535
}
36-
_, ok := c.cc.Networks[cnn]
36+
_, ok = c.cc.Networks[cnn]
3737
if !ok {
3838
return fmt.Errorf("net %s no exist :(", cnn)
3939
}
40+
if !ti.SRVAllowancesAny {
41+
for saI, sa := range q.Networks[cnn].AllowedSRVs {
42+
if !central.AllowedByAny(sa, ti.SRVAllowances) {
43+
return fmt.Errorf("peer allowance %d: not allowed by any token-level allowances", saI)
44+
}
45+
}
46+
}
47+
// TODO: token-level restrictions on SRVAllowance and SRVs
4048
fmt.Fprintf(&desc, "\n- net %s peer %s: %#v", cnn, peer.Name, peer)
4149
}
4250
util.S.Infof("azusa from token %s to push %d:\n%s", ti.Name, len(q.Networks), &desc)

cs/srvupdate.go

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package cs
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/cenkalti/rpc2"
7+
"github.com/nyiyui/qrystal/api"
8+
"github.com/nyiyui/qrystal/central"
9+
"github.com/nyiyui/qrystal/util"
10+
"golang.org/x/exp/slices"
11+
)
12+
13+
func (c *CentralSource) srvUpdate(cl *rpc2.Client, q *api.SRVUpdateQ, s *api.SRVUpdateS) error {
14+
ti, ok, err := c.Tokens.getToken(&q.CentralToken)
15+
if err != nil {
16+
return err
17+
}
18+
if !ok {
19+
return newTokenAuthError(q.CentralToken)
20+
}
21+
for _, srv := range q.SRVs {
22+
cn, ok := c.cc.Networks[srv.NetworkName]
23+
if !ok {
24+
return fmt.Errorf("net %s no exist :(", srv.NetworkName)
25+
}
26+
peer, ok := cn.Peers[srv.PeerName]
27+
if !ok {
28+
return fmt.Errorf("net %s peer %s no exist :(", srv.NetworkName, srv.PeerName)
29+
}
30+
err = checkPeer(ti, srv.NetworkName, *peer)
31+
if err != nil {
32+
return err
33+
}
34+
}
35+
util.S.Infof("srvUpdate from token %s to push %d:\n%#v", ti.Name, len(q.SRVs), q.SRVs)
36+
ti.StartUse()
37+
err = c.Tokens.UpdateToken(ti)
38+
if err != nil {
39+
return err
40+
}
41+
defer func() {
42+
ti.StopUse()
43+
err = c.Tokens.UpdateToken(ti)
44+
if err != nil {
45+
util.S.Errorf("UpdateToken %s: %s", ti.key, err)
46+
}
47+
}()
48+
c.ccLock.Lock()
49+
defer c.ccLock.Unlock()
50+
for srvI, srv := range q.SRVs {
51+
cn, ok := c.cc.Networks[srv.NetworkName]
52+
if !ok {
53+
return fmt.Errorf("net %s no exist :(", srv.NetworkName)
54+
}
55+
peer, ok := cn.Peers[srv.PeerName]
56+
if !ok {
57+
return fmt.Errorf("net %s peer %s no exist :(", srv.NetworkName, srv.PeerName)
58+
}
59+
if !central.AllowedByAny(srv, peer.AllowedSRVs) {
60+
return fmt.Errorf("srv %d: not allowed", srvI)
61+
}
62+
i := slices.IndexFunc(peer.SRVs, func(s central.SRV) bool { return s.Service == srv.Service })
63+
if srv.Name == "" && i != -1 {
64+
peer.SRVs = append(peer.SRVs[:i], peer.SRVs[i+1:]...)
65+
} else if i == -1 {
66+
i = len(peer.SRVs)
67+
peer.SRVs = append(peer.SRVs, srv.SRV)
68+
} else {
69+
peer.SRVs[i] = srv.SRV
70+
}
71+
}
72+
return nil
73+
}

cs/token.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"time"
88

9+
"github.com/nyiyui/qrystal/central"
910
"github.com/nyiyui/qrystal/util"
1011
"github.com/tidwall/buntdb"
1112
"gopkg.in/yaml.v3"
@@ -126,13 +127,19 @@ func (s *TokenStore) convertToMap() (m map[string]string, err error) {
126127
}
127128

128129
type TokenInfo struct {
129-
key string `json:"-"`
130-
Name string
131-
Networks map[string]string
132-
CanPull bool
133-
CanPush *CanPush
134-
CanAdminTokens *CanAdminTokens
130+
key string `json:"-"`
131+
Name string
132+
Networks map[string]string
133+
CanPull bool
134+
CanPush *CanPush
135135
// CanAdminTokens specifies whether this token can add *or remove* tokens.
136+
CanAdminTokens *CanAdminTokens
137+
// CanSRVUpdate allows updating SRV allowances for peers it can push to.
138+
CanSRVUpdate bool
139+
// SRVAllowance is a ORed list of SRV alloances for peers it can push to.
140+
SRVAllowances []central.SRVAllowance
141+
// SRVAllowancesAny allows updating SRV allowances without restrictions by SRVAllowances.
142+
SRVAllowancesAny bool
136143

137144
Using bool
138145
LastUsed time.Time

node/loop.go

+39-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ package node
44

55
import (
66
"fmt"
7+
"os"
8+
"os/signal"
9+
"syscall"
710
"time"
811

912
"github.com/cenkalti/rpc2"
@@ -17,15 +20,48 @@ type listenError struct {
1720
}
1821

1922
func (n *Node) ListenCS() {
23+
go func() {
24+
err := n.handleSRV()
25+
util.S.Errorf("srv error: %s", err)
26+
panic(fmt.Sprintf("srv error: %s", err))
27+
}()
2028
util.S.Debug("listening…")
2129
err := n.listenCS()
2230
util.S.Errorf("cs listen error: %s", err)
2331
}
2432

33+
func (n *Node) handleSRV() error {
34+
reloadCh := make(chan os.Signal)
35+
signal.Notify(reloadCh, syscall.SIGUSR1)
36+
return util.Backoff(func() (resetBackoff bool, err error) { return n.handleSRVOnce(reloadCh) }, func(backoff time.Duration, err error) error {
37+
util.S.Errorf("srv: %s; retry in %s", err, backoff)
38+
util.S.Errorw("srv: error",
39+
"err", err,
40+
"backoff", backoff,
41+
)
42+
return nil
43+
})
44+
}
45+
46+
func (n *Node) handleSRVOnce(reloadCh <-chan os.Signal) (resetBackoff bool, err error) {
47+
util.S.Debug("newClient…")
48+
cl, _, err := n.newClient()
49+
if err != nil {
50+
return false, fmt.Errorf("newClient: %w", err)
51+
}
52+
53+
for range reloadCh {
54+
err = n.loadSRVList(cl)
55+
if err != nil {
56+
err = fmt.Errorf("srv: %w", err)
57+
return
58+
}
59+
}
60+
return true, nil
61+
}
62+
2563
func (n *Node) listenCS() error {
26-
return util.Backoff(func() (resetBackoff bool, err error) {
27-
return n.listenCSOnce()
28-
}, func(backoff time.Duration, err error) error {
64+
return util.Backoff(n.listenCSOnce, func(backoff time.Duration, err error) error {
2965
util.Notify(fmt.Sprintf("STATUS=connecting to CS: %s (retrying in %s)", err, backoff))
3066
util.S.Errorf("listen: %s; retry in %s", err, backoff)
3167
util.S.Errorw("listen: error",
@@ -37,7 +73,6 @@ func (n *Node) listenCS() error {
3773
}
3874

3975
func (n *Node) listenCSOnce() (resetBackoff bool, err error) {
40-
// Setup
4176
util.S.Debug("newClient…")
4277
cl, _, err := n.newClient()
4378
if err != nil {

node/main.go

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type NodeConfig struct {
2323
CS CSConfig
2424
EndpointOverride string
2525
BackportPath string
26+
SRVList string
2627
}
2728

2829
// There must be only one Node instance as a Node can trigger a trace to stop.
@@ -48,6 +49,7 @@ func NewNode(cfg NodeConfig) (*Node, error) {
4849
hokuto: hh,
4950
endpointOverridePath: cfg.EndpointOverride,
5051
backportPath: cfg.BackportPath,
52+
srvListPath: cfg.SRVList,
5153
}
5254
if cfg.HokutoDNSAddr != "" {
5355
addr, err := net.ResolveUDPAddr("udp", cfg.HokutoDNSAddr)
@@ -84,4 +86,5 @@ type Node struct {
8486
eoStateLock sync.Mutex
8587

8688
backportPath string
89+
srvListPath string
8790
}

0 commit comments

Comments
 (0)