Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions internal/elasticsearch/ingest/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,23 @@ func InstallDataStreamPipelines(ctx context.Context, api *elasticsearch.API, dat

nonce := time.Now().UnixNano()

mainPipeline := getPipelineNameWithNonce(dataStreamManifest.GetPipelineNameOrDefault(), nonce)
pipelines, err := loadIngestPipelineFiles(dataStreamPath, nonce)
mainPipeline := GetPipelineNameWithNonce(dataStreamManifest.GetPipelineNameOrDefault(), nonce)
pipelines, err := LoadIngestPipelineFiles(dataStreamPath, nonce)
if err != nil {
return "", nil, fmt.Errorf("loading ingest pipeline files failed: %w", err)
}

err = installPipelinesInElasticsearch(ctx, api, pipelines)
err = InstallPipelinesInElasticsearch(ctx, api, pipelines)
if err != nil {
return "", nil, err
}
return mainPipeline, pipelines, nil
}

func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, error) {
// LoadIngestPipelineFiles returns the set of pipelines found in the directory
// elasticsearch/ingest_pipeline under the provided data stream path. The names
// of the pipelines are decorated with the provided nonce.
func LoadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, error) {
elasticsearchPath := filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline")

var pipelineFiles []string
Expand Down Expand Up @@ -100,7 +103,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er
return nil
}
pipelineTag := s[1]
return []byte(getPipelineNameWithNonce(pipelineTag, nonce))
return []byte(GetPipelineNameWithNonce(pipelineTag, nonce))
})
if err != nil {
return nil, err
Expand All @@ -114,7 +117,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er
name := filepath.Base(path)
pipelines = append(pipelines, Pipeline{
Path: path,
Name: getPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
Name: GetPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
Format: filepath.Ext(strings.TrimSuffix(path, ".link"))[1:],
Content: cWithRerouteProcessors,
ContentOriginal: c,
Expand Down Expand Up @@ -241,7 +244,9 @@ func convertValue(value interface{}, label string) ([]string, error) {
}
}

func installPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
// InstallPipelinesInElasticsearch installs the provided pipelines into the
// Elasticsearch instance specified by the provided API handle.
func InstallPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
for _, p := range pipelines {
if err := installPipeline(ctx, api, p); err != nil {
return err
Expand Down Expand Up @@ -317,6 +322,7 @@ func getIngestPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pip
return nil
}

func getPipelineNameWithNonce(pipelineName string, nonce int64) string {
// GetPipelineNameWithNonce returns the pipeline name decorated with the provided nonce.
func GetPipelineNameWithNonce(pipelineName string, nonce int64) string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this method needed out of this package? The pipelines returned by LoadIngestPipelineFiles already include the nonce 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nonce is something that ideally I'd like not to exist, but it needs to. Separating them allows me to (mostly) ignore it.

All of these packages are in internal, so I don't think we need to be precious about exporting labels.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could take the opportunity to improve the interface so callers don't need to think about "nonces", they would only need to think if they need unique names or not, what seems like a higher level concept more aligned with the actual needing.

But well, I guess this is something we can refactor later, so chose the option you prefer.

return fmt.Sprintf("%s-%d", pipelineName, nonce)
}