Skip to content

Commit 751bff9

Browse files
zimnxmmatczuk
authored andcommitted
Add ability to connect to nodes through proxy
Services deployed in cloud often are hidden behind proxy which dispatches connections based on Server Name Identifier (SNI) taken from TLS Client Hello packet. New method of creating ClusterConfig - NewCloudCluster - allows to connect to nodes behind SNI proxy based on provided configuration file. Because each datacenter may have different TLS configurtion (CA, proxy address etc), more granular method of configuring connection details was needed. CloudCluster use special HostDialer which connect to nodes using information taken from HostInfo (datacenter, host_id) to go through SNI proxy.
1 parent 204b4f9 commit 751bff9

11 files changed

+1039
-3
lines changed

cloud_cluster_test.go

Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
//go:build integration && scylla
2+
// +build integration,scylla
3+
4+
package gocql_test
5+
6+
import (
7+
"bytes"
8+
"context"
9+
"crypto/tls"
10+
"fmt"
11+
"io"
12+
"net"
13+
"os"
14+
"strings"
15+
"sync"
16+
"testing"
17+
"time"
18+
19+
"github.com/gocql/gocql"
20+
"github.com/gocql/gocql/scyllacloud"
21+
"sigs.k8s.io/yaml"
22+
)
23+
24+
func TestCloudConnection(t *testing.T) {
25+
if !*gocql.FlagRunSslTest {
26+
t.Skip("Skipping because SSL is not enabled on cluster")
27+
}
28+
29+
const (
30+
sslPort = 9142
31+
datacenterName = "datacenter1"
32+
)
33+
ctx, cancel := context.WithCancel(context.Background())
34+
defer cancel()
35+
36+
hosts := map[string]string{}
37+
38+
cluster := gocql.CreateCluster(func(config *gocql.ClusterConfig) {
39+
config.Port = sslPort
40+
})
41+
session, err := cluster.CreateSession()
42+
if err != nil {
43+
t.Fatal(err)
44+
}
45+
46+
var localAddress string
47+
var localHostID gocql.UUID
48+
scanner := session.Query("SELECT broadcast_address, host_id FROM system.local").Iter().Scanner()
49+
if scanner.Next() {
50+
if err := scanner.Scan(&localAddress, &localHostID); err != nil {
51+
t.Fatal(err)
52+
}
53+
hosts[localHostID.String()] = net.JoinHostPort(localAddress, fmt.Sprintf("%d", sslPort))
54+
}
55+
56+
var peerAddress string
57+
var peerHostID gocql.UUID
58+
scanner = session.Query("SELECT peer, host_id FROM system.peers").Iter().Scanner()
59+
for scanner.Next() {
60+
if err := scanner.Scan(&peerAddress, &peerHostID); err != nil {
61+
t.Fatal(err)
62+
}
63+
hosts[peerHostID.String()] = net.JoinHostPort(peerAddress, fmt.Sprintf("%d", sslPort))
64+
}
65+
66+
session.Close()
67+
68+
logger := gocql.TestLogger
69+
defer func() {
70+
if t.Failed() {
71+
os.Stdout.WriteString(logger.String())
72+
}
73+
}()
74+
75+
proxy := &sniProxy{
76+
hosts: hosts,
77+
defaultBackend: net.JoinHostPort(localAddress, fmt.Sprintf("%d", sslPort)),
78+
logger: logger,
79+
}
80+
81+
proxyAddress, err := proxy.Run(ctx)
82+
if err != nil {
83+
t.Fatal(err)
84+
}
85+
defer proxy.Close()
86+
87+
cc := &scyllacloud.ConnectionConfig{
88+
Datacenters: map[string]*scyllacloud.Datacenter{
89+
datacenterName: {
90+
CertificateAuthorityPath: "testdata/pki/ca.crt",
91+
Server: proxyAddress,
92+
TLSServerName: "any",
93+
NodeDomain: "cloud.scylladb.com",
94+
InsecureSkipTLSVerify: true,
95+
},
96+
},
97+
AuthInfos: map[string]*scyllacloud.AuthInfo{
98+
"ai-1": {
99+
Username: "username",
100+
Password: "password",
101+
ClientKeyPath: "testdata/pki/gocql.key",
102+
ClientCertificatePath: "testdata/pki/gocql.crt",
103+
},
104+
},
105+
Contexts: map[string]*scyllacloud.Context{
106+
"default-context": {
107+
AuthInfoName: "ai-1",
108+
DatacenterName: datacenterName,
109+
},
110+
},
111+
CurrentContext: "default-context",
112+
}
113+
114+
configPath, err := writeYamlToTempFile(cc)
115+
if err != nil {
116+
t.Fatal(err)
117+
}
118+
defer os.RemoveAll(configPath)
119+
120+
cluster, err = scyllacloud.NewCloudCluster(configPath)
121+
if err != nil {
122+
t.Fatal(err)
123+
}
124+
// Forward connections directed to node domain to our test sni proxy.
125+
cluster.Dialer = dialerContextFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
126+
if strings.Contains(addr, cc.Datacenters[datacenterName].NodeDomain) {
127+
addr = cc.Datacenters[datacenterName].Server
128+
}
129+
return net.Dial(network, addr)
130+
})
131+
132+
session, err = cluster.CreateSession()
133+
if err != nil {
134+
t.Fatal(err)
135+
}
136+
137+
if err := gocql.WaitUntilPoolsStopFilling(ctx, session, 10*time.Second); err != nil {
138+
t.Fatal(err)
139+
}
140+
141+
ringHosts := gocql.GetRingAllHosts(session)
142+
if len(ringHosts) != len(hosts) {
143+
t.Errorf("expected %d hosts in ring, got %d", len(hosts), len(ringHosts))
144+
}
145+
146+
snisCount := map[string]int{}
147+
events := proxy.GetEvents()
148+
for _, event := range events {
149+
snisCount[event]++
150+
}
151+
152+
for hostID := range hosts {
153+
sni := fmt.Sprintf("%s.%s", hostID, cc.Datacenters[datacenterName].NodeDomain)
154+
count, ok := snisCount[sni]
155+
if !ok {
156+
t.Errorf("not found connection to host %q", hostID)
157+
}
158+
if count != cluster.NumConns {
159+
t.Errorf("expected %d connections to host %q, got %d", cluster.NumConns, sni, count)
160+
}
161+
}
162+
}
163+
164+
func writeYamlToTempFile(obj interface{}) (string, error) {
165+
f, err := os.CreateTemp(os.TempDir(), "gocql-cloud")
166+
if err != nil {
167+
return "", fmt.Errorf("create temp file: %w", err)
168+
}
169+
if err := f.Close(); err != nil {
170+
return "", fmt.Errorf("close temp file: %w", err)
171+
}
172+
173+
buf, err := yaml.Marshal(obj)
174+
if err != nil {
175+
return "", fmt.Errorf("marshal yaml: %w", err)
176+
}
177+
if err := os.WriteFile(f.Name(), buf, 0600); err != nil {
178+
return "", fmt.Errorf("write to file %q: %w", f.Name(), err)
179+
}
180+
181+
return f.Name(), nil
182+
}
183+
184+
type dialerContextFunc func(ctx context.Context, network, addr string) (net.Conn, error)
185+
186+
func (d dialerContextFunc) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
187+
return d(ctx, network, addr)
188+
}
189+
190+
type sniProxy struct {
191+
hosts map[string]string
192+
defaultBackend string
193+
logger gocql.StdLogger
194+
195+
listener net.Listener
196+
events []string
197+
mu sync.Mutex
198+
}
199+
200+
func (p *sniProxy) Run(ctx context.Context) (string, error) {
201+
listener, err := net.Listen("tcp", "127.0.0.1:0")
202+
if err != nil {
203+
return "", fmt.Errorf("failed to listen: %w", err)
204+
}
205+
206+
p.listener = listener
207+
208+
go func() {
209+
for {
210+
conn, err := p.listener.Accept()
211+
if err != nil {
212+
p.logger.Println("failed to accept connection", err)
213+
return
214+
}
215+
216+
go p.handleConnection(conn)
217+
}
218+
219+
}()
220+
221+
return listener.Addr().String(), nil
222+
}
223+
224+
func (p *sniProxy) handleConnection(conn net.Conn) {
225+
defer conn.Close()
226+
227+
var hello *tls.ClientHelloInfo
228+
229+
peekedBytes := &bytes.Buffer{}
230+
// Ignore error because TLS library returns it when nil TLSConfig is returned.
231+
_ = tls.Server(readOnlyConn{reader: io.TeeReader(conn, peekedBytes)}, &tls.Config{
232+
GetConfigForClient: func(argHello *tls.ClientHelloInfo) (*tls.Config, error) {
233+
hello = &tls.ClientHelloInfo{}
234+
*hello = *argHello
235+
return nil, nil
236+
},
237+
}).Handshake()
238+
239+
if hello == nil {
240+
p.logger.Println("client hello not sent")
241+
return
242+
}
243+
244+
p.mu.Lock()
245+
p.events = append(p.events, hello.ServerName)
246+
p.mu.Unlock()
247+
248+
backend, ok := p.hosts[hello.ServerName]
249+
if !ok {
250+
backend = p.defaultBackend
251+
}
252+
253+
p.logger.Println("Dialing backend", backend, "SNI", hello.ServerName)
254+
backendConn, err := net.Dial("tcp", backend)
255+
if err != nil {
256+
p.logger.Println("failed to dial backend", backend, err)
257+
return
258+
}
259+
defer backendConn.Close()
260+
261+
var wg sync.WaitGroup
262+
wg.Add(2)
263+
264+
go func() {
265+
_, _ = io.Copy(conn, backendConn)
266+
wg.Done()
267+
}()
268+
go func() {
269+
_, _ = io.Copy(backendConn, peekedBytes)
270+
_, _ = io.Copy(backendConn, conn)
271+
wg.Done()
272+
}()
273+
274+
wg.Wait()
275+
}
276+
277+
func (p *sniProxy) Close() error {
278+
return p.listener.Close()
279+
}
280+
281+
func (p *sniProxy) GetEvents() []string {
282+
p.mu.Lock()
283+
defer p.mu.Unlock()
284+
events := make([]string, 0, len(p.events))
285+
for _, e := range p.events {
286+
events = append(events, e)
287+
}
288+
return events
289+
}
290+
291+
type readOnlyConn struct {
292+
reader io.Reader
293+
}
294+
295+
var _ net.Conn = readOnlyConn{}
296+
297+
func (conn readOnlyConn) Read(p []byte) (int, error) { return conn.reader.Read(p) }
298+
func (conn readOnlyConn) Write(p []byte) (int, error) { return 0, io.ErrClosedPipe }
299+
func (conn readOnlyConn) Close() error { return nil }
300+
func (conn readOnlyConn) LocalAddr() net.Addr { return nil }
301+
func (conn readOnlyConn) RemoteAddr() net.Addr { return nil }
302+
func (conn readOnlyConn) SetDeadline(t time.Time) error { return nil }
303+
func (conn readOnlyConn) SetReadDeadline(t time.Time) error { return nil }
304+
func (conn readOnlyConn) SetWriteDeadline(t time.Time) error { return nil }

