Skip to content

Commit 4855144

Browse files
author
Mikaël Cluseau
committed
initial commit
0 parents  commit 4855144

File tree

5 files changed

+449
-0
lines changed

5 files changed

+449
-0
lines changed

change.go

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package diff
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
type ChangeType int
8+
9+
const (
10+
Unchanged = iota
11+
Created
12+
Modified
13+
Deleted
14+
)
15+
16+
type Change struct {
17+
Type ChangeType
18+
Key []byte
19+
// The new value (Created and Modified events only)
20+
Value []byte
21+
}
22+
23+
func (c Change) String() string {
24+
var s string
25+
switch c.Type {
26+
case Unchanged:
27+
s = "U"
28+
case Created:
29+
s = "C"
30+
case Modified:
31+
s = "M"
32+
case Deleted:
33+
s = "D"
34+
default:
35+
panic(fmt.Errorf("Invalid change type: %d", c.Type))
36+
}
37+
38+
return fmt.Sprintf("%s %q %q", s, c.Key, c.Value)
39+
}

diff.go

+158
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package diff
2+
3+
import (
4+
"sync"
5+
)
6+
7+
// Compares a store (currentValues) with a reference (referenceValues), streaming the reference.
8+
//
9+
// The referenceValues channel provide values in the reference store. It will be indexed.
10+
//
11+
// The currentValues channel provide values in the target store. It will be indexed.
12+
//
13+
// The changes channel will receive the changes, including Unchanged.
14+
//
15+
// See other diff implementations for less faster and less memory consumming alternatives if
16+
// you can provide better garanties from your stores.
17+
func Diff(referenceValues, currentValues <-chan KeyValue, changes chan Change) {
18+
referenceIndex := NewIndex(true)
19+
currentIndex := NewIndex(false)
20+
21+
wg := sync.WaitGroup{}
22+
wg.Add(2)
23+
24+
go func() {
25+
for kv := range referenceValues {
26+
referenceIndex.Index(kv)
27+
}
28+
wg.Done()
29+
}()
30+
31+
go func() {
32+
for kv := range currentValues {
33+
currentIndex.Index(kv)
34+
}
35+
wg.Done()
36+
}()
37+
38+
wg.Wait()
39+
40+
DiffIndexIndex(referenceIndex, currentIndex, changes)
41+
}
42+
43+
// Compares a store (currentValues) with a reference (referenceValues), streaming the reference.
44+
//
45+
// The referenceValues channel provide values in the reference store. It MUST NOT produce duplicate keys.
46+
//
47+
// The currentValues channel provide values in the target store. It will be indexed.
48+
//
49+
// The changes channel will receive the changes, including Unchanged.
50+
func DiffStreamReference(referenceValues, currentValues <-chan KeyValue, changes chan Change) {
51+
currentIndex := NewIndex(false)
52+
53+
for kv := range currentValues {
54+
currentIndex.Index(kv)
55+
}
56+
57+
DiffStreamIndex(referenceValues, currentIndex, changes)
58+
}
59+
60+
// Compares a store (currentIndex) with a reference (referenceValues), streaming the reference.
61+
//
62+
// The referenceValues channel provide values in the reference store. It MUST NOT produce duplicate keys.
63+
//
64+
// The currentIndex is the indexed target store.
65+
//
66+
// The changes channel will receive the changes, including Unchanged.
67+
func DiffStreamIndex(referenceValues <-chan KeyValue, currentIndex *Index, changes chan Change) {
68+
for kv := range referenceValues {
69+
kv := kv
70+
71+
switch currentIndex.Compare(kv) {
72+
case MissingKey:
73+
changes <- Change{
74+
Type: Created,
75+
Key: kv.Key,
76+
Value: kv.Value,
77+
}
78+
79+
case ModifiedKey:
80+
changes <- Change{
81+
Type: Modified,
82+
Key: kv.Key,
83+
Value: kv.Value,
84+
}
85+
86+
case UnchangedKey:
87+
changes <- Change{
88+
Type: Unchanged,
89+
Key: kv.Key,
90+
}
91+
92+
}
93+
}
94+
95+
for key := range currentIndex.KeysNotSeen() {
96+
changes <- Change{
97+
Type: Deleted,
98+
Key: key,
99+
}
100+
}
101+
}
102+
103+
// Compares a store (currentValues) with a reference (referenceIndex), streaming the reference.
104+
//
105+
// The referenceIndex is the indexed target store. It MUST store the values.
106+
//
107+
// The currentValues channel provide values in the reference store. It MUST NOT produce duplicate keys.
108+
//
109+
// The changes channel will receive the changes, including Unchanged.
110+
func DiffIndexStream(referenceIndex *Index, currentValues <-chan KeyValue, changes chan Change) {
111+
if !referenceIndex.recordValues {
112+
panic("referenceIndex must record values")
113+
}
114+
for kv := range currentValues {
115+
kv := kv
116+
117+
switch referenceIndex.Compare(kv) {
118+
case MissingKey:
119+
changes <- Change{
120+
Type: Deleted,
121+
Key: kv.Key,
122+
}
123+
124+
case ModifiedKey:
125+
changes <- Change{
126+
Type: Modified,
127+
Key: kv.Key,
128+
Value: kv.Value,
129+
}
130+
131+
case UnchangedKey:
132+
changes <- Change{
133+
Type: Unchanged,
134+
Key: kv.Key,
135+
}
136+
137+
}
138+
}
139+
140+
for key := range referenceIndex.KeysNotSeen() {
141+
changes <- Change{
142+
Type: Created,
143+
Key: key,
144+
Value: referenceIndex.Value(key),
145+
}
146+
}
147+
}
148+
149+
// Compares a store (currentValues) with a reference (referenceIndex), streaming the reference.
150+
//
151+
// The referenceIndex is the indexed target store. It MUST store the values.
152+
//
153+
// The currentIndex is the indexed target store.
154+
//
155+
// The changes channel will receive the changes, including Unchanged.
156+
func DiffIndexIndex(referenceIndex *Index, currentIndex *Index, changes chan Change) {
157+
DiffStreamIndex(referenceIndex.KeyValues(), currentIndex, changes)
158+
}

