Skip to content

Commit

Permalink
When doing a full copy, first send table schemas as DDL statement events
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Nov 5, 2024
1 parent a9bab82 commit b1490c7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
2 changes: 1 addition & 1 deletion examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
*/
func main() {
ctx := context.Background()
streamCustomer := true
streamCustomer := false
var vgtid *binlogdatapb.VGtid
if streamCustomer {
vgtid = &binlogdatapb.VGtid{
Expand Down
39 changes: 39 additions & 0 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ const tabletPickerContextTimeout = 90 * time.Second
// ending the stream from the tablet.
const stopOnReshardDelay = 500 * time.Millisecond

type globalTableName struct {
Keyspace string
Shard string
Table string
}

// vstream contains the metadata for one VStream request.
type vstream struct {
// mu protects parts of vgtid, the semantics of a send, and journaler.
Expand Down Expand Up @@ -123,6 +129,9 @@ type vstream struct {
// the shard map tracking the copy completion, keyed by streamId. streamId is of the form <keyspace>.<shard>
copyCompletedShard map[string]struct{}

// A map of initial table schemas (CREATE TABLE) sent to the client if we're copying tables.
copySchemaSent map[globalTableName]struct{}

vsm *vstreamManager

eventCh chan []*binlogdatapb.VEvent
Expand Down Expand Up @@ -595,6 +604,36 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
}

if sgtid.Gtid == "" { // We're copying the tables
if vs.copySchemaSent == nil {
vs.copySchemaSent = make(map[globalTableName]struct{}, 5)
}
ddlevents := make([]*binlogdatapb.VEvent, 0, 5)
if err := tabletConn.GetSchema(ctx, target, querypb.SchemaTableType_TABLES, nil, func(res *querypb.GetSchemaResponse) error {
for tableName, schema := range res.TableDefinition {
key := globalTableName{Keyspace: sgtid.Keyspace, Shard: sgtid.Shard, Table: tableName}
func() {
vs.mu.Lock()
defer vs.mu.Unlock()
if _, ok := vs.copySchemaSent[key]; !ok {
ev := &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_DDL,
Statement: schema,
}
ddlevents = append(ddlevents, ev)
vs.copySchemaSent[key] = struct{}{}
}
}()
}
return nil
}); err != nil {
return vterrors.Wrapf(err, "failed to get schema for keyspace %s", sgtid.Keyspace)
}
if err := vs.send(ddlevents); err != nil {
return vterrors.Wrapf(err, "failed to send schema DDL events for keyspace %s", sgtid.Keyspace)
}
}

// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
req := &binlogdatapb.VStreamRequest{
Target: target,
Expand Down

0 comments on commit b1490c7

Please sign in to comment.