Skip to content

Commit d3fe09b

Browse files
committed
Add docs for the changefeed kafka header option
1 parent 93476e3 commit d3fe09b

File tree

2 files changed

+84
-0
lines changed

2 files changed

+84
-0
lines changed

src/current/v25.2/changefeed-messages.md

+83
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,89 @@ CREATE CHANGEFEED INTO 'scheme://sink-URI' WITH updated AS SELECT column, column
531531

532532
For details on syntax and examples, refer to the [Change Data Capture Queries]({% link {{ page.version.version }}/cdc-queries.md %}) page.
533533

534+
### Specify a column as a Kafka header
535+
536+
{% include_cached new-in.html version="v25.2" %} Use the `headers_json_column_name` option to specify a [`JSONB`]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed will emit as a Kafka header for each row. You can send metadata, such as routing or tracing information, at the protocol level in the header separate from the message payload. This allows for Kafka brokers or routers to filter the metadata the header contains without deserializing the payload.
537+
538+
{{site.data.alerts.callout_info}}
539+
The `headers_json_column_name` option is supported with changefeeds emitting [JSON](#json) or [Avro](#avro) messages to [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}).
540+
{{site.data.alerts.end}}
541+
542+
For example, define a table that updates customer records. This schema includes a `route_info` column of type `JSONB`, which will be used to store operation type, schema version, and message origin metadata for the Kafka header:
543+
544+
{% include_cached copy-clipboard.html %}
545+
~~~ sql
546+
CREATE TABLE customer_updates (
547+
update_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
548+
customer_id UUID NOT NULL,
549+
operation_type STRING NOT NULL,
550+
change_version STRING NOT NULL,
551+
source_system STRING NOT NULL DEFAULT 'crdb_app',
552+
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
553+
change_description STRING,
554+
route_info JSONB
555+
);
556+
~~~
557+
558+
Insert example rows into the table, populating the `route_info` column with the `JSONB` data. The changefeed will emit this column as Kafka headers alongside the row changes:
559+
560+
{% include_cached copy-clipboard.html %}
561+
~~~ sql
562+
INSERT INTO customer_updates (
563+
customer_id, operation_type, change_version, source_system, change_description, route_info
564+
) VALUES
565+
(gen_random_uuid(), 'insert', 'v1', 'crm_web', 'New customer created', '{"operation": "insert", "version": "v1", "origin": "crm_web"}'),
566+
(gen_random_uuid(), 'update', 'v2', 'crm_mobile', 'Updated phone number', '{"operation": "update", "version": "v2", "origin": "crm_mobile"}'),
567+
(gen_random_uuid(), 'delete', 'v1', 'crm_web', 'Customer deleted due to inactivity', '{"operation": "delete", "version": "v1", "origin": "crm_web"}'),
568+
(gen_random_uuid(), 'update', 'v2', 'support_portal', 'Changed mailing address', '{"operation": "update", "version": "v2", "origin": "support_portal"}'),
569+
(gen_random_uuid(), 'insert', 'v1', 'api_batch', 'Bulk customer import', '{"operation": "insert", "version": "v1", "origin": "api_batch"}');
570+
~~~
571+
572+
Create a changefeed that emits messages from the `customer_updates` table to Kafka and define the `route_info` column in the `headers_json_column_name` option:
573+
574+
{% include_cached copy-clipboard.html %}
575+
~~~ sql
576+
CREATE CHANGEFEED FOR TABLE customer_updates INTO 'kafka://localhost:9092' WITH headers_json_column_name = 'route_info';
577+
~~~
578+
579+
The changefeed will emit each row’s `route_info` data as Kafka message headers, which Kafka brokers or stream processors can use to access the metadata without inspecting the payload.
580+
581+
The Kafka topic receives the message payload containing the row-level change:
582+
583+
~~~json
584+
{"after": {"change_description": "Updated phone number", "change_version": "v2", "customer_id": "5896dc90-a972-43e8-b69b-8b5a52691ce2", "operation_type": "update", "source_system": "crm_mobile", "update_id": "39a7bb4c-ee3b-4897-88fd-cfed94558e72", "updated_at": "2025-05-06T14:57:42.378814Z"}}
585+
{"after": {"change_description": "Bulk customer import", "change_version": "v1", "customer_id": "0fccf7a5-5f61-4d65-ad6b-55d8740a0874", "operation_type": "insert", "source_system": "api_batch", "update_id": "e058a098-89ba-48b0-a117-93b6b00c7eba", "updated_at": "2025-05-06T14:57:42.378814Z"}}
586+
{"after": {"change_description": "Changed mailing address", "change_version": "v2", "customer_id": "ccca656e-96b3-43b7-83ad-ee044627d67d", "operation_type": "update", "source_system": "support_portal", "update_id": "d0f730ce-d269-4bd5-9d2d-07c6fe65fc51", "updated_at": "2025-05-06T14:57:42.378814Z"}}
587+
. . .
588+
~~~
589+
590+
You can query the `route_info` column to view the metadata that will be present in the Kafka headers for each corresponding message:
591+
592+
{% include_cached copy-clipboard.html %}
593+
~~~ sql
594+
SELECT
595+
update_id,
596+
route_info->>'operation' AS operation,
597+
route_info->>'version' AS version,
598+
route_info->>'origin' AS origin
599+
FROM customer_updates;
600+
~~~
601+
~~~
602+
update_id | kafka_header_operation | kafka_header_version | kafka_header_origin
603+
---------------------------------------+------------------------+----------------------+----------------------
604+
3366969e-4f9f-472d-9e07-ac65e12b1e19 | delete | v1 | crm_web
605+
39a7bb4c-ee3b-4897-88fd-cfed94558e72 | update | v2 | crm_mobile
606+
b9cd25ab-e44c-4bbc-a806-7cf701ad131e | insert | v1 | crm_web
607+
d0f730ce-d269-4bd5-9d2d-07c6fe65fc51 | update | v2 | support_portal
608+
e058a098-89ba-48b0-a117-93b6b00c7eba | insert | v1 | api_batch
609+
(5 rows)
610+
~~~
611+
612+
You may need to duplicate fields between the message envelope and headers to support efficient routing and filtering by intermediate systems, such as Kafka brokers, stream processors, or observability tools, but still maintain the full context of the change in the message for downstream applications.
613+
614+
If you would like to filter the table columns that a changefeed emits, refer to the [CDC Queries]({% link {{ page.version.version }}/cdc-queries.md %}) page. Or, if you would like to customize the message envelope, refer to the [Changefeed Message Envelope](#message-envelopes) page.
615+
{% comment %}update message envelope link to the new page once PR #19542 is merged{% endcomment %}
616+
534617
## Message formats
535618

536619
{% include {{ page.version.version }}/cdc/message-format-list.md %}

src/current/v25.2/create-changefeed.md

+1
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ Option | Value | Description
122122
<a name="format"></a>`format` | `json` / `avro` / `csv` / `parquet` | Format of the emitted message. <br><br>`avro`: For mappings of CockroachDB types to Avro types, [refer-to-the-table]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-types) and detail on [Avro-limitations]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-limitations). **Note:** [`confluent_schema_registry`](#confluent-schema-registry) is required with `format=avro`. <br><br>`csv`: You cannot combine `format=csv` with the [`diff`](#diff) or [`resolved`](#resolved) options. Changefeeds use the same CSV format as the [`EXPORT`](export.html) statement. Refer to [Export-data-with-changefeeds]({% link {{ page.version.version }}/export-data-with-changefeeds.md %}) for details using these options to create a changefeed as an alternative to `EXPORT`. **Note:** [`initial_scan = 'only'`](#initial-scan) is required with `format=csv`. <br><br>`parquet`: Cloud storage is the only supported sink. The [`topic_in_value`](#topic-in-value) option is not compatible with `parquet` format.<br><br>Default: `format=json`.
123123
<a name="full-table-name"></a>`full_table_name` | N/A | Use fully qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases.<br><br>**Note:** This option cannot modify existing table names used as topics, subjects, etc., as part of an [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) statement. To modify a topic, subject, etc., to use a fully qualified table name, create a new changefeed with this option. <br><br>Example: `CREATE CHANGEFEED FOR foo... WITH full_table_name` will create the topic name `defaultdb.public.foo` instead of `foo`.
124124
<a name="gc-protect-expires-after"></a>`gc_protect_expires_after` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Automatically expires protected timestamp records that are older than the defined duration. In the case where a changefeed job remains paused, `gc_protect_expires_after` will trigger the underlying protected timestamp record to expire and cancel the changefeed job to prevent accumulation of protected data.<br><br>Refer to [Protect-Changefeed-Data-from-Garbage-Collection]({% link {{ page.version.version }}/protect-changefeed-data.md %}) for more detail on protecting changefeed data.
125+
<a name="headers-json-column-name"></a><span class="version-tag">New in v25.2:</span>`headers_json_column_name` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specify a [`JSONB`]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed will emit as a Kafka header for each row. Supported for JSON and Avro message format emitting to Kafka sinks. For more details, refer to [Specify a column as a Kafka header]({% link {{ page.version.version }}/changefeed-messages.md %}#specify-a-column-as-a-kafka-header).
125126
<a name="ignore-disable-changefeed-replication"></a>`ignore_disable_changefeed_replication` | [`BOOL`]({% link {{ page.version.version }}/bool.md %}) | When set to `true`, the changefeed **will emit** events even if CDC filtering for TTL jobs is configured using the `disable_changefeed_replication` [session variable]({% link {{ page.version.version }}/set-vars.md %}), `sql.ttl.changefeed_replication.disabled` [cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}), or the `ttl_disable_changefeed_replication` [table storage parameter]({% link {{ page.version.version }}/row-level-ttl.md %}).<br><br>Refer to [Filter changefeeds for tables using TTL](#filter-changefeeds-for-tables-using-row-level-ttl) for usage details.
126127
<a name="initial-scan"></a>`initial_scan` | `yes`/`no`/`only` | Control whether or not an initial scan will occur at the start time of a changefeed. Only one `initial_scan` option (`yes`, `no`, or `only`) can be used. If none of these are set, an initial scan will occur if there is no [`cursor`](#cursor), and will not occur if there is one. This preserves the behavior from previous releases. With `initial_scan = 'only'` set, the changefeed job will end with a successful status (`succeeded`) after the initial scan completes. You cannot specify `yes`, `no`, `only` simultaneously. <br><br>If used in conjunction with `cursor`, an initial scan will be performed at the cursor timestamp. If no `cursor` is specified, the initial scan is performed at `now()`. <br><br>Although the [`initial_scan` / `no_initial_scan`]({% link {{ page.version.version }}/create-changefeed.md %}#initial-scan) syntax from previous versions is still supported, you cannot combine the previous and current syntax.<br><br>Default: `initial_scan = 'yes'`
127128
<a name="kafka-sink-config"></a>`kafka_sink_config` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Set fields to configure the required level of message acknowledgement from the Kafka server, the version of the server, and batching parameters for Kafka sinks. Set the message file compression type. See [Kafka sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka-sink-configuration) for more detail on configuring all the available fields for this option. <br><br>Example: `CREATE CHANGEFEED FOR table INTO 'kafka://localhost:9092' WITH kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "RequiredAcks": "ONE"}'`

0 commit comments

Comments
 (0)