Skip to content

Commit

Permalink
Merge pull request #11801 from YoyinZyc/downgrade-server
Browse files Browse the repository at this point in the history
[Etcd downgrade] Implement downgrade validate, enable and cancel
  • Loading branch information
gyuho authored May 18, 2020
2 parents 52edb7d + d230e6b commit 5e2815e
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 20 deletions.
18 changes: 18 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ var (
ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err()

ErrGRPCClusterVersionUnavailable = status.New(codes.Unavailable, "etcdserver: cluster version not found during downgrade").Err()
ErrGRPCWrongDowngradeVersionFormat = status.New(codes.InvalidArgument, "etcdserver: wrong downgrade target version format").Err()
ErrGRPCInvalidDowngradeTargetVersion = status.New(codes.InvalidArgument, "etcdserver: invalid downgrade target version").Err()
ErrGRPCDowngradeInProcess = status.New(codes.FailedPrecondition, "etcdserver: cluster has a downgrade job in progress").Err()
ErrGRPCNoInflightDowngrade = status.New(codes.FailedPrecondition, "etcdserver: no inflight downgrade job").Err()

errStringToError = map[string]error{
ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
ErrorDesc(ErrGRPCKeyNotFound): ErrGRPCKeyNotFound,
Expand Down Expand Up @@ -132,6 +138,12 @@ var (
ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt,
ErrorDesc(ErrGPRCNotSupportedForLearner): ErrGPRCNotSupportedForLearner,
ErrorDesc(ErrGRPCBadLeaderTransferee): ErrGRPCBadLeaderTransferee,

ErrorDesc(ErrGRPCClusterVersionUnavailable): ErrGRPCClusterVersionUnavailable,
ErrorDesc(ErrGRPCWrongDowngradeVersionFormat): ErrGRPCWrongDowngradeVersionFormat,
ErrorDesc(ErrGRPCInvalidDowngradeTargetVersion): ErrGRPCInvalidDowngradeTargetVersion,
ErrorDesc(ErrGRPCDowngradeInProcess): ErrGRPCDowngradeInProcess,
ErrorDesc(ErrGRPCNoInflightDowngrade): ErrGRPCNoInflightDowngrade,
}
)

Expand Down Expand Up @@ -190,6 +202,12 @@ var (
ErrUnhealthy = Error(ErrGRPCUnhealthy)
ErrCorrupt = Error(ErrGRPCCorrupt)
ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)

ErrClusterVersionUnavailable = Error(ErrGRPCClusterVersionUnavailable)
ErrWrongDowngradeVersionFormat = Error(ErrGRPCWrongDowngradeVersionFormat)
ErrInvalidDowngradeTargetVersion = Error(ErrGRPCInvalidDowngradeTargetVersion)
ErrDowngradeInProcess = Error(ErrGRPCDowngradeInProcess)
ErrNoInflightDowngrade = Error(ErrGRPCNoInflightDowngrade)
)

// EtcdError defines gRPC server errors.
Expand Down
6 changes: 6 additions & 0 deletions etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ var toGRPCErrorMap = map[error]error{
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,

etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
etcdserver.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
etcdserver.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
etcdserver.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,

lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,
lease.ErrLeaseTTLTooLarge: rpctypes.ErrGRPCLeaseTTLTooLarge,
Expand Down
11 changes: 11 additions & 0 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type applyResult struct {
type applierV3Internal interface {
// ClusterVersionSet is invoked by applierV3backend.Apply when the internal
// raft request carries a ClusterVersionSet payload.
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest)
// ClusterMemberAttrSet is invoked by applierV3backend.Apply when the internal
// raft request carries a ClusterMemberAttrSet payload.
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest)
// DowngradeInfoSet is invoked by applierV3backend.Apply when the internal
// raft request carries a DowngradeInfoSet payload; the backend implementation
// stores the resulting downgrade info on the cluster.
DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest)
}

// applierV3 is the interface for processing V3 raft messages
Expand Down Expand Up @@ -195,6 +196,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
case r.ClusterMemberAttrSet != nil:
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
case r.DowngradeInfoSet != nil:
a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet)
default:
panic("not implemented")
}
Expand Down Expand Up @@ -882,6 +885,14 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt
)
}

// DowngradeInfoSet applies a downgrade info request to the cluster. An enabled
// request records the requested target version; a disabled request stores a
// zero-valued (not enabled, no target) downgrade info.
func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) {
	info := &membership.DowngradeInfo{}
	if r.Enabled {
		info.Enabled = true
		info.TargetVersion = r.Ver
	}
	a.s.cluster.SetDowngradeInfo(info)
}

type quotaApplierV3 struct {
applierV3
q Quota
Expand Down
19 changes: 19 additions & 0 deletions etcdserver/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,22 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R
}
return membs, nil
}

