From 30f31244ea4f80669dbf4d55cbc20cc10f27685d Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 30 Apr 2025 15:29:31 -0700 Subject: [PATCH 01/12] DGS-20366 Support schema ID in header --- .../schema_registry/__init__.py | 2 + src/confluent_kafka/schema_registry/avro.py | 6 +- .../schema_registry/json_schema.py | 6 +- .../schema_registry/protobuf.py | 6 +- .../rules/encryption/encrypt_executor.py | 4 +- src/confluent_kafka/schema_registry/serde.py | 178 +++++++++++++++++- tests/schema_registry/test_schema_id.py | 61 ++++++ .../schema_registry/test_wildcard_matcher.py | 2 +- 8 files changed, 252 insertions(+), 13 deletions(-) create mode 100644 tests/schema_registry/test_schema_id.py diff --git a/src/confluent_kafka/schema_registry/__init__.py b/src/confluent_kafka/schema_registry/__init__.py index e4ad4be17..9fb450078 100644 --- a/src/confluent_kafka/schema_registry/__init__.py +++ b/src/confluent_kafka/schema_registry/__init__.py @@ -36,6 +36,8 @@ ) _MAGIC_BYTE = 0 +_MAGIC_BYTE_V0 = _MAGIC_BYTE +_MAGIC_BYTE_V1 = 1 __all__ = [ "ConfigCompatibilityLevel", diff --git a/src/confluent_kafka/schema_registry/avro.py b/src/confluent_kafka/schema_registry/avro.py index 368507752..4a12804f4 100644 --- a/src/confluent_kafka/schema_registry/avro.py +++ b/src/confluent_kafka/schema_registry/avro.py @@ -30,7 +30,7 @@ validate) from fastavro.schema import load_schema -from . import (_MAGIC_BYTE, +from . import (_MAGIC_BYTE_V0, Schema, topic_subject_name_strategy, RuleMode, @@ -378,7 +378,7 @@ def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> O with _ContextStringIO() as fo: # Write the magic byte and schema ID in network byte order (big endian) - fo.write(pack('>bI', _MAGIC_BYTE, self._schema_id)) + fo.write(pack('>bI', _MAGIC_BYTE_V0, self._schema_id)) # write the record to the rest of the buffer schemaless_writer(fo, parsed_schema, value) @@ -564,7 +564,7 @@ def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> U with _ContextStringIO(data) as payload: magic, schema_id = unpack('>bI', payload.read(5)) - if magic != _MAGIC_BYTE: + if magic != _MAGIC_BYTE_V0: raise SerializationError("Unexpected magic byte {}. 
This message " "was not produced with a Confluent " "Schema Registry serializer".format(magic)) diff --git a/src/confluent_kafka/schema_registry/json_schema.py b/src/confluent_kafka/schema_registry/json_schema.py index 157d0dd7f..87501c347 100644 --- a/src/confluent_kafka/schema_registry/json_schema.py +++ b/src/confluent_kafka/schema_registry/json_schema.py @@ -31,7 +31,7 @@ from referencing import Registry, Resource from referencing._core import Resolver -from confluent_kafka.schema_registry import (_MAGIC_BYTE, +from confluent_kafka.schema_registry import (_MAGIC_BYTE_V0, Schema, topic_subject_name_strategy, RuleKind, @@ -386,7 +386,7 @@ def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> O with _ContextStringIO() as fo: # Write the magic byte and schema ID in network byte order (big endian) - fo.write(struct.pack(">bI", _MAGIC_BYTE, self._schema_id)) + fo.write(struct.pack(">bI", _MAGIC_BYTE_V0, self._schema_id)) # JSON dump always writes a str never bytes # https://docs.python.org/3/library/json.html encoded_value = self._json_encode(value) @@ -590,7 +590,7 @@ def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> U with _ContextStringIO(data) as payload: magic, schema_id = struct.unpack('>bI', payload.read(5)) - if magic != _MAGIC_BYTE: + if magic != _MAGIC_BYTE_V0: raise SerializationError("Unexpected magic byte {}. This message " "was not produced with a Confluent " "Schema Registry serializer".format(magic)) diff --git a/src/confluent_kafka/schema_registry/protobuf.py b/src/confluent_kafka/schema_registry/protobuf.py index dfa5c1ffe..fb1839041 100644 --- a/src/confluent_kafka/schema_registry/protobuf.py +++ b/src/confluent_kafka/schema_registry/protobuf.py @@ -40,7 +40,7 @@ from google.protobuf.message import DecodeError, Message from google.protobuf.message_factory import GetMessageClass -from . import (_MAGIC_BYTE, +from . import (_MAGIC_BYTE_V0, reference_subject_name_strategy, topic_subject_name_strategy, SchemaRegistryClient) from .confluent.types import decimal_pb2 @@ -603,7 +603,7 @@ def __call__(self, message: Message, ctx: Optional[SerializationContext] = None) with _ContextStringIO() as fo: # Write the magic byte and schema ID in network byte order # (big endian) - fo.write(struct.pack('>bI', _MAGIC_BYTE, self._schema_id)) + fo.write(struct.pack('>bI', _MAGIC_BYTE_V0, self._schema_id)) # write the index array that specifies the message descriptor # of the serialized data. self._encode_varints(fo, self._index_array, @@ -862,7 +862,7 @@ def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> O with _ContextStringIO(data) as payload: magic, schema_id = struct.unpack('>bI', payload.read(5)) - if magic != _MAGIC_BYTE: + if magic != _MAGIC_BYTE_V0: raise SerializationError("Unknown magic byte. 
This message was " "not produced with a Confluent " "Schema Registry serializer") diff --git a/src/confluent_kafka/schema_registry/rules/encryption/encrypt_executor.py b/src/confluent_kafka/schema_registry/rules/encryption/encrypt_executor.py index 01838364a..cb480f145 100644 --- a/src/confluent_kafka/schema_registry/rules/encryption/encrypt_executor.py +++ b/src/confluent_kafka/schema_registry/rules/encryption/encrypt_executor.py @@ -23,7 +23,7 @@ from tink.proto import tink_pb2, aes_siv_pb2 from confluent_kafka.schema_registry import SchemaRegistryError, RuleMode, \ - _MAGIC_BYTE + _MAGIC_BYTE_V0 from confluent_kafka.schema_registry.rule_registry import RuleRegistry from confluent_kafka.schema_registry.rules.encryption.dek_registry.dek_registry_client import \ DekRegistryClient, Kek, KekId, DekId, Dek, DekAlgorithm @@ -381,7 +381,7 @@ def transform(self, ctx: RuleContext, field_ctx: FieldContext, field_value: Any) raise RuleError(f"unsupported rule mode {ctx.rule_mode}") def _prefix_version(self, version: int, ciphertext: bytes) -> bytes: - return bytes([_MAGIC_BYTE]) + version.to_bytes(4, byteorder="big") + ciphertext + return bytes([_MAGIC_BYTE_V0]) + version.to_bytes(4, byteorder="big") + ciphertext def _extract_version(self, ciphertext: bytes) -> Tuple[Optional[int], bytes]: if len(ciphertext) < 5: diff --git a/src/confluent_kafka/schema_registry/serde.py b/src/confluent_kafka/schema_registry/serde.py index 1cfb384e1..f0a9b4b83 100644 --- a/src/confluent_kafka/schema_registry/serde.py +++ b/src/confluent_kafka/schema_registry/serde.py @@ -31,12 +31,17 @@ 'RuleExecutor'] import abc +import io import logging +import struct +import uuid from enum import Enum from threading import Lock from typing import Callable, List, Optional, Set, Dict, Any, TypeVar -from confluent_kafka.schema_registry import RegisteredSchema +from confluent_kafka.schema_registry import (RegisteredSchema, + _MAGIC_BYTE_V0, + _MAGIC_BYTE_V1) from confluent_kafka.schema_registry.schema_registry_client import RuleMode, \ Rule, RuleKind, Schema, RuleSet from confluent_kafka.schema_registry.wildcard_matcher import wildcard_match @@ -47,6 +52,177 @@ log = logging.getLogger(__name__) +class SchemaId(object): + __slots__ = ['schema_type', 'id', 'guid', 'message_indexes'] + + def __init__(self, schema_type: str): + self.schema_type = schema_type + self.id = None + self.guid = None + self.message_indexes = None + + def from_bytes(self, payload: io.BytesIO) -> io.BytesIO: + magic = struct.unpack('>b', payload.read(1))[0] + if magic == _MAGIC_BYTE_V0: + self.id = struct.unpack('>I', payload.read(4))[0] + elif magic == _MAGIC_BYTE_V1: + self.guid = uuid.UUID(bytes=payload.read(16)) + else: + raise SerializationError("Invalid magic byte") + if self.schema_type == "PROTOBUF": + self.message_indexes = self._read_index_array(payload, zigzag=True) + return payload + + def id_to_bytes(self) -> bytes: + if self.id is None: + raise SerializationError("Schema ID is not set") + buf = io.BytesIO() + buf.write(struct.pack('>bI', _MAGIC_BYTE_V0, self.id)) + if self.message_indexes is not None: + self._encode_varints(buf, self.message_indexes, zigzag=True) + return buf.getvalue() + + def guid_to_bytes(self) -> bytes: + if self.guid is None: + raise SerializationError("Schema GUID is not set") + buf = io.BytesIO() + buf.write(struct.pack('>b', _MAGIC_BYTE_V1)) + buf.write(self.guid.bytes) + if self.message_indexes is not None: + self._encode_varints(buf, self.message_indexes, zigzag=True) + return buf.getvalue() + + @staticmethod + def 
_decode_varint(buf: io.BytesIO, zigzag: bool = True) -> int: + """ + Decodes a single varint from a buffer. + + Args: + buf (BytesIO): buffer to read from + zigzag (bool): decode as zigzag or uvarint + + Returns: + int: decoded varint + + Raises: + EOFError: if buffer is empty + """ + + value = 0 + shift = 0 + try: + while True: + i = SchemaId._read_byte(buf) + + value |= (i & 0x7f) << shift + shift += 7 + if not (i & 0x80): + break + + if zigzag: + value = (value >> 1) ^ -(value & 1) + + return value + + except EOFError: + raise EOFError("Unexpected EOF while reading index") + + @staticmethod + def _read_byte(buf: io.BytesIO) -> int: + """ + Read one byte from buf as an int. + + Args: + buf (BytesIO): The buffer to read from. + + .. _ord: + https://docs.python.org/2/library/functions.html#ord + """ + + i = buf.read(1) + if i == b'': + raise EOFError("Unexpected EOF encountered") + return ord(i) + + @staticmethod + def _read_index_array(buf: io.BytesIO, zigzag: bool = True) -> List[int]: + """ + Read an index array from buf that specifies the message + descriptor of interest in the file descriptor. + + Args: + buf (BytesIO): The buffer to read from. + + Returns: + list of int: The index array. + """ + + size = SchemaId._decode_varint(buf, zigzag=zigzag) + if size < 0 or size > 100000: + raise SerializationError("Invalid msgidx array length") + + if size == 0: + return [0] + + msg_index = [] + for _ in range(size): + msg_index.append(SchemaId._decode_varint(buf, + zigzag=zigzag)) + + return msg_index + + @staticmethod + def _write_varint(buf: io.BytesIO, val: int, zigzag: bool = True): + """ + Writes val to buf, either using zigzag or uvarint encoding. + + Args: + buf (BytesIO): buffer to write to. + val (int): integer to be encoded. + zigzag (bool): whether to encode in zigzag or uvarint encoding + """ + + if zigzag: + val = (val << 1) ^ (val >> 63) + + while (val & ~0x7f) != 0: + buf.write(SchemaId._bytes((val & 0x7f) | 0x80)) + val >>= 7 + buf.write(SchemaId._bytes(val)) + + @staticmethod + def _encode_varints(buf: io.BytesIO, ints: List[int], zigzag: bool = True): + """ + Encodes each int as a uvarint onto buf + + Args: + buf (BytesIO): buffer to write to. + ints ([int]): ints to be encoded. + zigzag (bool): whether to encode in zigzag or uvarint encoding + """ + + assert len(ints) > 0 + # The root element at the 0 position does not need a length prefix. + if ints == [0]: + buf.write(SchemaId._bytes(0x00)) + return + + SchemaId._write_varint(buf, len(ints), zigzag=zigzag) + + for value in ints: + SchemaId._write_varint(buf, value, zigzag=zigzag) + + @staticmethod + def _bytes(v: int) -> bytes: + """ + Convert int to bytes + + Args: + v (int): The int to convert to bytes. + """ + return bytes((v,)) + + class FieldType(str, Enum): RECORD = "RECORD" ENUM = "ENUM" diff --git a/tests/schema_registry/test_schema_id.py b/tests/schema_registry/test_schema_id.py new file mode 100644 index 000000000..546098c38 --- /dev/null +++ b/tests/schema_registry/test_schema_id.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import io +from confluent_kafka.schema_registry.serde import SchemaId + +def test_schema_guid(): + schema_id = SchemaId("AVRO") + input = bytes([ + 0x01, 0x89, 0x79, 0x17, 0x62, 0x23, 0x36, 0x41, 0x86, 0x96, 0x74, 0x29, 0x9b, 0x90, + 0xa8, 0x02, 0xe2 + ]) + schema_id.from_bytes(io.BytesIO(input)) + guid_str = str(schema_id.guid) + assert guid_str == "89791762-2336-4186-9674-299b90a802e2" + output = schema_id.guid_to_bytes() + assert output == input + + +def test_schema_id(): + schema_id = SchemaId("AVRO") + input = bytes([ + 0x00, 0x00, 0x00, 0x00, 0x01 + ]) + schema_id.from_bytes(io.BytesIO(input)) + id = schema_id.id + assert id == 1 + output = schema_id.id_to_bytes() + assert output == input + + +def test_schema_message_indexes(): + schema_id = SchemaId("PROTOBUF") + input = bytes([ + 0x00, 0x00, 0x00, 0x00, 0x01, 0x06, 0x02, 0x04, 0x06 + ]) + schema_id.from_bytes(io.BytesIO(input)) + id = schema_id.id + assert id == 1 + indexes = schema_id.message_indexes + assert indexes == [1, 2, 3] + output = schema_id.id_to_bytes() + assert output == input + + + + diff --git a/tests/schema_registry/test_wildcard_matcher.py b/tests/schema_registry/test_wildcard_matcher.py index af8db63ad..53828e46d 100644 --- a/tests/schema_registry/test_wildcard_matcher.py +++ b/tests/schema_registry/test_wildcard_matcher.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -# Copyright 20244Confluent Inc. +# Copyright 2024 Confluent Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
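For reference, a minimal round-trip sketch of the two wire framings SchemaId accepts, mirroring tests/schema_registry/test_schema_id.py (a sketch against this patch as posted, not a shipped API):

    import io
    import uuid

    from confluent_kafka.schema_registry.serde import SchemaId

    # Wire format v0: magic byte 0x00 followed by a 4-byte big-endian schema id.
    sid = SchemaId("AVRO")
    sid.id = 1
    assert sid.id_to_bytes() == bytes([0x00, 0x00, 0x00, 0x00, 0x01])

    # Wire format v1: magic byte 0x01 followed by the 16-byte schema GUID.
    sid.guid = uuid.UUID("89791762-2336-4186-9674-299b90a802e2")
    framed = sid.guid_to_bytes()
    assert framed[0] == 0x01 and len(framed) == 17

    # from_bytes() dispatches on the magic byte; for PROTOBUF it also consumes
    # the zigzag-varint message-index array (0x06 -> length 3, then 1, 2, 3).
    pid = SchemaId("PROTOBUF")
    rest = pid.from_bytes(io.BytesIO(bytes([0x00, 0x00, 0x00, 0x00, 0x01,
                                            0x06, 0x02, 0x04, 0x06])))
    assert pid.id == 1 and pid.message_indexes == [1, 2, 3]
    assert rest.read() == b''  # buffer ends up positioned at the message body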
From 47f5ccd48eb9c461176f517db1cfe458e4f77786 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 30 Apr 2025 15:32:27 -0700 Subject: [PATCH 02/12] Fix flake8 --- src/confluent_kafka/schema_registry/serde.py | 3 +-- tests/schema_registry/test_schema_id.py | 5 +---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/confluent_kafka/schema_registry/serde.py b/src/confluent_kafka/schema_registry/serde.py index f0a9b4b83..e36a2097e 100644 --- a/src/confluent_kafka/schema_registry/serde.py +++ b/src/confluent_kafka/schema_registry/serde.py @@ -166,8 +166,7 @@ def _read_index_array(buf: io.BytesIO, zigzag: bool = True) -> List[int]: msg_index = [] for _ in range(size): - msg_index.append(SchemaId._decode_varint(buf, - zigzag=zigzag)) + msg_index.append(SchemaId._decode_varint(buf, zigzag=zigzag)) return msg_index diff --git a/tests/schema_registry/test_schema_id.py b/tests/schema_registry/test_schema_id.py index 546098c38..4ca578776 100644 --- a/tests/schema_registry/test_schema_id.py +++ b/tests/schema_registry/test_schema_id.py @@ -18,6 +18,7 @@ import io from confluent_kafka.schema_registry.serde import SchemaId + def test_schema_guid(): schema_id = SchemaId("AVRO") input = bytes([ @@ -55,7 +56,3 @@ def test_schema_message_indexes(): assert indexes == [1, 2, 3] output = schema_id.id_to_bytes() assert output == input - - - - From 910d00fe7f2b2c00ed64d1939d8514d5dc850b3c Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 2 May 2025 19:18:37 -0700 Subject: [PATCH 03/12] Use id serdes --- .../schema_registry/__init__.py | 101 ++++++- src/confluent_kafka/schema_registry/avro.py | 149 +++++----- .../schema_registry/json_schema.py | 177 ++++++------ .../mock_schema_registry_client.py | 19 ++ .../schema_registry/protobuf.py | 266 +++++++----------- .../schema_registry/schema_registry_client.py | 100 ++++++- src/confluent_kafka/schema_registry/serde.py | 23 +- tests/schema_registry/test_json.py | 23 +- tests/schema_registry/test_proto.py | 13 +- 9 files changed, 520 insertions(+), 351 deletions(-) diff --git a/src/confluent_kafka/schema_registry/__init__.py b/src/confluent_kafka/schema_registry/__init__.py index 9fb450078..268ac1673 100644 --- a/src/confluent_kafka/schema_registry/__init__.py +++ b/src/confluent_kafka/schema_registry/__init__.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import io from typing import Optional from .schema_registry_client import ( @@ -34,6 +35,10 @@ SchemaReference, ServerConfig ) +from ..serialization import SerializationError, MessageField + +_KEY_SCHEMA_ID = "__key_schema_id" +_VALUE_SCHEMA_ID = "__value_schema_id" _MAGIC_BYTE = 0 _MAGIC_BYTE_V0 = _MAGIC_BYTE @@ -57,7 +62,11 @@ "ServerConfig", "topic_subject_name_strategy", "topic_record_subject_name_strategy", - "record_subject_name_strategy" + "record_subject_name_strategy", + "header_schema_id_serializer", + "prefix_schema_id_serializer", + "dual_schema_id_deserializer", + "prefix_schema_id_deserializer" ] @@ -115,3 +124,93 @@ def reference_subject_name_strategy(ctx, schema_ref: SchemaReference) -> Optiona """ return schema_ref.name if schema_ref is not None else None + + +def header_schema_id_serializer(payload: bytes, ctx, schema_id) -> bytes: + """ + Serializes the schema guid into the header. + + Args: + payload (bytes): The payload to serialize. + ctx (SerializationContext): Metadata pertaining to the serialization + operation. + schema_id (SchemaId): The schema ID to serialize. 
+
+    Returns:
+        bytes: The payload
+    """
+    headers = ctx.headers
+    if headers is None:
+        raise SerializationError("Missing headers")
+    header_key = _KEY_SCHEMA_ID if ctx.field == MessageField.KEY else _VALUE_SCHEMA_ID
+    header_value = schema_id.guid_to_bytes()
+    if isinstance(headers, list):
+        headers.append((header_key, header_value))
+    elif isinstance(headers, dict):
+        headers[header_key] = header_value
+    else:
+        raise SerializationError("Invalid headers type")
+    return payload
+
+
+def prefix_schema_id_serializer(payload: bytes, ctx, schema_id) -> bytes:
+    """
+    Serializes the schema id into the payload prefix.
+
+    Args:
+        payload (bytes): The payload to serialize.
+
+        ctx (SerializationContext): Metadata pertaining to the serialization
+            operation.
+
+        schema_id (SchemaId): The schema ID to serialize.
+
+    Returns:
+        bytes: The payload prefixed with the schema id
+    """
+    return schema_id.id_to_bytes() + payload
+
+
+def dual_schema_id_deserializer(payload: bytes, ctx, schema_id) -> io.BytesIO:
+    """
+    Deserializes the schema id by first checking the header, then the payload prefix.
+
+    Args:
+        payload (bytes): The payload to deserialize.
+
+        ctx (SerializationContext): Metadata pertaining to the deserialization
+            operation.
+
+        schema_id (SchemaId): The schema ID to populate.
+
+    Returns:
+        io.BytesIO: A buffer over the payload, positioned past any schema id prefix
+    """
+    headers = ctx.headers
+    header_key = _KEY_SCHEMA_ID if ctx.field == MessageField.KEY else _VALUE_SCHEMA_ID
+    if headers is not None:
+        header_value = None
+        if isinstance(headers, list):
+            # look for header_key in headers
+            for header in headers:
+                if header[0] == header_key:
+                    header_value = header[1]
+                    break
+        elif isinstance(headers, dict):
+            header_value = headers.get(header_key, None)
+        if header_value is not None:
+            # header values are raw bytes; wrap them so from_bytes can read
+            schema_id.from_bytes(io.BytesIO(header_value))
+            return io.BytesIO(payload)
+    return schema_id.from_bytes(io.BytesIO(payload))
+
+
+def prefix_schema_id_deserializer(payload: bytes, ctx, schema_id) -> io.BytesIO:
+    """
+    Deserializes the schema id from the payload prefix.
+
+    Args:
+        payload (bytes): The payload to deserialize.
+
+        ctx (SerializationContext): Metadata pertaining to the deserialization
+            operation.
+
+        schema_id (SchemaId): The schema ID to populate.
+
+    Returns:
+        io.BytesIO: A buffer over the payload, positioned past the schema id prefix
+    """
+    return schema_id.from_bytes(io.BytesIO(payload))
diff --git a/src/confluent_kafka/schema_registry/avro.py b/src/confluent_kafka/schema_registry/avro.py
index 4a12804f4..fcd3b79f7 100644
--- a/src/confluent_kafka/schema_registry/avro.py
+++ b/src/confluent_kafka/schema_registry/avro.py
@@ -21,7 +21,6 @@
 from copy import deepcopy
 from io import BytesIO
 from json import loads
-from struct import pack, unpack
 from typing import Dict, Union, Optional, Set, Callable
 
 from fastavro import (schemaless_reader,
@@ -30,16 +29,18 @@
                       validate)
 from fastavro.schema import load_schema
 
-from . 
import (Schema, topic_subject_name_strategy, RuleMode, - RuleKind, SchemaRegistryClient) -from confluent_kafka.serialization import (SerializationError, - SerializationContext) + RuleKind, SchemaRegistryClient, prefix_schema_id_serializer, + dual_schema_id_deserializer) +from confluent_kafka.serialization import (SerializationContext) from .rule_registry import RuleRegistry from .serde import BaseSerializer, BaseDeserializer, RuleContext, FieldType, \ - FieldTransform, RuleConditionError, ParsedSchemaCache + FieldTransform, RuleConditionError, ParsedSchemaCache, SchemaId + + +AVRO_TYPE = "AVRO" AvroMessage = Union[ @@ -164,6 +165,12 @@ class AvroSerializer(BaseSerializer): | | | | | | | Defaults to topic_subject_name_strategy. | +-----------------------------+----------+--------------------------------------------------+ + | | | Callable(bytes, SerializationContext, schema_id) | + | | | -> bytes | + | | | | + | ``schema.id.serializer`` | callable | Defines how the schema id/guid is serialized. | + | | | Defaults to prefix_schema_id_serializer. | + +-----------------------------+----------+--------------------------------------------------+ Schemas are registered against subject names in Confluent Schema Registry that define a scope in which the schemas can be evolved. By default, the subject name @@ -223,7 +230,8 @@ class AvroSerializer(BaseSerializer): 'use.schema.id': None, 'use.latest.version': False, 'use.latest.with.metadata': None, - 'subject.name.strategy': topic_subject_name_strategy} + 'subject.name.strategy': topic_subject_name_strategy, + 'schema.id.serializer': prefix_schema_id_serializer} def __init__( self, @@ -286,6 +294,10 @@ def __init__( if not callable(self._subject_name_func): raise ValueError("subject.name.strategy must be callable") + self._schema_id_serializer = conf_copy.pop('schema.id.serializer') + if not callable(self._schema_id_serializer): + raise ValueError("schema.id.serializer must be callable") + if len(conf_copy) > 0: raise ValueError("Unrecognized properties: {}" .format(", ".join(conf_copy.keys()))) @@ -345,19 +357,20 @@ def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> O subject = self._subject_name_func(ctx, self._schema_name) latest_schema = self._get_reader_schema(subject) if latest_schema is not None: - self._schema_id = latest_schema.schema_id + self._schema_id = SchemaId(AVRO_TYPE, latest_schema.schema_id, latest_schema.guid) elif subject not in self._known_subjects: # Check to ensure this schema has been registered under subject_name. if self._auto_register: # The schema name will always be the same. We can't however register # a schema without a subject so we set the schema_id here to handle # the initial registration. 
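A hypothetical producer-side wiring of the new ``schema.id.serializer`` hook (sketch only; `client` and `schema_str` are placeholders):

    from confluent_kafka.schema_registry import header_schema_id_serializer
    from confluent_kafka.schema_registry.avro import AvroSerializer
    from confluent_kafka.serialization import MessageField, SerializationContext

    avro_serializer = AvroSerializer(
        client, schema_str,
        conf={'schema.id.serializer': header_schema_id_serializer})

    headers = []  # list-style headers are appended to in place
    ctx = SerializationContext('orders', MessageField.VALUE, headers)
    payload = avro_serializer({'name': 'alice'}, ctx)
    # payload is the bare Avro body; headers now carries
    # ('__value_schema_id', b'\x01' + <16-byte GUID>) instead of the
    # classic 5-byte payload prefix.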
- self._schema_id = self._registry.register_schema( + registered_schema = self._registry.register_schema_full_response( subject, self._schema, self._normalize_schemas) + self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid) else: registered_schema = self._registry.lookup_schema( subject, self._schema, self._normalize_schemas) - self._schema_id = registered_schema.schema_id + self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid) self._known_subjects.add(subject) @@ -377,12 +390,9 @@ def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> O parsed_schema = self._parsed_schema with _ContextStringIO() as fo: - # Write the magic byte and schema ID in network byte order (big endian) - fo.write(pack('>bI', _MAGIC_BYTE_V0, self._schema_id)) # write the record to the rest of the buffer schemaless_writer(fo, parsed_schema, value) - - return fo.getvalue() + return self._schema_id_serializer(fo.getvalue(), ctx, self._schema_id) def _get_parsed_schema(self, schema: Schema) -> AvroSchema: parsed_schema = self._parsed_schemas.get_parsed_schema(schema) @@ -425,6 +435,12 @@ class AvroDeserializer(BaseDeserializer): | | | | | | | Defaults to topic_subject_name_strategy. | +-----------------------------+----------+--------------------------------------------------+ + | | | Callable(bytes, SerializationContext, schema_id) | + | | | -> io.BytesIO | + | | | | + | ``schema.id.deserializer`` | callable | Defines how the schema id/guid is deserialized. | + | | | Defaults to dual_schema_id_deserializer. | + +-----------------------------+----------+--------------------------------------------------+ Note: By default, Avro complex types are returned as dicts. This behavior can @@ -462,7 +478,8 @@ class AvroDeserializer(BaseDeserializer): _default_conf = {'use.latest.version': False, 'use.latest.with.metadata': None, - 'subject.name.strategy': topic_subject_name_strategy} + 'subject.name.strategy': topic_subject_name_strategy, + 'schema.id.deserializer': dual_schema_id_deserializer} def __init__( self, @@ -507,6 +524,10 @@ def __init__( if not callable(self._subject_name_func): raise ValueError("subject.name.strategy must be callable") + self._schema_id_deserializer = conf_copy.pop('schema.id.deserializer') + if not callable(self._schema_id_deserializer): + raise ValueError("schema.id.deserializer must be callable") + if len(conf_copy) > 0: raise ValueError("Unrecognized properties: {}" .format(", ".join(conf_copy.keys()))) @@ -551,67 +572,57 @@ def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> U if data is None: return None - if len(data) <= 5: - raise SerializationError("Expecting data framing of length 6 bytes or " - "more but total data size is {} bytes. This " - "message was not produced with a Confluent " - "Schema Registry serializer".format(len(data))) - subject = self._subject_name_func(ctx, None) if ctx else None latest_schema = None if subject is not None: latest_schema = self._get_reader_schema(subject) - with _ContextStringIO(data) as payload: - magic, schema_id = unpack('>bI', payload.read(5)) - if magic != _MAGIC_BYTE_V0: - raise SerializationError("Unexpected magic byte {}. 
This message " - "was not produced with a Confluent " - "Schema Registry serializer".format(magic)) - - writer_schema_raw = self._registry.get_schema(schema_id) - writer_schema = self._get_parsed_schema(writer_schema_raw) - - if subject is None: - subject = self._subject_name_func(ctx, writer_schema.get("name")) if ctx else None - if subject is not None: - latest_schema = self._get_reader_schema(subject) - - if latest_schema is not None: - migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None) - reader_schema_raw = latest_schema.schema - reader_schema = self._get_parsed_schema(latest_schema.schema) - elif self._schema is not None: - migrations = None - reader_schema_raw = self._schema - reader_schema = self._reader_schema - else: - migrations = None - reader_schema_raw = writer_schema_raw - reader_schema = writer_schema - - if migrations: - obj_dict = schemaless_reader(payload, - writer_schema, - None, - self._return_record_name) - obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict) - else: - obj_dict = schemaless_reader(payload, - writer_schema, - reader_schema, - self._return_record_name) + schema_id = SchemaId(AVRO_TYPE) + payload = self._schema_id_deserializer(data, ctx, schema_id) + + writer_schema_raw = self._get_schema(schema_id, subject) + writer_schema = self._get_parsed_schema(writer_schema_raw) + + if subject is None: + subject = self._subject_name_func(ctx, writer_schema.get("name")) if ctx else None + if subject is not None: + latest_schema = self._get_reader_schema(subject) + + if latest_schema is not None: + migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None) + reader_schema_raw = latest_schema.schema + reader_schema = self._get_parsed_schema(latest_schema.schema) + elif self._schema is not None: + migrations = None + reader_schema_raw = self._schema + reader_schema = self._reader_schema + else: + migrations = None + reader_schema_raw = writer_schema_raw + reader_schema = writer_schema + + if migrations: + obj_dict = schemaless_reader(payload, + writer_schema, + None, + self._return_record_name) + obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict) + else: + obj_dict = schemaless_reader(payload, + writer_schema, + reader_schema, + self._return_record_name) - field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731 - transform(rule_ctx, reader_schema, message, field_transform)) - obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None, - reader_schema_raw, obj_dict, get_inline_tags(reader_schema), - field_transformer) + field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731 + transform(rule_ctx, reader_schema, message, field_transform)) + obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None, + reader_schema_raw, obj_dict, get_inline_tags(reader_schema), + field_transformer) - if self._from_dict is not None: - return self._from_dict(obj_dict, ctx) + if self._from_dict is not None: + return self._from_dict(obj_dict, ctx) - return obj_dict + return obj_dict def _get_parsed_schema(self, schema: Schema) -> AvroSchema: parsed_schema = self._parsed_schemas.get_parsed_schema(schema) diff --git a/src/confluent_kafka/schema_registry/json_schema.py b/src/confluent_kafka/schema_registry/json_schema.py index 87501c347..1e7067240 100644 --- a/src/confluent_kafka/schema_registry/json_schema.py +++ b/src/confluent_kafka/schema_registry/json_schema.py @@ -35,15 +35,20 @@ Schema, topic_subject_name_strategy, RuleKind, - RuleMode, 
SchemaRegistryClient) + RuleMode, SchemaRegistryClient, + prefix_schema_id_serializer, + dual_schema_id_deserializer) from confluent_kafka.schema_registry.rule_registry import RuleRegistry from confluent_kafka.schema_registry.serde import BaseSerializer, \ BaseDeserializer, RuleContext, FieldTransform, FieldType, \ - RuleConditionError, ParsedSchemaCache + RuleConditionError, ParsedSchemaCache, SchemaId from confluent_kafka.serialization import (SerializationError, SerializationContext) +JSON_TYPE = "JSON" + + JsonMessage = Union[ None, # 'null' Avro type str, # 'string' and 'enum' @@ -128,10 +133,10 @@ class JSONSerializer(BaseSerializer): | ``normalize.schemas`` | bool | transform schemas to have a consistent format, | | | | including ordering properties and references. | +-----------------------------+----------+----------------------------------------------------+ - | | | Whether to use the given schema ID for | - | ``use.schema.id`` | int | serialization. | - | | | | - +-----------------------------+----------+--------------------------------------------------+ + | | | Whether to use the given schema ID for | + | ``use.schema.id`` | int | serialization. | + | | | | + +-----------------------------+----------+----------------------------------------------------+ | | | Whether to use the latest subject version for | | ``use.latest.version`` | bool | serialization. | | | | | @@ -159,6 +164,12 @@ class JSONSerializer(BaseSerializer): | | | | | | | Defaults to topic_subject_name_strategy. | +-----------------------------+----------+----------------------------------------------------+ + | | | Callable(bytes, SerializationContext, schema_id) | + | | | -> bytes | + | | | | + | ``schema.id.serializer`` | callable | Defines how the schema id/guid is serialized. | + | | | Defaults to prefix_schema_id_serializer. | + +-----------------------------+----------+----------------------------------------------------+ | | | Whether to validate the payload against the | | ``validate`` | bool | the given schema. | | | | | @@ -224,6 +235,7 @@ class JSONSerializer(BaseSerializer): 'use.latest.version': False, 'use.latest.with.metadata': None, 'subject.name.strategy': topic_subject_name_strategy, + 'schema.id.serializer': prefix_schema_id_serializer, 'validate': True} def __init__( @@ -292,6 +304,10 @@ def __init__( if not callable(self._subject_name_func): raise ValueError("subject.name.strategy must be callable") + self._schema_id_serializer = conf_copy.pop('schema.id.serializer') + if not callable(self._schema_id_serializer): + raise ValueError("schema.id.serializer must be callable") + self._validate = conf_copy.pop('validate') if not isinstance(self._normalize_schemas, bool): raise ValueError("validate must be a boolean value") @@ -339,21 +355,20 @@ def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> O subject = self._subject_name_func(ctx, self._schema_name) latest_schema = self._get_reader_schema(subject) if latest_schema is not None: - self._schema_id = latest_schema.schema_id + self._schema_id = SchemaId(JSON_TYPE, latest_schema.schema_id, latest_schema.guid) elif subject not in self._known_subjects: # Check to ensure this schema has been registered under subject_name. if self._auto_register: # The schema name will always be the same. We can't however register # a schema without a subject so we set the schema_id here to handle # the initial registration. 
-            self._schema_id = self._registry.register_schema(subject,
-                                                             self._schema,
-                                                             self._normalize_schemas)
+            registered_schema = self._registry.register_schema_full_response(
+                subject, self._schema, self._normalize_schemas)
+            self._schema_id = SchemaId(JSON_TYPE, registered_schema.schema_id, registered_schema.guid)
         else:
-            registered_schema = self._registry.lookup_schema(subject,
-                                                             self._schema,
-                                                             self._normalize_schemas)
-            self._schema_id = registered_schema.schema_id
+            registered_schema = self._registry.lookup_schema(
+                subject, self._schema, self._normalize_schemas)
+            self._schema_id = SchemaId(JSON_TYPE, registered_schema.schema_id, registered_schema.guid)
 
         self._known_subjects.add(subject)
 
@@ -385,16 +400,13 @@ def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> O
             raise SerializationError(ve.message)
 
         with _ContextStringIO() as fo:
-            # Write the magic byte and schema ID in network byte order (big endian)
-            fo.write(struct.pack(">bI", _MAGIC_BYTE_V0, self._schema_id))
             # JSON dump always writes a str never bytes
             # https://docs.python.org/3/library/json.html
             encoded_value = self._json_encode(value)
             if isinstance(encoded_value, str):
                 encoded_value = encoded_value.encode("utf8")
             fo.write(encoded_value)
-
-            return fo.getvalue()
+            return self._schema_id_serializer(fo.getvalue(), ctx, self._schema_id)
 
     def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema], Optional[Registry]]:
         if schema is None:
@@ -453,6 +465,12 @@ class JSONDeserializer(BaseDeserializer):
     |                             |          |                                                    |
     |                             |          | Defaults to topic_subject_name_strategy.           |
     +-----------------------------+----------+----------------------------------------------------+
+    |                             |          | Callable(bytes, SerializationContext, schema_id)   |
+    |                             |          |   -> io.BytesIO                                    |
+    |                             |          |                                                    |
+    | ``schema.id.deserializer``  | callable | Defines how the schema id/guid is deserialized.    |
+    |                             |          | Defaults to dual_schema_id_deserializer.           |
+    +-----------------------------+----------+----------------------------------------------------+
     |                             |          | Whether to validate the payload against the        |
     | ``validate``                | bool     | the given schema.                                  |
     |                             |          |                                                    |
@@ -479,6 +497,7 @@ class JSONDeserializer(BaseDeserializer):
     _default_conf = {'use.latest.version': False,
                      'use.latest.with.metadata': None,
                      'subject.name.strategy': topic_subject_name_strategy,
+                     'schema.id.deserializer': dual_schema_id_deserializer,
                      'validate': True}
 
     def __init__(
@@ -533,6 +552,10 @@ def __init__(
         if not callable(self._subject_name_func):
             raise ValueError("subject.name.strategy must be callable")
 
+        self._schema_id_deserializer = conf_copy.pop('schema.id.deserializer')
+        if not callable(self._schema_id_deserializer):
+            raise ValueError("schema.id.deserializer must be callable")
+
         self._validate = conf_copy.pop('validate')
         if not isinstance(self._validate, bool):
             raise ValueError("validate must be a boolean value")
@@ -577,75 +600,65 @@ def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> U
         if data is None:
             return None
 
-        if len(data) <= 5:
-            raise SerializationError("Expecting data framing of length 6 bytes or "
-                                     "more but total data size is {} bytes. 
This " - "message was not produced with a Confluent " - "Schema Registry serializer".format(len(data))) - subject = self._subject_name_func(ctx, None) latest_schema = None if subject is not None and self._registry is not None: latest_schema = self._get_reader_schema(subject) - with _ContextStringIO(data) as payload: - magic, schema_id = struct.unpack('>bI', payload.read(5)) - if magic != _MAGIC_BYTE_V0: - raise SerializationError("Unexpected magic byte {}. This message " - "was not produced with a Confluent " - "Schema Registry serializer".format(magic)) - - # JSON documents are self-describing; no need to query schema - obj_dict = self._json_decode(payload.read()) - - if self._registry is not None: - writer_schema_raw = self._registry.get_schema(schema_id) - writer_schema, writer_ref_registry = self._get_parsed_schema(writer_schema_raw) - if subject is None: - subject = self._subject_name_func(ctx, writer_schema.get("title")) - if subject is not None: - latest_schema = self._get_reader_schema(subject) - else: - writer_schema_raw = None - writer_schema, writer_ref_registry = None, None - - if latest_schema is not None: - migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None) - reader_schema_raw = latest_schema.schema - reader_schema, reader_ref_registry = self._get_parsed_schema(latest_schema.schema) - elif self._schema is not None: - migrations = None - reader_schema_raw = self._schema - reader_schema, reader_ref_registry = self._reader_schema, self._ref_registry - else: - migrations = None - reader_schema_raw = writer_schema_raw - reader_schema, reader_ref_registry = writer_schema, writer_ref_registry - - if migrations: - obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict) - - reader_root_resource = Resource.from_contents( - reader_schema, default_specification=DEFAULT_SPEC) - reader_ref_resolver = reader_ref_registry.resolver_with_root(reader_root_resource) - field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731 - transform(rule_ctx, reader_schema, reader_ref_registry, reader_ref_resolver, - "$", message, field_transform)) - obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None, - reader_schema_raw, obj_dict, None, - field_transformer) - - if self._validate: - try: - validator = self._get_validator(reader_schema_raw, reader_schema, reader_ref_registry) - validator.validate(obj_dict) - except ValidationError as ve: - raise SerializationError(ve.message) - - if self._from_dict is not None: - return self._from_dict(obj_dict, ctx) - - return obj_dict + schema_id = SchemaId(JSON_TYPE) + payload = self._schema_id_deserializer(data, ctx, schema_id) + + # JSON documents are self-describing; no need to query schema + obj_dict = self._json_decode(payload.read()) + + if self._registry is not None: + writer_schema_raw = self._get_schema(schema_id, subject) + writer_schema, writer_ref_registry = self._get_parsed_schema(writer_schema_raw) + if subject is None: + subject = self._subject_name_func(ctx, writer_schema.get("title")) + if subject is not None: + latest_schema = self._get_reader_schema(subject) + else: + writer_schema_raw = None + writer_schema, writer_ref_registry = None, None + + if latest_schema is not None: + migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None) + reader_schema_raw = latest_schema.schema + reader_schema, reader_ref_registry = self._get_parsed_schema(latest_schema.schema) + elif self._schema is not None: + migrations = None + reader_schema_raw = self._schema + 
reader_schema, reader_ref_registry = self._reader_schema, self._ref_registry + else: + migrations = None + reader_schema_raw = writer_schema_raw + reader_schema, reader_ref_registry = writer_schema, writer_ref_registry + + if migrations: + obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict) + + reader_root_resource = Resource.from_contents( + reader_schema, default_specification=DEFAULT_SPEC) + reader_ref_resolver = reader_ref_registry.resolver_with_root(reader_root_resource) + field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731 + transform(rule_ctx, reader_schema, reader_ref_registry, reader_ref_resolver, + "$", message, field_transform)) + obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None, + reader_schema_raw, obj_dict, None, + field_transformer) + + if self._validate: + try: + validator = self._get_validator(reader_schema_raw, reader_schema, reader_ref_registry) + validator.validate(obj_dict) + except ValidationError as ve: + raise SerializationError(ve.message) + + if self._from_dict is not None: + return self._from_dict(obj_dict, ctx) + + return obj_dict def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema], Optional[Registry]]: if schema is None: diff --git a/src/confluent_kafka/schema_registry/mock_schema_registry_client.py b/src/confluent_kafka/schema_registry/mock_schema_registry_client.py index f1b100f44..d5ca01da5 100644 --- a/src/confluent_kafka/schema_registry/mock_schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/mock_schema_registry_client.py @@ -30,6 +30,7 @@ def __init__(self): self.lock = Lock() self.max_id = 0 self.schema_id_index = {} + self.schema_guid_index = {} self.schema_index = {} self.subject_schemas = defaultdict(set) @@ -38,11 +39,13 @@ def set(self, registered_schema: RegisteredSchema) -> RegisteredSchema: self.max_id += 1 rs = RegisteredSchema( schema_id=self.max_id, + guid=registered_schema.guid, schema=registered_schema.schema, subject=registered_schema.subject, version=registered_schema.version ) self.schema_id_index[rs.schema_id] = rs + self.schema_guid_index[rs.guid] = rs self.schema_index[rs.schema] = rs.schema_id self.subject_schemas[rs.subject].add(rs) return rs @@ -52,6 +55,11 @@ def get_schema(self, schema_id: int) -> Optional[Schema]: rs = self.schema_id_index.get(schema_id, None) return rs.schema if rs else None + def get_schema_by_guid(self, guid: str) -> Optional[Schema]: + with self.lock: + rs = self.schema_guid_index.get(guid, None) + return rs.schema if rs else None + def get_registered_schema_by_schema( self, subject_name: str, @@ -131,6 +139,7 @@ def remove_by_subject(self, subject_name: str) -> List[int]: def clear(self): with self.lock: self.schema_id_index.clear() + self.schema_guid_index.clear() self.schema_index.clear() self.subject_schemas.clear() @@ -161,6 +170,7 @@ def register_schema_full_response( registered_schema = RegisteredSchema( schema_id=0, + guid=None, schema=schema, subject=subject_name, version=latest_version @@ -180,6 +190,15 @@ def get_schema( raise SchemaRegistryError(404, 40400, "Schema Not Found") + def get_schema_by_guid( + self, guid: str, fmt: Optional[str] = None + ) -> 'Schema': + schema = self._store.get_schema_by_guid(guid) + if schema is not None: + return schema + + raise SchemaRegistryError(404, 40400, "Schema Not Found") + def lookup_schema( self, subject_name: str, schema: 'Schema', normalize_schemas: bool = False, deleted: bool = False diff --git a/src/confluent_kafka/schema_registry/protobuf.py 
b/src/confluent_kafka/schema_registry/protobuf.py index fb1839041..c7078d9cc 100644 --- a/src/confluent_kafka/schema_registry/protobuf.py +++ b/src/confluent_kafka/schema_registry/protobuf.py @@ -18,7 +18,6 @@ import io import sys import base64 -import struct import warnings from collections import deque from decimal import Context, Decimal, MAX_PREC @@ -40,9 +39,9 @@ from google.protobuf.message import DecodeError, Message from google.protobuf.message_factory import GetMessageClass -from . import (_MAGIC_BYTE_V0, - reference_subject_name_strategy, - topic_subject_name_strategy, SchemaRegistryClient) +from . import (reference_subject_name_strategy, + topic_subject_name_strategy, SchemaRegistryClient, + prefix_schema_id_serializer, dual_schema_id_deserializer) from .confluent.types import decimal_pb2 from .rule_registry import RuleRegistry from .schema_registry_client import (Schema, @@ -52,7 +51,7 @@ from confluent_kafka.serialization import SerializationError, \ SerializationContext from .serde import BaseSerializer, BaseDeserializer, RuleContext, \ - FieldTransform, FieldType, RuleConditionError, ParsedSchemaCache + FieldTransform, FieldType, RuleConditionError, ParsedSchemaCache, SchemaId # Convert an int to bytes (inverse of ord()) # Python3.chr() -> Unicode @@ -77,6 +76,9 @@ def _bytes(v: int) -> str: return chr(v) +PROTOBUF_TYPE = "PROTOBUF" + + class _ContextStringIO(io.BytesIO): """ Wrapper to allow use of StringIO via 'with' constructs. @@ -311,6 +313,12 @@ class ProtobufSerializer(BaseSerializer): | | | | | | | Defaults to reference_subject_name_strategy | +-------------------------------------+----------+------------------------------------------------------+ + | | | Callable(bytes, SerializationContext, schema_id) | + | | | -> bytes | + | | | | + | ``schema.id.serializer`` | callable | Defines how the schema id/guid is serialized. | + | | | Defaults to prefix_schema_id_serializer. | + +-------------------------------------+----------+------------------------------------------------------+ | ``use.deprecated.format`` | bool | Specifies whether the Protobuf serializer should | | | | serialize message indexes without zig-zag encoding. 
| | | | This option must be explicitly configured as older | @@ -372,6 +380,7 @@ class ProtobufSerializer(BaseSerializer): 'skip.known.types': True, 'subject.name.strategy': topic_subject_name_strategy, 'reference.subject.name.strategy': reference_subject_name_strategy, + 'schema.id.serializer': prefix_schema_id_serializer, 'use.deprecated.format': False, } @@ -446,6 +455,10 @@ def __init__( if not callable(self._ref_reference_subject_func): raise ValueError("subject.name.strategy must be callable") + self._schema_id_serializer = conf_copy.pop('schema.id.serializer') + if not callable(self._schema_id_serializer): + raise ValueError("schema.id.serializer must be callable") + if len(conf_copy) > 0: raise ValueError("Unrecognized properties: {}" .format(", ".join(conf_copy.keys()))) @@ -571,7 +584,7 @@ def __call__(self, message: Message, ctx: Optional[SerializationContext] = None) latest_schema = self._get_reader_schema(subject, fmt='serialized') if latest_schema is not None: - self._schema_id = latest_schema.schema_id + self._schema_id = SchemaId(PROTOBUF_TYPE, latest_schema.schema_id, latest_schema.guid) elif subject not in self._known_subjects and ctx is not None: references = self._resolve_dependencies(ctx, message.DESCRIPTOR.file) self._schema = Schema( @@ -581,12 +594,13 @@ def __call__(self, message: Message, ctx: Optional[SerializationContext] = None) ) if self._auto_register: - self._schema_id = self._registry.register_schema(subject, - self._schema, - self._normalize_schemas) + registered_schema = self._registry.register_schema_full_response( + subject, self._schema, self._normalize_schemas) + self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id, registered_schema.guid) else: - self._schema_id = self._registry.lookup_schema( - subject, self._schema, self._normalize_schemas).schema_id + registered_schema = self._registry.lookup_schema( + subject, self._schema, self._normalize_schemas) + self._schema_id = SchemaId(PROTOBUF_TYPE, registered_schema.schema_id, registered_schema.guid) self._known_subjects.add(subject) @@ -601,16 +615,10 @@ def __call__(self, message: Message, ctx: Optional[SerializationContext] = None) field_transformer) with _ContextStringIO() as fo: - # Write the magic byte and schema ID in network byte order - # (big endian) - fo.write(struct.pack('>bI', _MAGIC_BYTE_V0, self._schema_id)) - # write the index array that specifies the message descriptor - # of the serialized data. - self._encode_varints(fo, self._index_array, - zigzag=not self._use_deprecated_format) # write the serialized data itself fo.write(message.SerializeToString()) - return fo.getvalue() + self._schema_id.message_indexes = self._index_array + return self._schema_id_serializer(fo.getvalue(), ctx, self._schema_id) def _get_parsed_schema(self, schema: Schema) -> Tuple[descriptor_pb2.FileDescriptorProto, DescriptorPool]: result = self._parsed_schemas.get_parsed_schema(schema) @@ -658,6 +666,12 @@ class ProtobufDeserializer(BaseDeserializer): | | | | | | | Defaults to topic_subject_name_strategy. | +-------------------------------------+----------+------------------------------------------------------+ + | | | Callable(bytes, SerializationContext, schema_id) | + | | | -> io.BytesIO | + | | | | + | ``schema.id.deserializer`` | callable | Defines how the schema id/guid is deserialized. | + | | | Defaults to dual_schema_id_deserializer. 
| + +-------------------------------------+----------+------------------------------------------------------+ | ``use.deprecated.format`` | bool | Specifies whether the Protobuf deserializer should | | | | deserialize message indexes without zig-zag encoding.| | | | This option must be explicitly configured as older | @@ -681,6 +695,7 @@ class ProtobufDeserializer(BaseDeserializer): 'use.latest.version': False, 'use.latest.with.metadata': None, 'subject.name.strategy': topic_subject_name_strategy, + 'schema.id.deserializer': dual_schema_id_deserializer, 'use.deprecated.format': False, } @@ -726,6 +741,10 @@ def __init__( if not callable(self._subject_name_func): raise ValueError("subject.name.strategy must be callable") + self._schema_id_deserializer = conf_copy.pop('schema.id.deserializer') + if not callable(self._schema_id_deserializer): + raise ValueError("schema.id.deserializer must be callable") + self._use_deprecated_format = conf_copy.pop('use.deprecated.format') if not isinstance(self._use_deprecated_format, bool): raise ValueError("use.deprecated.format must be a boolean value") @@ -746,85 +765,6 @@ def __init__( rule.configure(self._registry.config() if self._registry else {}, rule_conf if rule_conf else {}) - @staticmethod - def _decode_varint(buf: io.BytesIO, zigzag: bool = True) -> int: - """ - Decodes a single varint from a buffer. - - Args: - buf (BytesIO): buffer to read from - zigzag (bool): decode as zigzag or uvarint - - Returns: - int: decoded varint - - Raises: - EOFError: if buffer is empty - """ - - value = 0 - shift = 0 - try: - while True: - i = ProtobufDeserializer._read_byte(buf) - - value |= (i & 0x7f) << shift - shift += 7 - if not (i & 0x80): - break - - if zigzag: - value = (value >> 1) ^ -(value & 1) - - return value - - except EOFError: - raise EOFError("Unexpected EOF while reading index") - - @staticmethod - def _read_byte(buf: io.BytesIO) -> int: - """ - Read one byte from buf as an int. - - Args: - buf (BytesIO): The buffer to read from. - - .. _ord: - https://docs.python.org/2/library/functions.html#ord - """ - - i = buf.read(1) - if i == b'': - raise EOFError("Unexpected EOF encountered") - return ord(i) - - @staticmethod - def _read_index_array(buf: io.BytesIO, zigzag: bool = True) -> List[int]: - """ - Read an index array from buf that specifies the message - descriptor of interest in the file descriptor. - - Args: - buf (BytesIO): The buffer to read from. - - Returns: - list of int: The index array. - """ - - size = ProtobufDeserializer._decode_varint(buf, zigzag=zigzag) - if size < 0 or size > 100000: - raise DecodeError("Invalid Protobuf msgidx array length") - - if size == 0: - return [0] - - msg_index = [] - for _ in range(size): - msg_index.append(ProtobufDeserializer._decode_varint(buf, - zigzag=zigzag)) - - return msg_index - def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[Message]: """ Deserialize a serialized protobuf message with Confluent Schema Registry @@ -848,82 +788,70 @@ def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> O if data is None: return None - # SR wire protocol + msg_index length - if len(data) < 6: - raise SerializationError("Expecting data framing of length 6 bytes or " - "more but total data size is {} bytes. 
This " - "message was not produced with a Confluent " - "Schema Registry serializer".format(len(data))) - subject = self._subject_name_func(ctx, None) latest_schema = None if subject is not None and self._registry is not None: latest_schema = self._get_reader_schema(subject, fmt='serialized') - with _ContextStringIO(data) as payload: - magic, schema_id = struct.unpack('>bI', payload.read(5)) - if magic != _MAGIC_BYTE_V0: - raise SerializationError("Unknown magic byte. This message was " - "not produced with a Confluent " - "Schema Registry serializer") - - msg_index = self._read_index_array(payload, zigzag=not self._use_deprecated_format) - - if self._registry is not None: - writer_schema_raw = self._registry.get_schema(schema_id, fmt='serialized') - fd_proto, pool = self._get_parsed_schema(writer_schema_raw) - writer_schema = pool.FindFileByName(fd_proto.name) - writer_desc = self._get_message_desc(pool, writer_schema, msg_index) - if subject is None: - subject = self._subject_name_func(ctx, writer_desc.full_name) - if subject is not None: - latest_schema = self._get_reader_schema(subject, fmt='serialized') - else: - writer_schema_raw = None - writer_schema = None - - if latest_schema is not None: - migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None) - reader_schema_raw = latest_schema.schema - fd_proto, pool = self._get_parsed_schema(latest_schema.schema) - reader_schema = pool.FindFileByName(fd_proto.name) - else: - migrations = None - reader_schema_raw = writer_schema_raw - reader_schema = writer_schema - - if reader_schema is not None: - # Initialize reader desc to first message in file - reader_desc = self._get_message_desc(pool, reader_schema, [0]) - # Attempt to find a reader desc with the same name as the writer - reader_desc = reader_schema.message_types_by_name.get(writer_desc.name, reader_desc) - - if migrations: - msg = GetMessageClass(writer_desc)() - try: - msg.ParseFromString(payload.read()) - except DecodeError as e: - raise SerializationError(str(e)) - - obj_dict = json_format.MessageToDict(msg, True) - obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict) - msg = GetMessageClass(reader_desc)() - msg = json_format.ParseDict(obj_dict, msg) - else: - # Protobuf Messages are self-describing; no need to query schema - msg = self._msg_class() - try: - msg.ParseFromString(payload.read()) - except DecodeError as e: - raise SerializationError(str(e)) + schema_id = SchemaId(PROTOBUF_TYPE) + payload = self._schema_id_deserializer(data, ctx, schema_id) + msg_index = schema_id.message_indexes + + if self._registry is not None: + writer_schema_raw = self._get_schema(schema_id, subject, fmt='serialized') + fd_proto, pool = self._get_parsed_schema(writer_schema_raw) + writer_schema = pool.FindFileByName(fd_proto.name) + writer_desc = self._get_message_desc(pool, writer_schema, msg_index) + if subject is None: + subject = self._subject_name_func(ctx, writer_desc.full_name) + if subject is not None: + latest_schema = self._get_reader_schema(subject, fmt='serialized') + else: + writer_schema_raw = None + writer_schema = None + + if latest_schema is not None: + migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None) + reader_schema_raw = latest_schema.schema + fd_proto, pool = self._get_parsed_schema(latest_schema.schema) + reader_schema = pool.FindFileByName(fd_proto.name) + else: + migrations = None + reader_schema_raw = writer_schema_raw + reader_schema = writer_schema + + if reader_schema is not None: + # 
Initialize reader desc to first message in file + reader_desc = self._get_message_desc(pool, reader_schema, [0]) + # Attempt to find a reader desc with the same name as the writer + reader_desc = reader_schema.message_types_by_name.get(writer_desc.name, reader_desc) + + if migrations: + msg = GetMessageClass(writer_desc)() + try: + msg.ParseFromString(payload.read()) + except DecodeError as e: + raise SerializationError(str(e)) + + obj_dict = json_format.MessageToDict(msg, True) + obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict) + msg = GetMessageClass(reader_desc)() + msg = json_format.ParseDict(obj_dict, msg) + else: + # Protobuf Messages are self-describing; no need to query schema + msg = self._msg_class() + try: + msg.ParseFromString(payload.read()) + except DecodeError as e: + raise SerializationError(str(e)) - field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731 - transform(rule_ctx, reader_desc, message, field_transform)) - msg = self._execute_rules(ctx, subject, RuleMode.READ, None, - reader_schema_raw, msg, None, - field_transformer) + field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731 + transform(rule_ctx, reader_desc, message, field_transform)) + msg = self._execute_rules(ctx, subject, RuleMode.READ, None, + reader_schema_raw, msg, None, + field_transformer) - return msg + return msg def _get_parsed_schema(self, schema: Schema) -> Tuple[descriptor_pb2.FileDescriptorProto, DescriptorPool]: result = self._parsed_schemas.get_parsed_schema(schema) @@ -1091,7 +1019,7 @@ def _is_builtin(name: str) -> bool: name.startswith('google/type/') -def decimalToProtobuf(value: Decimal, scale: int) -> decimal_pb2.Decimal: +def decimal_to_protobuf(value: Decimal, scale: int) -> decimal_pb2.Decimal: """ Converts a Decimal to a Protobuf value. @@ -1132,7 +1060,7 @@ def decimalToProtobuf(value: Decimal, scale: int) -> decimal_pb2.Decimal: decimal_context = Context() -def protobufToDecimal(value: decimal_pb2.Decimal) -> Decimal: +def protobuf_to_decimal(value: decimal_pb2.Decimal) -> Decimal: """ Converts a Protobuf value to Decimal. diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index 4cadf8bfd..c83f7e04b 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -520,26 +520,31 @@ class _SchemaCache(object): def __init__(self): self.lock = Lock() self.schema_id_index = defaultdict(dict) + self.schema_guid_index = defaultdict(dict) self.schema_index = defaultdict(dict) self.rs_id_index = defaultdict(dict) self.rs_version_index = defaultdict(dict) self.rs_schema_index = defaultdict(dict) - def set_schema(self, subject: str, schema_id: int, schema: 'Schema'): + def set_schema(self, subject: Optional[str], schema_id: int, guid: Optional[str], schema: 'Schema'): """ Add a Schema identified by schema_id to the cache. 
Args: subject (str): The subject this schema is associated with - schema_id (int): Schema's registration id + schema_id (int): Schema's id + + guid (str): Schema's guid schema (Schema): Schema instance """ with self.lock: - self.schema_id_index[subject][schema_id] = schema + self.schema_id_index[subject][schema_id] = (guid, schema) self.schema_index[subject][schema] = schema_id + if guid is not None: + self.schema_guid_index[guid] = schema def set_registered_schema(self, schema: 'Schema', registered_schema: 'RegisteredSchema'): """ @@ -551,15 +556,17 @@ def set_registered_schema(self, schema: 'Schema', registered_schema: 'Registered subject = registered_schema.subject schema_id = registered_schema.schema_id + guid = registered_schema.guid version = registered_schema.version with self.lock: - self.schema_id_index[subject][schema_id] = schema + self.schema_id_index[subject][schema_id] = (guid, schema) + self.schema_guid_index[guid] = schema self.schema_index[subject][schema] = schema_id self.rs_id_index[subject][schema_id] = registered_schema self.rs_version_index[subject][version] = registered_schema self.rs_schema_index[subject][schema] = registered_schema - def get_schema_by_id(self, subject: str, schema_id: int) -> Optional['Schema']: + def get_schema_by_id(self, subject: str, schema_id: int) -> Optional[Tuple[str, 'Schema']]: """ Get the schema instance associated with schema id from the cache. @@ -569,12 +576,26 @@ def get_schema_by_id(self, subject: str, schema_id: int) -> Optional['Schema']: schema_id (int): Id used to identify a schema Returns: - Schema: The schema if known; else None + Tuple[str, Schema]: The guid and schema if known; else None """ with self.lock: return self.schema_id_index.get(subject, {}).get(schema_id, None) + def get_schema_by_guid(self, guid: str) -> Optional['Schema']: + """ + Get the schema instance associated with guid from the cache. + + Args: + guid (str): Guid used to identify a schema + + Returns: + Schema: The schema if known; else None + """ + + with self.lock: + return self.schema_guid_index.get(guid, None) + def get_id_by_schema(self, subject: str, schema: 'Schema') -> Optional[int]: """ Get the schema id associated with schema instance from the cache. 
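The cache shape these hunks introduce is easiest to see in isolation. Below is a toy stand-in for _SchemaCache, not the real implementation: ids stay scoped by subject and now map to a (guid, schema) pair, while guids, being registry-global, index schemas directly.

    from collections import defaultdict

    class MiniCache:
        def __init__(self):
            self.schema_id_index = defaultdict(dict)  # subject -> id -> (guid, schema)
            self.schema_guid_index = {}               # guid -> schema, no subject needed

        def set_schema(self, subject, schema_id, guid, schema):
            self.schema_id_index[subject][schema_id] = (guid, schema)
            if guid is not None:
                self.schema_guid_index[guid] = schema

    cache = MiniCache()
    guid = '89791762-2336-4186-9674-299b90a802e2'  # sample value from the unit tests
    cache.set_schema('orders-value', 42, guid, 'schema goes here')
    assert cache.schema_id_index['orders-value'][42] == (guid, 'schema goes here')
    assert cache.schema_guid_index[guid] == 'schema goes here'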
@@ -697,6 +718,7 @@ def clear(self): with self.lock: self.schema_id_index.clear() + self.schema_guid_index.clear() self.schema_index.clear() self.rs_id_index.clear() self.rs_version_index.clear() @@ -843,7 +865,9 @@ def register_schema_full_response( schema_id = self._cache.get_id_by_schema(subject_name, schema) if schema_id is not None: - return RegisteredSchema(schema_id, schema, subject_name, None) + tuple = self._cache.get_schema_by_id(subject_name, schema_id) + if tuple is not None: + return RegisteredSchema(schema_id, tuple[0], tuple[1], subject_name, None) request = schema.to_dict() @@ -854,7 +878,8 @@ def register_schema_full_response( registered_schema = RegisteredSchema.from_dict(response) # The registered schema may not be fully populated - self._cache.set_schema(subject_name, registered_schema.schema_id, schema) + self._cache.set_schema(subject_name, registered_schema.schema_id, + registered_schema.guid, registered_schema.schema) return registered_schema @@ -881,9 +906,9 @@ def get_schema( `GET Schema API Reference `_ """ # noqa: E501 - schema = self._cache.get_schema_by_id(subject_name, schema_id) - if schema is not None: - return schema + tuple = self._cache.get_schema_by_id(subject_name, schema_id) + if tuple is not None: + return tuple[1] query = {'subject': subject_name} if subject_name is not None else None if fmt is not None: @@ -893,11 +918,49 @@ def get_schema( query = {'format': fmt} response = self._rest_client.get('schemas/ids/{}'.format(schema_id), query) - schema = Schema.from_dict(response) + registered_schema = RegisteredSchema.from_dict(response) - self._cache.set_schema(subject_name, schema_id, schema) + self._cache.set_schema(subject_name, schema_id, + registered_schema.guid, registered_schema.schema) - return schema + return registered_schema.schema + + def get_schema_by_guid( + self, guid: str, fmt: Optional[str] = None + ) -> 'Schema': + """ + Fetches the schema associated with ``guid`` from the + Schema Registry. The result is cached so subsequent attempts will not + require an additional round-trip to the Schema Registry. + + Args: + guid (str): Schema guid + fmt (str): Format of the schema + + Returns: + Schema: Schema instance identified by the ``guid`` + + Raises: + SchemaRegistryError: If schema can't be found. 
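Caller-side, the new lookup is a single call. A hedged usage sketch (the URL is a placeholder and the guid is the sample value from the unit tests); the second call is answered from the guid cache without another round-trip:

    from confluent_kafka.schema_registry import SchemaRegistryClient

    client = SchemaRegistryClient.new_client({'url': 'http://localhost:8081'})
    guid = '89791762-2336-4186-9674-299b90a802e2'
    schema = client.get_schema_by_guid(guid)        # GET /schemas/guids/{guid}
    schema_again = client.get_schema_by_guid(guid)  # cache hit, no HTTP request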
+ + See Also: + `GET Schema API Reference `_ + """ # noqa: E501 + + schema = self._cache.get_schema_by_guid(guid) + if schema is not None: + return schema + + query = {'format': fmt} if fmt is not None else None + response = self._rest_client.get('schemas/guids/{}'.format(guid), query) + + registered_schema = RegisteredSchema.from_dict(response) + + self._cache.set_schema(None, registered_schema.schema_id, + registered_schema.guid, registered_schema.schema) + + return registered_schema.schema def lookup_schema( self, subject_name: str, schema: 'Schema', @@ -937,6 +1000,7 @@ def lookup_schema( # Ensure the schema matches the input registered_schema = RegisteredSchema( schema_id=result.schema_id, + guid=result.guid, subject=result.subject, version=result.version, schema=schema, @@ -1934,6 +1998,7 @@ class RegisteredSchema: """ schema_id: Optional[int] + guid: Optional[str] schema: Optional[Schema] subject: Optional[str] version: Optional[int] @@ -1943,6 +2008,8 @@ def to_dict(self) -> Dict[str, Any]: schema_id = self.schema_id + guid = self.guid + subject = self.subject version = self.version @@ -1952,6 +2019,8 @@ def to_dict(self) -> Dict[str, Any]: field_dict = schema.to_dict() if schema_id is not None: field_dict["id"] = schema_id + if guid is not None: + field_dict["guid"] = guid if subject is not None: field_dict["subject"] = subject if version is not None: @@ -1967,12 +2036,15 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: schema_id = d.pop("id", None) + guid = d.pop("guid", None) + subject = d.pop("subject", None) version = d.pop("version", None) schema = cls( schema_id=schema_id, + guid=guid, schema=schema, subject=subject, version=version, diff --git a/src/confluent_kafka/schema_registry/serde.py b/src/confluent_kafka/schema_registry/serde.py index e36a2097e..73b232c90 100644 --- a/src/confluent_kafka/schema_registry/serde.py +++ b/src/confluent_kafka/schema_registry/serde.py @@ -55,11 +55,13 @@ class SchemaId(object): __slots__ = ['schema_type', 'id', 'guid', 'message_indexes'] - def __init__(self, schema_type: str): + def __init__(self, schema_type: str, id: Optional[int] = None, + guid: Optional[str] = None, + message_indexes: Optional[List[int]] = None): self.schema_type = schema_type - self.id = None - self.guid = None - self.message_indexes = None + self.id = id + self.guid = uuid.UUID(guid) if guid is not None else None + self.message_indexes = message_indexes def from_bytes(self, payload: io.BytesIO) -> io.BytesIO: magic = struct.unpack('>b', payload.read(1))[0] @@ -586,11 +588,20 @@ class BaseSerializer(BaseSerde, Serializer): - __slots__ = ['_auto_register', '_normalize_schemas'] + __slots__ = ['_auto_register', '_normalize_schemas', '_schema_id_serializer'] class BaseDeserializer(BaseSerde, Deserializer): - __slots__ = [] + __slots__ = ['_schema_id_deserializer'] + + def _get_schema(self, schema_id: SchemaId, subject: Optional[str] = None, + fmt: Optional[str] = None) -> Schema: + if schema_id.id is not None: + return self._registry.get_schema(schema_id.id, subject, fmt) + elif schema_id.guid is not None: + return self._registry.get_schema_by_guid(schema_id.guid, fmt) + else: + raise SerializationError("Schema ID or GUID is not set") def _has_rules(self, rule_set: RuleSet, mode: RuleMode) -> bool: if rule_set is None: diff --git a/tests/schema_registry/test_json.py index 3c1fffb1c..42fc5d92d 100644 --- a/tests/schema_registry/test_json.py +++
b/tests/schema_registry/test_json.py @@ -25,7 +25,7 @@ from confluent_kafka.schema_registry import ( Schema, SchemaReference, - SchemaRegistryClient, + SchemaRegistryClient, RegisteredSchema, ) from confluent_kafka.schema_registry.json_schema import JSONDeserializer, JSONSerializer from confluent_kafka.schema_registry.rule_registry import RuleRegistry @@ -69,7 +69,12 @@ def test_custom_json_encoder(): # Create mock SchemaRegistryClient mock_schema_registry_client = Mock(spec=SchemaRegistryClient) - mock_schema_registry_client.register_schema.return_value = 1 # schema_id + mock_schema_registry_client.register_schema_full_response.return_value = RegisteredSchema( + schema_id=1, + guid=None, + schema=Schema(schema_str), + subject="topic-name-value", + version=1) # Use orjson.dumps as the custom encoder serializer = JSONSerializer( @@ -128,7 +133,12 @@ def test_custom_encoder_decoder_chain(): ctx = SerializationContext("topic-name", "value") mock_schema_registry_client = Mock(spec=SchemaRegistryClient) - mock_schema_registry_client.register_schema.return_value = 1 + mock_schema_registry_client.register_schema_full_response.return_value = RegisteredSchema( + schema_id=1, + guid=None, + schema=Schema(schema_str), + subject="topic-name-value", + version=1) def custom_encoder(obj): return orjson.dumps(obj, option=orjson.OPT_SORT_KEYS) @@ -170,7 +180,12 @@ def test_custom_encoding_with_complex_data(): test_data = {"nested": {"array": [1, 2, 3], "string": "test"}} mock_schema_registry_client = Mock(spec=SchemaRegistryClient) - mock_schema_registry_client.register_schema.return_value = 1 + mock_schema_registry_client.register_schema_full_response.return_value = RegisteredSchema( + schema_id=1, + guid=None, + schema=Schema(schema_str), + subject="topic-name-value", + version=1) def custom_encoder(obj): return json.dumps(obj, indent=2) diff --git a/tests/schema_registry/test_proto.py b/tests/schema_registry/test_proto.py index 0affbcaf9..7b92239c3 100644 --- a/tests/schema_registry/test_proto.py +++ b/tests/schema_registry/test_proto.py @@ -24,8 +24,9 @@ from confluent_kafka.schema_registry.protobuf import (ProtobufSerializer, ProtobufDeserializer, _create_index_array, - decimalToProtobuf, - protobufToDecimal) + decimal_to_protobuf, + protobuf_to_decimal) +from confluent_kafka.schema_registry.serde import SchemaId from tests.integration.schema_registry.data.proto import (DependencyTestProto_pb2, metadata_proto_pb2) @@ -56,7 +57,7 @@ def test_index_serialization(pb2, zigzag): # reset buffer cursor buf.seek(0) - decoded_msg_idx = ProtobufDeserializer._read_index_array(buf, zigzag=zigzag) + decoded_msg_idx = SchemaId._read_index_array(buf, zigzag=zigzag) buf.close() assert decoded_msg_idx == msg_idx @@ -84,7 +85,7 @@ def test_index_encoder(msg_idx, zigzag, expected_hex): # reset reader and test decoder buf.seek(0) - decoded_msg_idx = ProtobufDeserializer._read_index_array(buf, zigzag=zigzag) + decoded_msg_idx = SchemaId._read_index_array(buf, zigzag=zigzag) assert decoded_msg_idx == msg_idx @@ -103,6 +104,6 @@ def test_index_encoder(msg_idx, zigzag, expected_hex): ]) def test_proto_decimal(decimal, scale): input = Decimal(decimal) - converted = decimalToProtobuf(input, scale) - result = protobufToDecimal(converted) + converted = decimal_to_protobuf(input, scale) + result = protobuf_to_decimal(converted) assert result == input From e5c7973e760d0a3cf8924aa6d255bd859c330784 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 2 May 2025 19:29:17 -0700 Subject: [PATCH 04/12] Minor cleanup --- 
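The index tests above exercise the zigzag-varint framing that now lives on SchemaId rather than ProtobufDeserializer. A self-contained sketch of that encoding (helper names are mine, not the library's); the final assert reproduces the 06 02 04 06 byte run from test_schema_guid_with_message_indexes:

    def zigzag(n: int) -> int:
        # Map signed ints to unsigned so small magnitudes stay small.
        return (n << 1) ^ (n >> 63)

    def write_varint(out: bytearray, n: int) -> None:
        # Standard little-endian base-128 varint.
        while n & ~0x7f:
            out.append((n & 0x7f) | 0x80)
            n >>= 7
        out.append(n)

    def encode_index_array(indexes) -> bytes:
        # Count first, then each index, all zigzag-varint encoded.
        out = bytearray()
        write_varint(out, zigzag(len(indexes)))
        for i in indexes:
            write_varint(out, zigzag(i))
        return bytes(out)

    assert encode_index_array([1, 2, 3]) == bytes([0x06, 0x02, 0x04, 0x06])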
.../schema_registry/json_schema.py | 4 +--- src/confluent_kafka/schema_registry/protobuf.py | 1 + .../schema_registry/schema_registry_client.py | 15 ++++++++------- src/confluent_kafka/schema_registry/serde.py | 7 ++++--- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/confluent_kafka/schema_registry/json_schema.py b/src/confluent_kafka/schema_registry/json_schema.py index 1e7067240..c98ef0934 100644 --- a/src/confluent_kafka/schema_registry/json_schema.py +++ b/src/confluent_kafka/schema_registry/json_schema.py @@ -19,7 +19,6 @@ from io import BytesIO import json -import struct from typing import Union, Optional, List, Set, Tuple, Callable import httpx @@ -31,8 +30,7 @@ from referencing import Registry, Resource from referencing._core import Resolver -from confluent_kafka.schema_registry import (_MAGIC_BYTE_V0, - Schema, +from confluent_kafka.schema_registry import (Schema, topic_subject_name_strategy, RuleKind, RuleMode, SchemaRegistryClient, diff --git a/src/confluent_kafka/schema_registry/protobuf.py b/src/confluent_kafka/schema_registry/protobuf.py index c7078d9cc..91be7694e 100644 --- a/src/confluent_kafka/schema_registry/protobuf.py +++ b/src/confluent_kafka/schema_registry/protobuf.py @@ -1025,6 +1025,7 @@ def decimal_to_protobuf(value: Decimal, scale: int) -> decimal_pb2.Decimal: Args: value (Decimal): The Decimal value to convert. + scale (int): The number of decimal places in the value. Returns: The Protobuf value. diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index c83f7e04b..26f99aa81 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -520,7 +520,7 @@ class _SchemaCache(object): def __init__(self): self.lock = Lock() self.schema_id_index = defaultdict(dict) - self.schema_guid_index = defaultdict(dict) + self.schema_guid_index = {} self.schema_index = defaultdict(dict) self.rs_id_index = defaultdict(dict) self.rs_version_index = defaultdict(dict) self.rs_schema_index = defaultdict(dict) @@ -551,6 +551,7 @@ def set_registered_schema(self, schema: 'Schema', registered_schema: 'Registered Add a RegisteredSchema to the cache.
Args: + schema (Schema): Schema instance registered_schema (RegisteredSchema): RegisteredSchema instance """ @@ -865,9 +866,9 @@ def register_schema_full_response( schema_id = self._cache.get_id_by_schema(subject_name, schema) if schema_id is not None: - tuple = self._cache.get_schema_by_id(subject_name, schema_id) - if tuple is not None: - return RegisteredSchema(schema_id, tuple[0], tuple[1], subject_name, None) + result = self._cache.get_schema_by_id(subject_name, schema_id) + if result is not None: + return RegisteredSchema(schema_id, result[0], result[1], subject_name, None) request = schema.to_dict() @@ -906,9 +907,9 @@ def get_schema( `GET Schema API Reference `_ """ # noqa: E501 - tuple = self._cache.get_schema_by_id(subject_name, schema_id) - if tuple is not None: - return tuple[1] + result = self._cache.get_schema_by_id(subject_name, schema_id) + if result is not None: + return result[1] query = {'subject': subject_name} if subject_name is not None else None if fmt is not None: diff --git a/src/confluent_kafka/schema_registry/serde.py b/src/confluent_kafka/schema_registry/serde.py index 73b232c90..ade1d9784 100644 --- a/src/confluent_kafka/schema_registry/serde.py +++ b/src/confluent_kafka/schema_registry/serde.py @@ -28,7 +28,8 @@ 'RuleContext', 'RuleConditionError', 'RuleError', - 'RuleExecutor'] + 'RuleExecutor', + 'SchemaId'] import abc import io @@ -55,11 +56,11 @@ class SchemaId(object): __slots__ = ['schema_type', 'id', 'guid', 'message_indexes'] - def __init__(self, schema_type: str, id: Optional[int] = None, + def __init__(self, schema_type: str, schema_id: Optional[int] = None, guid: Optional[str] = None, message_indexes: Optional[List[int]] = None): self.schema_type = schema_type - self.id = id + self.id = schema_id self.guid = uuid.UUID(guid) if guid is not None else None self.message_indexes = message_indexes From 06a30798ed5a9d8df64e925229f8f84f32af2655 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 2 May 2025 22:53:09 -0700 Subject: [PATCH 05/12] Add test --- tests/schema_registry/test_schema_id.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/tests/schema_registry/test_schema_id.py b/tests/schema_registry/test_schema_id.py index 4ca578776..22e5a3d03 100644 --- a/tests/schema_registry/test_schema_id.py +++ b/tests/schema_registry/test_schema_id.py @@ -44,7 +44,22 @@ def test_schema_id(): assert output == input -def test_schema_message_indexes(): +def test_schema_guid_with_message_indexes(): + schema_id = SchemaId("PROTOBUF") + input = bytes([ + 0x01, 0x89, 0x79, 0x17, 0x62, 0x23, 0x36, 0x41, 0x86, 0x96, 0x74, 0x29, 0x9b, 0x90, + 0xa8, 0x02, 0xe2, 0x06, 0x02, 0x04, 0x06 + ]) + schema_id.from_bytes(io.BytesIO(input)) + guid_str = str(schema_id.guid) + assert guid_str == "89791762-2336-4186-9674-299b90a802e2" + indexes = schema_id.message_indexes + assert indexes == [1, 2, 3] + output = schema_id.guid_to_bytes() + assert output == input + + +def test_schema_id_with_message_indexes(): schema_id = SchemaId("PROTOBUF") input = bytes([ 0x00, 0x00, 0x00, 0x00, 0x01, 0x06, 0x02, 0x04, 0x06 From eec9b6b10f4fc598f7985fe21938d943488d3a66 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Sat, 3 May 2025 09:59:57 -0700 Subject: [PATCH 06/12] Minor renaming --- src/confluent_kafka/schema_registry/avro.py | 2 +- src/confluent_kafka/schema_registry/json_schema.py | 2 +- src/confluent_kafka/schema_registry/protobuf.py | 2 +- src/confluent_kafka/schema_registry/serde.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git 
a/src/confluent_kafka/schema_registry/avro.py b/src/confluent_kafka/schema_registry/avro.py index fcd3b79f7..1e0109160 100644 --- a/src/confluent_kafka/schema_registry/avro.py +++ b/src/confluent_kafka/schema_registry/avro.py @@ -580,7 +580,7 @@ def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> U schema_id = SchemaId(AVRO_TYPE) payload = self._schema_id_deserializer(data, ctx, schema_id) - writer_schema_raw = self._get_schema(schema_id, subject) + writer_schema_raw = self._get_writer_schema(schema_id, subject) writer_schema = self._get_parsed_schema(writer_schema_raw) if subject is None: diff --git a/src/confluent_kafka/schema_registry/json_schema.py b/src/confluent_kafka/schema_registry/json_schema.py index c98ef0934..e47093a71 100644 --- a/src/confluent_kafka/schema_registry/json_schema.py +++ b/src/confluent_kafka/schema_registry/json_schema.py @@ -610,7 +610,7 @@ def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> U obj_dict = self._json_decode(payload.read()) if self._registry is not None: - writer_schema_raw = self._get_schema(schema_id, subject) + writer_schema_raw = self._get_writer_schema(schema_id, subject) writer_schema, writer_ref_registry = self._get_parsed_schema(writer_schema_raw) if subject is None: subject = self._subject_name_func(ctx, writer_schema.get("title")) diff --git a/src/confluent_kafka/schema_registry/protobuf.py b/src/confluent_kafka/schema_registry/protobuf.py index 91be7694e..026d0394e 100644 --- a/src/confluent_kafka/schema_registry/protobuf.py +++ b/src/confluent_kafka/schema_registry/protobuf.py @@ -798,7 +798,7 @@ def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> O msg_index = schema_id.message_indexes if self._registry is not None: - writer_schema_raw = self._get_schema(schema_id, subject, fmt='serialized') + writer_schema_raw = self._get_writer_schema(schema_id, subject, fmt='serialized') fd_proto, pool = self._get_parsed_schema(writer_schema_raw) writer_schema = pool.FindFileByName(fd_proto.name) writer_desc = self._get_message_desc(pool, writer_schema, msg_index) diff --git a/src/confluent_kafka/schema_registry/serde.py b/src/confluent_kafka/schema_registry/serde.py index ade1d9784..28c808533 100644 --- a/src/confluent_kafka/schema_registry/serde.py +++ b/src/confluent_kafka/schema_registry/serde.py @@ -595,7 +595,7 @@ class BaseSerializer(BaseSerde, Serializer): class BaseDeserializer(BaseSerde, Deserializer): __slots__ = ['_schema_id_deserializer'] - def _get_schema(self, schema_id: SchemaId, subject: Optional[str] = None, + def _get_writer_schema(self, schema_id: SchemaId, subject: Optional[str] = None, fmt: Optional[str] = None) -> Schema: if schema_id.id is not None: return self._registry.get_schema(schema_id.id, subject, fmt) From 9e1b81705d49a6e30d8a15be34c387cf95d32504 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Sat, 3 May 2025 15:19:19 -0700 Subject: [PATCH 07/12] Minor fix --- .../schema_registry/mock_schema_registry_client.py | 6 +++--- .../schema_registry/schema_registry_client.py | 7 ++++--- src/confluent_kafka/schema_registry/serde.py | 2 +- tests/schema_registry/test_proto.py | 1 - 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/confluent_kafka/schema_registry/mock_schema_registry_client.py b/src/confluent_kafka/schema_registry/mock_schema_registry_client.py index d5ca01da5..19008a640 100644 --- a/src/confluent_kafka/schema_registry/mock_schema_registry_client.py +++ 
b/src/confluent_kafka/schema_registry/mock_schema_registry_client.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import uuid from collections import defaultdict from threading import Lock from typing import List, Dict, Optional @@ -169,8 +169,8 @@ def register_schema_full_response( latest_version = 1 if latest_schema is None else latest_schema.version + 1 registered_schema = RegisteredSchema( - schema_id=0, - guid=None, + schema_id=1, + guid=str(uuid.uuid4()), schema=schema, subject=subject_name, version=latest_version diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index 26f99aa81..0eefab548 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -526,7 +526,7 @@ def __init__(self): self.rs_version_index = defaultdict(dict) self.rs_schema_index = defaultdict(dict) - def set_schema(self, subject: Optional[str], schema_id: int, guid: Optional[str], schema: 'Schema'): + def set_schema(self, subject: Optional[str], schema_id: Optional[int], guid: Optional[str], schema: 'Schema'): """ Add a Schema identified by schema_id to the cache. @@ -541,8 +541,9 @@ def set_schema(self, subject: Optional[str], schema_id: int, guid: Optional[str] """ with self.lock: - self.schema_id_index[subject][schema_id] = (guid, schema) - self.schema_index[subject][schema] = schema_id + if schema_id is not None: + self.schema_id_index[subject][schema_id] = (guid, schema) + self.schema_index[subject][schema] = schema_id if guid is not None: self.schema_guid_index[guid] = schema diff --git a/src/confluent_kafka/schema_registry/serde.py b/src/confluent_kafka/schema_registry/serde.py index 28c808533..551169a9a 100644 --- a/src/confluent_kafka/schema_registry/serde.py +++ b/src/confluent_kafka/schema_registry/serde.py @@ -596,7 +596,7 @@ class BaseDeserializer(BaseSerde, Deserializer): __slots__ = ['_schema_id_deserializer'] def _get_writer_schema(self, schema_id: SchemaId, subject: Optional[str] = None, - fmt: Optional[str] = None) -> Schema: + fmt: Optional[str] = None) -> Schema: if schema_id.id is not None: return self._registry.get_schema(schema_id.id, subject, fmt) elif schema_id.guid is not None: diff --git a/tests/schema_registry/test_proto.py b/tests/schema_registry/test_proto.py index 7b92239c3..deb52b0e8 100644 --- a/tests/schema_registry/test_proto.py +++ b/tests/schema_registry/test_proto.py @@ -22,7 +22,6 @@ import pytest from confluent_kafka.schema_registry.protobuf import (ProtobufSerializer, - ProtobufDeserializer, _create_index_array, decimal_to_protobuf, protobuf_to_decimal) From 14b5107635cd2991e39e562430285c8210ba2745 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Sat, 3 May 2025 15:22:16 -0700 Subject: [PATCH 08/12] Minor fix --- .../schema_registry/schema_registry_client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index 0eefab548..2ff34e275 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -561,10 +561,12 @@ def set_registered_schema(self, schema: 'Schema', registered_schema: 'Registered guid = registered_schema.guid version = registered_schema.version with self.lock: - 
self.schema_id_index[subject][schema_id] = (guid, schema) - self.schema_guid_index[guid] = schema - self.schema_index[subject][schema] = schema_id - self.rs_id_index[subject][schema_id] = registered_schema + if schema_id is not None: + self.schema_id_index[subject][schema_id] = (guid, schema) + self.schema_index[subject][schema] = schema_id + self.rs_id_index[subject][schema_id] = registered_schema + if guid is not None: + self.schema_guid_index[guid] = schema self.rs_version_index[subject][version] = registered_schema self.rs_schema_index[subject][schema] = registered_schema From 868c48cba42c3a67ada50bdca3ac74eb83793445 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Sat, 3 May 2025 15:43:38 -0700 Subject: [PATCH 09/12] Add tests --- .../schema_registry/__init__.py | 2 +- src/confluent_kafka/schema_registry/serde.py | 2 +- tests/schema_registry/test_avro_serdes.py | 36 +++++++++++++++- tests/schema_registry/test_json_serdes.py | 42 ++++++++++++++++++- tests/schema_registry/test_proto_serdes.py | 29 ++++++++++++- 5 files changed, 106 insertions(+), 5 deletions(-) diff --git a/src/confluent_kafka/schema_registry/__init__.py b/src/confluent_kafka/schema_registry/__init__.py index 268ac1673..2d81f44ac 100644 --- a/src/confluent_kafka/schema_registry/__init__.py +++ b/src/confluent_kafka/schema_registry/__init__.py @@ -195,7 +195,7 @@ def dual_schema_id_deserializer(payload: bytes, ctx, schema_id) -> io.BytesIO: elif isinstance(headers, dict): header_value = headers.get(header_key, None) if header_value is not None: - schema_id.from_bytes(header_value) + schema_id.from_bytes(io.BytesIO(header_value)) return io.BytesIO(payload) return schema_id.from_bytes(io.BytesIO(payload)) diff --git a/src/confluent_kafka/schema_registry/serde.py b/src/confluent_kafka/schema_registry/serde.py index 551169a9a..6de96e815 100644 --- a/src/confluent_kafka/schema_registry/serde.py +++ b/src/confluent_kafka/schema_registry/serde.py @@ -600,7 +600,7 @@ def _get_writer_schema(self, schema_id: SchemaId, subject: Optional[str] = None, if schema_id.id is not None: return self._registry.get_schema(schema_id.id, subject, fmt) elif schema_id.guid is not None: - return self._registry.get_schema_by_guid(schema_id.guid, fmt) + return self._registry.get_schema_by_guid(str(schema_id.guid), fmt) else: raise SerializationError("Schema ID or GUID is not set") diff --git a/tests/schema_registry/test_avro_serdes.py b/tests/schema_registry/test_avro_serdes.py index b3b62b0f6..e792e5660 100644 --- a/tests/schema_registry/test_avro_serdes.py +++ b/tests/schema_registry/test_avro_serdes.py @@ -25,7 +25,7 @@ from fastavro._logical_readers import UUID from confluent_kafka.schema_registry import SchemaRegistryClient, \ - Schema, Metadata, MetadataProperties + Schema, Metadata, MetadataProperties, header_schema_id_serializer from confluent_kafka.schema_registry.avro import AvroSerializer, \ AvroDeserializer from confluent_kafka.schema_registry.rules.cel.cel_executor import CelExecutor @@ -130,6 +130,40 @@ def test_avro_basic_serialization(): assert obj == obj2 +def test_avro_guid_in_header(): + conf = {'url': _BASE_URL} + client = SchemaRegistryClient.new_client(conf) + ser_conf = { + 'auto.register.schemas': True, + 'schema.id.serializer': header_schema_id_serializer + } + obj = { + 'intField': 123, + 'doubleField': 45.67, + 'stringField': 'hi', + 'booleanField': True, + 'bytesField': b'foobar', + } + schema = { + 'type': 'record', + 'name': 'test', + 'fields': [ + {'name': 'intField', 'type': 'int'}, + {'name': 'doubleField', 'type': 
'double'}, + {'name': 'stringField', 'type': 'string'}, + {'name': 'booleanField', 'type': 'boolean'}, + {'name': 'bytesField', 'type': 'bytes'}, + ] + } + ser = AvroSerializer(client, schema_str=json.dumps(schema), conf=ser_conf) + ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE, {}) + obj_bytes = ser(obj, ser_ctx) + + deser = AvroDeserializer(client) + obj2 = deser(obj_bytes, ser_ctx) + assert obj == obj2 + + def test_avro_serialize_use_schema_id(): conf = {'url': _BASE_URL} client = SchemaRegistryClient.new_client(conf) diff --git a/tests/schema_registry/test_json_serdes.py b/tests/schema_registry/test_json_serdes.py index 5f6226af3..59f53fc87 100644 --- a/tests/schema_registry/test_json_serdes.py +++ b/tests/schema_registry/test_json_serdes.py @@ -21,7 +21,7 @@ import pytest from confluent_kafka.schema_registry import SchemaRegistryClient, \ - Schema, Metadata, MetadataProperties + Schema, Metadata, MetadataProperties, header_schema_id_serializer from confluent_kafka.schema_registry.json_schema import JSONSerializer, \ JSONDeserializer from confluent_kafka.schema_registry.rules.cel.cel_executor import CelExecutor @@ -121,6 +121,46 @@ def test_json_basic_serialization(): assert obj == obj2 +def test_json_guid_in_header(): + conf = {'url': _BASE_URL} + client = SchemaRegistryClient.new_client(conf) + ser_conf = { + 'auto.register.schemas': True, + 'schema.id.serializer': header_schema_id_serializer + } + obj = { + 'intField': 123, + 'doubleField': 45.67, + 'stringField': 'hi', + 'booleanField': True, + 'bytesField': base64.b64encode(b'foobar').decode('utf-8'), + } + schema = { + "type": "object", + "properties": { + "intField": {"type": "integer"}, + "doubleField": {"type": "number"}, + "stringField": { + "type": "string", + "confluent:tags": ["PII"] + }, + "booleanField": {"type": "boolean"}, + "bytesField": { + "type": "string", + "contentEncoding": "base64", + "confluent:tags": ["PII"] + } + } + } + ser = JSONSerializer(json.dumps(schema), client, conf=ser_conf) + ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE, {}) + obj_bytes = ser(obj, ser_ctx) + + deser = JSONDeserializer(None, schema_registry_client=client) + obj2 = deser(obj_bytes, ser_ctx) + assert obj == obj2 + + def test_json_basic_deserialization_no_client(): conf = {'url': _BASE_URL} client = SchemaRegistryClient.new_client(conf) diff --git a/tests/schema_registry/test_proto_serdes.py b/tests/schema_registry/test_proto_serdes.py index 735bb1693..694a05304 100644 --- a/tests/schema_registry/test_proto_serdes.py +++ b/tests/schema_registry/test_proto_serdes.py @@ -20,7 +20,7 @@ import pytest from confluent_kafka.schema_registry import SchemaRegistryClient, \ - Schema, Metadata, MetadataProperties + Schema, Metadata, MetadataProperties, header_schema_id_serializer from confluent_kafka.schema_registry.protobuf import ProtobufSerializer, \ ProtobufDeserializer, _schema_to_str from confluent_kafka.schema_registry.rules.cel.cel_executor import CelExecutor @@ -120,6 +120,33 @@ def test_proto_basic_serialization(): assert obj == obj2 +def test_proto_guid_in_header(): + conf = {'url': _BASE_URL} + client = SchemaRegistryClient.new_client(conf) + ser_conf = { + 'auto.register.schemas': True, + 'use.deprecated.format': False, + 'schema.id.serializer': header_schema_id_serializer + } + obj = example_pb2.Author( + name='Kafka', + id=123, + picture=b'foobar', + works=['The Castle', 'TheTrial'], + oneof_string='oneof' + ) + ser = ProtobufSerializer(example_pb2.Author, client, conf=ser_conf) + ser_ctx = 
SerializationContext(_TOPIC, MessageField.VALUE, {}) + obj_bytes = ser(obj, ser_ctx) + + deser_conf = { + 'use.deprecated.format': False + } + deser = ProtobufDeserializer(example_pb2.Author, deser_conf, client) + obj2 = deser(obj_bytes, ser_ctx) + assert obj == obj2 + + def test_proto_basic_deserialization_no_client(): conf = {'url': _BASE_URL} client = SchemaRegistryClient.new_client(conf) From 1a3b8ba7dc34254ac93e4b8298de443e2a3f2313 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Sat, 3 May 2025 21:02:49 -0700 Subject: [PATCH 10/12] Fix test --- src/confluent_kafka/schema_registry/protobuf.py | 17 ----------------- .../schema_registry/test_proto_serializers.py | 2 +- tests/schema_registry/test_proto_serdes.py | 2 -- 3 files changed, 1 insertion(+), 20 deletions(-) diff --git a/src/confluent_kafka/schema_registry/protobuf.py b/src/confluent_kafka/schema_registry/protobuf.py index 026d0394e..398393e58 100644 --- a/src/confluent_kafka/schema_registry/protobuf.py +++ b/src/confluent_kafka/schema_registry/protobuf.py @@ -394,13 +394,6 @@ def __init__( ): super().__init__() - if conf is None or 'use.deprecated.format' not in conf: - raise RuntimeError( - "ProtobufSerializer: the 'use.deprecated.format' configuration " - "property must be explicitly set due to backward incompatibility " - "with older confluent-kafka-python Protobuf producers and consumers. " - "See the release notes for more details") - conf_copy = self._default_conf.copy() if conf is not None: conf_copy.update(conf) @@ -714,16 +707,6 @@ def __init__( self._parsed_schemas = ParsedSchemaCache() self._use_schema_id = None - # Require use.deprecated.format to be explicitly configured - # during a transitionary period since old/new format are - # incompatible. - if conf is None or 'use.deprecated.format' not in conf: - raise RuntimeError( - "ProtobufDeserializer: the 'use.deprecated.format' configuration " - "property must be explicitly set due to backward incompatibility " - "with older confluent-kafka-python Protobuf producers and consumers. 
" - "See the release notes for more details") - conf_copy = self._default_conf.copy() if conf is not None: conf_copy.update(conf) diff --git a/tests/integration/schema_registry/test_proto_serializers.py b/tests/integration/schema_registry/test_proto_serializers.py index 16de4ea6b..c958bb533 100644 --- a/tests/integration/schema_registry/test_proto_serializers.py +++ b/tests/integration/schema_registry/test_proto_serializers.py @@ -92,7 +92,7 @@ def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs): producer.produce(topic, key=pb2(), partition=0) producer.flush() - registered_refs = sr.get_schema(serializer._schema_id).references + registered_refs = sr.get_schema(serializer._schema_id.id).references assert expected_refs.sort() == [ref.name for ref in registered_refs].sort() diff --git a/tests/schema_registry/test_proto_serdes.py b/tests/schema_registry/test_proto_serdes.py index 694a05304..26298c38a 100644 --- a/tests/schema_registry/test_proto_serdes.py +++ b/tests/schema_registry/test_proto_serdes.py @@ -125,7 +125,6 @@ def test_proto_guid_in_header(): client = SchemaRegistryClient.new_client(conf) ser_conf = { 'auto.register.schemas': True, - 'use.deprecated.format': False, 'schema.id.serializer': header_schema_id_serializer } obj = example_pb2.Author( @@ -140,7 +139,6 @@ def test_proto_guid_in_header(): obj_bytes = ser(obj, ser_ctx) deser_conf = { - 'use.deprecated.format': False } deser = ProtobufDeserializer(example_pb2.Author, deser_conf, client) obj2 = deser(obj_bytes, ser_ctx) From 43467c44ca465baab65fb6a588bd9b4f3cca5fc3 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Sun, 4 May 2025 11:44:27 -0700 Subject: [PATCH 11/12] Fix schema lookup --- src/confluent_kafka/schema_registry/schema_registry_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index 2ff34e275..36bc622e4 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -883,7 +883,7 @@ def register_schema_full_response( # The registered schema may not be fully populated self._cache.set_schema(subject_name, registered_schema.schema_id, - registered_schema.guid, registered_schema.schema) + registered_schema.guid, schema) return registered_schema From a6198244c356e5f8b4665725d5d1f3cdd4255313 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Sun, 4 May 2025 19:21:04 -0700 Subject: [PATCH 12/12] Minor fix --- src/confluent_kafka/schema_registry/schema_registry_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index 36bc622e4..6c27d1564 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -882,8 +882,9 @@ def register_schema_full_response( registered_schema = RegisteredSchema.from_dict(response) # The registered schema may not be fully populated + s = registered_schema.schema if registered_schema.schema.schema_str is not None else schema self._cache.set_schema(subject_name, registered_schema.schema_id, - registered_schema.guid, schema) + registered_schema.guid, s) return registered_schema