
feat(kafka): New Kafka utility #1898

Merged on Jun 18, 2025 (42 commits)

Commits
9900ab3
Add initial code for KafkaJson and KafkaAvro request handlers.
phipag May 15, 2025
47cbc76
Add deserialization via @Deserialization annotation.
phipag May 16, 2025
680b979
Add TODOs in code.
phipag May 16, 2025
15f2923
Fix typos and make AbstractKafkaDeserializer package private.
phipag May 16, 2025
7c4bf30
Merge phipag/kafka-event from public repository into feature/kafka
phipag May 21, 2025
6ac583f
Remove request handler implementation in favor of @Deserialization a…
phipag May 21, 2025
92ae6ab
Parse Timestamp type correctly.
phipag May 21, 2025
8e13dd6
Remove custom RequestHandler implementation example.
phipag May 21, 2025
623585a
Merge branch 'main' into feature/kafka
phipag May 26, 2025
5ea6d49
Make AspectJ version compatible with min version Java 11.
phipag May 26, 2025
73e64e4
Clarify exception message when deserialization fails.
phipag May 27, 2025
cbe9181
Add more advanced JSON escaping to JSONSerializer in logging module.
phipag May 27, 2025
64e7080
Add protobuf deserialization logic and fully working example.
phipag May 27, 2025
f081424
Add Maven profile to compile a JAR with different dependency combinat…
phipag May 27, 2025
e11db9a
Add minimal kafka example.
phipag May 28, 2025
4282a77
Add missing copyright.
phipag May 28, 2025
64f7e18
Add unit tests for kafka utility.
phipag May 28, 2025
1191c56
Add minimal kafka example to examples module in pom.xml.
phipag May 29, 2025
db9b989
Add some comments.
phipag May 29, 2025
b64dcbd
Update powertools-examples-kafka with README and make it more minimal…
phipag Jun 6, 2025
4624c12
Implement PR feedback from Karthik.
phipag Jun 6, 2025
598cc27
Fix SAM outputs.
phipag Jun 6, 2025
92f6f8f
Do not fail on unknown properties when deserializing into KafkaEvent.
phipag Jun 6, 2025
7fcc989
Merge branch 'main' into feature/kafka
phipag Jun 16, 2025
77845af
Allow customers to bring their own kafka-clients dependency.
phipag Jun 16, 2025
e4875d8
Add Kafka utility documentation.
phipag Jun 16, 2025
767109b
Update project version consistently to 2.0.0.
phipag Jun 16, 2025
3e6a8b7
fix: Fix bug where abbreviated _HANDLER env var did not detect the De…
phipag Jun 17, 2025
ef04849
fix: Bug when trying to deserialize a type into itself for Lambda def…
phipag Jun 17, 2025
6da89a3
When falling back to Lambda default, handle conversion between InputS…
phipag Jun 17, 2025
2be14dd
Raise a runtime exception when the KafkaEvent is invalid.
phipag Jun 17, 2025
04cf14a
docs: Announce deprecation of v1
phipag Jun 16, 2025
f02c8fd
fix(metrics): Do not flush when no metrics were added to avoid printi…
phipag Jun 16, 2025
fa29f60
Merge branch 'main' into feature/kafka
phipag Jun 18, 2025
3c76357
Rename docs to Kafka Consumer and add line highlights for code examples.
phipag Jun 18, 2025
ffebe8c
Fix Spotbug issues.
phipag Jun 18, 2025
55c860a
Reduce cognitive complexity of DeserializationUtils making it more mo…
phipag Jun 18, 2025
03e5b11
Reduce cognitive complexity of AbstractKafkaDeserializer.
phipag Jun 18, 2025
a5689e9
Enable removal policy DESTROY on e2e test for kinesis streams and SQS…
phipag Jun 18, 2025
8f12c04
Replace System.out with Powertools Logging.
phipag Jun 18, 2025
335279e
Add notice about kafka-clients compatibility.
phipag Jun 18, 2025
e9654a9
Add sentence stating that Avro / Protobuf classes can be autogenerated.
phipag Jun 18, 2025
1,001 changes: 1,001 additions & 0 deletions docs/utilities/kafka.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion examples/pom.xml
@@ -39,6 +39,7 @@
<module>powertools-examples-parameters/sam</module>
<module>powertools-examples-parameters/sam-graalvm</module>
<module>powertools-examples-serialization</module>
<module>powertools-examples-kafka</module>
<module>powertools-examples-batch</module>
<module>powertools-examples-validation</module>
<module>powertools-examples-cloudformation</module>
@@ -58,4 +59,4 @@
</plugins>
</build>

</project>
</project>
77 changes: 77 additions & 0 deletions examples/powertools-examples-kafka/README.md
@@ -0,0 +1,77 @@
# Powertools for AWS Lambda (Java) - Kafka Example

This project demonstrates how to use Powertools for AWS Lambda (Java) to deserialize Kafka Lambda events directly into strongly typed Kafka `ConsumerRecords<K, V>` using different serialization formats.

## Overview

The example showcases automatic deserialization of Kafka Lambda events into ConsumerRecords using three formats:
- JSON - Using standard JSON serialization
- Avro - Using Apache Avro schema-based serialization
- Protobuf - Using Google Protocol Buffers serialization

Each format has its own Lambda function handler that demonstrates how to use the `@Deserialization` annotation with the appropriate `DeserializationType`, eliminating the need to handle complex deserialization logic manually.
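
For illustration, here is a trimmed-down sketch of the JSON handler (see `JsonDeserializationFunction.java` in this example for the full version; `Product` is the example's plain POJO in the same package):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;

public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> {

    @Override
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, Product> records, Context context) {
        // Record values arrive already deserialized into Product; no manual JSON parsing needed
        for (ConsumerRecord<String, Product> consumerRecord : records) {
            Product product = consumerRecord.value();
            // ... business logic ...
        }
        return "OK";
    }
}
```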

## Build and Deploy

### Prerequisites
- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html)
- Java 11+
- Maven

### Build

```bash
# Build the application
sam build
```

### Deploy

```bash
# Deploy the application to AWS
sam deploy --guided
```

During the guided deployment, you'll be prompted to provide values for required parameters. After deployment, SAM will output the ARNs of the deployed Lambda functions.

### Build with Different Serialization Formats

The project includes Maven profiles to build with different serialization formats:

```bash
# Build with JSON only (no Avro or Protobuf)
mvn clean package -P base

# Build with Avro only
mvn clean package -P avro-only

# Build with Protobuf only
mvn clean package -P protobuf-only

# Build with all formats (default)
mvn clean package -P full
```

## Testing

The `events` directory contains sample events for each serialization format:
- `kafka-json-event.json` - Sample event with JSON-serialized products
- `kafka-avro-event.json` - Sample event with Avro-serialized products
- `kafka-protobuf-event.json` - Sample event with Protobuf-serialized products

You can use these events to test the Lambda functions:

```bash
# Test the JSON deserialization function
sam local invoke JsonDeserializationFunction --event events/kafka-json-event.json

# Test the Avro deserialization function
sam local invoke AvroDeserializationFunction --event events/kafka-avro-event.json

# Test the Protobuf deserialization function
sam local invoke ProtobufDeserializationFunction --event events/kafka-protobuf-event.json
```

## Sample Generator Tool

The project includes a tool to generate sample JSON, Avro, and Protobuf serialized data. See the [tools/README.md](tools/README.md) for more information.
51 changes: 51 additions & 0 deletions examples/powertools-examples-kafka/events/kafka-avro-event.json
@@ -0,0 +1,51 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"mytopic-0": [
{
"topic": "mytopic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "0g8MTGFwdG9wUrgehes/j0A=",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
},
{
"topic": "mytopic",
"partition": 0,
"offset": 16,
"timestamp": 1545084650988,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "1A8UU21hcnRwaG9uZVK4HoXrv4JA",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
},
{
"topic": "mytopic",
"partition": 0,
"offset": 17,
"timestamp": 1545084650989,
"timestampType": "CREATE_TIME",
"key": null,
"value": "1g8USGVhZHBob25lc0jhehSuv2JA",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
}
]
}
}
51 changes: 51 additions & 0 deletions examples/powertools-examples-kafka/events/kafka-json-event.json
@@ -0,0 +1,51 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"mytopic-0": [
{
"topic": "mytopic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "eyJwcmljZSI6OTk5Ljk5LCJuYW1lIjoiTGFwdG9wIiwiaWQiOjEwMDF9",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
},
{
"topic": "mytopic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "eyJwcmljZSI6NTk5Ljk5LCJuYW1lIjoiU21hcnRwaG9uZSIsImlkIjoxMDAyfQ==",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
},
{
"topic": "mytopic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key": null,
"value": "eyJwcmljZSI6MTQ5Ljk5LCJuYW1lIjoiSGVhZHBob25lcyIsImlkIjoxMDAzfQ==",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
}
]
}
}
51 changes: 51 additions & 0 deletions examples/powertools-examples-kafka/events/kafka-protobuf-event.json
@@ -0,0 +1,51 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"mytopic-0": [
{
"topic": "mytopic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "COkHEgZMYXB0b3AZUrgehes/j0A=",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
},
{
"topic": "mytopic",
"partition": 0,
"offset": 16,
"timestamp": 1545084650988,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "COoHEgpTbWFydHBob25lGVK4HoXrv4JA",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
},
{
"topic": "mytopic",
"partition": 0,
"offset": 17,
"timestamp": 1545084650989,
"timestampType": "CREATE_TIME",
"key": null,
"value": "COsHEgpIZWFkcGhvbmVzGUjhehSuv2JA",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
}
]
}
}
232 changes: 232 additions & 0 deletions examples/powertools-examples-kafka/pom.xml
@@ -0,0 +1,232 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>software.amazon.lambda.examples</groupId>
<version>2.0.0</version>
<artifactId>powertools-examples-kafka</artifactId>
<packaging>jar</packaging>
<name>Powertools for AWS Lambda (Java) - Examples - Kafka</name>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<aspectj.version>1.9.20.1</aspectj.version>
<avro.version>1.12.0</avro.version>
<protobuf.version>4.31.0</protobuf.version>
</properties>

<dependencies>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>4.0.0</version> <!-- Supports >= 3.0.0 -->
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>

<!-- Basic logging setup -->
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-logging-log4j</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>${aspectj.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Don't deploy the example -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>3.1.4</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.6.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<transformers>
<transformer
implementation="org.apache.logging.log4j.maven.plugins.shade.transformer.Log4j2PluginCacheFileTransformer" />
</transformers>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-transform-maven-shade-plugin-extensions</artifactId>
<version>0.2.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>dev.aspectj</groupId>
<artifactId>aspectj-maven-plugin</artifactId>
<version>1.14</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<complianceLevel>${maven.compiler.target}</complianceLevel>
<aspectLibraries>
<aspectLibrary>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-logging</artifactId>
</aspectLibrary>
</aspectLibraries>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjtools</artifactId>
<version>${aspectj.version}</version>
</dependency>
</dependencies>
</plugin>
<!-- Generate Avro classes from schema -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
<!-- Generate Protobuf classes from schema -->
<plugin>
<groupId>io.github.ascopes</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<goals>
<goal>generate</goal>
</goals>
<phase>generate-sources</phase>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<sourceDirectories>
<sourceDirectory>${project.basedir}/src/main/proto</sourceDirectory>
</sourceDirectories>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

<profiles>
<!-- Base profile without Avro or Protobuf (compatible with JSON only) -->
<profile>
<id>base</id>
<properties>
<active.profile>base</active.profile>
</properties>
<dependencies>
<!-- Exclude both Avro and Protobuf -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>

<!-- Profile with only Avro -->
<profile>
<id>avro-only</id>
<properties>
<active.profile>avro-only</active.profile>
</properties>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>

<!-- Profile with only Protobuf -->
<profile>
<id>protobuf-only</id>
<properties>
<active.profile>protobuf-only</active.profile>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>

<!-- Profile with both Avro and Protobuf (default) -->
<profile>
<id>full</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<active.profile>full</active.profile>
</properties>
</profile>
</profiles>
</project>
10 changes: 10 additions & 0 deletions examples/powertools-examples-kafka/src/main/avro/AvroProduct.avsc
@@ -0,0 +1,10 @@
{
"namespace": "org.demo.kafka.avro",
"type": "record",
"name": "AvroProduct",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "price", "type": "double"}
]
}
37 changes: 37 additions & 0 deletions examples/powertools-examples-kafka/src/main/java/org/demo/kafka/AvroDeserializationFunction.java
@@ -0,0 +1,37 @@
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.avro.AvroProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> {

private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class);

@Override
@Logging
@Deserialization(type = DeserializationType.KAFKA_AVRO)
public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) {
for (ConsumerRecord<String, AvroProduct> consumerRecord : records) {
LOGGER.info("ConsumerRecord: {}", consumerRecord);

AvroProduct product = consumerRecord.value();
LOGGER.info("AvroProduct: {}", product);

String key = consumerRecord.key();
LOGGER.info("Key: {}", key);
}

return "OK";
}

}
35 changes: 35 additions & 0 deletions examples/powertools-examples-kafka/src/main/java/org/demo/kafka/JsonDeserializationFunction.java
@@ -0,0 +1,35 @@
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> {

private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class);

@Override
@Logging
@Deserialization(type = DeserializationType.KAFKA_JSON)
public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) {
for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) {
LOGGER.info("ConsumerRecord: {}", consumerRecord);

Product product = consumerRecord.value();
LOGGER.info("Product: {}", product);

String key = consumerRecord.key();
LOGGER.info("Key: {}", key);
}

return "OK";
}
}
63 changes: 63 additions & 0 deletions examples/powertools-examples-kafka/src/main/java/org/demo/kafka/Product.java
@@ -0,0 +1,63 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.demo.kafka;

public class Product {
private long id;
private String name;
private double price;

public Product() {
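// Intentionally empty: a no-args constructor is needed for JSON deserialization (e.g. by Jackson)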

}

public Product(long id, String name, double price) {
this.id = id;
this.name = name;
this.price = price;
}

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public double getPrice() {
return price;
}

public void setPrice(double price) {
this.price = price;
}

@Override
public String toString() {
return "Product{" +
"id=" + id +
", name='" + name + '\'' +
", price=" + price +
'}';
}
}
38 changes: 38 additions & 0 deletions examples/powertools-examples-kafka/src/main/java/org/demo/kafka/ProtobufDeserializationFunction.java
@@ -0,0 +1,38 @@
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.protobuf.ProtobufProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class ProtobufDeserializationFunction
implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> {

private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class);

@Override
@Logging
@Deserialization(type = DeserializationType.KAFKA_PROTOBUF)
public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) {
for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) {
LOGGER.info("ConsumerRecord: {}", consumerRecord);

ProtobufProduct product = consumerRecord.value();
LOGGER.info("ProtobufProduct: {}", product);

String key = consumerRecord.key();
LOGGER.info("Key: {}", key);
}

return "OK";
}

}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions examples/powertools-examples-kafka/src/main/proto/ProtobufProduct.proto
@@ -0,0 +1,13 @@
syntax = "proto3";

package org.demo.kafka.protobuf;

option java_package = "org.demo.kafka.protobuf";
option java_outer_classname = "ProtobufProductOuterClass";
option java_multiple_files = true;

message ProtobufProduct {
int32 id = 1;
string name = 2;
double price = 3;
}
16 changes: 16 additions & 0 deletions examples/powertools-examples-kafka/src/main/resources/log4j2.xml
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration packages="com.amazonaws.services.lambda.runtime.log4j2">
<Appenders>
<Console name="JsonAppender" target="SYSTEM_OUT">
<JsonTemplateLayout eventTemplateUri="classpath:LambdaJsonLayout.json" />
</Console>
</Appenders>
<Loggers>
<Logger name="JsonLogger" level="INFO" additivity="false">
<AppenderRef ref="JsonAppender"/>
</Logger>
<Root level="info">
<AppenderRef ref="JsonAppender"/>
</Root>
</Loggers>
</Configuration>
59 changes: 59 additions & 0 deletions examples/powertools-examples-kafka/template.yaml
@@ -0,0 +1,59 @@
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: >
Kafka Deserialization example with Kafka Lambda ESM
Globals:
Function:
Timeout: 20
Runtime: java11
MemorySize: 512
Tracing: Active

Resources:
JsonDeserializationFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: .
Handler: org.demo.kafka.JsonDeserializationFunction::handleRequest
Environment:
Variables:
JAVA_TOOL_OPTIONS: "-XX:+TieredCompilation -XX:TieredStopAtLevel=1"
POWERTOOLS_LOG_LEVEL: DEBUG
POWERTOOLS_SERVICE_NAME: JsonDeserialization
POWERTOOLS_METRICS_NAMESPACE: JsonDeserializationFunction

AvroDeserializationFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: .
Handler: org.demo.kafka.AvroDeserializationFunction::handleRequest
Environment:
Variables:
JAVA_TOOL_OPTIONS: "-XX:+TieredCompilation -XX:TieredStopAtLevel=1"
POWERTOOLS_LOG_LEVEL: DEBUG
POWERTOOLS_SERVICE_NAME: AvroDeserialization
POWERTOOLS_METRICS_NAMESPACE: AvroDeserializationFunction

ProtobufDeserializationFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: .
Handler: org.demo.kafka.ProtobufDeserializationFunction::handleRequest
Environment:
Variables:
JAVA_TOOL_OPTIONS: "-XX:+TieredCompilation -XX:TieredStopAtLevel=1"
POWERTOOLS_LOG_LEVEL: DEBUG
POWERTOOLS_SERVICE_NAME: ProtobufDeserialization
POWERTOOLS_METRICS_NAMESPACE: ProtobufDeserializationFunction

Outputs:
JsonFunction:
Description: "Kafka JSON Lambda Function ARN"
Value: !GetAtt JsonDeserializationFunction.Arn
AvroFunction:
Description: "Kafka Avro Lambda Function ARN"
Value: !GetAtt AvroDeserializationFunction.Arn
ProtobufFunction:
Description: "Kafka Protobuf Lambda Function ARN"
Value: !GetAtt ProtobufDeserializationFunction.Arn
66 changes: 66 additions & 0 deletions examples/powertools-examples-kafka/tools/README.md
@@ -0,0 +1,66 @@
# Kafka Sample Generator Tool

This tool generates base64-encoded serialized products for testing the Kafka consumer functions with different serialization formats.

## Supported Formats

- **JSON**: Generates base64-encoded JSON serialized products
- **Avro**: Generates base64-encoded Avro serialized products
- **Protobuf**: Generates base64-encoded Protobuf serialized products

## Usage

Run the following Maven commands from this directory:

```bash
# Generate Avro and Protobuf classes from schemas
mvn generate-sources

# Compile the code
mvn compile
```

### Generate JSON Samples

```bash
# Run the JSON sample generator
mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateJsonSamples"
```

The tool will output base64-encoded values for JSON products that can be used in `../events/kafka-json-event.json`.

### Generate Avro Samples

```bash
# Run the Avro sample generator
mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateAvroSamples"
```

The tool will output base64-encoded values for Avro products that can be used in `../events/kafka-avro-event.json`.

### Generate Protobuf Samples

```bash
# Run the Protobuf sample generator
mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateProtobufSamples"
```

The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`.

## Output

Each generator produces (abridged sample output is shown after this list):

1. Three different products (Laptop, Smartphone, Headphones)
2. An integer key (42), plus one entry with a null key to test edge cases
3. A complete sample event structure that can be used directly for testing
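
For reference, the JSON generator prints output roughly like this (abridged; the base64 strings are the values to paste into the event file):

```
Base64 encoded JSON products for use in kafka-json-event.json:

Product 1 (with key):
key: "NDI=",
value: "eyJwcmljZSI6OTk5Ljk5LCJuYW1lIjoiTGFwdG9wIiwiaWQiOjEwMDF9",
```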

## Example

After generating the samples, you can copy the output into the respective event files:

- `../events/kafka-json-event.json` for JSON samples
- `../events/kafka-avro-event.json` for Avro samples
- `../events/kafka-protobuf-event.json` for Protobuf samples

These event files can then be used to test the Lambda functions with the appropriate deserializer.
104 changes: 104 additions & 0 deletions examples/powertools-examples-kafka/tools/pom.xml
@@ -0,0 +1,104 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>software.amazon.lambda.examples</groupId>
<artifactId>powertools-examples-kafka-tools</artifactId>
<version>2.0.0</version>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<avro.version>1.12.0</avro.version>
<protobuf.version>4.31.0</protobuf.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.19.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/../src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.github.ascopes</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<goals>
<goal>generate</goal>
</goals>
<phase>generate-sources</phase>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<sourceDirectories>
<sourceDirectory>${project.basedir}/../src/main/proto</sourceDirectory>
</sourceDirectories>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<id>generate-json-samples</id>
<configuration>
<mainClass>org.demo.kafka.tools.GenerateJsonSamples</mainClass>
</configuration>
</execution>
<execution>
<id>generate-avro-samples</id>
<configuration>
<mainClass>org.demo.kafka.tools.GenerateAvroSamples</mainClass>
</configuration>
</execution>
<execution>
<id>generate-protobuf-samples</id>
<configuration>
<mainClass>org.demo.kafka.tools.GenerateProtobufSamples</mainClass>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Large diffs are not rendered by default.

Large diffs are not rendered by default.

121 changes: 121 additions & 0 deletions examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java
@@ -0,0 +1,121 @@
package org.demo.kafka.tools;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Base64;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.demo.kafka.avro.AvroProduct;

/**
* Utility class to generate base64-encoded Avro serialized products
* for use in test events.
*/
public class GenerateAvroSamples {

public static void main(String[] args) throws IOException {
// Create three different products
AvroProduct product1 = new AvroProduct(1001, "Laptop", 999.99);
AvroProduct product2 = new AvroProduct(1002, "Smartphone", 599.99);
AvroProduct product3 = new AvroProduct(1003, "Headphones", 149.99);

// Serialize and encode each product
String encodedProduct1 = serializeAndEncode(product1);
String encodedProduct2 = serializeAndEncode(product2);
String encodedProduct3 = serializeAndEncode(product3);

// Serialize and encode an integer key
String encodedKey = serializeAndEncodeInteger(42);

// Print the results
System.out.println("Base64 encoded Avro products for use in kafka-avro-event.json:");
System.out.println("\nProduct 1 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct1 + "\",");

System.out.println("\nProduct 2 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct2 + "\",");

System.out.println("\nProduct 3 (without key):");
System.out.println("key: null,");
System.out.println("value: \"" + encodedProduct3 + "\",");

// Print a sample event structure
System.out.println("\nSample event structure:");
printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3);
}

private static String serializeAndEncode(AvroProduct product) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
DatumWriter<AvroProduct> writer = new SpecificDatumWriter<>(AvroProduct.class);

writer.write(product, encoder);
encoder.flush();

return Base64.getEncoder().encodeToString(baos.toByteArray());
}

private static String serializeAndEncodeInteger(Integer value) throws IOException {
// For simple types like integers, we'll just convert to string and encode
return Base64.getEncoder().encodeToString(value.toString().getBytes());
}

private static void printSampleEvent(String key, String product1, String product2, String product3) {
System.out.println("{\n" +
" \"eventSource\": \"aws:kafka\",\n" +
" \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n" +
" \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n" +
" \"records\": {\n" +
" \"mytopic-0\": [\n" +
" {\n" +
" \"topic\": \"mytopic\",\n" +
" \"partition\": 0,\n" +
" \"offset\": 15,\n" +
" \"timestamp\": 1545084650987,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + product1 + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"topic\": \"mytopic\",\n" +
" \"partition\": 0,\n" +
" \"offset\": 16,\n" +
" \"timestamp\": 1545084650988,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + product2 + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"topic\": \"mytopic\",\n" +
" \"partition\": 0,\n" +
" \"offset\": 17,\n" +
" \"timestamp\": 1545084650989,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": null,\n" +
" \"value\": \"" + product3 + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }\n" +
"}");
}
}
126 changes: 126 additions & 0 deletions examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java
@@ -0,0 +1,126 @@
package org.demo.kafka.tools;

import java.io.IOException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* Utility class to generate base64-encoded JSON serialized products
* for use in test events.
*/
public class GenerateJsonSamples {

public static void main(String[] args) throws IOException {
// Create three different products
Map<String, Object> product1 = new HashMap<>();
product1.put("id", 1001);
product1.put("name", "Laptop");
product1.put("price", 999.99);

Map<String, Object> product2 = new HashMap<>();
product2.put("id", 1002);
product2.put("name", "Smartphone");
product2.put("price", 599.99);

Map<String, Object> product3 = new HashMap<>();
product3.put("id", 1003);
product3.put("name", "Headphones");
product3.put("price", 149.99);

// Serialize and encode each product
String encodedProduct1 = serializeAndEncode(product1);
String encodedProduct2 = serializeAndEncode(product2);
String encodedProduct3 = serializeAndEncode(product3);

// Serialize and encode an integer key
String encodedKey = serializeAndEncodeInteger(42);

// Print the results
System.out.println("Base64 encoded JSON products for use in kafka-json-event.json:");
System.out.println("\nProduct 1 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct1 + "\",");

System.out.println("\nProduct 2 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct2 + "\",");

System.out.println("\nProduct 3 (without key):");
System.out.println("key: null,");
System.out.println("value: \"" + encodedProduct3 + "\",");

// Print a sample event structure
System.out.println("\nSample event structure:");
printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3);
}

private static String serializeAndEncode(Map<String, Object> product) throws IOException {
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(product);
return Base64.getEncoder().encodeToString(json.getBytes());
}

private static String serializeAndEncodeInteger(Integer value) {
// For simple types like integers, we'll just convert to string and encode
return Base64.getEncoder().encodeToString(value.toString().getBytes());
}

private static void printSampleEvent(String key, String product1, String product2, String product3) {
System.out.println("{\n" +
" \"eventSource\": \"aws:kafka\",\n" +
" \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
+
" \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n"
+
" \"records\": {\n" +
" \"mytopic-0\": [\n" +
" {\n" +
" \"topic\": \"mytopic\",\n" +
" \"partition\": 0,\n" +
" \"offset\": 15,\n" +
" \"timestamp\": 1545084650987,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + product1 + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"topic\": \"mytopic\",\n" +
" \"partition\": 0,\n" +
" \"offset\": 15,\n" +
" \"timestamp\": 1545084650987,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + product2 + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"topic\": \"mytopic\",\n" +
" \"partition\": 0,\n" +
" \"offset\": 15,\n" +
" \"timestamp\": 1545084650987,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": null,\n" +
" \"value\": \"" + product3 + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }\n" +
"}");
}
}
125 changes: 125 additions & 0 deletions examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java
@@ -0,0 +1,125 @@
package org.demo.kafka.tools;

import java.io.IOException;
import java.util.Base64;

import org.demo.kafka.protobuf.ProtobufProduct;

/**
* Utility class to generate base64-encoded Protobuf serialized products
* for use in test events.
*/
public class GenerateProtobufSamples {

public static void main(String[] args) throws IOException {
// Create three different products
ProtobufProduct product1 = ProtobufProduct.newBuilder()
.setId(1001)
.setName("Laptop")
.setPrice(999.99)
.build();

ProtobufProduct product2 = ProtobufProduct.newBuilder()
.setId(1002)
.setName("Smartphone")
.setPrice(599.99)
.build();

ProtobufProduct product3 = ProtobufProduct.newBuilder()
.setId(1003)
.setName("Headphones")
.setPrice(149.99)
.build();

// Serialize and encode each product
String encodedProduct1 = serializeAndEncode(product1);
String encodedProduct2 = serializeAndEncode(product2);
String encodedProduct3 = serializeAndEncode(product3);

// Serialize and encode an integer key
String encodedKey = serializeAndEncodeInteger(42);

// Print the results
System.out.println("Base64 encoded Protobuf products for use in kafka-protobuf-event.json:");
System.out.println("\nProduct 1 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct1 + "\",");

System.out.println("\nProduct 2 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct2 + "\",");

System.out.println("\nProduct 3 (without key):");
System.out.println("key: null,");
System.out.println("value: \"" + encodedProduct3 + "\",");

// Print a sample event structure
System.out.println("\nSample event structure:");
printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3);
}

private static String serializeAndEncode(ProtobufProduct product) {
return Base64.getEncoder().encodeToString(product.toByteArray());
}

private static String serializeAndEncodeInteger(Integer value) {
// For simple types like integers, we'll just convert to string and encode
return Base64.getEncoder().encodeToString(value.toString().getBytes());
}

private static void printSampleEvent(String key, String product1, String product2, String product3) {
System.out.println("{\n" +
" \"eventSource\": \"aws:kafka\",\n" +
" \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
+
" \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n"
+
" \"records\": {\n" +
" \"mytopic-0\": [\n" +
" {\n" +
" \"topic\": \"mytopic\",\n" +
" \"partition\": 0,\n" +
" \"offset\": 15,\n" +
" \"timestamp\": 1545084650987,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + product1 + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"topic\": \"mytopic\",\n" +
" \"partition\": 0,\n" +
" \"offset\": 16,\n" +
" \"timestamp\": 1545084650988,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + product2 + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"topic\": \"mytopic\",\n" +
" \"partition\": 0,\n" +
" \"offset\": 17,\n" +
" \"timestamp\": 1545084650989,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": null,\n" +
" \"value\": \"" + product3 + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }\n" +
"}");
}
}
7 changes: 5 additions & 2 deletions mkdocs.yml
@@ -15,8 +15,9 @@ nav:
- Utilities:
- utilities/idempotency.md
- utilities/parameters.md
- utilities/large_messages.md
- utilities/batch.md
- utilities/kafka.md
- utilities/large_messages.md
- utilities/validation.md
- utilities/custom_resources.md
- utilities/serialization.md
@@ -101,8 +102,9 @@ plugins:
Utilities:
- utilities/idempotency.md
- utilities/parameters.md
- utilities/large_messages.md
- utilities/batch.md
- utilities/kafka.md
- utilities/large_messages.md
- utilities/validation.md
- utilities/custom_resources.md
- utilities/serialization.md
@@ -115,6 +117,7 @@ extra_css:
extra_javascript:
- javascript/aws-amplify.min.js
- javascript/extra.js
- https://docs.powertools.aws.dev/shared/mermaid.min.js

extra:
powertools:
7 changes: 5 additions & 2 deletions pom.xml
@@ -56,6 +56,7 @@
<modules>
<module>powertools-common</module>
<module>powertools-serialization</module>
<module>powertools-kafka</module>
<module>powertools-logging</module>
<module>powertools-logging/powertools-logging-log4j</module>
<module>powertools-logging/powertools-logging-logback</module>
@@ -113,7 +114,9 @@
<aws.sdk.v1.version>1.12.781</aws.sdk.v1.version>
<versions-maven-plugin.version>2.18.0</versions-maven-plugin.version>
<elastic.version>1.6.0</elastic.version>
<mockito.version>5.17.0</mockito.version>
<mockito.version>5.18.0</mockito.version>
<mockito-junit-jupiter.version>5.18.0</mockito-junit-jupiter.version>
<junit-pioneer.version>2.3.0</junit-pioneer.version>

<!-- As we have a .mvn directory at the root of the project, this will evaluate to the root directory
regardless of where maven is run - sub-module, or root. -->
@@ -355,7 +358,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
<version>${mockito-junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
2 changes: 1 addition & 1 deletion powertools-e2e-tests/handlers/batch/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.0</version>
</parent>

<artifactId>e2e-test-handler-batch</artifactId>
2 changes: 1 addition & 1 deletion powertools-e2e-tests/handlers/idempotency/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.0</version>
</parent>

<artifactId>e2e-test-handler-idempotency</artifactId>
2 changes: 1 addition & 1 deletion powertools-e2e-tests/handlers/largemessage/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.0</version>
</parent>

<artifactId>e2e-test-handler-largemessage</artifactId>
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
<parent>
<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.0</version>
</parent>

<artifactId>e2e-test-handler-large-msg-idempotent</artifactId>
2 changes: 1 addition & 1 deletion powertools-e2e-tests/handlers/logging/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.0</version>
</parent>

<artifactId>e2e-test-handler-logging</artifactId>
2 changes: 1 addition & 1 deletion powertools-e2e-tests/handlers/metrics/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.0</version>
</parent>

<artifactId>e2e-test-handler-metrics</artifactId>
2 changes: 1 addition & 1 deletion powertools-e2e-tests/handlers/parameters/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.0</version>
</parent>

<artifactId>e2e-test-handler-parameters</artifactId>
4 changes: 2 additions & 2 deletions powertools-e2e-tests/handlers/pom.xml
@@ -4,13 +4,13 @@

<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.0</version>
<packaging>pom</packaging>
<name>Handlers for End-to-End tests</name>
<description>Fake handlers that use Powertools for AWS Lambda (Java).</description>

<properties>
<lambda.powertools.version>2.0.0-SNAPSHOT</lambda.powertools.version>
<lambda.powertools.version>2.0.0</lambda.powertools.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
2 changes: 1 addition & 1 deletion powertools-e2e-tests/handlers/tracing/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.0</version>
</parent>

<artifactId>e2e-test-handler-tracing</artifactId>
2 changes: 1 addition & 1 deletion powertools-e2e-tests/handlers/validation-alb-event/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.0</version>
</parent>

<artifactId>e2e-test-handler-validation-alb-event</artifactId>
2 changes: 1 addition & 1 deletion powertools-e2e-tests/handlers/validation-apigw-event/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.0</version>
</parent>

<artifactId>e2e-test-handler-validation-apigw-event</artifactId>
Original file line number Diff line number Diff line change
@@ -290,6 +290,7 @@ private Stack createStackWithLambda() {
.queueName(queue)
.visibilityTimeout(Duration.seconds(timeout * 6))
.retentionPeriod(Duration.seconds(timeout * 6))
.removalPolicy(RemovalPolicy.DESTROY)
.build();
DeadLetterQueue.builder()
.queue(sqsQueue)
@@ -314,6 +315,7 @@ private Stack createStackWithLambda() {
.create(e2eStack, "KinesisStream")
.streamMode(StreamMode.ON_DEMAND)
.streamName(kinesisStream)
.removalPolicy(RemovalPolicy.DESTROY)
.build();

stream.grantRead(function);
223 changes: 223 additions & 0 deletions powertools-kafka/pom.xml
@@ -0,0 +1,223 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2023 Amazon.com, Inc. or its affiliates.
~ Licensed under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~ http://www.apache.org/licenses/LICENSE-2.0
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>powertools-parent</artifactId>
<groupId>software.amazon.lambda</groupId>
<version>2.0.0</version>
</parent>

<artifactId>powertools-kafka</artifactId>
<packaging>jar</packaging>

<name>Powertools for AWS Lambda (Java) - Kafka Consumer</name>
<description></description>

<properties>
<kafka-clients.version>4.0.0</kafka-clients.version>
<avro.version>1.12.0</avro.version>
<protobuf.version>4.31.0</protobuf.version>
<lambda-serialization.version>1.1.5</lambda-serialization.version>
</properties>

<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-serialization</artifactId>
<version>${lambda-serialization.version}</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit-pioneer</groupId>
<artifactId>junit-pioneer</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<testResources>
<testResource>
<directory>src/test/resources</directory>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>dev.aspectj</groupId>
<artifactId>aspectj-maven-plugin</artifactId>
<version>${aspectj-maven-plugin.version}</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- https://junit-pioneer.org/docs/environment-variables/#warnings-for-reflective-access -->
<!-- @{argLine} makes sure no other args are lost. They are required for jacoco coverage reports. -->
<argLine>
@{argLine}
--add-opens java.base/java.util=ALL-UNNAMED
--add-opens java.base/java.lang=ALL-UNNAMED
</argLine>
</configuration>
</plugin>
<!-- Avro plugin for test classes only -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<id>generate-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/test/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/target/generated-test-sources/avro/</outputDirectory>
<stringType>String</stringType>
<testSourceDirectory>${project.basedir}/src/test/avro/</testSourceDirectory>
<testOutputDirectory>${project.basedir}/target/generated-test-sources/avro/</testOutputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!-- Protobuf plugin for test classes only -->
<plugin>
<groupId>io.github.ascopes</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>generate-test-sources</id>
<goals>
<goal>generate-test</goal>
</goals>
<phase>generate-test-sources</phase>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<sourceDirectories>
<sourceDirectory>${project.basedir}/src/test/proto</sourceDirectory>
</sourceDirectories>
<outputDirectory>${project.basedir}/target/generated-test-sources/protobuf</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!-- Add generated test sources to build -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.6.0</version>
<executions>
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${project.basedir}/target/generated-test-sources/avro</source>
<source>${project.basedir}/target/generated-test-sources/protobuf</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
@@ -0,0 +1,31 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotation to specify the deserialization type for Kafka messages.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Deserialization {
/**
* The type of deserialization to use.
* @return the deserialization type
*/
DeserializationType type();
}
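
For orientation, a minimal sketch of how this annotation is meant to be used on a handler method. The OrderHandler and Order classes are hypothetical; only the ConsumerRecords input type and the @Deserialization annotation come from this utility:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;

// Hypothetical handler; Order stands in for any POJO matching the JSON value schema of the topic.
public class OrderHandler implements RequestHandler<ConsumerRecords<String, OrderHandler.Order>, String> {

    public static class Order {
        public String id;
        public double total;
    }

    @Override
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, Order> records, Context context) {
        for (ConsumerRecord<String, Order> record : records) {
            // record.value() is already a deserialized Order instance at this point
            context.getLogger().log("Order " + record.value().id + " total=" + record.value().total);
        }
        return "OK";
    }
}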
@@ -0,0 +1,17 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka;

public enum DeserializationType {
LAMBDA_DEFAULT, KAFKA_JSON, KAFKA_AVRO, KAFKA_PROTOBUF
}
@@ -0,0 +1,67 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka;

import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Type;
import java.util.Map;

import com.amazonaws.services.lambda.runtime.CustomPojoSerializer;
import com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory;

import software.amazon.lambda.powertools.kafka.internal.DeserializationUtils;
import software.amazon.lambda.powertools.kafka.serializers.KafkaAvroDeserializer;
import software.amazon.lambda.powertools.kafka.serializers.KafkaJsonDeserializer;
import software.amazon.lambda.powertools.kafka.serializers.KafkaProtobufDeserializer;
import software.amazon.lambda.powertools.kafka.serializers.LambdaDefaultDeserializer;
import software.amazon.lambda.powertools.kafka.serializers.PowertoolsDeserializer;

/**
* Custom Lambda serializer supporting Kafka events. It delegates to the appropriate deserializer based on the
* deserialization type specified by the {@link software.amazon.lambda.powertools.kafka.Deserialization} annotation.
*
* Kafka deserialization must be requested explicitly via that annotation; otherwise, the default Lambda serializer
* from {@link com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory} is used.
*/
public class PowertoolsSerializer implements CustomPojoSerializer {
private static final Map<DeserializationType, PowertoolsDeserializer> DESERIALIZERS = Map.of(
DeserializationType.KAFKA_JSON, new KafkaJsonDeserializer(),
DeserializationType.KAFKA_AVRO, new KafkaAvroDeserializer(),
DeserializationType.KAFKA_PROTOBUF, new KafkaProtobufDeserializer(),
DeserializationType.LAMBDA_DEFAULT, new LambdaDefaultDeserializer());

private final PowertoolsDeserializer deserializer;

public PowertoolsSerializer() {
this.deserializer = DESERIALIZERS.getOrDefault(
DeserializationUtils.determineDeserializationType(),
new LambdaDefaultDeserializer());
}

@Override
public <T> T fromJson(InputStream input, Type type) {
return deserializer.fromJson(input, type);
}

@Override
public <T> T fromJson(String input, Type type) {
return deserializer.fromJson(input, type);
}

@Override
public <T> void toJson(T value, OutputStream output, Type type) {
// This is the Lambda default Output serialization
JacksonFactory.getInstance().getSerializer(type).toJson(value, output);
}
}
@@ -0,0 +1,96 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka.internal;

import java.lang.reflect.Method;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;

/**
* Utility class to determine the deserialization type from Lambda request handler methods annotated with the
* {@link Deserialization} annotation.
*
* Relies on the Lambda _HANDLER environment variable to detect the currently active handler method.
*/
public final class DeserializationUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(DeserializationUtils.class);

private DeserializationUtils() {
}

public static DeserializationType determineDeserializationType() {
String handler = System.getenv("_HANDLER");
if (handler == null || handler.trim().isEmpty()) {
LOGGER.error("Cannot determine deserialization type. No valid handler found in _HANDLER: {}", handler);
return DeserializationType.LAMBDA_DEFAULT;
}

try {
HandlerInfo handlerInfo = parseHandler(handler);
Class<?> handlerClazz = Class.forName(handlerInfo.className);

if (!RequestHandler.class.isAssignableFrom(handlerClazz)) {
LOGGER.warn("Class '{}' does not implement RequestHandler. Ignoring.", handlerInfo.className);
return DeserializationType.LAMBDA_DEFAULT;
}

return findDeserializationType(handlerClazz, handlerInfo.methodName);
} catch (Exception e) {
LOGGER.warn("Cannot determine deserialization type. Defaulting to standard.", e);
return DeserializationType.LAMBDA_DEFAULT;
}
}

private static HandlerInfo parseHandler(String handler) {
if (handler.contains("::")) {
int separatorIndex = handler.indexOf("::");
String className = handler.substring(0, separatorIndex);
String methodName = handler.substring(separatorIndex + 2);
return new HandlerInfo(className, methodName);
}

return new HandlerInfo(handler);
}

private static DeserializationType findDeserializationType(Class<?> handlerClass, String methodName) {
for (Method method : handlerClass.getDeclaredMethods()) {
if (method.getName().equals(methodName) && method.isAnnotationPresent(Deserialization.class)) {
Deserialization annotation = method.getAnnotation(Deserialization.class);
LOGGER.debug("Found deserialization type: {}", annotation.type());
return annotation.type();
}
}

return DeserializationType.LAMBDA_DEFAULT;
}

private static class HandlerInfo {
final String className;
final String methodName;

HandlerInfo(String className) {
this(className, "handleRequest");
}

HandlerInfo(String className, String methodName) {
this.className = className;
this.methodName = methodName;
}
}
}
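
The _HANDLER value follows the standard Lambda Java handler string format, either a bare class name or class::method. A short sketch of the inputs the parsing above accepts (class names are hypothetical):

// Hypothetical _HANDLER values and how parseHandler() above resolves them:
//   "com.example.OrderHandler"                  -> class com.example.OrderHandler, method "handleRequest"
//   "com.example.OrderHandler::handleRequest"   -> class com.example.OrderHandler, method "handleRequest"
//   "com.example.StreamHandler::process"        -> class com.example.StreamHandler, method "process"
// Any lookup or reflection failure falls back to DeserializationType.LAMBDA_DEFAULT.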
@@ -0,0 +1,294 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka.serializers;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;

import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* Abstract base class for Kafka deserializers that implements common functionality.
*/
abstract class AbstractKafkaDeserializer implements PowertoolsDeserializer {
protected static final ObjectMapper objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

/**
* Deserialize JSON from InputStream into ConsumerRecords
*
* @param input InputStream containing JSON data
* @param type Type representing ConsumerRecords<K, V>
* @param <T> The type to deserialize to
* @return Deserialized ConsumerRecords object
* @throws IllegalArgumentException if type is not ConsumerRecords
*/
@SuppressWarnings("unchecked")
@Override
public <T> T fromJson(InputStream input, Type type) {
if (!isConsumerRecordsType(type)) {
throw new IllegalArgumentException("Type must be ConsumerRecords<K, V> when using this deserializer");
}

try {
// Parse the KafkaEvent from the input stream
KafkaEvent kafkaEvent = objectMapper.readValue(input, KafkaEvent.class);

// Extract the key and value types from the ConsumerRecords<K, V> type
ParameterizedType parameterizedType = (ParameterizedType) type;
Type[] typeArguments = parameterizedType.getActualTypeArguments();
Class<?> keyType = (Class<?>) typeArguments[0];
Class<?> valueType = (Class<?>) typeArguments[1];

// Convert KafkaEvent to ConsumerRecords
return (T) convertToConsumerRecords(kafkaEvent, keyType, valueType);
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize Lambda handler input to ConsumerRecords", e);
}
}

/**
* Deserialize JSON from String into ConsumerRecords
*
* @param input String containing JSON data
* @param type Type representing ConsumerRecords<K, V>
* @param <T> The type to deserialize to
* @return Deserialized ConsumerRecords object
* @throws IllegalArgumentException if type is not ConsumerRecords
*/
@SuppressWarnings("unchecked")
@Override
public <T> T fromJson(String input, Type type) {
if (!isConsumerRecordsType(type)) {
throw new IllegalArgumentException("Type must be ConsumerRecords<K, V> when using this deserializer");
}

try {
// Parse the KafkaEvent from the input string
KafkaEvent kafkaEvent = objectMapper.readValue(input, KafkaEvent.class);

// Extract the key and value types from the ConsumerRecords<K, V> type
ParameterizedType parameterizedType = (ParameterizedType) type;
Type[] typeArguments = parameterizedType.getActualTypeArguments();
Class<?> keyType = (Class<?>) typeArguments[0];
Class<?> valueType = (Class<?>) typeArguments[1];

// Convert KafkaEvent to ConsumerRecords
return (T) convertToConsumerRecords(kafkaEvent, keyType, valueType);
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize Lambda handler input to ConsumerRecords", e);
}
}

private boolean isConsumerRecordsType(Type type) {
if (!(type instanceof ParameterizedType)) {
return false;
}

ParameterizedType parameterizedType = (ParameterizedType) type;
return parameterizedType.getRawType().equals(ConsumerRecords.class);
}

private <K, V> ConsumerRecords<K, V> convertToConsumerRecords(KafkaEvent kafkaEvent, Class<K> keyType,
Class<V> valueType) {
// Validate that this is actually a Kafka event by checking for required properties
if (kafkaEvent == null || kafkaEvent.getEventSource() == null) {
throw new RuntimeException(
"Failed to deserialize Lambda handler input to ConsumerRecords: Input is not a valid Kafka event.");
}

if (kafkaEvent.getRecords() == null) {
return ConsumerRecords.empty();
}

Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsMap = new HashMap<>();

for (Map.Entry<String, List<KafkaEvent.KafkaEventRecord>> entry : kafkaEvent.getRecords().entrySet()) {
String topic = entry.getKey();

for (KafkaEvent.KafkaEventRecord eventRecord : entry.getValue()) {
ConsumerRecord<K, V> consumerRecord = convertToConsumerRecord(topic, eventRecord, keyType, valueType);

TopicPartition topicPartition = new TopicPartition(topic, eventRecord.getPartition());
recordsMap.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(consumerRecord);
}
}

return createConsumerRecords(recordsMap);
}

/**
* Creates ConsumerRecords with compatibility for both Kafka 3.x.x and 4.x.x.
*
* @param <K> Key type
* @param <V> Value type
* @param records Map of records by topic partition
* @return ConsumerRecords instance
*/
protected <K, V> ConsumerRecords<K, V> createConsumerRecords(
Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
try {
// Try to use the Kafka 4.x.x constructor with nextOffsets parameter
return new ConsumerRecords<>(records, Map.of());
} catch (NoSuchMethodError e) {
// Fall back to Kafka 3.x.x constructor if 4.x.x is not available
return new ConsumerRecords<>(records);
}
}

private <K, V> ConsumerRecord<K, V> convertToConsumerRecord(
String topic,
KafkaEvent.KafkaEventRecord eventRecord,
Class<K> keyType,
Class<V> valueType) {

K key = deserializeField(eventRecord.getKey(), keyType, "key");
V value = deserializeField(eventRecord.getValue(), valueType, "value");
Headers headers = extractHeaders(eventRecord);

return new ConsumerRecord<>(
topic,
eventRecord.getPartition(),
eventRecord.getOffset(),
eventRecord.getTimestamp(),
TimestampType.valueOf(eventRecord.getTimestampType()),
// We set these to NULL_SIZE since they are not relevant in the Lambda environment due to ESM
// pre-processing.
ConsumerRecord.NULL_SIZE,
ConsumerRecord.NULL_SIZE,
key,
value,
headers,
Optional.empty());
}

private <T> T deserializeField(String encodedData, Class<T> type, String fieldName) {
if (encodedData == null) {
return null;
}

try {
byte[] decodedBytes = Base64.getDecoder().decode(encodedData);
return deserialize(decodedBytes, type);
} catch (Exception e) {
throw new RuntimeException("Failed to deserialize Kafka record " + fieldName + ".", e);
}
}

private Headers extractHeaders(KafkaEvent.KafkaEventRecord eventRecord) {
Headers headers = new RecordHeaders();
if (eventRecord.getHeaders() != null) {
for (Map<String, byte[]> headerMap : eventRecord.getHeaders()) {
for (Map.Entry<String, byte[]> header : headerMap.entrySet()) {
if (header.getValue() != null) {
headers.add(header.getKey(), header.getValue());
}
}
}
}

return headers;
}

/**
* Template method to be implemented by subclasses for specific deserialization logic
* for complex types (non-primitives).
*
* @param <T> The type to deserialize to
* @param data The byte array to deserialize coming from the base64 decoded Kafka field
* @param type The class type to deserialize to
* @return The deserialized object
* @throws IOException If deserialization fails
*/
protected abstract <T> T deserializeObject(byte[] data, Class<T> type) throws IOException;

/**
* Main deserialize method that handles primitive types and delegates to subclasses for complex types.
*
* @param <T> The type to deserialize to
* @param data The byte array to deserialize
* @param type The class type to deserialize to
* @return The deserialized object
* @throws IOException If deserialization fails
*/
private <T> T deserialize(byte[] data, Class<T> type) throws IOException {
// First try to deserialize as a primitive type
T result = deserializePrimitive(data, type);
if (result != null) {
return result;
}

// Delegate to subclass for complex type deserialization
return deserializeObject(data, type);
}

/**
* Helper method for handling primitive types and String deserialization.
*
* @param <T> The type to deserialize to
* @param data The byte array to deserialize
* @param type The class type to deserialize to
* @return The deserialized primitive or String, or null if not a primitive or String
*/
@SuppressWarnings("unchecked")
private <T> T deserializePrimitive(byte[] data, Class<T> type) {
// Handle String type
if (type == String.class) {
return (T) new String(data, StandardCharsets.UTF_8);
}

// Handle primitive types and their wrappers
String str = new String(data, StandardCharsets.UTF_8);

if (type == Integer.class || type == int.class) {
return (T) Integer.valueOf(str);
} else if (type == Long.class || type == long.class) {
return (T) Long.valueOf(str);
} else if (type == Double.class || type == double.class) {
return (T) Double.valueOf(str);
} else if (type == Float.class || type == float.class) {
return (T) Float.valueOf(str);
} else if (type == Boolean.class || type == boolean.class) {
return (T) Boolean.valueOf(str);
} else if (type == Byte.class || type == byte.class) {
return (T) Byte.valueOf(str);
} else if (type == Short.class || type == short.class) {
return (T) Short.valueOf(str);
} else if (type == Character.class || type == char.class) {
if (!str.isEmpty()) {
return (T) Character.valueOf(str.charAt(0));
}
throw new IllegalArgumentException("Cannot convert empty string to char");
}

return null;
}
}
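
For context, an abridged, illustrative sketch of the Lambda Kafka (MSK/self-managed ESM) event shape this base class reads into KafkaEvent before base64-decoding keys, values, and header values. Field values are made up:

// Illustrative payload only; keys and values are base64-encoded by the event source mapping.
// {
//   "eventSource": "aws:kafka",
//   "records": {
//     "orders-0": [
//       {
//         "topic": "orders",
//         "partition": 0,
//         "offset": 15,
//         "timestamp": 1545084650987,
//         "timestampType": "CREATE_TIME",
//         "key": "cmVjb3JkS2V5",              // base64("recordKey")
//         "value": "eyJpZCI6ICIxMjMifQ==",    // base64("{\"id\": \"123\"}")
//         "headers": [ { "contentType": [97, 112, 112] } ]
//       }
//     ]
//   }
// }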
@@ -0,0 +1,46 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka.serializers;

import java.io.IOException;

import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;

/**
* Deserializer for Kafka records using Avro format.
*/
public class KafkaAvroDeserializer extends AbstractKafkaDeserializer {

@Override
protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
// If no Avro-generated class is passed, we cannot deserialize using Avro
if (SpecificRecordBase.class.isAssignableFrom(type)) {
try {
DatumReader<T> datumReader = new SpecificDatumReader<>(type);
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

return datumReader.read(null, decoder);
} catch (Exception e) {
throw new IOException("Failed to deserialize Avro data.", e);
}
} else {
throw new IOException("Unsupported type for Avro deserialization: " + type.getName() + ". "
+ "Avro deserialization requires a type of org.apache.avro.specific.SpecificRecord. "
+ "Consider using an alternative Deserializer.");
}
}
}
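
A minimal sketch of a handler consuming Avro-encoded values, assuming a class generated by the avro-maven-plugin — here the TestProduct schema from this PR's test resources. The handler class itself is hypothetical:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.kafka.serializers.test.avro.TestProduct;

// Hypothetical handler; TestProduct is the Avro-generated SpecificRecord from src/test/avro/TestProduct.avsc.
public class AvroProductHandler implements RequestHandler<ConsumerRecords<String, TestProduct>, String> {

    @Override
    @Deserialization(type = DeserializationType.KAFKA_AVRO)
    public String handleRequest(ConsumerRecords<String, TestProduct> records, Context context) {
        for (ConsumerRecord<String, TestProduct> record : records) {
            TestProduct product = record.value(); // decoded from Avro binary by KafkaAvroDeserializer
            context.getLogger().log(product.getName() + " costs " + product.getPrice());
        }
        return "OK";
    }
}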
@@ -0,0 +1,29 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka.serializers;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
* Deserializer for Kafka records using JSON format.
*/
public class KafkaJsonDeserializer extends AbstractKafkaDeserializer {

@Override
protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
String decodedStr = new String(data, StandardCharsets.UTF_8);

return objectMapper.readValue(decodedStr, type);
}
}
@@ -0,0 +1,43 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka.serializers;

import java.io.IOException;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;

/**
* Deserializer for Kafka records using Protocol Buffers format.
*/
public class KafkaProtobufDeserializer extends AbstractKafkaDeserializer {

@Override
@SuppressWarnings("unchecked")
protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
// If no Protobuf-generated class is passed, we cannot deserialize using Protobuf
if (Message.class.isAssignableFrom(type)) {
try {
// Get the parser from the generated Protobuf class
Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
Message message = parser.parseFrom(data);
return type.cast(message);
} catch (Exception e) {
throw new IOException("Failed to deserialize Protobuf data.", e);
}
} else {
throw new IOException("Unsupported type for Protobuf deserialization: " + type.getName() + ". "
+ "Protobuf deserialization requires a type of com.google.protobuf.Message. "
+ "Consider using an alternative Deserializer.");
}
}
}
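
Analogously, a sketch of a handler consuming Protobuf values. ProductOuterClass.Product and its package stand in for any protoc-generated message class; treat all names below as assumptions:

import org.apache.kafka.clients.consumer.ConsumerRecords;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

// Hypothetical import of a protoc-generated message class:
import com.example.protos.ProductOuterClass;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;

// Hypothetical handler; any com.google.protobuf.Message subclass generated by protoc works as value type.
public class ProtobufProductHandler
        implements RequestHandler<ConsumerRecords<String, ProductOuterClass.Product>, String> {

    @Override
    @Deserialization(type = DeserializationType.KAFKA_PROTOBUF)
    public String handleRequest(ConsumerRecords<String, ProductOuterClass.Product> records, Context context) {
        records.forEach(record -> context.getLogger().log(record.value().toString()));
        return "OK";
    }
}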
@@ -0,0 +1,65 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka.serializers;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory;

/**
* Default deserializer that proxies to the standard Lambda serialization behavior.
*
* This deserializer uses the default Jackson-based serializer from
* {@link com.amazonaws.services.lambda.runtime.serialization} to deserialize the event.
*/
public class LambdaDefaultDeserializer implements PowertoolsDeserializer {

@SuppressWarnings("unchecked")
@Override
public <T> T fromJson(InputStream input, Type type) {
// If the target type does not require conversion, simply return the value itself
if (type.equals(InputStream.class)) {
return (T) input;
}

// If the target type is String, read the input stream as a String
if (type.equals(String.class)) {
try {
return (T) new String(input.readAllBytes(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException("Failed to read input stream as String", e);
}
}

return (T) JacksonFactory.getInstance().getSerializer(type).fromJson(input);
}

@SuppressWarnings("unchecked")
@Override
public <T> T fromJson(String input, Type type) {
// If the target type does not require conversion, simply return the value itself
if (type.equals(String.class)) {
return (T) input;
}

// If the target type is InputStream, wrap the String's bytes in an InputStream
if (type.equals(InputStream.class)) {
return (T) new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
}

return (T) JacksonFactory.getInstance().getSerializer(type).fromJson(input);
}
}
@@ -0,0 +1,27 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka.serializers;

import java.io.InputStream;
import java.lang.reflect.Type;

/**
* Interface for deserializers that can handle both String and InputStream inputs.
*
* Similar to {@link com.amazonaws.services.lambda.runtime.CustomPojoSerializer} but only for input deserialization.
*/
public interface PowertoolsDeserializer {
<T> T fromJson(InputStream input, Type type);

<T> T fromJson(String input, Type type);
}
@@ -0,0 +1 @@
software.amazon.lambda.powertools.kafka.PowertoolsSerializer
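
This single line is the service-provider registration (presumably under META-INF/services/com.amazonaws.services.lambda.runtime.CustomPojoSerializer): the Lambda Java runtime discovers CustomPojoSerializer implementations via java.util.ServiceLoader, which is what activates PowertoolsSerializer without further configuration. A rough sketch of that lookup, not the runtime's actual code:

import java.util.ServiceLoader;

import com.amazonaws.services.lambda.runtime.CustomPojoSerializer;

// Rough sketch of the ServiceLoader-based discovery; the real Lambda runtime code differs.
public class SerializerDiscoverySketch {
    public static CustomPojoSerializer discover() {
        // With powertools-kafka on the classpath, this yields PowertoolsSerializer.
        return ServiceLoader.load(CustomPojoSerializer.class).iterator().next();
    }
}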
10 changes: 10 additions & 0 deletions powertools-kafka/src/test/avro/TestProduct.avsc
@@ -0,0 +1,10 @@
{
"namespace": "software.amazon.lambda.powertools.kafka.serializers.test.avro",
"type": "record",
"name": "TestProduct",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "price", "type": "double"}
]
}
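
For completeness, a sketch of producing the Avro binary that KafkaAvroDeserializer decodes, using the class generated from this schema. This is standard Avro API mirroring the SpecificDatumReader/BinaryDecoder read path; names and values are illustrative:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

import software.amazon.lambda.powertools.kafka.serializers.test.avro.TestProduct;

// Sketch: write-side counterpart to KafkaAvroDeserializer; the byte[] returned here is what a
// producer would put on the topic (and what the event source mapping base64-encodes into the event).
public class TestProductWriterSketch {
    public static byte[] write() throws IOException {
        TestProduct product = TestProduct.newBuilder()
                .setId(42)
                .setName("Widget")
                .setPrice(9.99)
                .build();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new SpecificDatumWriter<>(TestProduct.class).write(product, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}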
@@ -0,0 +1,71 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import java.lang.reflect.Method;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.jupiter.api.Test;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

class DeserializationTest {

@Test
void shouldHaveCorrectAnnotationRetention() {
// Given
Class<Deserialization> annotationClass = Deserialization.class;

// When/Then
assertThat(annotationClass.isAnnotation()).isTrue();
assertThat(annotationClass.getAnnotation(java.lang.annotation.Retention.class).value())
.isEqualTo(java.lang.annotation.RetentionPolicy.RUNTIME);
assertThat(annotationClass.getAnnotation(java.lang.annotation.Target.class).value())
.contains(java.lang.annotation.ElementType.METHOD);
}

@Test
void shouldHaveTypeMethod() throws NoSuchMethodException {
// Given
Class<Deserialization> annotationClass = Deserialization.class;

// When
java.lang.reflect.Method typeMethod = annotationClass.getMethod("type");

// Then
assertThat(typeMethod.getReturnType()).isEqualTo(DeserializationType.class);
}

@Test
void shouldBeAccessibleReflectivelyAtRuntime() throws NoSuchMethodException, SecurityException {
// Given
class TestHandler implements RequestHandler<ConsumerRecords<String, Object>, String> {
@Override
@Deserialization(type = DeserializationType.KAFKA_JSON)
public String handleRequest(ConsumerRecords<String, Object> input, Context context) {
return "OK";
}
}

// When
Method handleRequestMethod = TestHandler.class.getMethod("handleRequest", ConsumerRecords.class, Context.class);

// Then
Deserialization annotation = handleRequestMethod.getAnnotation(Deserialization.class);
assertThat(annotation).isNotNull();
assertThat(annotation.type()).isEqualTo(DeserializationType.KAFKA_JSON);
}
}
@@ -0,0 +1,50 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.lambda.powertools.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

// Mainly present to remind us to write unit tests once we add support for a new Deserializer. If we add a new type to
// the enum, it will fail this test.
class DeserializationTypeTest {

@Test
void shouldHaveExpectedEnumValues() {
// Given/When
DeserializationType[] values = DeserializationType.values();

// Then
assertThat(values).contains(
DeserializationType.LAMBDA_DEFAULT,
DeserializationType.KAFKA_JSON,
DeserializationType.KAFKA_AVRO,
DeserializationType.KAFKA_PROTOBUF);
}

@Test
void shouldBeAbleToValueOf() {
// Given/When
DeserializationType jsonType = DeserializationType.valueOf("KAFKA_JSON");
DeserializationType avroType = DeserializationType.valueOf("KAFKA_AVRO");
DeserializationType protobufType = DeserializationType.valueOf("KAFKA_PROTOBUF");
DeserializationType defaultType = DeserializationType.valueOf("LAMBDA_DEFAULT");

// Then
assertThat(jsonType).isEqualTo(DeserializationType.KAFKA_JSON);
assertThat(avroType).isEqualTo(DeserializationType.KAFKA_AVRO);
assertThat(protobufType).isEqualTo(DeserializationType.KAFKA_PROTOBUF);
assertThat(defaultType).isEqualTo(DeserializationType.LAMBDA_DEFAULT);
}
}