Skip to content

Add SecretsManager and StepFunctions support. #203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -3,7 +3,9 @@
# The packages are installed in the `/autoinstrumentation` directory. This is required as when instrumenting the pod by CWOperator,
# one init container will be created to copy all the content in `/autoinstrumentation` directory to app's container. Then
# update the `PYTHONPATH` environment variable accordingly. Then in the second stage, copy the directory to `/autoinstrumentation`.
FROM python:3.11 AS build

# Stage 1: Install ADOT Python in the /operator-build folder
FROM public.ecr.aws/docker/library/python:3.11 AS build

WORKDIR /operator-build

@@ -18,11 +20,42 @@ RUN sed -i "/opentelemetry-exporter-otlp-proto-grpc/d" ./aws-opentelemetry-distr

RUN mkdir workspace && pip install --target workspace ./aws-opentelemetry-distro

FROM public.ecr.aws/amazonlinux/amazonlinux:minimal
# Stage 2: Build the cp-utility binary
FROM public.ecr.aws/docker/library/rust:1.75 as builder

WORKDIR /usr/src/cp-utility
COPY ./tools/cp-utility .

## TARGETARCH is defined by buildx
# https://docs.docker.com/engine/reference/builder/#automatic-platform-args-in-the-global-scope
ARG TARGETARCH

# Run validations and audit only on amd64 because it is faster and those two steps
# are only used to validate the source code and don't require anything that is
# architecture specific.

# Validations
# Validate formatting
RUN if [ $TARGETARCH = "amd64" ]; then rustup component add rustfmt && cargo fmt --check ; fi

# Audit dependencies
RUN if [ $TARGETARCH = "amd64" ]; then cargo install cargo-audit && cargo audit ; fi


# Cross-compile based on the target platform.
RUN if [ $TARGETARCH = "amd64" ]; then export ARCH="x86_64" ; \
elif [ $TARGETARCH = "arm64" ]; then export ARCH="aarch64" ; \
else false; \
fi \
&& rustup target add ${ARCH}-unknown-linux-musl \
&& cargo test --target ${ARCH}-unknown-linux-musl \
&& cargo install --target ${ARCH}-unknown-linux-musl --path . --root .

# Stage 3: Build the distribution image by copying the THIRD-PARTY-LICENSES, the custom built cp command from stage 2, and the installed ADOT Python from stage 1 to their respective destinations
FROM scratch

# Required to copy attribute files to distributed docker images
ADD THIRD-PARTY-LICENSES ./THIRD-PARTY-LICENSES

COPY --from=builder /usr/src/cp-utility/bin/cp-utility /bin/cp
COPY --from=build /operator-build/workspace /autoinstrumentation