cluster.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
264264
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
265265
WriteCoalesceWaitTime: 200 * time.Microsecond,
266266
}
267+
267268
return cfg
268269
}
269270

conn.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,9 +1526,8 @@ func (c *Conn) localHostInfo(ctx context.Context) (*HostInfo, error) {
15261526
}
15271527

15281528
port := c.conn.RemoteAddr().(*net.TCPAddr).Port
1529-
15301529
// TODO(zariel): avoid doing this here
1531-
host, err := c.session.hostInfoFromMap(row, &HostInfo{connectAddress: c.host.connectAddress, port: port})
1530+
host, err := c.session.hostInfoFromMap(row, &HostInfo{hostname: c.host.connectAddress.String(), connectAddress: c.host.connectAddress, port: port})
15321531
if err != nil {
15331532
return nil, err
15341533
}

export_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
//go:build integration || unit
2+
// +build integration unit
3+
4+
package gocql
5+
6+
var FlagRunSslTest = flagRunSslTest
7+
var CreateCluster = createCluster
8+
var TestLogger = &testLogger{}
9+
var WaitUntilPoolsStopFilling = waitUntilPoolsStopFilling
10+
11+
func GetRingAllHosts(sess *Session) []*HostInfo {
12+
return sess.ring.allHosts()
13+
}

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ require (
88
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed
99
github.com/kr/pretty v0.1.0 // indirect
1010
github.com/stretchr/testify v1.3.0 // indirect
11+
golang.org/x/net v0.0.0-20220526153639-5463443f8c37
1112
gopkg.in/inf.v0 v0.9.1
13+
sigs.k8s.io/yaml v1.3.0
1214
)
1315

1416
go 1.13

go.sum

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYE
22
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
33
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
44
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
5-
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
65
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
6+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
7+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
78
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
89
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
910
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
@@ -20,7 +21,20 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
2021
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
2122
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
2223
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
24+
golang.org/x/net v0.0.0-20220526153639-5463443f8c37 h1:lUkvobShwKsOesNfWWlCS5q7fnbG1MEliIzwu886fn8=
25+
golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
26+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
27+
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
28+
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
29+
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
30+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
2331
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
2432
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
33+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
34+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2535
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
2636
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
37+
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
38+
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
39+
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
40+
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=

0 commit comments

Comments
 (0)