Skip to content

Commit

Permalink
Create a v2 snapshot when running etcdutl migrate command
Browse files Browse the repository at this point in the history
Also added test to cover the etcdutl migrate command

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 13, 2025
1 parent a03bdaa commit 43a213e
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 45 deletions.
88 changes: 88 additions & 0 deletions etcdutl/etcdutl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@ package etcdutl

import (
"errors"
"fmt"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
Expand Down Expand Up @@ -68,3 +73,86 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro

return snapshot, nil
}

func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
var (
lg = GetLogger()

snapDir = datadir.ToSnapDir(dataDir)
walDir = datadir.ToWALDir(dataDir)
)

ci, term := schema.ReadConsistentIndex(be.ReadTx())

cl := membership.NewCluster(lg)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.Load()

latestWALSnap, err := getLatestWALSnap(lg, dataDir)
if err != nil {
return err
}

// Each time before creating the v2 snapshot, etcdserve always flush
// the backend storage (bbolt db), so the consistent index should never
// less than the Index or term of the latest snapshot.
if ci < latestWALSnap.Index || term < latestWALSnap.Term {
// This should never happen
return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term)
}

if ci == latestWALSnap.Index {
lg.Info("The latest snapshot is already up to date", zap.Int("consistent_index", ci))
return nil
}

voters, learners := getVotersAndLearners(cl)
confState := raftpb.ConfState{
Voters: voters,
Learners: learners,
}

// create the v2 snaspshot file
raftSnap := raftpb.Snapshot{
Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
Metadata: raftpb.SnapshotMetadata{
Index: ci,
Term: term,
ConfState: confState,
},
}
sn := snap.New(lg, snapDir)
if err = sn.SaveSnap(raftSnap); err != nil {
return err
}

// save WAL snapshot record
w, err := wal.Open(lg, walDir, latestWALSnap)
if err != nil {
return err
}
defer w.Close()
// We must read all records to locate the tail of the last valid WAL file.
if _, _, _, err = w.ReadAll(); err != nil {
return err
}

return w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState})
}

func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
var (
voters []uint64
learners []uint64
)
for _, m := range cl.Members() {
if m.IsLearner {
learners = append(learners, uint64(m.ID))
continue
}

voters = append(voters, uint64(m.ID))
}

return voters, learners
}
80 changes: 61 additions & 19 deletions etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ func (o *migrateOptions) AddFlags(cmd *cobra.Command) {

func (o *migrateOptions) Config() (*migrateConfig, error) {
c := &migrateConfig{
force: o.force,
lg: GetLogger(),
force: o.force,
dataDir: o.dataDir,
lg: GetLogger(),
}
var err error
dotCount := strings.Count(o.targetVersion, ".")
Expand All @@ -90,47 +91,75 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
return nil, fmt.Errorf(`target version %q not supported. Minimal "3.5"`, storageVersionToString(c.targetVersion))
}

dbPath := datadir.ToBackendFileName(o.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)
return c, nil
}

type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
dataDir string
force bool
}

walPath := datadir.ToWALDir(o.dataDir)
walSnap, err := getLatestWALSnap(c.lg, o.dataDir)
func (c *migrateConfig) finalize() error {
walPath := datadir.ToWALDir(c.dataDir)
walSnap, err := getLatestWALSnap(c.lg, c.dataDir)
if err != nil {
return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err)
return fmt.Errorf("failed to get the lastest snapshot: %w", err)
}
w, err := wal.OpenForRead(c.lg, walPath, walSnap)
if err != nil {
return nil, fmt.Errorf(`failed to open wal: %w`, err)
return fmt.Errorf(`failed to open wal: %w`, err)
}
defer w.Close()
c.walVersion, err = wal.ReadWALVersion(w)
if err != nil {
return nil, fmt.Errorf(`failed to read wal: %w`, err)
return fmt.Errorf(`failed to read wal: %w`, err)
}

return c, nil
}

type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
force bool
return nil
}

func migrateCommandFunc(c *migrateConfig) error {
dbPath := datadir.ToBackendFileName(c.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)
defer c.be.Close()

tx := c.be.BatchTx()
current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx())
if err != nil {
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err))
return err
}
if current == *c.targetVersion {
c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}

downgrade, err := isDowngrade(c.lg, tx, c.targetVersion)
if err != nil {
return err
}
if downgrade {
// Update cluster version
be := schema.NewMembershipBackend(c.lg, c.be)
be.MustSaveClusterVersionToBackend(c.targetVersion)

// forcibly create a v2 snapshot file
// TODO: remove in 3.8
if err = createV2SnapshotFromV3Store(c.dataDir, c.be); err != nil {
c.lg.Error("Failed to create v2 snapshot file", zap.Error(err))
return err
}
}

if err = c.finalize(); err != nil {
c.lg.Error("Failed to finalize config", zap.Error(err))
return err
}

err = schema.Migrate(c.lg, tx, c.walVersion, *c.targetVersion)
if err != nil {
if !c.force {
Expand All @@ -139,7 +168,9 @@ func migrateCommandFunc(c *migrateConfig) error {
c.lg.Info("normal migrate failed, trying with force", zap.Error(err))
migrateForce(c.lg, tx, c.targetVersion)
}

c.be.ForceCommit()

return nil
}

Expand All @@ -156,6 +187,17 @@ func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
}
}

func isDowngrade(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) (bool, error) {
tx.LockOutsideApply()
defer tx.Unlock()
ver, err := schema.UnsafeDetectSchemaVersion(lg, tx)
if err != nil {
lg.Error("Failed to detect current storage version", zap.Error(err))
return false, err
}
return target.LessThan(ver), nil
}

func storageVersionToString(ver *semver.Version) string {
return fmt.Sprintf("%d.%d", ver.Major, ver.Minor)
}
16 changes: 11 additions & 5 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,22 +256,28 @@ func (c *RaftCluster) SetVersionChangedNotifier(n *notify.Notifier) {
c.versionChanged = n
}

func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

func (c *RaftCluster) Load() {
if c.be != nil {
c.version = c.be.ClusterVersionFromBackend()
c.members, c.removed = c.be.MustReadMembersFromBackend()
} else {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
}
c.buildMembershipMetric()

if c.be != nil {
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
}
}

func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

c.Load()

c.buildMembershipMetric()

sv := semver.Must(semver.NewVersion(version.Version))
if c.downgradeInfo != nil && c.downgradeInfo.Enabled {
c.lg.Info(
Expand Down
Loading

0 comments on commit 43a213e

Please sign in to comment.