Skip to content

Commit f84a5f3

Browse files
committed
messages: WIP
End to end test.
1 parent 55727a3 commit f84a5f3

11 files changed

+383
-8
lines changed

go/vt/tabletserver/message_manager.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func NewMessageManager(tsv *TabletServer, table *TableInfo, connpool *ConnPool)
123123
"select time_next, epoch, id, message from %v where time_next < %a order by time_next desc limit %a",
124124
mm.name, ":time_next", ":max")
125125
mm.ackQuery = buildParsedQuery(
126-
"update %v set time_acked = %a, time_next = null where id in %a",
126+
"update %v set time_acked = %a, time_next = null where id in %a and time_acked is null",
127127
mm.name, ":time_acked", "::ids")
128128
mm.postponeQuery = buildParsedQuery(
129129
"update %v set time_next = %a+(%a<<epoch), epoch = epoch+1 where id in %a and time_acked is null",

go/vt/tabletserver/message_manager_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ func TestMMGenerate(t *testing.T) {
538538
mm.Open()
539539
defer mm.Close()
540540
query, bv := mm.GenerateAckQuery([]string{"1", "2"})
541-
wantQuery := "update foo set time_acked = :time_acked, time_next = null where id in ::ids"
541+
wantQuery := "update foo set time_acked = :time_acked, time_next = null where id in ::ids and time_acked is null"
542542
if query != wantQuery {
543543
t.Errorf("GenerateAckQuery query: %s, want %s", query, wantQuery)
544544
}

go/vt/tabletserver/tabletserver_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1471,13 +1471,13 @@ func TestMessageAck(t *testing.T) {
14711471
}
14721472

14731473
_, err = tsv.MessageAck(ctx, &target, "msg", ids)
1474-
want = "error: query: select time_scheduled, id from msg where id in ('1', '2') limit 10001 for update is not supported"
1474+
want = "error: query: select time_scheduled, id from msg where id in ('1', '2') and time_acked is null limit 10001 for update is not supported"
14751475
if err == nil || err.Error() != want {
14761476
t.Errorf("tsv.MessageAck(invalid): %v, want %s", err, want)
14771477
}
14781478

14791479
db.AddQuery(
1480-
"select time_scheduled, id from msg where id in ('1', '2') limit 10001 for update",
1480+
"select time_scheduled, id from msg where id in ('1', '2') and time_acked is null limit 10001 for update",
14811481
&sqltypes.Result{
14821482
RowsAffected: 1,
14831483
Rows: [][]sqltypes.Value{{

go/vt/vtgate/scatter_conn.go

+6
Original file line numberDiff line numberDiff line change
@@ -436,10 +436,16 @@ func (stc *ScatterConn) StreamExecuteMulti(
436436
func (stc *ScatterConn) MessageStream(ctx context.Context, keyspace string, shards []string, name string, sendReply func(*sqltypes.Result) error) error {
437437
// mu is used to merge multiple sendReply calls into one.
438438
var mu sync.Mutex
439+
fieldSent := false
439440
allErrors := stc.multiGo(ctx, "MessageStream", keyspace, shards, topodatapb.TabletType_MASTER, func(target *querypb.Target) error {
440441
return stc.gateway.MessageStream(ctx, target, name, func(qr *sqltypes.Result) error {
441442
mu.Lock()
442443
defer mu.Unlock()
444+
if fieldSent && len(qr.Rows) == 0 {
445+
return nil
446+
}
447+
// First result is always the field info.
448+
fieldSent = true
443449
return sendReply(qr)
444450
})
445451
})

go/vt/vtgate/topo_utils.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package vtgate
66

77
import (
8-
"bytes"
98
"encoding/hex"
109
"fmt"
1110

@@ -168,14 +167,14 @@ func mapExactShards(ctx context.Context, topoServ topo.SrvTopoServer, cell, keys
168167
}
169168
shardnum := 0
170169
for shardnum < len(allShards) {
171-
if bytes.Compare(kr.Start, []byte(allShards[shardnum].KeyRange.Start)) == 0 {
170+
if key.KeyRangeStartEqual(kr, allShards[shardnum].KeyRange) {
172171
break
173172
}
174173
shardnum++
175174
}
176175
for shardnum < len(allShards) {
177176
shards = append(shards, allShards[shardnum].Name)
178-
if bytes.Compare(kr.End, []byte(allShards[shardnum].KeyRange.End)) == 0 {
177+
if key.KeyRangeEndEqual(kr, allShards[shardnum].KeyRange) {
179178
return keyspace, shards, nil
180179
}
181180
shardnum++

go/vt/vtgate/vtgate.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ type VTGate struct {
127127
logStreamExecuteKeyRanges *logutil.ThrottledLogger
128128
logStreamExecuteShards *logutil.ThrottledLogger
129129
logUpdateStream *logutil.ThrottledLogger
130+
logMessageStream *logutil.ThrottledLogger
130131
}
131132

132133
// RegisterVTGate defines the type of registration mechanism.
@@ -177,6 +178,7 @@ func Init(ctx context.Context, hc discovery.HealthCheck, topoServer topo.Server,
177178
logStreamExecuteKeyRanges: logutil.NewThrottledLogger("StreamExecuteKeyRanges", 5*time.Second),
178179
logStreamExecuteShards: logutil.NewThrottledLogger("StreamExecuteShards", 5*time.Second),
179180
logUpdateStream: logutil.NewThrottledLogger("UpdateStream", 5*time.Second),
181+
logMessageStream: logutil.NewThrottledLogger("MessageStream", 5*time.Second),
180182
}
181183

182184
normalErrors = stats.NewMultiCounters("VtgateApiErrorCounts", []string{"Operation", "Keyspace", "DbType"})
@@ -828,7 +830,7 @@ func (vtg *VTGate) MessageStream(ctx context.Context, keyspace string, shard str
828830
"TabletType": ltt,
829831
"MessageName": name,
830832
}
831-
logError(err, query, vtg.logUpdateStream)
833+
logError(err, query, vtg.logMessageStream)
832834
}
833835
return formatError(err)
834836
}

py/vtdb/grpc_vtgate_client.py

+51
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,57 @@ def row_generator():
249249

250250
return row_generator()
251251

252+
@vtgate_utils.exponential_backoff_retry((dbexceptions.ThrottledError,
253+
dbexceptions.TransientError))
254+
def message_stream(
255+
self, keyspace, name,
256+
shard=None, key_range=None,
257+
effective_caller_id=None,
258+
**kwargs):
259+
260+
try:
261+
request = self.message_stream_request(
262+
keyspace, shard, key_range,
263+
name, effective_caller_id)
264+
it = self.stub.MessageStream(request, self.timeout)
265+
first_response = it.next()
266+
except (grpc.RpcError, vtgate_utils.VitessError) as e:
267+
raise _convert_exception(
268+
e, 'MessageStream', name=name,
269+
keyspace=keyspace)
270+
271+
fields, convs = self.build_conversions(first_response.result.fields)
272+
273+
def row_generator():
274+
try:
275+
for response in it:
276+
for row in response.result.rows:
277+
yield tuple(proto3_encoding.make_row(row, convs))
278+
except Exception:
279+
logging.exception('gRPC low-level error')
280+
raise
281+
282+
return row_generator(), fields
283+
284+
@vtgate_utils.exponential_backoff_retry((dbexceptions.ThrottledError,
285+
dbexceptions.TransientError))
286+
def message_ack(
287+
self,
288+
name, ids,
289+
keyspace=None, effective_caller_id=None,
290+
**kwargs):
291+
292+
try:
293+
request = self.message_ack_request(
294+
keyspace, name, ids, effective_caller_id)
295+
response = self.stub.MessageAck(request, self.timeout)
296+
except (grpc.RpcError, vtgate_utils.VitessError) as e:
297+
raise _convert_exception(
298+
e, 'MessageAck', name=name, ids=ids,
299+
keyspace=keyspace)
300+
301+
return response.count
302+
252303

253304
def _convert_exception(exc, *args, **kwargs):
254305
"""This parses the protocol exceptions to the api interface exceptions.

py/vtdb/proto3_encoding.py

+59
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ def make_row(row, convs):
8484
return converted_row
8585

8686

87+
def build_value(v):
88+
"""Build a proto value from any valid input."""
89+
val = query_pb2.Value()
90+
convert_value(v, val)
91+
return val
92+
93+
8794
def convert_value(value, proto_value, allow_lists=False):
8895
"""Convert a variable from python type to proto type+value.
8996
@@ -547,6 +554,58 @@ def update_stream_request(self,
547554
self._add_caller_id(request, effective_caller_id)
548555
return request
549556

557+
def message_stream_request(self,
558+
keyspace_name,
559+
shard,
560+
key_range,
561+
name,
562+
effective_caller_id):
563+
"""Builds the right vtgate_pb2 MessageStreamRequest.
564+
565+
Args:
566+
keyspace_name: keyspace to apply the query to.
567+
shard: shard to ask for.
568+
key_range: keyrange.KeyRange object.
569+
name: message table name.
570+
effective_caller_id: optional vtgate_client.CallerID.
571+
572+
Returns:
573+
A vtgate_pb2.MessageStreamRequest object.
574+
"""
575+
request = vtgate_pb2.MessageStreamRequest(keyspace=keyspace_name,
576+
name=name,
577+
shard=shard)
578+
if key_range:
579+
request.key_range.start = key_range.Start
580+
request.key_range.end = key_range.End
581+
self._add_caller_id(request, effective_caller_id)
582+
return request
583+
584+
def message_ack_request(self,
585+
keyspace_name,
586+
name,
587+
ids,
588+
effective_caller_id):
589+
"""Builds the right vtgate_pb2 MessageAckRequest.
590+
591+
Args:
592+
keyspace_name: keyspace to apply the query to.
593+
name: message table name.
594+
ids: list of message ids.
595+
effective_caller_id: optional vtgate_client.CallerID.
596+
597+
Returns:
598+
A vtgate_pb2.MessageAckRequest object.
599+
"""
600+
vals = []
601+
for v in ids:
602+
vals.append(build_value(v))
603+
request = vtgate_pb2.MessageAckRequest(keyspace=keyspace_name,
604+
name=name,
605+
ids=vals)
606+
self._add_caller_id(request, effective_caller_id)
607+
return request
608+
550609
def stream_execute_request_and_name(self, sql, bind_variables, tablet_type,
551610
keyspace_name,
552611
shards,

py/vtdb/vtgate_client.py

+55
Original file line numberDiff line numberDiff line change
@@ -389,3 +389,58 @@ def update_stream(self,
389389
dbexceptions.FatalError: this query should not be retried.
390390
"""
391391
raise NotImplementedError('Child class needs to implement this')
392+
393+
def message_stream(self,
394+
keyspace, name,
395+
shard=None, key_range=None,
396+
effective_caller_id=None,
397+
**kwargs):
398+
"""Asks for a message stream.
399+
400+
Args:
401+
keyspace: the keyspace of the message table.
402+
name: the name of the message table.
403+
shard: the shard name to listen for.
404+
Incompatible with key_range.
405+
key_range: the key range to listen for.
406+
Incompatible with shard.
407+
effective_caller_id: CallerID object.
408+
**kwargs: implementation specific parameters.
409+
410+
Returns:
411+
A (row generator, fields) pair.
412+
413+
Raises:
414+
dbexceptions.TimeoutError: for connection timeout.
415+
dbexceptions.TransientError: the server is overloaded, and this query
416+
is asked to back off.
417+
dbexceptions.DatabaseError: generic database error.
418+
dbexceptions.FatalError: this query should not be retried.
419+
"""
420+
raise NotImplementedError('Child class needs to implement this')
421+
422+
def message_ack(self,
423+
name, ids,
424+
keyspace=None, effective_caller_id=None,
425+
**kwargs):
426+
"""Acks a list of messages.
427+
428+
Args:
429+
name: the name of the message table.
430+
ids: list of message ids to ack.
431+
keyspace: the keyspace of the message table.
432+
Not required if table can be auto-resolved.
433+
effective_caller_id: CallerID object.
434+
**kwargs: implementation specific parameters.
435+
436+
Returns:
437+
The number of rows acked.
438+
439+
Raises:
440+
dbexceptions.TimeoutError: for connection timeout.
441+
dbexceptions.TransientError: the server is overloaded, and this query
442+
is asked to back off.
443+
dbexceptions.DatabaseError: generic database error.
444+
dbexceptions.FatalError: this query should not be retried.
445+
"""
446+
raise NotImplementedError('Child class needs to implement this')

test/config.json

+9
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,15 @@
467467
"RetryMax": 0,
468468
"Tags": []
469469
},
470+
"messaging": {
471+
"File": "messaging.py",
472+
"Args": [],
473+
"Command": [],
474+
"Manual": false,
475+
"Shard": 4,
476+
"RetryMax": 0,
477+
"Tags": []
478+
},
470479
"vttest_sample": {
471480
"File": "vttest_sample_test.py",
472481
"Args": [],

0 commit comments

Comments
 (0)