Skip to content

Commit f217c59

Browse files
committed
Centralize access to connection pools
1 parent 721e361 commit f217c59

File tree

3 files changed

+34
-18
lines changed

3 files changed

+34
-18
lines changed

client.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,12 @@ var MaxBulkRetries = 10
4848
// your command will only be executed only once.
4949
func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) error {
5050
vb := b.VBHash(k)
51-
for i := 0; i < len(b.connections)*2; i++ {
51+
maxTries := len(b.VBucketServerMap.ServerList) * 2
52+
for i := 0; i < maxTries; i++ {
5253
masterId := b.VBucketServerMap.VBucketMap[vb][0]
53-
conn, err := b.connections[masterId].Get()
54-
defer b.connections[masterId].Return(conn)
54+
pool := b.getConnPool(masterId)
55+
conn, err := pool.Get()
56+
defer pool.Return(conn)
5557
if err != nil {
5658
return err
5759
}
@@ -71,8 +73,8 @@ func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) err
7173
}
7274
}
7375

74-
return fmt.Errorf("Unable to complete action after %v retries",
75-
len(b.connections)*2)
76+
return fmt.Errorf("Unable to complete action after %v attemps",
77+
maxTries)
7678
}
7779

7880
type gathered_stats struct {
@@ -85,8 +87,9 @@ func getStatsParallel(b *Bucket, offset int, which string,
8587
sn := b.VBucketServerMap.ServerList[offset]
8688

8789
results := map[string]string{}
88-
conn, err := b.connections[offset].Get()
89-
defer b.connections[offset].Return(conn)
90+
pool := b.getConnPool(offset)
91+
conn, err := pool.Get()
92+
defer pool.Return(conn)
9093
if err != nil {
9194
ch <- gathered_stats{sn, results}
9295
} else {
@@ -151,12 +154,13 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string,
151154
// This stack frame exists to ensure we can clean up
152155
// connection at a reasonable time.
153156
err := func() error {
154-
conn, err := b.connections[masterId].Get()
157+
pool := b.getConnPool(masterId)
158+
conn, err := pool.Get()
155159
if err != nil {
156160
ch <- map[string]*gomemcached.MCResponse{}
157161
return err
158162
}
159-
defer b.connections[masterId].Return(conn)
163+
defer pool.Return(conn)
160164

161165
m, err := conn.GetBulk(vb, keys)
162166
switch err.(type) {

pools.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,22 @@ type Bucket struct {
102102
Controllers map[string]interface{} `json:"controllers,omitempty"`
103103

104104
pool *Pool
105-
connections []*connectionPool
105+
connPools []*connectionPool
106106
commonSufix string
107107
}
108108

109+
func (b Bucket) getConnPools() []*connectionPool {
110+
return b.connPools
111+
}
112+
113+
func (b Bucket) getConnPool(i int) *connectionPool {
114+
p := b.getConnPools()
115+
if len(p) > i {
116+
return p[i]
117+
}
118+
return nil
119+
}
120+
109121
func (b Bucket) authHandler() (ah AuthHandler) {
110122
if b.pool != nil {
111123
ah = b.pool.client.ah
@@ -228,8 +240,8 @@ func (b *Bucket) refresh() (err error) {
228240
return err
229241
}
230242
b.pool = pool
231-
for i := range b.connections {
232-
b.connections[i] = newConnectionPool(
243+
for i := range b.connPools {
244+
b.connPools[i] = newConnectionPool(
233245
b.VBucketServerMap.ServerList[i],
234246
b.authHandler(), PoolSize, PoolOverflow)
235247
}
@@ -246,7 +258,7 @@ func (p *Pool) refresh() (err error) {
246258
}
247259
for _, b := range buckets {
248260
b.pool = p
249-
b.connections = make([]*connectionPool, len(b.VBucketServerMap.ServerList))
261+
b.connPools = make([]*connectionPool, len(b.VBucketServerMap.ServerList))
250262

251263
p.BucketMap[b.Name] = b
252264
}
@@ -275,18 +287,18 @@ func (c *Client) GetPool(name string) (p Pool, err error) {
275287

276288
// Mark this bucket as no longer needed, closing connections it may have open.
277289
func (b *Bucket) Close() {
278-
if b.connections != nil {
279-
for _, c := range b.connections {
290+
if b.connPools != nil {
291+
for _, c := range b.connPools {
280292
if c != nil {
281293
c.Close()
282294
}
283295
}
284-
b.connections = nil
296+
b.connPools = nil
285297
}
286298
}
287299

288300
func bucket_finalizer(b *Bucket) {
289-
if b.connections != nil {
301+
if b.connPools != nil {
290302
log.Printf("Warning: Finalizing a bucket with active connections.")
291303
}
292304
}

tap.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (feed *TapFeed) run() {
8181

8282
func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) {
8383
killSwitch = make(chan bool)
84-
for _, serverConn := range feed.bucket.connections {
84+
for _, serverConn := range feed.bucket.getConnPools() {
8585
var singleFeed *memcached.TapFeed
8686
singleFeed, err = serverConn.StartTapFeed(feed.args)
8787
if err != nil {

0 commit comments

Comments
 (0)