Skip to content
26 changes: 18 additions & 8 deletions cmd/bootstrap/gr/group_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ func (m *mysqlsh) rescanCluster(ctx context.Context) error {
}

type SQLResult struct {
Error string `json:"error,omitempty"`
Rows []map[string]string `json:"rows,omitempty"`
Error string `json:"error,omitempty"`
Rows []map[string]any `json:"rows,omitempty"`
}

func (m *mysqlsh) runSQL(ctx context.Context, sql string) (SQLResult, error) {
var stdoutb, stderrb bytes.Buffer

cmd := fmt.Sprintf("session.runSql(\"%s\")", sql)
cmd := fmt.Sprintf("session.runSql(`%s`)", sql)
args := []string{"--uri", m.getURI(), "--js", "--json=raw", "--interactive", "--quiet-start", "2", "-e", cmd}

c := exec.CommandContext(ctx, "mysqlsh", args...)
Expand Down Expand Up @@ -159,7 +159,12 @@ func (m *mysqlsh) getGTIDExecuted(ctx context.Context) (string, error) {
return "", errors.Errorf("unexpected output: %+v", result)
}

return v, nil
s, ok := v.(string)
if !ok {
return "", errors.Errorf("unexpected type: %T", v)
}

return s, nil
}

func (m *mysqlsh) getGTIDPurged(ctx context.Context) (string, error) {
Expand All @@ -173,7 +178,12 @@ func (m *mysqlsh) getGTIDPurged(ctx context.Context) (string, error) {
return "", errors.Errorf("unexpected output: %+v", result)
}

return v, nil
s, ok := v.(string)
if !ok {
return "", errors.Errorf("unexpected type: %T", v)
}

return s, nil
}

