Skip to content

Commit d621465

Browse files
committed
messages: review comments
1 parent f84a5f3 commit d621465

File tree

12 files changed

+246
-221
lines changed

12 files changed

+246
-221
lines changed

go/vt/proto/query/query.pb.go

+182-174
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/vt/tabletserver/config.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ var (
3131

3232
func init() {
3333
flag.IntVar(&qsConfig.PoolSize, "queryserver-config-pool-size", DefaultQsConfig.PoolSize, "query server connection pool size, connection pool is used by regular queries (non streaming, not in a transaction)")
34-
flag.IntVar(&qsConfig.StreamPoolSize, "queryserver-config-stream-pool-size", DefaultQsConfig.StreamPoolSize, "query server stream pool size, stream pool is used by stream queries: queries that return results to client in a streaming fashion")
35-
flag.IntVar(&qsConfig.MessagePoolSize, "queryserver-config-message-pool-size", DefaultQsConfig.MessagePoolSize, "query server message pool size, message pool is used by message managers: recommended value is one per message table")
34+
flag.IntVar(&qsConfig.StreamPoolSize, "queryserver-config-stream-pool-size", DefaultQsConfig.StreamPoolSize, "query server stream connection pool size, stream pool is used by stream queries: queries that return results to client in a streaming fashion")
35+
flag.IntVar(&qsConfig.MessagePoolSize, "queryserver-config-message-conn-pool-size", DefaultQsConfig.MessagePoolSize, "query server message connection pool size, message pool is used by message managers: recommended value is one per message table")
3636
flag.IntVar(&qsConfig.TransactionCap, "queryserver-config-transaction-cap", DefaultQsConfig.TransactionCap, "query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout)")
3737
flag.Float64Var(&qsConfig.TransactionTimeout, "queryserver-config-transaction-timeout", DefaultQsConfig.TransactionTimeout, "query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value")
3838
flag.Float64Var(&qsConfig.TxShutDownGracePeriod, "transaction_shutdown_grace_period", DefaultQsConfig.TxShutDownGracePeriod, "how long to wait (in seconds) for transactions to complete during graceful shutdown.")

go/vt/tabletserver/grpcqueryservice/server.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,9 @@ func (q *query) MessageAck(ctx context.Context, request *querypb.MessageAckReque
330330
return nil, vterrors.ToGRPCError(err)
331331
}
332332
return &querypb.MessageAckResponse{
333-
Count: count,
333+
Result: &querypb.QueryResult{
334+
RowsAffected: uint64(count),
335+
},
334336
}, nil
335337
}
336338

go/vt/tabletserver/grpctabletconn/conn.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ func (conn *gRPCQueryClient) MessageAck(ctx context.Context, target *querypb.Tar
547547
if err != nil {
548548
return 0, tabletconn.TabletErrorFromGRPC(err)
549549
}
550-
return reply.Count, nil
550+
return int64(reply.Result.RowsAffected), nil
551551
}
552552

553553
// SplitQuery is the stub for TabletServer.SplitQuery RPC

go/vt/tabletserver/schema_info.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,8 @@ func (si *SchemaInfo) GetTable(tableName sqlparser.TableIdent) *TableInfo {
490490
return si.tables[tableName.String()]
491491
}
492492

493-
// GetSchema returns a copy of the schema.
493+
// GetSchema returns the current schema. The Tables are a shared
494+
// data strucutre and must be treated as read-only.
494495
func (si *SchemaInfo) GetSchema() map[string]*schema.Table {
495496
si.mu.Lock()
496497
defer si.mu.Unlock()

go/vt/vtgate/grpcvtgateconn/conn.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ func (conn *vtgateConn) MessageAck(ctx context.Context, keyspace string, name st
484484
if err != nil {
485485
return 0, vterrors.FromGRPCError(err)
486486
}
487-
return r.Count, nil
487+
return int64(r.Result.RowsAffected), nil
488488
}
489489

490490
func (conn *vtgateConn) SplitQuery(

go/vt/vtgate/grpcvtgateservice/server.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,11 @@ func (vtg *VTGate) MessageAck(ctx context.Context, request *vtgatepb.MessageAckR
416416
if vtgErr != nil {
417417
return nil, vterrors.ToGRPCError(vtgErr)
418418
}
419-
return &querypb.MessageAckResponse{Count: count}, nil
419+
return &querypb.MessageAckResponse{
420+
Result: &querypb.QueryResult{
421+
RowsAffected: uint64(count),
422+
},
423+
}, nil
420424
}
421425

422426
// SplitQuery is the RPC version of vtgateservice.VTGateService method

go/vt/vtgate/vtgate.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -806,7 +806,8 @@ func (vtg *VTGate) GetSrvKeyspace(ctx context.Context, keyspace string) (*topoda
806806
return vtg.resolver.toposerv.GetSrvKeyspace(ctx, vtg.resolver.cell, keyspace)
807807
}
808808

809-
// MessageStream is part of the vtgate service API.
809+
// MessageStream is part of the vtgate service API. This is a V2 level API that's sent
810+
// to the Resolver.
810811
func (vtg *VTGate) MessageStream(ctx context.Context, keyspace string, shard string, keyRange *topodatapb.KeyRange, name string, sendReply func(*sqltypes.Result) error) error {
811812
startTime := time.Now()
812813
ltt := topoproto.TabletTypeLString(topodatapb.TabletType_MASTER)
@@ -835,7 +836,9 @@ func (vtg *VTGate) MessageStream(ctx context.Context, keyspace string, shard str
835836
return formatError(err)
836837
}
837838

838-
// MessageAck is part of the vtgate service API.
839+
// MessageAck is part of the vtgate service API. This is a V3 level API that's sent
840+
// to the Router. The table name will be resolved using V3 rules, and the routing
841+
// will make use of vindexes for sharded keyspaces.
839842
func (vtg *VTGate) MessageAck(ctx context.Context, keyspace string, name string, ids []*querypb.Value) (int64, error) {
840843
startTime := time.Now()
841844
ltt := topoproto.TabletTypeLString(topodatapb.TabletType_MASTER)

go/vt/vtgate/vtgateconntest/client.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ func (f *fakeVTGateService) MessageAck(ctx context.Context, keyspace string, nam
724724
if !reflect.DeepEqual(ids, messageids) {
725725
return 0, errors.New("MessageAck ids mismatch")
726726
}
727-
return 1, nil
727+
return messageAckRowsAffected, nil
728728
}
729729

730730
// querySplitQuery contains all the fields we use to test SplitQuery
@@ -1882,7 +1882,10 @@ func testMessageStreamPanic(t *testing.T, conn *vtgateconn.VTGateConn) {
18821882

18831883
func testMessageAck(t *testing.T, conn *vtgateconn.VTGateConn) {
18841884
ctx := newContext()
1885-
_, err := conn.MessageAck(ctx, "", messageName, messageids)
1885+
got, err := conn.MessageAck(ctx, "", messageName, messageids)
1886+
if got != messageAckRowsAffected {
1887+
t.Errorf("MessageAck: %d, want %d", got, messageAckRowsAffected)
1888+
}
18861889
if err != nil {
18871890
t.Error(err)
18881891
}
@@ -2635,3 +2638,4 @@ var messageids = []*querypb.Value{
26352638
sqltypes.MakeString([]byte("1")).ToProtoValue(),
26362639
sqltypes.MakeString([]byte("3")).ToProtoValue(),
26372640
}
2641+
var messageAckRowsAffected = int64(1)

proto/query.proto

+4-2
Original file line numberDiff line numberDiff line change
@@ -588,8 +588,10 @@ message MessageAckRequest {
588588

589589
// MessageAckResponse is the response for MessageAck.
590590
message MessageAckResponse {
591-
// count is the number of messages acked.
592-
int64 count = 1;
591+
// result contains the result of the ack operation.
592+
// Since this acts like a DML, only
593+
// RowsAffected is returned in the result.
594+
QueryResult result = 1;
593595
}
594596

595597
// SplitQueryRequest is the payload for SplitQuery sent by VTGate to a VTTablet.

py/vtdb/grpc_vtgate_client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ def message_ack(
298298
e, 'MessageAck', name=name, ids=ids,
299299
keyspace=keyspace)
300300

301-
return response.count
301+
return response.result.rows_affected
302302

303303

304304
def _convert_exception(exc, *args, **kwargs):

0 commit comments

Comments
 (0)