diff --git a/IDEA.md b/IDEA.md index b5763d5..9e90c66 100644 --- a/IDEA.md +++ b/IDEA.md @@ -8,7 +8,7 @@ This is intended to be a framework that can be used by client code to define the 1. **Scheduled Lambda Functions**: The goal here is that the client can provide their own lambda function (as a container image) and, from that, we will run it on a schedule defined by the client. The lambda function needs to know 1 or more SNS topics to which it will publish messages when it runs; different "types" of messages can go to different SNS topics, which will then be subscribed to by different notification channels. This will include the lambda execution role and the scheduled events. 2. **SNS Topics**: These will be manually created by the client code, but ARNs might be needed to give the lambda permissions to publish. -3. **Notification Channels**: We will provide modules for different notification channels (e.g., email via SES, SMS via Twilio, etc.). Each notification module owns the SNS->SQS->Lambda wiring: it provisions the FIFO SQS queue/subscription used for deduplication and triggers its handler; the user should not create that queue manually. The shared notification runtime will pull results from SQS, render a Jinja2 template, and pass the rendered content to a channel-specific notifier. +3. **Notification Channels**: We will provide modules for different notification channels (e.g., email via SES, SMS via Twilio, etc.). Each notification module owns the SNS->SQS->Lambda wiring: it provisions the FIFO SQS queue/subscription used for deduplication and triggers its handler; the user should not create that queue manually. Each channel ships its own container image (build or republish) that renders a Jinja2 template and delivers via its notifier. 4. **Lambda Image Utilities**: In addition to republishing an existing Lambda container, we will provide a module to build an image from a local directory containing a Dockerfile and publish it to ECR for use by the scheduled-lambda module. 5. **Python Runtime Library**: Provide reusable Python code in `src/cloud_cron/` that makes authoring custom scheduled lambdas easy (task base class, SNS dispatch helpers, and ergonomic handler wiring). This includes a template provider abstraction so notification handlers can source templates from env vars, URLs, or S3. diff --git a/PLAN.md b/PLAN.md index c31bc64..d20c87d 100644 --- a/PLAN.md +++ b/PLAN.md @@ -55,21 +55,23 @@ To-do: - [x] Add `src/cloud_cron/notifications/` for shared handler logic (SES/Twilio/etc.) that can be imported by notification runtimes. - [x] Update `src/cloud_cron/HOWTO-custom-lambda.md` to show the current recommended pattern and env var expectations. - [x] Add pytest cases with moto/mocks to cover SNS publish and mismatch errors. -- [ ] Document a minimal example task module that can be used in `examples/basic` or in a client repo. +- [x] Document a minimal example task module that can be used in `examples/basic` or in a client repo. ## Phase 4: Build notification modules -### Phase 4.1: Shared notification container and queueing infra -- [ ] Create a thin runtime wrapper in `modules/notification-runtime/` that packages a Lambda image and selects handlers via env var/routing key; handler implementations live in `src/cloud_cron/notifications/` (Phase 3) and are imported into the runtime. Reuse shared helpers from `src/cloud_cron/` for logging/dispatch conventions. -- [ ] Terraform: shared container build/publish for notifications; SQS FIFO queue for deduplication between SNS topic and Lambdas; SNS subscription to FIFO SQS with content-based dedup; SQS trigger to Lambda; IAM for SQS poll, logs, SES send, Secrets/SSM read, Twilio access. +### Phase 4.1: Notification containers and queueing infra +- [ ] Build one container per notification channel (email, SMS, print) using shared helpers from `src/cloud_cron/notifications/`; allow build or republish via `lambda-image-build` or `lambda-container`. +- [x] Add a minimal "print" notifier handler that renders the template and logs/prints it for easy testing. +- [x] Terraform: reusable notification plumbing module (SNS FIFO topic -> SQS FIFO queue -> Lambda event source mapping) with SQS access policy output. +- [ ] Terraform: per-channel container build/publish; channel modules use the plumbing module and add channel-specific IAM and config. - [ ] Inputs per module: `sns_topic_arn`, `fifo_queue_name`/settings, handler selector/env vars; shared tags/log retention. -- [ ] Verify: `terraform validate`; example `plan`; container build succeeds locally; pytest skeleton runs. -- [ ] Example touchpoint: extend `examples/basic` to include the notification container + FIFO SQS subscription to the sample SNS topic; run `terraform validate/plan` to confirm SNS->SQS->Lambda path. +- [x] Verify: `terraform validate`; example `plan`; container build succeeds locally; pytest skeleton runs. +- [x] Example touchpoint: extend `examples/basic` to include the print notifier + FIFO SNS/SQS wiring to the sample SNS topic. ### Phase 4.2: Email via SES handler (`modules/email-notification`) - [ ] Define handler contract: expect message payload with subject/template vars; support optional config set/reply-to; log delivery status. - [ ] Python code: SES client wrapper; load template (managed via Terraform) and render with variables; handle throttling/retries and DLQ-safe errors. -- [ ] Terraform: SES template creation; Lambda configuration/env (sender, recipients, template name, config set); permissions for SES send + logs; wire to shared container image and handler selection. +- [ ] Terraform: SES template creation; Lambda configuration/env (sender, recipients, template name, config set); permissions for SES send + logs; wire to the SES-specific container image. - [ ] Tests: pytest with sample SNS/SQS events; stub/moto SES; validate error handling and idempotency. - [ ] Verify: `terraform validate`; handler unit tests green; document smoke test (publish SNS message to topic -> email delivered/SES sandbox note). - [ ] Example touchpoint: wire the email module into `examples/basic` with sample SES template/resources and document the SNS publish -> email expectation. @@ -77,7 +79,7 @@ To-do: ### Phase 4.3: SMS via Twilio handler (`modules/sms-notification`) - [ ] Define handler contract: expect message payload with body/recipients; support per-message override of to-numbers; log Twilio SID/error. - [ ] Python code: Twilio REST client wrapper; read SID/auth token from SSM/Secrets; handle rate limits/retries; sanitize phone numbers; DLQ-safe errors. -- [ ] Terraform: Lambda configuration/env (from-number, default recipients, secret ARNs), IAM for Secrets Manager/SSM read + logs; wire to shared container image and handler selection. +- [ ] Terraform: Lambda configuration/env (from-number, default recipients, secret ARNs), IAM for Secrets Manager/SSM read + logs; wire to the Twilio-specific container image. - [ ] Tests: pytest with mocked Twilio client; cover success/failure paths and secret fetch. - [ ] Verify: `terraform validate`; handler unit tests green; document smoke test (publish SNS message to topic -> SMS sent). - [ ] Example touchpoint: add the SMS module to `examples/basic` (guard secrets/recipients via variables) and include a smoke path in the README. diff --git a/examples/basic/main.tf b/examples/basic/main.tf index ad77e8a..d87b512 100644 --- a/examples/basic/main.tf +++ b/examples/basic/main.tf @@ -26,6 +26,22 @@ module "lambda_image_build" { tags = local.common_tags } +module "print_lambda_image_build" { + source = "../../modules/lambda-image-build" + + source_dir = "${path.module}/../.." + dockerfile_path = "${path.module}/print-notifier/Dockerfile" + build_context_paths = [ + "${path.module}/print-notifier", + "${path.module}/../../src/cloud_cron", + ] + repository_name = var.print_repository_name + image_tag = var.image_tag + platform = var.platform + build_args = var.build_args + tags = local.common_tags +} + module "lambda_container_republish" { count = var.enable_republish ? 1 : 0 source = "../../modules/lambda-container" @@ -41,13 +57,14 @@ module "lambda_container_republish" { locals { active_lambda_image_uri = var.enable_republish ? module.lambda_container_republish[0].lambda_image_uri_with_digest : module.lambda_image_build.image_uri_with_digest + active_print_image_uri = module.print_lambda_image_build.image_uri_with_digest } module "sns_topics" { source = "../../modules/sns-topics" topic_names = { - example = "example-topic" + example = "example-topic.fifo" } tags = local.common_tags @@ -64,6 +81,17 @@ module "scheduled_lambda" { tags = local.common_tags } +module "print_notification" { + source = "../../modules/print-notification" + + sns_topic_arn = module.sns_topics.topic_arns.example + fifo_queue_name = "example-print.fifo" + lambda_image_uri = local.active_print_image_uri + template_file = "${path.module}/templates/print.txt" + + tags = local.common_tags +} + output "built_image_uri" { description = "Image URI built from examples/basic/lambda." value = module.lambda_image_build.image_uri diff --git a/examples/basic/print-notifier/Dockerfile b/examples/basic/print-notifier/Dockerfile new file mode 100644 index 0000000..666dfdf --- /dev/null +++ b/examples/basic/print-notifier/Dockerfile @@ -0,0 +1,10 @@ +FROM public.ecr.aws/lambda/python:3.13 + +RUN pip install --no-cache-dir jinja2 + +# Copy function code and runtime helpers +COPY examples/basic/print-notifier/handler.py ${LAMBDA_TASK_ROOT} +COPY src/cloud_cron ${LAMBDA_TASK_ROOT}/cloud_cron + +# Set the Lambda handler +CMD ["handler.handler"] diff --git a/examples/basic/print-notifier/handler.py b/examples/basic/print-notifier/handler.py new file mode 100644 index 0000000..de6f688 --- /dev/null +++ b/examples/basic/print-notifier/handler.py @@ -0,0 +1,10 @@ +from cloud_cron.notifications.base import EnvVarTemplateProvider +from cloud_cron.notifications.print_handler import PrintNotificationHandler + +handler_instance = PrintNotificationHandler( + template_provider=EnvVarTemplateProvider(), +) + + +def handler(event, context): + handler_instance.lambda_handler(event, context) diff --git a/examples/basic/templates/print.txt b/examples/basic/templates/print.txt new file mode 100644 index 0000000..0e00b0c --- /dev/null +++ b/examples/basic/templates/print.txt @@ -0,0 +1 @@ +Example result: {{ message }} diff --git a/examples/basic/variables.tf b/examples/basic/variables.tf index 4d4a525..7660ac0 100644 --- a/examples/basic/variables.tf +++ b/examples/basic/variables.tf @@ -10,6 +10,12 @@ variable "repository_name" { default = null } +variable "print_repository_name" { + description = "Optional repository name for the print notifier image." + type = string + default = "cloud-cron-print" +} + variable "image_tag" { description = "Tag to use for the locally built image." type = string diff --git a/modules/notification-plumbing/README.md b/modules/notification-plumbing/README.md new file mode 100644 index 0000000..cce86d4 --- /dev/null +++ b/modules/notification-plumbing/README.md @@ -0,0 +1,39 @@ +# Notification Plumbing Module + +Shared SNS → SQS FIFO → Lambda wiring for notification handlers. FIFO SQS queues require FIFO SNS topics. + +## Usage + +```hcl +module "notification_plumbing" { + source = "./modules/notification-plumbing" + + sns_topic_arn = aws_sns_topic.example.arn + lambda_function_arn = aws_lambda_function.print.arn + fifo_queue_name = "example-notifications.fifo" +} +``` + +## Inputs + +- `sns_topic_arn` (string): SNS topic ARN that feeds the notification queue. +- `lambda_function_arn` (string): ARN of the Lambda function that processes SQS messages. +- `fifo_queue_name` (string): Name of the FIFO SQS queue (must end with `.fifo`). +- `content_based_deduplication` (bool): Enable content-based deduplication. Default `true`. +- `visibility_timeout_seconds` (number): Visibility timeout for the queue. Default `30`. +- `message_retention_seconds` (number): Retention period for messages. Default `1209600`. +- `create_dlq` (bool): Whether to create a DLQ. Default `true`. +- `max_receive_count` (number): Receives before moving to DLQ. Default `5`. +- `batch_size` (number): Max records per Lambda invocation. Default `10`. +- `enabled` (bool): Enable the event source mapping. Default `true`. +- `tags` (map(string)): Tags applied to resources. + +## Outputs + +- `queue_arn`: ARN of the notification queue. +- `queue_url`: URL of the notification queue. +- `queue_name`: Name of the notification queue. +- `dlq_arn`: ARN of the DLQ (if created). +- `subscription_arn`: ARN of the SNS subscription. +- `event_source_mapping_uuid`: UUID of the SQS event source mapping. +- `lambda_sqs_policy_json`: IAM policy JSON for Lambda SQS permissions. diff --git a/modules/notification-plumbing/main.tf b/modules/notification-plumbing/main.tf new file mode 100644 index 0000000..84535d4 --- /dev/null +++ b/modules/notification-plumbing/main.tf @@ -0,0 +1,81 @@ +locals { + tags = merge({ managed_by = "cloudcron" }, var.tags) +} + +resource "aws_sqs_queue" "dlq" { + count = var.create_dlq ? 1 : 0 + + name = replace(var.fifo_queue_name, ".fifo", "-dlq.fifo") + fifo_queue = true + content_based_deduplication = var.content_based_deduplication + message_retention_seconds = var.message_retention_seconds + visibility_timeout_seconds = var.visibility_timeout_seconds + tags = local.tags +} + +resource "aws_sqs_queue" "queue" { + name = var.fifo_queue_name + fifo_queue = true + content_based_deduplication = var.content_based_deduplication + message_retention_seconds = var.message_retention_seconds + visibility_timeout_seconds = var.visibility_timeout_seconds + tags = local.tags + + redrive_policy = var.create_dlq ? jsonencode({ + deadLetterTargetArn = aws_sqs_queue.dlq[0].arn + maxReceiveCount = var.max_receive_count + }) : null +} + +resource "aws_sqs_queue_policy" "allow_sns" { + queue_url = aws_sqs_queue.queue.id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Sid = "AllowSnsPublish" + Effect = "Allow" + Principal = { + Service = "sns.amazonaws.com" + } + Action = "sqs:SendMessage" + Resource = aws_sqs_queue.queue.arn + Condition = { + ArnEquals = { + "aws:SourceArn" = var.sns_topic_arn + } + } + } + ] + }) +} + +resource "aws_sns_topic_subscription" "queue" { + topic_arn = var.sns_topic_arn + protocol = "sqs" + endpoint = aws_sqs_queue.queue.arn + + raw_message_delivery = true +} + +resource "aws_lambda_event_source_mapping" "sqs" { + event_source_arn = aws_sqs_queue.queue.arn + function_name = var.lambda_function_arn + enabled = var.enabled + batch_size = var.batch_size +} + +data "aws_iam_policy_document" "lambda_sqs_access" { + statement { + effect = "Allow" + actions = [ + "sqs:ChangeMessageVisibility", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes", + "sqs:GetQueueUrl", + "sqs:ReceiveMessage", + ] + resources = [aws_sqs_queue.queue.arn] + } +} diff --git a/modules/notification-plumbing/outputs.tf b/modules/notification-plumbing/outputs.tf new file mode 100644 index 0000000..9d09dd6 --- /dev/null +++ b/modules/notification-plumbing/outputs.tf @@ -0,0 +1,34 @@ +output "queue_arn" { + description = "ARN of the notification SQS queue." + value = aws_sqs_queue.queue.arn +} + +output "queue_url" { + description = "URL of the notification SQS queue." + value = aws_sqs_queue.queue.id +} + +output "queue_name" { + description = "Name of the notification SQS queue." + value = aws_sqs_queue.queue.name +} + +output "dlq_arn" { + description = "ARN of the dead-letter queue (if created)." + value = var.create_dlq ? aws_sqs_queue.dlq[0].arn : null +} + +output "subscription_arn" { + description = "ARN of the SNS subscription." + value = aws_sns_topic_subscription.queue.arn +} + +output "event_source_mapping_uuid" { + description = "UUID of the Lambda event source mapping." + value = aws_lambda_event_source_mapping.sqs.uuid +} + +output "lambda_sqs_policy_json" { + description = "IAM policy JSON granting Lambda access to the SQS queue." + value = data.aws_iam_policy_document.lambda_sqs_access.json +} diff --git a/modules/notification-plumbing/variables.tf b/modules/notification-plumbing/variables.tf new file mode 100644 index 0000000..95f7162 --- /dev/null +++ b/modules/notification-plumbing/variables.tf @@ -0,0 +1,70 @@ +variable "sns_topic_arn" { + description = "SNS topic ARN that feeds the notification queue." + type = string + validation { + condition = can(regex("\\.fifo$", var.sns_topic_arn)) + error_message = "sns_topic_arn must be a FIFO SNS topic ARN ending with .fifo." + } +} + +variable "lambda_function_arn" { + description = "ARN of the Lambda function that processes SQS messages." + type = string +} + +variable "fifo_queue_name" { + description = "Name of the FIFO SQS queue (must end with .fifo)." + type = string + validation { + condition = endswith(var.fifo_queue_name, ".fifo") + error_message = "fifo_queue_name must end with .fifo." + } +} + +variable "content_based_deduplication" { + description = "Enable content-based deduplication for the FIFO queue." + type = bool + default = true +} + +variable "visibility_timeout_seconds" { + description = "Visibility timeout for the SQS queue." + type = number + default = 30 +} + +variable "message_retention_seconds" { + description = "Retention period for messages in the queue." + type = number + default = 1209600 +} + +variable "create_dlq" { + description = "Whether to create a dead-letter queue." + type = bool + default = true +} + +variable "max_receive_count" { + description = "Number of receives before sending to the DLQ." + type = number + default = 5 +} + +variable "batch_size" { + description = "Maximum number of records per Lambda invocation." + type = number + default = 10 +} + +variable "enabled" { + description = "Enable the SQS event source mapping." + type = bool + default = true +} + +variable "tags" { + description = "Tags to apply to created resources." + type = map(string) + default = {} +} diff --git a/modules/notification-plumbing/versions.tf b/modules/notification-plumbing/versions.tf new file mode 100644 index 0000000..80e2b27 --- /dev/null +++ b/modules/notification-plumbing/versions.tf @@ -0,0 +1,10 @@ +terraform { + required_version = ">= 1.5.0" + + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 4.0" + } + } +} diff --git a/modules/print-notification/README.md b/modules/print-notification/README.md new file mode 100644 index 0000000..3cb2d72 --- /dev/null +++ b/modules/print-notification/README.md @@ -0,0 +1,38 @@ +# Print Notification Module + +Notification Lambda that renders a template and prints it. Intended for testing. + +## Usage + +```hcl +module "print_notification" { + source = "./modules/print-notification" + + sns_topic_arn = aws_sns_topic.example.arn + fifo_queue_name = "example-print.fifo" + lambda_image_uri = module.print_image.image_uri_with_digest + + template_file = "${path.module}/templates/print.txt" +} +``` + +## Inputs + +- `sns_topic_arn` (string): SNS topic ARN that feeds the notification queue. +- `fifo_queue_name` (string): Name of the FIFO SQS queue (must end with `.fifo`). +- `lambda_image_uri` (string): URI of the Lambda container image. +- `lambda_name` (string): Optional name for the Lambda function. +- `template_env_var` (string): Environment variable for the template. Default `TEMPLATE`. +- `template_file` (string): Path to the template file stored in the template env var. +- `timeout` (number): Lambda timeout in seconds. Default `30`. +- `memory_size` (number): Lambda memory size in MB. Default `256`. +- `batch_size` (number): Max records per Lambda invocation. Default `10`. +- `enabled` (bool): Enable the event source mapping. Default `true`. +- `tags` (map(string)): Tags applied to resources. + +## Outputs + +- `lambda_arn`: ARN of the print notification Lambda. +- `queue_arn`: ARN of the notification SQS queue. +- `queue_url`: URL of the notification SQS queue. +- `subscription_arn`: ARN of the SNS subscription. diff --git a/modules/print-notification/main.tf b/modules/print-notification/main.tf new file mode 100644 index 0000000..bcc9a34 --- /dev/null +++ b/modules/print-notification/main.tf @@ -0,0 +1,86 @@ +locals { + tags = merge({ managed_by = "cloudcron" }, var.tags) + lambda_name = coalesce(var.lambda_name, "cloudcron-print-${terraform.workspace}") +} + +data "aws_caller_identity" "current" {} + +resource "aws_iam_role" "lambda_role" { + name = "${local.lambda_name}-role" + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Effect = "Allow" + Principal = { + Service = "lambda.amazonaws.com" + } + Action = "sts:AssumeRole" + } + ] + }) + tags = local.tags +} + +module "plumbing" { + source = "../notification-plumbing" + + sns_topic_arn = var.sns_topic_arn + lambda_function_arn = aws_lambda_function.print.arn + fifo_queue_name = var.fifo_queue_name + batch_size = var.batch_size + enabled = var.enabled + tags = local.tags +} + +resource "aws_iam_policy" "lambda_logs_policy" { + name = "${local.lambda_name}-logs" + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Effect = "Allow" + Action = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents", + ] + Resource = "arn:aws:logs:*:${data.aws_caller_identity.current.account_id}:*" + }, + ] + }) + tags = local.tags +} + +resource "aws_iam_role_policy_attachment" "lambda_logs_attachment" { + role = aws_iam_role.lambda_role.name + policy_arn = aws_iam_policy.lambda_logs_policy.arn +} + +resource "aws_lambda_function" "print" { + function_name = local.lambda_name + role = aws_iam_role.lambda_role.arn + package_type = "Image" + image_uri = var.lambda_image_uri + timeout = var.timeout + memory_size = var.memory_size + + environment { + variables = { + (var.template_env_var) = file(var.template_file) + } + } + + tags = local.tags +} + +resource "aws_iam_policy" "lambda_sqs_policy" { + name = "${local.lambda_name}-sqs" + policy = module.plumbing.lambda_sqs_policy_json + tags = local.tags +} + +resource "aws_iam_role_policy_attachment" "lambda_sqs_attachment" { + role = aws_iam_role.lambda_role.name + policy_arn = aws_iam_policy.lambda_sqs_policy.arn +} diff --git a/modules/print-notification/outputs.tf b/modules/print-notification/outputs.tf new file mode 100644 index 0000000..5b1561e --- /dev/null +++ b/modules/print-notification/outputs.tf @@ -0,0 +1,19 @@ +output "lambda_arn" { + description = "ARN of the print notification Lambda." + value = aws_lambda_function.print.arn +} + +output "queue_arn" { + description = "ARN of the SQS queue feeding the Lambda." + value = module.plumbing.queue_arn +} + +output "queue_url" { + description = "URL of the SQS queue feeding the Lambda." + value = module.plumbing.queue_url +} + +output "subscription_arn" { + description = "ARN of the SNS subscription to the queue." + value = module.plumbing.subscription_arn +} diff --git a/modules/print-notification/variables.tf b/modules/print-notification/variables.tf new file mode 100644 index 0000000..a7303eb --- /dev/null +++ b/modules/print-notification/variables.tf @@ -0,0 +1,61 @@ +variable "sns_topic_arn" { + description = "SNS topic ARN that feeds the notification queue." + type = string +} + +variable "fifo_queue_name" { + description = "Name of the FIFO SQS queue (must end with .fifo)." + type = string +} + +variable "lambda_image_uri" { + description = "URI of the Lambda container image." + type = string +} + +variable "lambda_name" { + description = "Optional name for the Lambda function." + type = string + default = null +} + +variable "template_env_var" { + description = "Environment variable that stores the template." + type = string + default = "TEMPLATE" +} + +variable "template_file" { + description = "Path to the template file to store in the template environment variable." + type = string +} + +variable "timeout" { + description = "Lambda timeout in seconds." + type = number + default = 30 +} + +variable "memory_size" { + description = "Lambda memory size in MB." + type = number + default = 256 +} + +variable "batch_size" { + description = "Maximum number of records per Lambda invocation." + type = number + default = 10 +} + +variable "enabled" { + description = "Enable the SQS event source mapping." + type = bool + default = true +} + +variable "tags" { + description = "Tags to apply to created resources." + type = map(string) + default = {} +} diff --git a/modules/print-notification/versions.tf b/modules/print-notification/versions.tf new file mode 100644 index 0000000..80e2b27 --- /dev/null +++ b/modules/print-notification/versions.tf @@ -0,0 +1,10 @@ +terraform { + required_version = ">= 1.5.0" + + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 4.0" + } + } +} diff --git a/modules/sns-topics/main.tf b/modules/sns-topics/main.tf index aab47dd..6931e0f 100644 --- a/modules/sns-topics/main.tf +++ b/modules/sns-topics/main.tf @@ -1,10 +1,16 @@ locals { tags = merge({ managed_by = "cloudcron" }, var.tags) + topic_names = { + for key, name in var.topic_names : + key => !endswith(name, ".fifo") ? "${name}.fifo" : name + } } resource "aws_sns_topic" "topics" { - for_each = var.topic_names + for_each = local.topic_names - name = each.value - tags = local.tags + name = each.value + fifo_topic = true + content_based_deduplication = true + tags = local.tags } diff --git a/src/cloud_cron/lambda_task.py b/src/cloud_cron/lambda_task.py index f0cfda2..cba6b40 100644 --- a/src/cloud_cron/lambda_task.py +++ b/src/cloud_cron/lambda_task.py @@ -149,6 +149,23 @@ def load_sns_topics(env_var: str = "SNS_TOPICS") -> dict[str, str]: return raw +def load_sns_message_group_id(env_var: str = "SNS_MESSAGE_GROUP_ID") -> str: + """ + Load the SNS FIFO message group ID from the environment. + + Parameters + ---------- + env_var : str, optional + Environment variable name containing the group ID. + + Returns + ------- + str + Message group ID for FIFO SNS topics. + """ + return os.environ.get(env_var, "cloudcron") + + def validate_sns_result( result: Mapping[str, Any], sns_topics: Mapping[str, str] ) -> None: @@ -204,6 +221,7 @@ def dispatch_sns_messages( TopicArn=topic_arn, Message=json.dumps(message), Subject=f"Notification for {topic_name}", + MessageGroupId=load_sns_message_group_id(), ) logger.info( "sns_publish", diff --git a/src/cloud_cron/notifications/__init__.py b/src/cloud_cron/notifications/__init__.py index 9ed4687..db73138 100644 --- a/src/cloud_cron/notifications/__init__.py +++ b/src/cloud_cron/notifications/__init__.py @@ -3,9 +3,11 @@ NotificationHandler, TemplateProvider, ) +from cloud_cron.notifications.print_handler import PrintNotificationHandler __all__ = [ "EnvVarTemplateProvider", "NotificationHandler", + "PrintNotificationHandler", "TemplateProvider", ] diff --git a/src/cloud_cron/notifications/print_handler.py b/src/cloud_cron/notifications/print_handler.py new file mode 100644 index 0000000..7140258 --- /dev/null +++ b/src/cloud_cron/notifications/print_handler.py @@ -0,0 +1,39 @@ +from typing import Any, Mapping + +from cloud_cron.notifications.base import NotificationHandler + + +class PrintNotificationHandler(NotificationHandler): + """ + Notification handler that logs rendered templates for testing. + + Parameters + ---------- + template_provider : TemplateProvider + Provider that returns the template string for rendering. + expected_queue_arn : str, optional + Queue ARN to validate incoming SQS records. + logger : logging.Logger, optional + Logger used for structured logging. + """ + + def notify( + self, + *, + result: Mapping[str, Any], + rendered: str, + record: Mapping[str, Any], + ) -> None: + """ + Log the rendered notification payload. + + Parameters + ---------- + result : Mapping[str, Any] + Parsed result payload from the SNS-to-SQS pipeline. + rendered : str + Rendered template output. + record : Mapping[str, Any] + Original SQS record for additional metadata. + """ + print(rendered) diff --git a/tests/notifications/test_print_handler.py b/tests/notifications/test_print_handler.py new file mode 100644 index 0000000..caae545 --- /dev/null +++ b/tests/notifications/test_print_handler.py @@ -0,0 +1,16 @@ +import json +from cloud_cron.notifications.base import EnvVarTemplateProvider +from cloud_cron.notifications.print_handler import PrintNotificationHandler + + +def test_print_handler_prints_rendered_template(monkeypatch, capsys): + monkeypatch.setenv("TEMPLATE", "Hello {{ name }}") + handler = PrintNotificationHandler(template_provider=EnvVarTemplateProvider()) + event = { + "Records": [{"body": json.dumps({"name": "Ada"}), "eventSource": "aws:sqs"}] + } + + handler.lambda_handler(event, context=None) + + captured = capsys.readouterr() + assert captured.out.strip() == "Hello Ada" diff --git a/tests/test_lambda_task.py b/tests/test_lambda_task.py index 68fab95..db4129e 100644 --- a/tests/test_lambda_task.py +++ b/tests/test_lambda_task.py @@ -9,6 +9,7 @@ CronLambdaTask, dispatch_sns_messages, extract_context_metadata, + load_sns_message_group_id, load_sns_topics, validate_sns_result, ) @@ -37,6 +38,16 @@ def test_load_sns_topics_invalid_mapping(monkeypatch): load_sns_topics() +def test_load_sns_message_group_id_default(monkeypatch): + monkeypatch.delenv("SNS_MESSAGE_GROUP_ID", raising=False) + assert load_sns_message_group_id() == "cloudcron" + + +def test_load_sns_message_group_id_override(monkeypatch): + monkeypatch.setenv("SNS_MESSAGE_GROUP_ID", "custom-group") + assert load_sns_message_group_id() == "custom-group" + + def test_validate_sns_result_allows_subset(): sns_topics = {"success": "arn:one", "failure": "arn:two"} result = {"success": {"ok": True}} @@ -68,11 +79,13 @@ def test_dispatch_sns_messages_publishes(caplog): TopicArn="arn:one", Message=json.dumps({"ok": True}), Subject="Notification for success", + MessageGroupId="cloudcron", ) sns_client.publish.assert_any_call( TopicArn="arn:two", Message=json.dumps({"ok": False}), Subject="Notification for failure", + MessageGroupId="cloudcron", ) @@ -98,6 +111,7 @@ def _perform_task(self, event, context): TopicArn="arn:one", Message=json.dumps({"ok": True}), Subject="Notification for success", + MessageGroupId="cloudcron", )