@@ -57,20 +57,23 @@ func InstallDataStreamPipelines(ctx context.Context, api *elasticsearch.API, dat
5757
5858 nonce := time .Now ().UnixNano ()
5959
60- mainPipeline := getPipelineNameWithNonce (dataStreamManifest .GetPipelineNameOrDefault (), nonce )
61- pipelines , err := loadIngestPipelineFiles (dataStreamPath , nonce )
60+ mainPipeline := GetPipelineNameWithNonce (dataStreamManifest .GetPipelineNameOrDefault (), nonce )
61+ pipelines , err := LoadIngestPipelineFiles (dataStreamPath , nonce )
6262 if err != nil {
6363 return "" , nil , fmt .Errorf ("loading ingest pipeline files failed: %w" , err )
6464 }
6565
66- err = installPipelinesInElasticsearch (ctx , api , pipelines )
66+ err = InstallPipelinesInElasticsearch (ctx , api , pipelines )
6767 if err != nil {
6868 return "" , nil , err
6969 }
7070 return mainPipeline , pipelines , nil
7171}
7272
73- func loadIngestPipelineFiles (dataStreamPath string , nonce int64 ) ([]Pipeline , error ) {
73+ // LoadIngestPipelineFiles returns the set of pipelines found in the directory
74+ // elasticsearch/ingest_pipeline under the provided data stream path. The names
75+ // of the pipelines are decorated with the provided nonce.
76+ func LoadIngestPipelineFiles (dataStreamPath string , nonce int64 ) ([]Pipeline , error ) {
7477 elasticsearchPath := filepath .Join (dataStreamPath , "elasticsearch" , "ingest_pipeline" )
7578
7679 var pipelineFiles []string
@@ -100,7 +103,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er
100103 return nil
101104 }
102105 pipelineTag := s [1 ]
103- return []byte (getPipelineNameWithNonce (pipelineTag , nonce ))
106+ return []byte (GetPipelineNameWithNonce (pipelineTag , nonce ))
104107 })
105108 if err != nil {
106109 return nil , err
@@ -114,7 +117,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er
114117 name := filepath .Base (path )
115118 pipelines = append (pipelines , Pipeline {
116119 Path : path ,
117- Name : getPipelineNameWithNonce (name [:strings .Index (name , "." )], nonce ),
120+ Name : GetPipelineNameWithNonce (name [:strings .Index (name , "." )], nonce ),
118121 Format : filepath .Ext (strings .TrimSuffix (path , ".link" ))[1 :],
119122 Content : cWithRerouteProcessors ,
120123 ContentOriginal : c ,
@@ -241,7 +244,9 @@ func convertValue(value interface{}, label string) ([]string, error) {
241244 }
242245}
243246
244- func installPipelinesInElasticsearch (ctx context.Context , api * elasticsearch.API , pipelines []Pipeline ) error {
247+ // InstallPipelinesInElasticsearch installs the provided pipelines into the
248+ // Elasticsearch instance specified by the provided API handle.
249+ func InstallPipelinesInElasticsearch (ctx context.Context , api * elasticsearch.API , pipelines []Pipeline ) error {
245250 for _ , p := range pipelines {
246251 if err := installPipeline (ctx , api , p ); err != nil {
247252 return err
@@ -317,6 +322,7 @@ func getIngestPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pip
317322 return nil
318323}
319324
320- func getPipelineNameWithNonce (pipelineName string , nonce int64 ) string {
325+ // GetPipelineNameWithNonce returns the pipeline name decorated with the provided nonce.
326+ func GetPipelineNameWithNonce (pipelineName string , nonce int64 ) string {
321327 return fmt .Sprintf ("%s-%d" , pipelineName , nonce )
322328}
0 commit comments