Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Processor: Update Kinesis Lambda processor to understand AWS DMS #217

Merged
merged 1 commit into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


## Unreleased
- Processor: Updated Kinesis Lambda processor to understand AWS DMS

## 2024/07/25 v0.0.16
- `ctk load table`: Added support for MongoDB Change Streams
Expand Down
135 changes: 95 additions & 40 deletions cratedb_toolkit/io/processor/kinesis_lambda.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the Apache 2 license.
"""
Consume an AWS Kinesis Stream and relay into CrateDB.
https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html
https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html
https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html#with-kinesis-example-create-function
Using an AWS Lambda, consume an AWS Kinesis Stream of CDC data, and relay
into CrateDB, re-materializing the original information into an OBJECT
column `data`.

In order to run, this module/program needs the following 3rd party
libraries, defined using inline script metadata.
Currently supported CDC message formats:

- AWS DMS
- AWS DynamoDB

Details:
When using `ON_ERROR = exit`, the processor uses Linux exit codes for
signalling error conditions, see https://stackoverflow.com/a/76187305.

Resources:
- https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html
- https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html
"""

# In order to run, this module/program needs the following
# 3rd party libraries, defined using inline script metadata.
#
# /// script
# requires-python = ">=3.9"
# dependencies = [
# "commons-codec==0.0.2",
# "commons-codec==0.0.3",
# "sqlalchemy-cratedb==0.38.0",
# ]
# ///
Expand All @@ -22,25 +34,70 @@
import logging
import os
import sys
import typing as t

import sqlalchemy as sa
from commons_codec.exception import UnknownOperationError
from commons_codec.model import ColumnTypeMapStore
from commons_codec.transform.aws_dms import DMSTranslatorCrateDB
from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB
from sqlalchemy.util import asbool

logger = logging.getLogger(__name__)
LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO")
USE_BATCH_PROCESSING: bool = asbool(os.environ.get("USE_BATCH_PROCESSING", "false"))
ON_ERROR: str = os.environ.get("ON_ERROR", "exit")
SQL_ECHO: bool = asbool(os.environ.get("SQL_ECHO", "false"))

# TODO: Control using environment variable.
logger.setLevel("INFO")
MESSAGE_FORMAT: str = os.environ.get("MESSAGE_FORMAT", "unknown")
COLUMN_TYPES: str = os.environ.get("COLUMN_TYPES", "")
CRATEDB_SQLALCHEMY_URL: str = os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://")
CRATEDB_TABLE: str = os.environ.get("CRATEDB_TABLE", "default")
Comment on lines +52 to +53
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the only deviation compared to the upstream variant: There, those variables are called SINK_SQLALCHEMY_URL and SINK_TABLE.


# TODO: Control using environment variables.
USE_BATCH_PROCESSING: bool = False
ON_ERROR: t.Literal["exit", "noop", "raise"] = "exit"

# TODO: Control `echo` using environment variable.
engine = sa.create_engine(os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://"), echo=True)

# TODO: Automatically create destination table? How?
cdc = DynamoCDCTranslatorCrateDB(table_name=os.environ.get("CRATEDB_TABLE", "default"))
logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)


# Sanity checks.
# If any value is invalid, terminate by signalling "22 - Invalid argument".
error_strategies = ["exit", "ignore", "raise"]
message_formats = ["dms", "dynamodb"]
if ON_ERROR not in error_strategies:
message = f"Invalid value for ON_ERROR: {ON_ERROR}. Use one of: {error_strategies}"
logger.fatal(message)
sys.exit(22)
if MESSAGE_FORMAT not in message_formats:
message = f"Invalid value for MESSAGE_FORMAT: {MESSAGE_FORMAT}. Use one of: {message_formats}"
logger.fatal(message)
sys.exit(22)
try:
column_types = ColumnTypeMapStore.from_json(COLUMN_TYPES)
except Exception as ex:
message = f"Invalid value for COLUMN_TYPES: {COLUMN_TYPES}. Reason: {ex}. Use JSON."
logger.fatal(message)
sys.exit(22)

# TODO: Automatically create destination table.
# TODO: Propagate mapping definitions and other settings.
if MESSAGE_FORMAT == "dms":
cdc = DMSTranslatorCrateDB(column_types=column_types)
elif MESSAGE_FORMAT == "dynamodb":
cdc = DynamoCDCTranslatorCrateDB(table_name=CRATEDB_TABLE)

# Create the database connection outside the handler to allow
# connections to be re-used by subsequent function invocations.
# TODO: Examine long-running jobs about successful reconnection behavior.
try:
engine = sa.create_engine(CRATEDB_SQLALCHEMY_URL, echo=SQL_ECHO)
connection = engine.connect()
logger.info(f"Connection to sink database succeeded: {CRATEDB_SQLALCHEMY_URL}")
except Exception as ex:
logger.exception(f"Connection to sink database failed: {CRATEDB_SQLALCHEMY_URL}")
if ON_ERROR == "exit":
# Signal "Resource temporarily unavailable" when connection to database fails.
sys.exit(11)
elif ON_ERROR == "ignore":
pass
elif ON_ERROR == "raise":
raise ex


def handler(event, context):
Expand All @@ -51,46 +108,44 @@ def handler(event, context):
"""

cur_record_sequence_number = ""
logger.info("context: %s", context)
logger.debug("context: %s", context)

for record in event["Records"]:
logger.debug(f"Record: {record}")
event_id = record["eventID"]
try:
# Log and decode event.
# TODO: Remove log statements.
logger.info(f"Processed Kinesis Event - EventID: {record['eventID']}")
logger.info(f"Event Data: {record}")
# TODO: Remove log statements for better performance?
logger.debug(f"Processed Kinesis Event - EventID: {event_id}")
record_data = json.loads(base64.b64decode(record["kinesis"]["data"]).decode("utf-8"))
logger.info(f"Record Data: {record_data}")
logger.debug(f"Record Data: {record_data}")

# Process record.
sql = cdc.to_sql(record_data)
run_sql(sql)
connection.execute(sa.text(sql))
connection.commit()

# Bookkeeping.
cur_record_sequence_number = record["kinesis"]["sequenceNumber"]

except UnknownOperationError as ex:
logger.warning(f"Ignoring message. Reason: {ex}. Record: {ex.record}")

except Exception as ex:
error_message = "An error occurred"
error_message = f"An error occurred processing event: {event_id}"
logger.exception(error_message)
if USE_BATCH_PROCESSING:
# Return failed record's sequence number.
return {"batchItemFailures": [{"itemIdentifier": cur_record_sequence_number}]}
if ON_ERROR == "exit":
sys.exit(6)
if ON_ERROR == "raise":
# Signal "Input/output error" when error happens while processing data.
sys.exit(5)
elif ON_ERROR == "ignore":
pass
elif ON_ERROR == "raise":
raise ex

logger.info(f"Successfully processed {len(event['Records'])} records.")
logger.info(f"Successfully processed {len(event['Records'])} records")
if USE_BATCH_PROCESSING:
return {"batchItemFailures": []}
return None


def run_sql(sql: str):
"""
Execute an SQL statement.

TODO: Optimize performance.
"""
with engine.connect() as connection:
connection.execute(sa.text(sql))
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ kinesis = [
"lorrystream @ git+https://github.com/daq-tools/lorrystream.git@55cf456fdcd3",
]
mongodb = [
"commons-codec[mongodb]==0.0.2",
"commons-codec[mongodb]==0.0.3",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
Expand Down
Loading