Skip to content

Commit 381ce2b

Browse files
committed
close dropped connections and ping added connections on balancer.Update()
1 parent 26a84de commit 381ce2b

File tree

6 files changed

+215
-27
lines changed

6 files changed

+215
-27
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Pinged new connections on discovery attempt, closed dropped ones, so `ydb_go_sdk_ydb_driver_conns` metric is correct
2+
13
## v3.108.3
24
* Fixed handling of zero values for DyNumber
35
* Fixed the decimal yql slice bounds out of range

internal/balancer/balancer.go

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,34 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context, cc *grpc.ClientC
175175
return nil
176176
}
177177

178+
func buildConnectionsState(ctx context.Context, pool interface {
179+
GetIfPresent(endpoint endpoint.Endpoint) conn.Conn
180+
Allow(ctx context.Context, cc conn.Conn)
181+
EndpointsToConnections(endpoints []endpoint.Endpoint) []conn.Conn
182+
}, newest []endpoint.Endpoint,
183+
dropped []endpoint.Endpoint,
184+
config balancerConfig.Config,
185+
selfLocation balancerConfig.Info,
186+
) *connectionsState {
187+
connections := pool.EndpointsToConnections(newest)
188+
for _, c := range connections {
189+
pool.Allow(ctx, c)
190+
c.Endpoint().Touch()
191+
_ = c.Ping(ctx)
192+
}
193+
194+
state := newConnectionsState(connections, config.Filter, selfLocation, config.AllowFallback)
195+
196+
for _, e := range dropped {
197+
c := pool.GetIfPresent(e)
198+
if c != nil {
199+
_ = c.Close(ctx)
200+
}
201+
}
202+
203+
return state
204+
}
205+
178206
func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoint.Endpoint, localDC string) {
179207
var (
180208
onDone = trace.DriverOnBalancerUpdate(
@@ -186,10 +214,12 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
186214
)
187215
previous = b.connections().All()
188216
)
217+
218+
_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
219+
return strings.Compare(lhs.Address(), rhs.Address())
220+
})
221+
189222
defer func() {
190-
_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
191-
return strings.Compare(lhs.Address(), rhs.Address())
192-
})
193223
onDone(
194224
xslices.Transform(newest, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
195225
xslices.Transform(added, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
@@ -198,21 +228,27 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
198228
)
199229
}()
200230

201-
connections := endpointsToConnections(b.pool, newest)
202-
for _, c := range connections {
203-
b.pool.Allow(ctx, c)
204-
c.Endpoint().Touch()
205-
}
206-
207231
info := balancerConfig.Info{SelfLocation: localDC}
208-
state := newConnectionsState(connections, b.balancerConfig.Filter, info, b.balancerConfig.AllowFallback)
209-
210-
endpointsInfo := make([]endpoint.Info, len(newest))
211-
for i, e := range newest {
212-
endpointsInfo[i] = e
213-
}
214-
215-
b.connectionsState.Store(state)
232+
b.connectionsState.Store(buildConnectionsState(ctx, b.pool, newest, dropped, b.balancerConfig, info))
233+
234+
//connections := b.pool.EndpointsToConnections(newest)
235+
//for _, c := range connections {
236+
// b.pool.Allow(ctx, c)
237+
// c.Endpoint().Touch()
238+
// _ = c.Ping(ctx)
239+
//}
240+
//
241+
//info := balancerConfig.Info{SelfLocation: localDC}
242+
//state := newConnectionsState(connections, b.balancerConfig.Filter, info, b.balancerConfig.AllowFallback)
243+
//
244+
//b.connectionsState.Store(state)
245+
//
246+
//for _, e := range dropped {
247+
// c := b.pool.GetIfPresent(e)
248+
// if c != nil {
249+
// _ = c.Close(ctx)
250+
// }
251+
//}
216252
}
217253

