add MemberDowngradeUpgrade failpoint
Signed-off-by: Siyuan Zhang <[email protected]>
siyuanfoundation committed Jan 3, 2025
1 parent fce823a commit c6e3a18
Showing 2 changed files with 116 additions and 25 deletions.
140 changes: 115 additions & 25 deletions tests/robustness/failpoint/cluster.go
@@ -39,8 +39,9 @@ import (
 )
 
 var (
-	MemberReplace   Failpoint = memberReplace{}
-	MemberDowngrade Failpoint = memberDowngrade{}
+	MemberReplace          Failpoint = memberReplace{}
+	MemberDowngrade        Failpoint = memberDowngrade{}
+	MemberDowngradeUpgrade Failpoint = memberDowngradeUpgrade{}
 )
 
 type memberReplace struct{}
@@ -148,14 +149,12 @@ func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e
 type memberDowngrade struct{}
 
 func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
-	v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	currentVersion, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
 	if err != nil {
 		return nil, err
 	}
-	targetVersion := semver.Version{Major: v.Major, Minor: v.Minor - 1}
+	lastVersion := &semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
 	numberOfMembersToDowngrade := rand.Int()%len(clus.Procs) + 1
-	membersToDowngrade := rand.Perm(len(clus.Procs))[:numberOfMembersToDowngrade]
-	lg.Info("Test downgrading members", zap.Any("members", membersToDowngrade))
 
 	member := clus.Procs[0]
 	endpoints := []string{member.EndpointsGRPC()[0]}
@@ -173,27 +172,12 @@ func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logge
 	// Need to wait health interval for cluster to accept changes
 	time.Sleep(etcdserver.HealthInterval)
 	lg.Info("Enable downgrade")
-	err = enableDowngrade(ctx, cc, &targetVersion)
+	err = enableDowngrade(ctx, cc, lastVersion)
 	if err != nil {
 		return nil, err
 	}
-	// Need to wait health interval for cluster to prepare for downgrade
-	time.Sleep(etcdserver.HealthInterval)
 
-	for _, memberID := range membersToDowngrade {
-		member = clus.Procs[memberID]
-		lg.Info("Downgrading member", zap.String("member", member.Config().Name))
-		if err = member.Stop(); err != nil {
-			return nil, err
-		}
-		member.Config().ExecPath = e2e.BinPath.EtcdLastRelease
-		lg.Info("Restarting member", zap.String("member", member.Config().Name))
-		err = member.Start(ctx)
-		if err != nil {
-			return nil, err
-		}
-		err = verifyVersion(t, clus, member, targetVersion)
-	}
+	err = downgradeUpgradeMembers(ctx, t, lg, clus, numberOfMembersToDowngrade, currentVersion, lastVersion)
 	time.Sleep(etcdserver.HealthInterval)
 	return nil, err
 }
@@ -215,6 +199,64 @@ func (f memberDowngrade) Available(config e2e.EtcdProcessClusterConfig, member e
 	return v.Compare(v3_6) >= 0 && (config.Version == e2e.CurrentVersion && member.Config().ExecPath == e2e.BinPath.Etcd)
 }
 
+type memberDowngradeUpgrade struct{}
+
+func (f memberDowngradeUpgrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
+	currentVersion, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	if err != nil {
+		return nil, err
+	}
+	lastVersion := &semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
+
+	member := clus.Procs[0]
+	endpoints := []string{member.EndpointsGRPC()[0]}
+	cc, err := clientv3.New(clientv3.Config{
+		Endpoints:            endpoints,
+		Logger:               zap.NewNop(),
+		DialKeepAliveTime:    10 * time.Second,
+		DialKeepAliveTimeout: 100 * time.Millisecond,
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer cc.Close()
+
+	// Need to wait health interval for cluster to accept changes
+	time.Sleep(etcdserver.HealthInterval)
+	lg.Info("Enable downgrade")
+	err = enableDowngrade(ctx, cc, lastVersion)
+	if err != nil {
+		return nil, err
+	}
+	// downgrade all members first
+	err = downgradeUpgradeMembers(ctx, t, lg, clus, len(clus.Procs), currentVersion, lastVersion)
+	if err != nil {
+		return nil, err
+	}
+	// partial upgrade the cluster
+	numberOfMembersToUpgrade := rand.Int()%len(clus.Procs) + 1
+	err = downgradeUpgradeMembers(ctx, t, lg, clus, numberOfMembersToUpgrade, lastVersion, currentVersion)
+	time.Sleep(etcdserver.HealthInterval)
+	return nil, err
+}
+
+func (f memberDowngradeUpgrade) Name() string {
+	return "MemberDowngradeUpgrade"
+}
+
+func (f memberDowngradeUpgrade) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
+	if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
+		return false
+	}
+	v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	if err != nil {
+		panic("Failed checking etcd version binary")
+	}
+	v3_6 := semver.Version{Major: 3, Minor: 6}
+	// only current version cluster can be downgraded.
+	return v.Compare(v3_6) >= 0 && (config.Version == e2e.CurrentVersion && member.Config().ExecPath == e2e.BinPath.Etcd)
+}
+
 func getID(ctx context.Context, cc *clientv3.Client, name string) (id uint64, found bool, err error) {
 	// Ensure linearized MemberList by first making a linearized Get request from the same member.
 	// This is required for v3.4 support as it doesn't support linearized MemberList https://github.com/etcd-io/etcd/issues/18929
@@ -257,9 +299,57 @@ func enableDowngrade(ctx context.Context, cc *clientv3.Client, targetVersion *se
 	return err
 }
 
-func verifyVersion(t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess, expectedVersion semver.Version) error {
+func downgradeUpgradeMembers(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, numberOfMembersToChange int, currentVersion, targetVersion *semver.Version) error {
+	isDowngrade := targetVersion.LessThan(*currentVersion)
+	opString := "upgrading"
+	newExecPath := e2e.BinPath.Etcd
+	if isDowngrade {
+		opString = "downgrading"
+		newExecPath = e2e.BinPath.EtcdLastRelease
+	}
+	membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
+	lg.Info(fmt.Sprintf("Test %s members", opString), zap.Any("members", membersToChange))
+
+	// Need to wait health interval for cluster to prepare for downgrade/upgrade
+	time.Sleep(etcdserver.HealthInterval)
+
+	membersChanged := 0
+	for _, memberID := range membersToChange {
+		member := clus.Procs[memberID]
+		if member.Config().ExecPath == newExecPath {
+			return fmt.Errorf("member:%s is already running with the %s target binary - %s", member.Config().Name, opString, member.Config().ExecPath)
+		}
+		lg.Info(fmt.Sprintf("%s member", opString), zap.String("member", member.Config().Name))
+		if err := member.Stop(); err != nil {
+			return err
+		}
+		member.Config().ExecPath = newExecPath
+		lg.Info("Restarting member", zap.String("member", member.Config().Name))
+		err := member.Start(ctx)
+		if err != nil {
+			return err
+		}
+		membersChanged++
+		if isDowngrade {
+			err = verifyVersion(t, clus, member, targetVersion, targetVersion)
+		} else {
+			// for upgrade, the cluster version will be changed to the higher version if all the members have finished upgrading.
+			if membersChanged == len(clus.Procs) {
+				err = verifyVersion(t, clus, member, targetVersion, targetVersion)
+			} else {
+				err = verifyVersion(t, clus, member, targetVersion, currentVersion)
+			}
+		}
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func verifyVersion(t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess, expectedServerVersion, expectedClusterVersion *semver.Version) error {
 	var err error
-	expected := fmt.Sprintf(`"etcdserver":"%d.%d\..*"etcdcluster":"%d\.%d\.`, expectedVersion.Major, expectedVersion.Minor, expectedVersion.Major, expectedVersion.Minor)
+	expected := fmt.Sprintf(`"etcdserver":"%d.%d\..*"etcdcluster":"%d\.%d\.`, expectedServerVersion.Major, expectedServerVersion.Minor, expectedClusterVersion.Major, expectedClusterVersion.Minor)
 	for i := 0; i < 35; i++ {
 		if err = e2e.CURLGetFromMember(clus, member, e2e.CURLReq{Endpoint: "/version", Expected: expect.ExpectedResponse{Value: expected, IsRegularExpr: true}}); err != nil {
 			t.Logf("#%d: v3 is not ready yet (%v)", i, err)
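
The new downgradeUpgradeMembers helper drives both directions with the same pattern the old downgrade loop used: pick a random non-empty subset of members, restart each one on the other binary, then verify the reported server and cluster versions. Below is a minimal, self-contained Go sketch of that selection and verification ordering; it is an illustration only, not code from this commit, and the 3-member cluster size is an assumption.

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	const clusterSize = 3 // assumption: a typical 3-member robustness-test cluster

	// Same selection pattern as downgradeUpgradeMembers: between 1 and clusterSize
	// members, visited in random order.
	numberOfMembersToChange := rand.Int()%clusterSize + 1
	membersToChange := rand.Perm(clusterSize)[:numberOfMembersToChange]

	membersChanged := 0
	for _, memberID := range membersToChange {
		membersChanged++
		// Mirrors the verifyVersion expectations above: during an upgrade the
		// cluster version only moves up once every member runs the newer binary.
		clusterVersionBumped := membersChanged == clusterSize
		fmt.Printf("changed member %d, expect cluster version bumped: %v\n", memberID, clusterVersionBumped)
	}
}
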
1 change: 1 addition & 0 deletions tests/robustness/failpoint/failpoint.go
@@ -47,6 +47,7 @@ var allFailpoints = []Failpoint{
 	BeforeApplyOneConfChangeSleep,
 	MemberReplace,
 	MemberDowngrade,
+	MemberDowngradeUpgrade,
 	DropPeerNetwork,
 	RaftBeforeSaveSleep,
 	RaftAfterSaveSleep,
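
With the new type in place, wiring it into the robustness suite is a single registration in allFailpoints. A self-contained sketch of that pattern follows; the interface is reduced to Name(), and the random pick in main is a hypothetical stand-in for however the harness actually chooses a failpoint, so treat it as an assumption rather than the suite's real selection logic.

package main

import (
	"fmt"
	"math/rand"
)

// Failpoint mirrors the shape suggested by the diff above; Inject and Available are omitted.
type Failpoint interface {
	Name() string
}

type memberDowngrade struct{}

func (f memberDowngrade) Name() string { return "MemberDowngrade" }

type memberDowngradeUpgrade struct{}

func (f memberDowngradeUpgrade) Name() string { return "MemberDowngradeUpgrade" }

// allFailpoints is the single registration point; adding a failpoint is one extra element.
var allFailpoints = []Failpoint{
	memberDowngrade{},
	memberDowngradeUpgrade{},
}

func main() {
	// Hypothetical selection step: pick one registered failpoint at random.
	picked := allFailpoints[rand.Intn(len(allFailpoints))]
	fmt.Println("injecting failpoint:", picked.Name())
}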
