Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 125 additions & 60 deletions internal/cmds/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -75,6 +76,9 @@ var (
RunCmd = runCmd
DataDir = constants.DataDir

// Used by unit tests to mock out executing the command
ExecCmdInDir = exec.ExecCmdInDir

ErrAlreadyProcessed = errors.New("the script configuration has already been processed, will not run again")
)

Expand All @@ -85,17 +89,23 @@ func update(ctx *log.Context, h types.HandlerEnvironment, report *types.RunComma
return "", "", err, exitCode
}

err = rehydrateMrSeqFilesForProblematicUpgrades(ctx, h, extensionEvents)
if err != nil {
// If we fail on update, then there's a risk we could re-execute the customer's script. Don't take that chance.
// By failing Update, the extension goal state will fail. WALA will try us again on the next goal state.
ctx.Log("event", "Unable to rehydrate mrseq files")
return "", "", err, constants.ExitCode_CouldNotRehydrateMrSeq
// Figure out the directories from which and to where we're upgrading. We cannot entirely rely on the environment variables from the Guest Agent
Comment thread
jscalev marked this conversation as resolved.
upgradeFromVersionDirectory, upgradeToVersionDirectory, upgradeFromVersion := determineUpgradeVersionDirectories(ctx, extensionEvents)

if compareVersions(constants.FirstVersionNoRehydration, upgradeFromVersion) > 0 {
// Rehydrate any mrseq files from the corresponding status file.
err = rehydrateMrSeqFilesForProblematicUpgrades(ctx, upgradeFromVersionDirectory, upgradeToVersionDirectory, extensionEvents)
if err != nil {
// If we fail on update, then there's a risk we could re-execute the customer's script. Don't take that chance.
// By failing Update, the extension goal state will fail. WALA will try us again on the next goal state.
ctx.Log("event", "Unable to rehydrate mrseq files")
return "", "", err, constants.ExitCode_CouldNotRehydrateMrSeq
}
}

// Copy any .mrseq or .status files -Most Recently executed Sequence number files and status files for Run Commands from old version to new version.
// This is necessary to prevent rerunning of already executed Run Commands after upgrade of extension version, and also return their statuses.
copyError := CopyStateForUpdate(ctx, extensionEvents)
copyError := CopyStateForUpdate(ctx, upgradeFromVersionDirectory, upgradeToVersionDirectory, extensionEvents)
if copyError != nil {
return "", "", errors.Wrap(copyError, "Migrating *.mrseq or .status files failed during update."), constants.ExitCode_CopyStateForUpdateFailed
}
Expand Down Expand Up @@ -417,15 +427,15 @@ func resetSeqNum(ctx log.Logger, mrseqPath string, extensionEvents *extensioneve
}

// Copy state of the extension from old version to new version during update (.mrseq files, .status files)
func CopyStateForUpdate(ctx log.Logger, extensionEvents *extensionevents.ExtensionEventManager) error {
func CopyStateForUpdate(ctx log.Logger, upgradeFromVersionDirectory string, upgradeToVersionDirectory string, extensionEvents *extensionevents.ExtensionEventManager) error {
// Copy .mrseq files (Most Recently executed Sequence number) that helps determine whether a sequence number of Run Command has been previously executed or not.
mrseqFilesNameList, mrseqFileCopyErr := copyFiles(ctx, constants.MrSeqFileExtension, "", extensionEvents)
mrseqFilesNameList, mrseqFileCopyErr := copyFiles(ctx, constants.MrSeqFileExtension, "", upgradeFromVersionDirectory, upgradeToVersionDirectory, extensionEvents)
if mrseqFileCopyErr != nil {
return mrseqFileCopyErr
}

// Copy .status files of already executed sequence numbers
_, statusFileCopyErr := copyFiles(ctx, ".status", constants.StatusFileDirectory, extensionEvents)
_, statusFileCopyErr := copyFiles(ctx, ".status", constants.StatusFileDirectory, upgradeFromVersionDirectory, upgradeToVersionDirectory, extensionEvents)
if statusFileCopyErr != nil {
return statusFileCopyErr
}
Expand All @@ -440,41 +450,102 @@ func CopyStateForUpdate(ctx log.Logger, extensionEvents *extensionevents.Extensi
return nil
}

func rehydrateMrSeqFilesForProblematicUpgrades(ctx *log.Context, h types.HandlerEnvironment, extensionEvents *extensionevents.ExtensionEventManager) error {
// First, determine whether we're upgrading from a 'problematic' version, defined as one
// where we mistakenly deleted the mrseq files in the Disable call
newExtensionVersion := os.Getenv(constants.ExtensionVersionEnvName)
oldExtensionVersion := os.Getenv(constants.ExtensionVersionUpdatingFromEnvName)
newExtensionDirectory := os.Getenv(constants.ExtensionPathEnvName)
oldExtensionDirectory := strings.ReplaceAll(newExtensionDirectory, newExtensionVersion, oldExtensionVersion)

// The following are problematic versions:
// Production: 1.3.17
// Test: 1.8.0, 1.9.0
isProblematicVersion := false
isTestExtension := strings.Contains(oldExtensionDirectory, constants.RunCommandTestExtensionName)
if isTestExtension {
isProblematicVersion = (oldExtensionVersion == constants.FirstTestVersionThatDeletesMrSeqFiles || oldExtensionVersion == constants.SecondTestVersionThatDeletesMrSeqFiles)
func determineUpgradeVersionDirectories(ctx *log.Context, extensionEvents *extensionevents.ExtensionEventManager) (upgradeFromVersionDirectory string, upgradeToVersionDirectory string, upgradeFromVersion string) {
// These two environment variables will tell us the extension versions involved, but won't actually tell us
// the from/to versions
upgradeToVersion := os.Getenv(constants.VersionEnvName)
extensionVersionValue := os.Getenv(constants.ExtensionVersionEnvName)
updatingFromVersionValue := os.Getenv(constants.ExtensionVersionUpdatingFromEnvName)

// In some WALA versions, there is a bug where on downgrade it will send the same value for upgradeToVersion and upgradeFromVersion
// Newer versions will send the correct value for upgradeToVersion
// Therefore:
// Action | Old WALA | New WALA
// ---------------------------------------------------| -------------------------------------
// Downgrade | upgradeToVersion == upgradeFromVersion | upgradeToVersion < upgradeFromVersion
// ------------------------------------------------------------------------------------------
// Upgrade | upgradeToVersion > upgradeFromVersion | upgradeToVersion > upgradeFromVersion
// ------------------------------------------------------------------------------------------
if upgradeToVersion == updatingFromVersionValue {
// This is a downgrade. We therefore need to use the extension version
upgradeFromVersion = extensionVersionValue
} else {
isProblematicVersion = (oldExtensionVersion == constants.ProductionVersionThatDeletesMrSeqFiles)
// This is an upgrade on the old WALA or an upgrade or downgrade on the new WALA
upgradeFromVersion = updatingFromVersionValue
}

if isProblematicVersion {
message := fmt.Sprintf("Rehydrating mrseq files deleted by version '%s' using status files", oldExtensionVersion)
ctx.Log("message", message)
extensionEvents.LogInformationalEvent("rehydratemrseq", message)
return doRehydrateMrSeqFilesForProblematicUpgrades(ctx, oldExtensionDirectory, newExtensionDirectory, extensionEvents)
// Determine the corresponding extension directories
extensionDirectory := os.Getenv(constants.ExtensionPathEnvName)
if strings.Contains(extensionDirectory, upgradeToVersion) {
upgradeToVersionDirectory = extensionDirectory
upgradeFromVersionDirectory = strings.ReplaceAll(extensionDirectory, upgradeToVersion, upgradeFromVersion)
} else {
message := fmt.Sprintf("Previous extension version '%s' does not require mrseq hydration", oldExtensionVersion)
ctx.Log("message", message)
extensionEvents.LogInformationalEvent("rehydratemrseq", message)
upgradeFromVersionDirectory = extensionDirectory
upgradeToVersionDirectory = strings.ReplaceAll(extensionDirectory, upgradeFromVersion, upgradeToVersion)
}

return nil
msg := fmt.Sprintf("determineUpgradeVersionDirectories: move from='%s' to='%s'", upgradeFromVersionDirectory, upgradeToVersionDirectory)
ctx.Log("message", msg)
extensionEvents.LogInformationalEvent("determineUpgradeVersions", msg)

return upgradeFromVersionDirectory, upgradeToVersionDirectory, upgradeFromVersion
}

// compareVersions compares two dotted version strings (e.g., "2.1", "2.1.0", "2.1.0.3").
// Returns: +1 if a>b, -1 if a<b, 0 if equal.
func compareVersions(a, b string) int {
aParts := splitVersion(a)
bParts := splitVersion(b)

// Normalize lengths to the same number of components (4 segments max is common for extensions)
const maxSeg = 4
aParts = padTo(aParts, maxSeg)
bParts = padTo(bParts, maxSeg)

for i := 0; i < maxSeg; i++ {
if aParts[i] > bParts[i] {
return 1
}
if aParts[i] < bParts[i] {
return -1
}
}
return 0
}

func doRehydrateMrSeqFilesForProblematicUpgrades(ctx *log.Context, oldExtensionDirectory string, newExtensionDirectory string, extensionEvents *extensionevents.ExtensionEventManager) error {
oldExtensionStatusDirectory := filepath.Join(oldExtensionDirectory, constants.StatusFileDirectory)
// splitVersion converts "x.y.z.t" → []int{ x, y, z, t } (non-numeric parts treated as 0).
func splitVersion(v string) []int {
parts := strings.Split(v, ".")
out := make([]int, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
n, err := strconv.Atoi(p)
if err != nil {
n = 0
}
Comment thread
jscalev marked this conversation as resolved.
out = append(out, n)
}
return out
}

func padTo(in []int, size int) []int {

if len(in) >= size {
return in[:size]
}
n := size - len(in)

// Ensure capacity for the extra n elements without reallocating.
out := slices.Grow(in, n)

// Extend length to size by appending n zero-values.
out = append(out, make([]int, n)...)
return out

}
Comment thread
jscalev marked this conversation as resolved.

func rehydrateMrSeqFilesForProblematicUpgrades(ctx *log.Context, updateFromVersionDirectory string, updateToVersionDirectory string, extensionEvents *extensionevents.ExtensionEventManager) error {
oldExtensionStatusDirectory := filepath.Join(updateFromVersionDirectory, constants.StatusFileDirectory)

extensionStatusDirectoryFDRef, err := os.Open(oldExtensionStatusDirectory)
if err != nil {
Expand All @@ -489,7 +560,7 @@ func doRehydrateMrSeqFilesForProblematicUpgrades(ctx *log.Context, oldExtensionD
// If we find any status files missing their corresponding mrseq, then rehydrate it by taking the seqNo from the status file name
statusFiles, err := extensionStatusDirectoryFDRef.ReadDir(0)
if err != nil {
errMessage := fmt.Sprintf("could not read directory entries from status directory %s", oldExtensionDirectory)
errMessage := fmt.Sprintf("could not read directory entries from status directory %s", updateFromVersionDirectory)
ctx.Log("message", errMessage)
extensionEvents.LogErrorEvent("rehydratemrseq", errMessage)
return errors.Wrap(err, errMessage)
Expand All @@ -507,7 +578,7 @@ func doRehydrateMrSeqFilesForProblematicUpgrades(ctx *log.Context, oldExtensionD
seqNo := parts[1]
seqNoAsInt, _ := strconv.Atoi(seqNo)
mrSeqFileName := extensionName + constants.MrSeqFileExtension
mrSeqFilePath := filepath.Join(newExtensionDirectory, mrSeqFileName)
mrSeqFilePath := filepath.Join(updateToVersionDirectory, mrSeqFileName)

_, err = os.Stat(mrSeqFilePath)
if err != nil {
Expand Down Expand Up @@ -563,53 +634,47 @@ func doRehydrateMrSeqFilesForProblematicUpgrades(ctx *log.Context, oldExtensionD
}

// Copy files like *.mrseq (Most Recently executed Sequence number), .status files from old extension version to new extension version during update.
func copyFiles(ctx log.Logger, fileExtensionSuffix string, extensionSubdirectory string, extensionEvents *extensionevents.ExtensionEventManager) (*list.List, error) {
func copyFiles(ctx log.Logger, fileExtensionSuffix string, extensionSubdirectory string, upgradeFromVersionDirectory string, upgradeToVersionDirectory string, extensionEvents *extensionevents.ExtensionEventManager) (*list.List, error) {

newExtensionVersion := os.Getenv(constants.ExtensionVersionEnvName)
oldExtensionVersion := os.Getenv(constants.ExtensionVersionUpdatingFromEnvName)

message := fmt.Sprintf("Migrating '%s' files from extension version '%s' to '%s'", fileExtensionSuffix, oldExtensionVersion, newExtensionVersion)
message := fmt.Sprintf("Migrating '%s' files from '%s' to '%s'", fileExtensionSuffix, upgradeFromVersionDirectory, upgradeToVersionDirectory)
ctx.Log("message", message)
extensionEvents.LogInformationalEvent("copyfiles", message)

newExtensionDirectory := os.Getenv(constants.ExtensionPathEnvName)
oldExtensionDirectory := strings.ReplaceAll(newExtensionDirectory, newExtensionVersion, oldExtensionVersion)

// Append subdirectory like "status" under extension folder if provided.
if extensionSubdirectory != "" {
newExtensionDirectory = filepath.Join(newExtensionDirectory, extensionSubdirectory)
oldExtensionDirectory = filepath.Join(oldExtensionDirectory, extensionSubdirectory)
upgradeToVersionDirectory = filepath.Join(upgradeToVersionDirectory, extensionSubdirectory)
upgradeFromVersionDirectory = filepath.Join(upgradeFromVersionDirectory, extensionSubdirectory)

// Create subdirectory like "status" directory if it does not exist
_, err := os.Open(newExtensionDirectory)
_, err := os.Open(upgradeToVersionDirectory)
if err != nil {
errr := os.Mkdir(newExtensionDirectory, 0700)
errr := os.Mkdir(upgradeToVersionDirectory, 0700)
if errr != nil {
errMessage := fmt.Sprintf("Failed to create directory '%s'", newExtensionDirectory)
errMessage := fmt.Sprintf("Failed to create directory '%s'", upgradeToVersionDirectory)
extensionEvents.LogErrorEvent("copyfiles", errMessage)
return nil, errors.Wrap(errr, errMessage)
}
}
}

if oldExtensionDirectory == "" || newExtensionDirectory == "" {
if upgradeFromVersionDirectory == "" || upgradeToVersionDirectory == "" {
errMessage := "oldExtesionDirectory or newExtensionDirectory is empty"
extensionEvents.LogErrorEvent("copyfiles", errMessage)
return nil, errors.New(errMessage)
}

// Check if the directory exists
sourceDirectoryFDRef, err := os.Open(oldExtensionDirectory)
sourceDirectoryFDRef, err := os.Open(upgradeFromVersionDirectory)
if err != nil {
errMessage := fmt.Sprintf("could not open sourceDirectory %s", oldExtensionDirectory)
errMessage := fmt.Sprintf("could not open sourceDirectory %s", upgradeFromVersionDirectory)
ctx.Log("message", errMessage)
extensionEvents.LogErrorEvent("copyfiles", errMessage)
return nil, errors.Wrap(err, errMessage)
}

directoryEntries, err := sourceDirectoryFDRef.ReadDir(0)
if err != nil {
errMessage := fmt.Sprintf("could not read directory entries from sourceDirectory %s", oldExtensionDirectory)
errMessage := fmt.Sprintf("could not read directory entries from sourceDirectory %s", upgradeFromVersionDirectory)
ctx.Log("message", errMessage)
extensionEvents.LogErrorEvent("copyfiles", errMessage)
return nil, errors.Wrap(err, errMessage)
Expand All @@ -622,8 +687,8 @@ func copyFiles(ctx log.Logger, fileExtensionSuffix string, extensionSubdirectory
fileName := dirEntry.Name()

if strings.HasSuffix(fileName, fileExtensionSuffix) {
sourceFileFullPath := filepath.Join(oldExtensionDirectory, fileName)
destinationFileFullPath := filepath.Join(newExtensionDirectory, fileName)
sourceFileFullPath := filepath.Join(upgradeFromVersionDirectory, fileName)
destinationFileFullPath := filepath.Join(upgradeToVersionDirectory, fileName)

sourceFile, sourceFileOpenError := os.Open(sourceFileFullPath)
if sourceFileOpenError != nil {
Expand Down Expand Up @@ -660,7 +725,7 @@ func copyFiles(ctx log.Logger, fileExtensionSuffix string, extensionSubdirectory
}
}

message = fmt.Sprintf("Migrated %d '%s' files from extension version '%s' to '%s'", numberOfFilesMigrated, fileExtensionSuffix, oldExtensionVersion, newExtensionVersion)
message = fmt.Sprintf("Migrated %d '%s' files from extension version '%s' to '%s'", numberOfFilesMigrated, fileExtensionSuffix, upgradeFromVersionDirectory, upgradeToVersionDirectory)
ctx.Log("message", message)
extensionEvents.LogInformationalEvent("copyfiles", message)

Expand Down Expand Up @@ -867,7 +932,7 @@ func runCmd(ctx *log.Context, dir string, scriptFilePath string, cfg *handlerset
defer pid.DeleteCurrentPidAndStartTime(metadata.PidFilePath)

begin := time.Now()
err, exitCode = exec.ExecCmdInDir(ctx, scriptFilePath, dir, cfg)
err, exitCode = ExecCmdInDir(ctx, scriptFilePath, dir, cfg)
elapsed := time.Since(begin)
isSuccess := err == nil

Expand Down
Loading
Loading