# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Common utilities for CloudEvents protocol bindings.

This module provides shared functionality for protocol bindings (HTTP, Kafka, etc.)
to handle CloudEvent attribute encoding and decoding per the CloudEvents specification.
"""

from datetime import datetime
from typing import Any, Final
from urllib.parse import quote, unquote

from dateutil.parser import isoparse

TIME_ATTR: Final[str] = "time"
CONTENT_TYPE_HEADER: Final[str] = "content-type"
DATACONTENTTYPE_ATTR: Final[str] = "datacontenttype"

# The offset suffix that isoformat() emits for UTC; RFC 3339 prefers 'Z'.
_UTC_OFFSET_SUFFIX: Final[str] = "+00:00"


def encode_header_value(value: Any) -> str:
    """
    Encode a CloudEvent attribute value for use in a protocol header.

    Handles special encoding for datetime objects (ISO 8601 with 'Z' suffix for UTC)
    and applies percent-encoding for non-ASCII and special characters per RFC 3986.

    :param value: The attribute value to encode
    :return: Percent-encoded string suitable for protocol headers
    """
    if not isinstance(value, datetime):
        return quote(str(value), safe="")

    text = value.isoformat()
    # Rewrite the UTC offset as 'Z' so the wire form matches RFC 3339 usage.
    if text.endswith(_UTC_OFFSET_SUFFIX):
        text = text[: -len(_UTC_OFFSET_SUFFIX)] + "Z"
    return quote(text, safe="")


def decode_header_value(attr_name: str, value: str) -> Any:
    """
    Decode a CloudEvent attribute value from a protocol header.

    Applies percent-decoding and special parsing for the 'time' attribute
    (converts to datetime object using RFC 3339 parsing).

    :param attr_name: The name of the CloudEvent attribute
    :param value: The percent-encoded header value
    :return: Decoded value (datetime for 'time' attribute, string otherwise)
    """
    text = unquote(value)
    return isoparse(text) if attr_name == TIME_ATTR else text
from dataclasses import dataclass -from datetime import datetime from typing import Any, Callable, Final -from urllib.parse import quote, unquote - -from dateutil.parser import isoparse from cloudevents.core.base import BaseCloudEvent +from cloudevents.core.bindings.common import ( + CONTENT_TYPE_HEADER, + DATACONTENTTYPE_ATTR, + decode_header_value, + encode_header_value, +) from cloudevents.core.formats.base import Format CE_PREFIX: Final[str] = "ce-" -CONTENT_TYPE_HEADER: Final[str] = "content-type" @dataclass(frozen=True) @@ -44,44 +45,6 @@ class HTTPMessage: body: bytes -def _encode_header_value(value: Any) -> str: - """ - Encode a CloudEvent attribute value for use in an HTTP header. - - Handles special encoding for datetime objects (ISO 8601 with 'Z' suffix for UTC) - and applies percent-encoding for non-ASCII and special characters per RFC 3986. - - :param value: The attribute value to encode - :return: Percent-encoded string suitable for HTTP headers - """ - if isinstance(value, datetime): - str_value = value.isoformat() - if str_value.endswith("+00:00"): - str_value = str_value[:-6] + "Z" - return quote(str_value, safe="") - - return quote(str(value), safe="") - - -def _decode_header_value(attr_name: str, value: str) -> Any: - """ - Decode a CloudEvent attribute value from an HTTP header. - - Applies percent-decoding and special parsing for the 'time' attribute - (converts to datetime object using RFC 3339 parsing). - - :param attr_name: The name of the CloudEvent attribute - :param value: The percent-encoded header value - :return: Decoded value (datetime for 'time' attribute, string otherwise) - """ - decoded = unquote(value) - - if attr_name == "time": - return isoparse(decoded) - - return decoded - - def to_binary(event: BaseCloudEvent, event_format: Format) -> HTTPMessage: """ Convert a CloudEvent to HTTP binary content mode. 
@@ -113,14 +76,14 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> HTTPMessage: if attr_value is None: continue - if attr_name == "datacontenttype": + if attr_name == DATACONTENTTYPE_ATTR: headers[CONTENT_TYPE_HEADER] = str(attr_value) else: header_name = f"{CE_PREFIX}{attr_name}" - headers[header_name] = _encode_header_value(attr_value) + headers[header_name] = encode_header_value(attr_value) data = event.get_data() - datacontenttype = attributes.get("datacontenttype") + datacontenttype = attributes.get(DATACONTENTTYPE_ATTR) body = event_format.write_data(data, datacontenttype) return HTTPMessage(headers=headers, body=body) @@ -163,11 +126,11 @@ def from_binary( if normalized_name.startswith(CE_PREFIX): attr_name = normalized_name[len(CE_PREFIX) :] - attributes[attr_name] = _decode_header_value(attr_name, header_value) + attributes[attr_name] = decode_header_value(attr_name, header_value) elif normalized_name == CONTENT_TYPE_HEADER: - attributes["datacontenttype"] = header_value + attributes[DATACONTENTTYPE_ATTR] = header_value - datacontenttype = attributes.get("datacontenttype") + datacontenttype = attributes.get(DATACONTENTTYPE_ATTR) data = event_format.read_data(message.body, datacontenttype) return event_factory(attributes, data) diff --git a/src/cloudevents/core/bindings/kafka.py b/src/cloudevents/core/bindings/kafka.py new file mode 100644 index 00000000..9c2e16b7 --- /dev/null +++ b/src/cloudevents/core/bindings/kafka.py @@ -0,0 +1,322 @@ +# Copyright 2018-Present The CloudEvents Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Any, Callable, Final

from cloudevents.core.base import BaseCloudEvent
from cloudevents.core.bindings.common import (
    CONTENT_TYPE_HEADER,
    DATACONTENTTYPE_ATTR,
    decode_header_value,
    encode_header_value,
)
from cloudevents.core.formats.base import Format

CE_PREFIX: Final[str] = "ce_"
PARTITIONKEY_ATTR: Final[str] = "partitionkey"

# Signature of a function that derives the Kafka message key from an event.
KeyMapper = Callable[[BaseCloudEvent], str | bytes | None]


@dataclass(frozen=True)
class KafkaMessage:
    """
    Represents a Kafka message containing CloudEvent data.

    This dataclass encapsulates Kafka message components for transmitting CloudEvents
    over Kafka. It is immutable to prevent accidental modifications and works with
    any Kafka client library (kafka-python, confluent-kafka, etc.).

    Attributes:
        headers: Kafka message headers; values are bytes (per Kafka protocol requirement)
        key: Optional Kafka message key used for partitioning
        value: Kafka message value/payload as bytes
    """

    headers: dict[str, bytes]
    key: str | bytes | None
    value: bytes


def _default_key_mapper(event: BaseCloudEvent) -> str | bytes | None:
    """
    Default key mapper that extracts the 'partitionkey' extension attribute.

    :param event: The CloudEvent to extract the key from
    :return: The partitionkey extension attribute value, or None if not present
    """
    value = event.get_extension(PARTITIONKEY_ATTR)
    # get_extension returns Any; coerce unexpected types to str so the key is
    # always something a Kafka client can serialize.
    return value if value is None or isinstance(value, (str, bytes)) else str(value)


def to_binary(
    event: BaseCloudEvent,
    event_format: Format,
    key_mapper: KeyMapper | None = None,
) -> KafkaMessage:
    """
    Convert a CloudEvent to Kafka binary content mode.

    In binary mode, CloudEvent attributes are mapped to Kafka headers with the 'ce_' prefix,
    except for 'datacontenttype' which maps to the 'content-type' header. Header values are
    percent-encoded per RFC 3986. The event data is placed in the Kafka message value. The
    message key is derived from the partitionkey extension attribute or a custom key_mapper
    function.

    Example:
        >>> from cloudevents.core.v1.event import CloudEvent
        >>> from cloudevents.core.formats.json import JSONFormat
        >>>
        >>> event = CloudEvent(
        ...     attributes={"type": "com.example.test", "source": "/test"},
        ...     data={"message": "Hello"}
        ... )
        >>> message = to_binary(event, JSONFormat())
        >>> # message.headers = {"ce_type": b"com.example.test", "ce_source": b"%2Ftest", ...}
        >>> # (note: header values are percent-encoded, so "/" becomes "%2F")
        >>> # message.value = b'{"message": "Hello"}'
        >>> # message.key = None

    :param event: The CloudEvent to convert
    :param event_format: Format implementation for data serialization
    :param key_mapper: Optional function to extract message key from event (defaults to partitionkey attribute)
    :return: KafkaMessage with ce_-prefixed headers and event data as value
    """
    headers: dict[str, bytes] = {}
    attributes = event.get_attributes()

    # Derive the message key before building headers; partitionkey travels in
    # the key, never as a ce_ header.
    if key_mapper is None:
        key_mapper = _default_key_mapper
    message_key = key_mapper(event)

    for attr_name, attr_value in attributes.items():
        if attr_value is None:
            continue

        # Skip partitionkey - it goes in the message key, not headers
        if attr_name == PARTITIONKEY_ATTR:
            continue

        if attr_name == DATACONTENTTYPE_ATTR:
            headers[CONTENT_TYPE_HEADER] = str(attr_value).encode("utf-8")
        else:
            header_name = f"{CE_PREFIX}{attr_name}"
            headers[header_name] = encode_header_value(attr_value).encode("utf-8")

    data = event.get_data()
    datacontenttype = attributes.get(DATACONTENTTYPE_ATTR)
    value = event_format.write_data(data, datacontenttype)

    return KafkaMessage(headers=headers, key=message_key, value=value)


def from_binary(
    message: KafkaMessage,
    event_format: Format,
    event_factory: Callable[
        [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent
    ],
) -> BaseCloudEvent:
    """
    Parse a Kafka binary content mode message to a CloudEvent.

    Extracts CloudEvent attributes from ce_-prefixed Kafka headers (matched
    case-insensitively) and treats the 'content-type' header as the
    'datacontenttype' attribute. The Kafka message value is parsed as event data
    according to the content type. If the message has a key, it is added as the
    'partitionkey' extension attribute.

    :param message: KafkaMessage to parse
    :param event_format: Format implementation for data deserialization
    :param event_factory: Factory function to create CloudEvent instances
    :return: CloudEvent instance
    """
    attributes: dict[str, Any] = {}

    for header_name, header_value_bytes in message.headers.items():
        header_value = header_value_bytes.decode("utf-8")

        normalized_name = header_name.lower()

        if normalized_name.startswith(CE_PREFIX):
            attr_name = normalized_name[len(CE_PREFIX) :]
            attributes[attr_name] = decode_header_value(attr_name, header_value)
        elif normalized_name == CONTENT_TYPE_HEADER:
            attributes[DATACONTENTTYPE_ATTR] = header_value

    # If message has a key, add it as partitionkey extension attribute
    if message.key is not None:
        key_value = (
            message.key.decode("utf-8")
            if isinstance(message.key, bytes)
            else message.key
        )
        attributes[PARTITIONKEY_ATTR] = key_value

    datacontenttype = attributes.get(DATACONTENTTYPE_ATTR)
    data = event_format.read_data(message.value, datacontenttype)

    return event_factory(attributes, data)


def to_structured(
    event: BaseCloudEvent,
    event_format: Format,
    key_mapper: KeyMapper | None = None,
) -> KafkaMessage:
    """
    Convert a CloudEvent to Kafka structured content mode.

    In structured mode, the entire CloudEvent (attributes and data) is serialized
    into the Kafka message value using the specified format. The content-type header
    is set to the format's media type. The message key is derived from the partitionkey
    extension attribute or a custom key_mapper function.

    Example:
        >>> from cloudevents.core.v1.event import CloudEvent
        >>> from cloudevents.core.formats.json import JSONFormat
        >>>
        >>> event = CloudEvent(
        ...     attributes={"type": "com.example.test", "source": "/test"},
        ...     data={"message": "Hello"}
        ... )
        >>> message = to_structured(event, JSONFormat())
        >>> # message.headers = {"content-type": b"application/cloudevents+json"}
        >>> # message.value = b'{"type": "com.example.test", "source": "/test", ...}'

    :param event: The CloudEvent to convert
    :param event_format: Format implementation for serialization
    :param key_mapper: Optional function to extract message key from event (defaults to partitionkey attribute)
    :return: KafkaMessage with structured content in value
    """
    content_type = event_format.get_content_type()

    headers = {CONTENT_TYPE_HEADER: content_type.encode("utf-8")}

    value = event_format.write(event)

    if key_mapper is None:
        key_mapper = _default_key_mapper
    message_key = key_mapper(event)

    return KafkaMessage(headers=headers, key=message_key, value=value)


def from_structured(
    message: KafkaMessage,
    event_format: Format,
    event_factory: Callable[
        [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent
    ],
) -> BaseCloudEvent:
    """
    Parse a Kafka structured content mode message to a CloudEvent.

    Deserializes the CloudEvent from the Kafka message value using the specified format.
    Any ce_-prefixed headers are ignored as the value contains all event metadata.
    If the message has a key, it is added as the 'partitionkey' extension attribute.

    :param message: KafkaMessage to parse
    :param event_format: Format implementation for deserialization
    :param event_factory: Factory function to create CloudEvent instances
    :return: CloudEvent instance
    """
    event = event_format.read(event_factory, message.value)

    # If the message carries a key, surface it as the 'partitionkey' extension
    # attribute. Since the event is already created, rebuild it via the factory
    # with the extra attribute. Copy the attribute mapping first: get_attributes()
    # may return the event's internal dict, and mutating it in place would
    # silently alter the event we just parsed.
    if message.key is not None:
        key_value = (
            message.key.decode("utf-8")
            if isinstance(message.key, bytes)
            else message.key
        )
        attributes = dict(event.get_attributes())
        attributes[PARTITIONKEY_ATTR] = key_value
        data = event.get_data()
        event = event_factory(attributes, data)

    return event


def from_kafka(
    message: KafkaMessage,
    event_format: Format,
    event_factory: Callable[
        [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent
    ],
) -> BaseCloudEvent:
    """
    Parse a Kafka message to a CloudEvent with automatic mode detection.

    Automatically detects whether the message uses binary or structured content mode:
    - If any ce_ prefixed headers are present (case-insensitive) -> binary mode
    - Otherwise -> structured mode

    This function provides a convenient way to handle both content modes without
    requiring the caller to determine the mode beforehand.

    :param message: KafkaMessage to parse
    :param event_format: Format implementation for deserialization
    :param event_factory: Factory function to create CloudEvent instances
    :return: CloudEvent instance
    """
    for header_name in message.headers.keys():
        if header_name.lower().startswith(CE_PREFIX):
            return from_binary(message, event_format, event_factory)

    return from_structured(message, event_format, event_factory)
+ +from datetime import datetime, timezone +from typing import Any + +import pytest + +from cloudevents.core.base import BaseCloudEvent +from cloudevents.core.bindings.kafka import ( + KafkaMessage, + from_binary, + from_kafka, + from_structured, + to_binary, + to_structured, +) +from cloudevents.core.formats.json import JSONFormat +from cloudevents.core.v1.event import CloudEvent + + +@pytest.fixture +def minimal_attributes() -> dict[str, str]: + """Minimal valid CloudEvent attributes""" + return { + "type": "com.example.test", + "source": "/test", + "id": "test-id-123", + "specversion": "1.0", + } + + +def create_event( + extra_attrs: dict[str, Any] | None = None, + data: dict[str, Any] | str | bytes | None = None, +) -> CloudEvent: + """Helper to create CloudEvent with valid required attributes""" + attrs: dict[str, Any] = { + "type": "com.example.test", + "source": "/test", + "id": "test-id-123", + "specversion": "1.0", + } + if extra_attrs: + attrs.update(extra_attrs) + return CloudEvent(attributes=attrs, data=data) + + +def test_kafka_message_creation() -> None: + """Test basic KafkaMessage creation""" + message = KafkaMessage( + headers={"content-type": b"application/json"}, + key=b"test-key", + value=b"test", + ) + assert message.headers == {"content-type": b"application/json"} + assert message.key == b"test-key" + assert message.value == b"test" + + +def test_kafka_message_immutable() -> None: + """Test that KafkaMessage is immutable (frozen dataclass)""" + message = KafkaMessage(headers={"test": b"value"}, key=None, value=b"data") + + with pytest.raises(Exception): # FrozenInstanceError + message.headers = {b"new": b"dict"} + + with pytest.raises(Exception): # FrozenInstanceError + message.value = b"new data" + + +def test_to_binary_required_attributes() -> None: + """Test to_binary with only required attributes""" + event = create_event() + message = to_binary(event, JSONFormat()) + + assert "ce_type" in message.headers + assert 
message.headers["ce_type"] == b"com.example.test" + assert "ce_source" in message.headers + assert ( + message.headers["ce_source"] == b"%2Ftest" + ) # Forward slash is percent-encoded + assert "ce_id" in message.headers + assert message.headers["ce_id"] == b"test-id-123" + assert "ce_specversion" in message.headers + assert message.headers["ce_specversion"] == b"1.0" + + +def test_to_binary_with_optional_attributes() -> None: + """Test to_binary with optional attributes""" + event = create_event( + {"subject": "test-subject", "dataschema": "https://example.com/schema"}, + data=None, + ) + message = to_binary(event, JSONFormat()) + + assert message.headers["ce_subject"] == b"test-subject" + # All special characters including : and / are percent-encoded + assert message.headers["ce_dataschema"] == b"https%3A%2F%2Fexample.com%2Fschema" + + +def test_to_binary_with_extensions() -> None: + """Test to_binary with extension attributes""" + event = create_event( + {"customext": "custom-value", "anotherext": "another-value"}, + data=None, + ) + message = to_binary(event, JSONFormat()) + + assert message.headers["ce_customext"] == b"custom-value" + assert message.headers["ce_anotherext"] == b"another-value" + + +def test_to_binary_with_json_data() -> None: + """Test to_binary with dict (JSON) data and datacontenttype""" + event = create_event( + {"datacontenttype": "application/json"}, data={"message": "Hello", "count": 42} + ) + message = to_binary(event, JSONFormat()) + + # With application/json datacontenttype, data should be serialized as JSON + assert b'"message"' in message.value + assert b'"Hello"' in message.value + assert message.value != b"" + + +def test_to_binary_with_string_data() -> None: + """Test to_binary with string data""" + event = create_event(data="Hello World") + message = to_binary(event, JSONFormat()) + + assert message.value == b"Hello World" + + +def test_to_binary_with_bytes_data() -> None: + """Test to_binary with bytes data""" + event = 
create_event(data=b"\x00\x01\x02\x03") + message = to_binary(event, JSONFormat()) + + assert message.value == b"\x00\x01\x02\x03" + + +def test_to_binary_with_none_data() -> None: + """Test to_binary with None data""" + event = create_event(data=None) + message = to_binary(event, JSONFormat()) + + assert message.value == b"" + + +def test_to_binary_datetime_encoding() -> None: + """Test to_binary with datetime attribute""" + test_time = datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc) + event = create_event({"time": test_time}) + message = to_binary(event, JSONFormat()) + + assert "ce_time" in message.headers + # Should be ISO 8601 with Z suffix, percent-encoded + assert b"2023-01-15T10%3A30%3A45Z" in message.headers["ce_time"] + + +def test_to_binary_special_characters() -> None: + """Test to_binary with special characters in attributes""" + event = create_event({"subject": 'Hello World! "quotes" & special'}) + message = to_binary(event, JSONFormat()) + + assert "ce_subject" in message.headers + assert b"%" in message.headers["ce_subject"] # Percent encoding present + + +def test_to_binary_datacontenttype_mapping() -> None: + """Test that datacontenttype maps to content-type header""" + event = create_event({"datacontenttype": "application/json"}, data={"test": "data"}) + message = to_binary(event, JSONFormat()) + + assert "content-type" in message.headers + assert message.headers["content-type"] == b"application/json" + assert "ce_datacontenttype" not in message.headers + + +def test_to_binary_partitionkey_in_key() -> None: + """Test that partitionkey extension attribute becomes message key""" + event = create_event({"partitionkey": "user-123"}) + message = to_binary(event, JSONFormat()) + + assert message.key == "user-123" + assert "ce_partitionkey" not in message.headers + + +def test_to_binary_custom_key_mapper() -> None: + """Test to_binary with custom key mapper""" + + def custom_mapper(event: BaseCloudEvent) -> str: + return 
f"custom-{event.get_type()}" + + event = create_event() + message = to_binary(event, JSONFormat(), key_mapper=custom_mapper) + + assert message.key == "custom-com.example.test" + + +def test_to_binary_no_partitionkey() -> None: + """Test to_binary without partitionkey returns None key""" + event = create_event() + message = to_binary(event, JSONFormat()) + + assert message.key is None + + +def test_from_binary_required_attributes() -> None: + """Test from_binary extracts required attributes""" + message = KafkaMessage( + headers={ + "ce_type": b"com.example.test", + "ce_source": b"%2Ftest", + "ce_id": b"test-123", + "ce_specversion": b"1.0", + }, + key=None, + value=b"", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" # Percent-decoded + assert event.get_id() == "test-123" + assert event.get_specversion() == "1.0" + + +def test_from_binary_with_optional_attributes() -> None: + """Test from_binary with optional attributes""" + message = KafkaMessage( + headers={ + "ce_type": b"com.example.test", + "ce_source": b"/test", + "ce_id": b"123", + "ce_specversion": b"1.0", + "ce_subject": b"test-subject", + "ce_dataschema": b"https%3A%2F%2Fexample.com%2Fschema", + }, + key=None, + value=b"", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_subject() == "test-subject" + assert event.get_dataschema() == "https://example.com/schema" # Percent-decoded + + +def test_from_binary_with_extensions() -> None: + """Test from_binary with extension attributes""" + message = KafkaMessage( + headers={ + "ce_type": b"com.example.test", + "ce_source": b"/test", + "ce_id": b"123", + "ce_specversion": b"1.0", + "ce_customext": b"custom-value", + }, + key=None, + value=b"", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_extension("customext") == "custom-value" + + +def test_from_binary_with_json_data() -> None: + """Test 
from_binary with JSON data""" + message = KafkaMessage( + headers={ + "ce_type": b"com.example.test", + "ce_source": b"/test", + "ce_id": b"123", + "ce_specversion": b"1.0", + "content-type": b"application/json", + }, + key=None, + value=b'{"message": "Hello", "count": 42}', + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + data = event.get_data() + assert isinstance(data, dict) + assert data["message"] == "Hello" + assert data["count"] == 42 + + +def test_from_binary_datetime_parsing() -> None: + """Test from_binary parses datetime correctly""" + message = KafkaMessage( + headers={ + "ce_type": b"com.example.test", + "ce_source": b"/test", + "ce_id": b"123", + "ce_specversion": b"1.0", + "ce_time": b"2023-01-15T10%3A30%3A45Z", + }, + key=None, + value=b"", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + time = event.get_time() + assert isinstance(time, datetime) + assert time.year == 2023 + assert time.month == 1 + assert time.day == 15 + + +def test_from_binary_case_insensitive_headers() -> None: + """Test from_binary handles case-insensitive headers""" + message = KafkaMessage( + headers={ + "CE_TYPE": b"com.example.test", + "CE_SOURCE": b"/test", + "ce_id": b"123", + "Ce_Specversion": b"1.0", + }, + key=None, + value=b"", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + + +def test_from_binary_content_type_as_datacontenttype() -> None: + """Test that content-type header becomes datacontenttype attribute""" + message = KafkaMessage( + headers={ + "ce_type": b"com.example.test", + "ce_source": b"/test", + "ce_id": b"123", + "ce_specversion": b"1.0", + "content-type": b"application/json", + }, + key=None, + value=b'{"test": "data"}', + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_datacontenttype() == "application/json" + + +def test_from_binary_key_to_partitionkey() -> None: + """Test that message 
key becomes partitionkey extension attribute""" + message = KafkaMessage( + headers={ + "ce_type": b"com.example.test", + "ce_source": b"/test", + "ce_id": b"123", + "ce_specversion": b"1.0", + }, + key=b"user-123", + value=b"", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_extension("partitionkey") == "user-123" + + +def test_from_binary_round_trip() -> None: + """Test round-trip conversion preserves all data""" + original = create_event( + { + "time": datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc), + "subject": "test-subject", + "partitionkey": "user-456", + }, + data={"message": "Hello", "count": 42}, + ) + + message = to_binary(original, JSONFormat()) + recovered = from_binary(message, JSONFormat(), CloudEvent) + + assert recovered.get_type() == original.get_type() + assert recovered.get_source() == original.get_source() + assert recovered.get_id() == original.get_id() + assert recovered.get_subject() == original.get_subject() + assert recovered.get_extension("partitionkey") == "user-456" + + +def test_to_structured_basic_event() -> None: + """Test to_structured with basic event""" + event = create_event(data={"message": "Hello"}) + message = to_structured(event, JSONFormat()) + + assert "content-type" in message.headers + assert message.headers["content-type"] == b"application/cloudevents+json" + assert b"type" in message.value + assert b"source" in message.value + + +def test_to_structured_with_all_attributes() -> None: + """Test to_structured with all optional attributes""" + event = create_event( + { + "time": datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc), + "subject": "test-subject", + "datacontenttype": "application/json", + "dataschema": "https://example.com/schema", + }, + data={"message": "Hello"}, + ) + message = to_structured(event, JSONFormat()) + + assert b"time" in message.value + assert b"subject" in message.value + assert b"datacontenttype" in message.value + + +def 
test_to_structured_partitionkey_in_key() -> None: + """Test that partitionkey becomes message key in structured mode""" + event = create_event({"partitionkey": "user-789"}) + message = to_structured(event, JSONFormat()) + + assert message.key == "user-789" + + +def test_to_structured_custom_key_mapper() -> None: + """Test to_structured with custom key mapper""" + + def custom_mapper(event: BaseCloudEvent) -> str: + return f"type-{event.get_type().split('.')[-1]}" + + event = create_event() + message = to_structured(event, JSONFormat(), key_mapper=custom_mapper) + + assert message.key == "type-test" + + +def test_to_structured_with_binary_data() -> None: + """Test to_structured with binary data (should be base64 encoded)""" + event = create_event(data=b"\x00\x01\x02\x03") + message = to_structured(event, JSONFormat()) + + # Binary data should be base64 encoded in structured mode + assert b"data_base64" in message.value + + +def test_from_structured_basic_event() -> None: + """Test from_structured with basic event""" + message = KafkaMessage( + headers={"content-type": b"application/cloudevents+json"}, + key=None, + value=b'{"type":"com.example.test","source":"/test","id":"123","specversion":"1.0","data":{"message":"Hello"}}', + ) + event = from_structured(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + assert event.get_data() == {"message": "Hello"} + + +def test_from_structured_key_to_partitionkey() -> None: + """Test that message key becomes partitionkey in structured mode""" + message = KafkaMessage( + headers={"content-type": b"application/cloudevents+json"}, + key=b"user-999", + value=b'{"type":"com.example.test","source":"/test","id":"123","specversion":"1.0"}', + ) + event = from_structured(message, JSONFormat(), CloudEvent) + + assert event.get_extension("partitionkey") == "user-999" + + +def test_from_structured_round_trip() -> None: + """Test structured mode round-trip""" + 
original = create_event( + { + "time": datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc), + "subject": "test-subject", + "partitionkey": "key-123", + }, + data={"message": "Hello", "count": 42}, + ) + + message = to_structured(original, JSONFormat()) + recovered = from_structured(message, JSONFormat(), CloudEvent) + + assert recovered.get_type() == original.get_type() + assert recovered.get_source() == original.get_source() + assert recovered.get_extension("partitionkey") == "key-123" + + +def test_from_kafka_detects_binary_mode() -> None: + """Test from_kafka detects binary mode (ce_ headers present)""" + message = KafkaMessage( + headers={ + "ce_type": b"com.example.test", + "ce_source": b"/test", + "ce_id": b"123", + "ce_specversion": b"1.0", + }, + key=None, + value=b'{"message": "Hello"}', + ) + event = from_kafka(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "com.example.test" + + +def test_from_kafka_detects_structured_mode() -> None: + """Test from_kafka detects structured mode (no ce_ headers)""" + message = KafkaMessage( + headers={"content-type": b"application/cloudevents+json"}, + key=None, + value=b'{"type":"com.example.test","source":"/test","id":"123","specversion":"1.0"}', + ) + event = from_kafka(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "com.example.test" + + +def test_from_kafka_case_insensitive_detection() -> None: + """Test from_kafka detection is case-insensitive""" + message = KafkaMessage( + headers={ + "CE_TYPE": b"com.example.test", + "CE_SOURCE": b"/test", + "ce_id": b"123", + "ce_specversion": b"1.0", + }, + key=None, + value=b"", + ) + event = from_kafka(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "com.example.test" + + +def test_from_kafka_binary_with_partitionkey() -> None: + """Test from_kafka binary mode with partition key""" + message = KafkaMessage( + headers={ + "ce_type": b"com.example.test", + "ce_source": b"/test", + "ce_id": b"123", + "ce_specversion": 
b"1.0", + }, + key=b"user-555", + value=b"", + ) + event = from_kafka(message, JSONFormat(), CloudEvent) + + assert event.get_extension("partitionkey") == "user-555" + + +def test_from_kafka_structured_with_partitionkey() -> None: + """Test from_kafka structured mode with partition key""" + message = KafkaMessage( + headers={"content-type": b"application/cloudevents+json"}, + key=b"user-666", + value=b'{"type":"com.example.test","source":"/test","id":"123","specversion":"1.0"}', + ) + event = from_kafka(message, JSONFormat(), CloudEvent) + + assert event.get_extension("partitionkey") == "user-666" + + +def test_empty_headers() -> None: + """Test handling of empty headers in structured mode""" + message = KafkaMessage( + headers={}, + key=None, + value=b'{"type":"com.example.test","source":"/test","id":"123","specversion":"1.0"}', + ) + # Should default to structured mode + event = from_kafka(message, JSONFormat(), CloudEvent) + assert event.get_type() == "com.example.test" + + +def test_unicode_in_attributes() -> None: + """Test handling of unicode characters in attributes""" + event = create_event({"subject": "Hello δΈ–η•Œ 🌍"}) + message = to_binary(event, JSONFormat()) + recovered = from_binary(message, JSONFormat(), CloudEvent) + + assert recovered.get_subject() == "Hello δΈ–η•Œ 🌍" + + +def test_unicode_in_data() -> None: + """Test handling of unicode characters in data""" + event = create_event( + {"datacontenttype": "application/json"}, data={"message": "Hello δΈ–η•Œ 🌍"} + ) + message = to_binary(event, JSONFormat()) + recovered = from_binary(message, JSONFormat(), CloudEvent) + + assert isinstance(recovered.get_data(), dict) + assert recovered.get_data()["message"] == "Hello δΈ–η•Œ 🌍" + + +def test_string_key_vs_bytes_key() -> None: + """Test that both string and bytes keys work""" + # String key + event1 = create_event({"partitionkey": "string-key"}) + msg1 = to_binary(event1, JSONFormat()) + assert msg1.key == "string-key" + + # Bytes key through custom 
mapper + def bytes_mapper(event: BaseCloudEvent) -> bytes: + return b"bytes-key" + + event2 = create_event() + msg2 = to_binary(event2, JSONFormat(), key_mapper=bytes_mapper) + assert msg2.key == b"bytes-key"