
Commit 22d0afb

feat: MaxCompute Sink (#55)
* feat:
  - bump depot version
  - add maxcompute sink
  - adjust gradle dependencies
* fix: Instrumentation
* chore: add configuration for image building and local testing
* chore: cleanup unused change
* chore: add maxcompute sink documentation
* chore: change version to 0.11.0 and depot version to 0.10.0
* chore: fix maxcompute-sink.md
* fix: wrong class name
* chore: Update maxcompute-sink.md
1 parent a376678 commit 22d0afb

File tree

- build.gradle
- docs/docs/sinks/maxcompute-sink.md
- env/local.properties
- src/main/java/com/gotocompany/firehose/config/enums/SinkType.java
- src/main/java/com/gotocompany/firehose/sink/SinkFactory.java

5 files changed: +85 -3 lines changed


build.gradle

Lines changed: 2 additions & 2 deletions
@@ -33,7 +33,7 @@ lombok {
 }
 
 group 'com.gotocompany'
-version '0.10.7'
+version '0.11.0'
 
 def projName = "firehose"
 
@@ -100,7 +100,7 @@ dependencies {
     implementation platform('com.google.cloud:libraries-bom:20.5.0')
     implementation 'com.google.cloud:google-cloud-storage:2.20.1'
     implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
-    implementation group: 'com.gotocompany', name: 'depot', version: '0.9.2'
+    implementation group: 'com.gotocompany', name: 'depot', version: '0.10.0'
     implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'
     implementation 'dev.cel:cel:0.5.2'

docs/docs/sinks/maxcompute-sink.md

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
# MaxCompute sink

### Datatype Protobuf

MaxCompute sink has several responsibilities, including:

1. Creation of the MaxCompute table if it does not exist.
2. Updating the MaxCompute table schema based on the latest protobuf schema.
3. Translating protobuf messages into MaxCompute-compatible records and inserting them into MaxCompute tables.

## MaxCompute Table Schema Update

### Protobuf

MaxCompute sink updates the MaxCompute table schema in a separate table update operation. MaxCompute sink
uses [Stencil](https://github.com/goto/stencil) to parse protobuf messages, generate the schema, and update MaxCompute
tables with the latest schema.
The Stencil client periodically reloads the descriptor cache; the table schema is updated after the descriptor cache
is reloaded.
#### Supported Protobuf - MaxCompute Table Type Mapping

| Protobuf Type                             | MaxCompute Type               |
|-------------------------------------------|-------------------------------|
| bytes                                     | BINARY                        |
| string                                    | STRING                        |
| enum                                      | STRING                        |
| float                                     | FLOAT                         |
| double                                    | DOUBLE                        |
| bool                                      | BOOLEAN                       |
| int64, uint64, fixed64, sfixed64, sint64  | BIGINT                        |
| int32, uint32, fixed32, sfixed32, sint32  | INT                           |
| message                                   | STRUCT                        |
| .google.protobuf.Timestamp                | TIMESTAMP_NTZ                 |
| .google.protobuf.Struct                   | STRING (JSON serialised)      |
| .google.protobuf.Duration                 | STRUCT                        |
| map<k,v>                                  | ARRAY<STRUCT<key:k, value:v>> |
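As a minimal sketch of how this mapping could be applied, the snippet below translates a protobuf `FieldDescriptor` into the MaxCompute type names from the table above. It is illustrative only: the actual conversion is implemented in the depot library, and the class and method names here are hypothetical.

```java
import com.google.protobuf.Descriptors.FieldDescriptor;

// Hypothetical helper, for illustration only; the real conversion lives in the depot library.
final class ProtoToMaxComputeTypeSketch {

    static String toMaxComputeType(FieldDescriptor field) {
        switch (field.getType()) {
            case BYTES:
                return "BINARY";
            case STRING:
            case ENUM:
                return "STRING";
            case FLOAT:
                return "FLOAT";
            case DOUBLE:
                return "DOUBLE";
            case BOOL:
                return "BOOLEAN";
            case INT64: case UINT64: case FIXED64: case SFIXED64: case SINT64:
                return "BIGINT";
            case INT32: case UINT32: case FIXED32: case SFIXED32: case SINT32:
                return "INT";
            case MESSAGE: {
                if (field.isMapField()) {
                    // map<k,v> becomes ARRAY<STRUCT<key:k, value:v>>; key/value types follow this same mapping.
                    return "ARRAY<STRUCT<key, value>>";
                }
                String messageType = field.getMessageType().getFullName();
                if ("google.protobuf.Timestamp".equals(messageType)) {
                    return "TIMESTAMP_NTZ";
                }
                if ("google.protobuf.Struct".equals(messageType)) {
                    return "STRING"; // JSON serialised
                }
                return "STRUCT"; // includes google.protobuf.Duration and other nested messages
            }
            default:
                throw new IllegalArgumentException("Unsupported protobuf type: " + field.getType());
        }
    }
}
```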
## Partitioning

MaxCompute sink supports creating tables with a partition configuration. Currently, it supports partitioning on a primitive field (STRING, TINYINT, SMALLINT, BIGINT)
or on a timestamp field. The timestamp-based partitioning strategy introduces a pseudo-partition column whose value is the timestamp field truncated to the nearest start of day, as sketched below.
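The sketch below shows that day-level truncation for a pseudo-partition column such as `__partition_key` (the name used in the sample `env/local.properties` in this commit). The UTC zone and the `LocalDate` representation are assumptions for illustration; the actual truncation and time-zone handling are implementation details of the sink.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Illustrative sketch only: truncate an event timestamp to the start of its day,
// the kind of value a timestamp-based pseudo-partition column would hold.
// The UTC zone is an assumption, not something this commit specifies.
final class TimestampPartitionSketch {

    static LocalDate toPartitionValue(Instant eventTimestamp) {
        return eventTimestamp.atZone(ZoneOffset.UTC).toLocalDate();
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-05-17T13:45:30Z");
        System.out.println(toPartitionValue(ts)); // prints 2024-05-17
    }
}
```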
## Clustering

MaxCompute sink currently does not support clustering.
## Metadata

For data quality checking purposes, metadata sometimes needs to be added to the record.
If `SINK_MAXCOMPUTE_ADD_METADATA_ENABLED` is true, the metadata columns are added.
`SINK_MAXCOMPUTE_METADATA_NAMESPACE` specifies a namespace under which the metadata columns are nested;
if the namespace is empty, the metadata columns are added at the root level.
`SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES` is set with the Kafka metadata columns and their types;
an example of metadata columns that can be added for Kafka records is shown in `env/local.properties` below and decoded in the sketch after this section.
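A minimal sketch of what such a `SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES` value encodes, using the value from `env/local.properties` in this commit. The parsing shown here is illustrative only; the sink's actual configuration layer does this work.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES is a comma-separated
// list of "<column>=<type>" pairs. This class only shows what such a value encodes;
// it is not the sink's configuration code.
final class MetadataColumnsTypesSketch {

    static Map<String, String> parse(String columnsTypes) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (String pair : columnsTypes.split(",")) {
            String[] parts = pair.split("=", 2);
            columns.put(parts[0].trim(), parts[1].trim());
        }
        return columns;
    }

    public static void main(String[] args) {
        // Value taken from env/local.properties in this commit.
        String value = "message_timestamp=timestamp,message_topic=string,"
                + "message_partition=integer,message_offset=long";
        parse(value).forEach((name, type) -> System.out.println(name + " -> " + type));
    }
}
```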

env/local.properties

Lines changed: 17 additions & 0 deletions
@@ -205,4 +205,21 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
 # SINK_REDIS_TTL_TYPE=DISABLE
 # SINK_REDIS_TTL_VALUE=0
 # SINK_REDIS_DEPLOYMENT_TYPE=Standalone
+#############################################
+#
+## MaxCompute Sink
+#
+SINK_MAXCOMPUTE_ODPS_URL=http://service.ap-southeast-5.maxcompute.aliyun.com/api
+SINK_MAXCOMPUTE_TUNNEL_URL=http://dt.ap-southeast-5.maxcompute.aliyun.com
+SINK_MAXCOMPUTE_ACCESS_ID=
+SINK_MAXCOMPUTE_ACCESS_KEY=
+SINK_MAXCOMPUTE_PROJECT_ID=your_project_id
+SINK_MAXCOMPUTE_SCHEMA=default
+SINK_MAXCOMPUTE_METADATA_NAMESPACE=__kafka_metadata
+SINK_MAXCOMPUTE_ADD_METADATA_ENABLED=true
+SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES=message_timestamp=timestamp,message_topic=string,message_partition=integer,message_offset=long
+SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE=true
+SINK_MAXCOMPUTE_TABLE_PARTITION_KEY=event_timestamp
+SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME=__partition_key
+SINK_MAXCOMPUTE_TABLE_NAME=table_name

src/main/java/com/gotocompany/firehose/config/enums/SinkType.java

Lines changed: 2 additions & 1 deletion
@@ -19,5 +19,6 @@ public enum SinkType {
     BLOB,
     BIGQUERY,
     BIGTABLE,
-    MONGODB
+    MONGODB,
+    MAXCOMPUTE
 }

src/main/java/com/gotocompany/firehose/sink/SinkFactory.java

Lines changed: 9 additions & 0 deletions
@@ -11,6 +11,8 @@
 import com.gotocompany.depot.http.HttpSink;
 import com.gotocompany.depot.log.LogSink;
 import com.gotocompany.depot.log.LogSinkFactory;
+import com.gotocompany.depot.maxcompute.MaxComputeSink;
+import com.gotocompany.depot.maxcompute.MaxComputeSinkFactory;
 import com.gotocompany.depot.metrics.StatsDReporter;
 import com.gotocompany.depot.redis.RedisSink;
 import com.gotocompany.depot.redis.RedisSinkFactory;
@@ -46,6 +48,7 @@ public class SinkFactory {
     private LogSinkFactory logSinkFactory;
     private RedisSinkFactory redisSinkFactory;
     private com.gotocompany.depot.http.HttpSinkFactory httpv2SinkFactory;
+    private MaxComputeSinkFactory maxComputeSinkFactory;
 
     public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig,
                        StatsDReporter statsDReporter,
@@ -104,6 +107,10 @@ public void init() {
                         statsDReporter);
                 httpv2SinkFactory.init();
                 return;
+            case MAXCOMPUTE:
+                maxComputeSinkFactory = new MaxComputeSinkFactory(statsDReporter, stencilClient, config);
+                maxComputeSinkFactory.init();
+                return;
             default:
                 throw new ConfigurationException("Invalid Firehose SINK_TYPE");
         }
@@ -139,6 +146,8 @@ public Sink getSink() {
                 return MongoSinkFactory.create(config, statsDReporter, stencilClient);
             case HTTPV2:
                 return new GenericSink(new FirehoseInstrumentation(statsDReporter, HttpSink.class), sinkType.name(), httpv2SinkFactory.create());
+            case MAXCOMPUTE:
+                return new GenericSink(new FirehoseInstrumentation(statsDReporter, MaxComputeSink.class), sinkType.name(), maxComputeSinkFactory.create());
             default:
                 throw new ConfigurationException("Invalid Firehose SINK_TYPE");
         }
