From d37fe6ccb74a24f141ee8ea5397a3a012d8a8600 Mon Sep 17 00:00:00 2001 From: Damien Mathieu <42@dmathieu.com> Date: Tue, 9 Jul 2024 18:13:54 +0200 Subject: [PATCH] Add consumer profiles (#10464) #### Description This adds profiles support for consumers. #### Link to tracking issue Based on the discussion in #10375. --------- Co-authored-by: Pablo Baeyens --- .chloggen/profile-consumer.yaml | 25 +++++++ consumer/consumer.go | 42 ++---------- consumer/consumerprofiles/Makefile | 1 + consumer/consumerprofiles/go.mod | 35 ++++++++++ consumer/consumerprofiles/go.sum | 77 ++++++++++++++++++++++ consumer/consumerprofiles/profiles.go | 47 +++++++++++++ consumer/consumerprofiles/profiles_test.go | 51 ++++++++++++++ consumer/internal/consumer.go | 42 ++++++++++++ consumer/logs.go | 7 +- consumer/metrics.go | 7 +- consumer/traces.go | 7 +- versions.yaml | 1 + 12 files changed, 297 insertions(+), 45 deletions(-) create mode 100644 .chloggen/profile-consumer.yaml create mode 100644 consumer/consumerprofiles/Makefile create mode 100644 consumer/consumerprofiles/go.mod create mode 100644 consumer/consumerprofiles/go.sum create mode 100644 consumer/consumerprofiles/profiles.go create mode 100644 consumer/consumerprofiles/profiles_test.go create mode 100644 consumer/internal/consumer.go diff --git a/.chloggen/profile-consumer.yaml b/.chloggen/profile-consumer.yaml new file mode 100644 index 00000000000..a515f5a7949 --- /dev/null +++ b/.chloggen/profile-consumer.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: consumer/consumerprofiles + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow handling profiles in consumer. + +# One or more tracking issues or pull requests related to the change +issues: [10464] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/consumer/consumer.go b/consumer/consumer.go index 503750ad7cb..64076655f20 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -5,52 +5,22 @@ package consumer // import "go.opentelemetry.io/collector/consumer" import ( "errors" + + "go.opentelemetry.io/collector/consumer/internal" ) // Capabilities describes the capabilities of a Processor. -type Capabilities struct { - // MutatesData is set to true if Consume* function of the - // processor modifies the input Traces, Logs or Metrics argument. - // Processors which modify the input data MUST set this flag to true. If the processor - // does not modify the data it MUST set this flag to false. If the processor creates - // a copy of the data before modifying then this flag can be safely set to false. - MutatesData bool -} - -type baseConsumer interface { - Capabilities() Capabilities -} +type Capabilities = internal.Capabilities var errNilFunc = errors.New("nil consumer func") -type baseImpl struct { - capabilities Capabilities -} - // Option to construct new consumers. -type Option func(*baseImpl) +type Option = internal.Option // WithCapabilities overrides the default GetCapabilities function for a processor. // The default GetCapabilities function returns mutable capabilities. func WithCapabilities(capabilities Capabilities) Option { - return func(o *baseImpl) { - o.capabilities = capabilities - } -} - -// Capabilities returns the capabilities of the component -func (bs baseImpl) Capabilities() Capabilities { - return bs.capabilities -} - -func newBaseImpl(options ...Option) *baseImpl { - bs := &baseImpl{ - capabilities: Capabilities{MutatesData: false}, + return func(o *internal.BaseImpl) { + o.Cap = capabilities } - - for _, op := range options { - op(bs) - } - - return bs } diff --git a/consumer/consumerprofiles/Makefile b/consumer/consumerprofiles/Makefile new file mode 100644 index 00000000000..ded7a36092d --- /dev/null +++ b/consumer/consumerprofiles/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/consumer/consumerprofiles/go.mod b/consumer/consumerprofiles/go.mod new file mode 100644 index 00000000000..7b0a06cb098 --- /dev/null +++ b/consumer/consumerprofiles/go.mod @@ -0,0 +1,35 @@ +module go.opentelemetry.io/collector/consumer/consumerprofiles + +go 1.21.0 + +replace go.opentelemetry.io/collector/pdata => ../../pdata + +replace go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile + +replace go.opentelemetry.io/collector/consumer => ../ + +require ( + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector/consumer v0.104.0 + go.opentelemetry.io/collector/pdata/pprofile v0.104.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/collector/pdata v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/grpc v1.65.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata diff --git a/consumer/consumerprofiles/go.sum b/consumer/consumerprofiles/go.sum new file mode 100644 index 00000000000..528166b78c0 --- /dev/null +++ b/consumer/consumerprofiles/go.sum @@ -0,0 +1,77 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/consumer/consumerprofiles/profiles.go b/consumer/consumerprofiles/profiles.go new file mode 100644 index 00000000000..7ab6b864dff --- /dev/null +++ b/consumer/consumerprofiles/profiles.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumerprofiles // import "go.opentelemetry.io/collector/consumer/consumerprofiles" + +import ( + "context" + "errors" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/internal" + "go.opentelemetry.io/collector/pdata/pprofile" +) + +var errNilFunc = errors.New("nil consumer func") + +// Profiles is an interface that receives pprofile.Profiles, processes it +// as needed, and sends it to the next processing node if any or to the destination. +type Profiles interface { + internal.BaseConsumer + // ConsumeProfiles receives pprofile.Profiles for consumption. + ConsumeProfiles(ctx context.Context, td pprofile.Profiles) error +} + +// ConsumeProfilesFunc is a helper function that is similar to ConsumeProfiles. +type ConsumeProfilesFunc func(ctx context.Context, td pprofile.Profiles) error + +// ConsumeProfiles calls f(ctx, td). +func (f ConsumeProfilesFunc) ConsumeProfiles(ctx context.Context, td pprofile.Profiles) error { + return f(ctx, td) +} + +type baseProfiles struct { + *internal.BaseImpl + ConsumeProfilesFunc +} + +// NewProfiles returns a Profiles configured with the provided options. +func NewProfiles(consume ConsumeProfilesFunc, options ...consumer.Option) (Profiles, error) { + if consume == nil { + return nil, errNilFunc + } + return &baseProfiles{ + BaseImpl: internal.NewBaseImpl(options...), + ConsumeProfilesFunc: consume, + }, nil +} diff --git a/consumer/consumerprofiles/profiles_test.go b/consumer/consumerprofiles/profiles_test.go new file mode 100644 index 00000000000..e50866e74da --- /dev/null +++ b/consumer/consumerprofiles/profiles_test.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumerprofiles + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pprofile" +) + +func TestDefaultProfiles(t *testing.T) { + cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { return nil }) + assert.NoError(t, err) + assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) + assert.Equal(t, consumer.Capabilities{MutatesData: false}, cp.Capabilities()) +} + +func TestNilFuncProfiles(t *testing.T) { + _, err := NewProfiles(nil) + assert.Equal(t, errNilFunc, err) +} + +func TestWithCapabilitiesProfiles(t *testing.T) { + cp, err := NewProfiles( + func(context.Context, pprofile.Profiles) error { return nil }, + consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})) + assert.NoError(t, err) + assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) + assert.Equal(t, consumer.Capabilities{MutatesData: true}, cp.Capabilities()) +} + +func TestConsumeProfiles(t *testing.T) { + consumeCalled := false + cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { consumeCalled = true; return nil }) + assert.NoError(t, err) + assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) + assert.True(t, consumeCalled) +} + +func TestConsumeProfiles_ReturnError(t *testing.T) { + want := errors.New("my_error") + cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { return want }) + assert.NoError(t, err) + assert.Equal(t, want, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) +} diff --git a/consumer/internal/consumer.go b/consumer/internal/consumer.go new file mode 100644 index 00000000000..1f2b5683b22 --- /dev/null +++ b/consumer/internal/consumer.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/consumer/internal" + +// Capabilities describes the capabilities of a Processor. +type Capabilities struct { + // MutatesData is set to true if Consume* function of the + // processor modifies the input Traces, Logs or Metrics argument. + // Processors which modify the input data MUST set this flag to true. If the processor + // does not modify the data it MUST set this flag to false. If the processor creates + // a copy of the data before modifying then this flag can be safely set to false. + MutatesData bool +} + +type BaseConsumer interface { + Capabilities() Capabilities +} + +type BaseImpl struct { + Cap Capabilities +} + +// Option to construct new consumers. +type Option func(*BaseImpl) + +// Capabilities returns the capabilities of the component +func (bs BaseImpl) Capabilities() Capabilities { + return bs.Cap +} + +func NewBaseImpl(options ...Option) *BaseImpl { + bs := &BaseImpl{ + Cap: Capabilities{MutatesData: false}, + } + + for _, op := range options { + op(bs) + } + + return bs +} diff --git a/consumer/logs.go b/consumer/logs.go index 5bf89a52f7a..15166ef1196 100644 --- a/consumer/logs.go +++ b/consumer/logs.go @@ -6,13 +6,14 @@ package consumer // import "go.opentelemetry.io/collector/consumer" import ( "context" + "go.opentelemetry.io/collector/consumer/internal" "go.opentelemetry.io/collector/pdata/plog" ) // Logs is an interface that receives plog.Logs, processes it // as needed, and sends it to the next processing node if any or to the destination. type Logs interface { - baseConsumer + internal.BaseConsumer // ConsumeLogs receives plog.Logs for consumption. ConsumeLogs(ctx context.Context, ld plog.Logs) error } @@ -26,7 +27,7 @@ func (f ConsumeLogsFunc) ConsumeLogs(ctx context.Context, ld plog.Logs) error { } type baseLogs struct { - *baseImpl + *internal.BaseImpl ConsumeLogsFunc } @@ -36,7 +37,7 @@ func NewLogs(consume ConsumeLogsFunc, options ...Option) (Logs, error) { return nil, errNilFunc } return &baseLogs{ - baseImpl: newBaseImpl(options...), + BaseImpl: internal.NewBaseImpl(options...), ConsumeLogsFunc: consume, }, nil } diff --git a/consumer/metrics.go b/consumer/metrics.go index 50df60f02d0..47897f9363a 100644 --- a/consumer/metrics.go +++ b/consumer/metrics.go @@ -6,13 +6,14 @@ package consumer // import "go.opentelemetry.io/collector/consumer" import ( "context" + "go.opentelemetry.io/collector/consumer/internal" "go.opentelemetry.io/collector/pdata/pmetric" ) // Metrics is an interface that receives pmetric.Metrics, processes it // as needed, and sends it to the next processing node if any or to the destination. type Metrics interface { - baseConsumer + internal.BaseConsumer // ConsumeMetrics receives pmetric.Metrics for consumption. ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error } @@ -26,7 +27,7 @@ func (f ConsumeMetricsFunc) ConsumeMetrics(ctx context.Context, md pmetric.Metri } type baseMetrics struct { - *baseImpl + *internal.BaseImpl ConsumeMetricsFunc } @@ -36,7 +37,7 @@ func NewMetrics(consume ConsumeMetricsFunc, options ...Option) (Metrics, error) return nil, errNilFunc } return &baseMetrics{ - baseImpl: newBaseImpl(options...), + BaseImpl: internal.NewBaseImpl(options...), ConsumeMetricsFunc: consume, }, nil } diff --git a/consumer/traces.go b/consumer/traces.go index 56cebd53b37..60df2d04536 100644 --- a/consumer/traces.go +++ b/consumer/traces.go @@ -6,13 +6,14 @@ package consumer // import "go.opentelemetry.io/collector/consumer" import ( "context" + "go.opentelemetry.io/collector/consumer/internal" "go.opentelemetry.io/collector/pdata/ptrace" ) // Traces is an interface that receives ptrace.Traces, processes it // as needed, and sends it to the next processing node if any or to the destination. type Traces interface { - baseConsumer + internal.BaseConsumer // ConsumeTraces receives ptrace.Traces for consumption. ConsumeTraces(ctx context.Context, td ptrace.Traces) error } @@ -26,7 +27,7 @@ func (f ConsumeTracesFunc) ConsumeTraces(ctx context.Context, td ptrace.Traces) } type baseTraces struct { - *baseImpl + *internal.BaseImpl ConsumeTracesFunc } @@ -36,7 +37,7 @@ func NewTraces(consume ConsumeTracesFunc, options ...Option) (Traces, error) { return nil, errNilFunc } return &baseTraces{ - baseImpl: newBaseImpl(options...), + BaseImpl: internal.NewBaseImpl(options...), ConsumeTracesFunc: consume, }, nil } diff --git a/versions.yaml b/versions.yaml index e22abc194fd..90ecb1250fd 100644 --- a/versions.yaml +++ b/versions.yaml @@ -35,6 +35,7 @@ module-sets: - go.opentelemetry.io/collector/connector - go.opentelemetry.io/collector/connector/forwardconnector - go.opentelemetry.io/collector/consumer + - go.opentelemetry.io/collector/consumer/consumerprofiles - go.opentelemetry.io/collector/exporter - go.opentelemetry.io/collector/exporter/debugexporter - go.opentelemetry.io/collector/exporter/loggingexporter