// convertToClusterVersion parses v into a cluster version. Both
// "Major.Minor.Patch" and "Major.Minor" inputs are accepted; anything else
// yields ErrWrongDowngradeVersionFormat. The returned version carries only the
// major and minor components because cluster versions do not track patch.
func convertToClusterVersion(v string) (*semver.Version, error) {
	parsed, err := semver.NewVersion(v)
	if err != nil {
		// Retry with a ".0" patch suffix so that plain Major.Minor input parses.
		if parsed, err = semver.NewVersion(v + ".0"); err != nil {
			return nil, ErrWrongDowngradeVersionFormat
		}
	}
	// Drop the patch component: cluster version only keeps major.minor.
	return &semver.Version{Major: parsed.Major, Minor: parsed.Minor}, nil
}

// allowedDowngradeVersion returns the single version a cluster running ver may
// downgrade to: the same major version with the minor version decremented by one.
//
// Todo: handle the case that downgrading from higher major version(e.g. downgrade from v4.0 to v3.x)
func allowedDowngradeVersion(ver *semver.Version) *semver.Version {
	allowed := semver.Version{
		Major: ver.Major,
		Minor: ver.Minor - 1,
	}
	return &allowed
}
43 changes: 43 additions & 0 deletions etcdserver/cluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,46 @@ func TestIsCompatibleWithVers(t *testing.T) {
}
}
}

// TestConvertToClusterVersion checks that convertToClusterVersion accepts
// Major.Minor.Patch and Major.Minor inputs, strips the patch component, and
// rejects malformed version strings.
func TestConvertToClusterVersion(t *testing.T) {
	type testCase struct {
		name        string
		inputVerStr string
		expectedVer string
		hasError    bool
	}

	cases := []testCase{
		{
			name:        "Succeeded: Major.Minor.Patch",
			inputVerStr: "3.4.2",
			expectedVer: "3.4.0",
			hasError:    false,
		},
		{
			name:        "Succeeded: Major.Minor",
			inputVerStr: "3.4",
			expectedVer: "3.4.0",
			hasError:    false,
		},
		{
			name:        "Failed: wrong version format",
			inputVerStr: "3*.9",
			expectedVer: "",
			hasError:    true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := convertToClusterVersion(tc.inputVerStr)
			if gotErr := err != nil; gotErr != tc.hasError {
				t.Errorf("Expected error status is %v; Got %v", tc.hasError, err)
			}
			// Nothing further to compare once an error was expected.
			if tc.hasError {
				return
			}
			if got == nil || got.String() != tc.expectedVer {
				t.Errorf("Expected output cluster version is %v; Got %v", tc.expectedVer, got)
			}
		})
	}
}
43 changes: 24 additions & 19 deletions etcdserver/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,30 @@ import (
)

// Sentinel errors returned by the etcd server. Each identifier is declared
// exactly once; the final five cover the downgrade validate/enable/cancel flow.
var (
	ErrUnknownMethod                 = errors.New("etcdserver: unknown method")
	ErrStopped                       = errors.New("etcdserver: server stopped")
	ErrCanceled                      = errors.New("etcdserver: request cancelled")
	ErrTimeout                       = errors.New("etcdserver: request timed out")
	ErrTimeoutDueToLeaderFail        = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
	ErrTimeoutDueToConnectionLost    = errors.New("etcdserver: request timed out, possibly due to connection lost")
	ErrTimeoutLeaderTransfer         = errors.New("etcdserver: request timed out, leader transfer took too long")
	ErrLeaderChanged                 = errors.New("etcdserver: leader changed")
	ErrNotEnoughStartedMembers       = errors.New("etcdserver: re-configuration failed due to not enough started members")
	ErrLearnerNotReady               = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
	ErrNoLeader                      = errors.New("etcdserver: no leader")
	ErrNotLeader                     = errors.New("etcdserver: not leader")
	ErrRequestTooLarge               = errors.New("etcdserver: request is too large")
	ErrNoSpace                       = errors.New("etcdserver: no space")
	ErrTooManyRequests               = errors.New("etcdserver: too many requests")
	ErrUnhealthy                     = errors.New("etcdserver: unhealthy cluster")
	ErrKeyNotFound                   = errors.New("etcdserver: key not found")
	ErrCorrupt                       = errors.New("etcdserver: corrupt cluster")
	ErrBadLeaderTransferee           = errors.New("etcdserver: bad leader transferee")
	ErrClusterVersionUnavailable     = errors.New("etcdserver: cluster version not found during downgrade")
	ErrWrongDowngradeVersionFormat   = errors.New("etcdserver: wrong downgrade target version format")
	ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version")
	ErrDowngradeInProcess            = errors.New("etcdserver: cluster has a downgrade job in progress")
	ErrNoInflightDowngrade           = errors.New("etcdserver: no inflight downgrade job")
)

