diff --git a/src/current/_includes/v25.2/sidebar-data/stream-data.json b/src/current/_includes/v25.2/sidebar-data/stream-data.json index 50a1dcce5bb..020861b68be 100644 --- a/src/current/_includes/v25.2/sidebar-data/stream-data.json +++ b/src/current/_includes/v25.2/sidebar-data/stream-data.json @@ -1,156 +1,168 @@ { - "title": "Stream Data", - "is_top_level": true, - "items": [ - { - "title": "Change Data Capture Overview", - "urls": [ - "/${VERSION}/change-data-capture-overview.html" - ] - }, - { - "title": "Get Started with Changefeeds", - "items": [ - { - "title": "Create and Configure Changefeeds", - "urls": [ - "/${VERSION}/create-and-configure-changefeeds.html" - ] - }, - { - "title": "Changefeed Best Practices", - "urls": [ - "/${VERSION}/changefeed-best-practices.html" - ] - }, - { - "title": "Changefeed Messages", - "urls": [ - "/${VERSION}/changefeed-messages.html" - ] - }, - { - "title": "Changefeed Sinks", - "urls": [ - "/${VERSION}/changefeed-sinks.html" - ] - }, - { - "title": "Changefeed Examples", - "urls": [ - "/${VERSION}/changefeed-examples.html" - ] - } - ] - }, - { - "title": "Monitor Changefeeds", - "items": [ - { - "title": "Overview", + "title": "Stream Data", + "is_top_level": true, + "items": [ + { + "title": "Change Data Capture Overview", + "urls": [ + "/${VERSION}/change-data-capture-overview.html" + ] + }, + { + "title": "Get Started with Changefeeds", + "items": [ + { + "title": "Create and Configure Changefeeds", "urls": [ - "/${VERSION}/monitor-and-debug-changefeeds.html" + "/${VERSION}/create-and-configure-changefeeds.html" ] - }, - { - "title": "Monitoring Guide", + }, + { + "title": "Changefeed Best Practices", "urls": [ - "/${VERSION}/changefeed-monitoring-guide.html" + "/${VERSION}/changefeed-best-practices.html" ] - }, - { - "title": "Protect Changefeed Data", - "urls": [ - "/${VERSION}/protect-changefeed-data.html" + }, + { + "title": "Changefeed Messages", + "items": [ + { + "title": "Overview", + "urls": [ + "/${VERSION}/changefeed-messages.html" + ] + }, + { + "title": "Message Envelope", + "urls":[ + "/${VERSION}/changefeed-message-envelopes.html" + ] + } ] - } - ] - }, - { - "title": "Optimize Changefeeds", + }, + { + "title": "Changefeed Sinks", + "urls": [ + "/${VERSION}/changefeed-sinks.html" + ] + }, + { + "title": "Changefeed Examples", + "urls": [ + "/${VERSION}/changefeed-examples.html" + ] + } + ] + }, + { + "title": "Monitor Changefeeds", + "items": [ + { + "title": "Overview", + "urls": [ + "/${VERSION}/monitor-and-debug-changefeeds.html" + ] + }, + { + "title": "Monitoring Guide", + "urls": [ + "/${VERSION}/changefeed-monitoring-guide.html" + ] + }, + { + "title": "Protect Changefeed Data", + "urls": [ + "/${VERSION}/protect-changefeed-data.html" + ] + } + ] + }, + { + "title": "Optimize Changefeeds", + "items": [ + { + "title": "Change Data Capture Queries", + "urls": [ + "/${VERSION}/cdc-queries.html" + ] + }, + { + "title": "Changefeeds on Tables with Column Families", + "urls": [ + "/${VERSION}/changefeeds-on-tables-with-column-families.html" + ] + }, + { + "title": "Export Data with Changefeeds", + "urls": [ + "/${VERSION}/export-data-with-changefeeds.html" + ] + }, + { + "title": "Changefeeds in Multi-Region Deployments", + "urls": [ + "/${VERSION}/changefeeds-in-multi-region-deployments.html" + ] + } + ] + }, + { + "title": "Changefeed Tutorials", "items": [ { - "title": "Change Data Capture Queries", - "urls": [ - "/${VERSION}/cdc-queries.html" + "title": "Stream a Changefeed to an Amazon MSK Cluster", + "items": [ + { + 
"title": "Amazon MSK", + "urls": [ + "/${VERSION}/stream-a-changefeed-to-amazon-msk.html" + ] + }, + { + "title": "Amazon MSK Serverless", + "urls": [ + "/${VERSION}/stream-a-changefeed-to-amazon-msk-serverless.html" + ] + } ] }, { - "title": "Changefeeds on Tables with Column Families", + "title": "Connect to a Changefeed Kafka Sink with OAuth Using Okta", "urls": [ - "/${VERSION}/changefeeds-on-tables-with-column-families.html" + "/${VERSION}/connect-to-a-changefeed-kafka-sink-with-oauth-using-okta.html" ] }, { - "title": "Export Data with Changefeeds", + "title": "Stream a Changefeed from CockroachDB Cloud to Snowflake", "urls": [ - "/${VERSION}/export-data-with-changefeeds.html" + "/cockroachcloud/stream-changefeed-to-snowflake-aws.html" ] }, { - "title": "Changefeeds in Multi-Region Deployments", + "title": "Stream a Changefeed to a Confluent Cloud Kafka Cluster", "urls": [ - "/${VERSION}/changefeeds-in-multi-region-deployments.html" + "/${VERSION}/stream-a-changefeed-to-a-confluent-cloud-kafka-cluster.html" ] } ] }, { - "title": "Changefeed Tutorials", - "items": [ - { - "title": "Stream a Changefeed to an Amazon MSK Cluster", - "items": [ - { - "title": "Amazon MSK", - "urls": [ - "/${VERSION}/stream-a-changefeed-to-amazon-msk.html" - ] - }, + "title": "Technical Overview", + "items": [ { - "title": "Amazon MSK Serverless", + "title": "How Does an Enterprise Changefeed Work?", "urls": [ - "/${VERSION}/stream-a-changefeed-to-amazon-msk-serverless.html" + "/${VERSION}/how-does-an-enterprise-changefeed-work.html" ] } ] }, { - "title": "Connect to a Changefeed Kafka Sink with OAuth Using Okta", - "urls": [ - "/${VERSION}/connect-to-a-changefeed-kafka-sink-with-oauth-using-okta.html" - ] - }, - { - "title": "Stream a Changefeed from CockroachDB Cloud to Snowflake", - "urls": [ - "/cockroachcloud/stream-changefeed-to-snowflake-aws.html" - ] - }, - { - "title": "Stream a Changefeed to a Confluent Cloud Kafka Cluster", + "title": "Advanced Changefeed Configuration", "urls": [ - "/${VERSION}/stream-a-changefeed-to-a-confluent-cloud-kafka-cluster.html" + "/${VERSION}/advanced-changefeed-configuration.html" ] } ] - }, - { - "title": "Technical Overview", - "items": [ - { - "title": "How Does an Enterprise Changefeed Work?", - "urls": [ - "/${VERSION}/how-does-an-enterprise-changefeed-work.html" - ] - } - ] - }, - { - "title": "Advanced Changefeed Configuration", - "urls": [ - "/${VERSION}/advanced-changefeed-configuration.html" - ] - } - ] -} + } + \ No newline at end of file diff --git a/src/current/v25.1/changefeed-messages.md b/src/current/v25.1/changefeed-messages.md index fcef98d0c3e..3345fa3269e 100644 --- a/src/current/v25.1/changefeed-messages.md +++ b/src/current/v25.1/changefeed-messages.md @@ -120,7 +120,7 @@ CREATE CHANGEFEED FOR TABLE rides INTO 'external://kafka' WITH diff, envelope=wr ### `bare` -`bare` removes the `after` key from the changefeed message and stores any metadata in a `crdb` field. When used with [`avro`](#avro) format, `record` will replace the `after` key. +`bare` removes the `after` key from the changefeed message and stores any metadata in a `crdb` field. When used with [`avro`]({% link {{ page.version.version }}/changefeed-messages.md %}#avro) format, `record` will replace the `after` key. - Cloud storage sink: @@ -181,7 +181,7 @@ CREATE CHANGEFEED FOR TABLE rides INTO 'external://kafka' WITH diff, envelope=wr ### `row` -`row` emits the row without any additional metadata fields in the message. 
This envelope option is only supported for [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) or sinkless changefeeds. `row` does not support [`avro`](#avro) format—if you are using `avro`, refer to the [`bare`](#bare) envelope option. +`row` emits the row without any additional metadata fields in the message. This envelope option is only supported for [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) or sinkless changefeeds. `row` does not support [`avro`]({% link {{ page.version.version }}/changefeed-messages.md %}#avro) format—if you are using `avro`, refer to the [`bare`](#bare) envelope option. - Kafka sink: diff --git a/src/current/v25.2/cdc-queries.md b/src/current/v25.2/cdc-queries.md index 116c00311b0..043f89efd79 100644 --- a/src/current/v25.2/cdc-queries.md +++ b/src/current/v25.2/cdc-queries.md @@ -62,7 +62,7 @@ The following table outlines functions that are useful with CDC queries: Function | Description --------------------------+---------------------- `changefeed_creation_timestamp()` | Returns the decimal MVCC timestamp when the changefeed was created. Use this function to build CDC queries that restrict emitted events by time. `changefeed_creation_timestamp()` can serve a similar purpose to the [`now()` time function]({% link {{ page.version.version }}/functions-and-operators.md %}#date-and-time-functions), which is not supported with CDC queries. -`event_op()` | Returns a string describing the type of event. If a changefeed is running with the [`diff`]({% link {{ page.version.version }}/create-changefeed.md %}#diff) option, then this function returns `'insert'`, `'update'`, or `'delete'`. If a changefeed is running without the `diff` option, it is not possible to determine an update from an insert, so `event_op()` returns [`'upsert'`](https://www.cockroachlabs.com/blog/sql-upsert/) or `'delete'`. +`event_op()` | Returns a string describing the type of event. If a changefeed is running with the [`diff`]({% link {{ page.version.version }}/create-changefeed.md %}#diff) option, then this function returns `'insert'`, `'update'`, or `'delete'`. If a changefeed is running without the `diff` option, it is not possible to determine an update from an insert, so `event_op()` returns [`'upsert'`](https://www.cockroachlabs.com/blog/sql-upsert/) or `'delete'`.

If you're using CDC queries to filter only for the type of change operation, we recommend specifying the [`envelope=enriched` option]({% link {{ page.version.version }}/changefeed-message-envelopes.md %}#route-events-based-on-operation-type) for this metadata instead. `event_schema_timestamp()` | Returns the timestamp of [schema change]({% link {{ page.version.version }}/online-schema-changes.md %}) events that cause a [changefeed message]({% link {{ page.version.version }}/changefeed-messages.md %}) to emit. When the schema change event does not result in a table backfill or scan, `event_schema_timestamp()` will return the event's timestamp. When the schema change event does result in a table backfill or scan, `event_schema_timestamp()` will return the timestamp at which the backfill/scan is read — the [high-water mark time]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}) of the changefeed. You can also use the following functions in CDC queries: @@ -102,7 +102,7 @@ You can **not** use the following functions with CDC queries: CDC queries allow you to customize your changefeed for particular scenarios. This section outlines several possible use cases for CDC queries. -{% include {{ page.version.version }}/cdc/bare-envelope-cdc-queries.md %} Refer to the [Changefeed Messages]({% link {{ page.version.version }}/changefeed-messages.md %}#bare) page for more detail. +{% include {{ page.version.version }}/cdc/bare-envelope-cdc-queries.md %} Refer to the [Changefeed Message Envelopes]({% link {{ page.version.version }}/changefeed-message-envelopes.md %}#bare) page for more detail. Depending on how you are filtering or adapting the message envelope with a CDC query and which sink you're emitting to, message output may vary from some of the example cases in this section. diff --git a/src/current/v25.2/changefeed-examples.md b/src/current/v25.2/changefeed-examples.md index d3ece7031bd..d083e452785 100644 --- a/src/current/v25.2/changefeed-examples.md +++ b/src/current/v25.2/changefeed-examples.md @@ -680,7 +680,7 @@ In this example, you'll set up a changefeed for a single-node cluster that is co 2021/08/24 14:00:22 {"payload":[{"after":{"city":"san francisco","creation_time":"2019-01-02T03:04:05","current_location":"3893 Dunn Fall Apt. 11","ext":{"color":"black"},"id":"21b2ec54-81ad-4af7-a76d-6087b9c7f0f8","dog_owner_id":"8924c3af-ea6e-4e7e-b2c8-2e318f973393","status":"lost","type":"scooter"},"key":["san francisco","21b2ec54-81ad-4af7-a76d-6087b9c7f0f8"],"topic":"vehicles","updated":"1629813621680097993.0000000000"}],"length":1} ~~~ - For more detail on emitted changefeed messages, see [responses]({% link {{ page.version.version }}/changefeed-messages.md %}#responses). + For more detail on emitted changefeed messages, refer to the [Changefeed Messages]({% link {{ page.version.version }}/changefeed-messages.md %}) page. ## Create a changefeed connected to an Apache Pulsar sink @@ -769,7 +769,7 @@ In this example, you'll set up a changefeed for a single-node cluster that is co key:[null], properties:[], content:{"Key":["rome", "3c7d6676-f713-4985-ba52-4c19fe6c3692"],"Value":{"after": {"city": "rome", "end_address": null, "end_time": null, "id": "3c7d6676-f713-4985-ba52-4c19fe6c3692", "revenue": 27.00, "rider_id": "c15a4926-fbb2-4931-a9a0-6dfabc6c506b", "start_address": "39415 Brandon Avenue Apt. 
29", "start_time": "2024-05-09T12:18:42.055498", "vehicle_city": "rome", "vehicle_id": "627dad1a-3531-4214-a173-16bcc6b93036"}},"Topic":"rides"} ~~~ - For more detail on emitted changefeed messages, refer to [Responses]({% link {{ page.version.version }}/changefeed-messages.md %}#responses). + For more detail on emitted changefeed messages, refer to the [Changefeed Messages]({% link {{ page.version.version }}/changefeed-messages.md %}) page. diff --git a/src/current/v25.2/changefeed-for.md b/src/current/v25.2/changefeed-for.md index 77cfb2eb807..4a0829c7ebb 100644 --- a/src/current/v25.2/changefeed-for.md +++ b/src/current/v25.2/changefeed-for.md @@ -66,7 +66,7 @@ Option | Value | Description `confluent_schema_registry` | Schema Registry address | The [Schema Registry](https://docs.confluent.io/current/schema-registry/docs/index.html#sr) address is required to use `avro`. `cursor` | [Timestamp]({% link {{ page.version.version }}/as-of-system-time.md %}#parameters) | Emits any changes after the given timestamp, but does not output the current state of the table first. If `cursor` is not specified, the changefeed starts by doing a consistent scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.

`cursor` can be used to start a new changefeed where a previous changefeed ended.

Example: `CURSOR=1536242855577149065.0000000000` `end_time` | [Timestamp]({% link {{ page.version.version }}/as-of-system-time.md %}#parameters) | Indicate the timestamp up to which the changefeed will emit all events and then complete with a `successful` status. Provide a future timestamp to `end_time` in number of nanoseconds since the [Unix epoch](https://wikipedia.org/wiki/Unix_time). For example, `end_time="1655402400000000000"`. -`envelope` | `wrapped` / `bare` / `key_only` / `row` | `wrapped` the default envelope structure for changefeed messages containing an array of the primary key, a top-level field for the type of message, and the current state of the row (or `null` for deleted rows).

`bare` removes the `after` key from the changefeed message. When used with `avro` format, `record` will replace the `after` key.

`key_only` emits only the key and no value, which is faster if you only need to know the key of the changed row.

`row` emits the row without any additional metadata fields in the message. `row` does not support [`avro` format](#format).

Refer to [Responses]({% link {{ page.version.version }}/changefeed-messages.md %}#responses) for more detail on message format.

Default: `envelope=wrapped`.
+`envelope` | `wrapped` / `enriched` / `bare` / `key_only` / `row` | `wrapped` is the default envelope structure for changefeed messages, containing an array of the primary key, a top-level field for the type of message, and the current state of the row (or `null` for deleted rows).

Refer to the [Changefeed Message Envelopes]({% link {{ page.version.version }}/changefeed-message-envelopes.md %}) page for more detail on each envelope.

Default: `envelope=wrapped`. `format` | `json` / `avro` / `csv` / `parquet` | Format of the emitted message.

`avro`: For mappings of CockroachDB types to Avro types, [refer to the table]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-types) and the detail on [Avro limitations](#avro-limitations). **Note:** [`confluent_schema_registry`](#confluent-registry) is required with `format=avro`.

`csv`: You cannot combine `format=csv` with the `diff` or [`resolved`](#resolved-option) options. Changefeeds use the same CSV format as the [`EXPORT`](export.html) statement. Refer to [Export data with changefeeds]({% link {{ page.version.version }}/export-data-with-changefeeds.md %}) for details on using these options to create a changefeed as an alternative to `EXPORT`. **Note:** [`initial_scan = 'only'`](#initial-scan) is required with `format=csv`.

`parquet`: Cloud storage is the only supported sink. The `topic_in_value` option is not compatible with `parquet` format.

Default: `format=json`. `initial_scan` / `no_initial_scan` / `initial_scan_only` | N/A | Control whether or not an initial scan will occur at the start time of a changefeed. `initial_scan_only` will perform an initial scan and then the changefeed job will complete with a `successful` status. You cannot use [`end_time`](#end-time) and `initial_scan_only` simultaneously.

If none of these options are specified, an initial scan will occur if there is no [`cursor`](#cursor-option), and will not occur if there is one. This preserves the behavior from previous releases.

You cannot specify `initial_scan` and `no_initial_scan` or `no_initial_scan` and `initial_scan_only` simultaneously.

Default: `initial_scan`
If used in conjunction with `cursor`, an initial scan will be performed at the cursor timestamp. If no `cursor` is specified, the initial scan is performed at `now()`. `min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flushing. This can help you control the flush frequency to achieve better throughput. If this is set to `0s`, a node will flush as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit duplicate messages during this time.

**Note:** [`resolved`](#resolved-option) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). Since `min_checkpoint_frequency` defaults to `30s`, you **must** configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency if you require `resolved` messages more frequently than `30s`.

**Default:** `30s` @@ -74,7 +74,7 @@ Option | Value | Description `resolved` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Emit [resolved timestamps]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for the changefeed. Resolved timestamps do not emit until all ranges in the changefeed have progressed to a specific point in time.

Set a minimum amount of time that the changefeed's high-water mark (overall resolved timestamp) must advance by before another resolved timestamp is emitted. Example: `resolved='10s'`. This option will **only** emit a resolved timestamp if the timestamp has advanced (and by at least the optional duration, if set). If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances.

**Note:** If you set `resolved` lower than `30s`, then you **must** also set the [`min_checkpoint_frequency`](#min-checkpoint-frequency) option to at minimum the same value as `resolved`, because `resolved` messages may be emitted less frequently than `min_checkpoint_frequency`, but cannot be emitted more frequently.

Refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for more detail.
`split_column_families` | N/A | Target a table with multiple column families. Emit messages for each column family in the target table. Each message will include the label: `table.family`.
`updated` | N/A | Include updated timestamps with each row.
-`virtual_columns` | `STRING` | Changefeeds omit [virtual computed columns]({% link {{ page.version.version }}/computed-columns.md %}) from emitted [messages]({% link {{ page.version.version }}/changefeed-messages.md %}#responses) by default. To maintain the behavior of previous CockroachDB versions where the changefeed would emit [`NULL`]({% link {{ page.version.version }}/null-handling.md %}) values for virtual computed columns, set `virtual_columns = "null"` when you start a changefeed.

You may also define `virtual_columns = "omitted"`, though this is already the default behavior for v22.1+. If you do not set `"omitted"` on a table with virtual computed columns when you create a changefeed, you will receive a warning that changefeeds will filter out virtual computed values.

**Default:** `"omitted"` +`virtual_columns` | `STRING` | Changefeeds omit [virtual computed columns]({% link {{ page.version.version }}/computed-columns.md %}) from emitted [messages]({% link {{ page.version.version }}/changefeed-messages.md %}) by default. To maintain the behavior of previous CockroachDB versions where the changefeed would emit [`NULL`]({% link {{ page.version.version }}/null-handling.md %}) values for virtual computed columns, set `virtual_columns = "null"` when you start a changefeed.

You may also define `virtual_columns = "omitted"`, though this is already the default behavior for v22.1+. If you do not set `"omitted"` on a table with virtual computed columns when you create a changefeed, you will receive a warning that changefeeds will filter out virtual computed values.

**Default:** `"omitted"`

#### Avro limitations

diff --git a/src/current/v25.2/changefeed-message-envelopes.md b/src/current/v25.2/changefeed-message-envelopes.md
new file mode 100644
index 00000000000..0042b0ff4f7
--- /dev/null
+++ b/src/current/v25.2/changefeed-message-envelopes.md
@@ -0,0 +1,968 @@
+---
+title: Changefeed Message Envelope
+summary: Learn how to configure the changefeed message envelope.
+toc: true
+---
+
+In CockroachDB, the changefeed _envelope_ is the structure of each [message]({% link {{ page.version.version }}/changefeed-messages.md %}) emitted to the [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}). Changefeeds package _events_, triggered by an update to a row in a [watched table]({% link {{ page.version.version }}/change-data-capture-overview.md %}#watched-table), into messages according to the configured envelope. By default, changefeed messages use the [`wrapped` envelope](#wrapped), which includes the primary key of the changed row and a top-level field indicating the value of the row after the change event.
+
+You can use changefeed [options](#option-reference) to customize message contents to integrate with your downstream requirements. For example, you can include the previous state of the row, the origin cluster's metadata, or the schema of the event payload in the message. Envelope configuration also allows you to balance richer change event metadata against changefeed throughput.
+
+The possible envelope fields support use cases such as:
+
+- [Auditing](#audit-changes-in-data) changes in data.
+- Enabling [full-fidelity changefeed messages](#enable-full-fidelity-message-envelopes).
+- Routing events based on [operation type](#route-events-based-on-operation-type).
+- Automatically [generating or synchronizing schemas](#add-envelope-schema-fields) in downstream consumers.
+
+This page covers:
+
+- [Overview](#overview) for a brief summary of the CockroachDB changefeed envelope fields.
+- [Use case](#use-cases) examples.
+- Reference lists:
+  - The [options](#option-reference) to configure the envelope.
+  - The supported envelope [fields](#field-reference).
+
+{{site.data.alerts.callout_info}}
+You can also specify the _format_ of changefeed messages, such as Avro. For more details, refer to [Message formats]({% link {{ page.version.version }}/changefeed-messages.md %}#message-formats).
+{{site.data.alerts.end}}
+
+## Overview
+
+The envelope of a changefeed message depends on the following:
+
+- The [configured envelope and changefeed options](#option-reference)
+- The [sink]({% link {{ page.version.version }}/changefeed-sinks.md %})
+- The [message format]({% link {{ page.version.version }}/changefeed-messages.md %}#message-formats)
+
+The envelope can contain any of the fields outlined in this section. For more details, refer to the complete reference lists that explain each [envelope field](#field-reference) and [configurable option](#option-reference) to build a changefeed's envelope structure.
+
+The default [`wrapped` envelope](#wrapped) will be similar in structure to:
+
+~~~json
+{
+  "after": {
+    "col_a": "new_value"
+    // ... other columns ...
+  }
+}
+~~~
+
+You can optionally include the state of the row [before and after](#audit-changes-in-data) the change event and [operation](#route-events-based-on-operation-type), [origin](#preserve-the-origin-of-data), and [timestamp metadata](#updated) about the change. It is important to consider that increasing the size of each message can have an impact on changefeed throughput.
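+
+For example, a changefeed created with the following options would include the `"before"` state, the operation type, and source metadata in each message (a sketch that assumes the `products` table defined in [Use cases](#use-cases) and a Kafka sink; each option is described in the [Option reference](#option-reference)):
+
+{% include_cached copy-clipboard.html %}
+~~~sql
+CREATE CHANGEFEED FOR TABLE products INTO 'external://kafka' WITH diff, envelope=enriched, enriched_properties=source;
+~~~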
+
+When you include metadata options, this part of the message will be similar in structure to:
+
+~~~json
+// ...
+  "after": {
+    "col_a": "new_value",
+    // ... other columns ...
+  },
+  "before": {
+    "col_a": "old_value",
+    // ... other columns ...
+  },
+  "op": "c",
+  "source": {
+    // ... Metadata source fields
+  },
+  "ts_ns": 1745527444910044000
+// ...
+~~~
+
+You can optionally include the schema definition of the payload, which records each field's data type. The schema also indicates whether a field is always present in the envelope (`"optional": false`) or can be missing (`"optional": true`), depending on the configuration. The structure of the schema definition in the message will be similar to:
+
+~~~json
+// ...
+  "schema": {
+    "type": "struct",
+    "name": "cockroachdb.envelope", // Schema identifier
+    "optional": false,
+    "fields": [
+      {
+        "field": "before",
+        "type": "struct",
+        "optional": true,
+        "fields": [
+          // Schema for row before change (same as the table schema)
+        ]
+      },
+      {
+        "field": "after",
+        "type": "struct",
+        "optional": true,
+        "fields": [
+          // Schema for row after change (same as the table schema)
+        ]
+      },
+      {
+        "field": "op",
+        "type": "string",
+        "optional": false // Operation type
+      },
+// ... Schema definition of additional fields source, ts_ns ...
+~~~
+
+## Use cases
+
+The use case examples in the following sections emit to a Kafka sink. Review the [Options](#option-reference) table for each option's sink support.
+
+Each example uses the following table schema:
+
+{% include_cached copy-clipboard.html %}
+~~~sql
+CREATE TABLE public.products (
+    id UUID NOT NULL DEFAULT gen_random_uuid(),
+    name STRING NOT NULL,
+    description STRING NULL,
+    price DECIMAL(10,2) NOT NULL,
+    in_stock BOOL NULL DEFAULT true,
+    category STRING NULL,
+    created_at TIMESTAMP NULL DEFAULT current_timestamp():::TIMESTAMP,
+    CONSTRAINT products_pkey PRIMARY KEY (id ASC)
+);
+~~~
+
+{{site.data.alerts.callout_info}}
+The values that the `envelope` option accepts are compatible with different [changefeed sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}), and the structure of the message will vary depending on the sink.
+{{site.data.alerts.end}}
+
+### Audit changes in data
+
+You can include both the previous and updated states of a row in the message envelope to support use cases like auditing or applying change-based logic in downstream systems.
Use the [`diff`](#diff-option) option with [`CREATE CHANGEFEED`]({% link {{ page.version.version }}/create-changefeed.md %}) to include the previous state of the row: + +{% include_cached copy-clipboard.html %} +~~~sql +CREATE CHANGEFEED FOR TABLE products INTO 'kafka://localhost:9092' WITH diff; +~~~ + +The `diff` option adds the [`"before"`](#before) field to the envelope containing the state of the row before the change: + +~~~json +{ + "after": { + "category": "Home & Kitchen", + "created_at": "2025-04-01T17:55:46.812942", + "description": "Adjustable LED desk lamp with touch controls", + "id": "32856ed8-34d3-45a3-a449-412bdeaa277c", + "in_stock": true, + "name": "LED Desk Lamp", + "price": 26.30 + }, + "before": { + "category": "Home & Kitchen", + "created_at": "2025-04-01T17:55:46.812942", + "description": "Adjustable LED desk lamp with touch controls", + "id": "32856ed8-34d3-45a3-a449-412bdeaa277c", + "in_stock": true, + "name": "LED Desk Lamp", + "price": 22.30 + } +} +~~~ + +For an insert into the table, the `"before"` field contains `null`: + +~~~json +{ + "after": { + "category": "Electronics", + "created_at": "2025-04-23T13:48:40.981735", + "description": "Over-ear headphones with active noise cancellation", + "id": "3d8f4ca4-36e9-43b2-b057-d691624a4cba", + "in_stock": true, + "name": "Noise Cancelling Headphones", + "price": 129.50 + }, + "before": null +} +~~~ + +### Route events based on operation type + +{{site.data.alerts.callout_info}} +{% include feature-phases/preview.md %} +{{site.data.alerts.end}} + +{% include_cached new-in.html version="v25.2" %} You may want to route change events in a table based on the operation type (insert, update, delete), which can be useful for correctly applying or handling changes in a downstream system or replication pipeline. Use the [`envelope=enriched`](#enriched) option with [`CREATE CHANGEFEED`]({% link {{ page.version.version }}/create-changefeed.md %}) to include the [`"op"`](#op) field in the envelope: + +{% include_cached copy-clipboard.html %} +~~~ sql +CREATE CHANGEFEED FOR TABLE products INTO 'external://kafka:9092' WITH envelope=enriched; +~~~ +~~~json +{ + "after": { + "category": "Home & Kitchen", + "created_at": "2025-04-01T17:55:46.812942", + "description": "Adjustable LED desk lamp with touch controls", + "id": "32856ed8-34d3-45a3-a449-412bdeaa277c", + "in_stock": true, + "name": "LED Desk Lamp", + "price": 22.30 + }, + "op": "c", + "ts_ns": 1743792394409866000 +} +~~~ + +The `"op"` field can contain: + +- `"c"` for [inserts]({% link {{ page.version.version }}/insert.md %}). +- `"u"` for [updates]({% link {{ page.version.version }}/update.md %}). +- `"d"` for [deletes]({% link {{ page.version.version }}/delete.md %}). + +The [`"ts_ns"`](#ts_ns) timestamp, included in the envelope when you specify only the [`envelope=enriched`](#enriched) option, is the time the message was processed by the changefeed job. If you require timestamps to order messages based on the change event's commit time, then you must specify `envelope=enriched, enriched_properties=source, updated` when you create the changefeed, which will include `"ts_hlc"` and `"ts_ns"` in the [`"source"`](#source) field. 
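+
+For example, the following statement creates a changefeed whose messages carry the commit timestamp in the `"source"` field (a sketch assuming the same `products` table and Kafka sink; sample output is shown in the [`enriched`](#enriched) section):
+
+{% include_cached copy-clipboard.html %}
+~~~sql
+CREATE CHANGEFEED FOR TABLE products INTO 'external://kafka' WITH envelope=enriched, enriched_properties=source, updated;
+~~~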
+ +### Add envelope schema fields + +{{site.data.alerts.callout_info}} +{% include feature-phases/preview.md %} +{{site.data.alerts.end}} + +{% include_cached new-in.html version="v25.2" %} Adding the schema of the event payload to the message envelope allows you to: + +- Handle schema changes in downstream processing systems that require field types. +- Detect and adapt to changes in the table schema over time. +- Correctly parse and cast the data to deserialize into a different format. +- Automatically generate or synchronize schemas in downstream systems. +- Verify critical fields are present and set up alerts based on this. + +It is important to consider that adding the schema of the event payload can increase the size of each message significantly, which can have an impact on changefeed throughput. + +Use the [`envelope=enriched, enriched_properties=schema`](#enriched-properties-option) options with [`CREATE CHANGEFEED`]({% link {{ page.version.version }}/create-changefeed.md %}) to include the `"schema"` top-level field and the [schema fields and types](#schema): + +{% include_cached copy-clipboard.html %} +~~~ sql +CREATE CHANGEFEED FOR TABLE products INTO 'external://kafka:9092' WITH envelope=enriched, enriched_properties=schema; +~~~ +~~~json +{ + "payload": { + "after": { + "category": "Home & Kitchen", + "created_at": "2025-04-01T17:55:46.812942", + "description": "Ceramic mug with 350ml capacity", + "id": "8320b051-3ff7-4aa8-9708-78142fde7e31", + "in_stock": true, + "name": "Coffee Mug", + "price": 12.50 + }, + "op": "c", + "ts_ns": 1745353048801146000 + }, + "schema": { + "fields": [ + { + "field": "after", + "fields": [ + { + "field": "id", + "optional": false, + "type": "string" + }, + { + "field": "name", + "optional": false, + "type": "string" + }, + { + "field": "description", + "optional": true, + "type": "string" + }, + { + "field": "price", + "name": "decimal", + "optional": false, + "parameters": { + "precision": "10", + "scale": "2" + }, + "type": "float64" + }, + { + "field": "in_stock", + "optional": true, + "type": "boolean" + }, + { + "field": "category", + "optional": true, + "type": "string" + }, + { + "field": "created_at", + "name": "timestamp", + "optional": true, + "type": "string" + } + ], + "name": "products.after.value", + "optional": false, + "type": "struct" + }, + { + "field": "ts_ns", + "optional": false, + "type": "int64" + }, + { + "field": "op", + "optional": false, + "type": "string" + } + ], + "name": "cockroachdb.envelope", + "optional": false, + "type": "struct" + } +} +~~~ + +### Preserve the origin of data + +{{site.data.alerts.callout_info}} +{% include feature-phases/preview.md %} +{{site.data.alerts.end}} + +{% include_cached new-in.html version="v25.2" %} When you have multiple changefeeds running from your cluster, or your CockroachDB cluster is part of a multi-architecture system, it is useful to have metadata on the origin of changefeed data in the message envelope. 
+ +Use the [`envelope=enriched, enriched_properties=source`](#enriched-properties-option) options with [`CREATE CHANGEFEED`]({% link {{ page.version.version }}/create-changefeed.md %}) to include the [`"source"`](#source) top-level field that contains metadata for the origin cluster and the changefeed job: + +{% include_cached copy-clipboard.html %} +~~~sql +CREATE CHANGEFEED FOR TABLE products INTO 'kafka://localhost:9092' WITH envelope=enriched, enriched_properties=source; +~~~ +~~~json +{ + "after": { + "category": "Electronics", + "created_at": "2025-04-24T14:59:28.96273", + "description": "Portable speaker with Bluetooth 5.0", + "id": "58390d92-2472-43e1-86bc-1642395e8dad", + "in_stock": true, + "name": "Bluetooth Speaker", + "price": 45.00 + }, + "op": "c", + "source": { + "changefeed_sink": "kafka", + "cluster_id": "38269e9c-9823-4568-875e-000000000000", + "cluster_name": "", + "database_name": "test", + "db_version": "v25.2.0-beta.2", + "job_id": "1066457644516704257", + "node_id": "2", + "node_name": "localhost", + "origin": "cockroachdb", + "primary_keys": [ + "id" + ], + "schema_name": "public", + "source_node_locality": "", + "table_name": "products" + }, + "ts_ns": 1745527444910044000 +} +~~~ + +### Enable full-fidelity message envelopes + +{{site.data.alerts.callout_info}} +{% include feature-phases/preview.md %} +{{site.data.alerts.end}} + +{% include_cached new-in.html version="v25.2" %} A _full-fidelity_ changefeed message envelope ensures complete information about every change event in the [watched tables]({% link {{ page.version.version }}/change-data-capture-overview.md %}#watched-table)—including the before and after state of a row, timestamps, and rich metadata like source and schema information. This type of configured envelope allows downstream consumers to replay, audit, debug, or analyze changes with no loss of context. + +It is important to consider that a full-fidelity envelope increases the size of each message significantly, which can have an impact on changefeed throughput. 
+ +Use the [`mvcc_timestamp`](#mvcc-timestamp-option), [`envelope=enriched, enriched_properties='source,schema'`](#enriched-properties-option), [`diff`](#diff-option), and [`key_in_value`](#key-in-value-option) (for Kafka) options with [`CREATE CHANGEFEED`]({% link {{ page.version.version }}/create-changefeed.md %}) to create a full-fidelity envelope: + +{% include_cached copy-clipboard.html %} +~~~sql +CREATE CHANGEFEED FOR TABLE products INTO 'kafka://localhost:9092' WITH envelope=enriched, enriched_properties='source,schema', diff, mvcc_timestamp, updated, key_in_value; +~~~ +~~~json +{ + "payload": { + "after": { + "category": "Home & Kitchen", + "created_at": "2025-04-30T20:02:35.40316", + "description": "Ceramic mug with 350ml capacity", + "id": "98879aa0-72bc-4b12-a128-325a4753162e", + "in_stock": true, + "name": "Coffee Mug", + "price": 12.50 + }, + "before": { + "category": "Home & Kitchen", + "created_at": "2025-04-30T20:02:35.40316", + "description": "Ceramic mug with 350ml capacity", + "id": "98879aa0-72bc-4b12-a128-325a4753162e", + "in_stock": true, + "name": "Coffee Mug", + "price": 12.50 + }, + "key": { + "id": "98879aa0-72bc-4b12-a128-325a4753162e" + }, + "op": "u", + "source": { + "changefeed_sink": "kafka", + "cluster_id": "585e6512-ea54-490a-8f1d-50c8d182a2e6", + "cluster_name": "", + "database_name": "test", + "db_version": "v25.2.0-beta.2", + "job_id": "1072079179268751361", + "mvcc_timestamp": "1747243394200039000.0000000000", + "node_id": "1", + "node_name": "localhost", + "origin": "cockroachdb", + "primary_keys": ["id"], + "schema_name": "public", + "source_node_locality": "", + "table_name": "products", + "ts_hlc": "1747243394200039000.0000000000", + "ts_ns": 1747243394200039000 + }, + "ts_ns": 1747243394886231000 + }, + "schema": { + "fields": [ + { + "field": "before", + "fields": [ + { "field": "id", "optional": false, "type": "string" }, + { "field": "name", "optional": false, "type": "string" }, + { "field": "description", "optional": true, "type": "string" }, + { + "field": "price", + "name": "decimal", + "optional": false, + "parameters": { "precision": "10", "scale": "2" }, + "type": "float64" + }, + { "field": "in_stock", "optional": true, "type": "boolean" }, + { "field": "category", "optional": true, "type": "string" }, + { + "field": "created_at", + "name": "timestamp", + "optional": true, + "type": "string" + } + ], + "name": "products.before.value", + "optional": true, + "type": "struct" + }, + { + "field": "after", + "fields": [ + { "field": "id", "optional": false, "type": "string" }, + { "field": "name", "optional": false, "type": "string" }, + { "field": "description", "optional": true, "type": "string" }, + { + "field": "price", + "name": "decimal", + "optional": false, + "parameters": { "precision": "10", "scale": "2" }, + "type": "float64" + }, + { "field": "in_stock", "optional": true, "type": "boolean" }, + { "field": "category", "optional": true, "type": "string" }, + { + "field": "created_at", + "name": "timestamp", + "optional": true, + "type": "string" + } + ], + "name": "products.after.value", + "optional": false, + "type": "struct" + }, + { + "field": "source", + "fields": [ + { "field": "ts_ns", "optional": true, "type": "int64" }, + { "field": "database_name", "optional": false, "type": "string" }, + { "field": "cluster_name", "optional": false, "type": "string" }, + { "field": "cluster_id", "optional": false, "type": "string" }, + { "field": "node_name", "optional": false, "type": "string" }, + { "field": "mvcc_timestamp", 
"optional": true, "type": "string" }, + { "field": "origin", "optional": false, "type": "string" }, + { "field": "db_version", "optional": false, "type": "string" }, + { "field": "source_node_locality", "optional": false, "type": "string" }, + { "field": "table_name", "optional": false, "type": "string" }, + { "field": "changefeed_sink", "optional": false, "type": "string" }, + { "field": "job_id", "optional": false, "type": "string" }, + { "field": "node_id", "optional": false, "type": "string" }, + { "field": "ts_hlc", "optional": true, "type": "string" }, + { "field": "schema_name", "optional": false, "type": "string" }, + { + "field": "primary_keys", + "items": { "optional": false, "type": "string" }, + "optional": false, + "type": "array" + } + ], + "name": "cockroachdb.source", + "optional": true, + "type": "struct" + }, + { + "field": "key", + "fields": [ + { "field": "id", "optional": false, "type": "string" } + ], + "name": "products.key", + "optional": false, + "type": "struct" + }, + { "field": "ts_ns", "optional": false, "type": "int64" }, + { "field": "op", "optional": false, "type": "string" } + ], + "name": "cockroachdb.envelope", + "optional": false, + "type": "struct" + } +} +~~~ + +## Option reference + +For a full list of options that modify the message envelope, refer to the following table: + +Option | Description | Sink support +-------+-------------+-------------+------------- +`diff` | Include a [`"before"`](#before) field in each message, showing the state of the row before the change. Supported with [`wrapped`](#wrapped-option) or [`enriched`](#enriched-option) envelopes. | All +New in v25.2: `enriched_properties` (**Preview**) | (Only applicable when [`envelope=enriched`](#enriched-option) is set) Specify the type of metadata included in the message payload. Values: `source`, `schema`. | Kafka, Pub/Sub, webhook, sinkless +`envelope=bare` | Emit an envelope without the `"after"` wrapper. The row's column data is at the top level of the message. Metadata that would typically be separate will be under a [`"__crdb__"`](#__crdb__) field. Provides a more compact structure to the envelope. `bare` is the default envelope when using [CDC queries]({% link {{ page.version.version }}/cdc-queries.md %}). When `bare` is used with the Avro format, `"record"` will replace the `"after"` keyword. | All +New in v25.2: `envelope=enriched` (**Preview**) | Extend the envelope with [additional metadata fields](#field-reference). With `enriched_properties`, includes a [`"source"`](#source) field and/or a [`"schema"`](#schema) field with extra context. Supported in JSON and Avro message formats. | Kafka, Pub/Sub, webhook, sinkless +`envelope=key_only` | [Send only the primary key](#key_only) of the changed row and no value payload, which is more efficient if only the key of the changed row is needed. Not compatible with the `updated` option. | Kafka, sinkless +`envelope=row` | Emit the row data without any additional metadata field in the envelope. Not supported in Avro format or with the [`diff`](#diff-option) option. | Kafka, sinkless +`envelope=wrapped` (default) | Produce changefeed messages in a wrapped structure with metadata and row data. [`wrapped`](#wrapped) includes an [`"after"`](#after) field, and optionally a [`"before"`](#before) field if [`diff`](#diff-option) is used. **Note:** Envelopes contain a primary key when your changefeed is emitting to a sink that does not have a message key as part of its protocol. 
By default, messages emitted to Kafka sinks do not have the primary key field. Use the [`key_in_value`](#key-in-value-option) option to include a primary key array field in messages emitted to [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka). | All
+`full_table_name` | Use the [fully qualified table name]({% link {{ page.version.version }}/sql-name-resolution.md %}) (`database.schema.table`) in topics, subjects, schemas, and record output instead of the default table name. Including the full table name prevents unintended behavior when the same table name is present in multiple databases. | All
+`key_in_value` | Add a primary key array to the emitted message in Kafka sinks. This makes the primary key of a deleted row recoverable in sinks where each message has a value, but not a key. `key_in_value` is on by default in cloud storage and webhook sinks. To only emit the primary key of the changed row in Kafka sinks, use [`envelope=key_only`](#key_only). | Kafka
+`mvcc_timestamp` | Emit the [MVCC]({% link {{ page.version.version }}/architecture/storage-layer.md %}#mvcc) timestamp for each change event. The message envelope contains the MVCC timestamp of the changed row, even during the changefeed's initial scan. Provides a precise database commit timestamp, which is useful for debugging or strict ordering. | All
+`resolved` | Emit `resolved` timestamps in a format depending on the connected sink. **Note:** The `resolved` timestamp is emitted as a separate message, and has its own envelope containing a `resolved` key and a timestamp value as a string. For more details on the `resolved` options, refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages). | All
+`updated` | Add an [`"updated"`](#updated) timestamp field to each message, showing the commit time of the change. When the changefeed runs an initial scan or a [schema change backfill]({% link {{ page.version.version }}/changefeed-messages.md %}#schema-changes-with-column-backfill), the `"updated"` field will reflect the time of the scan or backfill, not the MVCC timestamp. If you use `updated` with `envelope=enriched`, you must also specify [`enriched_properties=source`](#enriched-properties-option), and then the `"updated"` field will be replaced with `"ts_ns"` and `"ts_hlc"` in the [`"source"`](#source) fields. **Note:** `envelope=enriched` with the `updated` option alone will not produce a change event commit timestamp in the message—to include the commit timestamp, specify `envelope=enriched, enriched_properties=source, updated`. | All
+
+### `envelope` option examples
+
+#### `wrapped`
+
+`wrapped` is the default envelope structure for changefeed messages. This envelope contains an array of the primary key (or the key as part of the message metadata), a top-level field for the type of message, and the current state of the row (or `NULL` for [deleted rows]({% link {{ page.version.version }}/changefeed-messages.md %}#delete-messages)).
+
+By default, messages emitted to Kafka sinks do not have the primary key array field. If you would like messages emitted to Kafka sinks to contain a primary key array field, you can use the [`key_in_value`]({% link {{ page.version.version }}/create-changefeed.md %}#key-in-value) option. Refer to the following message outputs for examples of this.
+ +Cloud storage sink: + +~~~sql +CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://cloud'; +~~~ +~~~ +{"after": {"city": "seattle", "creation_time": "2019-01-02T03:04:05", "current_location": "86359 Jeffrey Ranch", "ext": {"color": "yellow"}, "id": "68ee1f95-3137-48e2-8ce3-34ac2d18c7c8", "owner_id": "570a3d70-a3d7-4c00-8000-000000000011", "status": "in_use", "type": "scooter"}, "key": ["seattle", "68ee1f95-3137-48e2-8ce3-34ac2d18c7c8"]} +~~~ + +Kafka sink: + +Default when `envelope=wrapped` or `envelope` is not specified: + +~~~sql +CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://kafka'; +~~~ +~~~ +{"after": {"city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "24315 Elizabeth Mountains", "ext": {"color": "yellow"}, "id": "dadc1c0b-30f0-4c8b-bd16-046c8612bbea", "owner_id": "034075b6-5380-4996-a267-5a129781f4d3", "status": "in_use", "type": "scooter"}} +~~~ + +Kafka sink message with `key_in_value` provided: + +~~~sql +CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://kafka' WITH key_in_value, envelope=wrapped; +~~~ +~~~ +{"after": {"city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "46227 Jeremy Haven Suite 92", "ext": {"brand": "Schwinn", "color": "red"}, "id": "298cc7a0-de6b-4659-ae57-eaa2de9d99c3", "owner_id": "beda1202-63f7-41d2-aa35-ee3a835679d1", "status": "in_use", "type": "bike"}, "key": ["washington dc", "298cc7a0-de6b-4659-ae57-eaa2de9d99c3"]} +~~~ + +#### `enriched` + +{{site.data.alerts.callout_info}} +{% include feature-phases/preview.md %} +{{site.data.alerts.end}} + +{% include_cached new-in.html version="v25.2" %} [`enriched`](#enriched-option) introduces additional metadata to the envelope, which is further configurable with the [`enriched_properties`](#enriched-properties-option) option. This envelope option is supported for [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), [webhook]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink), [Google Cloud Pub/Sub]({% link {{ page.version.version }}/changefeed-sinks.md %}#google-cloud-pub-sub), and sinkless changefeeds. 
+ +To add the operation type and the processing timestamp of the change event to the envelope, use `envelope=enriched`: + +{% include_cached copy-clipboard.html %} +~~~sql +CREATE CHANGEFEED FOR TABLE products INTO 'external://kafka' WITH envelope=enriched; +~~~ +~~~ +{"after": {"category": "Electronics", "created_at": "2025-04-24T14:59:28.96273", "description": "Ergonomic wireless mouse with USB receiver", "id": "cb1a3e43-dccf-422f-a27d-ea027c233682", "in_stock": true, "name": "Wireless Mouse", "price": 29.99}, "op": "c", "ts_ns": 1745525261013511000} +~~~ + +To order messages when using `envelope=enriched`, you must also use `enriched_properties='source'` with the `updated` option in order to include the [`"ts_hlc"` and `"ts_ns"`](#source) commit timestamps in the `"source"` field: + +{% include_cached copy-clipboard.html %} +~~~ sql +CREATE CHANGEFEED FOR TABLE products INTO 'kafka://localhost:9092' WITH envelope=enriched,enriched_properties=source, updated; +~~~ +~~~json +{ + "after": { + "category": "Home & Kitchen", + "created_at": "2025-04-30T20:02:35.40316", + "description": "Adjustable LED desk lamp with touch controls", + "id": "4b36388a-f7e6-4b95-9f78-3aee9e2060d6", + "in_stock": true, + "name": "LED Desk Lamp", + "price": 22.30 + }, + "op": "c", + "source": { + "changefeed_sink": "kafka", + "cluster_id": "585e6512-ea54-490a-8f1d-50c8d182a2e6", + "cluster_name": "", + "database_name": "test", + "db_version": "v25.2.0-beta.2", + "job_id": "1068153948173991937", + "node_id": "1", + "node_name": "localhost", + "origin": "cockroachdb", + "primary_keys": [ + "id" + ], + "schema_name": "public", + "source_node_locality": "", + "table_name": "products", + "ts_hlc": "1746045115619002000.0000000000", + "ts_ns": 1746045115619002000 + }, + "ts_ns": 1746045115679811000 +} +~~~ + +To add the origin of the change data and the schema of the payload, use the `envelope=enriched` and `enriched_properties='source,schema'`: + +{% include_cached copy-clipboard.html %} +~~~sql +CREATE CHANGEFEED FOR TABLE products INTO 'external://kafka' WITH envelope=enriched, enriched_properties='source,schema'; +~~~ +~~~json +{ + "payload": { + "after": { + "category": "Electronics", + "created_at": "2025-04-24T14:59:28.96273", + "description": "Ergonomic wireless mouse with USB receiver", + "id": "cb1a3e43-dccf-422f-a27d-ea027c233682", + "in_stock": true, + "name": "Wireless Mouse", + "price": 29.99 + }, + "op": "c", + "source": { + "changefeed_sink": "kafka", + "cluster_id": "38269e9c-9823-4568-875e-d867e12156f2", + "cluster_name": "", + "database_name": "test", + "db_version": "v25.2.0-beta.2", + "job_id": "1066452286961614849", + "node_id": "2", + "node_name": "localhost", + "origin": "cockroachdb", + "primary_keys": [ + "id" + ], + "schema_name": "public", + "source_node_locality": "", + "table_name": "products" + }, + "ts_ns": 1745525809913428000 + }, + "schema": { + "fields": [ + { + "field": "after", + "fields": [ + { "field": "id", "optional": false, "type": "string" }, + { "field": "name", "optional": false, "type": "string" }, + { "field": "description", "optional": true, "type": "string" }, + { + "field": "price", + "name": "decimal", + "optional": false, + "parameters": { + "precision": "10", + "scale": "2" + }, + "type": "float64" + }, + { "field": "in_stock", "optional": true, "type": "boolean" }, + { "field": "category", "optional": true, "type": "string" }, + { + "field": "created_at", + "name": "timestamp", + "optional": true, + "type": "string" + } + ], + "name": "products.after.value", + 
"optional": false, + "type": "struct" + }, + { + "field": "source", + "fields": [ + { "field": "mvcc_timestamp", "optional": true, "type": "string" }, + { "field": "ts_ns", "optional": true, "type": "int64" }, + { "field": "ts_hlc", "optional": true, "type": "string" }, + { "field": "table_name", "optional": false, "type": "string" }, + { "field": "origin", "optional": false, "type": "string" }, + { "field": "cluster_id", "optional": false, "type": "string" }, + { "field": "node_id", "optional": false, "type": "string" }, + { "field": "changefeed_sink", "optional": false, "type": "string" }, + { "field": "schema_name", "optional": false, "type": "string" }, + { "field": "node_name", "optional": false, "type": "string" }, + { "field": "database_name", "optional": false, "type": "string" }, + { "field": "source_node_locality", "optional": false, "type": "string" }, + { + "field": "primary_keys", + "items": { + "optional": false, + "type": "string" + }, + "optional": false, + "type": "array" + }, + { "field": "job_id", "optional": false, "type": "string" }, + { "field": "db_version", "optional": false, "type": "string" }, + { "field": "cluster_name", "optional": false, "type": "string" } + ], + "name": "cockroachdb.source", + "optional": true, + "type": "struct" + }, + { "field": "ts_ns", "optional": false, "type": "int64" }, + { "field": "op", "optional": false, "type": "string" } + ], + "name": "cockroachdb.envelope", + "optional": false, + "type": "struct" + } +} +~~~ + +#### `bare` + +`bare` removes the `after` key from the changefeed message and stores any metadata in a `crdb` field. When used with [`avro`]({% link {{ page.version.version }}/changefeed-messages.md %}#avro) format, `record` will replace the `after` key. + +Cloud storage sink: + +~~~sql +CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://cloud' WITH envelope=bare; +~~~ +~~~ +{"__crdb__": {"key": ["washington dc", "cd48e501-e86d-4019-9923-2fc9a964b264"]}, "city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "87247 Diane Park", "ext": {"brand": "Fuji", "color": "yellow"}, "id": "cd48e501-e86d-4019-9923-2fc9a964b264", "owner_id": "a616ce61-ade4-43d2-9aab-0e3b24a9aa9a", "status": "available", "type": "bike"} +~~~ + +{% include {{ page.version.version }}/cdc/bare-envelope-cdc-queries.md %} + +In CDC queries: + +A changefeed containing a `SELECT` clause without any additional options: + +~~~sql +CREATE CHANGEFEED INTO 'external://kafka' AS SELECT city, type FROM movr.vehicles; +~~~ +~~~ +{"city": "los angeles", "type": "skateboard"} +~~~ + +A changefeed containing a `SELECT` clause with the [`topic_in_value`]({% link {{ page.version.version }}/create-changefeed.md %}#topic-in-value) option specified: + +~~~sql +CREATE CHANGEFEED INTO 'external://kafka' WITH topic_in_value AS SELECT city, type FROM movr.vehicles; +~~~ +~~~ +{"__crdb__": {"topic": "vehicles"}, "city": "los angeles", "type": "skateboard"} +~~~ + +#### `key_only` + +`key_only` emits only the key and no value, which is faster if you only need to know the key of the changed row. This envelope option is only supported for [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) or sinkless changefeeds. 
+ +Kafka sink: + +~~~sql +CREATE CHANGEFEED FOR TABLE users INTO 'external://kafka' WITH envelope=key_only; +~~~ +~~~ +["boston", "22222222-2222-4200-8000-000000000002"] +~~~ + +{{site.data.alerts.callout_info}} +It is necessary to set up a [Kafka consumer](https://docs.confluent.io/platform/current/clients/consumer.html) to display the key because the key is part of the metadata in Kafka messages, rather than in its own field. When you start a Kafka consumer, you can use `--property print.key=true` to have the key print in the changefeed message. +{{site.data.alerts.end}} + +Sinkless changefeeds: + +~~~sql +CREATE CHANGEFEED FOR TABLE users WITH envelope=key_only; +~~~ +~~~ +{"key":"[\"seattle\", \"fff726cc-13b3-475f-ad92-a21cafee5d3f\"]","table":"users","value":""} +~~~ + +#### `row` + +`row` emits the row without any additional metadata fields in the message. This envelope option is only supported for [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) or sinkless changefeeds. `row` does not support [`avro`]({% link {{ page.version.version }}/changefeed-messages.md %}#avro) format—if you are using `avro`, refer to the [`bare`](#bare) envelope option. + +Kafka sink: + +~~~sql +CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://kafka' WITH envelope=row; +~~~ +~~~ +{"city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "85551 Moore Mountains Apt. 47", "ext": {"color": "red"}, "id": "d3b37607-1e9f-4e25-b772-efb9374b08e3", "owner_id": "4f26b516-f13f-4136-83e1-2ea1ae151c20", "status": "available", "type": "skateboard"} +~~~ + +## Field reference + +CockroachDB provides multiple changefeed envelopes, each supported by different [sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}) and [use cases](#use-cases). For the [changefeed options](#option-reference) to enable each field, refer to the following descriptions: + +### `"payload"` + +The top-level `"payload"` field is present in envelopes for changefeeds emitting to a [webhook]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink) sink and contains all of the change event data. The messages emit as a batch with a `"payload"` wrapper around the change event data and a `"length"` field for the number of messages in the batch: + +~~~ +{"payload": [{"after" : {"a" : 1, "b" : "a"}, "key": [1], "topic": "foo"}, {"after": {"a": 1, "b": "b"}, "key": [1], "topic": "foo" }], "length":2} +~~~ + +#### `"payload"` with enriched envelope + +{% include_cached new-in.html version="v25.2" %} When the [`envelope=enriched, enriched_properties=schema`](#enriched-properties-option) envelope options are specified, the envelope will include a `payload` field that wraps the entire message except for the [`"schema"`](#schema) fields. + +As a result, when you emit messages to a webhook sink with `envelope=enriched, enriched_properties=schema`, you will receive messages with two `"payload"` fields, similar to the following structure: + +~~~json +{ + "payload": [ + { + "payload": { + "after": { + "category": "Home & Kitchen", + "created_at": "2025-04-30T20:02:35.40316", + "description": "Adjustable LED desk lamp with touch controls", + "id": "4b36388a-f7e6-4b95-9f78-3aee9e2060d6", + "in_stock": true, + "name": "LED Desk Lamp", + "price": 22.30 + }, + "before": null, + "key": { + "id": "4b36388a-f7e6-4b95-9f78-3aee9e2060d6" + }, + "op": "c", + "source": { + // ... Metadata source fields + }, + "ts_ns": 1747245104106528000 + }, + "schema": { + "fields": [ +// ... 
+~~~
+
+#### `"after"`
+
+The state of the row after the change. This contains the column names and values after an [`INSERT`]({% link {{ page.version.version }}/insert.md %}) or [`UPDATE`]({% link {{ page.version.version }}/update.md %}). For [`DELETE`]({% link {{ page.version.version }}/delete.md %}) operations, `"after"` will be `NULL`. In the default [`wrapped`](#wrapped) envelope, every message for an `INSERT` or `UPDATE` has an `"after"` field with the new data. In a [`row`](#row) envelope, the whole message is the state of the row without the `"after"` wrapper, and in the [`key_only`](#key_only) envelope there is no `"after"` field because only the key is sent.
+
+#### `"before"`
+
+The state of the row before the change. This field appears only if the [`diff`](#diff-option) option is enabled on a `wrapped` (or `enriched`) envelope. For updates, `"before"` contains the previous values of the row. For deletes, `"before"` is the last state of the row. For inserts, `"before"` will be `NULL` (the row had no prior state). This field is useful for [auditing changes](#audit-changes-in-data) or computing differences. (Not applicable to envelopes like [`row`](#row) or [`key_only`](#key_only), which do not support the `"before"` or `"after"` fields.)
+
+#### `"key"`
+
+- For non-`enriched` envelopes: An array composed of the row's `PRIMARY KEY` field(s) (e.g., `[1]` for JSON or `{"id":{"long":1}}` for Avro). For Kafka sinks, the primary key array field is omitted by default. If you would like messages emitted to Kafka sinks to contain a primary key array, you can use the [`key_in_value`]({% link {{ page.version.version }}/create-changefeed.md %}#key-in-value) option.
+- For `enriched` envelopes: The primary key of the row as an object, e.g., `{"id": 1}`.
+
+#### `"updated"`
+
+A timestamp indicating when the change was committed. This field appears if the [`updated`](#updated-option) option is enabled. It is formatted as a string timestamp. The `updated` timestamp corresponds to the transaction commit time for that change. If the changefeed was started with a [`cursor`]({% link {{ page.version.version }}/create-changefeed.md %}#cursor) (at a specific past timestamp), the updated times will align with the MVCC timestamps of each row version.
+
+#### `"op"`
+
+{% include_cached new-in.html version="v25.2" %} The type of change operation: `"c"` for `INSERT`, `"u"` for `UPDATE`, `"d"` for `DELETE`. This field is emitted if [`envelope=enriched`](#enriched-option) is enabled.
+
+If you're using [CDC queries]({% link {{ page.version.version }}/cdc-queries.md %}) to [filter only for the type of change operation]({% link {{ page.version.version }}/cdc-queries.md %}#cdc-query-function-support), we recommend using the `envelope=enriched` option instead for this metadata.
+
+#### `"source"`
+
+{% include_cached new-in.html version="v25.2" %} Metadata about the source of the change event. This is included when using [`envelope=enriched`](#enriched-option) with [`enriched_properties='source'`](#enriched-properties-option) (or `'source,schema'`). The `"source"` field includes the following fields about the cluster running the changefeed:
+
+- `"changefeed_sink"`: The [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}) type receiving the changefeed (e.g., `"kafka"`, `"sinkless buffer"`).
+- `"cluster_id"`: The [unique ID]({% link {{ page.version.version }}/cockroach-start.md %}#standard-output) of the cluster.
+- `"cluster_name"`: The name, [if configured]({% link {{ page.version.version }}/cockroach-start.md %}#flags-cluster-name). +- `"database_name"`: The name of the database. +- `"db_version"`: The CockroachDB version. +- `"job_id"`: The changefeed's [job]({% link {{ page.version.version }}/show-jobs.md %}) ID. +- `"mvcc_timestamp"`: The [MVCC timestamp]({% link {{ page.version.version }}/architecture/storage-layer.md %}#mvcc) of the change, as a string. This is CockroachDB’s timestamp for the version (a decimal number representing logical time). It is included if the [`mvcc_timestamp`](#mvcc-timestamp-option) option is set. Unlike [`"updated"`](#updated), which might be omitted for initial scans, `"mvcc_timestamp"` is always present on each message when enabled, including backfill events. This field is mainly used for low-level debugging or when exact internal timestamps are needed. (`"mvcc_timestamp"` will be a top-level field when the changefeed is not using the [`enriched`](#enriched) envelope.) +- `"node_id"`: The ID of the node that emitted the changefeed message. +- `"node_name"`: The name of the node that emitted the changefeed message. +- `"origin"`: The identifier for the changefeed's origin, always `cockroachdb`. +- `"primary_keys"`: An array of [primary key]({% link {{ page.version.version }}/primary-key.md %}) column name(s) for the changed row. +- `"schema_name"`: The schema name of the changed table. +- `"source_node_locality"`: The [locality]({% link {{ page.version.version }}/cockroach-start.md %}#standard-output) of the node that emitted the changefeed messages, e.g., `"cloud=gce,region=us-east1,zone=us-east1-b"`. +- `"table_name"`: The name of the table that changed. +- `"ts_hlc"`: A timestamp indicating when the change was committed. Included instead of `"updated"` when the [`enriched_properties=source`](#enriched-properties-option) and [`updated`](#updated-option) options are included at changefeed creation. +- `"ts_ns"`: A timestamp indicating when the change was committed. Included instead of `"updated"` when the [`enriched_properties=source`](#enriched-properties-option) and [`updated`](#updated-option) options are included at changefeed creation. + +#### `"ts_ns"` + +{% include_cached new-in.html version="v25.2" %} The processing time of the event by the changefeed job. This field emits if [`envelope=enriched`](#enriched-option) is enabled. + +When you're comparing changes for ordering, it is important to ignore this top-level [`ts_ns` field]. Instead, if you require timestamps to order messages based on the change event's commit time, then you must specify `envelope=enriched, enriched_properties=source, updated` when you create the changefeed, which will include `"ts_hlc"` and `"ts_ns"` in the [`"source"`](#source) field. + +#### `table`: + +The name of the table that generated the change. This field appears in certain contexts: + +- In sinkless changefeeds (those created with [`EXPERIMENTAL CHANGEFEED FOR`]({% link {{ page.version.version }}/changefeed-for.md %}) or `CREATE CHANGEFEED ... WITH sinkless`), the output includes a `"table"` field for each row change, because a single sinkless feed could cover multiple tables. +- In the [`enriched`](#enriched) envelope, the [fully qualified table name]({% link {{ page.version.version }}/sql-name-resolution.md %}) is typically part of the [`"source"`](#source) field (as `"database_name"`, `"schema_name"`, `"table_name"` sub-fields), rather than a separate top-level `"table"` field. 
+
+{{site.data.alerts.callout_info}}
+When a changefeed targets a table with multiple column families, the family name is appended to the table name as part of the topic. Refer to [Tables with column families in changefeeds]({% link {{ page.version.version }}/changefeeds-on-tables-with-column-families.md %}#message-format) for guidance.
+{{site.data.alerts.end}}
+
+#### `"__crdb__"`
+
+A field used by the [`bare`](#bare) envelope to carry CockroachDB metadata that would otherwise be top level. When `envelope=bare`, the message’s top level is the row data, so CockroachDB inserts any needed metadata (primary key, topic, timestamps, etc.) under a nested `"__crdb__"` object. For example:
+
+~~~json
+{
+  "__crdb__": {"key": [101]},
+  "id": 101,
+  "customer_id": 1,
+  "total": 50.00,
+  "status": "new"
+}
+~~~
+
+Here `"__crdb__": {"key": [101]}` holds the primary key for the row, while the rest of the object is the row’s columns at the top level. Other metadata like [`"updated"`](#updated) timestamps or [`resolved`](#resolved-option) timestamps would also appear inside `__crdb__`, if present. This field is specific to the `bare` envelope or other cases (like custom [CDC queries]({% link {{ page.version.version }}/cdc-queries.md %}) with `SELECT`) where metadata needs to be attached without interfering with the selected row data.
+
+### `"schema"`
+
+{% include_cached new-in.html version="v25.2" %} The schema and type of each payload field. This is included when using [`envelope=enriched`](#enriched-option) with [`enriched_properties='schema'`](#enriched-properties-option). The `"schema"` field provides the information needed to interpret the data. The following fields are present for each schema in the envelope, depending on the configured options:
+
+- `"field"`: The name of the field.
+- `"type"`: The type of the field. If `"type"` is an `array`, then the `"items"` field defines the data type of the array elements.
+- `"optional"`: A boolean that defines whether the field may be absent from the `"payload"` section of the envelope, depending on the configuration.
+- `"name"`: The name of the described schema.
+
+## See more
+
+- For more details on the file naming format for cloud storage sinks, refer to [changefeed files]({% link {{ page.version.version }}/create-changefeed.md %}#files).
\ No newline at end of file
diff --git a/src/current/v25.2/changefeed-messages.md b/src/current/v25.2/changefeed-messages.md
index 7d96e356510..d55d4310076 100644
--- a/src/current/v25.2/changefeed-messages.md
+++ b/src/current/v25.2/changefeed-messages.md
@@ -6,12 +6,10 @@ docs_area: stream_data
 key: use-changefeeds.html
 ---
-Changefeeds generate and emit messages (on a per-key basis) as changes happen to the rows in watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different [options]({% link {{ page.version.version }}/create-changefeed.md %}#options) (e.g., `format=avro`).
+Changefeeds generate and emit _messages_ (on a per-key basis) to sinks as _change events_ (`INSERT`, `UPDATE`, `DELETE`) happen to the rows in [watched tables]({% link {{ page.version.version }}/change-data-capture-overview.md %}#watched-table). CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees.
-This page describes the format and behavior of changefeed messages.
You will find the following information on this page: +This page has reference information for the following changefeed message topics: -- [Responses](#responses): The general format of changefeed messages. -- [Message envelopes](#message-envelopes): The structure of the changefeed message. - [Ordering and delivery guarantees](#ordering-and-delivery-guarantees): CockroachDB's guarantees for a changefeed's message ordering and delivery. - [Delete messages](#delete-messages): The format of messages when a row is deleted. - [Resolved messages](#resolved-messages): The resolved timestamp option and how to configure it. @@ -20,178 +18,15 @@ This page describes the format and behavior of changefeed messages. You will fin - [Filtering changefeed messages](#filtering-changefeed-messages): The settings and syntax to prevent and filter the messages that changefeeds emit. - [Message formats](#message-formats): The limitations and type mapping when creating a changefeed with different message formats. -{{site.data.alerts.callout_info}} -{% include {{page.version.version}}/cdc/types-udt-composite-general.md %} -{{site.data.alerts.end}} +To enable various use cases and sink support, you can use changefeed options to configure the message: -## Responses - -By default, changefeed messages emitted to a [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}) contain keys and values of the watched table rows that have changed. The message will contain the following fields depending on the type of emitted change and the [options]({% link {{ page.version.version }}/create-changefeed.md %}#options) you specified to create the changefeed: - -- **Key**: An array composed of the row's `PRIMARY KEY` field(s) (e.g., `[1]` for JSON or `{"id":{"long":1}}` for Avro). -- **Value**: - - One of four possible top-level fields: - - `after`, which contains the state of the row after the update (or `null` for `DELETE`s). - - `updated`, which contains the [updated]({% link {{ page.version.version }}/create-changefeed.md %}#updated) timestamp. - - `resolved`, which is emitted for records representing [resolved](#resolved-messages) timestamps. These records do not include an `after` value since they only function as checkpoints. - - `before`, which contains the state of the row before an update. Changefeeds must use the [`diff` option]({% link {{ page.version.version }}/create-changefeed.md %}#diff) with the default [`wrapped` envelope](#wrapped) to emit the `before` field. When a row did not previously have any data, the `before` field will emit `null`. - - For [`INSERT`]({% link {{ page.version.version }}/insert.md %}) and [`UPDATE`]({% link {{ page.version.version }}/update.md %}), the current state of the row inserted or updated. - - For [`DELETE`]({% link {{ page.version.version }}/delete.md %}), `null`. +- [_Envelope_]({% link {{ page.version.version }}/changefeed-message-envelopes.md %}): the structure of each message. Include or exclude row data, payload schema definitions, and source metadata. +- [_Format_](#message-formats): the available formats, such as JSON, CSV, and Avro. {{site.data.alerts.callout_info}} -If you use the `envelope` option to alter the changefeed message fields, your messages may not contain one or more of the values noted in the preceding list. As an example, when emitting to a Kafka sink, you can limit messages to just the changed key value by using the `envelope` option set to [`key_only`](#key_only). For more detail, refer to [Message envelopes](#message-envelopes). 
-{{site.data.alerts.end}} - -For example, changefeeds emitting to a sink will have the default message format: - -Statement | Response ------------------------------------------------+----------------------------------------------------------------------- -`INSERT INTO office_dogs VALUES (1, 'Petee');` | JSON: `[1] {"after": {"id": 1, "name": "Petee"}}`
Avro: `{"id":{"long":1}} {"after":{"office_dogs":{"id":{"long":1},"name":{"string":"Petee"}}}}` -`DELETE FROM office_dogs WHERE name = 'Petee'` | JSON: `[1] {"after": null}`
Avro: `{"id":{"long":1}} {"after":null}` - -When a changefeed targets a table with multiple column families, the family name is appended to the table name as part of the topic. Refer to [Tables with columns families in changefeeds]({% link {{ page.version.version }}/changefeeds-on-tables-with-column-families.md %}#message-format) for guidance. - -For [webhook sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink), the response format arrives as a batch of changefeed messages with a `payload` and `length`. - -~~~ -{"payload": [{"after" : {"a" : 1, "b" : "a"}, "key": [1], "topic": "foo"}, {"after": {"a": 1, "b": "b"}, "key": [1], "topic": "foo" }], "length":2} -~~~ - -[Webhook message batching]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink-configuration) is subject to the same key [ordering guarantee](#ordering-and-delivery-guarantees) as other sinks. Therefore, as messages are batched, you will not receive two batches at the same time with overlapping keys. You may receive a single batch containing multiple messages about one key, because ordering is maintained for a single key within its batch. - -Refer to [changefeed files]({% link {{ page.version.version }}/create-changefeed.md %}#files) for more detail on the file naming format for {{ site.data.products.enterprise }} changefeeds. - -## Message envelopes - -The _envelope_ defines the structure of a changefeed message. You can use the [`envelope`]({% link {{ page.version.version }}/create-changefeed.md %}#envelope) option to manipulate the changefeed envelope. The values that the `envelope` option accepts are compatible with different [changefeed sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}), and the structure of the message will vary depending on the sink. - -{{site.data.alerts.callout_info}} -Changefeeds created with [`EXPERIMENTAL CHANGEFEED FOR`]({% link {{ page.version.version }}/changefeed-for.md %}) or [`CREATE CHANGEFEED`]({% link {{ page.version.version }}/create-changefeed.md %}) with no sink specified (sinkless changefeeds) produce messages without the envelope metadata fields of changefeeds emitting to sinks. +{% include {{page.version.version}}/cdc/types-udt-composite-general.md %} {{site.data.alerts.end}} -The following sections provide examples of changefeed messages that are emitted when you specify each of the supported `envelope` options. Other [changefeed options]({% link {{ page.version.version }}/create-changefeed.md %}#options) can affect the message envelope and what messages are emitted. Therefore, the examples are a guide for what you can expect when only the `envelope` option is specified. - -### `wrapped` - -`wrapped` is the default envelope structure for changefeed messages. This envelope contains an array of the primary key (or the key as part of the message metadata), a top-level field for the type of message, and the current state of the row (or `null` for [deleted rows](#delete-messages)). - -The message envelope contains a primary key array when your changefeed is emitting to a sink that does not have a message key as part of its protocol, (e.g., cloud storage, webhook sinks, or Google Pub/Sub). By default, messages emitted to Kafka sinks do not have the primary key array, because the key is part of the message metadata. If you would like messages emitted to Kafka sinks to contain a primary key array, you can use the [`key_in_value`]({% link {{ page.version.version }}/create-changefeed.md %}#key-in-value) option. 
Refer to the following message outputs for examples of this. - -- Cloud storage sink: - - ~~~sql - CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://cloud'; - ~~~ - ~~~ - {"after": {"city": "seattle", "creation_time": "2019-01-02T03:04:05", "current_location": "86359 Jeffrey Ranch", "ext": {"color": "yellow"}, "id": "68ee1f95-3137-48e2-8ce3-34ac2d18c7c8", "owner_id": "570a3d70-a3d7-4c00-8000-000000000011", "status": "in_use", "type": "scooter"}, "key": ["seattle", "68ee1f95-3137-48e2-8ce3-34ac2d18c7c8"]} - ~~~ - -- Kafka sink: - - - Default when `envelope=wrapped` or `envelope` is not specified: - - ~~~sql - CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://kafka'; - ~~~ - ~~~ - {"after": {"city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "24315 Elizabeth Mountains", "ext": {"color": "yellow"}, "id": "dadc1c0b-30f0-4c8b-bd16-046c8612bbea", "owner_id": "034075b6-5380-4996-a267-5a129781f4d3", "status": "in_use", "type": "scooter"}} - ~~~ - - - Kafka sink message with `key_in_value` provided: - - ~~~sql - CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://kafka' WITH key_in_value, envelope=wrapped; - ~~~ - ~~~ - {"after": {"city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "46227 Jeremy Haven Suite 92", "ext": {"brand": "Schwinn", "color": "red"}, "id": "298cc7a0-de6b-4659-ae57-eaa2de9d99c3", "owner_id": "beda1202-63f7-41d2-aa35-ee3a835679d1", "status": "in_use", "type": "bike"}, "key": ["washington dc", "298cc7a0-de6b-4659-ae57-eaa2de9d99c3"]} - ~~~ - -#### `wrapped` and `diff` - -To include a `before` field in the changefeed message that contains the state of a row before an update in the changefeed message, use the `diff` option with `wrapped`: - -~~~sql -CREATE CHANGEFEED FOR TABLE rides INTO 'external://kafka' WITH diff, envelope=wrapped; -~~~ - -~~~ -{"after": {"city": "seattle", "end_address": null, "end_time": null, "id": "f6c02fe0-a4e0-476d-a3b7-91934d15dce2", "revenue": 25.00, "rider_id": "14067022-6e9b-427b-bd74-5ef48e93da1f", "start_address": "2 Michael Field", "start_time": "2023-06-02T15:14:20.790155", "vehicle_city": "seattle", "vehicle_id": "55555555-5555-4400-8000-000000000005"}, "before": {"city": "seattle", "end_address": null, "end_time": null, "id": "f6c02fe0-a4e0-476d-a3b7-91934d15dce2", "revenue": 25.00, "rider_id": "14067022-6e9b-427b-bd74-5ef48e93da1f", "start_address": "5 Michael Field", "start_time": "2023-06-02T15:14:20.790155", "vehicle_city": "seattle", "vehicle_id": "55555555-5555-4400-8000-000000000005"}, "key": ["seattle", "f6c02fe0-a4e0-476d-a3b7-91934d15dce2"]} -~~~ - -### `bare` - -`bare` removes the `after` key from the changefeed message and stores any metadata in a `crdb` field. When used with [`avro`](#avro) format, `record` will replace the `after` key. 
- -- Cloud storage sink: - - ~~~sql - CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://cloud' WITH envelope=bare; - ~~~ - ~~~ - {"__crdb__": {"key": ["washington dc", "cd48e501-e86d-4019-9923-2fc9a964b264"]}, "city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "87247 Diane Park", "ext": {"brand": "Fuji", "color": "yellow"}, "id": "cd48e501-e86d-4019-9923-2fc9a964b264", "owner_id": "a616ce61-ade4-43d2-9aab-0e3b24a9aa9a", "status": "available", "type": "bike"} - ~~~ - -{% include {{ page.version.version }}/cdc/bare-envelope-cdc-queries.md %} - -- In CDC queries: - - - A changefeed containing a `SELECT` clause without any additional options: - - ~~~sql - CREATE CHANGEFEED INTO 'external://kafka' AS SELECT city, type FROM movr.vehicles; - ~~~ - ~~~ - {"city": "los angeles", "type": "skateboard"} - ~~~ - - - A changefeed containing a `SELECT` clause with the [`topic_in_value`]({% link {{ page.version.version }}/create-changefeed.md %}#topic-in-value) option specified: - - ~~~sql - CREATE CHANGEFEED INTO 'external://kafka' WITH topic_in_value AS SELECT city, type FROM movr.vehicles; - ~~~ - ~~~ - {"__crdb__": {"topic": "vehicles"}, "city": "los angeles", "type": "skateboard"} - ~~~ - -### `key_only` - -`key_only` emits only the key and no value, which is faster if you only need to know the key of the changed row. This envelope option is only supported for [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) or sinkless changefeeds. - -- Kafka sink: - - ~~~sql - CREATE CHANGEFEED FOR TABLE users INTO 'external://kafka' WITH envelope=key_only; - ~~~ - ~~~ - ["boston", "22222222-2222-4200-8000-000000000002"] - ~~~ - - {{site.data.alerts.callout_info}} - It is necessary to set up a [Kafka consumer](https://docs.confluent.io/platform/current/clients/consumer.html) to display the key because the key is part of the metadata in Kafka messages, rather than in its own field. When you start a Kafka consumer, you can use `--property print.key=true` to have the key print in the changefeed message. - {{site.data.alerts.end}} - -- Sinkless changefeeds: - - ~~~sql - CREATE CHANGEFEED FOR TABLE users WITH envelope=key_only; - ~~~ - ~~~ - {"key":"[\"seattle\", \"fff726cc-13b3-475f-ad92-a21cafee5d3f\"]","table":"users","value":""} - ~~~ - -### `row` - -`row` emits the row without any additional metadata fields in the message. This envelope option is only supported for [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) or sinkless changefeeds. `row` does not support [`avro`](#avro) format—if you are using `avro`, refer to the [`bare`](#bare) envelope option. - -- Kafka sink: - - ~~~sql - CREATE CHANGEFEED FOR TABLE vehicles INTO 'external://kafka' WITH envelope=row; - ~~~ - ~~~ - {"city": "washington dc", "creation_time": "2019-01-02T03:04:05", "current_location": "85551 Moore Mountains Apt. 47", "ext": {"color": "red"}, "id": "d3b37607-1e9f-4e25-b772-efb9374b08e3", "owner_id": "4f26b516-f13f-4136-83e1-2ea1ae151c20", "status": "available", "type": "skateboard"} - ~~~ - ## Ordering and delivery guarantees Changefeeds provide the following guarantees for message delivery to changefeed sinks: @@ -272,6 +107,10 @@ As an example, you run the following sequence of SQL statements to create a chan Depending on the workload, you can use resolved timestamp notifications on every Kafka partition to provide strong ordering and global consistency guarantees by buffering records in between timestamp closures. 
Use the `resolved` timestamp to see every row that changed at a certain time. {{site.data.alerts.end}} +{{site.data.alerts.callout_info}} +[Webhook message batching]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink-configuration) is subject to the same key [ordering guarantee](#ordering-and-delivery-guarantees) as other sinks. Therefore, as messages are batched, you will not receive two batches at the same time with overlapping keys. You may receive a single batch containing multiple messages about one key, because ordering is maintained for a single key within its batch. +{{site.data.alerts.end}} + #### Define a key column Typically, changefeeds that emit to Kafka sinks shard rows between Kafka partitions using the row's primary key, which is hashed. The resulting hash remains the same and ensures a row will always emit to the same Kafka partition. @@ -326,7 +165,9 @@ In this example, with duplicates removed, an individual row is emitted in the sa The first time a message is delivered, it will be in the correct timestamp order, which follows the [per-key ordering guarantee](#per-key-ordering). However, when there are [duplicate messages](#duplicate-messages), the changefeed may **not** re-emit every row update. As a result, there may be gaps in a sequence of duplicate messages for a key. {{site.data.alerts.end}} -To compare two different rows for [happens-before](https://wikipedia.org/wiki/Happened-before), compare the `updated` timestamp. This works across anything in the same cluster (tables, nodes, etc.). +To compare two different rows for [happens-before](https://wikipedia.org/wiki/Happened-before), compare the `updated` timestamp. This works across anything in the same cluster (tables, nodes, etc.). + +When you use the [`enriched` envelope]({% link {{ page.version.version }}/changefeed-message-envelopes.md %}#enriched), if you require timestamps to order messages based on the change event's commit time, then you must specify `envelope=enriched, enriched_properties=source, updated` when you create the changefeed, which will include `"ts_hlc"` and `"ts_ns"` in the [`"source"`]({% link {{ page.version.version }}/changefeed-message-envelopes.md %}#source) field. (It is important to ignore the [`ts_ns` field]({% link {{ page.version.version }}/changefeed-message-envelopes.md %}#ts_ns) at the top level when you're comparing changes for ordering.) For more details on configuring envelope fields, refer to the [Changefeed Message Envelope]({% link {{ page.version.version }}/changefeed-message-envelopes.md %}) page. The complexity with timestamps is necessary because CockroachDB supports transactions that can affect any part of the cluster, and it is not possible to horizontally divide the transaction log into independent changefeeds. For more information about this, [read our blog post on CDC](https://www.cockroachlabs.com/blog/change-data-capture/). diff --git a/src/current/v25.2/changefeeds-in-multi-region-deployments.md b/src/current/v25.2/changefeeds-in-multi-region-deployments.md index 5d4cd08cfa0..13eb78fd8f6 100644 --- a/src/current/v25.2/changefeeds-in-multi-region-deployments.md +++ b/src/current/v25.2/changefeeds-in-multi-region-deployments.md @@ -80,7 +80,7 @@ If the [`schema_change_policy`]({% link {{ page.version.version }}/create-change . . . ~~~ -See the changefeed [responses]({% link {{ page.version.version }}/changefeed-messages.md %}#responses) section for more general information on the messages emitted from a changefeed. 
+See the changefeed [messages]({% link {{ page.version.version }}/changefeed-messages.md %}) page for more general information on the messages emitted from a changefeed. ## See also diff --git a/src/current/v25.2/changefeeds-on-tables-with-column-families.md b/src/current/v25.2/changefeeds-on-tables-with-column-families.md index 4a48b5c7f7d..ffc0a2e5a32 100644 --- a/src/current/v25.2/changefeeds-on-tables-with-column-families.md +++ b/src/current/v25.2/changefeeds-on-tables-with-column-families.md @@ -41,7 +41,7 @@ CREATE CHANGEFEED FOR TABLE tbl FAMILY f_1, TABLE tbl FAMILY f_2; ## Message format -The response will follow a typical [changefeed message format]({% link {{ page.version.version }}/changefeed-messages.md %}#responses), but with the family name appended to the table name with a `.`, in the format `table.family`: +The response will follow a typical [changefeed message format]({% link {{ page.version.version }}/changefeed-messages.md %}), but with the family name appended to the table name with a `.`, in the format `table.family`: ~~~ {"after":{"column":"value"},"key":[1],"topic":"table.family"} diff --git a/src/current/v25.2/create-changefeed.md b/src/current/v25.2/create-changefeed.md index 91bd7678e68..210626f5be7 100644 --- a/src/current/v25.2/create-changefeed.md +++ b/src/current/v25.2/create-changefeed.md @@ -114,10 +114,10 @@ Option | Value | Description `compression` | `gzip`, `zstd` | Compress changefeed data files written to:. For compression options when using a Kafka sink, see [Kafka sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka-sink-configuration). `confluent_schema_registry` | Schema Registry address | The [Schema Registry](https://docs.confluent.io/current/schema-registry/docs/index.html#sr) address is required to use `avro`.

{% include {{ page.version.version }}/cdc/schema-registry-timeout.md %}

{% include {{ page.version.version }}/cdc/confluent-cloud-sr-url.md %}

{% include {{ page.version.version }}/cdc/schema-registry-metric.md %} `cursor` | [Timestamp]({% link {{ page.version.version }}/as-of-system-time.md %}#parameters) | Emit any changes after the given timestamp. `cursor` does not output the current state of the table first. When `cursor` is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.

The changefeed will encounter an error if you specify a timestamp that is before the configured garbage collection window for the target table. (Refer to [`gc.ttlseconds`]({% link {{ page.version.version }}/configure-replication-zones.md %}#replication-zone-variables).) With default garbage collection settings, this means you cannot create a changefeed that starts more than [the default MVCC garbage collection interval]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds) in the past.

You can use `cursor` to [start a new changefeed where a previous changefeed ended](#start-a-new-changefeed-where-another-ended).

Example: `cursor='1536242855577149065.0000000000'` -`diff` | N/A | Publish a [`before` field]({% link {{ page.version.version }}/changefeed-messages.md %}#wrapped-and-diff) with each message, which includes the value of the row before the update was applied. Changefeeds must use the `diff` option with the default [`wrapped` envelope](#envelope) to emit the `before` field. +`diff` | N/A | Publish a [`before` field]({% link {{ page.version.version }}/changefeed-message-envelopes.md %}) with each message, which includes the value of the row before the update was applied. Changefeeds must use the `diff` option with the default [`wrapped` envelope](#envelope) or the `enriched` envelope to emit the `before` field. `encode_json_value_null_as_object` | N/A | Emit JSON `NULL` values as `{"__crdb_json_null__": true}` to distinguish these values from SQL `NULL` values. Refer to the [Changefeed Messages]({% link {{ page.version.version }}/changefeed-messages.md %}#json) page for an example.

**Note:** When this option is enabled, if the changefeed encounters the literal value `{"__crdb_json_null__": true}` in JSON, it will have the same representation as a JSON `NULL` value and a warning will be printed to the [`DEV` logging channel]({% link {{ page.version.version }}/logging.md %}#dev). `end_time` | [Timestamp]({% link {{ page.version.version }}/as-of-system-time.md %}#parameters) | Indicate the timestamp up to which the changefeed will emit all events and then complete with a `successful` status. Provide a future timestamp to `end_time` in number of nanoseconds since the [Unix epoch](https://wikipedia.org/wiki/Unix_time). For example, `end_time="1655402400000000000"`. You cannot use `end_time` and [`initial_scan = 'only'`](#initial-scan) simultaneously. -`envelope` | `wrapped` / `bare` / `key_only` / `row` | `wrapped` the default envelope structure for changefeed messages containing an array of the primary key, a top-level field for the type of message, and the current state of the row (or `null` for deleted rows).

`bare` removes the `after` key from the changefeed message and stores any metadata in a `crdb` field. When used with `avro` format, `record` will replace the `after` key. **Note:** This is the default envelope format for [CDC queries]({% link {{ page.version.version }}/cdc-queries.md %}). For an example, refer to [Filter columns]({% link {{ page.version.version }}/cdc-queries.md %}#filter-columns).

`key_only` emits only the key and no value, which is faster if you only need to know the key of the changed row. This envelope option is only supported for [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) or sinkless changefeeds.

`row` emits the row without any additional metadata fields in the message. This envelope option is only supported in [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) or sinkless changefeeds. `row` does not support [`avro` format](#format).

Refer to [Responses]({% link {{ page.version.version }}/changefeed-messages.md %}#responses) for more detail on message format.

Default: `envelope=wrapped`. Default for [CDC queries]({% link {{ page.version.version }}/cdc-queries.md %}): `envelope=bare`.
+`envelope` | `wrapped` / `enriched` / `bare` / `key_only` / `row` | `wrapped` is the default envelope structure for changefeed messages containing an array of the primary key, a top-level field for the type of message, and the current state of the row (or `null` for deleted rows).

Refer to the [Changefeed Message Envelopes]({% link {{ page.version.version }}/changefeed-message-envelopes.md %}) page for more detail on each envelope.

Default: `envelope=wrapped`. Default for [CDC queries]({% link {{ page.version.version }}/cdc-queries.md %}): `envelope=bare`.
`execution_locality` | Key-value pairs | Restricts the execution of a changefeed to nodes that match the defined locality filter requirements, e.g., `WITH execution_locality = 'region=us-west-1a,cloud=aws'`.

See [Run a changefeed job by locality]({% link {{ page.version.version }}/changefeeds-in-multi-region-deployments.md %}#run-a-changefeed-job-by-locality) for usage and reference detail. `format` | `json` / `avro` / `csv` / `parquet` | Format of the emitted message.

`avro`: For mappings of CockroachDB types to Avro types, [refer to the table]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-types) and the detail on [Avro limitations]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-limitations). **Note:** [`confluent_schema_registry`](#confluent-schema-registry) is required with `format=avro`.

`csv`: You cannot combine `format=csv` with the [`diff`](#diff) or [`resolved`](#resolved) options. Changefeeds use the same CSV format as the [`EXPORT`](export.html) statement. Refer to [Export data with changefeeds]({% link {{ page.version.version }}/export-data-with-changefeeds.md %}) for details on using these options to create a changefeed as an alternative to `EXPORT`. **Note:** [`initial_scan = 'only'`](#initial-scan) is required with `format=csv`.

`parquet`: Cloud storage is the only supported sink. The [`topic_in_value`](#topic-in-value) option is not compatible with `parquet` format.

Default: `format=json`. `full_table_name` | N/A | Use fully qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases.

**Note:** This option cannot modify existing table names used as topics, subjects, etc., as part of an [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) statement. To modify a topic, subject, etc., to use a fully qualified table name, create a new changefeed with this option.

Example: `CREATE CHANGEFEED FOR foo... WITH full_table_name` will create the topic name `defaultdb.public.foo` instead of `foo`. diff --git a/src/current/v25.2/how-does-an-enterprise-changefeed-work.md b/src/current/v25.2/how-does-an-enterprise-changefeed-work.md index 566f6a15e48..738c570902a 100644 --- a/src/current/v25.2/how-does-an-enterprise-changefeed-work.md +++ b/src/current/v25.2/how-does-an-enterprise-changefeed-work.md @@ -13,7 +13,7 @@ Each node uses its _aggregator processors_ to send back checkpoint progress to t With [`resolved`]({% link {{ page.version.version }}/create-changefeed.md %}#resolved) specified when a changefeed is started, the coordinator will send the resolved timestamp (i.e., the high-water mark) to each endpoint in the sink. For example, when using [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) this will be sent as a message to each partition; for [cloud storage]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink), this will be emitted as a resolved timestamp file. -As rows are updated, added, and deleted in the targeted table(s), the node sends the row changes through the [rangefeed mechanism]({% link {{ page.version.version }}/create-and-configure-changefeeds.md %}#enable-rangefeeds) to the changefeed encoder, which encodes these changes into the [final message format]({% link {{ page.version.version }}/changefeed-messages.md %}#responses). The message is emitted from the encoder to the sink—it can emit to any endpoint in the sink. In the diagram example, this means that the messages can emit to any Kafka Broker. +As rows are updated, added, and deleted in the targeted table(s), the node sends the row changes through the [rangefeed mechanism]({% link {{ page.version.version }}/create-and-configure-changefeeds.md %}#enable-rangefeeds) to the changefeed encoder, which encodes these changes into the [final message format]({% link {{ page.version.version }}/changefeed-messages.md %}). The message is emitted from the encoder to the sink—it can emit to any endpoint in the sink. In the diagram example, this means that the messages can emit to any Kafka Broker. If you are running changefeeds from a [multi-region]({% link {{ page.version.version }}/multiregion-overview.md %}) cluster, you may want to define which nodes take part in running the changefeed job. You can use the [`execution_locality` option]({% link {{ page.version.version }}/changefeeds-in-multi-region-deployments.md %}#run-a-changefeed-job-by-locality) with key-value pairs to specify the locality requirements nodes must meet. See [Job coordination using the execution locality option]({% link {{ page.version.version }}/changefeeds-in-multi-region-deployments.md %}#job-coordination-using-the-execution-locality-option) for detail on how a changefeed job works with this option. diff --git a/src/current/v25.2/stream-a-changefeed-to-a-confluent-cloud-kafka-cluster.md b/src/current/v25.2/stream-a-changefeed-to-a-confluent-cloud-kafka-cluster.md index b2ef265d16b..014377bf164 100644 --- a/src/current/v25.2/stream-a-changefeed-to-a-confluent-cloud-kafka-cluster.md +++ b/src/current/v25.2/stream-a-changefeed-to-a-confluent-cloud-kafka-cluster.md @@ -352,4 +352,4 @@ You can use the **Schema** tab to view the schema for a specific topic. 
## See also - [Changefeed Sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}) -- [Responses]({% link {{ page.version.version }}/changefeed-messages.md %}#responses) +- [Messages]({% link {{ page.version.version }}/changefeed-messages.md %})