Skip to content

Commit

Permalink
Create a v2 snapshot when running etcdutl migrate command
Browse files Browse the repository at this point in the history
Also added test to cover the etcdutl migrate command

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 13, 2025
1 parent a03bdaa commit acffdb5
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 45 deletions.
88 changes: 88 additions & 0 deletions etcdutl/etcdutl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@ package etcdutl

import (
"errors"
"fmt"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
Expand Down Expand Up @@ -68,3 +73,86 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro

return snapshot, nil
}

func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
var (
lg = GetLogger()

Check warning on line 79 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L77-L79

Added lines #L77 - L79 were not covered by tests

snapDir = datadir.ToSnapDir(dataDir)
walDir = datadir.ToWALDir(dataDir)
)

Check warning on line 83 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L81-L83

Added lines #L81 - L83 were not covered by tests

ci, term := schema.ReadConsistentIndex(be.ReadTx())

Check warning on line 85 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L85

Added line #L85 was not covered by tests

cl := membership.NewCluster(lg)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.UnsafeLoad()

Check warning on line 89 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L87-L89

Added lines #L87 - L89 were not covered by tests

latestWALSnap, err := getLatestWALSnap(lg, dataDir)
if err != nil {
return err

Check warning on line 93 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L91-L93

Added lines #L91 - L93 were not covered by tests
}

// Each time before creating the v2 snapshot, etcdserve always flush
// the backend storage (bbolt db), so the consistent index should never
// less than the Index or term of the latest snapshot.
if ci < latestWALSnap.Index || term < latestWALSnap.Term {

Check warning on line 99 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L99

Added line #L99 was not covered by tests
// This should never happen
return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term)

Check warning on line 101 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L101

Added line #L101 was not covered by tests
}

if ci == latestWALSnap.Index {
lg.Info("The latest snapshot is already up to date", zap.Uint64("consistent_index", ci))
return nil

Check warning on line 106 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L104-L106

Added lines #L104 - L106 were not covered by tests
}

voters, learners := getVotersAndLearners(cl)
confState := raftpb.ConfState{
Voters: voters,
Learners: learners,

Check warning on line 112 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L109-L112

Added lines #L109 - L112 were not covered by tests
}

// create the v2 snaspshot file
raftSnap := raftpb.Snapshot{
Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
Metadata: raftpb.SnapshotMetadata{
Index: ci,
Term: term,
ConfState: confState,
},

Check warning on line 122 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L116-L122

Added lines #L116 - L122 were not covered by tests
}
sn := snap.New(lg, snapDir)
if err = sn.SaveSnap(raftSnap); err != nil {
return err

Check warning on line 126 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L124-L126

Added lines #L124 - L126 were not covered by tests
}

// save WAL snapshot record
w, err := wal.Open(lg, walDir, latestWALSnap)
if err != nil {
return err

Check warning on line 132 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L130-L132

Added lines #L130 - L132 were not covered by tests
}
defer w.Close()

Check warning on line 134 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L134

Added line #L134 was not covered by tests
// We must read all records to locate the tail of the last valid WAL file.
if _, _, _, err = w.ReadAll(); err != nil {
return err

Check warning on line 137 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L136-L137

Added lines #L136 - L137 were not covered by tests
}

return w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState})

Check warning on line 140 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L140

Added line #L140 was not covered by tests
}

func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
var (
voters []uint64
learners []uint64
)
for _, m := range cl.Members() {
if m.IsLearner {
learners = append(learners, uint64(m.ID))
continue

Check warning on line 151 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L143-L151

Added lines #L143 - L151 were not covered by tests
}

voters = append(voters, uint64(m.ID))

Check warning on line 154 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L154

Added line #L154 was not covered by tests
}

return voters, learners

Check warning on line 157 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L157

Added line #L157 was not covered by tests
}
80 changes: 61 additions & 19 deletions etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ func (o *migrateOptions) AddFlags(cmd *cobra.Command) {

func (o *migrateOptions) Config() (*migrateConfig, error) {
c := &migrateConfig{
force: o.force,
lg: GetLogger(),
force: o.force,
dataDir: o.dataDir,
lg: GetLogger(),

Check warning on line 79 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L77-L79

Added lines #L77 - L79 were not covered by tests
}
var err error
dotCount := strings.Count(o.targetVersion, ".")
Expand All @@ -90,47 +91,75 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
return nil, fmt.Errorf(`target version %q not supported. Minimal "3.5"`, storageVersionToString(c.targetVersion))
}

dbPath := datadir.ToBackendFileName(o.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)
return c, nil

Check warning on line 94 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L94

Added line #L94 was not covered by tests
}

type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
dataDir string
force bool
}

