Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions internal/service/bedrockagentcore/agent_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/bedrockagentcorecontrol"
awstypes "github.com/aws/aws-sdk-go-v2/service/bedrockagentcorecontrol/types"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
logstypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/aws/aws-sdk-go-v2/service/xray"
xraytypes "github.com/aws/aws-sdk-go-v2/service/xray/types"
"github.com/hashicorp/aws-sdk-go-base/v2/tfawserr"
"github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts"
"github.com/hashicorp/terraform-plugin-framework-validators/listvalidator"
Expand Down Expand Up @@ -216,6 +220,19 @@ func (r *agentRuntimeResource) Schema(ctx context.Context, request resource.Sche
},
},
},
"observability": schema.ListNestedBlock{
CustomType: fwtypes.NewListNestedObjectTypeOf[observabilityConfigurationModel](ctx),
Validators: []validator.List{
listvalidator.SizeAtMost(1),
},
NestedObject: schema.NestedBlockObject{
Attributes: map[string]schema.Attribute{
"enabled": schema.BoolAttribute{
Required: true,
},
},
},
},
names.AttrTimeouts: timeouts.Block(ctx, timeouts.Opts{
Create: true,
Update: true,
Expand Down Expand Up @@ -283,11 +300,45 @@ func (r *agentRuntimeResource) Create(ctx context.Context, request resource.Crea
return
}

if !data.Observability.IsNull() {
obsConfig, d := data.Observability.ToPtr(ctx)
smerr.EnrichAppend(ctx, &response.Diagnostics, d)
if response.Diagnostics.HasError() {
return
}

if obsConfig != nil && obsConfig.Enabled.ValueBool() {

if err := r.ensureXRayResourcePolicy(ctx); err != nil {
smerr.AddError(ctx, &response.Diagnostics, err, smerr.ID, agentRuntimeID)
return
}

if err := r.waitForXRayResourcePolicy(ctx, propagationTimeout); err != nil {
smerr.AddError(ctx, &response.Diagnostics, fmt.Errorf("waiting for X-Ray resource policy: %w", err), smerr.ID, agentRuntimeID)
return
}

if err := configureObservability(ctx, conn, r.Meta().XRayClient(ctx), runtime, data.EnvironmentVariables); err != nil {
smerr.AddError(ctx, &response.Diagnostics, err, smerr.ID, agentRuntimeID)
return
}

runtime, err = findAgentRuntimeByID(ctx, conn, agentRuntimeID)
if err != nil {
smerr.AddError(ctx, &response.Diagnostics, err, smerr.ID, agentRuntimeID)
return
}
}
}

// Set values for unknowns.
userEnvVars := data.EnvironmentVariables
smerr.EnrichAppend(ctx, &response.Diagnostics, fwflex.Flatten(ctx, runtime, &data, fwflex.WithFieldNamePrefix("AgentRuntime")))
if response.Diagnostics.HasError() {
return
}
data.EnvironmentVariables = userEnvVars

smerr.EnrichAppend(ctx, &response.Diagnostics, response.State.Set(ctx, data))
}
Expand All @@ -313,11 +364,20 @@ func (r *agentRuntimeResource) Read(ctx context.Context, request resource.ReadRe
return
}

userEnvVars := data.EnvironmentVariables
smerr.EnrichAppend(ctx, &response.Diagnostics, fwflex.Flatten(ctx, out, &data, fwflex.WithFieldNamePrefix("AgentRuntime")))
if response.Diagnostics.HasError() {
return
}

if !data.Observability.IsNull() {
obsConfig, d := data.Observability.ToPtr(ctx)
smerr.EnrichAppend(ctx, &response.Diagnostics, d)
if !response.Diagnostics.HasError() && obsConfig != nil && obsConfig.Enabled.ValueBool() {
data.EnvironmentVariables = userEnvVars
}
}

smerr.EnrichAppend(ctx, &response.Diagnostics, response.State.Set(ctx, &data))
}

Expand Down Expand Up @@ -516,12 +576,17 @@ type agentRuntimeResourceModel struct {
TagsAll tftags.Map `tfsdk:"tags_all"`
Timeouts timeouts.Value `tfsdk:"timeouts"`
WorkloadIdentityDetails fwtypes.ListNestedObjectValueOf[workloadIdentityDetailsModel] `tfsdk:"workload_identity_details"`
Observability fwtypes.ListNestedObjectValueOf[observabilityConfigurationModel] `tfsdk:"observability"`
}

type agentRuntimeArtifactModel struct {
ContainerConfiguration fwtypes.ListNestedObjectValueOf[containerConfigurationModel] `tfsdk:"container_configuration"`
}

type observabilityConfigurationModel struct {
Enabled types.Bool `tfsdk:"enabled"`
}

