diff --git a/CHANGES.md b/CHANGES.md index 5dbf8ca6..3606a814 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased +- Processor: Updated Kinesis Lambda processor to understand AWS DMS ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/processor/kinesis_lambda.py b/cratedb_toolkit/io/processor/kinesis_lambda.py index bc948f3a..c9c60a78 100644 --- a/cratedb_toolkit/io/processor/kinesis_lambda.py +++ b/cratedb_toolkit/io/processor/kinesis_lambda.py @@ -1,19 +1,31 @@ # Copyright (c) 2021-2024, Crate.io Inc. # Distributed under the terms of the Apache 2 license. """ -Consume an AWS Kinesis Stream and relay into CrateDB. -https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html -https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html -https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html#with-kinesis-example-create-function +Using an AWS Lambda, consume an AWS Kinesis Stream of CDC data, and relay +into CrateDB, re-materializing the original information into an OBJECT +column `data`. -In order to run, this module/program needs the following 3rd party -libraries, defined using inline script metadata. +Currently supported CDC message formats: + +- AWS DMS +- AWS DynamoDB + +Details: +When using `ON_ERROR = exit`, the processor uses Linux exit codes for +signalling error conditions, see https://stackoverflow.com/a/76187305. + +Resources: +- https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html +- https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html """ +# In order to run, this module/program needs the following +# 3rd party libraries, defined using inline script metadata. +# # /// script # requires-python = ">=3.9" # dependencies = [ -# "commons-codec==0.0.2", +# "commons-codec==0.0.3", # "sqlalchemy-cratedb==0.38.0", # ] # /// @@ -22,25 +34,70 @@ import logging import os import sys -import typing as t import sqlalchemy as sa +from commons_codec.exception import UnknownOperationError +from commons_codec.model import ColumnTypeMapStore +from commons_codec.transform.aws_dms import DMSTranslatorCrateDB from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB +from sqlalchemy.util import asbool -logger = logging.getLogger(__name__) +LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO") +USE_BATCH_PROCESSING: bool = asbool(os.environ.get("USE_BATCH_PROCESSING", "false")) +ON_ERROR: str = os.environ.get("ON_ERROR", "exit") +SQL_ECHO: bool = asbool(os.environ.get("SQL_ECHO", "false")) -# TODO: Control using environment variable. -logger.setLevel("INFO") +MESSAGE_FORMAT: str = os.environ.get("MESSAGE_FORMAT", "unknown") +COLUMN_TYPES: str = os.environ.get("COLUMN_TYPES", "") +CRATEDB_SQLALCHEMY_URL: str = os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://") +CRATEDB_TABLE: str = os.environ.get("CRATEDB_TABLE", "default") -# TODO: Control using environment variables. -USE_BATCH_PROCESSING: bool = False -ON_ERROR: t.Literal["exit", "noop", "raise"] = "exit" - -# TODO: Control `echo` using environment variable. -engine = sa.create_engine(os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://"), echo=True) - -# TODO: Automatically create destination table? How? -cdc = DynamoCDCTranslatorCrateDB(table_name=os.environ.get("CRATEDB_TABLE", "default")) +logger = logging.getLogger(__name__) +logger.setLevel(LOG_LEVEL) + + +# Sanity checks. +# If any value is invalid, terminate by signalling "22 - Invalid argument". +error_strategies = ["exit", "ignore", "raise"] +message_formats = ["dms", "dynamodb"] +if ON_ERROR not in error_strategies: + message = f"Invalid value for ON_ERROR: {ON_ERROR}. Use one of: {error_strategies}" + logger.fatal(message) + sys.exit(22) +if MESSAGE_FORMAT not in message_formats: + message = f"Invalid value for MESSAGE_FORMAT: {MESSAGE_FORMAT}. Use one of: {message_formats}" + logger.fatal(message) + sys.exit(22) +try: + column_types = ColumnTypeMapStore.from_json(COLUMN_TYPES) +except Exception as ex: + message = f"Invalid value for COLUMN_TYPES: {COLUMN_TYPES}. Reason: {ex}. Use JSON." + logger.fatal(message) + sys.exit(22) + +# TODO: Automatically create destination table. +# TODO: Propagate mapping definitions and other settings. +if MESSAGE_FORMAT == "dms": + cdc = DMSTranslatorCrateDB(column_types=column_types) +elif MESSAGE_FORMAT == "dynamodb": + cdc = DynamoCDCTranslatorCrateDB(table_name=CRATEDB_TABLE) + +# Create the database connection outside the handler to allow +# connections to be re-used by subsequent function invocations. +# TODO: Examine long-running jobs about successful reconnection behavior. +try: + engine = sa.create_engine(CRATEDB_SQLALCHEMY_URL, echo=SQL_ECHO) + connection = engine.connect() + logger.info(f"Connection to sink database succeeded: {CRATEDB_SQLALCHEMY_URL}") +except Exception as ex: + logger.exception(f"Connection to sink database failed: {CRATEDB_SQLALCHEMY_URL}") + if ON_ERROR == "exit": + # Signal "Resource temporarily unavailable" when connection to database fails. + sys.exit(11) + elif ON_ERROR == "ignore": + pass + elif ON_ERROR == "raise": + raise ex def handler(event, context): @@ -51,46 +108,44 @@ def handler(event, context): """ cur_record_sequence_number = "" - logger.info("context: %s", context) + logger.debug("context: %s", context) for record in event["Records"]: + logger.debug(f"Record: {record}") + event_id = record["eventID"] try: # Log and decode event. - # TODO: Remove log statements. - logger.info(f"Processed Kinesis Event - EventID: {record['eventID']}") - logger.info(f"Event Data: {record}") + # TODO: Remove log statements for better performance? + logger.debug(f"Processed Kinesis Event - EventID: {event_id}") record_data = json.loads(base64.b64decode(record["kinesis"]["data"]).decode("utf-8")) - logger.info(f"Record Data: {record_data}") + logger.debug(f"Record Data: {record_data}") # Process record. sql = cdc.to_sql(record_data) - run_sql(sql) + connection.execute(sa.text(sql)) + connection.commit() # Bookkeeping. cur_record_sequence_number = record["kinesis"]["sequenceNumber"] + except UnknownOperationError as ex: + logger.warning(f"Ignoring message. Reason: {ex}. Record: {ex.record}") + except Exception as ex: - error_message = "An error occurred" + error_message = f"An error occurred processing event: {event_id}" logger.exception(error_message) if USE_BATCH_PROCESSING: # Return failed record's sequence number. return {"batchItemFailures": [{"itemIdentifier": cur_record_sequence_number}]} if ON_ERROR == "exit": - sys.exit(6) - if ON_ERROR == "raise": + # Signal "Input/output error" when error happens while processing data. + sys.exit(5) + elif ON_ERROR == "ignore": + pass + elif ON_ERROR == "raise": raise ex - logger.info(f"Successfully processed {len(event['Records'])} records.") + logger.info(f"Successfully processed {len(event['Records'])} records") if USE_BATCH_PROCESSING: return {"batchItemFailures": []} return None - - -def run_sql(sql: str): - """ - Execute an SQL statement. - - TODO: Optimize performance. - """ - with engine.connect() as connection: - connection.execute(sa.text(sql)) diff --git a/pyproject.toml b/pyproject.toml index 1d497888..57888a59 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -153,7 +153,7 @@ kinesis = [ "lorrystream @ git+https://github.com/daq-tools/lorrystream.git@55cf456fdcd3", ] mongodb = [ - "commons-codec[mongodb]==0.0.2", + "commons-codec[mongodb]==0.0.3", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1",