Skip to content

Commit

Permalink
Processor: Update Kinesis Lambda processor to understand AWS DMS
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl authored and hammerhead committed Aug 9, 2024
1 parent d0d98e0 commit a485ff7
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


## Unreleased
- Processor: Updated Kinesis Lambda processor to understand AWS DMS

## 2024/07/25 v0.0.16
- `ctk load table`: Added support for MongoDB Change Streams
Expand Down
135 changes: 95 additions & 40 deletions cratedb_toolkit/io/processor/kinesis_lambda.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the Apache 2 license.
"""
Consume an AWS Kinesis Stream and relay into CrateDB.
https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html
https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html
https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html#with-kinesis-example-create-function
Using an AWS Lambda, consume an AWS Kinesis Stream of CDC data, and relay
into CrateDB, re-materializing the original information into an OBJECT
column `data`.
In order to run, this module/program needs the following 3rd party
libraries, defined using inline script metadata.
Currently supported CDC message formats:
- AWS DMS
- AWS DynamoDB
Details:
When using `ON_ERROR = exit`, the processor uses Linux exit codes for
signalling error conditions, see https://stackoverflow.com/a/76187305.
Resources:
- https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html
- https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html
"""

# In order to run, this module/program needs the following
# 3rd party libraries, defined using inline script metadata.
#
# /// script
# requires-python = ">=3.9"
# dependencies = [
# "commons-codec==0.0.2",
# "commons-codec==0.0.3",
# "sqlalchemy-cratedb==0.38.0",
# ]
# ///
Expand All @@ -22,25 +34,70 @@
import logging
import os
import sys
import typing as t

import sqlalchemy as sa
from commons_codec.exception import UnknownOperationError
from commons_codec.model import ColumnTypeMapStore
from commons_codec.transform.aws_dms import DMSTranslatorCrateDB
from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB
from sqlalchemy.util import asbool

logger = logging.getLogger(__name__)
LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO")
USE_BATCH_PROCESSING: bool = asbool(os.environ.get("USE_BATCH_PROCESSING", "false"))
ON_ERROR: str = os.environ.get("ON_ERROR", "exit")
SQL_ECHO: bool = asbool(os.environ.get("SQL_ECHO", "false"))

# TODO: Control using environment variable.
logger.setLevel("INFO")
MESSAGE_FORMAT: str = os.environ.get("MESSAGE_FORMAT", "unknown")
COLUMN_TYPES: str = os.environ.get("COLUMN_TYPES", "")
CRATEDB_SQLALCHEMY_URL: str = os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://")
CRATEDB_TABLE: str = os.environ.get("CRATEDB_TABLE", "default")

# TODO: Control using environment variables.
USE_BATCH_PROCESSING: bool = False
ON_ERROR: t.Literal["exit", "noop", "raise"] = "exit"

# TODO: Control `echo` using environment variable.
engine = sa.create_engine(os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://"), echo=True)

# TODO: Automatically create destination table? How?
cdc = DynamoCDCTranslatorCrateDB(table_name=os.environ.get("CRATEDB_TABLE", "default"))
logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)


# Sanity checks.
# If any value is invalid, terminate by signalling "22 - Invalid argument".
error_strategies = ["exit", "ignore", "raise"]
message_formats = ["dms", "dynamodb"]
if ON_ERROR not in error_strategies:
message = f"Invalid value for ON_ERROR: {ON_ERROR}. Use one of: {error_strategies}"
logger.fatal(message)
sys.exit(22)
if MESSAGE_FORMAT not in message_formats:
message = f"Invalid value for MESSAGE_FORMAT: {MESSAGE_FORMAT}. Use one of: {message_formats}"
logger.fatal(message)
sys.exit(22)
try:
column_types = ColumnTypeMapStore.from_json(COLUMN_TYPES)
except Exception as ex:
message = f"Invalid value for COLUMN_TYPES: {COLUMN_TYPES}. Reason: {ex}. Use JSON."
logger.fatal(message)
sys.exit(22)

# TODO: Automatically create destination table.
# TODO: Propagate mapping definitions and other settings.
if MESSAGE_FORMAT == "dms":
cdc = DMSTranslatorCrateDB(column_types=column_types)
elif MESSAGE_FORMAT == "dynamodb":
cdc = DynamoCDCTranslatorCrateDB(table_name=CRATEDB_TABLE)

# Create the database connection outside the handler to allow
# connections to be re-used by subsequent function invocations.
# TODO: Verify that long-running jobs reconnect successfully after the connection was lost.
try:
engine = sa.create_engine(CRATEDB_SQLALCHEMY_URL, echo=SQL_ECHO)
connection = engine.connect()
logger.info(f"Connection to sink database succeeded: {CRATEDB_SQLALCHEMY_URL}")
except Exception as ex:
logger.exception(f"Connection to sink database failed: {CRATEDB_SQLALCHEMY_URL}")
if ON_ERROR == "exit":
# Signal "Resource temporarily unavailable" when the connection to the database fails.
sys.exit(11)
elif ON_ERROR == "ignore":
pass
elif ON_ERROR == "raise":
raise ex


def handler(event, context):
Expand All @@ -51,46 +108,44 @@ def handler(event, context):
"""

cur_record_sequence_number = ""
logger.info("context: %s", context)
logger.debug("context: %s", context)

for record in event["Records"]:
logger.debug(f"Record: {record}")
event_id = record["eventID"]
try:
# Log and decode event.
# TODO: Remove log statements.
logger.info(f"Processed Kinesis Event - EventID: {record['eventID']}")
logger.info(f"Event Data: {record}")
# TODO: Remove log statements for better performance?
logger.debug(f"Processed Kinesis Event - EventID: {event_id}")
record_data = json.loads(base64.b64decode(record["kinesis"]["data"]).decode("utf-8"))
logger.info(f"Record Data: {record_data}")
logger.debug(f"Record Data: {record_data}")

# Process record.
sql = cdc.to_sql(record_data)
run_sql(sql)
connection.execute(sa.text(sql))
connection.commit()

# Bookkeeping.
cur_record_sequence_number = record["kinesis"]["sequenceNumber"]

except UnknownOperationError as ex:
logger.warning(f"Ignoring message. Reason: {ex}. Record: {ex.record}")

except Exception as ex:
error_message = "An error occurred"
error_message = f"An error occurred processing event: {event_id}"
logger.exception(error_message)
if USE_BATCH_PROCESSING:
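# Return failed record's sequence number.
# NOTE(review): `cur_record_sequence_number` is only assigned after a record was
# processed successfully, so at this point it appears to hold the previous record's
# sequence number (or an empty string for the first record) -- confirm.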
return {"batchItemFailures": [{"itemIdentifier": cur_record_sequence_number}]}
if ON_ERROR == "exit":
sys.exit(6)
if ON_ERROR == "raise":
# Signal "Input/output error" when an error happens while processing data.
sys.exit(5)
elif ON_ERROR == "ignore":
pass
elif ON_ERROR == "raise":
raise ex

logger.info(f"Successfully processed {len(event['Records'])} records.")
logger.info(f"Successfully processed {len(event['Records'])} records")
if USE_BATCH_PROCESSING:
return {"batchItemFailures": []}
return None


def run_sql(sql: str):
"""
Execute an SQL statement.
TODO: Optimize performance.
"""
with engine.connect() as connection:
connection.execute(sa.text(sql))
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ kinesis = [
"lorrystream @ git+https://github.com/daq-tools/lorrystream.git@55cf456fdcd3",
]
mongodb = [
"commons-codec[mongodb]==0.0.2",
"commons-codec[mongodb]==0.0.3",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
Expand Down

0 comments on commit a485ff7

Please sign in to comment.