RUN chmod -R go+r /autoinstrumentation
Original file line number Diff line number Diff line change
@@ -16,3 +16,6 @@
AWS_QUEUE_URL: str = "aws.sqs.queue_url"
AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
AWS_SECRET_ARN: str = "aws.secretsmanager.secret_arn"
AWS_STATE_MACHINE_ARN: str = "aws.stepfunctions.state_machine_arn"
AWS_ACTIVITY_ARN: str = "aws.stepfunctions.activity_arn"
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
from urllib.parse import ParseResult, urlparse

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_ACTIVITY_ARN,
AWS_LOCAL_OPERATION,
AWS_LOCAL_SERVICE,
AWS_QUEUE_NAME,
@@ -14,7 +15,9 @@
AWS_REMOTE_RESOURCE_IDENTIFIER,
AWS_REMOTE_RESOURCE_TYPE,
AWS_REMOTE_SERVICE,
AWS_SECRET_ARN,
AWS_SPAN_KIND,
AWS_STATE_MACHINE_ARN,
AWS_STREAM_NAME,
)
from amazon.opentelemetry.distro._aws_span_processing_util import (
@@ -78,6 +81,8 @@
_NORMALIZED_KINESIS_SERVICE_NAME: str = "AWS::Kinesis"
_NORMALIZED_S3_SERVICE_NAME: str = "AWS::S3"
_NORMALIZED_SQS_SERVICE_NAME: str = "AWS::SQS"
_NORMALIZED_SECRETSMANAGER_SERVICE_NAME: str = "AWS::SecretsManager"
_NORMALIZED_STEPFUNCTIONS_SERVICE_NAME: str = "AWS::StepFunctions"
_DB_CONNECTION_STRING_TYPE: str = "DB::Connection"

# Special DEPENDENCY attribute value if GRAPHQL_OPERATION_TYPE attribute key is present.
@@ -290,7 +295,11 @@ def _normalize_remote_service_name(span: ReadableSpan, service_name: str) -> str
resource format</a> as much as possible. Long term, we would like to normalize service name in the upstream.
"""
if is_aws_sdk_span(span):
return "AWS::" + service_name
aws_sdk_service_mapping = {
"Secrets Manager": _NORMALIZED_SECRETSMANAGER_SERVICE_NAME,
"SFN": _NORMALIZED_STEPFUNCTIONS_SERVICE_NAME,
}
return aws_sdk_service_mapping.get(service_name, "AWS::" + service_name)
return service_name


@@ -372,6 +381,15 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
remote_resource_identifier = _escape_delimiters(
SqsUrlParser.get_queue_name(span.attributes.get(AWS_QUEUE_URL))
)
elif is_key_present(span, AWS_SECRET_ARN):
remote_resource_type = _NORMALIZED_SECRETSMANAGER_SERVICE_NAME + "::Secret"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_SECRET_ARN))
elif is_key_present(span, AWS_STATE_MACHINE_ARN):
remote_resource_type = _NORMALIZED_STEPFUNCTIONS_SERVICE_NAME + "::StateMachine"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STATE_MACHINE_ARN))
elif is_key_present(span, AWS_ACTIVITY_ARN):
remote_resource_type = _NORMALIZED_STEPFUNCTIONS_SERVICE_NAME + "::Activity"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_ACTIVITY_ARN))
elif is_db_span(span):
remote_resource_type = _DB_CONNECTION_STRING_TYPE
remote_resource_identifier = _get_db_connection(span)
Original file line number Diff line number Diff line change
@@ -5,18 +5,21 @@

from opentelemetry.instrumentation.botocore.extensions import _KNOWN_EXTENSIONS
from opentelemetry.instrumentation.botocore.extensions.sqs import _SqsExtension
from opentelemetry.instrumentation.botocore.extensions.types import _AttributeMapT, _AwsSdkExtension
from opentelemetry.instrumentation.botocore.extensions.types import _AttributeMapT, _AwsSdkExtension, _BotoResultT
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.span import Span


def _apply_botocore_instrumentation_patches() -> None:
"""Botocore instrumentation patches

Adds patches to provide additional support and Java parity for Kinesis, S3, and SQS.
Adds patches to provide additional support and Java parity for Kinesis, S3, SQS, SecretsManager and StepFunctions.
"""
_apply_botocore_kinesis_patch()
_apply_botocore_s3_patch()
_apply_botocore_sqs_patch()
_apply_botocore_secretsmanager_patch()
_apply_botocore_stepfunctions_patch()


def _apply_botocore_kinesis_patch() -> None:
@@ -65,6 +68,26 @@ def patch_extract_attributes(self, attributes: _AttributeMapT):
_SqsExtension.extract_attributes = patch_extract_attributes


def _apply_botocore_secretsmanager_patch() -> None:
"""Botocore instrumentation patch for SecretsManager

This patch adds an extension to the upstream's list of known extension for SecretsManager.
Extensions allow for custom logic for adding service-specific information to spans,
such as attributes. Specifically, we are adding logic to add the AWS_SECRET_ARN attribute.
"""
_KNOWN_EXTENSIONS["secretsmanager"] = _lazy_load(".", "_SecretsManagerExtension")


def _apply_botocore_stepfunctions_patch() -> None:
"""Botocore instrumentation patch for StepFunctions

This patch adds an extension to the upstream's list of known extension for StepFunctions.
Extensions allow for custom logic for adding service-specific information to spans,
such as attributes. Specifically, we are adding logic to add the AWS_STATE_MACHINE_ARN attribute.
"""
_KNOWN_EXTENSIONS["stepfunctions"] = _lazy_load(".", "_StepFunctionsExtension")


# The OpenTelemetry Authors code
def _lazy_load(module, cls):
"""Clone of upstream opentelemetry.instrumentation.botocore.extensions.lazy_load
@@ -94,3 +117,48 @@ def extract_attributes(self, attributes: _AttributeMapT):
stream_name = self._call_context.params.get("StreamName")
if stream_name:
attributes["aws.kinesis.stream_name"] = stream_name


class _SecretsManagerExtension(_AwsSdkExtension):
def extract_attributes(self, attributes: _AttributeMapT):
"""
SecretId can be secret name or secret arn, the function extracts attributes only if the SecretId parameter
is provided as arn which starts with 'arn:aws:secretsmanager:'.
"""
secret_id = self._call_context.params.get("SecretId")
if secret_id and secret_id.startswith("arn:aws:secretsmanager:"):
attributes["aws.secretsmanager.secret_arn"] = secret_id

# pylint: disable=no-self-use
def on_success(self, span: Span, result: _BotoResultT):
secret_arn = result.get("ARN")
if secret_arn:
span.set_attribute(
"aws.secretsmanager.secret_arn",
secret_arn,
)


class _StepFunctionsExtension(_AwsSdkExtension):
def extract_attributes(self, attributes: _AttributeMapT):
state_machine_arn = self._call_context.params.get("stateMachineArn")
if state_machine_arn:
attributes["aws.stepfunctions.state_machine_arn"] = state_machine_arn
activity_arn = self._call_context.params.get("activityArn")
if activity_arn:
attributes["aws.stepfunctions.activity_arn"] = activity_arn

# pylint: disable=no-self-use
def on_success(self, span: Span, result: _BotoResultT):
state_machine_arn = result.get("stateMachineArn")
if state_machine_arn:
span.set_attribute(
"aws.stepfunctions.state_machine_arn",
state_machine_arn,
)
activity_arn = result.get("activityArn")
if activity_arn:
span.set_attribute(
"aws.stepfunctions.activity_arn",
activity_arn,
)
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
from unittest.mock import MagicMock

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_ACTIVITY_ARN,
AWS_CONSUMER_PARENT_SPAN_KIND,
AWS_LOCAL_OPERATION,
AWS_LOCAL_SERVICE,
@@ -17,7 +18,9 @@
AWS_REMOTE_RESOURCE_IDENTIFIER,
AWS_REMOTE_RESOURCE_TYPE,
AWS_REMOTE_SERVICE,
AWS_SECRET_ARN,
AWS_SPAN_KIND,
AWS_STATE_MACHINE_ARN,
AWS_STREAM_NAME,
)
from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator
@@ -821,6 +824,8 @@ def test_normalize_remote_service_name_aws_sdk(self):
self.validate_aws_sdk_service_normalization("Kinesis", "AWS::Kinesis")
self.validate_aws_sdk_service_normalization("S3", "AWS::S3")
self.validate_aws_sdk_service_normalization("SQS", "AWS::SQS")
self.validate_aws_sdk_service_normalization("Secrets Manager", "AWS::SecretsManager")
self.validate_aws_sdk_service_normalization("SFN", "AWS::StepFunctions")

