Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: Move version monitor logic to separate module #13132

Merged
merged 1 commit into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions server/etcdserver/adapters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package etcdserver

import (
"context"

"github.com/coreos/go-semver/semver"
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
)

// serverVersionAdapter implements Server interface needed by serverversion.Monitor
type serverVersionAdapter struct {
*EtcdServer
}

var _ serverversion.Server = (*serverVersionAdapter)(nil)

func (s *serverVersionAdapter) UpdateClusterVersion(version string) {
// TODO switch to updateClusterVersionV3 in 3.6
s.GoAttach(func() { s.updateClusterVersionV2(version) })
}

func (s *serverVersionAdapter) DowngradeCancel() {
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if _, err := s.downgradeCancel(ctx); err != nil {
s.lg.Warn("failed to cancel downgrade", zap.Error(err))
}
cancel()
}

func (s *serverVersionAdapter) GetClusterVersion() *semver.Version {
return s.cluster.Version()
}

func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo {
return s.cluster.DowngradeInfo()
}

func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions {
return getVersions(s.lg, s.cluster, s.id, s.peerRt)
}
67 changes: 0 additions & 67 deletions server/etcdserver/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,44 +161,6 @@ func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt
return vers
}

// decideClusterVersion decides the cluster version based on the versions map.
// The returned version is the min server version in the map, or nil if the min
// version in unknown.
func decideClusterVersion(lg *zap.Logger, vers map[string]*version.Versions) *semver.Version {
var cv *semver.Version
lv := semver.Must(semver.NewVersion(version.Version))

for mid, ver := range vers {
if ver == nil {
return nil
}
v, err := semver.NewVersion(ver.Server)
if err != nil {
lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
return nil
}
if lv.LessThan(*v) {
lg.Warn(
"leader found higher-versioned member",
zap.String("local-member-version", lv.String()),
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
)
}
if cv == nil {
cv = v
} else if v.LessThan(*cv) {
cv = v
}
}
return cv
}

// allowedVersionRange decides the available version range of the cluster that local server can join in;
// if the downgrade enabled status is true, the version window is [oneMinorHigher, oneMinorHigher]
// if the downgrade is not enabled, the version window is [MinClusterVersion, localVersion]
Expand Down Expand Up @@ -438,35 +400,6 @@ func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTrip
return false, err
}

// isMatchedVersions returns true if all server versions are equal to target version, otherwise return false.
// It can be used to decide the whether the cluster finishes downgrading to target version.
func isMatchedVersions(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
for mid, ver := range vers {
if ver == nil {
return false
}
v, err := semver.NewVersion(ver.Cluster)
if err != nil {
lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
return false
}
if !targetVersion.Equal(*v) {
lg.Warn("remotes server has mismatching etcd version",
zap.String("remote-member-id", mid),
zap.String("current-server-version", v.String()),
zap.String("target-version", targetVersion.String()),
)
return false
}
}
return true
}