diff_test.go

+144
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package diff
2+
3+
import (
4+
"bytes"
5+
"testing"
6+
)
7+
8+
func TestDiffAllChangeTypes(t *testing.T) {
9+
testChanges(t, []KeyValue{
10+
kvS("k1", "v1"),
11+
kvS("k2", "v2"),
12+
kvS("k3", "v3.1"),
13+
}, []KeyValue{
14+
kvS("k1", "v1"),
15+
kvS("k3", "v3"),
16+
kvS("k4", "v4"),
17+
}, []Change{
18+
{Unchanged, []byte("k1"), nil},
19+
{Created, []byte("k2"), []byte("v2")},
20+
{Modified, []byte("k3"), []byte("v3.1")},
21+
{Deleted, []byte("k4"), nil},
22+
})
23+
}
24+
25+
func TestDiffEmpty(t *testing.T) {
26+
testChanges(t, []KeyValue{}, []KeyValue{}, []Change{})
27+
}
28+
29+
func TestDiffCreate(t *testing.T) {
30+
testChanges(t, []KeyValue{
31+
kvS("k1", "v1"),
32+
}, []KeyValue{}, []Change{
33+
{Created, []byte("k1"), []byte("v1")},
34+
})
35+
}
36+
37+
func TestDiffSame(t *testing.T) {
38+
testChanges(t, []KeyValue{
39+
kvS("k1", "v1"),
40+
}, []KeyValue{
41+
kvS("k1", "v1"),
42+
}, []Change{
43+
{Unchanged, []byte("k1"), nil},
44+
})
45+
}
46+
47+
func TestDiffModify(t *testing.T) {
48+
testChanges(t, []KeyValue{
49+
kvS("k1", "v1.1"),
50+
}, []KeyValue{
51+
kvS("k1", "v1"),
52+
}, []Change{
53+
{Modified, []byte("k1"), []byte("v1.1")},
54+
})
55+
}
56+
57+
func TestDiffDelete(t *testing.T) {
58+
testChanges(t, []KeyValue{}, []KeyValue{
59+
kvS("k1", "v1.1"),
60+
}, []Change{
61+
{Deleted, []byte("k1"), nil},
62+
})
63+
}
64+
65+
func TestDiffIndexStream(t *testing.T) {
66+
changes := make(chan Change, 10)
67+
68+
refIndex := NewIndex(true)
69+
refIndex.Index(kvS("k2", "v2"))
70+
71+
go func() {
72+
DiffIndexStream(refIndex, stream([]KeyValue{
73+
kvS("k1", "v1"),
74+
}), changes)
75+
close(changes)
76+
}()
77+
78+
expectChanges(t, changes, []Change{
79+
{Deleted, []byte("k1"), nil},
80+
{Created, []byte("k2"), []byte("v2")},
81+
})
82+
}
83+
84+
func stream(kvs []KeyValue) <-chan KeyValue {
85+
c := make(chan KeyValue, 1)
86+
go func() {
87+
for _, kv := range kvs {
88+
c <- kv
89+
}
90+
close(c)
91+
}()
92+
return c
93+
}
94+
95+
func kvS(key, value string) KeyValue {
96+
return KeyValue{[]byte(key), []byte(value)}
97+
}
98+
99+
func testChanges(t *testing.T, ref, current []KeyValue, exp []Change) {
100+
t.Helper()
101+
102+
changes := make(chan Change, 10)
103+
104+
go func() {
105+
Diff(stream(ref), stream(current), changes)
106+
close(changes)
107+
}()
108+
109+
expectChanges(t, changes, exp)
110+
}
111+
112+
func expectChanges(t *testing.T, changes <-chan Change, exp []Change) {
113+
t.Helper()
114+
115+
for change := range changes {
116+
found := false
117+
for idx, expChange := range exp {
118+
if eq(change, expChange) {
119+
exp = append(exp[0:idx], exp[idx+1:]...)
120+
found = true
121+
}
122+
}
123+
if !found {
124+
t.Error("unexpected change: ", change)
125+
}
126+
}
127+
128+
for _, noFound := range exp {
129+
t.Error("expected change not found: ", noFound)
130+
}
131+
}
132+
133+
func eq(c1, c2 Change) bool {
134+
if c1.Type != c2.Type {
135+
return false
136+
}
137+
if !bytes.Equal(c1.Key, c2.Key) {
138+
return false
139+
}
140+
if !bytes.Equal(c1.Value, c2.Value) {
141+
return false
142+
}
143+
return true
144+
}

0 commit comments

Comments
 (0)