From de127b86a92d2a0f5d4b3cf7a991fdaa2cfb6e48 Mon Sep 17 00:00:00 2001 From: katmayb Date: Tue, 6 May 2025 13:42:52 -0400 Subject: [PATCH 1/2] Add docs for the changefeed kafka header option --- src/current/v25.2/changefeed-messages.md | 83 ++++++++++++++++++++++++ src/current/v25.2/create-changefeed.md | 1 + 2 files changed, 84 insertions(+) diff --git a/src/current/v25.2/changefeed-messages.md b/src/current/v25.2/changefeed-messages.md index fcef98d0c3e..37509fe3174 100644 --- a/src/current/v25.2/changefeed-messages.md +++ b/src/current/v25.2/changefeed-messages.md @@ -531,6 +531,89 @@ CREATE CHANGEFEED INTO 'scheme://sink-URI' WITH updated AS SELECT column, column For details on syntax and examples, refer to the [Change Data Capture Queries]({% link {{ page.version.version }}/cdc-queries.md %}) page. +### Specify a column as a Kafka header + +{% include_cached new-in.html version="v25.2" %} Use the `headers_json_column_name` option to specify a [`JSONB`]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed will emit as a Kafka header for each row. You can send metadata, such as routing or tracing information, at the protocol level in the header separate from the message payload. This allows for Kafka brokers or routers to filter the metadata the header contains without deserializing the payload. + +{{site.data.alerts.callout_info}} +The `headers_json_column_name` option is supported with changefeeds emitting [JSON](#json) or [Avro](#avro) messages to [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}). +{{site.data.alerts.end}} + +For example, define a table that updates customer records. This schema includes a `route_info` column of type `JSONB`, which will be used to store operation type, schema version, and message origin metadata for the Kafka header: + +{% include_cached copy-clipboard.html %} +~~~ sql +CREATE TABLE customer_updates ( + update_id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + customer_id UUID NOT NULL, + operation_type STRING NOT NULL, + change_version STRING NOT NULL, + source_system STRING NOT NULL DEFAULT 'crdb_app', + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + change_description STRING, + route_info JSONB +); +~~~ + +Insert example rows into the table, populating the `route_info` column with the `JSONB` data. 
The changefeed will emit this column as Kafka headers alongside the row changes: + +{% include_cached copy-clipboard.html %} +~~~ sql +INSERT INTO customer_updates ( + customer_id, operation_type, change_version, source_system, change_description, route_info +) VALUES +(gen_random_uuid(), 'insert', 'v1', 'crm_web', 'New customer created', '{"operation": "insert", "version": "v1", "origin": "crm_web"}'), +(gen_random_uuid(), 'update', 'v2', 'crm_mobile', 'Updated phone number', '{"operation": "update", "version": "v2", "origin": "crm_mobile"}'), +(gen_random_uuid(), 'delete', 'v1', 'crm_web', 'Customer deleted due to inactivity', '{"operation": "delete", "version": "v1", "origin": "crm_web"}'), +(gen_random_uuid(), 'update', 'v2', 'support_portal', 'Changed mailing address', '{"operation": "update", "version": "v2", "origin": "support_portal"}'), +(gen_random_uuid(), 'insert', 'v1', 'api_batch', 'Bulk customer import', '{"operation": "insert", "version": "v1", "origin": "api_batch"}'); +~~~ + +Create a changefeed that emits messages from the `customer_updates` table to Kafka and define the `route_info` column in the `headers_json_column_name` option: + +{% include_cached copy-clipboard.html %} +~~~ sql +CREATE CHANGEFEED FOR TABLE customer_updates INTO 'kafka://localhost:9092' WITH headers_json_column_name = 'route_info'; +~~~ + +The changefeed will emit each row’s `route_info` data as Kafka message headers, which Kafka brokers or stream processors can use to access the metadata without inspecting the payload. + +The Kafka topic receives the message payload containing the row-level change: + +~~~json +{"after": {"change_description": "Updated phone number", "change_version": "v2", "customer_id": "5896dc90-a972-43e8-b69b-8b5a52691ce2", "operation_type": "update", "source_system": "crm_mobile", "update_id": "39a7bb4c-ee3b-4897-88fd-cfed94558e72", "updated_at": "2025-05-06T14:57:42.378814Z"}} +{"after": {"change_description": "Bulk customer import", "change_version": "v1", "customer_id": "0fccf7a5-5f61-4d65-ad6b-55d8740a0874", "operation_type": "insert", "source_system": "api_batch", "update_id": "e058a098-89ba-48b0-a117-93b6b00c7eba", "updated_at": "2025-05-06T14:57:42.378814Z"}} +{"after": {"change_description": "Changed mailing address", "change_version": "v2", "customer_id": "ccca656e-96b3-43b7-83ad-ee044627d67d", "operation_type": "update", "source_system": "support_portal", "update_id": "d0f730ce-d269-4bd5-9d2d-07c6fe65fc51", "updated_at": "2025-05-06T14:57:42.378814Z"}} +. . . 
+~~~ + +You can query the `route_info` column to view the metadata that will be present in the Kafka headers for each corresponding message: + +{% include_cached copy-clipboard.html %} +~~~ sql +SELECT + update_id, + route_info->>'operation' AS operation, + route_info->>'version' AS version, + route_info->>'origin' AS origin +FROM customer_updates; +~~~ +~~~ + update_id | kafka_header_operation | kafka_header_version | kafka_header_origin +---------------------------------------+------------------------+----------------------+---------------------- + 3366969e-4f9f-472d-9e07-ac65e12b1e19 | delete | v1 | crm_web + 39a7bb4c-ee3b-4897-88fd-cfed94558e72 | update | v2 | crm_mobile + b9cd25ab-e44c-4bbc-a806-7cf701ad131e | insert | v1 | crm_web + d0f730ce-d269-4bd5-9d2d-07c6fe65fc51 | update | v2 | support_portal + e058a098-89ba-48b0-a117-93b6b00c7eba | insert | v1 | api_batch +(5 rows) +~~~ + +You may need to duplicate fields between the message envelope and headers to support efficient routing and filtering by intermediate systems, such as Kafka brokers, stream processors, or observability tools, but still maintain the full context of the change in the message for downstream applications. + +If you would like to filter the table columns that a changefeed emits, refer to the [CDC Queries]({% link {{ page.version.version }}/cdc-queries.md %}) page. Or, if you would like to customize the message envelope, refer to the [Changefeed Message Envelope](#message-envelopes) page. +{% comment %}update message envelope link to the new page once PR #19542 is merged{% endcomment %} + ## Message formats {% include {{ page.version.version }}/cdc/message-format-list.md %} diff --git a/src/current/v25.2/create-changefeed.md b/src/current/v25.2/create-changefeed.md index be5fb5dbb50..b1165a692fd 100644 --- a/src/current/v25.2/create-changefeed.md +++ b/src/current/v25.2/create-changefeed.md @@ -122,6 +122,7 @@ Option | Value | Description `format` | `json` / `avro` / `csv` / `parquet` | Format of the emitted message.

`avro`: For mappings of CockroachDB types to Avro types, [refer to the table]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-types) and detail on [Avro limitations]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-limitations). **Note:** [`confluent_schema_registry`](#confluent-schema-registry) is required with `format=avro`.

`csv`: You cannot combine `format=csv` with the [`diff`](#diff) or [`resolved`](#resolved) options. Changefeeds use the same CSV format as the [`EXPORT`](export.html) statement. Refer to [Export data with changefeeds]({% link {{ page.version.version }}/export-data-with-changefeeds.md %}) for details on using these options to create a changefeed as an alternative to `EXPORT`. **Note:** [`initial_scan = 'only'`](#initial-scan) is required with `format=csv`.

`parquet`: Cloud storage is the only supported sink. The [`topic_in_value`](#topic-in-value) option is not compatible with `parquet` format.

Default: `format=json`. `full_table_name` | N/A | Use fully qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases.

**Note:** This option cannot modify existing table names used as topics, subjects, etc., as part of an [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) statement. To modify a topic, subject, etc., to use a fully qualified table name, create a new changefeed with this option.

Example: `CREATE CHANGEFEED FOR foo... WITH full_table_name` will create the topic name `defaultdb.public.foo` instead of `foo`. `gc_protect_expires_after` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Automatically expires protected timestamp records that are older than the defined duration. In the case where a changefeed job remains paused, `gc_protect_expires_after` will trigger the underlying protected timestamp record to expire and cancel the changefeed job to prevent accumulation of protected data.

Refer to [Protect Changefeed Data from Garbage Collection]({% link {{ page.version.version }}/protect-changefeed-data.md %}) for more detail on protecting changefeed data. +New in v25.2: `headers_json_column_name` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specify a [`JSONB`]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed will emit as a Kafka header for each row. Supported for changefeeds emitting JSON or Avro messages to Kafka sinks. For more details, refer to [Specify a column as a Kafka header]({% link {{ page.version.version }}/changefeed-messages.md %}#specify-a-column-as-a-kafka-header). `ignore_disable_changefeed_replication` | [`BOOL`]({% link {{ page.version.version }}/bool.md %}) | When set to `true`, the changefeed **will emit** events even if CDC filtering for TTL jobs is configured using the `disable_changefeed_replication` [session variable]({% link {{ page.version.version }}/set-vars.md %}), `sql.ttl.changefeed_replication.disabled` [cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}), or the `ttl_disable_changefeed_replication` [table storage parameter]({% link {{ page.version.version }}/row-level-ttl.md %}).

Refer to [Filter changefeeds for tables using TTL](#filter-changefeeds-for-tables-using-row-level-ttl) for usage details. `initial_scan` | `yes`/`no`/`only` | Control whether or not an initial scan will occur at the start time of a changefeed. Only one `initial_scan` option (`yes`, `no`, or `only`) can be used. If none of these are set, an initial scan will occur if there is no [`cursor`](#cursor), and will not occur if there is one. This preserves the behavior from previous releases. With `initial_scan = 'only'` set, the changefeed job will end with a successful status (`succeeded`) after the initial scan completes. You cannot specify `yes`, `no`, `only` simultaneously.

If used in conjunction with `cursor`, an initial scan will be performed at the cursor timestamp. If no `cursor` is specified, the initial scan is performed at `now()`.

Although the [`initial_scan` / `no_initial_scan`]({% link {{ page.version.version }}/create-changefeed.md %}#initial-scan) syntax from previous versions is still supported, you cannot combine the previous and current syntax.

Default: `initial_scan = 'yes'` `kafka_sink_config` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Set fields to configure the required level of message acknowledgement from the Kafka server, the version of the server, and batching parameters for Kafka sinks. Set the message file compression type. See [Kafka sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka-sink-configuration) for more detail on configuring all the available fields for this option.

Example: `CREATE CHANGEFEED FOR table INTO 'kafka://localhost:9092' WITH kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "RequiredAcks": "ONE"}'` From 5dce98c5053764cc1d848251ebbf8b377b2e7077 Mon Sep 17 00:00:00 2001 From: katmayb Date: Fri, 9 May 2025 17:50:56 -0400 Subject: [PATCH 2/2] Feedback --- src/current/v25.2/changefeed-messages.md | 86 ++++++++++-------------- src/current/v25.2/create-changefeed.md | 2 +- 2 files changed, 37 insertions(+), 51 deletions(-) diff --git a/src/current/v25.2/changefeed-messages.md b/src/current/v25.2/changefeed-messages.md index 37509fe3174..7d96e356510 100644 --- a/src/current/v25.2/changefeed-messages.md +++ b/src/current/v25.2/changefeed-messages.md @@ -533,85 +533,71 @@ For details on syntax and examples, refer to the [Change Data Capture Queries]({ ### Specify a column as a Kafka header -{% include_cached new-in.html version="v25.2" %} Use the `headers_json_column_name` option to specify a [`JSONB`]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed will emit as a Kafka header for each row. You can send metadata, such as routing or tracing information, at the protocol level in the header separate from the message payload. This allows for Kafka brokers or routers to filter the metadata the header contains without deserializing the payload. +{% include_cached new-in.html version="v25.2" %} Use the `headers_json_column_name` option to specify a [JSONB]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed emits as Kafka headers for each row’s change event. You can send metadata, such as routing or tracing information, at the protocol level in the header, separate from the message payload. This allows for Kafka brokers or routers to filter the metadata the header contains without deserializing the payload. + +Headers enable efficient routing, filtering, and distributed tracing by intermediate systems, such as Kafka brokers, stream processors, or observability tools. {{site.data.alerts.callout_info}} -The `headers_json_column_name` option is supported with changefeeds emitting [JSON](#json) or [Avro](#avro) messages to [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}). +The `headers_json_column_name` option is supported with changefeeds emitting to [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}). {{site.data.alerts.end}} -For example, define a table that updates customer records. This schema includes a `route_info` column of type `JSONB`, which will be used to store operation type, schema version, and message origin metadata for the Kafka header: +For example, define a table that updates compliance events. This schema includes a `kafka_meta` column of type `JSONB`, used to store a trace ID and other metadata for the Kafka header: {% include_cached copy-clipboard.html %} ~~~ sql -CREATE TABLE customer_updates ( - update_id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - customer_id UUID NOT NULL, - operation_type STRING NOT NULL, - change_version STRING NOT NULL, - source_system STRING NOT NULL DEFAULT 'crdb_app', - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - change_description STRING, - route_info JSONB +CREATE TABLE compliance_events ( + event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID NOT NULL, + event_type STRING NOT NULL, + event_timestamp TIMESTAMPTZ NOT NULL DEFAULT now(), + details STRING, + kafka_meta JSONB ); ~~~ -Insert example rows into the table, populating the `route_info` column with the `JSONB` data. 
The changefeed will emit this column as Kafka headers alongside the row changes: +Insert example rows into the table, populating the `kafka_meta` column with the `JSONB` data. The changefeed will emit this column as Kafka headers alongside the row changes: {% include_cached copy-clipboard.html %} ~~~ sql -INSERT INTO customer_updates ( - customer_id, operation_type, change_version, source_system, change_description, route_info +INSERT INTO compliance_events ( + user_id, event_type, details, kafka_meta ) VALUES -(gen_random_uuid(), 'insert', 'v1', 'crm_web', 'New customer created', '{"operation": "insert", "version": "v1", "origin": "crm_web"}'), -(gen_random_uuid(), 'update', 'v2', 'crm_mobile', 'Updated phone number', '{"operation": "update", "version": "v2", "origin": "crm_mobile"}'), -(gen_random_uuid(), 'delete', 'v1', 'crm_web', 'Customer deleted due to inactivity', '{"operation": "delete", "version": "v1", "origin": "crm_web"}'), -(gen_random_uuid(), 'update', 'v2', 'support_portal', 'Changed mailing address', '{"operation": "update", "version": "v2", "origin": "support_portal"}'), -(gen_random_uuid(), 'insert', 'v1', 'api_batch', 'Bulk customer import', '{"operation": "insert", "version": "v1", "origin": "api_batch"}'); +(gen_random_uuid(), 'policy_ack', 'User accepted data policy v2.1', '{"trace_id": "abc123", "compliance_level": "low"}'), +(gen_random_uuid(), 'access_review', 'Admin approved elevated access for app A', '{"trace_id": "def456", "compliance_level": "high"}'), +(gen_random_uuid(), 'policy_ack', 'User accepted retention policy update', '{"trace_id": "ghi789", "compliance_level": "medium"}'), +(gen_random_uuid(), 'access_review', 'User confirmed access to sensitive dataset', '{"trace_id": "xyz123", "compliance_level": "high"}'), +(gen_random_uuid(), 'policy_ack', 'Policy v3.0 acknowledged by contractor', '{"trace_id": "mno456", "compliance_level": "low"}'); ~~~ -Create a changefeed that emits messages from the `customer_updates` table to Kafka and define the `route_info` column in the `headers_json_column_name` option: +Create a changefeed that emits messages from the `compliance_events` table to Kafka and specify the `kafka_meta` column using the `headers_json_column_name` option: {% include_cached copy-clipboard.html %} ~~~ sql -CREATE CHANGEFEED FOR TABLE customer_updates INTO 'kafka://localhost:9092' WITH headers_json_column_name = 'route_info'; +CREATE CHANGEFEED FOR TABLE compliance_events INTO 'kafka://localhost:9092' WITH headers_json_column_name = 'kafka_meta'; ~~~ -The changefeed will emit each row’s `route_info` data as Kafka message headers, which Kafka brokers or stream processors can use to access the metadata without inspecting the payload. +The changefeed will emit each row’s `kafka_meta` data as Kafka headers, which Kafka brokers or stream processors can use to access the metadata without inspecting the payload. 
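+
+For example, a downstream consumer can read and filter on these headers without unmarshaling the message payload. The following is a minimal sketch (not an official CockroachDB example); it assumes the third-party [segmentio/kafka-go](https://github.com/segmentio/kafka-go) Go client, the default topic name (`compliance_events`, matching the watched table), and the broker address used in the preceding statement:
+
+{% include_cached copy-clipboard.html %}
+~~~ go
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+
+	"github.com/segmentio/kafka-go"
+)
+
+func main() {
+	// Assumption: the changefeed uses the default topic name, which matches
+	// the watched table (compliance_events), on the broker used above.
+	r := kafka.NewReader(kafka.ReaderConfig{
+		Brokers: []string{"localhost:9092"},
+		Topic:   "compliance_events",
+		GroupID: "compliance-header-reader",
+	})
+	defer r.Close()
+
+	for {
+		m, err := r.ReadMessage(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+		// Each key in the row's kafka_meta column arrives as a Kafka header.
+		// Read or route on the headers without unmarshaling m.Value (the payload).
+		for _, h := range m.Headers {
+			fmt.Printf("message key=%s header %s=%s\n", m.Key, h.Key, h.Value)
+		}
+	}
+}
+~~~
+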
-The Kafka topic receives the message payload containing the row-level change: +The Kafka topic receives the message payload with the row-level change, excluding the specified header column (`kafka_meta`): ~~~json -{"after": {"change_description": "Updated phone number", "change_version": "v2", "customer_id": "5896dc90-a972-43e8-b69b-8b5a52691ce2", "operation_type": "update", "source_system": "crm_mobile", "update_id": "39a7bb4c-ee3b-4897-88fd-cfed94558e72", "updated_at": "2025-05-06T14:57:42.378814Z"}} -{"after": {"change_description": "Bulk customer import", "change_version": "v1", "customer_id": "0fccf7a5-5f61-4d65-ad6b-55d8740a0874", "operation_type": "insert", "source_system": "api_batch", "update_id": "e058a098-89ba-48b0-a117-93b6b00c7eba", "updated_at": "2025-05-06T14:57:42.378814Z"}} -{"after": {"change_description": "Changed mailing address", "change_version": "v2", "customer_id": "ccca656e-96b3-43b7-83ad-ee044627d67d", "operation_type": "update", "source_system": "support_portal", "update_id": "d0f730ce-d269-4bd5-9d2d-07c6fe65fc51", "updated_at": "2025-05-06T14:57:42.378814Z"}} +{"after": {"details": "User accepted data policy v2.1", "event_id": "ee321dc6-388b-4416-a389-adfafab50ee4", "event_timestamp": "2025-05-09T21:20:29.203923Z", "event_type": "policy_ack", "user_id": "06ba6114-529c-4a99-9811-1dd3d12dad07"}} +{"after": {"details": "User accepted retention policy update", "event_id": "59d391f8-c141-4dc9-9622-9079c3462201", "event_timestamp": "2025-05-09T21:20:29.203923Z", "event_type": "policy_ack", "user_id": "98213553-9c1a-43a6-a598-921c3c6c3b20"}} +{"after": {"details": "Admin approved elevated access for app A", "event_id": "41cf0dbe-c0bc-48aa-9b60-ef343bcef9e1", "event_timestamp": "2025-05-09T21:20:29.203923Z", "event_type": "access_review", "user_id": "ed192798-f7ef-4fe8-a496-f22bb5738b04"}} . . . ~~~ -You can query the `route_info` column to view the metadata that will be present in the Kafka headers for each corresponding message: - -{% include_cached copy-clipboard.html %} -~~~ sql -SELECT - update_id, - route_info->>'operation' AS operation, - route_info->>'version' AS version, - route_info->>'origin' AS origin -FROM customer_updates; -~~~ -~~~ - update_id | kafka_header_operation | kafka_header_version | kafka_header_origin ----------------------------------------+------------------------+----------------------+---------------------- - 3366969e-4f9f-472d-9e07-ac65e12b1e19 | delete | v1 | crm_web - 39a7bb4c-ee3b-4897-88fd-cfed94558e72 | update | v2 | crm_mobile - b9cd25ab-e44c-4bbc-a806-7cf701ad131e | insert | v1 | crm_web - d0f730ce-d269-4bd5-9d2d-07c6fe65fc51 | update | v2 | support_portal - e058a098-89ba-48b0-a117-93b6b00c7eba | insert | v1 | api_batch -(5 rows) -~~~ +The Kafka headers will contain: -You may need to duplicate fields between the message envelope and headers to support efficient routing and filtering by intermediate systems, such as Kafka brokers, stream processors, or observability tools, but still maintain the full context of the change in the message for downstream applications. 
+Key (`event_id`) | Value (Kafka payload) | Headers +------------------+-----------------------+-------- +`3e2a9b4a-f1e3-4202-b343-1a52e1ffb0d4` | `{"event_type": "policy_ack", "details": "User accepted data policy v2.1"}` | `trace_id=abc123, compliance_level=low` +`7c90a289-2f91-4666-a8d5-962dc894e1c2` | `{"event_type": "access_review", "details": "Admin approved elevated access for app A"}` | `trace_id=def456, compliance_level=high` +`1a6e0d3f-7191-4d99-9a36-7f4b85e5cd23` | `{"event_type": "policy_ack", "details": "User accepted retention policy update"} `| `trace_id=ghi789, compliance_level=medium` +`89af6b6e-f34d-4a1d-a69d-91d29526e9f7` | `{"event_type": "access_review", "details": "User confirmed access to sensitive dataset"}` | `trace_id=xyz123, compliance_level=high` +`587cf30d-3f17-4942-8a01-f110ef8a5ae3` | `{"event_type": "policy_ack", "details": "Policy v3.0 acknowledged by contractor"}` | `trace_id=mno456, compliance_level=low` -If you would like to filter the table columns that a changefeed emits, refer to the [CDC Queries]({% link {{ page.version.version }}/cdc-queries.md %}) page. Or, if you would like to customize the message envelope, refer to the [Changefeed Message Envelope](#message-envelopes) page. +If you would like to filter the table columns that a changefeed emits, refer to the [CDC Queries]({% link {{ page.version.version }}/cdc-queries.md %}) page. To customize the message envelope, refer to the [Changefeed Message Envelope](#message-envelopes) page. {% comment %}update message envelope link to the new page once PR #19542 is merged{% endcomment %} ## Message formats diff --git a/src/current/v25.2/create-changefeed.md b/src/current/v25.2/create-changefeed.md index b1165a692fd..91bd7678e68 100644 --- a/src/current/v25.2/create-changefeed.md +++ b/src/current/v25.2/create-changefeed.md @@ -122,7 +122,7 @@ Option | Value | Description `format` | `json` / `avro` / `csv` / `parquet` | Format of the emitted message.

`avro`: For mappings of CockroachDB types to Avro types, [refer to the table]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-types) and detail on [Avro limitations]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-limitations). **Note:** [`confluent_schema_registry`](#confluent-schema-registry) is required with `format=avro`.

`csv`: You cannot combine `format=csv` with the [`diff`](#diff) or [`resolved`](#resolved) options. Changefeeds use the same CSV format as the [`EXPORT`](export.html) statement. Refer to [Export data with changefeeds]({% link {{ page.version.version }}/export-data-with-changefeeds.md %}) for details on using these options to create a changefeed as an alternative to `EXPORT`. **Note:** [`initial_scan = 'only'`](#initial-scan) is required with `format=csv`.

`parquet`: Cloud storage is the only supported sink. The [`topic_in_value`](#topic-in-value) option is not compatible with `parquet` format.

Default: `format=json`. `full_table_name` | N/A | Use fully qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases.

**Note:** This option cannot modify existing table names used as topics, subjects, etc., as part of an [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) statement. To modify a topic, subject, etc., to use a fully qualified table name, create a new changefeed with this option.

Example: `CREATE CHANGEFEED FOR foo... WITH full_table_name` will create the topic name `defaultdb.public.foo` instead of `foo`. `gc_protect_expires_after` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Automatically expires protected timestamp records that are older than the defined duration. In the case where a changefeed job remains paused, `gc_protect_expires_after` will trigger the underlying protected timestamp record to expire and cancel the changefeed job to prevent accumulation of protected data.

Refer to [Protect Changefeed Data from Garbage Collection]({% link {{ page.version.version }}/protect-changefeed-data.md %}) for more detail on protecting changefeed data. -New in v25.2: `headers_json_column_name` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specify a [`JSONB`]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed will emit as a Kafka header for each row. Supported for changefeeds emitting JSON or Avro messages to Kafka sinks. For more details, refer to [Specify a column as a Kafka header]({% link {{ page.version.version }}/changefeed-messages.md %}#specify-a-column-as-a-kafka-header). +New in v25.2: `headers_json_column_name` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specify a [JSONB]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed emits as Kafka headers, separate from the message payload, for each row’s change event. `headers_json_column_name` is supported for Kafka sinks. For more details, refer to [Specify a column as a Kafka header]({% link {{ page.version.version }}/changefeed-messages.md %}#specify-a-column-as-a-kafka-header). `ignore_disable_changefeed_replication` | [`BOOL`]({% link {{ page.version.version }}/bool.md %}) | When set to `true`, the changefeed **will emit** events even if CDC filtering for TTL jobs is configured using the `disable_changefeed_replication` [session variable]({% link {{ page.version.version }}/set-vars.md %}), `sql.ttl.changefeed_replication.disabled` [cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}), or the `ttl_disable_changefeed_replication` [table storage parameter]({% link {{ page.version.version }}/row-level-ttl.md %}).

Refer to [Filter changefeeds for tables using TTL](#filter-changefeeds-for-tables-using-row-level-ttl) for usage details. `initial_scan` | `yes`/`no`/`only` | Control whether or not an initial scan will occur at the start time of a changefeed. Only one `initial_scan` option (`yes`, `no`, or `only`) can be used. If none of these are set, an initial scan will occur if there is no [`cursor`](#cursor), and will not occur if there is one. This preserves the behavior from previous releases. With `initial_scan = 'only'` set, the changefeed job will end with a successful status (`succeeded`) after the initial scan completes. You cannot specify `yes`, `no`, `only` simultaneously.

If used in conjunction with `cursor`, an initial scan will be performed at the cursor timestamp. If no `cursor` is specified, the initial scan is performed at `now()`.

Although the [`initial_scan` / `no_initial_scan`]({% link {{ page.version.version }}/create-changefeed.md %}#initial-scan) syntax from previous versions is still supported, you cannot combine the previous and current syntax.

Default: `initial_scan = 'yes'` `kafka_sink_config` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Set fields to configure the required level of message acknowledgement from the Kafka server, the version of the server, and batching parameters for Kafka sinks. Set the message file compression type. See [Kafka sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka-sink-configuration) for more detail on configuring all the available fields for this option.

Example: `CREATE CHANGEFEED FOR table INTO 'kafka://localhost:9092' WITH kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "RequiredAcks": "ONE"}'`