type DiscoveryError struct {
Expand Down
91 changes: 90 additions & 1 deletion etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.etcd.io/etcd/v3/auth"
"go.etcd.io/etcd/v3/etcdserver/api/membership"
"go.etcd.io/etcd/v3/etcdserver/api/membership/membershippb"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
"go.etcd.io/etcd/v3/lease"
"go.etcd.io/etcd/v3/lease/leasehttp"
Expand Down Expand Up @@ -806,5 +807,93 @@ func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error
}

// Downgrade handles a cluster downgrade request by dispatching on r.Action:
// VALIDATE checks whether r.Version is an acceptable downgrade target, ENABLE
// validates and then starts the downgrade, and CANCEL stops an in-flight
// downgrade. Any other action is rejected with ErrUnknownMethod.
func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
	// No unconditional return may precede this switch: a leftover
	// "return nil, nil" stub would make every action below unreachable.
	switch r.Action {
	case pb.DowngradeRequest_VALIDATE:
		return s.downgradeValidate(ctx, r.Version)
	case pb.DowngradeRequest_ENABLE:
		return s.downgradeEnable(ctx, r)
	case pb.DowngradeRequest_CANCEL:
		return s.downgradeCancel(ctx)
	default:
		return nil, ErrUnknownMethod
	}
}

// downgradeValidate checks whether the cluster may be downgraded to version v.
// It fails when v cannot be parsed, when the linearizable read fails, when the
// cluster version is unknown, when v is not the allowed downgrade target of the
// current cluster version, or when a downgrade is already enabled. On success
// the response carries the current cluster version.
func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.DowngradeResponse, error) {
	target, err := convertToClusterVersion(v)
	if err != nil {
		return nil, err
	}

	// Wait for the local store to catch up with the leader's commit index so
	// the checks below do not run against stale downgrade information.
	if err = s.linearizableReadNotify(ctx); err != nil {
		return nil, err
	}

	clusterVer := s.ClusterVersion()
	if clusterVer == nil {
		return nil, ErrClusterVersionUnavailable
	}
	current := clusterVer.String()

	if allowed := allowedDowngradeVersion(clusterVer); !target.Equal(*allowed) {
		return nil, ErrInvalidDowngradeTargetVersion
	}

	if s.cluster.DowngradeInfo().Enabled {
		// Todo: return the downgrade status along with the error msg
		return nil, ErrDowngradeInProcess
	}
	return &pb.DowngradeResponse{Version: current}, nil
}

// downgradeEnable validates the requested target version and, if validation
// passes, proposes a raft request that marks the downgrade as enabled with
// that target. Every failure is logged as a rejected downgrade request and
// returned to the caller. On success the response carries the cluster version.
func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
	lg := s.getLogger()

	// Validate downgrade capability before starting the downgrade.
	resp, err := s.downgradeValidate(ctx, r.Version)
	if err != nil {
		lg.Warn("reject downgrade request", zap.Error(err))
		return resp, err
	}

	target, err := convertToClusterVersion(r.Version)
	if err != nil {
		lg.Warn("reject downgrade request", zap.Error(err))
		return nil, err
	}

	req := pb.InternalRaftRequest{
		DowngradeInfoSet: &membershippb.DowngradeInfoSetRequest{
			Enabled: true,
			Ver:     target.String(),
		},
	}
	if _, err = s.raftRequest(ctx, req); err != nil {
		lg.Warn("reject downgrade request", zap.Error(err))
		return nil, err
	}

	return &pb.DowngradeResponse{Version: s.ClusterVersion().String()}, nil
}

// downgradeCancel cancels an in-flight downgrade by proposing a raft request
// that disables the downgrade info. It returns ErrNoInflightDowngrade when no
// downgrade is currently enabled. On success the response carries the cluster
// version.
func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) {
	// Wait for the local store to catch up with the leader's commit index so
	// the enabled check below does not run against stale downgrade information.
	err := s.linearizableReadNotify(ctx)
	if err != nil {
		return nil, err
	}

	if info := s.cluster.DowngradeInfo(); !info.Enabled {
		return nil, ErrNoInflightDowngrade
	}

	req := pb.InternalRaftRequest{
		DowngradeInfoSet: &membershippb.DowngradeInfoSetRequest{Enabled: false},
	}
	if _, err = s.raftRequest(ctx, req); err != nil {
		return nil, err
	}

	return &pb.DowngradeResponse{Version: s.ClusterVersion().String()}, nil
}

0 comments on commit 5e2815e

Please sign in to comment.