Skip to content

Commit

Permalink
DynamoDB Relay: Refactor "drop stream" action into test fixture
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 16, 2024
1 parent 8289282 commit 6ea86cc
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 29 deletions.
58 changes: 52 additions & 6 deletions tests/io/dynamodb/conftest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import logging
import time
import typing

import botocore
import pytest
from yarl import URL

from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter
from tests.io.dynamodb.manager import DynamoDBTestManager

logger = logging.getLogger(__name__)


# Define databases to be deleted before running each test case.
# Define streams to be deleted before running each test case.
RESET_STREAMS = [
"demo",
]

# Define tables to be deleted before running each test case.
RESET_TABLES = [
"ProductCatalog",
]
Expand All @@ -17,11 +27,15 @@
class DynamoDBFixture:
"""
A little helper wrapping Testcontainer's `LocalStackContainer`.
TODO: Generalize into `LocalStackFixture`.
"""

def __init__(self):
self.container = None
self.url = None
self.dynamodb_adapter: typing.Union[DynamoDBAdapter, None] = None
self.kinesis_adapter: typing.Union[KinesisAdapter, None] = None
self.setup()

def setup(self):
Expand All @@ -32,17 +46,49 @@ def setup(self):
self.container.with_services("dynamodb", "kinesis")
self.container.start()

self.dynamodb_adapter = DynamoDBAdapter(URL(f"{self.get_connection_url_dynamodb()}/?region=us-east-1"))
self.kinesis_adapter = KinesisAdapter(
URL(f"{self.get_connection_url_kinesis_dynamodb_cdc()}/?region=us-east-1")
)

def finalize(self):
self.container.stop()

def reset(self):
"""
Drop all databases used for testing.
Reset all resources to provide each test case with a fresh canvas.
"""
self.reset_streams()
self.reset_tables()

def reset_streams(self):
"""
Drop all Kinesis streams used for testing.
"""
kinesis_client = self.kinesis_adapter.kinesis_client
for stream_name in RESET_STREAMS:
try:
kinesis_client.delete_stream(StreamName=stream_name)
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] != "ResourceNotFoundException":
raise
waiter = kinesis_client.get_waiter("stream_not_exists")
waiter.wait(StreamName=stream_name, WaiterConfig={"Delay": 0.3, "MaxAttempts": 15})
time.sleep(0.25)

def reset_tables(self):
"""
Drop all DynamoDB tables used for testing.
"""
# FIXME
return
for database_name in RESET_TABLES:
self.client.drop_database(database_name)
dynamodb_client = self.dynamodb_adapter.dynamodb_client
for table_name in RESET_TABLES:
try:
dynamodb_client.delete_table(TableName=table_name)
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] != "ResourceNotFoundException":
raise
waiter = dynamodb_client.get_waiter("table_not_exists")
waiter.wait(TableName=table_name, WaiterConfig={"Delay": 0.3, "MaxAttempts": 15})

def get_connection_url_dynamodb(self):
url = URL(self.container.get_url())
Expand Down
23 changes: 0 additions & 23 deletions tests/io/dynamodb/test_relay.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import threading
import time

import botocore
import pytest

from cratedb_toolkit.io.kinesis.relay import KinesisRelay
Expand Down Expand Up @@ -45,17 +44,6 @@ def test_kinesis_earliest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
# Initialize table loader.
table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url)

# Delete stream for blank canvas.
try:
table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo", EnforceConsumerDeletion=True)
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] != "ResourceNotFoundException":
raise

# LocalStack needs a while when deleting the Stream.
# FIXME: Can this be made more efficient?
time.sleep(0.5)

# Populate source database with data.
for event in events:
table_loader.kinesis_adapter.produce(event)
Expand Down Expand Up @@ -98,17 +86,6 @@ def test_kinesis_latest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
# Initialize table loader.
table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url)

# Delete stream for blank canvas.
try:
table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo")
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] != "ResourceNotFoundException":
raise

# LocalStack needs a while when deleting the Stream.
# FIXME: Can this be made more efficient instead of waiting multiple times to orchestrate this sequence?
time.sleep(0.5)

# Start event processor / stream consumer in separate thread, consuming forever.
thread = threading.Thread(target=table_loader.start)
thread.start()
Expand Down

0 comments on commit 6ea86cc

Please sign in to comment.