Skip to content

Commit 15855db

Browse files
author
Mikaël Cluseau
committed
boltindex: batch seen key writes
1 parent 53daffc commit 15855db

File tree

7 files changed

+177
-112
lines changed

7 files changed

+177
-112
lines changed

boltindex/boltindex.go

+123-77
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@ package boltindex
22

33
import (
44
"bytes"
5+
"log"
6+
"sync"
57

68
"github.com/boltdb/bolt"
79

810
diff "github.com/mcluseau/go-diff"
911
)
1012

13+
const seenBatchSize = 1000
14+
1115
var (
1216
resumeKeyKey = []byte("resumeKey")
1317
metaPrefix = []byte("meta:")
@@ -26,6 +30,8 @@ type Index struct {
2630
metaBucketName []byte
2731
recordSeen bool
2832
seenBucketName []byte
33+
seenStream chan hash
34+
seenWG sync.WaitGroup
2935
}
3036

3137
func New(db *bolt.DB, bucket []byte, recordSeen bool) (idx *Index, err error) {
@@ -56,72 +62,60 @@ func New(db *bolt.DB, bucket []byte, recordSeen bool) (idx *Index, err error) {
5662
metaBucketName: append(metaPrefix, bucket...),
5763
recordSeen: recordSeen,
5864
seenBucketName: seenBucketName,
65+
seenWG: sync.WaitGroup{},
5966
}
6067
return
6168
}
6269

6370
var _ diff.Index = &Index{}
6471

65-
func (i *Index) bucket(writable bool) (tx *bolt.Tx, bucket *bolt.Bucket, err error) {
66-
tx, err = i.db.Begin(writable)
67-
if err != nil {
68-
return
69-
}
70-
71-
bucket = tx.Bucket(i.bucketName)
72-
return
73-
}
74-
7572
// Cleanup removes temp data produced by this index
7673
func (i *Index) Cleanup() (err error) {
77-
if i.seenBucketName == nil {
78-
return
74+
if i.seenStream != nil {
75+
close(i.seenStream)
76+
i.seenWG.Wait()
7977
}
8078

81-
err = i.db.Update(func(tx *bolt.Tx) (err error) {
82-
tx.DeleteBucket(i.seenBucketName)
83-
tx.OnCommit(func() {
84-
i.seenBucketName = nil
79+
if i.seenBucketName != nil {
80+
err = i.db.Update(func(tx *bolt.Tx) (err error) {
81+
tx.DeleteBucket(i.seenBucketName)
82+
tx.OnCommit(func() {
83+
i.seenBucketName = nil
84+
})
85+
return
8586
})
86-
return
87-
})
88-
if err != nil {
89-
return
87+
if err != nil {
88+
return
89+
}
9090
}
9191

9292
return
9393
}
9494

95-
func commitOrRollback(tx *bolt.Tx, err error) {
96-
if err == nil {
97-
tx.Commit()
98-
} else {
99-
tx.Rollback()
100-
}
101-
}
102-
103-
func (i *Index) Index(kv KeyValue, resumeKey []byte) (err error) {
104-
tx, bucket, err := i.bucket(true)
105-
if err != nil {
106-
return
107-
}
108-
109-
defer commitOrRollback(tx, err)
95+
func (i *Index) Index(kvs <-chan KeyValue, resumeKey <-chan []byte) (err error) {
96+
return i.db.Update(func(tx *bolt.Tx) (err error) {
97+
bucket := tx.Bucket(i.bucketName)
98+
99+
for kv := range kvs {
100+
if len(kv.Value) == 0 {
101+
// deletion
102+
err = bucket.Delete(kv.Key)
103+
} else {
104+
// create/update
105+
err = bucket.Put(kv.Key, hashOf(kv.Value).Sum(nil))
106+
}
110107

111-
if resumeKey != nil {
112-
// record resumeKey
113-
err = i.storeResumeKey(tx, resumeKey)
114-
}
108+
if err != nil {
109+
return
110+
}
111+
}
115112

116-
if len(kv.Value) == 0 {
117-
// deletion
118-
err = bucket.Delete(kv.Key)
113+
if resumeKey != nil {
114+
// record resumeKey
115+
err = i.storeResumeKey(tx, <-resumeKey)
116+
}
119117
return
120-
}
121-
122-
// create/update
123-
err = bucket.Put(kv.Key, hashOf(kv.Value).Sum(nil))
124-
return
118+
})
125119
}
126120

127121
func (i *Index) storeResumeKey(tx *bolt.Tx, resumeKey []byte) (err error) {
@@ -150,19 +144,26 @@ func (i *Index) Compare(kv KeyValue) (result diff.CompareResult, err error) {
150144
panic("nil values are not allowed here")
151145
}
152146

153-
tx, bucket, err := i.bucket(i.recordSeen)
147+
var currentValueHash []byte
148+
149+
err = i.db.View(func(tx *bolt.Tx) error {
150+
currentValueHash = tx.Bucket(i.bucketName).Get(kv.Key)
151+
return nil
152+
})
153+
154154
if err != nil {
155155
return
156156
}
157157

158-
defer commitOrRollback(tx, err)
159-
160158
if i.recordSeen {
161-
seenBucket := tx.Bucket(i.seenBucketName)
162-
err = seenBucket.Put(hashOf(kv.Key).Sum(nil), nil)
163-
}
159+
if i.seenStream == nil {
160+
i.seenStream = make(chan hash, seenBatchSize)
161+
i.seenWG.Add(1)
162+
go i.writeSeen()
163+
}
164164

165-
currentValueHash := bucket.Get(kv.Key)
165+
i.seenStream <- hashOf(kv.Key)
166+
}
166167

167168
if currentValueHash == nil {
168169
return diff.MissingKey, nil
@@ -177,40 +178,85 @@ func (i *Index) Compare(kv KeyValue) (result diff.CompareResult, err error) {
177178
}
178179
}
179180

181+
func (i *Index) writeSeen() {
182+
defer i.seenWG.Done()
183+
184+
batchCount := 0
185+
batch := make([]byte, 0, seenBatchSize*hashLen)
186+
187+
saveBatch := func() (err error) {
188+
log.Printf("save batch: %d entries", batchCount)
189+
err = i.db.Update(func(tx *bolt.Tx) (err error) {
190+
bucket := tx.Bucket(i.seenBucketName)
191+
192+
for i := 0; i < batchCount; i++ {
193+
bucket.Put(batch[i*hashLen:i+1*hashLen], []byte{})
194+
}
195+
196+
return
197+
})
198+
if err == nil {
199+
batch = batch[0:0]
200+
batchCount = 0
201+
}
202+
return
203+
}
204+
205+
for h := range i.seenStream {
206+
h.Sum(batch)
207+
batchCount++
208+
209+
if batchCount == seenBatchSize {
210+
saveBatch()
211+
}
212+
}
213+
214+
if batchCount != 0 {
215+
saveBatch()
216+
}
217+
}
218+
180219
func (i *Index) KeysNotSeen() <-chan []byte {
181220
if !i.recordSeen {
182221
return nil
183222
}
184223

185224
ch := make(chan []byte, 10)
186225

187-
go func() {
188-
defer close(ch)
189-
190-
if err := i.db.View(func(tx *bolt.Tx) (err error) {
191-
keysBucket := tx.Bucket(i.bucketName)
192-
seenBucket := tx.Bucket(i.seenBucketName)
193-
194-
err = keysBucket.ForEach(func(k, v []byte) (err error) {
195-
if seenBucket == nil {
196-
// no seenBucket => nothing was seen
197-
ch <- k
198-
}
199-
if seenBucket.Get(hashOf(k).Sum(nil)) == nil {
200-
ch <- k
201-
}
202-
return
203-
})
204-
return
205-
206-
}); err != nil {
207-
panic(err)
208-
}
209-
}()
226+
go i.sendKeysNotSeen(ch)
210227

211228
return ch
212229
}
213230

231+
func (i *Index) sendKeysNotSeen(ch chan []byte) {
232+
defer close(ch)
233+
234+
if i.seenStream != nil {
235+
close(i.seenStream)
236+
i.seenWG.Wait()
237+
}
238+
239+
if err := i.db.View(func(tx *bolt.Tx) (err error) {
240+
keysBucket := tx.Bucket(i.bucketName)
241+
seenBucket := tx.Bucket(i.seenBucketName)
242+
243+
err = keysBucket.ForEach(func(k, v []byte) (err error) {
244+
if seenBucket == nil {
245+
// no seenBucket => nothing was seen
246+
ch <- k
247+
}
248+
if seenBucket.Get(hashOf(k).Sum(nil)) == nil {
249+
ch <- k
250+
}
251+
return
252+
})
253+
return
254+
255+
}); err != nil {
256+
panic(err)
257+
}
258+
}
259+
214260
func (i *Index) Value(key []byte) []byte {
215261
panic("should not be called")
216262
}

boltindex/boltindex_test.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,25 @@ import (
1010

1111
func TestKeysNotSeen(t *testing.T) {
1212
withDB(t, func(db *bolt.DB) {
13+
// -------------------------------------------------------------
1314
idx, err := New(db, []byte("test"), true)
1415
if err != nil {
1516
t.Fatal(err)
1617
}
1718

18-
idx.Index(KeyValue{Key: []byte("a"), Value: []byte("value a")}, nil)
19+
idx.Index(singleValue("a", "value a"), nil)
1920

2021
k := <-idx.KeysNotSeen()
2122
if ks := string(k); ks != "a" {
2223
t.Errorf("did not read \"a\" but %q", ks)
2324
}
2425

25-
idx.Index(KeyValue{Key: []byte("b"), Value: []byte("value b")}, nil)
26+
// -------------------------------------------------------------
27+
idx, err = New(db, []byte("test"), true)
28+
if err != nil {
29+
t.Fatal(err)
30+
}
31+
idx.Index(singleValue("b", "value b"), nil)
2632

2733
idx.Compare(KeyValue{Key: []byte("a"), Value: []byte("not value a")})
2834

@@ -39,6 +45,13 @@ func TestKeysNotSeen(t *testing.T) {
3945
})
4046
}
4147

48+
func singleValue(key, value string) (ch chan KeyValue) {
49+
ch = make(chan KeyValue, 1)
50+
ch <- KeyValue{Key: []byte(key), Value: []byte(value)}
51+
close(ch)
52+
return
53+
}
54+
4255
func withDB(t *testing.T, do func(db *bolt.DB)) {
4356
f, err := ioutil.TempFile(os.TempDir(), "boltindex-test-")
4457
if err != nil {

boltindex/hash.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const (
1111
hashLen = hashBits / 8
1212
)
1313

14-
type hash [hashLen]byte
14+
type hash = murmur3.Hash128
1515

1616
// store the original key at 'k'+hash
1717
func dbKeyKey(keyH gohash.Hash) []byte {
@@ -23,7 +23,7 @@ func dbValueHashKey(keyH gohash.Hash) []byte {
2323
return keyH.Sum(append(make([]byte, 0, 1+hashLen), 'v'))
2424
}
2525

26-
func hashOf(data []byte) gohash.Hash {
26+
func hashOf(data []byte) hash {
2727
h := murmur3.New128()
2828
h.Write(data)
2929
return h

diff.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,19 @@ func Diff(referenceValues, currentValues <-chan KeyValue, changes chan Change, c
2424
wg.Add(2)
2525

2626
go func() {
27-
for kv := range referenceValues {
28-
referenceIndex.Index(kv, nil)
27+
defer wg.Done()
28+
err := referenceIndex.Index(referenceValues, nil)
29+
if err != nil {
30+
panic(err)
2931
}
30-
wg.Done()
3132
}()
3233

3334
go func() {
34-
for kv := range currentValues {
35-
currentIndex.Index(kv, nil)
35+
defer wg.Done()
36+
err := currentIndex.Index(currentValues, nil)
37+
if err != nil {
38+
panic(err)
3639
}
37-
wg.Done()
3840
}()
3941

4042
wg.Wait()
@@ -52,8 +54,8 @@ func Diff(referenceValues, currentValues <-chan KeyValue, changes chan Change, c
5254
func DiffStreamReference(referenceValues, currentValues <-chan KeyValue, changes chan Change, cancel <-chan bool) {
5355
currentIndex := NewIndex(false)
5456

55-
for kv := range currentValues {
56-
currentIndex.Index(kv, nil)
57+
if err := currentIndex.Index(currentValues, nil); err != nil {
58+
panic(err)
5759
}
5860

5961
DiffStreamIndex(referenceValues, currentIndex, changes, cancel)

0 commit comments

Comments
 (0)