def validate_aws_sdk_service_normalization(self, service_name: str, expected_remote_service: str):
self._mock_attribute([SpanAttributes.RPC_SYSTEM, SpanAttributes.RPC_SERVICE], ["aws-api", service_name])
@@ -977,6 +982,39 @@ def test_sdk_client_span_with_remote_resource_attributes(self):
self._validate_remote_resource_attributes("AWS::DynamoDB::Table", "aws_table^^name")
self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [None])

# Validate behaviour of AWS_SECRET_ARN attribute, then remove it.
self._mock_attribute(
[AWS_SECRET_ARN], ["arn:aws:secretsmanager:us-east-1:123456789012:secret:secret_name-lERW9H"], keys, values
)
self._validate_remote_resource_attributes(
"AWS::SecretsManager::Secret", "arn:aws:secretsmanager:us-east-1:123456789012:secret:secret_name-lERW9H"
)
self._mock_attribute([AWS_SECRET_ARN], [None])

# Validate behaviour of AWS_STATE_MACHINE_ARN attribute, then remove it.
self._mock_attribute(
[AWS_STATE_MACHINE_ARN],
["arn:aws:states:us-east-1:123456789012:stateMachine:test_state_machine"],
keys,
values,
)
self._validate_remote_resource_attributes(
"AWS::StepFunctions::StateMachine", "arn:aws:states:us-east-1:123456789012:stateMachine:test_state_machine"
)
self._mock_attribute([AWS_STATE_MACHINE_ARN], [None])

# Validate behaviour of AWS_ACTIVITY_ARN attribute, then remove it.
self._mock_attribute(
[AWS_ACTIVITY_ARN],
["arn:aws:states:us-east-1:007003123456789012:activity:testActivity"],
keys,
values,
)
self._validate_remote_resource_attributes(
"AWS::StepFunctions::Activity", "arn:aws:states:us-east-1:007003123456789012:activity:testActivity"
)
self._mock_attribute([AWS_ACTIVITY_ARN], [None])

self._mock_attribute([SpanAttributes.RPC_SYSTEM], [None])

def test_client_db_span_with_remote_resource_attributes(self):
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Dict
from typing import Any, Dict
from unittest import TestCase
from unittest.mock import MagicMock, patch

@@ -9,11 +9,15 @@
from amazon.opentelemetry.distro.patches._instrumentation_patch import apply_instrumentation_patches
from opentelemetry.instrumentation.botocore.extensions import _KNOWN_EXTENSIONS
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.span import Span

