Skip to content

Commit 888c91c

Browse files
Use tablets in TokenAwareHostPolicy
Add a mechanism to parse system.tablets periodically. In TokenAwareHostPolicy, check whether the keyspace uses tablets and, if so, try to use them to find replicas. Make shard awareness work when using tablets.
1 parent 0d93d26 commit 888c91c

11 files changed

+408
-25
lines changed

conn.go

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ type Conn struct {
208208

209209
timeouts int64
210210

211-
logger StdLogger
211+
logger StdLogger
212+
tabletsRoutingV1 bool
212213
}
213214

214215
// connect establishes a connection to a Cassandra node using session's connection config.
@@ -724,6 +725,9 @@ func (c *Conn) recv(ctx context.Context) error {
724725
} else if head.stream == -1 {
725726
// TODO: handle Cassandra event frames; we shouldn't get any currently
726727
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts)
728+
c.mu.Lock()
729+
c.tabletsRoutingV1 = framer.tabletsRoutingV1
730+
c.mu.Unlock()
727731
if err := framer.readFrame(c, &head); err != nil {
728732
return err
729733
}
@@ -733,6 +737,9 @@ func (c *Conn) recv(ctx context.Context) error {
733737
// reserved stream that we don't use, probably due to a protocol error
734738
// or a bug in Cassandra, this should be an error, parse it and return.
735739
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts)
740+
c.mu.Lock()
741+
c.tabletsRoutingV1 = framer.tabletsRoutingV1
742+
c.mu.Unlock()
736743
if err := framer.readFrame(c, &head); err != nil {
737744
return err
738745
}
@@ -1069,6 +1076,9 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram
10691076

10701077
// resp is basically a waiting semaphore protecting the framer
10711078
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts)
1079+
c.mu.Lock()
1080+
c.tabletsRoutingV1 = framer.tabletsRoutingV1
1081+
c.mu.Unlock()
10721082

10731083
call := &callReq{
10741084
timeout: make(chan struct{}),
@@ -1453,6 +1463,63 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
14531463
return &Iter{err: err}
14541464
}
14551465

1466+
if len(framer.customPayload) > 0 {
1467+
if tabletInfo, ok := framer.customPayload["tablets-routing-v1"]; ok {
1468+
var firstToken string
1469+
var lastToken string
1470+
var replicas [][]interface{}
1471+
tabletInfoValue := []interface{}{&firstToken, &lastToken, &replicas}
1472+
Unmarshal(TupleTypeInfo{
1473+
NativeType: NativeType{proto: c.version, typ: TypeTuple},
1474+
Elems: []TypeInfo{
1475+
NativeType{typ: TypeBigInt},
1476+
NativeType{typ: TypeBigInt},
1477+
CollectionType{
1478+
NativeType: NativeType{proto: c.version, typ: TypeList},
1479+
Elem: TupleTypeInfo{
1480+
NativeType: NativeType{proto: c.version, typ: TypeTuple},
1481+
Elems: []TypeInfo{
1482+
NativeType{proto: c.version, typ: TypeUUID},
1483+
NativeType{proto: c.version, typ: TypeInt},
1484+
}},
1485+
},
1486+
},
1487+
}, tabletInfo, tabletInfoValue)
1488+
1489+
tablet := TabletInfo{}
1490+
tablet.firstToken, err = strconv.ParseInt(firstToken, 10, 64)
1491+
if err != nil {
1492+
return &Iter{err: err}
1493+
}
1494+
tablet.lastToken, err = strconv.ParseInt(lastToken, 10, 64)
1495+
if err != nil {
1496+
return &Iter{err: err}
1497+
}
1498+
1499+
tabletReplicas := make([]ReplicaInfo, 0, len(replicas))
1500+
for _, replica := range replicas {
1501+
if len(replica) != 2 {
1502+
return &Iter{err: err}
1503+
}
1504+
if hostId, ok := replica[0].(UUID); ok {
1505+
if shardId, ok := replica[1].(int); ok {
1506+
repInfo := ReplicaInfo{hostId, shardId}
1507+
tabletReplicas = append(tabletReplicas, repInfo)
1508+
} else {
1509+
return &Iter{err: err}
1510+
}
1511+
} else {
1512+
return &Iter{err: err}
1513+
}
1514+
}
1515+
tablet.replicas = tabletReplicas
1516+
tablet.keyspaceName = qry.routingInfo.keyspace
1517+
tablet.tableName = qry.routingInfo.table
1518+
1519+
addTablet(c.session.hostSource, &tablet)
1520+
}
1521+
}
1522+
14561523
if len(framer.traceID) > 0 && qry.trace != nil {
14571524
qry.trace.Trace(framer.traceID)
14581525
}