func convertToClusterVersion(v string) (*semver.Version, error) {
ver, err := semver.NewVersion(v)
if err != nil {
Expand Down
86 changes: 0 additions & 86 deletions server/etcdserver/cluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package etcdserver

import (
"reflect"
"testing"

"go.etcd.io/etcd/api/v3/version"
Expand All @@ -27,42 +26,6 @@ import (

var testLogger = zap.NewExample()

func TestDecideClusterVersion(t *testing.T) {
tests := []struct {
vers map[string]*version.Versions
wdver *semver.Version
}{
{
map[string]*version.Versions{"a": {Server: "2.0.0"}},
semver.Must(semver.NewVersion("2.0.0")),
},
// unknown
{
map[string]*version.Versions{"a": nil},
nil,
},
{
map[string]*version.Versions{"a": {Server: "2.0.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
semver.Must(semver.NewVersion("2.0.0")),
},
{
map[string]*version.Versions{"a": {Server: "2.1.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
semver.Must(semver.NewVersion("2.1.0")),
},
{
map[string]*version.Versions{"a": nil, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
nil,
},
}

for i, tt := range tests {
dver := decideClusterVersion(testLogger, tt.vers)
if !reflect.DeepEqual(dver, tt.wdver) {
t.Errorf("#%d: ver = %+v, want %+v", i, dver, tt.wdver)
}
}
}

func TestIsCompatibleWithVers(t *testing.T) {
tests := []struct {
vers map[string]*version.Versions
Expand Down Expand Up @@ -215,52 +178,3 @@ func TestDecideAllowedVersionRange(t *testing.T) {
})
}
}

func TestIsMatchedVersions(t *testing.T) {
tests := []struct {
name string
targetVersion *semver.Version
versionMap map[string]*version.Versions
expectedFinished bool
}{
{
"When downgrade finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
true,
},
{
"When cannot parse peer version",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
false,
},
{
"When downgrade not finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.5.2", Cluster: "3.5.0"},
},
false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := isMatchedVersions(zap.NewNop(), tt.targetVersion, tt.versionMap)
if actual != tt.expectedFinished {
t.Errorf("expected downgrade finished is %v; got %v", tt.expectedFinished, actual)
}
})
}
}
54 changes: 7 additions & 47 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/lease/leasehttp"
"go.etcd.io/etcd/server/v3/mvcc"
Expand Down Expand Up @@ -2430,12 +2431,9 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
return s.cluster.Version()
}

// monitorVersions checks the member's version every monitorVersionInterval.
// It updates the cluster version if all members agrees on a higher one.
// It prints out log if there is a member with a higher version than the
// local version.
// TODO switch to updateClusterVersionV3 in 3.6
// monitorVersions every monitorVersionInterval checks if it's the leader and updates cluster version if needed.
func (s *EtcdServer) monitorVersions() {
monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s})
for {
select {
case <-s.FirstCommitInTermNotify():
Expand All @@ -2447,31 +2445,7 @@ func (s *EtcdServer) monitorVersions() {
if s.Leader() != s.ID() {
continue
}

v := decideClusterVersion(s.Logger(), getVersions(s.Logger(), s.cluster, s.id, s.peerRt))
if v != nil {
// only keep major.minor version for comparison
v = &semver.Version{
Major: v.Major,
Minor: v.Minor,
}
}

// if the current version is nil:
// 1. use the decided version if possible
// 2. or use the min cluster version
if s.cluster.Version() == nil {
verStr := version.MinClusterVersion
if v != nil {
verStr = v.String()
}
s.GoAttach(func() { s.updateClusterVersionV2(verStr) })
continue
}

if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) {
s.GoAttach(func() { s.updateClusterVersionV2(v.String()) })
}
monitor.UpdateClusterVersionIfNeeded()
}
}

Expand Down Expand Up @@ -2551,12 +2525,13 @@ func (s *EtcdServer) updateClusterVersionV3(ver string) {
}
}

// monitorDowngrade every DowngradeCheckTime checks if it's the leader and cancels downgrade if needed.
func (s *EtcdServer) monitorDowngrade() {
monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s})
t := s.Cfg.DowngradeCheckTime
if t == 0 {
return
}
lg := s.Logger()
for {
select {
case <-time.After(t):
Expand All @@ -2567,22 +2542,7 @@ func (s *EtcdServer) monitorDowngrade() {
if !s.isLeader() {
continue
}

d := s.cluster.DowngradeInfo()
if !d.Enabled {
continue
}

targetVersion := d.TargetVersion
v := semver.Must(semver.NewVersion(targetVersion))
if isMatchedVersions(s.Logger(), v, getVersions(s.Logger(), s.cluster, s.id, s.peerRt)) {
lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if _, err := s.downgradeCancel(ctx); err != nil {
lg.Warn("failed to cancel downgrade", zap.Error(err))
}
cancel()
}
monitor.CancelDowngradeIfNeeded()
}
}

Expand Down
Loading