diff --git a/Dockerfile b/Dockerfile
index e8aa35db1..23fc2c907 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -3,7 +3,9 @@
 #   The packages are installed in the `/autoinstrumentation` directory. This is required as when instrumenting the pod by CWOperator,
 #   one init container will be created to copy all the content in `/autoinstrumentation` directory to app's container. Then
 #   update the `PYTHONPATH` environment variable accordingly. Then in the second stage, copy the directory to `/autoinstrumentation`.
-FROM python:3.11 AS build
+
+# Stage 1: Install ADOT Python in the /operator-build folder
+FROM public.ecr.aws/docker/library/python:3.11 AS build
 
 WORKDIR /operator-build
 
@@ -18,11 +20,42 @@ RUN sed -i "/opentelemetry-exporter-otlp-proto-grpc/d" ./aws-opentelemetry-distr
 
 RUN mkdir workspace && pip install --target workspace ./aws-opentelemetry-distro
 
-FROM public.ecr.aws/amazonlinux/amazonlinux:minimal
+# Stage 2: Build the cp-utility binary
+FROM public.ecr.aws/docker/library/rust:1.75 AS builder
+
+WORKDIR /usr/src/cp-utility
+COPY ./tools/cp-utility .
+
+## TARGETARCH is defined by buildx
+# https://docs.docker.com/engine/reference/builder/#automatic-platform-args-in-the-global-scope
+ARG TARGETARCH
+
+# Run validations and audit only on amd64: it is faster, and both steps only
+# validate the source code, so nothing they do is architecture-specific.
+
+# Validations
+# Validate formatting
+RUN if [ $TARGETARCH = "amd64" ]; then rustup component add rustfmt && cargo fmt --check ; fi
+
+# Audit dependencies
+RUN if [ $TARGETARCH = "amd64" ]; then cargo install cargo-audit && cargo audit ; fi
+
+
+# Cross-compile based on the target platform.
+RUN if [ $TARGETARCH = "amd64" ]; then export ARCH="x86_64" ; \
+    elif [ $TARGETARCH = "arm64" ]; then export ARCH="aarch64" ; \
+    else false; \
+    fi \
+    && rustup target add ${ARCH}-unknown-linux-musl \
+    && cargo test  --target ${ARCH}-unknown-linux-musl \
+    && cargo install --target ${ARCH}-unknown-linux-musl --path . --root .
+
+# Stage 3: Build the distribution image. Copy the THIRD-PARTY-LICENSES, the custom-built
+# cp command from stage 2, and the installed ADOT Python from stage 1 to their destinations.
+FROM scratch
 
 # Required to copy attribute files to distributed docker images
 ADD THIRD-PARTY-LICENSES ./THIRD-PARTY-LICENSES
 
+COPY --from=builder /usr/src/cp-utility/bin/cp-utility /bin/cp
 COPY --from=build /operator-build/workspace /autoinstrumentation
-
-RUN chmod -R go+r /autoinstrumentation
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py
index f6498ac76..e3f711688 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py
@@ -16,3 +16,4 @@
 AWS_QUEUE_URL: str = "aws.sqs.queue_url"
 AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
 AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
+AWS_STREAM_CONSUMER_NAME: str = "aws.stream.consumer_name"
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py
index 577d28f63..1e8f83338 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py
@@ -15,6 +15,7 @@
     AWS_REMOTE_RESOURCE_TYPE,
     AWS_REMOTE_SERVICE,
     AWS_SPAN_KIND,
+    AWS_STREAM_CONSUMER_NAME,
     AWS_STREAM_NAME,
 )
 from amazon.opentelemetry.distro._aws_span_processing_util import (
@@ -361,6 +362,9 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
         elif is_key_present(span, AWS_STREAM_NAME):
             remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream"
             remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_NAME))
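+        # If both attributes are present, AWS_STREAM_NAME wins because its elif branch comes first.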
+        elif is_key_present(span, AWS_STREAM_CONSUMER_NAME):
+            remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::StreamConsumer"
+            remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_CONSUMER_NAME))
         elif is_key_present(span, _AWS_BUCKET_NAME):
             remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket"
             remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME))
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
index cf73fb345..072478c8b 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
@@ -94,3 +94,6 @@ def extract_attributes(self, attributes: _AttributeMapT):
         stream_name = self._call_context.params.get("StreamName")
         if stream_name:
             attributes["aws.kinesis.stream_name"] = stream_name
+        consumer_name = self._call_context.params.get("ConsumerName")
+        if consumer_name:
+            attributes["aws.stream.consumer_name"] = consumer_name
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py
index 072e6eeb0..e961385fa 100644
--- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py
@@ -18,6 +18,7 @@
     AWS_REMOTE_RESOURCE_TYPE,
     AWS_REMOTE_SERVICE,
     AWS_SPAN_KIND,
+    AWS_STREAM_CONSUMER_NAME,
     AWS_STREAM_NAME,
 )
 from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator
@@ -950,6 +951,32 @@ def test_sdk_client_span_with_remote_resource_attributes(self):
         self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name")
         self._mock_attribute([AWS_STREAM_NAME], [None])
 
+        # Validate behaviour of AWS_STREAM_CONSUMER_NAME attribute, then remove it.
+        self._mock_attribute(
+            [AWS_STREAM_CONSUMER_NAME],
+            ["aws_stream_consumer_name"],
+            keys,
+            values,
+        )
+        self._validate_remote_resource_attributes(
+            "AWS::Kinesis::StreamConsumer",
+            "aws_stream_consumer_name",
+        )
+        self._mock_attribute([AWS_STREAM_CONSUMER_NAME], [None])
+
+        # Validate behaviour when both AWS_STREAM_NAME and AWS_STREAM_CONSUMER_NAME are present, then remove them.
+        self._mock_attribute(
+            [AWS_STREAM_NAME, AWS_STREAM_CONSUMER_NAME],
+            [
+                "aws_stream_name",
+                "aws_stream_consumer_name",
+            ],
+            keys,
+            values,
+        )
+        self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name")
+        self._mock_attribute([AWS_STREAM_NAME, AWS_STREAM_CONSUMER_NAME], [None, None])
+
         # Validate behaviour of SpanAttributes.AWS_DYNAMODB_TABLE_NAMES attribute with one table name, then remove it.
         self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [["aws_table_name"]], keys, values)
         self._validate_remote_resource_attributes("AWS::DynamoDB::Table", "aws_table_name")
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
index 124abfad9..f0b4a4172 100644
--- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
@@ -11,6 +11,7 @@
 from opentelemetry.semconv.trace import SpanAttributes
 
 _STREAM_NAME: str = "streamName"
+_CONSUMER_NAME: str = "consumerName"
 _BUCKET_NAME: str = "bucketName"
 _QUEUE_NAME: str = "queueName"
 _QUEUE_URL: str = "queueUrl"
@@ -99,6 +100,8 @@ def _test_patched_botocore_instrumentation(self):
         kinesis_attributes: Dict[str, str] = _do_extract_kinesis_attributes()
         self.assertTrue("aws.kinesis.stream_name" in kinesis_attributes)
         self.assertEqual(kinesis_attributes["aws.kinesis.stream_name"], _STREAM_NAME)
+        self.assertTrue("aws.stream.consumer_name" in kinesis_attributes)
+        self.assertEqual(kinesis_attributes["aws.stream.consumer_name"], _CONSUMER_NAME)
 
         # S3
         self.assertTrue("s3" in _KNOWN_EXTENSIONS)
@@ -140,7 +143,7 @@ def _reset_mocks(self):
 
 def _do_extract_kinesis_attributes() -> Dict[str, str]:
     service_name: str = "kinesis"
-    params: Dict[str, str] = {"StreamName": _STREAM_NAME}
+    params: Dict[str, str] = {"StreamName": _STREAM_NAME, "ConsumerName": _CONSUMER_NAME}
     return _do_extract_attributes(service_name, params)
 
 
diff --git a/contract-tests/README.md b/contract-tests/README.md
index 7e6ff49b2..f4f0002c2 100644
--- a/contract-tests/README.md
+++ b/contract-tests/README.md
@@ -28,6 +28,9 @@ The steps to add a new test for a library or framework are:
     * The test class should be created in `contract-tests/tests/amazon/<framework-name>`.
     * The test class should extend `contract_test_base.py`
 
+Note: For botocore applications, when creating new resources in [prepare_aws_server()](https://github.com/aws-observability/aws-otel-python-instrumentation/blob/166c4cb36da6634cb070df5a312a62f6b0136a9c/contract-tests/images/applications/botocore/botocore_server.py#L215), make sure to check whether the resource already exists before creating it.
+This is because each test pulls the "aws-application-signals-tests-botocore-app" image and starts a new container that runs `prepare_aws_server()` once; only the first attempt can succeed, and all subsequent attempts would fail because the resources already exist.
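+A minimal sketch of the check-before-create pattern, shown for a hypothetical S3 bucket (the endpoint and region here are assumptions based on the LocalStack defaults used by the tests; the same approach applies to the other resource types):
+
+```python
+import boto3
+
+# Client pointed at the LocalStack endpoint the contract tests run against.
+s3_client = boto3.client("s3", endpoint_url="http://localhost:4566", region_name="us-west-2")
+
+# Only create the bucket when it does not already exist, so repeated container
+# startups that each call prepare_aws_server() remain idempotent.
+bucket_names = [bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]]
+if "example-bucket" not in bucket_names:
+    s3_client.create_bucket(
+        Bucket="example-bucket",
+        CreateBucketConfiguration={"LocationConstraint": "us-west-2"},
+    )
+```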
+
 # How to run the tests locally?
 
 Pre-requirements:
diff --git a/contract-tests/images/applications/botocore/botocore_server.py b/contract-tests/images/applications/botocore/botocore_server.py
index dd1e34c6b..4760746fd 100644
--- a/contract-tests/images/applications/botocore/botocore_server.py
+++ b/contract-tests/images/applications/botocore/botocore_server.py
@@ -5,6 +5,7 @@
 import tempfile
 from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
 from threading import Thread
+from typing import List
 
 import boto3
 import requests
@@ -200,6 +201,12 @@ def _handle_kinesis_request(self) -> None:
         elif self.in_path("putrecord/my-stream"):
             set_main_status(200)
             kinesis_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key")
+        elif self.in_path("describestreamconsumer/my-consumer"):
+            set_main_status(200)
+            kinesis_client.describe_stream_consumer(
+                StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream",
+                ConsumerName="test_consumer",
+            )
         else:
             set_main_status(404)
 
@@ -212,17 +219,24 @@ def set_main_status(status: int) -> None:
     RequestHandler.main_status = status
 
 
+# pylint: disable=too-many-locals
 def prepare_aws_server() -> None:
     requests.Request(method="POST", url="http://localhost:4566/_localstack/state/reset")
     try:
         # Set up S3 so tests can access buckets and retrieve a file.
         s3_client: BaseClient = boto3.client("s3", endpoint_url=_AWS_SDK_S3_ENDPOINT, region_name=_AWS_REGION)
-        s3_client.create_bucket(
-            Bucket="test-put-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
-        )
-        s3_client.create_bucket(
-            Bucket="test-get-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
-        )
+        bucket_names: List[str] = [bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]]
+        put_bucket_name: str = "test-put-object-bucket-name"
+        if put_bucket_name not in bucket_names:
+            s3_client.create_bucket(
+                Bucket=put_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
+            )
+
+        get_bucket_name: str = "test-get-object-bucket-name"
+        if get_bucket_name not in bucket_names:
+            s3_client.create_bucket(
+                Bucket=get_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
+            )
         with tempfile.NamedTemporaryFile(delete=True) as temp_file:
             temp_file_name: str = temp_file.name
             temp_file.write(b"This is temp file for S3 upload")
@@ -231,22 +245,34 @@ def prepare_aws_server() -> None:
 
         # Set up DDB so tests can access a table.
         ddb_client: BaseClient = boto3.client("dynamodb", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
-        ddb_client.create_table(
-            TableName="put_test_table",
-            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
-            AttributeDefinitions=[
-                {"AttributeName": "id", "AttributeType": "S"},
-            ],
-            BillingMode="PAY_PER_REQUEST",
-        )
+        table_names: List[str] = ddb_client.list_tables()["TableNames"]
+
+        table_name: str = "put_test_table"
+        if table_name not in table_names:
+            ddb_client.create_table(
+                TableName=table_name,
+                KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
+                AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
+                BillingMode="PAY_PER_REQUEST",
+            )
 
         # Set up SQS so tests can access a queue.
         sqs_client: BaseClient = boto3.client("sqs", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
-        sqs_client.create_queue(QueueName="test_put_get_queue")
+        queue_name: str = "test_put_get_queue"
+        queues_response = sqs_client.list_queues(QueueNamePrefix=queue_name)
+        queues: List[str] = queues_response["QueueUrls"] if "QueueUrls" in queues_response else []
+        if not queues:
+            sqs_client.create_queue(QueueName=queue_name)
 
         # Set up Kinesis so tests can access a stream.
         kinesis_client: BaseClient = boto3.client("kinesis", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
-        kinesis_client.create_stream(StreamName="test_stream", ShardCount=1)
+        stream_name: str = "test_stream"
+        stream_response = kinesis_client.list_streams()
+        if stream_name not in stream_response["StreamNames"]:
+            kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)
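+            # 000000000000 is LocalStack's default account ID, so this ARN refers to the stream just created.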
+            kinesis_client.register_stream_consumer(
+                StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/" + stream_name, ConsumerName="test_consumer"
+            )
     except Exception as exception:
         print("Unexpected exception occurred", exception)
 
diff --git a/contract-tests/tests/test/amazon/botocore/botocore_test.py b/contract-tests/tests/test/amazon/botocore/botocore_test.py
index 6fe278d3b..12d01684c 100644
--- a/contract-tests/tests/test/amazon/botocore/botocore_test.py
+++ b/contract-tests/tests/test/amazon/botocore/botocore_test.py
@@ -29,6 +29,7 @@
 _AWS_QUEUE_URL: str = "aws.sqs.queue_url"
 _AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
 _AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
+_AWS_STREAM_CONSUMER_NAME: str = "aws.stream.consumer_name"
 
 
 # pylint: disable=too-many-public-methods
@@ -338,6 +339,23 @@ def test_kinesis_put_record(self):
             span_name="Kinesis.PutRecord",
         )
 
+    def test_kinesis_describe_stream_consumer(self):
+        self.do_test_requests(
+            "kinesis/describestreamconsumer/my-consumer",
+            "GET",
+            200,
+            0,
+            0,
+            remote_service="AWS::Kinesis",
+            remote_operation="DescribeStreamConsumer",
+            remote_resource_type="AWS::Kinesis::StreamConsumer",
+            remote_resource_identifier="test_consumer",
+            request_specific_attributes={
+                _AWS_STREAM_CONSUMER_NAME: "test_consumer",
+            },
+            span_name="Kinesis.DescribeStreamConsumer",
+        )
+
     def test_kinesis_error(self):
         self.do_test_requests(
             "kinesis/error",
@@ -458,6 +476,7 @@ def _assert_semantic_conventions_attributes(
                 self._assert_array_value_ddb_table_name(attributes_dict, key, value)
 
     @override
+    # pylint: disable=too-many-locals
     def _assert_metric_attributes(
         self,
         resource_scope_metrics: List[ResourceScopeMetric],