218254
func (b *Balancer) Close(ctx context.Context) (err error) {
@@ -444,12 +480,3 @@ func (b *Balancer) nextConn(ctx context.Context) (c conn.Conn, err error) {
444480

445481
return c, nil
446482
}
447-
448-
func endpointsToConnections(p *conn.Pool, endpoints []endpoint.Endpoint) []conn.Conn {
449-
conns := make([]conn.Conn, 0, len(endpoints))
450-
for _, e := range endpoints {
451-
conns = append(conns, p.Get(e))
452-
}
453-
454-
return conns
455-
}

internal/balancer/balancer_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package balancer
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
9+
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/mock"
13+
)
14+
15+
type fakePool struct {
16+
connections map[string]*mock.Conn
17+
}
18+
19+
func (fp *fakePool) EndpointsToConnections(eps []endpoint.Endpoint) []conn.Conn {
20+
var conns []conn.Conn
21+
for _, ep := range eps {
22+
if c, ok := fp.connections[ep.Address()]; ok {
23+
conns = append(conns, c)
24+
}
25+
}
26+
return conns
27+
}
28+
29+
func (fp *fakePool) Allow(_ context.Context, _ conn.Conn) {}
30+
31+
func (fp *fakePool) GetIfPresent(ep endpoint.Endpoint) conn.Conn {
32+
if c, ok := fp.connections[ep.Address()]; ok {
33+
return c
34+
}
35+
return nil
36+
}
37+
38+
func TestBuildConnectionsState(t *testing.T) {
39+
ctx := context.Background()
40+
41+
tests := []struct {
42+
name string
43+
newEndpoints []endpoint.Endpoint
44+
oldEndpoints []endpoint.Endpoint
45+
initialConns map[string]*mock.Conn
46+
conf balancerConfig.Config
47+
selfLoc balancerConfig.Info
48+
expectPinged []string
49+
expectClosed []string
50+
}{
51+
{
52+
name: "single new and old endpoint",
53+
newEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "127.0.0.1"}},
54+
oldEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "127.0.0.2"}},
55+
initialConns: map[string]*mock.Conn{
56+
"127.0.0.1": {
57+
AddrField: "127.0.0.1",
58+
State: conn.Online,
59+
},
60+
"127.0.0.2": {
61+
AddrField: "127.0.0.2",
62+
State: conn.Offline,
63+
},
64+
},
65+
conf: balancerConfig.Config{
66+
AllowFallback: true,
67+
DetectNearestDC: true,
68+
},
69+
selfLoc: balancerConfig.Info{SelfLocation: "local"},
70+
expectPinged: []string{"127.0.0.1"},
71+
expectClosed: []string{"127.0.0.2"},
72+
},
73+
{
74+
newEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "a1"}, &mock.Endpoint{AddrField: "a2"}},
75+
oldEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "a3"}},
76+
initialConns: map[string]*mock.Conn{
77+
"a1": {
78+
AddrField: "a1",
79+
LocationField: "local",
80+
State: conn.Offline,
81+
},
82+
"a2": {
83+
AddrField: "a2",
84+
State: conn.Offline,
85+
},
86+
"a3": {
87+
AddrField: "a3",
88+
State: conn.Online,
89+
},
90+
},
91+
conf: balancerConfig.Config{
92+
AllowFallback: true,
93+
DetectNearestDC: true,
94+
},
95+
selfLoc: balancerConfig.Info{SelfLocation: "local"},
96+
expectPinged: []string{"a1", "a2"},
97+
expectClosed: []string{"a3"},
98+
},
99+
}
100+
101+
for _, tt := range tests {
102+
t.Run(tt.name, func(t *testing.T) {
103+
fp := &fakePool{connections: make(map[string]*mock.Conn)}
104+
for addr, c := range tt.initialConns {
105+
fp.connections[addr] = c
106+
}
107+
108+
state := buildConnectionsState(ctx, fp, tt.newEndpoints, tt.oldEndpoints, tt.conf, tt.selfLoc)
109+
assert.NotNil(t, state)
110+
for _, addr := range tt.expectPinged {
111+
c := fp.connections[addr]
112+
assert.True(t, c.Pinged, "connection %s should be pinged", addr)
113+
assert.True(t, c.State == conn.Online || c.PingErr != nil)
114+
}
115+
for _, addr := range tt.expectClosed {
116+
c := fp.connections[addr]
117+
assert.True(t, c.Closed, "connection %s should be closed", addr)
118+
assert.True(t, c.State == conn.Offline, "connection %s should be offline", addr)
119+
}
120+
})
121+
}
122+
}

internal/conn/conn.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"google.golang.org/grpc/metadata"
1414
"google.golang.org/grpc/stats"
1515

16+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1718
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
@@ -36,6 +37,7 @@ var (
3637

3738
type Conn interface {
3839
grpc.ClientConnInterface
40+
closer.Closer
3941

4042
Endpoint() endpoint.Endpoint
4143

internal/conn/pool.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,20 @@ func (p *Pool) GrpcDialOptions() []grpc.DialOption {
4040
return p.dialOptions
4141
}
4242

43+
func (p *Pool) GetIfPresent(endpoint endpoint.Endpoint) Conn {
44+
var (
45+
address = endpoint.Address()
46+
cc *conn
47+
has bool
48+
)
49+
50+
if cc, has = p.conns.Get(address); has {
51+
return cc
52+
}
53+
54+
return nil
55+
}
56+
4357
func (p *Pool) Get(endpoint endpoint.Endpoint) Conn {
4458
var (
4559
address = endpoint.Address()
@@ -252,3 +266,12 @@ func NewPool(ctx context.Context, config Config) *Pool {
252266

253267
return p
254268
}
269+
270+
func (p *Pool) EndpointsToConnections(endpoints []endpoint.Endpoint) []Conn {
271+
conns := make([]Conn, 0, len(endpoints))
272+
for _, e := range endpoints {
273+
conns = append(conns, p.Get(e))
274+
}
275+
276+
return conns
277+
}

internal/mock/conn.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ type Conn struct {
1717
NodeIDField uint32
1818
State conn.State
1919
LocalDCField bool
20+
Pinged bool
21+
Closed bool
2022
}
2123

2224
func (c *Conn) Invoke(
@@ -53,7 +55,17 @@ func (c *Conn) Park(ctx context.Context) (err error) {
5355
panic("not implemented in mock")
5456
}
5557

58+
func (c *Conn) Close(ctx context.Context) error {
59+
c.Closed = true
60+
c.SetState(ctx, conn.Offline)
61+
return nil
62+
}
63+
5664
func (c *Conn) Ping(ctx context.Context) error {
65+
c.Pinged = true
66+
if c.PingErr == nil {
67+
c.SetState(ctx, conn.Online)
68+
}
5769
return c.PingErr
5870
}
5971

@@ -116,7 +128,7 @@ func (e *Endpoint) LoadFactor() float32 {
116128
}
117129

118130
func (e *Endpoint) OverrideHost() string {
119-
panic("not implemented in mock")
131+
return ""
120132
}
121133

122134
func (e *Endpoint) String() string {

0 commit comments

Comments
 (0)