Skip to content

Commit 21c5a74

Browse files
committed
output to insert data into table storage
1 parent e746ec9 commit 21c5a74

File tree

10 files changed

+659
-48
lines changed

10 files changed

+659
-48
lines changed

config/env/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -831,6 +831,14 @@ OUTPUT_SQS_MESSAGE_GROUP_ID
831831
OUTPUT_SQS_REGION = eu-west-1
832832
OUTPUT_SQS_URL
833833
OUTPUT_STDOUT_DELIMITER
834+
OUTPUT_TABLE_STORAGE_INSERT_TYPE = INSERT
835+
OUTPUT_TABLE_STORAGE_MAX_IN_FLIGHT = 1
836+
OUTPUT_TABLE_STORAGE_PARTITION_KEY
837+
OUTPUT_TABLE_STORAGE_ROW_KEY
838+
OUTPUT_TABLE_STORAGE_STORAGE_ACCESS_KEY
839+
OUTPUT_TABLE_STORAGE_STORAGE_ACCOUNT
840+
OUTPUT_TABLE_STORAGE_TABLE_NAME
841+
OUTPUT_TABLE_STORAGE_TIMEOUT = 5s
834842
OUTPUT_TCP_ADDRESS = localhost:4194
835843
OUTPUT_UDP_ADDRESS = localhost:4194
836844
OUTPUT_WEBSOCKET_BASIC_AUTH_ENABLED = false

config/env/default.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,15 @@ output:
993993
url: ${OUTPUT_SQS_URL}
994994
stdout:
995995
delimiter: ${OUTPUT_STDOUT_DELIMITER}
996+
table_storage:
997+
insert_type: ${OUTPUT_TABLE_STORAGE_INSERT_TYPE:INSERT}
998+
max_in_flight: ${OUTPUT_TABLE_STORAGE_MAX_IN_FLIGHT:1}
999+
partition_key: ${OUTPUT_TABLE_STORAGE_PARTITION_KEY}
1000+
row_key: ${OUTPUT_TABLE_STORAGE_ROW_KEY}
1001+
storage_access_key: ${OUTPUT_TABLE_STORAGE_STORAGE_ACCESS_KEY}
1002+
storage_account: ${OUTPUT_TABLE_STORAGE_STORAGE_ACCOUNT}
1003+
table_name: ${OUTPUT_TABLE_STORAGE_TABLE_NAME}
1004+
timeout: ${OUTPUT_TABLE_STORAGE_TIMEOUT:5s}
9961005
tcp:
9971006
address: ${OUTPUT_TCP_ADDRESS:localhost:4194}
9981007
type: ${OUTPUT_TYPE:dynamic}

