forked from scylladb/gocql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtablet_integration_test.go
134 lines (104 loc) · 3.1 KB
/
tablet_integration_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
//go:build tablet
// +build tablet
package gocql
import (
"bytes"
"context"
"fmt"
"regexp"
"strings"
"testing"
)
// TestTablets checks that TokenAwareHostPolicy works correctly when using
// tablets.
//
// It creates test1.table1, inserts 50 rows (pk 1..50), then reads every
// partition back at consistency ONE with tracing enabled. For each read it
// counts how many of the cluster's host addresses appear in the trace output
// and requires that count to be exactly one.
func TestTablets(t *testing.T) {
	cluster := createMultiNodeCluster()
	cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())

	session := createSessionFromMultiNodeCluster(cluster, t)
	defer session.Close()

	if err := createTable(session, fmt.Sprintf(`CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck));
`, "test1", "table1")); err != nil {
		// Fail this test only; a panic would abort the whole test binary.
		t.Fatalf("unable to create table: %v", err)
	}

	hosts, _, err := session.hostSource.GetHosts()
	if err != nil {
		t.Fatalf("unable to get hosts: %v", err)
	}

	hostAddresses := make([]string, 0, len(hosts))
	for _, host := range hosts {
		hostAddresses = append(hostAddresses, host.connectAddress.String())
	}

	ctx := context.Background()

	// Number of partitions written and then read back (pk runs from 1 to rowCount).
	const rowCount = 50

	for i := 1; i <= rowCount; i++ {
		err := session.Query(`INSERT INTO test1.table1 (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
		if err != nil {
			t.Fatalf("insert pk=%d: %v", i, err)
		}
	}

	for i := 1; i <= rowCount; i++ {
		var pk, ck, v int

		// A fresh buffer per query so the trace only describes this read.
		var buf bytes.Buffer
		trace := NewTraceWriter(session, &buf)

		err := session.Query(`SELECT pk, ck, v FROM test1.table1 WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
		if err != nil {
			t.Fatalf("select pk=%d: %v", i, err)
		}

		// Materialize the trace once instead of once per host address.
		traceOutput := buf.String()

		queriedHosts := 0
		for _, addr := range hostAddresses {
			if strings.Contains(traceOutput, addr) {
				queriedHosts++
			}
		}

		assertEqual(t, "queriedHosts", 1, queriedHosts)
	}
}
// TestTabletsShardAwareness checks that shard awareness works correctly when
// using tablets.
//
// It creates test1.table_shard, inserts 50 rows (pk 1..50), then reads every
// partition back at consistency ONE with tracing enabled. For each read it
// collects the distinct "[shard ...]" fragments found in the trace output and
// requires that there is exactly one.
func TestTabletsShardAwareness(t *testing.T) {
	cluster := createMultiNodeCluster()
	cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())

	session := createSessionFromMultiNodeCluster(cluster, t)
	defer session.Close()

	if err := createTable(session, fmt.Sprintf(`CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck));
`, "test1", "table_shard")); err != nil {
		// Fail this test only; a panic would abort the whole test binary.
		t.Fatalf("unable to create table: %v", err)
	}

	ctx := context.Background()

	// Number of partitions written and then read back (pk runs from 1 to rowCount).
	const rowCount = 50

	for i := 1; i <= rowCount; i++ {
		err := session.Query(`INSERT INTO test1.table_shard (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
		if err != nil {
			t.Fatalf("insert pk=%d: %v", i, err)
		}
	}

	// The pattern is loop-invariant, so compile it once rather than per query.
	shardRe := regexp.MustCompile(`\[shard .*\]`)

	for i := 1; i <= rowCount; i++ {
		var pk, ck, v int

		// A fresh buffer per query so the trace only describes this read.
		var buf bytes.Buffer
		trace := NewTraceWriter(session, &buf)

		err := session.Query(`SELECT pk, ck, v FROM test1.table_shard WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
		if err != nil {
			t.Fatalf("select pk=%d: %v", i, err)
		}

		// Collapse repeated matches into a set to count the distinct shards
		// mentioned in the trace.
		distinctShards := make(map[string]struct{})
		for _, match := range shardRe.FindAllString(buf.String(), -1) {
			distinctShards[match] = struct{}{}
		}

		assertEqual(t, "shardList", 1, len(distinctShards))
	}
}