func (m *mysqlsh) setGroupSeeds(ctx context.Context, seeds string) error {
Expand Down Expand Up @@ -308,7 +318,7 @@ func (m *mysqlsh) removeInstance(ctx context.Context, instanceDef string, force
return nil
}

func connectToLocal(ctx context.Context, version *v.Version) (*mysqlsh, error) {
func connectToLocal(version *v.Version) (*mysqlsh, error) {
fqdn, err := utils.GetFQDN(os.Getenv("SERVICE_NAME"))
if err != nil {
return nil, errors.Wrap(err, "get FQDN")
Expand All @@ -333,7 +343,7 @@ func connectToCluster(ctx context.Context, peers sets.Set[string], version *v.Ve
}

func handleFullClusterCrash(ctx context.Context, version *v.Version) error {
localShell, err := connectToLocal(ctx, version)
localShell, err := connectToLocal(version)
if err != nil {
return errors.Wrap(err, "connect to local")
}
Expand Down Expand Up @@ -403,7 +413,7 @@ func Bootstrap(ctx context.Context) error {
}
log.Println("mysql-shell version:", mysqlshVer)

localShell, err := connectToLocal(ctx, mysqlshVer)
localShell, err := connectToLocal(mysqlshVer)
if err != nil {
return errors.Wrap(err, "connect to local")
}
Expand Down
42 changes: 29 additions & 13 deletions cmd/bootstrap/gr/recovery_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log"
"strconv"
"strings"

"github.com/pkg/errors"
Expand Down Expand Up @@ -34,7 +33,12 @@ func gtidSubtract(ctx context.Context, shell SQLRunner, a, b string) (string, er
return "", errors.Errorf("unexpected output: %+v", result)
}

return v, nil
s, ok := v.(string)
if !ok {
return "", errors.Errorf("unexpected type: %T", v)
}

return s, nil
}

func gtidSubtractIntersection(ctx context.Context, shell SQLRunner, a, b string) (string, error) {
Expand All @@ -53,7 +57,12 @@ func gtidSubtractIntersection(ctx context.Context, shell SQLRunner, a, b string)
return "", errors.Errorf("unexpected output: %+v", result)
}

return v, nil
s, ok := v.(string)
if !ok {
return "", errors.Errorf("unexpected type: %T", v)
}

return s, nil
}

type GTIDSetRelation string
Expand Down Expand Up @@ -114,25 +123,25 @@ func compareGTIDs(ctx context.Context, shell SQLRunner, a, b string) (GTIDSetRel

// If purged has more gtids than the executed on the replica
// it means some data will not be recoverable
func comparePrimaryPurged(ctx context.Context, shell SQLRunner, purged, executed string) bool {
query := fmt.Sprintf("SELECT GTID_SUBTRACT('%s', '%s') = ''", purged, executed)
func comparePrimaryPurged(ctx context.Context, shell SQLRunner, purged, executed string) (bool, error) {
query := fmt.Sprintf("SELECT GTID_SUBTRACT('%s', '%s') = '' AS is_subset", purged, executed)

result, err := shell.runSQL(ctx, query)
if err != nil {
return false
return false, errors.Wrap(err, "run sql")
}

v, ok := result.Rows[0]["GTID_SUBTRACT"]
v, ok := result.Rows[0]["is_subset"]
if !ok {
return false
return false, errors.Errorf("unexpected output: %+v", result)
}

sub, err := strconv.Atoi(v)
if err != nil {
return false
s, ok := v.(float64)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why float and not int? as far as i remember SELECT GTID_SUBTRACT('%s', '%s') = '' returns 0 or 1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it returns 1 or 0, but json.Unmarshal makes it a float64 for some reason, so v.(int) fails

if !ok {
return false, errors.Errorf("unexpected type: %T", v)
}

return sub == 0
return s == 1, nil
}

func checkReplicaState(ctx context.Context, primary, replica SQLRunner) (innodbcluster.ReplicaGtidState, error) {
Expand Down Expand Up @@ -169,7 +178,14 @@ func checkReplicaState(ctx context.Context, primary, replica SQLRunner) (innodbc
case GTIDSetEqual:
return innodbcluster.ReplicaGtidIdentical, nil
case GTIDSetContains:
if primaryPurged == "" || comparePrimaryPurged(ctx, primary, primaryPurged, replicaExecuted) {
if primaryPurged == "" {
return innodbcluster.ReplicaGtidRecoverable, nil
}
compareRes, err := comparePrimaryPurged(ctx, primary, primaryPurged, replicaExecuted)
if err != nil {
return "", errors.Wrap(err, "compare primary purged")
}
if compareRes {
return innodbcluster.ReplicaGtidRecoverable, nil
}
return innodbcluster.ReplicaGtidIrrecoverable, nil
Expand Down
30 changes: 7 additions & 23 deletions cmd/bootstrap/gr/recovery_method_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
)

type mockSQLRunner struct {
sqlResponses map[string]string
sqlResponses map[string]any
gtidExecuted string
gtidPurged string
}

func newMockSQLRunner() *mockSQLRunner {
return &mockSQLRunner{
sqlResponses: make(map[string]string),
sqlResponses: make(map[string]any),
gtidExecuted: "",
gtidPurged: "",
}
Expand All @@ -27,10 +27,10 @@ func newMockSQLRunner() *mockSQLRunner {
func (m *mockSQLRunner) runSQL(ctx context.Context, sql string) (SQLResult, error) {
if result, ok := m.sqlResponses[sql]; ok {
return SQLResult{
Rows: []map[string]string{
Rows: []map[string]any{
{
"sub": result,
"GTID_SUBTRACT": result, // for comparePrimaryPurged
"sub": result,
"is_subset": result, // for comparePrimaryPurged
},
},
}, nil
Expand Down Expand Up @@ -66,7 +66,6 @@ func TestCompareGTIDs_Equal(t *testing.T) {
mock.setGTIDSubtractResponse("b:1-3", "a:1-5", "")

result, err := compareGTIDs(ctx, mock, "a:1-5", "b:1-3")

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -84,7 +83,6 @@ func TestCompareGTIDs_AContainsB(t *testing.T) {
mock.setGTIDSubtractResponse("a:1-5", "a:1-10", "")

result, err := compareGTIDs(ctx, mock, "a:1-10", "a:1-5")

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -102,7 +100,6 @@ func TestCompareGTIDs_BContainsA(t *testing.T) {
mock.setGTIDSubtractResponse("a:1-10", "a:1-5", "a:6-10")

result, err := compareGTIDs(ctx, mock, "a:1-5", "a:1-10")

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -122,7 +119,6 @@ func TestCompareGTIDs_Disjoint(t *testing.T) {
mock.setGTIDSubtractIntersectionResponse("a:1-5", "b:1-5", "")

result, err := compareGTIDs(ctx, mock, "a:1-5", "b:1-5")

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -142,7 +138,6 @@ func TestCompareGTIDs_Intersects(t *testing.T) {
mock.setGTIDSubtractIntersectionResponse("a:1-10", "a:6-15", "a:6-10")

result, err := compareGTIDs(ctx, mock, "a:1-10", "a:6-15")

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -156,7 +151,6 @@ func TestCompareGTIDs_BothEmpty(t *testing.T) {
mock := newMockSQLRunner()

result, err := compareGTIDs(ctx, mock, "", "")

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -170,7 +164,6 @@ func TestCompareGTIDs_AEmpty(t *testing.T) {
mock := newMockSQLRunner()

result, err := compareGTIDs(ctx, mock, "", "b:1-5")

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -184,7 +177,6 @@ func TestCompareGTIDs_BEmpty(t *testing.T) {
mock := newMockSQLRunner()

result, err := compareGTIDs(ctx, mock, "a:1-5", "")

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -207,7 +199,6 @@ func TestCheckReplicaState_New(t *testing.T) {
replica := newMockSQLRunnerWithGTIDs("", "")

result, err := checkReplicaState(ctx, primary, replica)

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -226,7 +217,6 @@ func TestCheckReplicaState_Identical(t *testing.T) {
primary.setGTIDSubtractResponse("a:1-10", "a:1-10", "")

result, err := checkReplicaState(ctx, primary, replica)

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -245,7 +235,6 @@ func TestCheckReplicaState_Recoverable_NoPurged(t *testing.T) {
primary.setGTIDSubtractResponse("a:1-5", "a:1-10", "")

result, err := checkReplicaState(ctx, primary, replica)

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -264,10 +253,9 @@ func TestCheckReplicaState_Recoverable_WithPurgedButOK(t *testing.T) {
primary.setGTIDSubtractResponse("a:1-5", "a:1-10", "")

// comparePrimaryPurged check - purged is subset of replica executed
primary.sqlResponses["SELECT GTID_SUBTRACT('a:1-3', 'a:1-5') = ''"] = "0"
primary.sqlResponses["SELECT GTID_SUBTRACT('a:1-3', 'a:1-5') = '' AS is_subset"] = float64(1.0)

result, err := checkReplicaState(ctx, primary, replica)

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -286,10 +274,9 @@ func TestCheckReplicaState_Irrecoverable(t *testing.T) {
primary.setGTIDSubtractResponse("a:1-5", "a:1-10", "")

// comparePrimaryPurged check fails - purged has more than replica executed
primary.sqlResponses["SELECT GTID_SUBTRACT('a:1-8', 'a:1-5') = ''"] = "1"
primary.sqlResponses["SELECT GTID_SUBTRACT('a:1-8', 'a:1-5') = '' AS is_subset"] = float64(0.0)

result, err := checkReplicaState(ctx, primary, replica)

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -309,7 +296,6 @@ func TestCheckReplicaState_Diverged_Intersects(t *testing.T) {
primary.setGTIDSubtractIntersectionResponse("a:1-10", "a:5-15", "a:5-10")

result, err := checkReplicaState(ctx, primary, replica)

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -329,7 +315,6 @@ func TestCheckReplicaState_Diverged_Disjoint(t *testing.T) {
primary.setGTIDSubtractIntersectionResponse("a:1-5", "b:1-5", "")

result, err := checkReplicaState(ctx, primary, replica)

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -348,7 +333,6 @@ func TestCheckReplicaState_Diverged_Contained(t *testing.T) {
primary.setGTIDSubtractResponse("a:1-10", "a:1-5", "a:6-10")

result, err := checkReplicaState(ctx, primary, replica)

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down
Loading