var (
_ fwflex.Expander = agentRuntimeArtifactModel{}
_ fwflex.Flattener = &agentRuntimeArtifactModel{}
Expand Down Expand Up @@ -679,6 +744,152 @@ func (m requestHeaderConfigurationModel) Expand(ctx context.Context) (any, diag.
return nil, diags
}

func configureObservability(ctx context.Context, conn *bedrockagentcorecontrol.Client, xrayConn *xray.Client, runtime *bedrockagentcorecontrol.GetAgentRuntimeOutput, existingEnvVars fwtypes.MapOfString) error {
runtimeID := aws.ToString(runtime.AgentRuntimeId)
runtimeName := aws.ToString(runtime.AgentRuntimeName)
logGroup := fmt.Sprintf("/aws/bedrock-agentcore/runtimes/%s", runtimeID)

obsEnvVars := map[string]string{
"AGENT_OBSERVABILITY_ENABLED": "true",
"OTEL_PYTHON_DISTRO": "aws_distro",
"OTEL_PYTHON_CONFIGURATOR": "aws_configurator",
"OTEL_RESOURCE_ATTRIBUTES": fmt.Sprintf("service.name=%s,aws.log.group.names=%s", runtimeName, logGroup),
"OTEL_EXPORTER_OTLP_LOGS_HEADERS": fmt.Sprintf("x-aws-log-group=%s,x-aws-log-stream=runtime-logs,x-aws-metric-namespace=bedrock-agentcore", logGroup),
"OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
"OTEL_TRACES_EXPORTER": "otlp",
}

mergedEnvVars := make(map[string]string)

if !existingEnvVars.IsNull() {
for k, v := range existingEnvVars.Elements() {
if strVal, ok := v.(types.String); ok {
mergedEnvVars[k] = strVal.ValueString()
}
}
}

for k, v := range obsEnvVars {
mergedEnvVars[k] = v
}

updateInput := &bedrockagentcorecontrol.UpdateAgentRuntimeInput{
AgentRuntimeId: aws.String(runtimeID),
AgentRuntimeArtifact: runtime.AgentRuntimeArtifact,
RoleArn: runtime.RoleArn,
NetworkConfiguration: runtime.NetworkConfiguration,
EnvironmentVariables: mergedEnvVars,
AuthorizerConfiguration: runtime.AuthorizerConfiguration,
Description: runtime.Description,
ProtocolConfiguration: runtime.ProtocolConfiguration,
}

if _, err := conn.UpdateAgentRuntime(ctx, updateInput); err != nil {
return fmt.Errorf("updating runtime with observability env vars: %w", err)
}

if err := configureXRayForObservability(ctx, xrayConn, runtimeID); err != nil {
return fmt.Errorf("configuring XRay: %w", err)
}

return nil
}

func configureXRayForObservability(ctx context.Context, xrayConn *xray.Client, runtimeName string) error {
getOutput, err := xrayConn.GetTraceSegmentDestination(ctx, &xray.GetTraceSegmentDestinationInput{})
if err != nil {
return fmt.Errorf("getting trace segment destination: %w", err)
}

if getOutput.Destination != xraytypes.TraceSegmentDestinationCloudWatchLogs {
_, err = xrayConn.UpdateTraceSegmentDestination(ctx, &xray.UpdateTraceSegmentDestinationInput{
Destination: xraytypes.TraceSegmentDestinationCloudWatchLogs,
})
if err != nil {
return fmt.Errorf("updating trace segment destination: %w", err)
}
}

return nil
}

const xrayResourcePolicyName = "BedrockAgentXRayPolicy"

func (r *agentRuntimeResource) ensureXRayResourcePolicy(ctx context.Context) error {
meta := r.Meta()

logsconn := meta.LogsClient(ctx)
region := meta.Region(ctx)
accountID := meta.AccountID(ctx)

policyDocument := fmt.Sprintf(`{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "TransactionSearchXRayAccess",
"Effect": "Allow",
"Principal": {
"Service": "xray.amazonaws.com"
},
"Action": "logs:PutLogEvents",
"Resource": [
"arn:aws:logs:%[1]s:%[2]s:log-group:aws/spans:*",
"arn:aws:logs:%[1]s:%[2]s:log-group:/aws/application-signals/data:*"
],
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:xray:%[1]s:%[2]s:*"
},
"StringEquals": {
"aws:SourceAccount": "%[2]s"
}
}
}
]
}`, region, accountID)

_, err := logsconn.PutResourcePolicy(ctx,
&cloudwatchlogs.PutResourcePolicyInput{
PolicyName: aws.String(xrayResourcePolicyName),
PolicyDocument: aws.String(policyDocument),
})

if errs.IsA[*logstypes.ResourceAlreadyExistsException](err) {
return nil
}

if err != nil {
return fmt.Errorf("error putting CloudWatch Logs resource policy for X-Ray: %w", err)
}

return nil
}

func (r *agentRuntimeResource) waitForXRayResourcePolicy(ctx context.Context, timeout time.Duration) error {
stateConf := &sdkretry.StateChangeConf{
Pending: []string{"notfound"},
Target: []string{"found"},
Timeout: timeout,
Refresh: func() (interface{}, string, error) {
logsconn := r.Meta().LogsClient(ctx)
out, err := logsconn.DescribeResourcePolicies(ctx, &cloudwatchlogs.DescribeResourcePoliciesInput{
Limit: aws.Int32(50),
})
if err != nil {
return nil, "", err
}
for _, policy := range out.ResourcePolicies {
if aws.ToString(policy.PolicyName) == xrayResourcePolicyName {
return policy, "found", nil
}
}
return nil, "notfound", nil
},
}
_, err := stateConf.WaitForStateContext(ctx)
return err
}

type workloadIdentityDetailsModel struct {
WorkloadIdentityARN types.String `tfsdk:"workload_identity_arn"`
}
Loading