walPath := datadir.ToWALDir(o.dataDir)
walSnap, err := getLatestWALSnap(c.lg, o.dataDir)
func (c *migrateConfig) finalize() error {
walPath := datadir.ToWALDir(c.dataDir)
walSnap, err := getLatestWALSnap(c.lg, c.dataDir)

Check warning on line 108 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L106-L108

Added lines #L106 - L108 were not covered by tests
if err != nil {
return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err)
return fmt.Errorf("failed to get the lastest snapshot: %w", err)

Check warning on line 110 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L110

Added line #L110 was not covered by tests
}
w, err := wal.OpenForRead(c.lg, walPath, walSnap)
if err != nil {
return nil, fmt.Errorf(`failed to open wal: %w`, err)
return fmt.Errorf(`failed to open wal: %w`, err)

Check warning on line 114 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L114

Added line #L114 was not covered by tests
}
defer w.Close()
c.walVersion, err = wal.ReadWALVersion(w)
if err != nil {
return nil, fmt.Errorf(`failed to read wal: %w`, err)
return fmt.Errorf(`failed to read wal: %w`, err)

Check warning on line 119 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L119

Added line #L119 was not covered by tests
}

return c, nil
}

type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
force bool
return nil

Check warning on line 122 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L122

Added line #L122 was not covered by tests
}

func migrateCommandFunc(c *migrateConfig) error {
dbPath := datadir.ToBackendFileName(c.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)

Check warning on line 127 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L126-L127

Added lines #L126 - L127 were not covered by tests
defer c.be.Close()

tx := c.be.BatchTx()
current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx())
if err != nil {
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err))

Check warning on line 133 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L133

Added line #L133 was not covered by tests
return err
}
if current == *c.targetVersion {
c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}

downgrade, err := isDowngrade(c.lg, tx, c.targetVersion)
if err != nil {
return err

Check warning on line 143 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L141-L143

Added lines #L141 - L143 were not covered by tests
}
if downgrade {

Check warning on line 145 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L145

Added line #L145 was not covered by tests
// Update cluster version
be := schema.NewMembershipBackend(c.lg, c.be)
be.MustSaveClusterVersionToBackend(c.targetVersion)

Check warning on line 148 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L147-L148

Added lines #L147 - L148 were not covered by tests

// forcibly create a v2 snapshot file
// TODO: remove in 3.8
if err = createV2SnapshotFromV3Store(c.dataDir, c.be); err != nil {
c.lg.Error("Failed to create v2 snapshot file", zap.Error(err))
return err

Check warning on line 154 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L152-L154

Added lines #L152 - L154 were not covered by tests
}
}

if err = c.finalize(); err != nil {
c.lg.Error("Failed to finalize config", zap.Error(err))
return err

Check warning on line 160 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L158-L160

Added lines #L158 - L160 were not covered by tests
}

err = schema.Migrate(c.lg, tx, c.walVersion, *c.targetVersion)
if err != nil {
if !c.force {
Expand All @@ -139,7 +168,9 @@ func migrateCommandFunc(c *migrateConfig) error {
c.lg.Info("normal migrate failed, trying with force", zap.Error(err))
migrateForce(c.lg, tx, c.targetVersion)
}

c.be.ForceCommit()

return nil
}

Expand All @@ -156,6 +187,17 @@ func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
}
}

func isDowngrade(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) (bool, error) {
tx.LockOutsideApply()
defer tx.Unlock()
ver, err := schema.UnsafeDetectSchemaVersion(lg, tx)
if err != nil {
lg.Error("Failed to detect current storage version", zap.Error(err))
return false, err

Check warning on line 196 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L190-L196

Added lines #L190 - L196 were not covered by tests
}
return target.LessThan(ver), nil

Check warning on line 198 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L198

Added line #L198 was not covered by tests
}

func storageVersionToString(ver *semver.Version) string {
return fmt.Sprintf("%d.%d", ver.Major, ver.Minor)
}
16 changes: 11 additions & 5 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,22 +256,28 @@ func (c *RaftCluster) SetVersionChangedNotifier(n *notify.Notifier) {
c.versionChanged = n
}

func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

func (c *RaftCluster) UnsafeLoad() {
if c.be != nil {
c.version = c.be.ClusterVersionFromBackend()
c.members, c.removed = c.be.MustReadMembersFromBackend()
} else {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
}
c.buildMembershipMetric()

if c.be != nil {
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
}
}

func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

c.UnsafeLoad()

c.buildMembershipMetric()

sv := semver.Must(semver.NewVersion(version.Version))
if c.downgradeInfo != nil && c.downgradeInfo.Enabled {
c.lg.Info(
Expand Down
Loading

0 comments on commit acffdb5

Please sign in to comment.