Skip to content

Commit

Permalink
Adding adaptive retry logic in RCI call for non-terminal errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Tianze Shan committed Feb 11, 2025
1 parent 3aac51c commit 34b63af
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 4 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 61 additions & 2 deletions ecs-agent/api/ecs/client/ecs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ const (
setInstanceIdRetryBackoffMax = 5 * time.Second
setInstanceIdRetryBackoffJitter = 0.2
setInstanceIdRetryBackoffMultiple = 2
// Below constants are used for RegisterContainerInstance retry with exponential backoff when receiving non-terminal errors.
// To ensure parity in all regions, and on all launch types, we should set a time limit on the RCI retries.
// Thus, setting the max RCI retry timeout allowed to 1 hour, and capping max retry backoff at 192 seconds (3 * 2^6).
rciMaxRetryTimeAllowed = 1 * time.Hour
rciMinBackoff = 3 * time.Second
rciMaxBackoff = 192 * time.Second
rciRetryJitter = 0.2
rciRetryMultiple = 2.0
)

// ecsClient implements ECSClient interface.
Expand Down Expand Up @@ -173,7 +181,7 @@ func (client *ecsClient) RegisterContainerInstance(containerInstanceArn string,
defer client.configAccessor.UpdateCluster(clusterRef)
// Attempt to register without checking existence of the cluster so that we don't require
// excess permissions in the case where the cluster already exists and is active.
containerInstanceArn, availabilityzone, err := client.registerContainerInstance(clusterRef,
containerInstanceArn, availabilityzone, err := client.registerContainerInstanceWithRetry(clusterRef,
containerInstanceArn, attributes, tags, registrationToken, platformDevices, outpostARN)
if err == nil {
return containerInstanceArn, availabilityzone, nil
Expand All @@ -188,10 +196,49 @@ func (client *ecsClient) RegisterContainerInstance(containerInstanceArn string,
}
}
}
return client.registerContainerInstance(clusterRef, containerInstanceArn, attributes, tags, registrationToken,
return client.registerContainerInstanceWithRetry(clusterRef, containerInstanceArn, attributes, tags, registrationToken,
platformDevices, outpostARN)
}

// registerContainerInstanceWithRetry wraps registerContainerInstance with an exponential
// backoff retry loop: transient (non-terminal) errors are retried until the call succeeds,
// a terminal error is returned, or the overall retry window (rciMaxRetryTimeAllowed) elapses.
//
// It returns the container instance ARN, the availability zone, and the error (if any) from
// the final RegisterContainerInstance attempt.
func (client *ecsClient) registerContainerInstanceWithRetry(clusterRef string, containerInstanceArn string,
	attributes []*ecsmodel.Attribute, tags []*ecsmodel.Tag, registrationToken string,
	platformDevices []*ecsmodel.PlatformDevice, outpostARN string) (string, string, error) {

	var containerInstanceARN, availabilityZone string
	var errFromRCI error
	backoff := retry.NewExponentialBackoff(rciMinBackoff, rciMaxBackoff, rciRetryJitter, rciRetryMultiple)
	// Bound the total time spent retrying so registration cannot block indefinitely.
	ctx, cancel := context.WithTimeout(context.Background(), rciMaxRetryTimeAllowed)
	defer cancel()
	err := retry.RetryWithBackoffCtx(ctx, backoff,
		func() error {
			containerInstanceARN, availabilityZone, errFromRCI = client.registerContainerInstance(
				clusterRef, containerInstanceArn, attributes, tags, registrationToken, platformDevices, outpostARN)
			if errFromRCI == nil {
				return nil
			}
			if !isTransientError(errFromRCI) {
				logger.Error("Received terminal error from RegisterContainerInstance call, exiting", logger.Fields{
					field.Error: errFromRCI,
				})
				// Mark the error as non-retriable, to stop the retry loop in RetryWithBackoffCtx.
				return apierrors.NewRetriableError(apierrors.NewRetriable(false), errFromRCI)
			}
			logger.Error("Received non-terminal error from RegisterContainerInstance call, retrying with exponential backoff", logger.Fields{
				field.Error: errFromRCI,
			})
			// Mark non-terminal errors as retriable, to continue the retry loop in RetryWithBackoffCtx.
			return apierrors.NewRetriableError(apierrors.NewRetriable(true), errFromRCI)
		})
	if err != nil {
		// Prefer errFromRCI over the retry wrapper's err: errFromRCI is the original error
		// thrown by RCI and carries the terminal exit-code marking, so that systemd won't
		// restart the agent binary.
		if errFromRCI != nil {
			return "", "", errFromRCI
		}
		// Defensive fallback: the retry wrapper failed before any RCI attempt completed
		// (e.g. the context expired immediately). Surface that error rather than
		// returning a nil error on a failure path.
		return "", "", err
	}
	return containerInstanceARN, availabilityZone, nil
}

func (client *ecsClient) registerContainerInstance(clusterRef string, containerInstanceArn string,
attributes []*ecsmodel.Attribute, tags []*ecsmodel.Tag, registrationToken string,
platformDevices []*ecsmodel.PlatformDevice, outpostARN string) (string, string, error) {
Expand Down Expand Up @@ -877,3 +924,15 @@ func trimString(inputString string, maxLen int) string {
return inputString
}
}

// isTransientError reports whether err wraps an AWS API error whose code
// indicates a transient failure (server-side fault or throttling) that is
// safe to retry.
func isTransientError(err error) bool {
	var awsErr awserr.Error
	// errors.As unwraps the error chain looking for an awserr.Error value
	// (errors.Is would require a sentinel and does not fit here).
	if !errors.As(err, &awsErr) {
		return false
	}
	code := awsErr.Code()
	return code == ecsmodel.ErrCodeServerException || code == "ThrottlingException"
}
104 changes: 104 additions & 0 deletions ecs-agent/api/ecs/client/ecs_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,110 @@ func TestRegisterContainerInstance(t *testing.T) {
}
}

