Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/agentcfg/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ func newMockElasticsearchClient(t testing.TB, handler func(http.ResponseWriter,
config := elasticsearch.DefaultConfig()
config.Backoff.Init = time.Nanosecond
config.Hosts = []string{srv.URL}
client, err := elasticsearch.NewClient(config, logptest.NewTestingLogger(t, ""))
client, err := elasticsearch.NewClient(elasticsearch.ClientParams{
Config: config,
Logger: logptest.NewTestingLogger(t, ""),
})
require.NoError(t, err)
return client
}
Expand Down
2 changes: 1 addition & 1 deletion internal/beater/auth/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func NewAuthenticator(cfg config.AgentAuth, tp trace.TracerProvider, logger *log
cfg.APIKey.ESConfig.Username = ""
cfg.APIKey.ESConfig.Password = ""
cfg.APIKey.ESConfig.APIKey = ""
client, err := elasticsearch.NewClientParams(elasticsearch.ClientParams{
client, err := elasticsearch.NewClient(elasticsearch.ClientParams{
Config: cfg.APIKey.ESConfig,
Logger: logger,
TracerProvider: tp,
Expand Down
53 changes: 29 additions & 24 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,25 +316,22 @@ func (s *Runner) Run(ctx context.Context) error {
close(publishReady)
return nil
})
newESClient := func(tp trace.TracerProvider) func(cfg *elasticsearch.Config, logger *logp.Logger) (*elasticsearch.Client, error) {
return func(cfg *elasticsearch.Config, logger *logp.Logger) (*elasticsearch.Client, error) {
httpTransport, err := elasticsearch.NewHTTPTransport(cfg, logger)
if err != nil {
return nil, err
}
transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain}
return elasticsearch.NewClientParams(elasticsearch.ClientParams{
Config: cfg,
Transport: transport,
RetryOnError: func(_ *http.Request, err error) bool {
return !errors.Is(err, errServerShuttingDown)
},
Logger: logger,
TracerProvider: tp,
})
newElasticsearchClient := func(args elasticsearch.ClientParams) (*elasticsearch.Client, error) {
httpTransport, err := elasticsearch.NewHTTPTransport(args.Config, args.Logger)
if err != nil {
return nil, err
}
transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain}
return elasticsearch.NewClient(elasticsearch.ClientParams{
Config: args.Config,
Transport: transport,
RetryOnError: func(_ *http.Request, err error) bool {
return !errors.Is(err, errServerShuttingDown)
},
Logger: args.Logger,
TracerProvider: args.TracerProvider,
})
}
newElasticsearchClient := newESClient(s.tracerProvider)

var sourcemapFetcher sourcemap.Fetcher
if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled {
Expand Down Expand Up @@ -456,7 +453,7 @@ func (s *Runner) Run(ctx context.Context) error {
SourcemapFetcher: sourcemapFetcher,
PublishReady: publishReady,
KibanaClient: kibanaClient,
NewElasticsearchClient: newESClient(tracenoop.NewTracerProvider()),
NewElasticsearchClient: newElasticsearchClient,
GRPCServer: grpcServer,
Semaphore: semaphore.NewWeighted(int64(s.config.MaxConcurrentDecoders)),
BeatMonitoring: s.beatMonitoring,
Expand Down Expand Up @@ -509,7 +506,7 @@ func (s *Runner) Run(ctx context.Context) error {
if tracerServerListener != nil {
// use a batch processor without tracing to prevent the tracing processor from sending traces to itself
finalTracerBatchProcessor, closeTracerFinalBatchProcessor, err := s.newFinalBatchProcessor(
tracer, newESClient(tracenoop.NewTracerProvider()), memLimitGB, s.logger, tracenoop.NewTracerProvider(), metricnoop.NewMeterProvider(),
tracer, newElasticsearchClient, memLimitGB, s.logger, tracenoop.NewTracerProvider(), metricnoop.NewMeterProvider(),
)
if err != nil {
return err
Expand Down Expand Up @@ -649,7 +646,7 @@ func (s *Runner) waitReady(
if err != nil {
return err
}
esOutputClient, err = elasticsearch.NewClientParams(elasticsearch.ClientParams{
esOutputClient, err = elasticsearch.NewClient(elasticsearch.ClientParams{
Config: esConfig,
Logger: s.logger,
TracerProvider: s.tracerProvider,
Expand Down Expand Up @@ -712,7 +709,7 @@ func (s *Runner) waitReady(
// "elasticsearch", then we use docappender; otherwise we use the libbeat publisher.
func (s *Runner) newFinalBatchProcessor(
tracer *apm.Tracer,
newElasticsearchClient func(*elasticsearch.Config, *logp.Logger) (*elasticsearch.Client, error),
newElasticsearchClient func(elasticsearch.ClientParams) (*elasticsearch.Client, error),
memLimit float64,
logger *logp.Logger,
tp trace.TracerProvider,
Expand All @@ -734,7 +731,11 @@ func (s *Runner) newFinalBatchProcessor(
if err != nil {
return nil, nil, err
}
client, err := newElasticsearchClient(esCfg, logger)
client, err := newElasticsearchClient(elasticsearch.ClientParams{
Config: esCfg,
Logger: logger,
TracerProvider: tp,
})
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -918,11 +919,15 @@ const sourcemapIndex = ".apm-source-map"
func newSourcemapFetcher(
cfg config.SourceMapping,
kibanaClient *kibana.Client,
newElasticsearchClient func(*elasticsearch.Config, *logp.Logger) (*elasticsearch.Client, error),
newElasticsearchClient func(elasticsearch.ClientParams) (*elasticsearch.Client, error),
tp trace.TracerProvider,
logger *logp.Logger,
) (sourcemap.Fetcher, context.CancelFunc, error) {
esClient, err := newElasticsearchClient(cfg.ESConfig, logger)
esClient, err := newElasticsearchClient(elasticsearch.ClientParams{
Config: cfg.ESConfig,
Logger: logger,
TracerProvider: tp,
})
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion internal/beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ func newMockClusterUUIDClient(t testing.TB, clusterUUID string) *elasticsearch.C

config := elasticsearch.DefaultConfig()
config.Hosts = []string{srv.URL}
client, err := elasticsearch.NewClient(config, logptest.NewTestingLogger(t, ""))
client, err := elasticsearch.NewClient(elasticsearch.ClientParams{
Config: config,
Logger: logptest.NewTestingLogger(t, ""),
})
require.NoError(t, err)
return client
}
Expand Down
10 changes: 7 additions & 3 deletions internal/beater/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type ServerParams struct {
// for indexing. Under some configuration, the server will wrap the
// client's transport such that requests will be blocked until data
// streams have been initialised.
NewElasticsearchClient func(*elasticsearch.Config, *logp.Logger) (*elasticsearch.Client, error)
NewElasticsearchClient func(elasticsearch.ClientParams) (*elasticsearch.Client, error)

// GRPCServer holds a *grpc.Server to which services will be registered
// for receiving data, configuration requests, etc.
Expand Down Expand Up @@ -240,7 +240,7 @@ func newAgentConfigFetcher(
ctx context.Context,
cfg *config.Config,
kibanaClient *kibana.Client,
newElasticsearchClient func(*elasticsearch.Config, *logp.Logger) (*elasticsearch.Client, error),
newElasticsearchClient func(elasticsearch.ClientParams) (*elasticsearch.Client, error),
tp trace.TracerProvider,
mp metric.MeterProvider,
logger *logp.Logger,
Expand All @@ -264,7 +264,11 @@ func newAgentConfigFetcher(
// It is possible that none of the above applies.
}

esClient, err := newElasticsearchClient(cfg.AgentConfig.ESConfig, logger)
esClient, err := newElasticsearchClient(elasticsearch.ClientParams{
Config: cfg.AgentConfig.ESConfig,
Logger: logger,
TracerProvider: tp,
})
if err != nil {
return nil, nil, err
}
Expand Down
10 changes: 2 additions & 8 deletions internal/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var userAgent = fmt.Sprintf("Elastic-APM-Server/%s go-elasticsearch/%s", version

type Client = elastictransport.Client

// ClientParams holds parameters for NewClientParams.
// ClientParams holds parameters for NewClient.
type ClientParams struct {
// Config holds the user-defined configuration: Elasticsearch hosts,
// max retries, etc.
Expand All @@ -66,13 +66,7 @@ type ClientParams struct {
}

// NewClient returns a stack version-aware Elasticsearch client,
// equivalent to NewClientParams(ClientParams{Config: config}).
func NewClient(config *Config, logger *logp.Logger) (*Client, error) {
return NewClientParams(ClientParams{Config: config, Logger: logger})
}

// NewClientParams returns a stack version-aware Elasticsearch client.
func NewClientParams(args ClientParams) (*Client, error) {
func NewClient(args ClientParams) (*Client, error) {
if args.Config == nil {
return nil, errConfigMissing
}
Expand Down
23 changes: 17 additions & 6 deletions internal/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ import (

func TestClient(t *testing.T) {
t.Run("no config", func(t *testing.T) {
goESClient, err := NewClient(nil, logptest.NewTestingLogger(t, ""))
goESClient, err := NewClient(ClientParams{})
assert.Error(t, err)
assert.Nil(t, goESClient)
})

t.Run("valid config", func(t *testing.T) {
cfg := Config{Hosts: Hosts{"localhost:9200", "localhost:9201"}}
goESClient, err := NewClient(&cfg, logptest.NewTestingLogger(t, ""))
goESClient, err := NewClient(ClientParams{
Config: &cfg,
Logger: logptest.NewTestingLogger(t, ""),
})
require.NoError(t, err)
assert.NotNil(t, goESClient)
})
Expand All @@ -60,7 +63,10 @@ func TestClientCustomHeaders(t *testing.T) {
Hosts: Hosts{srv.URL},
Headers: map[string]string{"custom": "header"},
}
client, err := NewClient(&cfg, logptest.NewTestingLogger(t, ""))
client, err := NewClient(ClientParams{
Config: &cfg,
Logger: logptest.NewTestingLogger(t, ""),
})
require.NoError(t, err)

req, err := http.NewRequest(http.MethodPost, "/_bulk", bytes.NewReader([]byte("{}")))
Expand All @@ -73,7 +79,6 @@ func TestClientCustomHeaders(t *testing.T) {
case <-time.After(1 * time.Second):
t.Fatal("timed out while waiting for request")
}

}

func TestClientCustomUserAgent(t *testing.T) {
Expand All @@ -88,7 +93,10 @@ func TestClientCustomUserAgent(t *testing.T) {
cfg := Config{
Hosts: Hosts{srv.URL},
}
client, err := NewClient(&cfg, logptest.NewTestingLogger(t, ""))
client, err := NewClient(ClientParams{
Config: &cfg,
Logger: logptest.NewTestingLogger(t, ""),
})
require.NoError(t, err)

req, err := http.NewRequest(http.MethodPost, "/_bulk", bytes.NewReader([]byte("{}")))
Expand Down Expand Up @@ -179,7 +187,10 @@ func TestClientRetryableStatuses(t *testing.T) {
MaxRetries: maxRetries,
Hosts: []string{srv.URL},
}
client, err := NewClient(&c, logptest.NewTestingLogger(t, ""))
client, err := NewClient(ClientParams{
Config: &c,
Logger: logptest.NewTestingLogger(t, ""),
})
require.NoError(t, err)

var buf bytes.Buffer
Expand Down
5 changes: 4 additions & 1 deletion internal/elasticsearch/security_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ func TestHasPrivilegesError(t *testing.T) {
}))
defer server.Close()

client, err := NewClient(&Config{Hosts: Hosts{server.Listener.Addr().String()}}, logptest.NewTestingLogger(t, ""))
client, err := NewClient(ClientParams{
Config: &Config{Hosts: Hosts{server.Listener.Addr().String()}},
Logger: logptest.NewTestingLogger(t, ""),
})
require.NoError(t, err)

resp, err := HasPrivileges(context.Background(), client, HasPrivilegesRequest{}, "foo")
Expand Down
7 changes: 5 additions & 2 deletions internal/sourcemap/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func newUnavailableElasticsearchClient(t testing.TB) *elasticsearch.Client {
cfg.MaxRetries = 1
cfg.Backoff.Init = time.Nanosecond
cfg.Backoff.Max = time.Nanosecond
client, err := elasticsearch.NewClientParams(elasticsearch.ClientParams{Config: cfg, Transport: transport, Logger: logptest.NewTestingLogger(t, "")})
client, err := elasticsearch.NewClient(elasticsearch.ClientParams{Config: cfg, Transport: transport, Logger: logptest.NewTestingLogger(t, "")})
require.NoError(t, err)
return client
}
Expand All @@ -162,7 +162,10 @@ func newMockElasticsearchClient(t testing.TB, statusCode int, responseBody io.Re
config := elasticsearch.DefaultConfig()
config.Backoff.Init = time.Nanosecond
config.Hosts = []string{srv.URL}
client, err := elasticsearch.NewClient(config, logptest.NewTestingLogger(t, ""))
client, err := elasticsearch.NewClient(elasticsearch.ClientParams{
Config: config,
Logger: logptest.NewTestingLogger(t, ""),
})
require.NoError(t, err)
return client
}
Expand Down
10 changes: 8 additions & 2 deletions internal/sourcemap/metadata_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ func TestMetadataFetcher(t *testing.T) {
esConfig := elasticsearch.DefaultConfig()
esConfig.Hosts = []string{ts.URL}

esClient, err := elasticsearch.NewClient(esConfig, logptest.NewTestingLogger(t, ""))
esClient, err := elasticsearch.NewClient(elasticsearch.ClientParams{
Config: esConfig,
Logger: logptest.NewTestingLogger(t, ""),
})
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Expand Down Expand Up @@ -268,7 +271,10 @@ func TestInvalidation(t *testing.T) {
esConfig := elasticsearch.DefaultConfig()
esConfig.Hosts = []string{ts.URL}

esClient, err := elasticsearch.NewClient(esConfig, logptest.NewTestingLogger(t, ""))
esClient, err := elasticsearch.NewClient(elasticsearch.ClientParams{
Config: esConfig,
Logger: logptest.NewTestingLogger(t, ""),
})
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Expand Down
2 changes: 1 addition & 1 deletion internal/sourcemap/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestBatchProcessorTimeout(t *testing.T) {

cfg := elasticsearch.DefaultConfig()
cfg.Hosts = []string{""}
client, err := elasticsearch.NewClientParams(elasticsearch.ClientParams{
client, err := elasticsearch.NewClient(elasticsearch.ClientParams{
Config: cfg,
Transport: transport,
Logger: logptest.NewTestingLogger(t, ""),
Expand Down
8 changes: 7 additions & 1 deletion x-pack/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/gofrs/uuid/v5"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/v7/libbeat/common/reload"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/elastic/apm-data/model/modelprocessor"
"github.com/elastic/apm-server/internal/beatcmd"
"github.com/elastic/apm-server/internal/beater"
"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/x-pack/apm-server/sampling"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
)
Expand Down Expand Up @@ -102,7 +104,11 @@ func newProcessors(args beater.ServerParams) ([]namedProcessor, error) {

func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, error) {
tailSamplingConfig := args.Config.Sampling.Tail
es, err := args.NewElasticsearchClient(tailSamplingConfig.ESConfig, args.Logger)
es, err := args.NewElasticsearchClient(elasticsearch.ClientParams{
Config: tailSamplingConfig.ESConfig,
Logger: args.Logger,
TracerProvider: noop.NewTracerProvider(),
})
if err != nil {
return nil, fmt.Errorf("failed to create Elasticsearch client for tail-sampling: %w", err)
}
Expand Down