_STREAM_NAME: str = "streamName"
_BUCKET_NAME: str = "bucketName"
_QUEUE_NAME: str = "queueName"
_QUEUE_URL: str = "queueUrl"
_SECRET_ARN: str = "arn:aws:secretsmanager:us-west-2:000000000000:secret:testSecret-ABCDEF"
_STATE_MACHINE_ARN: str = "arn:aws:states:us-west-2:000000000000:stateMachine:testStateMachine"
_ACTIVITY_ARN: str = "arn:aws:states:us-east-1:007003123456789012:activity:testActivity"

# Patch names
GET_DISTRIBUTION_PATCH: str = (
@@ -93,6 +97,12 @@ def _test_unpatched_botocore_instrumentation(self):
self.assertFalse("aws.sqs.queue_url" in attributes)
self.assertFalse("aws.sqs.queue_name" in attributes)

# SecretsManager
self.assertFalse("secretsmanager" in _KNOWN_EXTENSIONS, "Upstream has added a SecretsManager extension")

# StepFunctions
self.assertFalse("stepfunctions" in _KNOWN_EXTENSIONS, "Upstream has added a StepFunctions extension")

def _test_patched_botocore_instrumentation(self):
# Kinesis
self.assertTrue("kinesis" in _KNOWN_EXTENSIONS)
@@ -115,6 +125,28 @@ def _test_patched_botocore_instrumentation(self):
self.assertTrue("aws.sqs.queue_name" in sqs_attributes)
self.assertEqual(sqs_attributes["aws.sqs.queue_name"], _QUEUE_NAME)

# SecretsManager
self.assertTrue("secretsmanager" in _KNOWN_EXTENSIONS)
secretsmanager_attributes: Dict[str, str] = _do_extract_secretsmanager_attributes()
self.assertTrue("aws.secretsmanager.secret_arn" in secretsmanager_attributes)
self.assertEqual(secretsmanager_attributes["aws.secretsmanager.secret_arn"], _SECRET_ARN)
secretsmanager_sucess_attributes: Dict[str, str] = _do_secretsmanager_on_success()
self.assertTrue("aws.secretsmanager.secret_arn" in secretsmanager_sucess_attributes)
self.assertEqual(secretsmanager_sucess_attributes["aws.secretsmanager.secret_arn"], _SECRET_ARN)

# StepFunctions
self.assertTrue("stepfunctions" in _KNOWN_EXTENSIONS)
stepfunctions_attributes: Dict[str, str] = _do_extract_stepfunctions_attributes()
self.assertTrue("aws.stepfunctions.state_machine_arn" in stepfunctions_attributes)
self.assertEqual(stepfunctions_attributes["aws.stepfunctions.state_machine_arn"], _STATE_MACHINE_ARN)
self.assertTrue("aws.stepfunctions.activity_arn" in stepfunctions_attributes)
self.assertEqual(stepfunctions_attributes["aws.stepfunctions.activity_arn"], _ACTIVITY_ARN)
stepfunctions_sucess_attributes: Dict[str, str] = _do_stepfunctions_on_success()
self.assertTrue("aws.stepfunctions.state_machine_arn" in stepfunctions_sucess_attributes)
self.assertEqual(stepfunctions_sucess_attributes["aws.stepfunctions.state_machine_arn"], _STATE_MACHINE_ARN)
self.assertTrue("aws.stepfunctions.activity_arn" in stepfunctions_sucess_attributes)
self.assertEqual(stepfunctions_sucess_attributes["aws.stepfunctions.activity_arn"], _ACTIVITY_ARN)

def _test_botocore_installed_flag(self):
with patch(
"amazon.opentelemetry.distro.patches._botocore_patches._apply_botocore_instrumentation_patches"
@@ -156,10 +188,48 @@ def _do_extract_sqs_attributes() -> Dict[str, str]:
return _do_extract_attributes(service_name, params)


def _do_extract_secretsmanager_attributes() -> Dict[str, str]:
service_name: str = "secretsmanager"
params: Dict[str, str] = {"SecretId": _SECRET_ARN}
return _do_extract_attributes(service_name, params)


def _do_secretsmanager_on_success() -> Dict[str, str]:
service_name: str = "secretsmanager"
result: Dict[str, Any] = {"ARN": _SECRET_ARN}
return _do_on_success(service_name, result)


def _do_extract_stepfunctions_attributes() -> Dict[str, str]:
service_name: str = "stepfunctions"
params: Dict[str, str] = {"stateMachineArn": _STATE_MACHINE_ARN, "activityArn": _ACTIVITY_ARN}
return _do_extract_attributes(service_name, params)


def _do_stepfunctions_on_success() -> Dict[str, str]:
service_name: str = "stepfunctions"
result: Dict[str, Any] = {"stateMachineArn": _STATE_MACHINE_ARN, "activityArn": _ACTIVITY_ARN}
return _do_on_success(service_name, result)


def _do_extract_attributes(service_name: str, params: Dict[str, str]) -> Dict[str, str]:
mock_call_context: MagicMock = MagicMock()
mock_call_context.params = params
attributes: Dict[str, str] = {}
sqs_extension = _KNOWN_EXTENSIONS[service_name]()(mock_call_context)
sqs_extension.extract_attributes(attributes)
return attributes


def _do_on_success(service_name: str, result: Dict[str, Any]) -> Dict[str, str]:
span_mock: Span = MagicMock()
span_attributes: Dict[str, str] = {}

def set_side_effect(set_key, set_value):
span_attributes[set_key] = set_value

span_mock.set_attribute.side_effect = set_side_effect
extension = _KNOWN_EXTENSIONS[service_name]()(span_mock)
extension.on_success(span_mock, result)

return span_attributes
185 changes: 169 additions & 16 deletions contract-tests/images/applications/botocore/botocore_server.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import atexit
import json
import os
import tempfile
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from threading import Thread
from typing import List

import boto3
import requests
from botocore.client import BaseClient
from botocore.config import Config
from botocore.exceptions import ClientError
from typing_extensions import Tuple, override

_PORT: int = 8080
@@ -41,6 +44,10 @@ def do_GET(self):
self._handle_sqs_request()
if self.in_path("kinesis"):
self._handle_kinesis_request()
if self.in_path("secretsmanager"):
self._handle_secretsmanager_request()
if self.in_path("stepfunctions"):
self._handle_stepsfunction_request()

self._end_request(self.main_status)

@@ -203,6 +210,71 @@ def _handle_kinesis_request(self) -> None:
else:
set_main_status(404)

def _handle_secretsmanager_request(self) -> None:
secretsmanager_client = boto3.client("secretsmanager", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
if self.in_path(_ERROR):
set_main_status(400)
try:
error_client = boto3.client("secretsmanager", endpoint_url=_ERROR_ENDPOINT, region_name=_AWS_REGION)
error_client.describe_secret(
SecretId="arn:aws:secretsmanager:us-west-2:000000000000:secret:unExistSecret"
)
except Exception as exception:
print("Expected exception occurred", exception)
elif self.in_path(_FAULT):
set_main_status(500)
try:
fault_client = boto3.client(
"secretsmanager", endpoint_url=_FAULT_ENDPOINT, region_name=_AWS_REGION, config=_NO_RETRY_CONFIG
)
fault_client.get_secret_value(
SecretId="arn:aws:secretsmanager:us-west-2:000000000000:secret:nonexistent-secret"
)
except Exception as exception:
print("Expected exception occurred", exception)
elif self.in_path("describesecret/my-secret"):
set_main_status(200)
secretsmanager_client.describe_secret(SecretId="testSecret")
else:
set_main_status(404)

def _handle_stepsfunction_request(self) -> None:
sfn_client = boto3.client("stepfunctions", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
if self.in_path(_ERROR):
set_main_status(400)
try:
error_client = boto3.client("stepfunctions", endpoint_url=_ERROR_ENDPOINT, region_name=_AWS_REGION)
error_client.describe_state_machine(
stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:unExistStateMachine"
)
except Exception as exception:
print("Expected exception occurred", exception)
elif self.in_path(_FAULT):
set_main_status(500)
try:
fault_client = boto3.client(
"stepfunctions", endpoint_url=_FAULT_ENDPOINT, region_name=_AWS_REGION, config=_NO_RETRY_CONFIG
)
fault_client.meta.events.register(
"before-call.stepfunctions.ListStateMachineVersions",
lambda **kwargs: inject_500_error("ListStateMachineVersions", **kwargs),
)
fault_client.list_state_machine_versions(
stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:invalid-state-machine",
)
except Exception as exception:
print("Expected exception occurred", exception)
elif self.in_path("describestatemachine/my-state-machine"):
set_main_status(200)
sfn_client.describe_state_machine(
stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:testStateMachine"
)
elif self.in_path("describeactivity/my-activity"):
set_main_status(200)
sfn_client.describe_activity(activityArn="arn:aws:states:us-west-2:000000000000:activity:testActivity")
else:
set_main_status(404)

def _end_request(self, status_code: int):
self.send_response_only(status_code)
self.end_headers()
@@ -212,17 +284,24 @@ def set_main_status(status: int) -> None:
RequestHandler.main_status = status


# pylint: disable=too-many-locals, too-many-statements
def prepare_aws_server() -> None:
requests.Request(method="POST", url="http://localhost:4566/_localstack/state/reset")
try:
# Set up S3 so tests can access buckets and retrieve a file.
s3_client: BaseClient = boto3.client("s3", endpoint_url=_AWS_SDK_S3_ENDPOINT, region_name=_AWS_REGION)
s3_client.create_bucket(
Bucket="test-put-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)
s3_client.create_bucket(
Bucket="test-get-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)
bucket_names: List[str] = [bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]]
put_bucket_name: str = "test-put-object-bucket-name"
if put_bucket_name not in bucket_names:
s3_client.create_bucket(
Bucket=put_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)

get_bucket_name: str = "test-get-object-bucket-name"
if get_bucket_name not in bucket_names:
s3_client.create_bucket(
Bucket=get_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)
with tempfile.NamedTemporaryFile(delete=True) as temp_file:
temp_file_name: str = temp_file.name
temp_file.write(b"This is temp file for S3 upload")
@@ -231,26 +310,100 @@ def prepare_aws_server() -> None:

# Set up DDB so tests can access a table.
ddb_client: BaseClient = boto3.client("dynamodb", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
ddb_client.create_table(
TableName="put_test_table",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST",
)
table_names: List[str] = ddb_client.list_tables()["TableNames"]

table_name: str = "put_test_table"
if table_name not in table_names:
ddb_client.create_table(
TableName=table_name,
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)

# Set up SQS so tests can access a queue.
sqs_client: BaseClient = boto3.client("sqs", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
sqs_client.create_queue(QueueName="test_put_get_queue")
queue_name: str = "test_put_get_queue"
queues_response = sqs_client.list_queues(QueueNamePrefix=queue_name)
queues: List[str] = queues_response["QueueUrls"] if "QueueUrls" in queues_response else []
if not queues:
sqs_client.create_queue(QueueName=queue_name)

# Set up Kinesis so tests can access a stream.
kinesis_client: BaseClient = boto3.client("kinesis", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
kinesis_client.create_stream(StreamName="test_stream", ShardCount=1)
stream_name: str = "test_stream"
stream_response = kinesis_client.list_streams()
if not stream_response["StreamNames"]:
kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)
kinesis_client.register_stream_consumer(
StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/" + stream_name, ConsumerName="test_consumer"
)

# Set up Secrets Manager so tests can access a stream.
secretsmanager_client = boto3.client("secretsmanager", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)

secretsmanager_response = secretsmanager_client.list_secrets()
secret = next((s for s in secretsmanager_response["SecretList"] if s["Name"] == "testSecret"), None)
if not secret:
secretsmanager_client.create_secret(
Name="testSecret", SecretString="secretValue", Description="This is a test secret"
)

# Set up IAM and create a role so StepFunctions use it to create a state machine.
iam_client = boto3.client("iam", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
role_name = "StepFunctionsExecutionTestRole"
iam_response = iam_client.list_roles()
role = next((r for r in iam_response["Roles"] if r["RoleName"] == role_name), None)
if not role:
assume_role_policy = {
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Principal": {"Service": "states.amazonaws.com"}, "Action": "sts:AssumeRole"}
],
}
iam_client.create_role(RoleName=role_name, AssumeRolePolicyDocument=json.dumps(assume_role_policy))
iam_client.attach_role_policy(
RoleName=role_name, PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaRole"
)

# Set up StepFucntion so tests can access a state machine.
sfn_client = boto3.client("stepfunctions", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
state_machine_name = "testStateMachine"
state_machine_response = sfn_client.list_state_machines()
state_machine = next(
(st for st in state_machine_response["stateMachines"] if st["name"] == state_machine_name), None
)
if not state_machine:
definition = {
"Comment": "A simple AWS Step Functions state machine",
"StartAt": "SimpleState",
"States": {"SimpleState": {"Type": "Pass", "Result": "Hello, State Machine!", "End": True}},
}

sfn_client.create_state_machine(
name=state_machine_name,
definition=json.dumps(definition),
roleArn="arn:aws:iam::000000000000:role/StepFunctionsExecutionTestRole",
)
activity_response = sfn_client.list_activities()
activity = next((a for a in activity_response["activities"] if a["name"] == "testActivity"), None)
if not activity:
sfn_client.create_activity(name="testActivity")
# arn:aws:states:us-west-2:000000000000:stateMachine:testStateMachine
except Exception as exception:
print("Unexpected exception occurred", exception)


def inject_500_error(api_name, **kwargs):
raise ClientError(
{
"Error": {"Code": "InternalServerError", "Message": "Internal Server Error"},
"ResponseMetadata": {"HTTPStatusCode": 500, "RequestId": "mock-request-id"},
},
api_name,
)


def main() -> None:
prepare_aws_server()
server_address: Tuple[str, int] = ("0.0.0.0", _PORT)
17 changes: 17 additions & 0 deletions contract-tests/tests/test/amazon/base/contract_test_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import re
import time
from logging import INFO, Logger, getLogger
from typing import Dict, List
@@ -168,6 +169,12 @@ def _assert_int_attribute(self, attributes_dict: Dict[str, AnyValue], key: str,
self.assertIsNotNone(actual_value)
self.assertEqual(expected_value, actual_value.int_value)

def _assert_match_attribute(self, attributes_dict: Dict[str, AnyValue], key: str, pattern: str) -> None:
self.assertIn(key, attributes_dict)
actual_value: AnyValue = attributes_dict[key]
self.assertIsNotNone(actual_value)
self.assertRegex(actual_value.string_value, pattern)

def check_sum(self, metric_name: str, actual_sum: float, expected_sum: float) -> None:
if metric_name is LATENCY_METRIC:
self.assertTrue(0 < actual_sum < expected_sum)
@@ -218,3 +225,13 @@ def _assert_metric_attributes(
self, resource_scope_metrics: List[ResourceScopeMetric], metric_name: str, expected_sum: int, **kwargs
):
self.fail("Tests must implement this function")

def _is_valid_regex(self, pattern: str) -> bool:
if not isinstance(pattern, str):
return False

try:
re.compile(pattern)
return True
except re.error:
return False
152 changes: 146 additions & 6 deletions contract-tests/tests/test/amazon/botocore/botocore_test.py
Original file line number Diff line number Diff line change
@@ -29,6 +29,9 @@
_AWS_QUEUE_URL: str = "aws.sqs.queue_url"
_AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
_AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
_AWS_SECRET_ARN: str = "aws.secretsmanager.secret_arn"
_AWS_STATE_MACHINE_ARN: str = "aws.stepfunctions.state_machine_arn"
_AWS_ACTIVITY_ARN: str = "aws.stepfunctions.activity_arn"


# pylint: disable=too-many-public-methods
@@ -372,6 +375,133 @@ def test_kinesis_fault(self):
span_name="Kinesis.PutRecord",
)

def test_secretsmanager_describe_secret(self):
self.do_test_requests(
"secretsmanager/describesecret/my-secret",
"GET",
200,
0,
0,
rpc_service="Secrets Manager",
remote_service="AWS::SecretsManager",
remote_operation="DescribeSecret",
remote_resource_type="AWS::SecretsManager::Secret",
remote_resource_identifier=r"arn:aws:secretsmanager:us-west-2:000000000000:"
r"secret:testSecret-[a-zA-Z0-9]{6}$",
request_specific_attributes={
_AWS_SECRET_ARN: r"arn:aws:secretsmanager:us-west-2:000000000000:" r"secret:testSecret-[a-zA-Z0-9]{6}$",
},
span_name="Secrets Manager.DescribeSecret",
)

def test_secretsmanager_error(self):
self.do_test_requests(
"secretsmanager/error",
"GET",
400,
1,
0,
rpc_service="Secrets Manager",
remote_service="AWS::SecretsManager",
remote_operation="DescribeSecret",
remote_resource_type="AWS::SecretsManager::Secret",
remote_resource_identifier="arn:aws:secretsmanager:us-west-2:000000000000:secret:unExistSecret",
request_specific_attributes={
_AWS_SECRET_ARN: "arn:aws:secretsmanager:us-west-2:000000000000:secret:unExistSecret",
},
span_name="Secrets Manager.DescribeSecret",
)

def test_secretsmanager_fault(self):
self.do_test_requests(
"secretsmanager/fault",
"GET",
500,
0,
1,
rpc_service="Secrets Manager",
remote_service="AWS::SecretsManager",
remote_operation="GetSecretValue",
remote_resource_type="AWS::SecretsManager::Secret",
remote_resource_identifier="arn:aws:secretsmanager:us-west-2:000000000000:secret:nonexistent-secret",
request_specific_attributes={
_AWS_SECRET_ARN: "arn:aws:secretsmanager:us-west-2:000000000000:secret:nonexistent-secret",
},
span_name="Secrets Manager.GetSecretValue",
)

def test_stepfunctions_describe_state_machine(self):
self.do_test_requests(
"stepfunctions/describestatemachine/my-state-machine",
"GET",
200,
0,
0,
rpc_service="SFN",
remote_service="AWS::StepFunctions",
remote_operation="DescribeStateMachine",
remote_resource_type="AWS::StepFunctions::StateMachine",
remote_resource_identifier="arn:aws:states:us-west-2:000000000000:stateMachine:testStateMachine",
request_specific_attributes={
_AWS_STATE_MACHINE_ARN: "arn:aws:states:us-west-2:000000000000:stateMachine:testStateMachine",
},
span_name="SFN.DescribeStateMachine",
)

def test_stepfunctions_activity(self):
self.do_test_requests(
"stepfunctions/describeactivity/my-activity",
"GET",
200,
0,
0,
rpc_service="SFN",
remote_service="AWS::StepFunctions",
remote_operation="DescribeActivity",
remote_resource_type="AWS::StepFunctions::Activity",
remote_resource_identifier="arn:aws:states:us-west-2:000000000000:activity:testActivity",
request_specific_attributes={
_AWS_ACTIVITY_ARN: "arn:aws:states:us-west-2:000000000000:activity:testActivity",
},
span_name="SFN.DescribeActivity",
)

def test_stepfunctions_error(self):
self.do_test_requests(
"stepfunctions/error",
"GET",
400,
1,
0,
rpc_service="SFN",
remote_service="AWS::StepFunctions",
remote_operation="DescribeStateMachine",
remote_resource_type="AWS::StepFunctions::StateMachine",
remote_resource_identifier="arn:aws:states:us-west-2:000000000000:stateMachine:unExistStateMachine",
request_specific_attributes={
_AWS_STATE_MACHINE_ARN: "arn:aws:states:us-west-2:000000000000:stateMachine:unExistStateMachine",
},
span_name="SFN.DescribeStateMachine",
)

def test_stepfunctions_fault(self):
self.do_test_requests(
"stepfunctions/fault",
"GET",
500,
0,
1,
rpc_service="SFN",
remote_service="AWS::StepFunctions",
remote_operation="ListStateMachineVersions",
remote_resource_type="AWS::StepFunctions::StateMachine",
remote_resource_identifier="arn:aws:states:us-west-2:000000000000:stateMachine:invalid-state-machine",
request_specific_attributes={
_AWS_STATE_MACHINE_ARN: "arn:aws:states:us-west-2:000000000000:stateMachine:invalid-state-machine",
},
span_name="SFN.ListStateMachineVersions",
)

@override
def _assert_aws_span_attributes(self, resource_scope_spans: List[ResourceScopeSpan], path: str, **kwargs) -> None:
target_spans: List[Span] = []
@@ -409,7 +539,12 @@ def _assert_aws_attributes(
if remote_resource_type != "None":
self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_TYPE, remote_resource_type)
if remote_resource_identifier != "None":
self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier)
if self._is_valid_regex(remote_resource_identifier):
self._assert_match_attribute(
attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier
)
else:
self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier)
# See comment above AWS_LOCAL_OPERATION
self._assert_str_attribute(attributes_dict, AWS_SPAN_KIND, span_kind)

@@ -427,7 +562,7 @@ def _assert_semantic_conventions_span_attributes(
self.assertEqual(target_spans[0].name, kwargs.get("span_name"))
self._assert_semantic_conventions_attributes(
target_spans[0].attributes,
kwargs.get("remote_service"),
kwargs.get("rpc_service") if "rpc_service" in kwargs else kwargs.get("remote_service").split("::")[-1],
kwargs.get("remote_operation"),
status_code,
kwargs.get("request_specific_attributes", {}),
@@ -437,20 +572,22 @@ def _assert_semantic_conventions_span_attributes(
def _assert_semantic_conventions_attributes(
self,
attributes_list: List[KeyValue],
service: str,
rpc_service: str,
operation: str,
status_code: int,
request_specific_attributes: dict,
) -> None:
attributes_dict: Dict[str, AnyValue] = self._get_attributes_dict(attributes_list)
self._assert_str_attribute(attributes_dict, SpanAttributes.RPC_METHOD, operation)
self._assert_str_attribute(attributes_dict, SpanAttributes.RPC_SYSTEM, "aws-api")
self._assert_str_attribute(attributes_dict, SpanAttributes.RPC_SERVICE, service.split("::")[-1])
self._assert_str_attribute(attributes_dict, SpanAttributes.RPC_SERVICE, rpc_service)
self._assert_int_attribute(attributes_dict, SpanAttributes.HTTP_STATUS_CODE, status_code)
# TODO: botocore instrumentation is not respecting PEER_SERVICE
# self._assert_str_attribute(attributes_dict, SpanAttributes.PEER_SERVICE, "backend:8080")
for key, value in request_specific_attributes.items():
if isinstance(value, str):
if self._is_valid_regex(value):
self._assert_match_attribute(attributes_dict, key, value)
elif isinstance(value, str):
self._assert_str_attribute(attributes_dict, key, value)
elif isinstance(value, int):
self._assert_int_attribute(attributes_dict, key, value)
@@ -492,7 +629,10 @@ def _assert_metric_attributes(
if remote_resource_type != "None":
self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_TYPE, remote_resource_type)
if remote_resource_identifier != "None":
self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier)
if self._is_valid_regex(remote_resource_identifier):
self._assert_match_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier)
else:
self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier)
self.check_sum(metric_name, dependency_dp.sum, expected_sum)

attribute_dict: Dict[str, AnyValue] = self._get_attributes_dict(service_dp.attributes)