Skip to content

Commit 9a55f93

Browse files
committed
Atomically perform bucket refreshes
This should make things considerably less racy under failure. Needs more actual race testing with the race detector, though. There are numerous caveats here since the bucket is still used both for generating and parsing bucket descriptions on the wire *and* used to represent a bucket within a running system. The parts we expect to change may now do so freely. cbugg: close bug-909 bug-903 bug-853
1 parent 794e447 commit 9a55f93

File tree

8 files changed

+102
-57
lines changed

8 files changed

+102
-57
lines changed

client.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ var MaxBulkRetries = 10
4848
// your command will only be executed only once.
4949
func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) error {
5050
vb := b.VBHash(k)
51-
maxTries := len(b.VBucketServerMap.ServerList) * 2
51+
maxTries := len(b.Nodes()) * 2
5252
for i := 0; i < maxTries; i++ {
53-
masterId := b.VBucketServerMap.VBucketMap[vb][0]
53+
vbm := b.VBServerMap()
54+
masterId := vbm.VBucketMap[vb][0]
5455
pool := b.getConnPool(masterId)
5556
conn, err := pool.Get()
5657
defer pool.Return(conn)
@@ -84,7 +85,7 @@ type gathered_stats struct {
8485

8586
func getStatsParallel(b *Bucket, offset int, which string,
8687
ch chan<- gathered_stats) {
87-
sn := b.VBucketServerMap.ServerList[offset]
88+
sn := b.VBServerMap().ServerList[offset]
8889

8990
results := map[string]string{}
9091
pool := b.getConnPool(offset)
@@ -108,19 +109,20 @@ func getStatsParallel(b *Bucket, offset int, which string,
108109
func (b *Bucket) GetStats(which string) map[string]map[string]string {
109110
rv := map[string]map[string]string{}
110111

111-
if b.VBucketServerMap.ServerList == nil {
112+
vsm := b.VBServerMap()
113+
if vsm.ServerList == nil {
112114
return rv
113115
}
114116
// Go grab all the things at once.
115-
todo := len(b.VBucketServerMap.ServerList)
117+
todo := len(vsm.ServerList)
116118
ch := make(chan gathered_stats, todo)
117119

118-
for offset := range b.VBucketServerMap.ServerList {
120+
for offset := range vsm.ServerList {
119121
go getStatsParallel(b, offset, which, ch)
120122
}
121123

122124
// Gather the results
123-
for i := 0; i < len(b.VBucketServerMap.ServerList); i++ {
125+
for i := 0; i < len(vsm.ServerList); i++ {
124126
g := <-ch
125127
if len(g.vals) > 0 {
126128
rv[g.sn] = g.vals
@@ -148,7 +150,7 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string,
148150
attempts := 0
149151
done := false
150152
for attempts < MaxBulkRetries && !done {
151-
masterId := b.VBucketServerMap.VBucketMap[vb][0]
153+
masterId := b.VBServerMap().VBucketMap[vb][0]
152154
attempts++
153155

154156
// This stack frame exists to ensure we can clean up

examples/hello/hello.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ func doOps(b *couchbase.Bucket) {
3535
}
3636

3737
func exploreBucket(bucket *couchbase.Bucket) {
38-
fmt.Printf(" %v uses %s\n", bucket.Name,
39-
bucket.VBucketServerMap.HashAlgorithm)
40-
for pos, server := range bucket.VBucketServerMap.ServerList {
38+
vbm := bucket.VBServerMap()
39+
fmt.Printf(" %v uses %s\n", bucket.Name, vbm.HashAlgorithm)
40+
for pos, server := range vbm.ServerList {
4141
vbs := make([]string, 0, 1024)
42-
for vb, a := range bucket.VBucketServerMap.VBucketMap {
42+
for vb, a := range vbm.VBucketMap {
4343
if a[0] == pos {
4444
vbs = append(vbs, strconv.Itoa(vb))
4545
}

pools.go

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"runtime"
1414
"sort"
1515
"strings"
16+
"sync/atomic"
17+
"unsafe"
1618
)
1719

1820
// The HTTP Client To Use
@@ -73,6 +75,13 @@ type Pool struct {
7375
client Client
7476
}
7577

78+
type VBucketServerMap struct {
79+
HashAlgorithm string `json:"hashAlgorithm"`
80+
NumReplicas int `json:"numReplicas"`
81+
ServerList []string `json:"serverList"`
82+
VBucketMap [][]int `json:"vBucketMap"`
83+
}
84+
7685
// An individual bucket. Herein lives the most useful stuff.
7786
type Bucket struct {
7887
AuthType string `json:"authType"`
@@ -81,7 +90,6 @@ type Bucket struct {
8190
Type string `json:"bucketType"`
8291
Name string `json:"name"`
8392
NodeLocator string `json:"nodeLocator"`
84-
Nodes []Node `json:"nodes"`
8593
Quota map[string]float64 `json:"quota,omitempty"`
8694
Replicas int `json:"replicaNumber"`
8795
Password string `json:"saslPassword"`
@@ -92,22 +100,46 @@ type Bucket struct {
92100
DDocs struct {
93101
URI string `json:"uri"`
94102
} `json:"ddocs,omitempty"`
95-
VBucketServerMap struct {
96-
HashAlgorithm string `json:"hashAlgorithm"`
97-
NumReplicas int `json:"numReplicas"`
98-
ServerList []string `json:"serverList"`
99-
VBucketMap [][]int `json:"vBucketMap"`
100-
} `json:"vBucketServerMap"`
101103
BasicStats map[string]interface{} `json:"basicStats,omitempty"`
102104
Controllers map[string]interface{} `json:"controllers,omitempty"`
103105

104-
pool *Pool
105-
connPools []*connectionPool
106-
commonSufix string
106+
// These are used for JSON IO, but isn't used for processing
107+
// since it needs to be swapped out safely.
108+
VBSMJson VBucketServerMap `json:"vBucketServerMap"`
109+
NodesJson []Node `json:"nodes"`
110+
111+
pool *Pool
112+
connPools unsafe.Pointer // *[]*connectionPool
113+
vBucketServerMap unsafe.Pointer // *VBucketServerMap
114+
nodeList unsafe.Pointer // *[]Node
115+
commonSufix string
116+
}
117+
118+
// Get the current vbucket server map
119+
func (b Bucket) VBServerMap() *VBucketServerMap {
120+
return (*VBucketServerMap)(atomic.LoadPointer(&b.vBucketServerMap))
121+
}
122+
123+
func (b Bucket) Nodes() []Node {
124+
return *(*[]Node)(atomic.LoadPointer(&b.nodeList))
107125
}
108126

109127
func (b Bucket) getConnPools() []*connectionPool {
110-
return b.connPools
128+
return *(*[]*connectionPool)(atomic.LoadPointer(&b.connPools))
129+
}
130+
131+
func (b *Bucket) replaceConnPools(with []*connectionPool) {
132+
for {
133+
old := atomic.LoadPointer(&b.connPools)
134+
if atomic.CompareAndSwapPointer(&b.connPools, old, unsafe.Pointer(&with)) {
135+
if old != nil {
136+
for _, pool := range *(*[]*connectionPool)(old) {
137+
pool.Close()
138+
}
139+
}
140+
return
141+
}
142+
}
111143
}
112144

113145
func (b Bucket) getConnPool(i int) *connectionPool {
@@ -130,16 +162,17 @@ func (b Bucket) authHandler() (ah AuthHandler) {
130162

131163
// Get the (sorted) list of memcached node addresses (hostname:port).
132164
func (b Bucket) NodeAddresses() []string {
133-
rv := make([]string, len(b.VBucketServerMap.ServerList))
134-
copy(rv, b.VBucketServerMap.ServerList)
165+
vsm := b.VBServerMap()
166+
rv := make([]string, len(vsm.ServerList))
167+
copy(rv, vsm.ServerList)
135168
sort.Strings(rv)
136169
return rv
137170
}
138171

139172
// Get the longest common suffix of all host:port strings in the node list.
140173
func (b Bucket) CommonAddressSuffix() string {
141174
input := []string{}
142-
for _, n := range b.Nodes {
175+
for _, n := range b.Nodes() {
143176
input = append(input, n.Hostname)
144177
}
145178
return FindCommonSuffix(input)
@@ -235,16 +268,20 @@ func Connect(baseU string) (Client, error) {
235268

236269
func (b *Bucket) refresh() error {
237270
pool := b.pool
238-
err := pool.client.parseURLResponse(b.URI, b)
271+
tmpb := &Bucket{}
272+
err := pool.client.parseURLResponse(b.URI, tmpb)
239273
if err != nil {
240274
return err
241275
}
242-
b.pool = pool
243-
for i := range b.connPools {
244-
b.connPools[i] = newConnectionPool(
245-
b.VBucketServerMap.ServerList[i],
276+
newcps := make([]*connectionPool, len(b.VBSMJson.ServerList))
277+
for i := range newcps {
278+
newcps[i] = newConnectionPool(
279+
tmpb.VBSMJson.ServerList[i],
246280
b.authHandler(), PoolSize, PoolOverflow)
247281
}
282+
b.replaceConnPools(newcps)
283+
atomic.StorePointer(&b.vBucketServerMap, unsafe.Pointer(&b.VBSMJson))
284+
atomic.StorePointer(&b.nodeList, unsafe.Pointer(&b.NodesJson))
248285
return nil
249286
}
250287

@@ -258,7 +295,8 @@ func (p *Pool) refresh() (err error) {
258295
}
259296
for _, b := range buckets {
260297
b.pool = p
261-
b.connPools = make([]*connectionPool, len(b.VBucketServerMap.ServerList))
298+
b.nodeList = unsafe.Pointer(&b.NodesJson)
299+
b.replaceConnPools(make([]*connectionPool, len(b.VBSMJson.ServerList)))
262300

263301
p.BucketMap[b.Name] = b
264302
}
@@ -288,7 +326,7 @@ func (c *Client) GetPool(name string) (p Pool, err error) {
288326
// Mark this bucket as no longer needed, closing connections it may have open.
289327
func (b *Bucket) Close() {
290328
if b.connPools != nil {
291-
for _, c := range b.connPools {
329+
for _, c := range b.getConnPools() {
292330
if c != nil {
293331
c.Close()
294332
}

pools_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package couchbase
33
import (
44
"encoding/json"
55
"testing"
6+
"unsafe"
67
)
78

89
var samplePools = `{
@@ -288,24 +289,25 @@ func TestPool(t *testing.T) {
288289
}
289290

290291
func TestCommonAddressSuffixEmpty(t *testing.T) {
291-
b := Bucket{}
292+
b := Bucket{nodeList: mkNL([]Node{})}
292293
assert(t, "empty", "", b.CommonAddressSuffix())
293294
}
294295

295296
func TestCommonAddressSuffixUncommon(t *testing.T) {
296-
b := Bucket{}
297-
b.VBucketServerMap.ServerList = []string{"somestring", "unrelated"}
297+
b := Bucket{vBucketServerMap: unsafe.Pointer(&VBucketServerMap{
298+
ServerList: []string{"somestring", "unrelated"}}),
299+
nodeList: mkNL([]Node{}),
300+
}
298301
assert(t, "shouldn't match", "", b.CommonAddressSuffix())
299302
}
300303

301304
func TestCommonAddressSuffixCommon(t *testing.T) {
302-
b := Bucket{}
303-
b.Nodes = []Node{
305+
b := Bucket{nodeList: unsafe.Pointer(&[]Node{
304306
{Hostname: "server1.example.com:11210"},
305307
{Hostname: "server2.example.com:11210"},
306308
{Hostname: "server3.example.com:11210"},
307309
{Hostname: "server4.example.com:11210"},
308-
}
310+
})}
309311
assert(t, "useful suffix", ".example.com:11210",
310312
b.CommonAddressSuffix())
311313
}

vbmap.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,6 @@ func (b *Bucket) VBHash(key string) uint32 {
7474
for x := 0; x < len(key); x++ {
7575
crc = (crc >> 8) ^ crc32tab[(uint64(crc)^uint64(key[x]))&0xff]
7676
}
77-
return ((^crc) >> 16) & 0x7fff & (uint32(len(b.VBucketServerMap.VBucketMap)) - 1)
77+
vbm := b.VBServerMap()
78+
return ((^crc) >> 16) & 0x7fff & (uint32(len(vbm.VBucketMap)) - 1)
7879
}

vbmap_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package couchbase
22

33
import (
44
"testing"
5+
"unsafe"
56
)
67

78
func testBucket() Bucket {
8-
b := Bucket{}
9-
b.VBucketServerMap.VBucketMap = make([][]int, 256)
9+
b := Bucket{vBucketServerMap: unsafe.Pointer(&VBucketServerMap{
10+
VBucketMap: make([][]int, 256),
11+
})}
1012
return b
1113
}
1214

views.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@ type ViewResult struct {
3232
}
3333

3434
func (b *Bucket) randomBaseURL() (*url.URL, error) {
35-
if len(b.Nodes) == 0 {
35+
if len(b.Nodes()) == 0 {
3636
return nil, errors.New("no couch rest URLs")
3737
}
38-
nodeNo := rand.Intn(len(b.Nodes))
39-
node := b.Nodes[nodeNo]
38+
nodes := b.Nodes()
39+
nodeNo := rand.Intn(len(nodes))
40+
node := nodes[nodeNo]
4041
if node.CouchAPIBase == "" {
4142
// Probably in "warmup" state
4243
return nil, fmt.Errorf("Bucket is in %q state, not ready for view queries", node.Status)

views_test.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,30 @@ package couchbase
22

33
import (
44
"testing"
5+
"unsafe"
56
)
67

7-
func TestViewURL(t *testing.T) {
8-
b := Bucket{}
9-
// No URLs
10-
v, err := b.ViewURL("a", "b", nil)
11-
if err == nil {
12-
t.Errorf("Expected error on empty bucket, got %v", v)
13-
}
8+
func mkNL(in []Node) unsafe.Pointer {
9+
return unsafe.Pointer(&in)
10+
}
1411

12+
func TestViewURL(t *testing.T) {
1513
// Missing URL
16-
b = Bucket{Nodes: []Node{{}}}
17-
v, err = b.ViewURL("a", "b", nil)
14+
b := Bucket{nodeList: mkNL([]Node{{}})}
15+
v, err := b.ViewURL("a", "b", nil)
1816
if err == nil {
1917
t.Errorf("Expected error on missing URL, got %v", v)
2018
}
2119

2220
// Invalidish URL
23-
b = Bucket{Nodes: []Node{{CouchAPIBase: "::gopher:://localhost:80x92/"}}}
21+
b = Bucket{nodeList: mkNL([]Node{{CouchAPIBase: "::gopher:://localhost:80x92/"}})}
2422
v, err = b.ViewURL("a", "b", nil)
2523
if err == nil {
2624
t.Errorf("Expected error on broken URL, got %v", v)
2725
}
2826

2927
// Unmarshallable parameter
30-
b = Bucket{Nodes: []Node{{CouchAPIBase: "http:://localhost:8092/"}}}
28+
b = Bucket{nodeList: mkNL([]Node{{CouchAPIBase: "http:://localhost:8092/"}})}
3129
v, err = b.ViewURL("a", "b",
3230
map[string]interface{}{"ch": make(chan bool)})
3331
if err == nil {
@@ -59,7 +57,8 @@ func TestViewURL(t *testing.T) {
5957
{"", "_all_docs", nil, "/x/_all_docs", map[string]string{}},
6058
}
6159

62-
b = Bucket{Name: "x", Nodes: []Node{{CouchAPIBase: "http://localhost:8092/"}}}
60+
b = Bucket{Name: "x",
61+
nodeList: mkNL([]Node{{CouchAPIBase: "http://localhost:8092/"}})}
6362
for _, test := range tests {
6463
us, err := b.ViewURL(test.ddoc, test.name, test.params)
6564
if err != nil {

0 commit comments

Comments
 (0)