// TestRegisterContainerInstanceWithRetryNonTerminalError tests the RegisterContainerInstanceWithRetry wrapper.
// The wrapper implements registerContainerInstance with an additional layer of retry with exponential backoff.
// RCI call has 4 failure types: ServerException, ClientException, InvalidParameterException, ThrottlingException.
// ServerException and ThrottlingException are considered transient, and should be retried upon receiving such failures.
// We have 2 subtests, one for happy path and one for unhappy path.
// For both tests, we will make 3 RCI calls, and the first 2 will fail with ThrottlingException and ServerException, respectively.
// In the happy test, the last RCI call will succeed, and we will examine the expected attributes are present.
// In the unhappy test, the last RCI call will fail with ClientException, and an appropriate error will be returned.
// For both test cases, the last RCI call should effectively terminate the retry loop.
func TestRegisterContainerInstanceWithRetryNonTerminalError(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	mockEC2Metadata := mock_ec2.NewMockEC2MetadataClient(ctrl)
	additionalAttributes := map[string]string{
		"my_custom_attribute":       "Custom_Value1",
		"my_other_custom_attribute": "Custom_Value2",
	}
	cfgAccessorOverrideFunc := func(cfgAccessor *mock_config.MockAgentConfigAccessor) {
		cfgAccessor.EXPECT().InstanceAttributes().Return(additionalAttributes).AnyTimes()
	}
	tester := setup(t, ctrl, mockEC2Metadata, cfgAccessorOverrideFunc)

	fakeCapabilities := []string{"capability1", "capability2"}
	expectedAttributes := map[string]string{
		"ecs.os-type":               tester.mockCfgAccessor.OSType(),
		"ecs.os-family":             tester.mockCfgAccessor.OSFamily(),
		"my_custom_attribute":       "Custom_Value1",
		"my_other_custom_attribute": "Custom_Value2",
		"ecs.availability-zone":     availabilityZone,
		"ecs.outpost-arn":           outpostARN,
		cpuArchAttrName:             getCPUArch(),
	}
	capabilities := buildAttributeList(fakeCapabilities, nil)
	// Three GPU platform devices, id1..id3.
	platformDevices := make([]*ecsmodel.PlatformDevice, 0, 3)
	for _, deviceID := range []string{"id1", "id2", "id3"} {
		platformDevices = append(platformDevices, &ecsmodel.PlatformDevice{
			Id:   aws.String(deviceID),
			Type: aws.String(ecsmodel.PlatformDeviceTypeGpu),
		})
	}

	// Every registration attempt fetches the instance identity document and its
	// signature before calling RegisterContainerInstance. expectAttempt records
	// that trio of expectations and returns them so the caller can impose ordering
	// across attempts via gomock.InOrder.
	expectAttempt := func(resp *ecsmodel.RegisterContainerInstanceOutput, attemptErr error) []*gomock.Call {
		return []*gomock.Call{
			mockEC2Metadata.EXPECT().GetDynamicData(ec2.InstanceIdentityDocumentResource).
				Return("instanceIdentityDocument", nil),
			mockEC2Metadata.EXPECT().GetDynamicData(ec2.InstanceIdentityDocumentSignatureResource).
				Return("signature", nil),
			tester.mockStandardClient.EXPECT().RegisterContainerInstance(gomock.Any()).
				Return(resp, attemptErr),
		}
	}

	testCases := []struct {
		name                         string
		finalRCICallResponse         *ecsmodel.RegisterContainerInstanceOutput
		finalRCICallError            error
		expectedContainerInstanceARN string
		expectedAZ                   string
	}{
		{
			name: "Happy Path",
			finalRCICallResponse: &ecsmodel.RegisterContainerInstanceOutput{
				ContainerInstance: &ecsmodel.ContainerInstance{
					ContainerInstanceArn: aws.String(containerInstanceARN),
					Attributes:           buildAttributeList(fakeCapabilities, expectedAttributes),
				},
			},
			finalRCICallError:            nil,
			expectedContainerInstanceARN: containerInstanceARN,
			expectedAZ:                   availabilityZone,
		},
		{
			name:                 "UnHappy Path",
			finalRCICallResponse: nil,
			finalRCICallError:    awserr.New("ClientException", "", nil),
		},
	}

	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			// First two attempts fail with transient errors; the third attempt
			// returns the per-case response/error and must end the retry loop.
			var orderedCalls []*gomock.Call
			orderedCalls = append(orderedCalls, expectAttempt(nil, awserr.New("ThrottlingException", "", nil))...)
			orderedCalls = append(orderedCalls, expectAttempt(nil, awserr.New("ServerException", "", nil))...)
			orderedCalls = append(orderedCalls, expectAttempt(testCase.finalRCICallResponse, testCase.finalRCICallError)...)
			gomock.InOrder(orderedCalls...)

			arn, availabilityzone, err := tester.client.RegisterContainerInstance("", capabilities,
				containerInstanceTags, registrationToken, platformDevices, outpostARN)
			assert.Equal(t, testCase.finalRCICallError, err)
			assert.Equal(t, testCase.expectedContainerInstanceARN, arn)
			assert.Equal(t, testCase.expectedAZ, availabilityzone)
		})
	}
}

func TestReRegisterContainerInstance(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down

0 comments on commit 34b63af

Please sign in to comment.