Skip to content

Commit 76dfb6b

Browse files
authored
Support for schema id in header (#1978)
* DGS-20366 Support schema ID in header * Fix flake8 * Use id serdes * Minor cleanup * Add test * Minor renaming * Minor fix * Minor fix * Add tests * Fix test * Fix schema lookup * Minor fix
1 parent cc2656c commit 76dfb6b

16 files changed

+887
-382
lines changed

src/confluent_kafka/schema_registry/__init__.py

+102-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
#
18+
import io
1819
from typing import Optional
1920

2021
from .schema_registry_client import (
@@ -34,8 +35,14 @@
3435
SchemaReference,
3536
ServerConfig
3637
)
38+
from ..serialization import SerializationError, MessageField
39+
40+
_KEY_SCHEMA_ID = "__key_schema_id"
41+
_VALUE_SCHEMA_ID = "__value_schema_id"
3742

3843
_MAGIC_BYTE = 0
44+
_MAGIC_BYTE_V0 = _MAGIC_BYTE
45+
_MAGIC_BYTE_V1 = 1
3946

4047
__all__ = [
4148
"ConfigCompatibilityLevel",
@@ -55,7 +62,11 @@
5562
"ServerConfig",
5663
"topic_subject_name_strategy",
5764
"topic_record_subject_name_strategy",
58-
"record_subject_name_strategy"
65+
"record_subject_name_strategy",
66+
"header_schema_id_serializer",
67+
"prefix_schema_id_serializer",
68+
"dual_schema_id_deserializer",
69+
"prefix_schema_id_deserializer"
5970
]
6071

6172

@@ -113,3 +124,93 @@ def reference_subject_name_strategy(ctx, schema_ref: SchemaReference) -> Optiona
113124
114125
"""
115126
return schema_ref.name if schema_ref is not None else None
127+
128+
129+
def header_schema_id_serializer(payload: bytes, ctx, schema_id) -> bytes:
    """
    Serializes the schema guid into the message headers.

    The payload is returned untouched. The bytes from
    ``schema_id.guid_to_bytes()`` are stored in ``ctx.headers`` under
    ``_KEY_SCHEMA_ID`` when ``ctx.field`` is ``MessageField.KEY`` and under
    ``_VALUE_SCHEMA_ID`` otherwise.

    Args:
        payload (bytes): The already-serialized payload.

        ctx (SerializationContext): Metadata pertaining to the serialization
            operation. Its ``headers`` must be a list (a ``(key, value)``
            tuple is appended) or a dict (the key is assigned).

        schema_id (SchemaId): The schema ID whose guid is written to the header.

    Returns:
        bytes: The payload, unchanged.

    Raises:
        SerializationError: If ``ctx`` is None, ``ctx.headers`` is None, or
            the headers are neither a list nor a dict.
    """
    # A missing context means there is nowhere to put the header; report it
    # the same way as missing headers instead of failing with AttributeError.
    headers = ctx.headers if ctx is not None else None
    if headers is None:
        raise SerializationError("Missing headers")

    header_key = _KEY_SCHEMA_ID if ctx.field == MessageField.KEY else _VALUE_SCHEMA_ID
    header_value = schema_id.guid_to_bytes()

    if isinstance(headers, list):
        headers.append((header_key, header_value))
    elif isinstance(headers, dict):
        headers[header_key] = header_value
    else:
        raise SerializationError("Invalid headers type")
    return payload
154+
155+
156+
def prefix_schema_id_serializer(payload: bytes, ctx, schema_id) -> bytes:
    """
    Serializes the schema id into the payload prefix.

    Args:
        payload (bytes): The already-serialized payload.

        ctx (SerializationContext): Metadata pertaining to the serialization
            operation. Not used by this strategy.

        schema_id (SchemaId): The schema ID to serialize; its
            ``id_to_bytes()`` output becomes the prefix.

    Returns:
        bytes: The bytes from ``schema_id.id_to_bytes()`` followed by the payload.
    """
    id_prefix = schema_id.id_to_bytes()
    return id_prefix + payload
170+
171+
172+
def dual_schema_id_deserializer(payload: bytes, ctx, schema_id) -> io.BytesIO:
    """
    Deserializes the schema id by first checking the header, then the payload prefix.

    Args:
        payload (bytes): The payload to deserialize.

        ctx (SerializationContext): Metadata pertaining to the deserialization
            operation. May be None, in which case only the payload prefix is
            consulted.

        schema_id (SchemaId): Populated in place through
            ``schema_id.from_bytes`` with the decoded schema ID.

    Returns:
        io.BytesIO: A stream over the payload. When the id was found in a
        header this is a fresh stream over the whole payload; otherwise it is
        whatever ``schema_id.from_bytes`` returns for the payload stream.
    """
    # Deserializers may be invoked without a context; treat that like a
    # message without headers rather than failing with AttributeError.
    headers = ctx.headers if ctx is not None else None
    if headers is not None:
        header_key = _KEY_SCHEMA_ID if ctx.field == MessageField.KEY else _VALUE_SCHEMA_ID
        header_value = None
        if isinstance(headers, list):
            # Headers given as a list of (key, value) pairs: first match wins.
            header_value = next(
                (header[1] for header in headers if header[0] == header_key),
                None)
        elif isinstance(headers, dict):
            header_value = headers.get(header_key)
        if header_value is not None:
            # The id lives in the header, so the payload carries no prefix.
            schema_id.from_bytes(io.BytesIO(header_value))
            return io.BytesIO(payload)
    return schema_id.from_bytes(io.BytesIO(payload))
201+
202+
203+
def prefix_schema_id_deserializer(payload: bytes, ctx, schema_id) -> io.BytesIO:
    """
    Deserializes the schema id from the payload prefix.

    Args:
        payload (bytes): The payload to deserialize.

        ctx (SerializationContext): Metadata pertaining to the deserialization
            operation. Not used by this strategy.

        schema_id (SchemaId): Populated through ``schema_id.from_bytes`` with
            the decoded schema ID.

    Returns:
        io.BytesIO: The result of ``schema_id.from_bytes`` applied to a stream
        over the payload (presumably that stream, positioned after the
        schema id prefix).
    """
    stream = io.BytesIO(payload)
    return schema_id.from_bytes(stream)

src/confluent_kafka/schema_registry/avro.py

+80-69
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from copy import deepcopy
2222
from io import BytesIO
2323
from json import loads
24-
from struct import pack, unpack
2524
from typing import Dict, Union, Optional, Set, Callable
2625

2726
from fastavro import (schemaless_reader,
@@ -30,16 +29,18 @@
3029
validate)
3130
from fastavro.schema import load_schema
3231

33-
from . import (_MAGIC_BYTE,
34-
Schema,
32+
from . import (Schema,
3533
topic_subject_name_strategy,
3634
RuleMode,
37-
RuleKind, SchemaRegistryClient)
38-
from confluent_kafka.serialization import (SerializationError,
39-
SerializationContext)
35+
RuleKind, SchemaRegistryClient, prefix_schema_id_serializer,
36+
dual_schema_id_deserializer)
37+
from confluent_kafka.serialization import (SerializationContext)
4038
from .rule_registry import RuleRegistry
4139
from .serde import BaseSerializer, BaseDeserializer, RuleContext, FieldType, \
42-
FieldTransform, RuleConditionError, ParsedSchemaCache
40+
FieldTransform, RuleConditionError, ParsedSchemaCache, SchemaId
41+
42+
43+
AVRO_TYPE = "AVRO"
4344

4445

4546
AvroMessage = Union[
@@ -164,6 +165,12 @@ class AvroSerializer(BaseSerializer):
164165
| | | |
165166
| | | Defaults to topic_subject_name_strategy. |
166167
+-----------------------------+----------+--------------------------------------------------+
168+
| | | Callable(bytes, SerializationContext, schema_id) |
169+
| | | -> bytes |
170+
| | | |
171+
| ``schema.id.serializer`` | callable | Defines how the schema id/guid is serialized. |
172+
| | | Defaults to prefix_schema_id_serializer. |
173+
+-----------------------------+----------+--------------------------------------------------+
167174
168175
Schemas are registered against subject names in Confluent Schema Registry that
169176
define a scope in which the schemas can be evolved. By default, the subject name
@@ -223,7 +230,8 @@ class AvroSerializer(BaseSerializer):
223230
'use.schema.id': None,
224231
'use.latest.version': False,
225232
'use.latest.with.metadata': None,
226-
'subject.name.strategy': topic_subject_name_strategy}
233+
'subject.name.strategy': topic_subject_name_strategy,
234+
'schema.id.serializer': prefix_schema_id_serializer}
227235

228236
def __init__(
229237
self,
@@ -286,6 +294,10 @@ def __init__(
286294
if not callable(self._subject_name_func):
287295
raise ValueError("subject.name.strategy must be callable")
288296

297+
self._schema_id_serializer = conf_copy.pop('schema.id.serializer')
298+
if not callable(self._schema_id_serializer):
299+
raise ValueError("schema.id.serializer must be callable")
300+
289301
if len(conf_copy) > 0:
290302
raise ValueError("Unrecognized properties: {}"
291303
.format(", ".join(conf_copy.keys())))
@@ -345,19 +357,20 @@ def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> O
345357
subject = self._subject_name_func(ctx, self._schema_name)
346358
latest_schema = self._get_reader_schema(subject)
347359
if latest_schema is not None:
348-
self._schema_id = latest_schema.schema_id
360+
self._schema_id = SchemaId(AVRO_TYPE, latest_schema.schema_id, latest_schema.guid)
349361
elif subject not in self._known_subjects:
350362
# Check to ensure this schema has been registered under subject_name.
351363
if self._auto_register:
352364
# The schema name will always be the same. We can't however register
353365
# a schema without a subject so we set the schema_id here to handle
354366
# the initial registration.
355-
self._schema_id = self._registry.register_schema(
367+
registered_schema = self._registry.register_schema_full_response(
356368
subject, self._schema, self._normalize_schemas)
369+
self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid)
357370
else:
358371
registered_schema = self._registry.lookup_schema(
359372
subject, self._schema, self._normalize_schemas)
360-
self._schema_id = registered_schema.schema_id
373+
self._schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid)
361374

362375
self._known_subjects.add(subject)
363376

@@ -377,12 +390,9 @@ def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> O
377390
parsed_schema = self._parsed_schema
378391

379392
with _ContextStringIO() as fo:
380-
# Write the magic byte and schema ID in network byte order (big endian)
381-
fo.write(pack('>bI', _MAGIC_BYTE, self._schema_id))
382393
# write the record to the rest of the buffer
383394
schemaless_writer(fo, parsed_schema, value)
384-
385-
return fo.getvalue()
395+
return self._schema_id_serializer(fo.getvalue(), ctx, self._schema_id)
386396

387397
def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
388398
parsed_schema = self._parsed_schemas.get_parsed_schema(schema)
@@ -425,6 +435,12 @@ class AvroDeserializer(BaseDeserializer):
425435
| | | |
426436
| | | Defaults to topic_subject_name_strategy. |
427437
+-----------------------------+----------+--------------------------------------------------+
438+
| | | Callable(bytes, SerializationContext, schema_id) |
439+
| | | -> io.BytesIO |
440+
| | | |
441+
| ``schema.id.deserializer`` | callable | Defines how the schema id/guid is deserialized. |
442+
| | | Defaults to dual_schema_id_deserializer. |
443+
+-----------------------------+----------+--------------------------------------------------+
428444
429445
Note:
430446
By default, Avro complex types are returned as dicts. This behavior can
@@ -462,7 +478,8 @@ class AvroDeserializer(BaseDeserializer):
462478

463479
_default_conf = {'use.latest.version': False,
464480
'use.latest.with.metadata': None,
465-
'subject.name.strategy': topic_subject_name_strategy}
481+
'subject.name.strategy': topic_subject_name_strategy,
482+
'schema.id.deserializer': dual_schema_id_deserializer}
466483

467484
def __init__(
468485
self,
@@ -507,6 +524,10 @@ def __init__(
507524
if not callable(self._subject_name_func):
508525
raise ValueError("subject.name.strategy must be callable")
509526

527+
self._schema_id_deserializer = conf_copy.pop('schema.id.deserializer')
528+
if not callable(self._schema_id_deserializer):
529+
raise ValueError("schema.id.deserializer must be callable")
530+
510531
if len(conf_copy) > 0:
511532
raise ValueError("Unrecognized properties: {}"
512533
.format(", ".join(conf_copy.keys())))
@@ -551,67 +572,57 @@ def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> U
551572
if data is None:
552573
return None
553574

554-
if len(data) <= 5:
555-
raise SerializationError("Expecting data framing of length 6 bytes or "
556-
"more but total data size is {} bytes. This "
557-
"message was not produced with a Confluent "
558-
"Schema Registry serializer".format(len(data)))
559-
560575
subject = self._subject_name_func(ctx, None) if ctx else None
561576
latest_schema = None
562577
if subject is not None:
563578
latest_schema = self._get_reader_schema(subject)
564579

565-
with _ContextStringIO(data) as payload:
566-
magic, schema_id = unpack('>bI', payload.read(5))
567-
if magic != _MAGIC_BYTE:
568-
raise SerializationError("Unexpected magic byte {}. This message "
569-
"was not produced with a Confluent "
570-
"Schema Registry serializer".format(magic))
571-
572-
writer_schema_raw = self._registry.get_schema(schema_id)
573-
writer_schema = self._get_parsed_schema(writer_schema_raw)
574-
575-
if subject is None:
576-
subject = self._subject_name_func(ctx, writer_schema.get("name")) if ctx else None
577-
if subject is not None:
578-
latest_schema = self._get_reader_schema(subject)
579-
580-
if latest_schema is not None:
581-
migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None)
582-
reader_schema_raw = latest_schema.schema
583-
reader_schema = self._get_parsed_schema(latest_schema.schema)
584-
elif self._schema is not None:
585-
migrations = None
586-
reader_schema_raw = self._schema
587-
reader_schema = self._reader_schema
588-
else:
589-
migrations = None
590-
reader_schema_raw = writer_schema_raw
591-
reader_schema = writer_schema
592-
593-
if migrations:
594-
obj_dict = schemaless_reader(payload,
595-
writer_schema,
596-
None,
597-
self._return_record_name)
598-
obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict)
599-
else:
600-
obj_dict = schemaless_reader(payload,
601-
writer_schema,
602-
reader_schema,
603-
self._return_record_name)
580+
schema_id = SchemaId(AVRO_TYPE)
581+
payload = self._schema_id_deserializer(data, ctx, schema_id)
582+
583+
writer_schema_raw = self._get_writer_schema(schema_id, subject)
584+
writer_schema = self._get_parsed_schema(writer_schema_raw)
585+
586+
if subject is None:
587+
subject = self._subject_name_func(ctx, writer_schema.get("name")) if ctx else None
588+
if subject is not None:
589+
latest_schema = self._get_reader_schema(subject)
590+
591+
if latest_schema is not None:
592+
migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None)
593+
reader_schema_raw = latest_schema.schema
594+
reader_schema = self._get_parsed_schema(latest_schema.schema)
595+
elif self._schema is not None:
596+
migrations = None
597+
reader_schema_raw = self._schema
598+
reader_schema = self._reader_schema
599+
else:
600+
migrations = None
601+
reader_schema_raw = writer_schema_raw
602+
reader_schema = writer_schema
603+
604+
if migrations:
605+
obj_dict = schemaless_reader(payload,
606+
writer_schema,
607+
None,
608+
self._return_record_name)
609+
obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict)
610+
else:
611+
obj_dict = schemaless_reader(payload,
612+
writer_schema,
613+
reader_schema,
614+
self._return_record_name)
604615

605-
field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731
606-
transform(rule_ctx, reader_schema, message, field_transform))
607-
obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None,
608-
reader_schema_raw, obj_dict, get_inline_tags(reader_schema),
609-
field_transformer)
616+
field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731
617+
transform(rule_ctx, reader_schema, message, field_transform))
618+
obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None,
619+
reader_schema_raw, obj_dict, get_inline_tags(reader_schema),
620+
field_transformer)
610621

611-
if self._from_dict is not None:
612-
return self._from_dict(obj_dict, ctx)
622+
if self._from_dict is not None:
623+
return self._from_dict(obj_dict, ctx)
613624

614-
return obj_dict
625+
return obj_dict
615626

616627
def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
617628
parsed_schema = self._parsed_schemas.get_parsed_schema(schema)

0 commit comments

Comments
 (0)