Skip to content

Commit ea3e2d9

Browse files
committed
internal/elasticsearch/ingest: export LoadIngestPipelineFiles and InstallPipelinesInElasticsearch
1 parent 9edf442 commit ea3e2d9

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

internal/elasticsearch/ingest/datastream.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,22 @@ func InstallDataStreamPipelines(ctx context.Context, api *elasticsearch.API, dat
5858
nonce := time.Now().UnixNano()
5959

6060
mainPipeline := getPipelineNameWithNonce(dataStreamManifest.GetPipelineNameOrDefault(), nonce)
61-
pipelines, err := loadIngestPipelineFiles(dataStreamPath, nonce)
61+
pipelines, err := LoadIngestPipelineFiles(dataStreamPath, nonce)
6262
if err != nil {
6363
return "", nil, fmt.Errorf("loading ingest pipeline files failed: %w", err)
6464
}
6565

66-
err = installPipelinesInElasticsearch(ctx, api, pipelines)
66+
err = InstallPipelinesInElasticsearch(ctx, api, pipelines)
6767
if err != nil {
6868
return "", nil, err
6969
}
7070
return mainPipeline, pipelines, nil
7171
}
7272

73-
func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, error) {
73+
// LoadIngestPipelineFiles returns the set of pipelines found in the directory
74+
// elasticsearch/ingest_pipeline under the provided data stream path. The names
75+
// of the pipelines are decorated with the provided nonce.
76+
func LoadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, error) {
7477
elasticsearchPath := filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline")
7578

7679
var pipelineFiles []string
@@ -241,7 +244,9 @@ func convertValue(value interface{}, label string) ([]string, error) {
241244
}
242245
}
243246

244-
func installPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
247+
// InstallPipelinesInElasticsearch installs the provided pipelines into the
248+
// Elasticsearch instance specified by the provided API handle.
249+
func InstallPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
245250
for _, p := range pipelines {
246251
if err := installPipeline(ctx, api, p); err != nil {
247252
return err

0 commit comments

Comments
 (0)