From d9ec1c690cce2c3e789f34069f0e4dfc18d9ba11 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Tue, 23 Sep 2025 13:37:07 +0930 Subject: [PATCH] internal/elasticsearch/ingest: export LoadIngestPipelineFiles, InstallPipelinesInElasticsearch and GetPipelineNameWithNonce --- internal/elasticsearch/ingest/datastream.go | 22 +++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/internal/elasticsearch/ingest/datastream.go b/internal/elasticsearch/ingest/datastream.go index 17e2b9289..bb6a4115d 100644 --- a/internal/elasticsearch/ingest/datastream.go +++ b/internal/elasticsearch/ingest/datastream.go @@ -57,20 +57,23 @@ func InstallDataStreamPipelines(ctx context.Context, api *elasticsearch.API, dat nonce := time.Now().UnixNano() - mainPipeline := getPipelineNameWithNonce(dataStreamManifest.GetPipelineNameOrDefault(), nonce) - pipelines, err := loadIngestPipelineFiles(dataStreamPath, nonce) + mainPipeline := GetPipelineNameWithNonce(dataStreamManifest.GetPipelineNameOrDefault(), nonce) + pipelines, err := LoadIngestPipelineFiles(dataStreamPath, nonce) if err != nil { return "", nil, fmt.Errorf("loading ingest pipeline files failed: %w", err) } - err = installPipelinesInElasticsearch(ctx, api, pipelines) + err = InstallPipelinesInElasticsearch(ctx, api, pipelines) if err != nil { return "", nil, err } return mainPipeline, pipelines, nil } -func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, error) { +// LoadIngestPipelineFiles returns the set of pipelines found in the directory +// elasticsearch/ingest_pipeline under the provided data stream path. The names +// of the pipelines are decorated with the provided nonce. +func LoadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, error) { elasticsearchPath := filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline") var pipelineFiles []string @@ -100,7 +103,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er return nil } pipelineTag := s[1] - return []byte(getPipelineNameWithNonce(pipelineTag, nonce)) + return []byte(GetPipelineNameWithNonce(pipelineTag, nonce)) }) if err != nil { return nil, err @@ -114,7 +117,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er name := filepath.Base(path) pipelines = append(pipelines, Pipeline{ Path: path, - Name: getPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce), + Name: GetPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce), Format: filepath.Ext(strings.TrimSuffix(path, ".link"))[1:], Content: cWithRerouteProcessors, ContentOriginal: c, @@ -241,7 +244,9 @@ func convertValue(value interface{}, label string) ([]string, error) { } } -func installPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error { +// InstallPipelinesInElasticsearch installs the provided pipelines into the +// Elasticsearch instance specified by the provided API handle. +func InstallPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error { for _, p := range pipelines { if err := installPipeline(ctx, api, p); err != nil { return err @@ -317,6 +322,7 @@ func getIngestPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pip return nil } -func getPipelineNameWithNonce(pipelineName string, nonce int64) string { +// GetPipelineNameWithNonce returns the pipeline name decorated with the provided nonce. +func GetPipelineNameWithNonce(pipelineName string, nonce int64) string { return fmt.Sprintf("%s-%d", pipelineName, nonce) }