Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions comp/logs-library/pipeline/nodeless.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package pipeline

import (
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
)

// applyRoutingHeaders sets x-dd-logs-routing on non-nodeless endpoints based on
// the primary transport, or strips it entirely for nodeless nodes.
func applyRoutingHeaders(endpoints *config.Endpoints, nodeless bool) {
if nodeless {
stripRoutingHeader(endpoints)
return
}

var routingValue string
if endpoints.UseGRPC {
routingValue = "grpc"
} else if endpoints.UseHTTP {
routingValue = "http"
} else {
return
}

if endpoints.Main.ExtraHTTPHeaders == nil {
endpoints.Main.ExtraHTTPHeaders = map[string]string{}
}
endpoints.Main.ExtraHTTPHeaders["x-dd-logs-routing"] = routingValue
if len(endpoints.Endpoints) > 0 {
if endpoints.Endpoints[0].ExtraHTTPHeaders == nil {
endpoints.Endpoints[0].ExtraHTTPHeaders = map[string]string{}
}
endpoints.Endpoints[0].ExtraHTTPHeaders["x-dd-logs-routing"] = routingValue
}
}

func stripRoutingHeader(endpoints *config.Endpoints) {
delete(endpoints.Main.ExtraHTTPHeaders, "x-dd-logs-routing")
for i := range endpoints.Endpoints {
delete(endpoints.Endpoints[i].ExtraHTTPHeaders, "x-dd-logs-routing")
}
}
33 changes: 33 additions & 0 deletions comp/logs-library/pipeline/nodeless_k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubelet

package pipeline

import (
"context"

pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/hostinfo"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// isNodelessNode returns true when the current node has the label class=nodeless.
func isNodelessNode(_ pkgconfigmodel.Reader) bool {
nodeInfo, err := hostinfo.NewNodeInfo()
if err != nil {
log.Debugf("logs-agent: could not create NodeInfo to check nodeless label: %v", err)
return false
}

labels, err := nodeInfo.GetNodeLabels(context.Background())
if err != nil {
log.Debugf("logs-agent: could not get node labels: %v", err)
return false
}

return labels["class"] == "nodeless"
}
17 changes: 17 additions & 0 deletions comp/logs-library/pipeline/nodeless_nok8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build !kubelet

package pipeline

import (
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
)

// isNodelessNode always returns false when not built with kubelet support.
func isNodelessNode(_ pkgconfigmodel.Reader) bool {
return false
}
11 changes: 11 additions & 0 deletions comp/logs-library/pipeline/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ func NewProvider(
var senderImpl sender.PipelineComponent
serverlessMeta := sender.NewServerlessMeta(serverless)

nodeless := isNodelessNode(cfg)

// Nodeless nodes must not use gRPC (not zonally balanced); fall back to HTTP.
if nodeless && endpoints.UseGRPC {
endpoints.UseGRPC = false
endpoints.UseHTTP = true
}

// Inject or strip the x-dd-logs-routing header based on node type and transport.
applyRoutingHeaders(endpoints, nodeless)

if endpoints.UseGRPC {
senderImpl = grpcsender.NewSender(numberOfPipelines, cfg, sink, endpoints, destinationsContext, compression)
} else if endpoints.UseHTTP {
Expand Down
Loading