73 changes: 42 additions & 31 deletions docs/reference/pipeline/built-in-pipelines.md
Original file line number Diff line number Diff line change
@@ -12,21 +12,22 @@ Additionally, the "greptime_" prefix of the pipeline name is reserved.

## `greptime_identity`

The `greptime_identity` pipeline is designed for writing JSON logs and automatically creates columns for each field in the JSON log.
The `greptime_identity` pipeline is designed for writing JSON logs and automatically creates columns for each field in the JSON log. Nested JSON objects are automatically flattened into separate columns using dot notation.

- The first-level keys in the JSON log are used as column names.
- An error is returned if the same field has different types.
- Fields with `null` values are ignored.
- If time index is not specified, an additional column, `greptime_timestamp`, is added to the table as the time index to indicate when the log was written.
- Nested objects are automatically flattened (e.g., `{"a": {"b": 1}}` becomes column `a.b`)
- Arrays are converted to JSON strings
- An error is returned if the same field has different types
- Fields with `null` values are ignored
- If time index is not specified, an additional column, `greptime_timestamp`, is added to the table as the time index to indicate when the log was written

### Type conversion rules

- `string` -> `string`
- `number` -> `int64` or `float64`
- `boolean` -> `bool`
- `null` -> ignore
- `array` -> `json`
- `object` -> `json`
- `array` -> `string` (JSON-stringified)
- `object` -> automatically flattened into separate columns (see [Flatten JSON objects](#flatten-json-objects))


For example, if we have the following JSON data:
@@ -39,7 +40,7 @@ For example, if we have the following json data:
]
```

We'll merge the schema for each row of this batch to get the final schema. The table schema will be:
We'll merge the schema for each row of this batch to get the final schema. Note that nested objects are automatically flattened into separate columns (e.g., `object.a`, `object.b`), and arrays are converted to JSON strings. The table schema will be:

```sql
mysql> desc pipeline_logs;
@@ -49,26 +50,27 @@ mysql> desc pipeline_logs;
| age | Int64 | | YES | | FIELD |
| is_student | Boolean | | YES | | FIELD |
| name | String | | YES | | FIELD |
| object | Json | | YES | | FIELD |
| object.a | Int64 | | YES | | FIELD |
| object.b | Int64 | | YES | | FIELD |
| score | Float64 | | YES | | FIELD |
| company | String | | YES | | FIELD |
| array | Json | | YES | | FIELD |
| array | String | | YES | | FIELD |
| greptime_timestamp | TimestampNanosecond | PRI | NO | | TIMESTAMP |
+--------------------+---------------------+------+------+---------+---------------+
8 rows in set (0.00 sec)
9 rows in set (0.00 sec)
```

The data will be stored in the table as follows:

```sql
mysql> select * from pipeline_logs;
+------+------------+---------+---------------+-------+---------+---------+----------------------------+
| age | is_student | name | object | score | company | array | greptime_timestamp |
+------+------------+---------+---------------+-------+---------+---------+----------------------------+
| 22 | 1 | Charlie | NULL | 95.5 | NULL | [1,2,3] | 2024-10-18 09:35:48.333020 |
| 21 | 0 | NULL | NULL | 85.5 | A | NULL | 2024-10-18 09:35:48.333020 |
| 20 | 1 | Alice | {"a":1,"b":2} | 90.5 | NULL | NULL | 2024-10-18 09:35:48.333020 |
+------+------------+---------+---------------+-------+---------+---------+----------------------------+
+------+------------+---------+----------+----------+-------+---------+-----------+----------------------------+
| age | is_student | name | object.a | object.b | score | company | array | greptime_timestamp |
+------+------------+---------+----------+----------+-------+---------+-----------+----------------------------+
| 22 | 1 | Charlie | NULL | NULL | 95.5 | NULL | [1,2,3] | 2024-10-18 09:35:48.333020 |
| 21 | 0 | NULL | NULL | NULL | 85.5 | A | NULL | 2024-10-18 09:35:48.333020 |
| 20 | 1 | Alice | 1 | 2 | 90.5 | NULL | NULL | 2024-10-18 09:35:48.333020 |
+------+------------+---------+----------+----------+-------+---------+-----------+----------------------------+
3 rows in set (0.01 sec)
```
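
The schema merge described above can be sketched in a few lines of Python. This is only an illustration, not GreptimeDB's implementation: the helper names `infer_type`, `flatten_row`, and `merge_schema` are hypothetical, the input rows are reconstructed from the query output shown above, and treating an integer and a float in the same field as a type conflict is an assumption of this sketch.

```python
import json


def infer_type(value):
    """Map a JSON scalar to the column type names used in the table above."""
    # bool must be checked before int because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return "Boolean"
    if isinstance(value, int):
        return "Int64"
    if isinstance(value, float):
        return "Float64"
    if isinstance(value, str):
        return "String"
    raise TypeError(f"unsupported value: {value!r}")


def flatten_row(row, prefix=""):
    """Flatten nested objects with dot notation; arrays become JSON strings."""
    flat = {}
    for key, value in row.items():
        column = f"{prefix}.{key}" if prefix else key
        if value is None:
            continue  # fields with null values are ignored
        if isinstance(value, dict):
            flat.update(flatten_row(value, column))
        elif isinstance(value, list):
            flat[column] = json.dumps(value, separators=(",", ":"))
        else:
            flat[column] = value
    return flat


def merge_schema(rows):
    """Union the columns of every row; fail if a column changes type."""
    schema = {}
    for row in rows:
        for column, value in flatten_row(row).items():
            column_type = infer_type(value)
            known = schema.setdefault(column, column_type)
            if known != column_type:
                raise ValueError(f"column {column!r}: {known} vs {column_type}")
    return schema


# Rows reconstructed from the `select *` output above (an assumption).
rows = [
    {"name": "Alice", "age": 20, "is_student": True, "score": 90.5,
     "object": {"a": 1, "b": 2}},
    {"age": 21, "is_student": False, "score": 85.5, "company": "A"},
    {"name": "Charlie", "age": 22, "is_student": True, "score": 95.5,
     "array": [1, 2, 3]},
]
schema = merge_schema(rows)
for column, column_type in schema.items():
    print(column, column_type)
```

The sketch yields eight data columns; together with the automatically added `greptime_timestamp` column this matches the nine rows reported by `desc pipeline_logs`.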

@@ -121,33 +123,38 @@ Here are some example of using `custom_time_index` assuming the time variable is

### Flatten JSON objects

If flattening a JSON object into a single-level structure is needed, add the `x-greptime-pipeline-params` header to the request and set `flatten_json_object` to `true`.
The `greptime_identity` pipeline **automatically flattens** nested JSON objects into a single-level structure. This behavior is always enabled and creates separate columns for each nested field using dot notation (e.g., `a.b.c`).

#### Controlling flattening depth

You can control how deeply nested objects are flattened using the `max_nested_levels` parameter in the `x-greptime-pipeline-params` header. The default value is 10 levels.

Here is a sample request:

```shell
curl -X "POST" "http://localhost:4000/v1/ingest?db=<db-name>&table=<table-name>&pipeline_name=greptime_identity&version=<pipeline-version>" \
-H "Content-Type: application/x-ndjson" \
-H "Authorization: Basic {{authentication}}" \
-H "x-greptime-pipeline-params: flatten_json_object=true" \
-H "x-greptime-pipeline-params: max_nested_levels=5" \
-d "$<log-items>"
```
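
For clients that do not shell out to `curl`, the same request can be assembled with Python's standard library. This is a minimal sketch under stated assumptions: `build_ingest_request` is a hypothetical helper, the `Authorization` value is assumed to be the usual HTTP Basic encoding of `username:password`, and the optional `version` query parameter is only added when provided. The request is built but not sent.

```python
import base64
import urllib.parse
import urllib.request


def build_ingest_request(base_url, db, table, username, password,
                         ndjson_body, max_nested_levels=5, version=None):
    """Build (but do not send) an ingest request for greptime_identity."""
    query = {"db": db, "table": table, "pipeline_name": "greptime_identity"}
    if version is not None:
        query["version"] = version
    # Assumption: standard HTTP Basic credentials, base64("user:password").
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    headers = {
        "Content-Type": "application/x-ndjson",
        "Authorization": f"Basic {token}",
        # Limits how many levels of nested objects are flattened.
        "x-greptime-pipeline-params": f"max_nested_levels={max_nested_levels}",
    }
    url = f"{base_url}/v1/ingest?{urllib.parse.urlencode(query)}"
    return urllib.request.Request(
        url, data=ndjson_body.encode("utf-8"), headers=headers, method="POST"
    )


# Placeholder values; replace them with your own database, table, and credentials.
request = build_ingest_request(
    "http://localhost:4000", "public", "pipeline_logs",
    "user", "secret", '{"a": {"b": 1}}\n', max_nested_levels=5,
)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```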

With this configuration, GreptimeDB will automatically flatten each field of the JSON object into separate columns. For example:
When the maximum nesting level is reached, any remaining nested structure is converted to a JSON string and stored in a single column. For example, with `max_nested_levels=3`:

```JSON
{
"a": {
"b": {
"c": [1, 2, 3]
"c": {
"d": [1, 2, 3]
}
}
},
"d": [
"e": [
"foo",
"bar"
],
"e": {
"f": [7, 8, 9],
"f": {
"g": {
"h": 123,
"i": "hello",
@@ -163,14 +170,18 @@ Will be flattened to:

```json
{
"a.b.c": [1,2,3],
"d": ["foo","bar"],
"e.f": [7,8,9],
"e.g.h": 123,
"e.g.i": "hello",
"e.g.j.k": true
"a.b.c": "{\"d\":[1,2,3]}",
"e": "[\"foo\",\"bar\"]",
"f.g.h": 123,
"f.g.i": "hello",
"f.g.j": "{\"k\":true}"
}
```

Note that:
- Arrays at any level are always converted to JSON strings (e.g., `"e"` becomes `"[\"foo\",\"bar\"]"`)
- When the nesting level limit is reached (level 3 in this example), the remaining nested objects are converted to JSON strings (e.g., `"a.b.c"` and `"f.g.j"`)
- Regular scalar values within the depth limit are stored as their native types (e.g., `"f.g.h"` as integer, `"f.g.i"` as string)

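The depth-limited flattening rules above can be reproduced with a short Python sketch. It is an illustration of the documented behavior, not the actual GreptimeDB code: the function name `flatten` and its private parameters are hypothetical, and the `"j": {"k": true}` part of the input is inferred from the flattened output shown above.

```python
import json


def flatten(obj, max_nested_levels=10, _prefix="", _level=1):
    """Flatten nested objects into dot-notation keys up to a depth limit."""
    flat = {}
    for key, value in obj.items():
        column = f"{_prefix}.{key}" if _prefix else key
        if value is None:
            continue  # fields with null values are ignored
        if isinstance(value, dict) and _level < max_nested_levels:
            # Still within the limit: descend one more level.
            flat.update(flatten(value, max_nested_levels, column, _level + 1))
        elif isinstance(value, (dict, list)):
            # Arrays, and objects at the depth limit, become compact JSON strings.
            flat[column] = json.dumps(value, separators=(",", ":"))
        else:
            flat[column] = value  # scalars keep their native type
    return flat


log = {
    "a": {"b": {"c": {"d": [1, 2, 3]}}},
    "e": ["foo", "bar"],
    "f": {"g": {"h": 123, "i": "hello", "j": {"k": True}}},
}
result = flatten(log, max_nested_levels=3)
print(json.dumps(result, indent=2))
```

In this sketch, top-level keys count as level 1, so with `max_nested_levels=3` the objects found under `a.b.c` and `f.g.j` are serialized instead of being expanded further.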


2 changes: 1 addition & 1 deletion docs/user-guide/ingest-data/for-observability/vector.md
Original file line number Diff line number Diff line change
@@ -142,7 +142,7 @@ password = "<password>"

[sinks.my_sink_id.extra_params]
source = "vector"
x-greptime-pipeline-params = "flatten_json_object=true"
x-greptime-pipeline-params = "max_nested_levels=10"
```

This example demonstrates how to use `greptimedb_logs` sink to write generated demo logs data to GreptimeDB. For more information, please refer to [Vector greptimedb_logs sink](https://vector.dev/docs/reference/configuration/sinks/greptimedb_logs/) documentation.
Original file line number Diff line number Diff line change
@@ -11,21 +11,22 @@ GreptimeDB provides built-in Pipelines for common log formats, allowing you to use them directly

## `greptime_identity`

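The `greptime_identity` pipeline is designed for writing JSON logs and automatically creates columns for each field in the JSON log.
The `greptime_identity` pipeline is designed for writing JSON logs and automatically creates columns for each field in the JSON log. Nested JSON objects are automatically flattened into separate columns using dot notation.

- The first-level keys in the JSON log are used as column names.
- An error is returned if the same field has different types.
- Fields with `null` values are ignored.
- If the time index is not specified manually, an additional column, `greptime_timestamp`, is added to the table as the time index to indicate when the log was written.
- Nested objects are automatically flattened (e.g., `{"a": {"b": 1}}` becomes column `a.b`)
- Arrays are converted to JSON strings
- An error is returned if the same field has different types
- Fields with `null` values are ignored
- If the time index is not specified manually, an additional column, `greptime_timestamp`, is added to the table as the time index to indicate when the log was written

### Type conversion rules

- `string` -> `string`
- `number` -> `int64` or `float64`
- `boolean` -> `bool`
- `null` -> ignore
- `array` -> `json`
- `object` -> `json`
- `array` -> `string` (JSON-stringified)
- `object` -> automatically flattened into separate columns (see [Flatten JSON objects](#flatten-json-objects))

For example, if we have the following JSON data: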

@@ -37,7 +38,7 @@ GreptimeDB provides built-in Pipelines for common log formats, allowing you to use them directly
]
```

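We'll merge the row structure of each batch to get the final schema. The table schema will be:
We'll merge the row structure of each batch to get the final schema. Note that nested objects are automatically flattened into separate columns (e.g., `object.a`, `object.b`), and arrays are converted to JSON strings. The table schema will be: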

```sql
mysql> desc pipeline_logs;
@@ -47,26 +48,27 @@ mysql> desc pipeline_logs;
| age | Int64 | | YES | | FIELD |
| is_student | Boolean | | YES | | FIELD |
| name | String | | YES | | FIELD |
| object | Json | | YES | | FIELD |
| object.a | Int64 | | YES | | FIELD |
| object.b | Int64 | | YES | | FIELD |
| score | Float64 | | YES | | FIELD |
| company | String | | YES | | FIELD |
| array | Json | | YES | | FIELD |
| array | String | | YES | | FIELD |
| greptime_timestamp | TimestampNanosecond | PRI | NO | | TIMESTAMP |
+--------------------+---------------------+------+------+---------+---------------+
8 rows in set (0.00 sec)
9 rows in set (0.00 sec)
```

The data will be stored in the table as follows:

```sql
mysql> select * from pipeline_logs;
+------+------------+---------+---------------+-------+---------+---------+----------------------------+
| age | is_student | name | object | score | company | array | greptime_timestamp |
+------+------------+---------+---------------+-------+---------+---------+----------------------------+
| 22 | 1 | Charlie | NULL | 95.5 | NULL | [1,2,3] | 2024-10-18 09:35:48.333020 |
| 21 | 0 | NULL | NULL | 85.5 | A | NULL | 2024-10-18 09:35:48.333020 |
| 20 | 1 | Alice | {"a":1,"b":2} | 90.5 | NULL | NULL | 2024-10-18 09:35:48.333020 |
+------+------------+---------+---------------+-------+---------+---------+----------------------------+
+------+------------+---------+----------+----------+-------+---------+-----------+----------------------------+
| age | is_student | name | object.a | object.b | score | company | array | greptime_timestamp |
+------+------------+---------+----------+----------+-------+---------+-----------+----------------------------+
| 22 | 1 | Charlie | NULL | NULL | 95.5 | NULL | [1,2,3] | 2024-10-18 09:35:48.333020 |
| 21 | 0 | NULL | NULL | NULL | 85.5 | A | NULL | 2024-10-18 09:35:48.333020 |
| 20 | 1 | Alice | 1 | 2 | 90.5 | NULL | NULL | 2024-10-18 09:35:48.333020 |
+------+------------+---------+----------+----------+-------+---------+-----------+----------------------------+
3 rows in set (0.01 sec)
```

@@ -117,35 +119,40 @@ DESC pipeline_logs;
- "2025-06-27T15:02:23.082253908Z": `custom_time_index=input_ts;datestr;%Y-%m-%dT%H:%M:%S%.9f%#z`


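### Flatten json objects
### Flatten JSON objects

If you want to flatten a JSON object into a single-level structure, add the `x-greptime-pipeline-params` header to the request and set `flatten_json_object` to `true`.
The `greptime_identity` pipeline **automatically flattens** nested JSON objects into a single-level structure. This behavior is always enabled and creates separate columns for each nested field using dot notation (e.g., `a.b.c`).

#### Controlling flattening depth

You can control how deeply nested objects are flattened using the `max_nested_levels` parameter in the `x-greptime-pipeline-params` header. The default value is 10 levels.

Here is a sample request: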

```shell
curl -X "POST" "http://localhost:4000/v1/ingest?db=<db-name>&table=<table-name>&pipeline_name=greptime_identity&version=<pipeline-version>" \
-H "Content-Type: application/x-ndjson" \
-H "Authorization: Basic {{authentication}}" \
-H "x-greptime-pipeline-params: flatten_json_object=true" \
-H "x-greptime-pipeline-params: max_nested_levels=5" \
-d "$<log-items>"
```

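With this configuration, GreptimeDB will automatically flatten each field of the JSON object into separate columns. For example:
When the maximum nesting level is reached, any remaining nested structure is converted to a JSON string and stored in a single column. For example, with `max_nested_levels=3`: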

```JSON
{
"a": {
"b": {
"c": [1, 2, 3]
"c": {
"d": [1, 2, 3]
}
}
},
"d": [
"e": [
"foo",
"bar"
],
"e": {
"f": [7, 8, 9],
"f": {
"g": {
"h": 123,
"i": "hello",
@@ -161,12 +168,16 @@ curl -X "POST" "http://localhost:4000/v1/ingest?db=<db-name>&table=<table-name>&

```json
{
"a.b.c": [1,2,3],
"d": ["foo","bar"],
"e.f": [7,8,9],
"e.g.h": 123,
"e.g.i": "hello",
"e.g.j.k": true
"a.b.c": "{\"d\":[1,2,3]}",
"e": "[\"foo\",\"bar\"]",
"f.g.h": 123,
"f.g.i": "hello",
"f.g.j": "{\"k\":true}"
}
```

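Note that:
- Arrays at any level are always converted to JSON strings (e.g., `"e"` becomes `"[\"foo\",\"bar\"]"`)
- When the nesting level limit is reached (level 3 in this example), the remaining nested objects are converted to JSON strings (e.g., `"a.b.c"` and `"f.g.j"`)
- Regular scalar values within the depth limit are stored as their native types (e.g., `"f.g.h"` as integer, `"f.g.i"` as string)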

Original file line number Diff line number Diff line change
@@ -149,7 +149,7 @@ password = "<password>"

[sinks.my_sink_id.extra_params]
source = "vector"
x-greptime-pipeline-params = "flatten_json_object=true"
x-greptime-pipeline-params = "max_nested_levels=10"
```

This example demonstrates how to use the `greptimedb_logs` sink to write generated demo log data to GreptimeDB. For more information, please refer to the [Vector greptimedb_logs sink](https://vector.dev/docs/reference/configuration/sinks/greptimedb_logs/) documentation.