|
| 1 | +--- |
| 2 | +date: 2020-11-11 |
| 3 | +title: "Accessing Data and Metadata" |
| 4 | +linkTitle: "Accessing Data and Metadata" |
| 5 | +weight: 60 |
| 6 | +description: > |
| 7 | + The commons configuration for Connect File Pulse. |
| 8 | +--- |
| 9 | + |
| 10 | +Some filters (e.g., [AppendFilter](/kafka-connect-file-pulse/docs/developer-guide/filters/#appendfilter)) can be configured using the *Simple Connect Expression Language*. |
| 11 | + |
| 12 | +The *Simple Connect Expression Language* (ScEL for short) is an expression language based on regex that allows quick access to, and manipulation of, record fields and metadata. |
| 13 | + |
| 14 | +An expression is written in one of two forms: `<expression string>` or `"{{ <expression string> }}"`. |
| 15 | + |
| 16 | +ScEL supports the following capabilities: |
| 17 | + |
| 18 | +* **Literal expressions** |
| 19 | +* **Field Selector** |
| 20 | +* **Nested Navigation** |
| 21 | +* **String substitution** |
| 22 | +* **Functions** |
| 23 | + |
| 24 | +## Literal expressions |
| 25 | + |
| 26 | +* String : `'Hello World'` |
| 27 | +* Number : `42` |
| 28 | +* Boolean: `True` |
| 29 | +* Nullable: `null` |
| 30 | + |
| 31 | +## Field Selector |
| 32 | + |
| 33 | +The expression language can be used to select a single field from the input record: |
| 34 | + |
| 35 | +`$.username` |
| 36 | + |
| 37 | +## Nested Navigation |
| 38 | + |
| 39 | +To navigate down a struct value, use a period to indicate a nested field: |
| 40 | + |
| 41 | +`$.address.city` |
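
The navigation rule above can be illustrated outside of FilePulse. The following is a minimal Python sketch, not the connector's implementation: it assumes the record is a plain nested dictionary, and the helper name `select` is hypothetical.

```python
from typing import Any, Mapping


def select(record: Mapping[str, Any], selector: str) -> Any:
    """Resolve a selector such as '$.address.city' against a nested mapping.

    Hypothetical helper for illustration only; it handles just the '$.' form.
    """
    if not selector.startswith("$."):
        raise ValueError(f"unsupported selector: {selector!r}")
    current: Any = record
    # Each period-separated name descends one level into the structure.
    for name in selector[2:].split("."):
        current = current[name]  # raises KeyError if the field is missing
    return current


record = {"username": "jdoe", "address": {"city": "Paris"}}
print(select(record, "$.username"))      # jdoe
print(select(record, "$.address.city"))  # Paris
```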
| 42 | + |
| 43 | +## String substitution |
| 44 | + |
| 45 | +The expression language can be used to build a new string field that concatenates multiple fields: |
| 46 | + |
| 47 | +`The user {{ $.username }} is living in city {{ $.address.city }}` |
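
To make the substitution behavior concrete, here is a rough Python sketch of how `{{ ... }}` placeholders could be expanded. It is an illustration under assumptions, not how FilePulse evaluates ScEL: the record is a nested dictionary, only `$.`-style selectors are supported, and the names `lookup` and `substitute` are hypothetical.

```python
import re
from functools import reduce
from typing import Any, Mapping

# Matches '{{ <expression> }}' and captures the expression without padding.
PLACEHOLDER = re.compile(r"\{\{\s*(.*?)\s*\}\}")


def lookup(record: Mapping[str, Any], selector: str) -> Any:
    """Resolve a '$.a.b' selector against a nested mapping (illustrative)."""
    if not selector.startswith("$."):
        raise ValueError(f"unsupported selector: {selector!r}")
    return reduce(lambda obj, name: obj[name], selector[2:].split("."), record)


def substitute(template: str, record: Mapping[str, Any]) -> str:
    """Replace every placeholder with the string form of the selected field."""
    return PLACEHOLDER.sub(lambda m: str(lookup(record, m.group(1))), template)


record = {"username": "jdoe", "address": {"city": "Paris"}}
template = "The user {{ $.username }} is living in city {{ $.address.city }}"
print(substitute(template, record))
# The user jdoe is living in city Paris
```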
| 48 | + |
| 49 | +## Functions |
| 50 | + |
| 51 | +The expression language supports function calls: |
| 52 | + |
| 53 | +`The user {{ $.username }} is living in city {{ uppercase($.address.city) }}` |
| 54 | + |
| 55 | +## Dynamic Field Selector |
| 56 | + |
| 57 | +String substitution can be used to dynamically select a field. |
| 58 | + |
| 59 | +The example below shows how to dynamically build a field selector by concatenating `$.` and |
| 60 | +the first element of the array field `$.values`. |
| 61 | + |
| 62 | +`{{ '$.'extract_array($.values, 0) }}` |
| 63 | + |
| 64 | + |
| 65 | +Note the use of double quotes to define a substitution expression. |
| 66 | + |
| 67 | +## Built-in Functions |
| 68 | + |
| 69 | +ScEL supports a number of predefined functions that can be used to apply a single transformation on a field. |
| 70 | + |
| 71 | +| Function | Description | Syntax | |
| 72 | +| ----------------|---------------|-----------| |
| 73 | +| `concat` | Concatenate two or more string expressions. | `{{ concat(expr1, expr2, ...) }}` | |
| 74 | +| `concat_ws` | Concatenate two or more string expressions, using the specified separator between each. | `{{ concat_ws(separator, prefix, suffix, expr1, expr2, ...) }}` | |
| 75 | +| `contains` | Returns `true` if an array field's value contains the specified value | `{{ contains(array, 'value') }}` | |
| 76 | +| `converts` | Converts a field's value into the specified type | `{{ converts(field, INTEGER) }}` | |
| 77 | +| `ends_with` | Returns `true` if a string field's value ends with the specified string suffix | `{{ ends_with(field, 'suffix') }}` | |
| 78 | +| `equals` | Returns `true` if a string or number field's value equals the specified value | `{{ equals(field, value) }}` | |
| 79 | +| `exists` | Returns `true` if the specified field exists | `{{ exists(struct, field) }}` | |
| 80 | +| `extract_array` | Returns the element at the specified position of the specified array | `{{ extract_array(array, 0) }}` | |
| 81 | +| `hash` | Hashes a given string expression using the murmur2 algorithm | `{{ hash(field_expr) }}` | |
| 82 | +| `is_null` | Returns `true` if a field's value is null | `{{ is_null(field) }}` | |
| 83 | +| `length` | Returns the number of elements in an array or the length of a string field | `{{ length(array) }}` | |
| 84 | +| `lowercase` | Converts all of the characters in a string field's value to lower case | `{{ lowercase(field) }}` | |
| 85 | +| `matches` | Returns `true` if a field's value matches the specified regex | `{{ matches(field, 'regex') }}` | |
| 86 | +| `md5` | Computes the MD5 hash of a string expression | `{{ md5(field_expr) }}` | |
| 87 | +| `nlv` | Sets a default value if a field's value is null | `{{ nlv(field, default) }}` | |
| 88 | +| `replace_all` | Replaces every subsequence of the field's value that matches the given pattern with the given replacement string. | `{{ replace_all(field, 'regex', 'replacement') }}` | |
| 89 | +| `starts_with` | Returns `true` if a string field's value starts with the specified string prefix | `{{ starts_with(field, 'prefix') }}` | |
| 90 | +| `trim` | Trims the spaces from the beginning and end of a string. | `{{ trim(field) }}` | |
| 91 | +| `uppercase` | Converts all of the characters in a string field's value to upper case | `{{ uppercase(field) }}` | |
| 92 | +| `uuid` | Create a Universally Unique Identifier (UUID) | `{{ uuid() }}` | |
| 93 | + |
| 94 | + |
| 95 | +In addition, ScEL supports nested functions. |
| 96 | + |
| 97 | +For example, the following expression replaces every whitespace character with a hyphen after converting the field's value to lowercase. |
| 98 | + |
| 99 | +``` |
| 100 | +replace_all(lowercase($.field), '\\s', '-') |
| 101 | +``` |
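
As a point of comparison, the same composition can be written in Python. This is only a sketch of the intended result: the two helpers below are hypothetical stand-ins for the ScEL functions of the same name, and Python's `re` dialect is assumed to behave like the connector's regex engine for this simple pattern.

```python
import re


def lowercase(value: str) -> str:
    """Stand-in for ScEL's lowercase: convert all characters to lower case."""
    return value.lower()


def replace_all(value: str, pattern: str, replacement: str) -> str:
    """Stand-in for ScEL's replace_all: replace every match of the pattern."""
    return re.sub(pattern, replacement, value)


# The inner call runs first, and its result feeds the outer call.
result = replace_all(lowercase("Hello Big World"), r"\s", "-")
print(result)  # hello-big-world
```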
| 102 | + |
| 103 | +{{% alert title="Limitation" color="warning" %}} |
| 104 | +Currently, FilePulse does not support user-defined functions (UDFs). So you cannot register your own functions to enrich the expression language. |
| 105 | +{{% /alert %}} |
| 106 | + |
| 107 | + |
| 108 | +## Scopes |
| 109 | + |
| 110 | +The previous sections showed how to use the expression language to select a specific field. |
| 111 | +The selected field was part of the current record being processed. |
| 112 | + |
| 113 | +ScEL also gives you access to additional fields through the use of scopes. |
| 114 | +A scope defines the root object on which a selector expression must be evaluated. |
| 115 | + |
| 116 | +The syntax to define an expression with a scope is of the form: "`$<scope>.<selector expression string>`". |
| 117 | + |
| 118 | +By default, if no scope is defined in the expression, the scope `$value` is implicitly used. |
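
The scope syntax and its default can be sketched as a small parser. This Python snippet is illustrative only and makes assumptions that the text does not state: scope names are limited to word characters, and the function name `split_scope` is hypothetical.

```python
import re
from typing import Optional, Tuple

# '$', an optional scope name, then an optional '.<selector>' part.
SCOPED = re.compile(r"^\$(\w*)(?:\.(.+))?$")


def split_scope(expression: str) -> Tuple[str, Optional[str]]:
    """Split an expression into (scope, selector), defaulting to 'value'."""
    match = SCOPED.match(expression)
    if match is None:
        raise ValueError(f"not a scoped expression: {expression!r}")
    scope, selector = match.groups()
    # An empty scope name, as in '$.username', means the implicit $value scope.
    return (scope or "value", selector)


print(split_scope("$.username"))      # ('value', 'username')
print(split_scope("$metadata.name"))  # ('metadata', 'name')
print(split_scope("$topic"))          # ('topic', None)
```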
| 119 | + |
| 120 | +ScEL supports a number of predefined scopes that can be used, for example: |
| 121 | + |
| 122 | + - **To define the topic for the record.** |
| 123 | + - **To define the key for the record.** |
| 124 | + - **To get access to metadata about the source file.** |
| 125 | + - Etc. |
| 126 | + |
| 127 | +| Scope | Description | Type | |
| 128 | +|--- | --- |--- | |
| 129 | +| `$headers` | The record headers | - | |
| 130 | +| `$key` | The record key | `string` | |
| 131 | +| `$metadata` | The file metadata | `struct` | |
| 132 | +| `$offset` | The offset information of this record in the source file | `struct` | |
| 133 | +| `$system` | The system environment variables and runtime properties | `struct` | |
| 134 | +| `$timestamp` | The record timestamp | `long` | |
| 135 | +| `$topic` | The output topic | `string` | |
| 136 | +| `$value` | The record value| `struct` | |
| 137 | +| `$variables` | The contextual filter-chain variables| `map[string, object]` | |
| 138 | + |
| 139 | +Note that, in case of failure, additional fields are added to the current filter context (see [Handling Failures](/kafka-connect-file-pulse/docs/developer-guide/handling-failures/)). |
| 140 | + |
| 141 | +### Record Headers |
| 142 | + |
| 143 | +The scope `$headers` allows defining the headers of the output record. |
| 144 | + |
| 145 | +### Record Key |
| 146 | + |
| 147 | +The scope `$key` allows defining the key of the output record. Only string keys are currently supported. |
| 148 | + |
| 149 | +### Source Metadata |
| 150 | + |
| 151 | +The scope `$metadata` allows read access to information about the file being processed. |
| 152 | + |
| 153 | +| Predefined Fields (ScEL) | Description | Type | |
| 154 | +|--- | --- |--- | |
| 155 | +| `$metadata.name` | The file name | `string` | |
| 156 | +| `$metadata.path` | The file directory path | `string` | |
| 157 | +| `$metadata.absolutePath` | The file absolute path | `string` | |
| 158 | +| `$metadata.hash` | The file CRC32 hash | `int` | |
| 159 | +| `$metadata.lastModified` | The file last modified time. | `long` | |
| 160 | +| `$metadata.size` | The file size | `long` | |
| 161 | +| `$metadata.inode` | The file Unix inode | `long` | |
| 162 | + |
| 163 | +### Record Offset |
| 164 | + |
| 165 | +The scope `$offset` allows read access to information about the original position of the record in the source file. |
| 166 | +The available fields depend on the configured FileInputRecord. |
| 167 | + |
| 168 | +| Predefined Fields (ScEL) | Description | Type | |
| 169 | +|--- | --- |--- | |
| 170 | +| `$offset.timestamp` | The creation time of the record (in milliseconds) | `long` | |
| 171 | + |
| 172 | +The following fields are only available if `RowFilterReader` is configured. |
| 173 | + |
| 174 | +| Predefined Fields (ScEL) | Description | Type | |
| 175 | +|--- | --- |--- | |
| 176 | +| `$offset.startPosition` | The start position of the record in the source file | `long` | |
| 177 | +| `$offset.endPosition` | The end position of the record in the source file | `long` | |
| 178 | +| `$offset.size` | The size in bytes | `long` | |
| 179 | +| `$offset.row` | The row number of the record in the source file | `long` | |
| 180 | + |
| 181 | +The following fields are only available if `BytesArrayInputReader` is configured. |
| 182 | + |
| 183 | +| Predefined Fields (ScEL) | Description | Type | |
| 184 | +|--- | --- |--- | |
| 185 | +| `$offset.startPosition` | The start position of the record in the source file (always equal to 0) | `long` | |
| 186 | +| `$offset.endPosition` | The end position of the record in the source file (equal to the file size) | `long` | |
| 187 | + |
| 188 | +The following fields are only available if `AvroFilterInputReader` is configured. |
| 189 | + |
| 190 | +| Predefined Fields (ScEL) | Description | Type | |
| 191 | +|--- | --- |--- | |
| 192 | +| `$offset.blockStart` | The start position of the current block | `long` | |
| 193 | +| `$offset.position` | The position in the current block. | `long` | |
| 194 | +| `$offset.records` | The number of records read in the current block. | `long` | |
| 195 | + |
| 196 | +### System |
| 197 | + |
| 198 | +The scope `$system` allows access to the system environment variables and runtime properties. |
| 199 | + |
| 200 | +| Predefined Fields (ScEL) | Description | Type | |
| 201 | +|--- | --- |--- | |
| 202 | +| `$system.env` | The system environment variables. | `map[string, string]` | |
| 203 | +| `$system.props` | The system runtime properties. | `map[string, string]` | |
| 204 | + |
| 205 | +### Timestamp |
| 206 | + |
| 207 | +The scope `$timestamp` allows defining the timestamp of the output record. |
| 208 | + |
| 209 | +### Topic |
| 210 | + |
| 211 | +The scope `$topic` allows defining the target topic of the output record. |
| 212 | + |
| 213 | +### Value |
| 214 | + |
| 215 | +The scope `$value` allows defining the fields of the output record. |
| 216 | + |
| 217 | +### Variables |
| 218 | + |
| 219 | +The scope `$variables` allows read/write access to a simple key-value map structure. |
| 220 | +This scope can be used to share user-defined variables between [Processing Filters](/kafka-connect-file-pulse/docs/developer-guide/filters/). |
| 221 | + |
| 222 | +Note: variables are not cached between records. |