Skip to content

Commit 9eea668

Browse files
committed
internal/elasticsearch/ingest: export LoadIngestPipelineFiles and InstallPipelinesInElasticsearch
1 parent 9edf442 commit 9eea668

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

internal/elasticsearch/ingest/datastream.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,23 @@ func InstallDataStreamPipelines(ctx context.Context, api *elasticsearch.API, dat
5757

5858
nonce := time.Now().UnixNano()
5959

60-
mainPipeline := getPipelineNameWithNonce(dataStreamManifest.GetPipelineNameOrDefault(), nonce)
61-
pipelines, err := loadIngestPipelineFiles(dataStreamPath, nonce)
60+
mainPipeline := GetPipelineNameWithNonce(dataStreamManifest.GetPipelineNameOrDefault(), nonce)
61+
pipelines, err := LoadIngestPipelineFiles(dataStreamPath, nonce)
6262
if err != nil {
6363
return "", nil, fmt.Errorf("loading ingest pipeline files failed: %w", err)
6464
}
6565

66-
err = installPipelinesInElasticsearch(ctx, api, pipelines)
66+
err = InstallPipelinesInElasticsearch(ctx, api, pipelines)
6767
if err != nil {
6868
return "", nil, err
6969
}
7070
return mainPipeline, pipelines, nil
7171
}
7272

73-
func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, error) {
73+
// LoadIngestPipelineFiles returns the set of pipelines found in the directory
74+
// elasticsearch/ingest_pipeline under the provided data stream path. The names
75+
// of the pipelines are decorated with the provided nonce.
76+
func LoadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, error) {
7477
elasticsearchPath := filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline")
7578

7679
var pipelineFiles []string
@@ -100,7 +103,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er
100103
return nil
101104
}
102105
pipelineTag := s[1]
103-
return []byte(getPipelineNameWithNonce(pipelineTag, nonce))
106+
return []byte(GetPipelineNameWithNonce(pipelineTag, nonce))
104107
})
105108
if err != nil {
106109
return nil, err
@@ -114,7 +117,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er
114117
name := filepath.Base(path)
115118
pipelines = append(pipelines, Pipeline{
116119
Path: path,
117-
Name: getPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
120+
Name: GetPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
118121
Format: filepath.Ext(strings.TrimSuffix(path, ".link"))[1:],
119122
Content: cWithRerouteProcessors,
120123
ContentOriginal: c,
@@ -241,7 +244,9 @@ func convertValue(value interface{}, label string) ([]string, error) {
241244
}
242245
}
243246

244-
func installPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
247+
// InstallPipelinesInElasticsearch installs the provided pipelines into the
248+
// Elasticsearch instance specified by the provided API handle.
249+
func InstallPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
245250
for _, p := range pipelines {
246251
if err := installPipeline(ctx, api, p); err != nil {
247252
return err

0 commit comments

Comments
 (0)