Skip to content

Commit 67a2193

Browse files
authored
Add code to handle disable actions and support for merging HGAP immediate GS with local GS cache (#53)
* Create go.yml * initial changes to support disabling a command * Print payload * update bits * update bits * Only report not empty statuses * Only report not empty statuses * Only report not empty statuses * Update bits * Update bits * Update bits * Update bits * Update bits * Update status to uploadf * Update status to uploadf * update bits * Add extension name in status * add logging for reporting status * change the format of the status to upload * change the format of the status to upload * change the format of the status to upload * update bits * Fix unit tests * Add notification when the extension is deleted * Add notification when the extension is deleted * Handle non-modified vmsettings * Remove unused bits * Fix comment * Fix unit test * Add fake server for other unit tests * Reset seqnum when disabling extensions * Add code to store in the cache goal states in terminal states * Remove status after it's being stored * Add some logging * Add logs * Remove disable goal states after they were processed * Report goal states in terminal status to HGAP * Remove disable status from local cache * Add logging * Change the check for when the extension is disabled * Fix issue of not reporting back updated goal states * Change the frequency to report the status of a command from 30 to 15 seconds * remove unused error * Change file extension of immediate goal states to make sure it gets copied when upgrading the extension * Test logs * Add changes to immediately store final states in local file * Use correct StatusType strings * Lock status file to make sure it stays consistent * Fix unit tests
1 parent f6419b8 commit 67a2193

19 files changed

Lines changed: 375 additions & 85 deletions

internal/cmds/cmds.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
2121
"github.com/Azure/azure-sdk-for-go/storage"
2222
"github.com/Azure/run-command-handler-linux/internal/cleanup"
23+
"github.com/Azure/run-command-handler-linux/internal/commandProcessor"
2324
"github.com/Azure/run-command-handler-linux/internal/constants"
2425
"github.com/Azure/run-command-handler-linux/internal/exec"
2526
"github.com/Azure/run-command-handler-linux/internal/files"
@@ -39,7 +40,7 @@ import (
3940

4041
const (
4142
maxScriptSize = 256 * 1024
42-
updateStatusInSeconds = 30
43+
updateStatusInSeconds = 15
4344
)
4445

4546
const (
@@ -86,13 +87,17 @@ func update(ctx *log.Context, h types.HandlerEnvironment, report *types.RunComma
8687
}
8788

8889
func disable(ctx *log.Context, h types.HandlerEnvironment, report *types.RunCommandInstanceView, metadata types.RCMetadata, c types.Cmd) (string, string, error, int) {
89-
exitCode, err := immediatecmds.Disable(ctx, h, metadata.ExtName, metadata.SeqNum)
90-
if err != nil {
91-
return "", "", err, exitCode
90+
extensionHandlerName := commandProcessor.GetExtensionName(ctx)
91+
if extensionHandlerName == constants.ImmediateRunCommandHandlerName {
92+
exitCode, err := immediatecmds.Disable(ctx, h, metadata.ExtName, metadata.SeqNum)
93+
if err != nil {
94+
return "", "", err, exitCode
95+
}
9296
}
9397

9498
ctx.Log("event", "disable")
9599
pid.KillPreviousExtension(ctx, metadata.PidFilePath)
100+
resetSeqNum(ctx, metadata.MostRecentSequence)
96101
return "", "", nil, constants.ExitCode_Okay
97102
}
98103

@@ -335,10 +340,16 @@ func checkAndSaveSeqNum(ctx log.Logger, seq int, mrseqPath string) (shouldExit b
335340
return false, nil
336341
}
337342

343+
// resetSeqNum deletes the seqNum file to reset the sequence number
344+
func resetSeqNum(ctx log.Logger, mrseqPath string) {
345+
ctx.Log("event", "resetting seqnum by deleting file", "path", mrseqPath)
346+
os.Remove(mrseqPath)
347+
}
348+
338349
// Copy state of the extension from old version to new version during update (.mrseq files, .status files)
339350
func CopyStateForUpdate(ctx log.Logger) error {
340351
// Copy .mrseq files (Most Recently executed Sequence number) that helps determine whether a sequence number of Run Command has been previously executed or not.
341-
mrseqFilesNameList, mrseqFileCopyErr := copyFiles(ctx, ".mrseq", "")
352+
mrseqFilesNameList, mrseqFileCopyErr := copyFiles(ctx, constants.MrSeqFileExtension, "")
342353
if mrseqFileCopyErr != nil {
343354
return mrseqFileCopyErr
344355
}
@@ -536,7 +547,7 @@ func createDummyStatusFilesIfNeeded(ctx log.Logger, mrseqFilesNameList *list.Lis
536547
continue
537548
}
538549

539-
statusReport := types.NewStatusReport(types.StatusSuccess, "Enable", instanceViewMessage)
550+
statusReport := types.NewStatusReport(types.StatusSuccess, "Enable", instanceViewMessage, extensionName)
540551
rootStatusJson, err = status.MarshalStatusReportIntoJson(statusReport, true)
541552
if err != nil {
542553
errorMessage = fmt.Sprintf("failed to marshal status report into json for status file '%s' with error '%s'", statusFilePath, err.Error())

internal/commandProcessor/commandProcessor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func getRequiredInitialVariables(ctx *log.Context) (types.HandlerEnvironment, st
121121
return hEnv, extensionName, seqNum, errors.Wrap(err, "failed to parse handlerEnv")
122122
}
123123

124-
extensionName = getExtensionName(ctx)
124+
extensionName = GetExtensionName(ctx)
125125
seqNum, err = getSeqNum(&ctx, hEnv, extensionName)
126126
if err != nil {
127127
return hEnv, extensionName, seqNum, errors.Wrap(err, "failed to get seqNum")
@@ -162,7 +162,7 @@ func getHandlerEnv(ctx *log.Context) (types.HandlerEnvironment, error) {
162162
return hEnv, nil
163163
}
164164

165-
func getExtensionName(ctx *log.Context) string {
165+
func GetExtensionName(ctx *log.Context) string {
166166
extensionName := os.Getenv(constants.ConfigExtensionNameEnvName)
167167
ctx.Log("extensionName", extensionName)
168168
return extensionName

internal/commandProcessor/commandProcessor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func Test_GetExtNameFromEnvVariable(t *testing.T) {
112112
extName := "testExtension"
113113
os.Setenv(constants.ConfigExtensionNameEnvName, extName)
114114

115-
actualExtName := getExtensionName(ctx)
115+
actualExtName := GetExtensionName(ctx)
116116
require.Equal(t, extName, actualExtName)
117117
}
118118

internal/constants/constants.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ const (
2525

2626
StatusFileDirectory = "status"
2727

28+
// The directory where immediate run command statuses that have reached a terminal status are stored.
29+
ImmediateStatusFileDirectory = "status"
30+
2831
// General failed exit code when extension provisioning fails due to service errors.
2932
FailedExitCodeGeneral = -1
3033

@@ -51,4 +54,13 @@ const (
5154
// The path of the extension in the VM with full name. This value is provided by the agent for all commands.
5255
// See more in: https://github.com/Azure/azure-vmextension-publishing/wiki/2.0-Partner-Guide-Handler-Design-Details#236-summary
5356
ExtensionPathEnvName = "AZURE_GUEST_AGENT_EXTENSION_PATH"
57+
58+
// The name of the immediate run command service
59+
ImmediateRunCommandHandlerName = "runCommandService"
60+
61+
// The time to wait between each poll of the goal states
62+
PolingIntervalInSeconds = 1
63+
64+
// The name of the file that contains the immediate goal states that reached the terminal status
65+
ImmediateGoalStatesInTerminalStatusFileName = "immediateGoalStatesInTerminalStatusFile.status"
5466
)

internal/goalstate/goalstate.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,15 @@ import (
1919

2020
const (
2121
enableCommand string = "enable"
22+
disableCommand string = "disable"
2223
maxExecutionTimeInMinutes int32 = 90
2324
)
2425

26+
var statusToCommandMap = map[string]string{
27+
"enabled": enableCommand,
28+
"disabled": disableCommand,
29+
}
30+
2531
// HandleImmediateGoalState handles the immediate goal state by executing the command and waiting for it to finish.
2632
// ctx: The logger context.
2733
// setting: The settings for the command.
@@ -32,12 +38,14 @@ func HandleImmediateGoalState(ctx *log.Context, setting settings.SettingsCommon,
3238
err := make(chan error)
3339
go startAsync(ctx, setting, notifier, done, err)
3440
select {
35-
case <-err:
36-
return constants.ExitCode_ImmediateTaskFailed, errors.Wrapf(<-err, "error when trying to execute goal state")
41+
case e := <-err:
42+
ctx.Log("error", fmt.Sprintf("error when trying to execute goal state: %v", e))
43+
return constants.ExitCode_ImmediateTaskFailed, errors.Wrapf(e, "error when trying to execute goal state")
3744
case <-done:
3845
ctx.Log("message", "goal state successfully finished")
3946
return constants.ExitCode_Okay, nil
4047
case <-time.After(time.Minute * time.Duration(maxExecutionTimeInMinutes)):
48+
ctx.Log("message", "timeout when trying to execute goal state")
4149
return constants.ExitCode_ImmediateTaskTimeout, errors.New("timeout when trying to execute goal state")
4250
}
4351
}
@@ -52,9 +60,11 @@ func ReportFinalStatusForImmediateGoalState(ctx *log.Context, notifier *observer
5260
return errors.New("notifier is nil. Cannot report status to HGAP")
5361
}
5462

55-
cmd, ok := commands.Cmds[enableCommand]
63+
extensionState := goalStateKey.RuntimeSettingsState
64+
cmdToReport := statusToCommandMap[extensionState]
65+
cmd, ok := commands.Cmds[cmdToReport]
5666
if !ok {
57-
return errors.New("missing enable command")
67+
return errors.New(fmt.Sprintf("missing command %v", extensionState))
5868
}
5969

6070
if !cmd.ShouldReportStatus {
@@ -67,7 +77,7 @@ func ReportFinalStatusForImmediateGoalState(ctx *log.Context, notifier *observer
6777
return errors.Wrapf(err, "failed to marshal instance view")
6878
}
6979

70-
statusItem, err := status.GetSingleStatusItem(ctx, statusType, cmd, string(msg))
80+
statusItem, err := status.GetSingleStatusItem(ctx, statusType, cmd, string(msg), goalStateKey.ExtensionName)
7181
if err != nil {
7282
return errors.Wrap(err, "failed to get status item")
7383
}
@@ -88,29 +98,34 @@ func startAsync(ctx *log.Context, setting settings.SettingsCommon, notifier *obs
8898
return
8999
}
90100

91-
cmd, ok := commands.Cmds[enableCommand]
101+
extensionState := *setting.ExtensionState
102+
ctx.Log("message", fmt.Sprintf("starting command for extension state %v", extensionState))
103+
104+
cmdToReport := statusToCommandMap[extensionState]
105+
cmd, ok := commands.Cmds[cmdToReport]
92106
if !ok {
93-
err <- errors.New("missing enable command")
107+
err <- errors.New(fmt.Sprintf("missing command %v", extensionState))
94108
return
95109
}
96110

97111
// Overwrite function to report status to HGAP. This function prepares the status to be sent to the HGAP and then calls the notifier to send it.
98112
cmd.Functions.ReportStatus = func(ctx *log.Context, _ types.HandlerEnvironment, metadata types.RCMetadata, statusType types.StatusType, c types.Cmd, msg string) error {
99113
if !c.ShouldReportStatus {
100-
ctx.Log("status", "not reported for operation (by design)")
114+
ctx.Log("status", fmt.Sprintf("status not reported for operation %v (by design)", c.Name))
101115
return nil
102116
}
103117

104-
statusItem, err := status.GetSingleStatusItem(ctx, statusType, c, msg)
118+
statusItem, err := status.GetSingleStatusItem(ctx, statusType, c, msg, metadata.ExtName)
105119
if err != nil {
106120
return errors.Wrap(err, "failed to get status item")
107121
}
108122

109123
ctx.Log("message", fmt.Sprintf("reporting status by notifying the observer to then send to HGAP for extension name %v and seq number %v", metadata.ExtName, metadata.SeqNum))
110124
return notifier.Notify(types.StatusEventArgs{
111125
StatusKey: types.GoalStateKey{
112-
ExtensionName: metadata.ExtName,
113-
SeqNumber: metadata.SeqNum,
126+
ExtensionName: metadata.ExtName,
127+
SeqNumber: metadata.SeqNum,
128+
RuntimeSettingsState: extensionState,
114129
},
115130
TopLevelStatus: statusItem,
116131
})

internal/goalstate/goalstate_test.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@ import (
1818
func Test_handleSkippedImmediateGoalState_NotifyObserver(t *testing.T) {
1919
ctx := log.NewContext(log.NewSyncLogger(log.NewLogfmtLogger(os.Stdout))).With("time", log.DefaultTimestamp)
2020

21+
// Create a temporary directory for the test
22+
tempDir, err := os.MkdirTemp("", "")
23+
defer os.RemoveAll(tempDir) // Clean up the temp directory after the test
24+
require.NoError(t, err, "Failed to create temp directory")
25+
26+
err = os.Mkdir(tempDir+"/status", 0755)
27+
require.NoError(t, err, "Failed to create status directory")
28+
29+
os.Setenv(constants.ExtensionPathEnvName, tempDir)
30+
2131
ctx.Log("msg", "Creating status observer")
2232
obs := &status.StatusObserver{}
2333
obs.Initialize(ctx)
@@ -28,8 +38,9 @@ func Test_handleSkippedImmediateGoalState_NotifyObserver(t *testing.T) {
2838
notifier.Register(obs)
2939

3040
goalStateKey := types.GoalStateKey{
31-
ExtensionName: "test",
32-
SeqNumber: 1,
41+
ExtensionName: "test",
42+
SeqNumber: 1,
43+
RuntimeSettingsState: "enabled",
3344
}
3445

3546
errorMsg := "Test error message"
@@ -43,15 +54,22 @@ func Test_handleSkippedImmediateGoalState_NotifyObserver(t *testing.T) {
4354
EndTime: time.Now().UTC().Format(time.RFC3339),
4455
}
4556

46-
err := ReportFinalStatusForImmediateGoalState(ctx, notifier, goalStateKey, types.StatusSkipped, &instView)
57+
err = ReportFinalStatusForImmediateGoalState(ctx, notifier, goalStateKey, types.StatusSkipped, &instView)
4758
require.Nil(t, err, "HandleSkippedImmediateGoalState should not return an error")
4859

4960
ctx.Log("msg", "Unregistering observer")
5061
notifier.Unregister()
5162

5263
ctx.Log("msg", "Check that the status item was received by the observer")
53-
latestStatus, ok := obs.GetStatusForKey(goalStateKey)
54-
require.True(t, ok, "Status item should be found")
64+
_, ok := obs.GetStatusForKey(goalStateKey)
65+
require.False(t, ok, "Status item should not be found because it is in a terminal state and saved locally")
66+
67+
// Get the status item from the local file
68+
items, err := status.GetGoalStatesInTerminalStatus(ctx)
69+
require.Nil(t, err, "GetGoalStatesInTerminalStatus should not return an error")
70+
require.Len(t, items, 1, "There should be one status item in the local file")
71+
72+
latestStatus := items[0]
5573
require.Equal(t, "Enable", latestStatus.Status.Operation, "Operation should be equal")
5674
require.Equal(t, types.StatusSkipped, latestStatus.Status.Status, "Status should be equal")
5775

internal/immediateruncommand/immediateruncommand.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func StartImmediateRunCommand(ctx *log.Context) error {
4242
communicator := hostgacommunicator.NewHostGACommunicator(vmRequestManager)
4343
goalStateEventObserver.Initialize(ctx)
4444

45+
ctx.Log("message", fmt.Sprintf("Polling for goal state every %v seconds", constants.PolingIntervalInSeconds))
4546
for {
4647
ctx.Log("message", "processing new immediate run command goal states. Last processed ETag: "+lastProcessedETag)
4748
newProcessedETag, err := processImmediateRunCommandGoalStates(ctx, communicator, lastProcessedETag)
@@ -52,6 +53,7 @@ func StartImmediateRunCommand(ctx *log.Context) error {
5253
time.Sleep(time.Second * time.Duration(5))
5354
} else {
5455
lastProcessedETag = newProcessedETag
56+
time.Sleep(time.Second * time.Duration(constants.PolingIntervalInSeconds))
5557
}
5658
}
5759
}
@@ -69,10 +71,17 @@ func processImmediateRunCommandGoalStates(ctx *log.Context, communicator hostgac
6971
return newEtag, errors.Wrapf(err, "could not retrieve goal states for immediate run command")
7072
}
7173

74+
// VM Settings have not changed and we should not process any new goal states
75+
if newEtag == lastProcessedETag {
76+
return newEtag, nil
77+
}
78+
7279
var goalStateKeys []types.GoalStateKey
7380
for _, s := range goalStates {
7481
for _, setting := range s.Settings {
75-
goalStateKeys = append(goalStateKeys, types.GoalStateKey{ExtensionName: *setting.ExtensionName, SeqNumber: *setting.SeqNo})
82+
if setting.ExtensionState != nil {
83+
goalStateKeys = append(goalStateKeys, types.GoalStateKey{ExtensionName: *setting.ExtensionName, SeqNumber: *setting.SeqNo, RuntimeSettingsState: *setting.ExtensionState})
84+
}
7685
}
7786
}
7887
goalStateEventObserver.RemoveProcessedGoalStates(goalStateKeys)
@@ -90,7 +99,7 @@ func processImmediateRunCommandGoalStates(ctx *log.Context, communicator hostgac
9099
executingTasks.Increment()
91100

92101
ctx.Log("message", "adding goal state to the event map")
93-
statusKey := types.GoalStateKey{ExtensionName: *state.ExtensionName, SeqNumber: *state.SeqNo}
102+
statusKey := types.GoalStateKey{ExtensionName: *state.ExtensionName, SeqNumber: *state.SeqNo, RuntimeSettingsState: *state.ExtensionState}
94103
defaultTopStatus := types.StatusItem{}
95104
status := types.StatusEventArgs{TopLevelStatus: defaultTopStatus, StatusKey: statusKey}
96105

@@ -116,7 +125,7 @@ func processImmediateRunCommandGoalStates(ctx *log.Context, communicator hostgac
116125
StartTime: startTime,
117126
EndTime: time.Now().UTC().Format(time.RFC3339),
118127
}
119-
goalstate.ReportFinalStatusForImmediateGoalState(ctx, notifier, statusKey, types.StatusSkipped, &instView)
128+
goalstate.ReportFinalStatusForImmediateGoalState(ctx, notifier, statusKey, types.StatusError, &instView)
120129

121130
}
122131
}(newGoalStates[idx])
@@ -130,7 +139,7 @@ func processImmediateRunCommandGoalStates(ctx *log.Context, communicator hostgac
130139
if len(skippedGoalStates) > 0 {
131140
ctx.Log("message", fmt.Sprintf("skipped %v goal states due to reaching the maximum concurrent tasks", len(skippedGoalStates)))
132141
for _, skippedGoalState := range skippedGoalStates {
133-
statusKey := types.GoalStateKey{ExtensionName: *skippedGoalState.ExtensionName, SeqNumber: *skippedGoalState.SeqNo}
142+
statusKey := types.GoalStateKey{ExtensionName: *skippedGoalState.ExtensionName, SeqNumber: *skippedGoalState.SeqNo, RuntimeSettingsState: *skippedGoalState.ExtensionState}
134143
notifier := &observer.Notifier{}
135144
notifier.Register(&goalStateEventObserver)
136145

@@ -165,7 +174,7 @@ func getGoalStatesToProcess(goalStates []hostgacommunicator.ImmediateExtensionGo
165174

166175
if validSignature {
167176
for _, s := range el.Settings {
168-
statusKey := types.GoalStateKey{ExtensionName: *s.ExtensionName, SeqNumber: *s.SeqNo}
177+
statusKey := types.GoalStateKey{ExtensionName: *s.ExtensionName, SeqNumber: *s.SeqNo, RuntimeSettingsState: *s.ExtensionState}
169178
_, goalStateAlreadyProcessed := goalStateEventObserver.GetStatusForKey(statusKey)
170179
if !goalStateAlreadyProcessed {
171180
if len(newGoalStates) < maxTasksToFetch {

internal/observer/observer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ import "github.com/Azure/run-command-handler-linux/internal/types"
77
type Observer interface {
88
// OnNotify is called when the status changes
99
OnNotify(types.StatusEventArgs) error
10+
// OnDemandNotify is called when the observer needs to report the status immediately
11+
OnDemandNotify() error
1012
}

0 commit comments

Comments
 (0)