connectionpool.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ type SetPartitioner interface {
2626
SetPartitioner(partitioner string)
2727
}
2828

29+
// SetTablets is the interface to implement to receive the tablets value.
// Its single method is handed the tablet list as a slice of *TabletInfo.
30+
// Experimental, this interface and use may change
31+
type SetTablets interface {
32+
SetTablets(tablets []*TabletInfo)
33+
}
34+
2935
func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
3036
// Config.InsecureSkipVerify | EnableHostVerification | Result
3137
// Config is nil | true | verify host
@@ -312,7 +318,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
312318
}
313319

314320
// Pick a connection from this connection pool for the given query.
315-
func (pool *hostConnPool) Pick(token token) *Conn {
321+
func (pool *hostConnPool) Pick(token token, keyspace string, table string) *Conn {
316322
pool.mu.RLock()
317323
defer pool.mu.RUnlock()
318324

@@ -330,7 +336,7 @@ func (pool *hostConnPool) Pick(token token) *Conn {
330336
}
331337
}
332338

333-
return pool.connPicker.Pick(token)
339+
return pool.connPicker.Pick(token, keyspace, table)
334340
}
335341

336342
// Size returns the number of connections currently active in the pool

connpicker.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
)
88

99
type ConnPicker interface {
10-
Pick(token) *Conn
10+
Pick(token, string, string) *Conn
1111
Put(*Conn)
1212
Remove(conn *Conn)
1313
Size() (int, int)
@@ -65,7 +65,7 @@ func (p *defaultConnPicker) Size() (int, int) {
6565
return size, p.size - size
6666
}
6767

68-
func (p *defaultConnPicker) Pick(token) *Conn {
68+
func (p *defaultConnPicker) Pick(token, string, string) *Conn {
6969
pos := int(atomic.AddUint32(&p.pos, 1) - 1)
7070
size := len(p.conns)
7171

@@ -104,7 +104,7 @@ func (*defaultConnPicker) NextShard() (shardID, nrShards int) {
104104
// to the point where we have first connection.
105105
type nopConnPicker struct{}
106106

107-
func (nopConnPicker) Pick(token) *Conn {
107+
func (nopConnPicker) Pick(token, string, string) *Conn {
108108
return nil
109109
}
110110

frame.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ type framer struct {
367367

368368
flagLWT int
369369
rateLimitingErrorCode int
370+
tabletsRoutingV1 bool
370371
}
371372

372373
func newFramer(compressor Compressor, version byte) *framer {
@@ -398,6 +399,8 @@ func newFramer(compressor Compressor, version byte) *framer {
398399
f.header = nil
399400
f.traceID = nil
400401

402+
f.tabletsRoutingV1 = false
403+
401404
return f
402405
}
403406

@@ -435,6 +438,7 @@ func newFramerWithExts(compressor Compressor, version byte, cqlProtoExts []cqlPr
435438
tabletsRoutingV1, tabletsRoutingV1Ext{}))
436439
return f
437440
}
441+
f.tabletsRoutingV1 = true
438442
}
439443

440444
return f

host_source.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,12 +472,151 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 {
472472
return h.scyllaShardAwarePortTLS
473473
}
474474

475+
// ReplicaInfo names one replica of a tablet as a host ID paired with a shard ID.
// Experimental, this interface and use may change
476+
type ReplicaInfo struct {
477+
hostId UUID
478+
shardId int
479+
}
480+
481+
// TabletInfo describes one tablet: the keyspace and table it belongs to, its
// first and last token, and the replicas that hold it. The exported accessor
// methods read each field while holding mu for reading.
// Experimental, this interface and use may change
482+
type TabletInfo struct {
483+
mu sync.RWMutex
484+
keyspaceName string
485+
tableName string
486+
firstToken int64
487+
lastToken int64
488+
replicas []ReplicaInfo
489+
}
490+
491+
func (t *TabletInfo) KeyspaceName() string {
492+
t.mu.RLock()
493+
defer t.mu.RUnlock()
494+
return t.keyspaceName
495+
}
496+
497+
func (t *TabletInfo) FirstToken() int64 {
498+
t.mu.RLock()
499+
defer t.mu.RUnlock()
500+
return t.firstToken
501+
}
502+
503+
func (t *TabletInfo) LastToken() int64 {
504+
t.mu.RLock()
505+
defer t.mu.RUnlock()
506+
return t.lastToken
507+
}
508+
509+
func (t *TabletInfo) TableName() string {
510+
t.mu.RLock()
511+
defer t.mu.RUnlock()
512+
return t.tableName
513+
}
514+
515+
func (t *TabletInfo) Replicas() []ReplicaInfo {
516+
t.mu.RLock()
517+
defer t.mu.RUnlock()
518+
return t.replicas
519+
}
520+
521+
// Search for place in tablets table with specific Keyspace and Table name
522+
func findTablets(tablets []*TabletInfo, k string, t string) (int, int) {
523+
l := -1
524+
r := -1
525+
for i, tablet := range tablets {
526+
if tablet.KeyspaceName() == k && tablet.TableName() == t {
527+
if l == -1 {
528+
l = i
529+
}
530+
r = i
531+
} else if l != -1 {
532+
break
533+
}
534+
}
535+
536+
return l, r
537+
}
538+
539+
func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*TabletInfo {
540+
l, r := findTablets(tablets, tablet.keyspaceName, tablet.tableName)
541+
if l == -1 && r == -1 {
542+
l = 0
543+
r = 0
544+
} else {
545+
r = r + 1
546+
}
547+
548+
l1, r1 := l, r
549+
l2, r2 := l1, r1
550+
551+
// find first overlaping range
552+
for l1 < r1 {
553+
mid := (l1 + r1) / 2
554+
if tablets[mid].FirstToken() < tablet.FirstToken() {
555+
l1 = mid + 1
556+
} else {
557+
r1 = mid
558+
}
559+
}
560+
start := l1
561+
562+
if start > l && tablets[start-1].LastToken() > tablet.FirstToken() {
563+
start = start - 1
564+
}
565+
566+
// find last overlaping range
567+
for l2 < r2 {
568+
mid := (l2 + r2) / 2
569+
if tablets[mid].LastToken() < tablet.LastToken() {
570+
l2 = mid + 1
571+
} else {
572+
r2 = mid
573+
}
574+
}
575+
end := l2
576+
if end < r && tablets[end].FirstToken() >= tablet.LastToken() {
577+
end = end - 1
578+
}
579+
if end == len(tablets) {
580+
end = end - 1
581+
}
582+
583+
updated_tablets := tablets
584+
if start <= end {
585+
// Delete elements from index start to end
586+
updated_tablets = append(tablets[:start], tablets[end+1:]...)
587+
}
588+
// Insert tablet element at index start
589+
updated_tablets2 := append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...)
590+
return updated_tablets2
591+
}
592+
593+
// Search for place in tablets table for token starting from index l to index r
594+
func findTabletForToken(tablets []*TabletInfo, token token, l int, r int) *TabletInfo {
595+
for l < r {
596+
var m int
597+
if r*l > 0 {
598+
m = l + (r-l)/2
599+
} else {
600+
m = (r + l) / 2
601+
}
602+
if int64Token(tablets[m].LastToken()).Less(token) {
603+
l = m + 1
604+
} else {
605+
r = m
606+
}
607+
}
608+
609+
return tablets[l]
610+
}
611+
475612
// ringDescriber polls system.peers at a specific interval to find new hosts.
// Its mu is also taken by addTablet while the session's tablet list is updated.
476613
type ringDescriber struct {
477614
session *Session
478615
mu sync.Mutex
479616
prevHosts []*HostInfo
480617
prevPartitioner string
618+
// Experimental, this interface and use may change
619+
prevTablets []*TabletInfo
481620
}
482621

483622
// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
@@ -835,6 +974,21 @@ func refreshRing(r *ringDescriber) error {
835974

836975
r.session.metadata.setPartitioner(partitioner)
837976
r.session.policy.SetPartitioner(partitioner)
977+
978+
return nil
979+
}
980+
981+
// Experimental, this interface and use may change
982+
func addTablet(r *ringDescriber, tablet *TabletInfo) error {
983+
r.mu.Lock()
984+
defer r.mu.Unlock()
985+
986+
tablets := r.session.getTablets()
987+
tablets = addTabletToTabletsList(tablets, tablet)
988+
989+
r.session.ring.setTablets(tablets)
990+
r.session.policy.SetTablets(tablets)
991+
838992
return nil
839993
}
840994

0 commit comments

Comments
 (0)