-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathtransformer.go
127 lines (106 loc) · 3.56 KB
/
transformer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package helper
import (
"context"
"fmt"
"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/errors"
"github.com/observiq/stanza/operator"
"go.uber.org/zap"
)
// NewTransformerConfig returns a TransformerConfig populated with defaults:
// a writer config created from the given operator ID and type, and the
// SendOnError strategy for entries that fail processing.
func NewTransformerConfig(operatorID, operatorType string) TransformerConfig {
	cfg := TransformerConfig{OnError: SendOnError}
	cfg.WriterConfig = NewWriterConfig(operatorID, operatorType)
	return cfg
}
// TransformerConfig provides a basic implementation of a transformer config.
// It embeds WriterConfig and adds the error-handling strategy and an optional
// condition that Build turns into a TransformerOperator.
type TransformerConfig struct {
WriterConfig `yaml:",inline"`
// OnError names the strategy applied when processing an entry fails.
// Build rejects any value other than SendOnError or DropOnError.
OnError string `json:"on_error" yaml:"on_error"`
// IfExpr is an optional expression; when non-empty, Build compiles it as a
// boolean expression and stores the result on the operator.
IfExpr string `json:"if" yaml:"if"`
}
// Build constructs a TransformerOperator from this config.
//
// The embedded writer config is built first; a failure there is returned with
// the operator ID attached as a detail. The on_error value must then be one of
// SendOnError or DropOnError. Finally, a non-empty `if` expression is compiled
// as a boolean expression (undefined variables are allowed) and attached to
// the operator. On any error the zero TransformerOperator is returned.
func (c TransformerConfig) Build(context operator.BuildContext) (TransformerOperator, error) {
	writer, err := c.WriterConfig.Build(context)
	if err != nil {
		return TransformerOperator{}, errors.WithDetails(err, "operator_id", c.ID())
	}

	if c.OnError != SendOnError && c.OnError != DropOnError {
		return TransformerOperator{}, errors.NewError(
			"operator config has an invalid `on_error` field.",
			"ensure that the `on_error` field is set to either `send` or `drop`.",
			"on_error", c.OnError,
		)
	}

	op := TransformerOperator{WriterOperator: writer, OnError: c.OnError}

	// Without a condition there is nothing to compile; IfExpr stays nil.
	if c.IfExpr == "" {
		return op, nil
	}

	program, compileErr := expr.Compile(c.IfExpr, expr.AsBool(), expr.AllowUndefinedVariables())
	if compileErr != nil {
		return TransformerOperator{}, fmt.Errorf("failed to compile expression '%s': %w", c.IfExpr, compileErr)
	}
	op.IfExpr = program

	return op, nil
}
// TransformerOperator provides a basic implementation of a transformer operator.
// It embeds WriterOperator and carries the error strategy and optional
// condition used by ProcessWith.
type TransformerOperator struct {
WriterOperator
// OnError is the strategy consulted by HandleEntryError; the entry is
// written onward only when it equals SendOnError.
OnError string
// IfExpr is the compiled condition evaluated by Skip. When nil, Skip never
// skips an entry.
IfExpr *vm.Program
}
// CanProcess reports whether the operator can process entries.
// A transformer operator always returns true.
func (t *TransformerOperator) CanProcess() bool {
	return true
}
// ProcessWith runs transform on the entry and then passes the entry to Write.
//
// If Skip reports that the entry should be skipped, transform is not invoked
// and the entry is written as-is. An error from Skip or from transform is
// routed through HandleEntryError and its result is returned; otherwise the
// return value is nil.
func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entry, transform TransformFunction) error {
	skip, err := t.Skip(ctx, entry)
	if err != nil {
		return t.HandleEntryError(ctx, entry, err)
	}

	// Only entries matching the "if" condition are transformed.
	if !skip {
		if transformErr := transform(entry); transformErr != nil {
			return t.HandleEntryError(ctx, entry, transformErr)
		}
	}

	t.Write(ctx, entry)
	return nil
}
// HandleEntryError applies the configured on_error strategy to a failed entry.
//
// The failure is logged together with the error, the configured action, and
// the entry. The entry is then written onward only when OnError is
// SendOnError. The original err is always returned.
func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry.Entry, err error) error {
	t.Errorw(
		"Failed to process entry",
		zap.Any("error", err),
		zap.Any("action", t.OnError),
		zap.Any("entry", entry),
	)

	// Any strategy other than "send" leaves the entry unwritten.
	if t.OnError != SendOnError {
		return err
	}

	t.Write(ctx, entry)
	return err
}
// Skip reports whether the entry should bypass the transform because it does
// not match the operator's `if` expression.
//
// When no expression is configured (IfExpr is nil) it returns false, so every
// entry is processed. Otherwise the expression is run against an environment
// obtained from GetExprEnv, which is handed back via PutExprEnv on return.
// It returns an error if evaluation fails or if the expression yields a
// non-boolean value. The context argument is unused.
func (t *TransformerOperator) Skip(_ context.Context, entry *entry.Entry) (bool, error) {
	if t.IfExpr == nil {
		return false, nil
	}

	env := GetExprEnv(entry)
	defer PutExprEnv(env)

	result, err := vm.Run(t.IfExpr, env)
	if err != nil {
		// Wrap with %w so callers can inspect the underlying error.
		return false, fmt.Errorf("running if expr: %w", err)
	}

	// Build compiles the expression with expr.AsBool, but IfExpr is an
	// exported field and may be set by other code; check the type rather than
	// panic on an unexpected result.
	matches, ok := result.(bool)
	if !ok {
		return false, fmt.Errorf("running if expr: expected boolean result, got %T", result)
	}

	return !matches, nil
}
// TransformFunction is a function that transforms an entry. It receives a
// pointer to the entry and returns a non-nil error if the transformation fails.
type TransformFunction = func(*entry.Entry) error
// Supported values for the `on_error` setting of a transformer.
const (
	// SendOnError specifies an on_error mode in which an entry is still sent
	// onward after an error.
	SendOnError = "send"

	// DropOnError specifies an on_error mode in which an entry is dropped
	// after an error.
	DropOnError = "drop"
)