-
Notifications
You must be signed in to change notification settings - Fork 22
Enhance kinesis Consumer support #200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 8 commits
b6e9392
8dd74b9
5ec390e
2a08e02
d636f49
fd73ad2
2a1ed66
d9ce47b
fb0c647
d670649
920ce00
846d435
dc468f3
103c653
41571fc
72c59b8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
import tempfile | ||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer | ||
from threading import Thread | ||
from typing import List, Optional | ||
|
||
import boto3 | ||
import requests | ||
|
@@ -30,6 +31,10 @@ | |
class RequestHandler(BaseHTTPRequestHandler): | ||
main_status: int = 200 | ||
|
||
def __init__(self, request, client_address, server, *args, consumer_arn=None, **kwargs): | ||
self.consumer_arn = consumer_arn | ||
super().__init__(request, client_address, server, *args, **kwargs) | ||
|
||
@override | ||
# pylint: disable=invalid-name | ||
def do_GET(self): | ||
|
@@ -200,6 +205,9 @@ def _handle_kinesis_request(self) -> None: | |
elif self.in_path("putrecord/my-stream"): | ||
set_main_status(200) | ||
kinesis_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key") | ||
elif self.in_path("describestreamconsumer/my-consumer"): | ||
set_main_status(200) | ||
kinesis_client.describe_stream_consumer(ConsumerARN=self.consumer_arn) | ||
else: | ||
set_main_status(404) | ||
|
||
|
@@ -212,17 +220,24 @@ def set_main_status(status: int) -> None: | |
RequestHandler.main_status = status | ||
|
||
|
||
def prepare_aws_server() -> None: | ||
# pylint: disable=too-many-locals | ||
def prepare_aws_server() -> Optional[List[str]]: | ||
requests.Request(method="POST", url="http://localhost:4566/_localstack/state/reset") | ||
try: | ||
# Set up S3 so tests can access buckets and retrieve a file. | ||
s3_client: BaseClient = boto3.client("s3", endpoint_url=_AWS_SDK_S3_ENDPOINT, region_name=_AWS_REGION) | ||
s3_client.create_bucket( | ||
Bucket="test-put-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION} | ||
) | ||
s3_client.create_bucket( | ||
Bucket="test-get-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION} | ||
) | ||
bucket_names: List[str] = [bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]] | ||
put_bucket_name: str = "test-put-object-bucket-name" | ||
if put_bucket_name not in bucket_names: | ||
s3_client.create_bucket( | ||
Bucket=put_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION} | ||
) | ||
|
||
get_bucket_name: str = "test-get-object-bucket-name" | ||
if get_bucket_name not in bucket_names: | ||
s3_client.create_bucket( | ||
Bucket=get_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION} | ||
) | ||
Comment on lines
+228
to
+239
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this needed? Same with changes to DDB, SQS sections? Is it because we are not refreshing the AWS container between tests? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For each test, we will pull the "aws-application-signals-tests-botocore-app", and start a new container. Only the first container will succeed, all the following containers are fail with Same for other resources. So, for all the services, only the first container succeed with creating resources. All the following container fails. This was fine as long as we already created the resource before. But After adding kinesis_client consumer, we will have trouble, because we want to have different consumer_arn returned for each container. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting, thanks for investigating and improving! Not blocking: Consider adding documentation to this logic that explains it really only runs once, to avoid confusion in future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added. |
||
with tempfile.NamedTemporaryFile(delete=True) as temp_file: | ||
temp_file_name: str = temp_file.name | ||
temp_file.write(b"This is temp file for S3 upload") | ||
|
@@ -231,31 +246,54 @@ def prepare_aws_server() -> None: | |
|
||
# Set up DDB so tests can access a table. | ||
ddb_client: BaseClient = boto3.client("dynamodb", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) | ||
ddb_client.create_table( | ||
TableName="put_test_table", | ||
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], | ||
AttributeDefinitions=[ | ||
{"AttributeName": "id", "AttributeType": "S"}, | ||
], | ||
BillingMode="PAY_PER_REQUEST", | ||
) | ||
table_names: List[str] = ddb_client.list_tables()["TableNames"] | ||
|
||
table_name: str = "put_test_table" | ||
if table_name not in table_names: | ||
ddb_client.create_table( | ||
TableName=table_name, | ||
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], | ||
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}], | ||
BillingMode="PAY_PER_REQUEST", | ||
) | ||
|
||
# Set up SQS so tests can access a queue. | ||
sqs_client: BaseClient = boto3.client("sqs", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) | ||
sqs_client.create_queue(QueueName="test_put_get_queue") | ||
queue_name: str = "test_put_get_queue" | ||
queues_response = sqs_client.list_queues(QueueNamePrefix=queue_name) | ||
queues: List[str] = queues_response["QueueUrls"] if "QueueUrls" in queues_response else [] | ||
if not queues: | ||
sqs_client.create_queue(QueueName=queue_name) | ||
|
||
# Set up Kinesis so tests can access a stream. | ||
kinesis_client: BaseClient = boto3.client("kinesis", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) | ||
kinesis_client.create_stream(StreamName="test_stream", ShardCount=1) | ||
stream_name: str = "test_stream" | ||
stream_response = kinesis_client.list_streams() | ||
if not stream_response["StreamNames"]: | ||
kinesis_client.create_stream(StreamName=stream_name, ShardCount=1) | ||
response = kinesis_client.register_stream_consumer( | ||
StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/" + stream_name, ConsumerName="test_consumer" | ||
) | ||
consumer_arn: str = response["Consumer"]["ConsumerARN"] | ||
else: | ||
response = kinesis_client.list_stream_consumers( | ||
StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream" | ||
) | ||
consumer_arn: str = response.get("Consumers", [])[0]["ConsumerARN"] | ||
|
||
return [consumer_arn] | ||
except Exception as exception: | ||
print("Unexpected exception occurred", exception) | ||
return [None] | ||
|
||
|
||
def main() -> None: | ||
prepare_aws_server() | ||
[consumer_arn] = prepare_aws_server() | ||
thpierce marked this conversation as resolved.
Show resolved
Hide resolved
|
||
server_address: tuple[str, int] = ("0.0.0.0", _PORT) | ||
request_handler_class: type = RequestHandler | ||
requests_server: ThreadingHTTPServer = ThreadingHTTPServer(server_address, request_handler_class) | ||
requests_server: ThreadingHTTPServer = ThreadingHTTPServer( | ||
server_address, lambda *args, **kwargs: request_handler_class(*args, consumer_arn=consumer_arn, **kwargs) | ||
) | ||
atexit.register(requests_server.shutdown) | ||
server_thread: Thread = Thread(target=requests_server.serve_forever) | ||
server_thread.start() | ||
|
Uh oh!
There was an error while loading. Please reload this page.