config/table_storage.yaml

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# This file was auto generated by benthos_config_gen.
2+
http:
3+
address: 0.0.0.0:4195
4+
enabled: true
5+
read_timeout: 5s
6+
root_path: /benthos
7+
debug_endpoints: false
8+
input:
9+
type: stdin
10+
stdin:
11+
delimiter: ""
12+
max_buffer: 1e+06
13+
multipart: false
14+
buffer:
15+
type: none
16+
none: {}
17+
pipeline:
18+
processors: []
19+
threads: 1
20+
output:
21+
type: table_storage
22+
table_storage:
23+
insert_type: INSERT
24+
max_in_flight: 1
25+
partition_key: ""
26+
properties: {}
27+
row_key: ""
28+
storage_access_key: ""
29+
storage_account: ""
30+
table_name: ""
31+
timeout: 5s
32+
resources:
33+
caches: {}
34+
conditions: {}
35+
inputs: {}
36+
outputs: {}
37+
processors: {}
38+
rate_limits: {}
39+
logger:
40+
prefix: benthos
41+
level: INFO
42+
add_timestamp: true
43+
json_format: true
44+
static_fields:
45+
'@service': benthos
46+
metrics:
47+
type: http_server
48+
http_server:
49+
prefix: benthos
50+
tracer:
51+
type: none
52+
none: {}
53+
shutdown_timeout: 20s

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ module github.com/Jeffail/benthos/v3
33
require (
44
cloud.google.com/go v0.57.0 // indirect
55
cloud.google.com/go/pubsub v1.3.1
6+
github.com/Azure/azure-sdk-for-go v44.2.0+incompatible
67
github.com/Azure/azure-storage-blob-go v0.10.0
78
github.com/Azure/go-amqp v0.12.7
89
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
10+
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
911
github.com/Jeffail/gabs/v2 v2.5.1
1012
github.com/Microsoft/go-winio v0.4.14 // indirect
1113
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
@@ -22,6 +24,7 @@ require (
2224
github.com/colinmarc/hdfs v1.1.3
2325
github.com/containerd/continuity v0.0.0-20181203112020-004b46473808 // indirect
2426
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
27+
github.com/dnaeon/go-vcr v1.0.1 // indirect
2528
github.com/docker/go-connections v0.4.0 // indirect
2629
github.com/docker/go-units v0.3.3 // indirect
2730
github.com/eclipse/paho.mqtt.golang v1.2.0
@@ -66,6 +69,7 @@ require (
6669
github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc
6770
github.com/quipo/statsd v0.0.0-20180118161217-3d6a5565f314
6871
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
72+
github.com/satori/go.uuid v1.2.0 // indirect
6973
github.com/sirupsen/logrus v1.6.0 // indirect
7074
github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac // indirect
7175
github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa // indirect

lib/output/constructor.go

Lines changed: 51 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/Jeffail/benthos/v3/lib/types"
1313
"github.com/Jeffail/benthos/v3/lib/util/config"
1414
"github.com/Jeffail/benthos/v3/lib/x/docs"
15-
yaml "gopkg.in/yaml.v3"
15+
"gopkg.in/yaml.v3"
1616
)
1717

1818
//------------------------------------------------------------------------------
@@ -91,6 +91,7 @@ const (
9191
TypeSTDOUT = "stdout"
9292
TypeSwitch = "switch"
9393
TypeSyncResponse = "sync_response"
94+
TypeTableStorage = "table_storage"
9495
TypeTCP = "tcp"
9596
TypeTry = "try"
9697
TypeUDP = "udp"
@@ -103,53 +104,54 @@ const (
103104

104105
// Config is the all encompassing configuration struct for all output types.
105106
type Config struct {
106-
Type string `json:"type" yaml:"type"`
107-
AMQP writer.AMQPConfig `json:"amqp" yaml:"amqp"`
108-
AMQP09 writer.AMQPConfig `json:"amqp_0_9" yaml:"amqp_0_9"`
109-
AMQP1 writer.AMQP1Config `json:"amqp_1" yaml:"amqp_1"`
110-
BlobStorage writer.AzureBlobStorageConfig `json:"blob_storage" yaml:"blob_storage"`
111-
Broker BrokerConfig `json:"broker" yaml:"broker"`
112-
Cache writer.CacheConfig `json:"cache" yaml:"cache"`
113-
Drop writer.DropConfig `json:"drop" yaml:"drop"`
114-
DropOnError DropOnErrorConfig `json:"drop_on_error" yaml:"drop_on_error"`
115-
Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"`
116-
DynamoDB writer.DynamoDBConfig `json:"dynamodb" yaml:"dynamodb"`
117-
Elasticsearch writer.ElasticsearchConfig `json:"elasticsearch" yaml:"elasticsearch"`
118-
File FileConfig `json:"file" yaml:"file"`
119-
Files writer.FilesConfig `json:"files" yaml:"files"`
120-
GCPPubSub writer.GCPPubSubConfig `json:"gcp_pubsub" yaml:"gcp_pubsub"`
121-
HDFS writer.HDFSConfig `json:"hdfs" yaml:"hdfs"`
122-
HTTPClient writer.HTTPClientConfig `json:"http_client" yaml:"http_client"`
123-
HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"`
124-
Inproc InprocConfig `json:"inproc" yaml:"inproc"`
125-
Kafka writer.KafkaConfig `json:"kafka" yaml:"kafka"`
126-
Kinesis writer.KinesisConfig `json:"kinesis" yaml:"kinesis"`
127-
KinesisFirehose writer.KinesisFirehoseConfig `json:"kinesis_firehose" yaml:"kinesis_firehose"`
128-
MQTT writer.MQTTConfig `json:"mqtt" yaml:"mqtt"`
129-
Nanomsg writer.NanomsgConfig `json:"nanomsg" yaml:"nanomsg"`
130-
NATS writer.NATSConfig `json:"nats" yaml:"nats"`
131-
NATSStream writer.NATSStreamConfig `json:"nats_stream" yaml:"nats_stream"`
132-
NSQ writer.NSQConfig `json:"nsq" yaml:"nsq"`
133-
Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"`
134-
RedisHash writer.RedisHashConfig `json:"redis_hash" yaml:"redis_hash"`
135-
RedisList writer.RedisListConfig `json:"redis_list" yaml:"redis_list"`
136-
RedisPubSub writer.RedisPubSubConfig `json:"redis_pubsub" yaml:"redis_pubsub"`
137-
RedisStreams writer.RedisStreamsConfig `json:"redis_streams" yaml:"redis_streams"`
138-
Resource string `json:"resource" yaml:"resource"`
139-
Retry RetryConfig `json:"retry" yaml:"retry"`
140-
S3 writer.AmazonS3Config `json:"s3" yaml:"s3"`
141-
SNS writer.SNSConfig `json:"sns" yaml:"sns"`
142-
SQS writer.AmazonSQSConfig `json:"sqs" yaml:"sqs"`
143-
STDOUT STDOUTConfig `json:"stdout" yaml:"stdout"`
144-
Switch SwitchConfig `json:"switch" yaml:"switch"`
145-
SyncResponse struct{} `json:"sync_response" yaml:"sync_response"`
146-
TCP writer.TCPConfig `json:"tcp" yaml:"tcp"`
147-
Try TryConfig `json:"try" yaml:"try"`
148-
UDP writer.UDPConfig `json:"udp" yaml:"udp"`
149-
Socket writer.SocketConfig `json:"socket" yaml:"socket"`
150-
Websocket writer.WebsocketConfig `json:"websocket" yaml:"websocket"`
151-
ZMQ4 *writer.ZMQ4Config `json:"zmq4,omitempty" yaml:"zmq4,omitempty"`
152-
Processors []processor.Config `json:"processors" yaml:"processors"`
107+
Type string `json:"type" yaml:"type"`
108+
AMQP writer.AMQPConfig `json:"amqp" yaml:"amqp"`
109+
AMQP09 writer.AMQPConfig `json:"amqp_0_9" yaml:"amqp_0_9"`
110+
AMQP1 writer.AMQP1Config `json:"amqp_1" yaml:"amqp_1"`
111+
BlobStorage writer.AzureBlobStorageConfig `json:"blob_storage" yaml:"blob_storage"`
112+
Broker BrokerConfig `json:"broker" yaml:"broker"`
113+
Cache writer.CacheConfig `json:"cache" yaml:"cache"`
114+
Drop writer.DropConfig `json:"drop" yaml:"drop"`
115+
DropOnError DropOnErrorConfig `json:"drop_on_error" yaml:"drop_on_error"`
116+
Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"`
117+
DynamoDB writer.DynamoDBConfig `json:"dynamodb" yaml:"dynamodb"`
118+
Elasticsearch writer.ElasticsearchConfig `json:"elasticsearch" yaml:"elasticsearch"`
119+
File FileConfig `json:"file" yaml:"file"`
120+
Files writer.FilesConfig `json:"files" yaml:"files"`
121+
GCPPubSub writer.GCPPubSubConfig `json:"gcp_pubsub" yaml:"gcp_pubsub"`
122+
HDFS writer.HDFSConfig `json:"hdfs" yaml:"hdfs"`
123+
HTTPClient writer.HTTPClientConfig `json:"http_client" yaml:"http_client"`
124+
HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"`
125+
Inproc InprocConfig `json:"inproc" yaml:"inproc"`
126+
Kafka writer.KafkaConfig `json:"kafka" yaml:"kafka"`
127+
Kinesis writer.KinesisConfig `json:"kinesis" yaml:"kinesis"`
128+
KinesisFirehose writer.KinesisFirehoseConfig `json:"kinesis_firehose" yaml:"kinesis_firehose"`
129+
MQTT writer.MQTTConfig `json:"mqtt" yaml:"mqtt"`
130+
Nanomsg writer.NanomsgConfig `json:"nanomsg" yaml:"nanomsg"`
131+
NATS writer.NATSConfig `json:"nats" yaml:"nats"`
132+
NATSStream writer.NATSStreamConfig `json:"nats_stream" yaml:"nats_stream"`
133+
NSQ writer.NSQConfig `json:"nsq" yaml:"nsq"`
134+
Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"`
135+
RedisHash writer.RedisHashConfig `json:"redis_hash" yaml:"redis_hash"`
136+
RedisList writer.RedisListConfig `json:"redis_list" yaml:"redis_list"`
137+
RedisPubSub writer.RedisPubSubConfig `json:"redis_pubsub" yaml:"redis_pubsub"`
138+
RedisStreams writer.RedisStreamsConfig `json:"redis_streams" yaml:"redis_streams"`
139+
Resource string `json:"resource" yaml:"resource"`
140+
Retry RetryConfig `json:"retry" yaml:"retry"`
141+
S3 writer.AmazonS3Config `json:"s3" yaml:"s3"`
142+
SNS writer.SNSConfig `json:"sns" yaml:"sns"`
143+
SQS writer.AmazonSQSConfig `json:"sqs" yaml:"sqs"`
144+
STDOUT STDOUTConfig `json:"stdout" yaml:"stdout"`
145+
Switch SwitchConfig `json:"switch" yaml:"switch"`
146+
SyncResponse struct{} `json:"sync_response" yaml:"sync_response"`
147+
TableStorage writer.AzureTableStorageConfig `json:"table_storage" yaml:"table_storage"`
148+
TCP writer.TCPConfig `json:"tcp" yaml:"tcp"`
149+
Try TryConfig `json:"try" yaml:"try"`
150+
UDP writer.UDPConfig `json:"udp" yaml:"udp"`
151+
Socket writer.SocketConfig `json:"socket" yaml:"socket"`
152+
Websocket writer.WebsocketConfig `json:"websocket" yaml:"websocket"`
153+
ZMQ4 *writer.ZMQ4Config `json:"zmq4,omitempty" yaml:"zmq4,omitempty"`
154+
Processors []processor.Config `json:"processors" yaml:"processors"`
153155
}
154156

155157
// NewConfig returns a configuration struct fully populated with default values.
@@ -195,6 +197,7 @@ func NewConfig() Config {
195197
STDOUT: NewSTDOUTConfig(),
196198
Switch: NewSwitchConfig(),
197199
SyncResponse: struct{}{},
200+
TableStorage: writer.NewAzureTableStorageConfig(),
198201
TCP: writer.NewTCPConfig(),
199202
Try: NewTryConfig(),
200203
UDP: writer.NewUDPConfig(),

lib/output/table_storage.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package output
2+
3+
import (
4+
"github.com/Jeffail/benthos/v3/lib/log"
5+
"github.com/Jeffail/benthos/v3/lib/metrics"
6+
"github.com/Jeffail/benthos/v3/lib/output/writer"
7+
"github.com/Jeffail/benthos/v3/lib/types"
8+
"github.com/Jeffail/benthos/v3/lib/x/docs"
9+
)
10+
11+
//------------------------------------------------------------------------------
12+
13+
func init() {
14+
Constructors[TypeTableStorage] = TypeSpec{
15+
constructor: NewAzureTableStorage,
16+
Summary: `
17+
BETA: This input is currently in a BETA stage and is therefore subject to
18+
breaking configuration changes outside of major version releases.
19+
20+
Stores message parts in an Azure Table Storage table.`,
21+
Description: `
22+
In order to set the ` + "`table_name`" + `, ` + "`partition_key`" + ` and ` + "`row_key`" + `
23+
you can use function interpolations described [here](/docs/configuration/interpolation#bloblang-queries), which are
24+
calculated per message of a batch.
25+
26+
If the ` + "`properties`" + ` are not set in the config, all the ` + "`json`" + ` fields
27+
are marshaled and stored in the table, which will be created if it does not exist.
28+
The ` + "`object`" + ` and ` + "`array`" + ` fields are marshaled as strings. e.g.:
29+
30+
The json message:
31+
` + "``` yaml" + `
32+
{
33+
"foo": 55,
34+
"bar": {
35+
"baz": "a",
36+
"bez": "b"
37+
},
38+
"diz": ["a", "b"]
39+
}
40+
` + "```" + `
41+
42+
will store in the table the following properties:
43+
` + "``` yaml" + `
44+
foo: '55'
45+
bar: '{ "baz": "a", "bez": "b" }'
46+
diz: '["a", "b"]'
47+
` + "```" + `
48+
49+
It's also possible to use function interpolations to get or transform the properties values, e.g.:
50+
51+
` + "``` yaml" + `
52+
properties:
53+
device: '${! json("device") }'
54+
timestamp: '${! json("timestamp") }'
55+
` + "```" + ``,
56+
Async: true,
57+
FieldSpecs: docs.FieldSpecs{
58+
docs.FieldCommon("storage_account", "The storage account to upload messages to."),
59+
docs.FieldCommon("storage_access_key", "The storage account access key."),
60+
docs.FieldCommon("table_name", "The table to store messages into.",
61+
`${!meta("kafka_topic")}`,
62+
).SupportsInterpolation(false),
63+
docs.FieldCommon("partition_key", "The partition key.",
64+
`${!json("date")}`,
65+
).SupportsInterpolation(false),
66+
docs.FieldCommon("row_key", "The row key.",
67+
`${!json("device")}-${!uuid_v4()}`,
68+
).SupportsInterpolation(false),
69+
docs.FieldCommon("properties", "A map of properties to store into the table.").SupportsInterpolation(true),
70+
docs.FieldAdvanced("insert_type", "Type of insert operation").HasOptions(
71+
"INSERT", "INSERT_MERGE", "INSERT_REPLACE",
72+
).SupportsInterpolation(false),
73+
docs.FieldCommon("max_in_flight",
74+
"The maximum number of messages to have in flight at a given time. Increase this to improve throughput."),
75+
docs.FieldAdvanced("timeout", "The maximum period to wait on an upload before abandoning it and reattempting."),
76+
},
77+
}
78+
}
79+
80+
//------------------------------------------------------------------------------
81+
82+
// NewAzureTableStorage creates a new NewAzureTableStorage output type.
83+
func NewAzureTableStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
84+
sthree, err := writer.NewAzureTableStorage(conf.TableStorage, log, stats)
85+
if err != nil {
86+
return nil, err
87+
}
88+
if conf.TableStorage.MaxInFlight == 1 {
89+
return NewWriter(
90+
TypeTableStorage, sthree, log, stats,
91+
)
92+
}
93+
return NewAsyncWriter(
94+
TypeTableStorage, conf.TableStorage.MaxInFlight, sthree, log, stats,
95+
)
96+
}
97+
98+
//------------------------------------------------------------------------------

0 commit comments

Comments
 (0)