From 5d0e676f0ba0c6179e3e3a2bc7308be7752a1141 Mon Sep 17 00:00:00 2001 From: Kevin Leimkuhler Date: Tue, 31 May 2022 13:51:38 -0600 Subject: [PATCH] Remove linkerd-viz dependency from linkerd-mutlicluster `gateways` command (#8467) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This changes linkerd-multicluster's `gateways` command to use the service mirror component's `/metrics` endpoint so that there is no longer a dependency on linkerd-viz. The dependency on linkerd-viz is leftover from when those components were part of the default installation meaning that we could always rely on the Prometheus component being present. Now, the `gateways` command starts a port-forward to each service mirror component (for each linked cluster) and queries the `/metrics` endpoint for the `gateway_alive` and `gateway_latency` metrics. It then queries the local cluster for the number of mirror services that correspond to the target cluster of that service mirror. Using these three data points, it creates the output table for the command. ### Output changes Currently the `gateways` command displays the P50, P95, and P99 latencies for each gateway ```shell $ linkerd multicluster gateways CLUSTER ALIVE NUM_SVC LATENCY_P50 LATENCY_P95 LATENCY_P99 k3d-x True 1 1ms 3ms 3ms k3d-z True 0 1ms 3ms 3ms ``` With this change, we now just show the last observed latency. This involved adding the `gateway_latency` metric Gauge — different from the current latencies Observer. ```shell $ linkerd multicluster gateways CLUSTER ALIVE NUM_SVC LATENCY k3d-x True 1 2ms k3d-z True 0 3ms ``` This is because I have not found a Prometheus common library for taking the parsed metrics from `/metrics` and turning that into a histogram yet; I think we should be able to do this but I'm leaving as a follow-up for now. Signed-off-by: Kevin Leimkuhler --- cli/cmd/controller-metrics.go | 2 +- cli/cmd/diagnostics.go | 6 +- cli/cmd/metrics_diagnostics_util.go | 41 +-- multicluster/cmd/gateways.go | 249 ++++++++++++------ multicluster/cmd/link.go | 12 + multicluster/service-mirror/metrics.go | 12 + multicluster/service-mirror/probe_worker.go | 1 + pkg/k8s/metrics.go | 49 ++++ .../multicluster-traffic/mc_traffic_test.go | 2 +- 9 files changed, 251 insertions(+), 123 deletions(-) create mode 100644 pkg/k8s/metrics.go diff --git a/cli/cmd/controller-metrics.go b/cli/cmd/controller-metrics.go index 60f4db65519a8..89007e7926271 100644 --- a/cli/cmd/controller-metrics.go +++ b/cli/cmd/controller-metrics.go @@ -50,7 +50,7 @@ func newCmdControllerMetrics() *cobra.Command { return err } - results := getMetrics(k8sAPI, pods.Items, adminHTTPPortName, options.wait, verbose) + results := getMetrics(k8sAPI, pods.Items, k8s.AdminHTTPPortName, options.wait, verbose) var buf bytes.Buffer for i, result := range results { diff --git a/cli/cmd/diagnostics.go b/cli/cmd/diagnostics.go index b19872e2679c4..67dcb1c5c05be 100644 --- a/cli/cmd/diagnostics.go +++ b/cli/cmd/diagnostics.go @@ -4,10 +4,6 @@ import ( "github.com/spf13/cobra" ) -const ( - adminHTTPPortName string = "admin-http" -) - // newCmdDiagnostics creates a new cobra command `diagnostics` which contains commands to fetch Linkerd diagnostics func newCmdDiagnostics() *cobra.Command { @@ -24,7 +20,7 @@ This command provides subcommands to diagnose the functionality of Linkerd.`, # Get metrics from the web deployment in the emojivoto namespace. 
linkerd diagnostics proxy-metrics -n emojivoto deploy/web - + # Get the endpoints for authorities in Linkerd's control-plane itself linkerd diagnostics endpoints web.linkerd-viz.svc.cluster.local:8084 `, diff --git a/cli/cmd/metrics_diagnostics_util.go b/cli/cmd/metrics_diagnostics_util.go index bbb63362ddf02..9b1a5cc13efa0 100644 --- a/cli/cmd/metrics_diagnostics_util.go +++ b/cli/cmd/metrics_diagnostics_util.go @@ -4,9 +4,6 @@ import ( "bytes" "crypto/sha256" "fmt" - "io/ioutil" - "net/http" - "os" "sort" "sync/atomic" "time" @@ -35,42 +32,6 @@ func (s byResult) Less(i, j int) bool { return s[i].pod < s[j].pod || ((s[i].pod == s[j].pod) && s[i].container < s[j].container) } -// getResponse makes a http Get request to the passed url and returns the response/error -func getResponse(url string) ([]byte, error) { - // url has been constructed by k8s.newPortForward and is not passed in by - // the user. - //nolint:gosec - resp, err := http.Get(url) - if err != nil { - return nil, err - } - defer resp.Body.Close() - return ioutil.ReadAll(resp.Body) -} - -// getContainerMetrics returns the metrics exposed by a container on the passed in portName -func getContainerMetrics( - k8sAPI *k8s.KubernetesAPI, - pod corev1.Pod, - container corev1.Container, - emitLogs bool, - portName string, -) ([]byte, error) { - portForward, err := k8s.NewContainerMetricsForward(k8sAPI, pod, container, emitLogs, portName) - if err != nil { - return nil, err - } - - defer portForward.Stop() - if err = portForward.Init(); err != nil { - fmt.Fprintf(os.Stderr, "Error running port-forward: %s", err) - return nil, err - } - - metricsURL := portForward.URLFor("/metrics") - return getResponse(metricsURL) -} - // getAllContainersWithPort returns all the containers within // a pod which exposes metrics at a port with name portName func getAllContainersWithPort( @@ -119,7 +80,7 @@ func getMetrics( } for _, c := range containers { - bytes, err := getContainerMetrics(k8sAPI, p, c, emitLogs, portName) + bytes, err := k8s.GetContainerMetrics(k8sAPI, p, c, emitLogs, portName) resultChan <- metricsResult{ pod: p.GetName(), diff --git a/multicluster/cmd/gateways.go b/multicluster/cmd/gateways.go index 36901bb82e4e8..f42769f94d7c7 100644 --- a/multicluster/cmd/gateways.go +++ b/multicluster/cmd/gateways.go @@ -1,95 +1,218 @@ package cmd import ( + "bytes" "context" "fmt" "io" "os" + "sync/atomic" + "time" "github.com/linkerd/linkerd2/cli/table" "github.com/linkerd/linkerd2/pkg/k8s" - vizCmd "github.com/linkerd/linkerd2/viz/cmd" - "github.com/linkerd/linkerd2/viz/metrics-api/client" - pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type ( gatewaysOptions struct { - gatewayNamespace string + clusterName string + wait time.Duration + } + + gatewayMetrics struct { + clusterName string + metrics []byte + err error + } + + gatewayStatus struct { clusterName string - timeWindow string + alive bool + numberOfServices int + latency uint64 } ) +func newGatewaysOptions() *gatewaysOptions { + return &gatewaysOptions{ + wait: 30 * time.Second, + } +} + func newGatewaysCommand() *cobra.Command { - opts := gatewaysOptions{} + opts := newGatewaysOptions() cmd := &cobra.Command{ Use: "gateways", Short: "Display stats information about the gateways in target clusters", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { 
- req := &pb.GatewaysRequest{ - RemoteClusterName: opts.clusterName, - GatewayNamespace: opts.gatewayNamespace, - TimeWindow: opts.timeWindow, - } - k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0) if err != nil { return err } - ctx := cmd.Context() - - vizNs, err := k8sAPI.GetNamespaceWithExtensionLabel(ctx, vizCmd.ExtensionName) + // Get all the service mirror components in the + // linkerd-multicluster namespace which we'll collect gateway + // metrics from. + multiclusterNs, err := k8sAPI.GetNamespaceWithExtensionLabel(cmd.Context(), MulticlusterExtensionName) if err != nil { - return fmt.Errorf("make sure the linkerd-viz extension is installed, using 'linkerd viz install' (%w)", err) + return fmt.Errorf("make sure the linkerd-multicluster extension is installed, using 'linkerd multicluster install' (%w)", err) } - - client, err := client.NewExternalClient(ctx, vizNs.Name, k8sAPI) - if err != nil { - return err + selector := fmt.Sprintf("component=%s", "linkerd-service-mirror") + if opts.clusterName != "" { + selector = fmt.Sprintf("%s,mirror.linkerd.io/cluster-name=%s", selector, opts.clusterName) } - - resp, err := requestGatewaysFromAPI(client, req) + pods, err := k8sAPI.CoreV1().Pods(multiclusterNs.Name).List(cmd.Context(), metav1.ListOptions{LabelSelector: selector}) if err != nil { - fmt.Fprintln(os.Stderr, err) + fmt.Fprintf(os.Stderr, "failed to list pods in namespace %s: %s", multiclusterNs.Name, err) os.Exit(1) } - renderGateways(resp.GetOk().GatewaysTable.Rows, stdout) + var statuses []gatewayStatus + gatewayMetrics := getGatewayMetrics(k8sAPI, pods.Items, opts.wait) + for _, gateway := range gatewayMetrics { + if gateway.err != nil { + fmt.Fprintf(os.Stderr, "Failed to get gateway status for %s: %s\n", gateway.clusterName, gateway.err) + continue + } + gatewayStatus := gatewayStatus{ + clusterName: gateway.clusterName, + } + + // Parse the gateway metrics so that we can extract liveness + // and latency information. + var metricsParser expfmt.TextParser + parsedMetrics, err := metricsParser.TextToMetricFamilies(bytes.NewReader(gateway.metrics)) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to parse metrics for %s: %s\n", gateway.clusterName, err) + continue + } + + // Check if the gateway is alive by using the gateway_alive + // metric and ensuring the label matches the target cluster. + for _, metrics := range parsedMetrics["gateway_alive"].GetMetric() { + if !isTargetClusterMetric(metrics, gateway.clusterName) { + continue + } + if metrics.GetGauge().GetValue() == 1 { + gatewayStatus.alive = true + break + } + } + + // Search the local cluster for mirror services that are + // mirrored from the target cluster. + selector := fmt.Sprintf("%s=%s,%s=%s", + k8s.MirroredResourceLabel, "true", + k8s.RemoteClusterNameLabel, gateway.clusterName, + ) + services, err := k8sAPI.CoreV1().Services(corev1.NamespaceAll).List(context.Background(), metav1.ListOptions{LabelSelector: selector}) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to list services for %s: %s\n", gateway.clusterName, err) + continue + } + gatewayStatus.numberOfServices = len(services.Items) + + // Check the last observed latency by using the + // gateway_latency metric and ensuring the label the target + // cluster. 
+ for _, metrics := range parsedMetrics["gateway_latency"].GetMetric() { + if !isTargetClusterMetric(metrics, gateway.clusterName) { + continue + } + gatewayStatus.latency = uint64(metrics.GetGauge().GetValue()) + break + } + + statuses = append(statuses, gatewayStatus) + } + renderGateways(statuses, stdout) return nil }, } cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "the name of the target cluster") - cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", "", "the namespace in which the gateway resides on the target cluster") - cmd.Flags().StringVarP(&opts.timeWindow, "time-window", "t", "1m", "Time window (for example: \"15s\", \"1m\", \"10m\", \"1h\"). Needs to be at least 15s.") + cmd.Flags().DurationVarP(&opts.wait, "wait", "w", opts.wait, "time allowed to fetch diagnostics") return cmd } -func requestGatewaysFromAPI(client pb.ApiClient, req *pb.GatewaysRequest) (*pb.GatewaysResponse, error) { - resp, err := client.Gateways(context.Background(), req) - if err != nil { - return nil, fmt.Errorf("Gateways API error: %w", err) +func getGatewayMetrics(k8sAPI *k8s.KubernetesAPI, pods []corev1.Pod, wait time.Duration) []gatewayMetrics { + var metrics []gatewayMetrics + metricsChan := make(chan gatewayMetrics) + var activeRoutines int32 + for _, pod := range pods { + atomic.AddInt32(&activeRoutines, 1) + go func(p corev1.Pod) { + defer atomic.AddInt32(&activeRoutines, -1) + name := p.Labels[k8s.RemoteClusterNameLabel] + container, err := getServiceMirrorContainer(p) + if err != nil { + metricsChan <- gatewayMetrics{ + clusterName: name, + err: err, + } + return + } + metrics, err := k8s.GetContainerMetrics(k8sAPI, p, container, false, k8s.AdminHTTPPortName) + metricsChan <- gatewayMetrics{ + clusterName: name, + metrics: metrics, + err: err, + } + }(pod) } - if e := resp.GetError(); e != nil { - return nil, fmt.Errorf("Gateways API response error: %v", e.Error) + timeout := time.NewTimer(wait) + defer timeout.Stop() +wait: + for { + select { + case metric := <-metricsChan: + metrics = append(metrics, metric) + case <-timeout.C: + break wait + } + if atomic.LoadInt32(&activeRoutines) == 0 { + break + } + } + return metrics +} + +func getServiceMirrorContainer(pod corev1.Pod) (corev1.Container, error) { + if pod.Status.Phase != corev1.PodRunning { + return corev1.Container{}, fmt.Errorf("pod not running: %s", pod.GetName()) + } + for _, c := range pod.Spec.Containers { + if c.Name == "service-mirror" { + return c, nil + } + } + return corev1.Container{}, fmt.Errorf("pod %s did not have 'service-mirror' container", pod.GetName()) +} + +func isTargetClusterMetric(metric *io_prometheus_client.Metric, targetClusterName string) bool { + for _, label := range metric.GetLabel() { + if label.GetName() == "target_cluster_name" { + return label.GetValue() == targetClusterName + } } - return resp, nil + return false } -func renderGateways(rows []*pb.GatewaysTable_Row, w io.Writer) { +func renderGateways(statuses []gatewayStatus, w io.Writer) { t := buildGatewaysTable() t.Data = []table.Row{} - for _, row := range rows { - row := row // Copy to satisfy golint. 
- t.Data = append(t.Data, gatewaysRowToTableRow(row)) + for _, status := range statuses { + status := status + t.Data = append(t.Data, gatewayStatusToTableRow(status)) } t.Render(w) } @@ -98,79 +221,53 @@ var ( clusterNameHeader = "CLUSTER" aliveHeader = "ALIVE" pairedServicesHeader = "NUM_SVC" - latencyP50Header = "LATENCY_P50" - latencyP95Header = "LATENCY_P95" - latencyP99Header = "LATENCY_P99" + latencyHeader = "LATENCY" ) func buildGatewaysTable() table.Table { columns := []table.Column{ - table.Column{ + { Header: clusterNameHeader, Width: 7, Flexible: true, LeftAlign: true, }, - table.Column{ + { Header: aliveHeader, Width: 5, Flexible: true, LeftAlign: true, }, - table.Column{ + { Header: pairedServicesHeader, Width: 9, }, - table.Column{ - Header: latencyP50Header, - Width: 11, - }, - table.Column{ - Header: latencyP95Header, - Width: 11, - }, - table.Column{ - Header: latencyP99Header, + { + Header: latencyHeader, Width: 11, }, } t := table.NewTable(columns, []table.Row{}) - t.Sort = []int{0, 1} // Sort by namespace, then name. + t.Sort = []int{0} // sort by cluster name return t } -func gatewaysRowToTableRow(row *pb.GatewaysTable_Row) []string { +func gatewayStatusToTableRow(status gatewayStatus) []string { valueOrPlaceholder := func(value string) string { - if row.Alive { + if status.alive { return value } return "-" } - alive := "False" - - if row.Alive { + if status.alive { alive = "True" } return []string{ - row.ClusterName, + status.clusterName, alive, - fmt.Sprint(row.PairedServices), - valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP50)), - valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP95)), - valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP99)), + fmt.Sprint(status.numberOfServices), + valueOrPlaceholder(fmt.Sprintf("%dms", status.latency)), } } - -func extractGatewayPort(gateway *corev1.Service) (uint32, error) { - for _, port := range gateway.Spec.Ports { - if port.Name == k8s.GatewayPortName { - if gateway.Spec.Type == "NodePort" { - return uint32(port.NodePort), nil - } - return uint32(port.Port), nil - } - } - return 0, fmt.Errorf("gateway service %s has no gateway port named %s", gateway.Name, k8s.GatewayPortName) -} diff --git a/multicluster/cmd/link.go b/multicluster/cmd/link.go index b838e02a0454f..766356773d430 100644 --- a/multicluster/cmd/link.go +++ b/multicluster/cmd/link.go @@ -415,3 +415,15 @@ func buildServiceMirrorValues(opts *linkOptions) (*multicluster.Values, error) { return defaults, nil } + +func extractGatewayPort(gateway *corev1.Service) (uint32, error) { + for _, port := range gateway.Spec.Ports { + if port.Name == k8s.GatewayPortName { + if gateway.Spec.Type == "NodePort" { + return uint32(port.NodePort), nil + } + return uint32(port.Port), nil + } + } + return 0, fmt.Errorf("gateway service %s has no gateway port named %s", gateway.Name, k8s.GatewayPortName) +} diff --git a/multicluster/service-mirror/metrics.go b/multicluster/service-mirror/metrics.go index ba14732a04249..2652641d3b143 100644 --- a/multicluster/service-mirror/metrics.go +++ b/multicluster/service-mirror/metrics.go @@ -16,6 +16,7 @@ const ( // workers. type ProbeMetricVecs struct { alive *prometheus.GaugeVec + latency *prometheus.GaugeVec latencies *prometheus.HistogramVec enqueues *prometheus.CounterVec dequeues *prometheus.CounterVec @@ -26,6 +27,7 @@ type ProbeMetricVecs struct { // probe worker. 
type ProbeMetrics struct { alive prometheus.Gauge + latency prometheus.Gauge latencies prometheus.Observer probes *prometheus.CounterVec unregister func() @@ -79,6 +81,14 @@ func NewProbeMetricVecs() ProbeMetricVecs { labelNames, ) + latency := promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "gateway_latency", + Help: "A gauge which is the latency of the last probe to the gateway.", + }, + labelNames, + ) + latencies := promauto.NewHistogramVec( prometheus.HistogramOpts{ Name: "gateway_probe_latency_ms", @@ -95,6 +105,7 @@ func NewProbeMetricVecs() ProbeMetricVecs { return ProbeMetricVecs{ alive: alive, + latency: latency, latencies: latencies, enqueues: enqueues, dequeues: dequeues, @@ -116,6 +127,7 @@ func (mv ProbeMetricVecs) NewWorkerMetrics(remoteClusterName string) (*ProbeMetr } return &ProbeMetrics{ alive: mv.alive.With(labels), + latency: mv.latency.With(labels), latencies: mv.latencies.With(labels), probes: curriedProbes, unregister: func() { diff --git a/multicluster/service-mirror/probe_worker.go b/multicluster/service-mirror/probe_worker.go index a1d0104e2304a..2795b19c53d31 100644 --- a/multicluster/service-mirror/probe_worker.go +++ b/multicluster/service-mirror/probe_worker.go @@ -117,6 +117,7 @@ func (pw *ProbeWorker) doProbe() { } else { pw.log.Debug("Gateway is healthy") pw.metrics.alive.Set(1) + pw.metrics.latency.Set(float64(end.Milliseconds())) pw.metrics.latencies.Observe(float64(end.Milliseconds())) pw.metrics.probes.With(successLabel).Inc() if !pw.alive { diff --git a/pkg/k8s/metrics.go b/pkg/k8s/metrics.go new file mode 100644 index 0000000000000..6caa73e7d6ca5 --- /dev/null +++ b/pkg/k8s/metrics.go @@ -0,0 +1,49 @@ +package k8s + +import ( + "fmt" + "io/ioutil" + "net/http" + "os" + + corev1 "k8s.io/api/core/v1" +) + +// AdminHTTPPortName is the name of the port used by the admin http server. +const AdminHTTPPortName string = "admin-http" + +// GetContainerMetrics returns the metrics exposed by a container on the passed in portName +func GetContainerMetrics( + k8sAPI *KubernetesAPI, + pod corev1.Pod, + container corev1.Container, + emitLogs bool, + portName string, +) ([]byte, error) { + portForward, err := NewContainerMetricsForward(k8sAPI, pod, container, emitLogs, portName) + if err != nil { + return nil, err + } + + defer portForward.Stop() + if err = portForward.Init(); err != nil { + fmt.Fprintf(os.Stderr, "Error running port-forward: %s", err) + return nil, err + } + + metricsURL := portForward.URLFor("/metrics") + return getResponse(metricsURL) +} + +// getResponse makes a http Get request to the passed url and returns the response/error +func getResponse(url string) ([]byte, error) { + // url has been constructed by k8s.newPortForward and is not passed in by + // the user. + //nolint:gosec + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + return ioutil.ReadAll(resp.Body) +} diff --git a/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go b/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go index 7b21d2d5fac28..8127f1c5e3830 100644 --- a/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go +++ b/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go @@ -86,7 +86,7 @@ func TestGateways(t *testing.T) { return errors.New("response is empty") } fields := strings.Fields(rows[1]) - if len(fields) < 6 { + if len(fields) < 4 { return fmt.Errorf("unexpected number of columns: %d", len(fields)) } if fields[0] != "target" {
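
### Inspecting the gateway metrics by hand

The data the reworked `gateways` command consumes can also be inspected directly: port-forward to a service mirror pod's `admin-http` port, scrape `/metrics`, and count the mirror services for the same cluster. The commands below are a rough sketch rather than part of this change: the local port 9999 is arbitrary, `k3d-x` is just the cluster name from the example output above, the named-port form of `kubectl port-forward` assumes kubectl resolves container port names, and the `mirror.linkerd.io/mirrored-service` key is assumed to be the value of `k8s.MirroredResourceLabel`.

```shell
# Pick the service mirror pod for one linked cluster, using the same labels
# the new `gateways` code selects on.
POD=$(kubectl -n linkerd-multicluster get pods \
  -l component=linkerd-service-mirror,mirror.linkerd.io/cluster-name=k3d-x \
  -o name | head -n 1)

# Forward a local port to the pod's admin-http port and scrape the two gauges
# the command reads (gateway_alive and gateway_latency).
kubectl -n linkerd-multicluster port-forward "$POD" 9999:admin-http &
sleep 2  # give the port-forward a moment to establish
curl -s http://localhost:9999/metrics | grep -E '^gateway_(alive|latency)'
kill %1

# Count the mirror services created for that cluster (the NUM_SVC column).
kubectl get svc --all-namespaces \
  -l mirror.linkerd.io/mirrored-service=true,mirror.linkerd.io/cluster-name=k3d-x \
  --no-headers | wc -l
```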
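
### Sketch: restoring the latency percentiles

The description above leaves rebuilding the P50/P95/P99 columns from the raw `/metrics` output as a follow-up. One way to do it, sketched below and not part of this patch, is to parse the scrape with `expfmt` (the same parser the new `gateways` code uses) and estimate quantiles from the `gateway_probe_latency_ms` histogram buckets with the linear interpolation that PromQL's `histogram_quantile` applies. The `estimateQuantile` helper and the stdin-driven `main` are hypothetical names used only to outline the approach.

```go
package main

import (
	"fmt"
	"math"
	"os"
	"sort"

	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
)

// estimateQuantile approximates a quantile (0 < q <= 1) from the cumulative
// buckets of a parsed histogram, interpolating linearly inside the bucket the
// way PromQL's histogram_quantile does. Buckets are assumed to be ordered by
// upper bound, which is how client_golang exposes them.
func estimateQuantile(h *dto.Histogram, q float64) float64 {
	buckets := h.GetBucket()
	if len(buckets) == 0 || h.GetSampleCount() == 0 {
		return 0
	}
	rank := q * float64(h.GetSampleCount())
	idx := sort.Search(len(buckets), func(i int) bool {
		return float64(buckets[i].GetCumulativeCount()) >= rank
	})
	if idx == len(buckets) {
		idx = len(buckets) - 1
	}
	lowerBound, lowerCount := 0.0, 0.0
	if idx > 0 {
		lowerBound = buckets[idx-1].GetUpperBound()
		lowerCount = float64(buckets[idx-1].GetCumulativeCount())
	}
	upperBound := buckets[idx].GetUpperBound()
	upperCount := float64(buckets[idx].GetCumulativeCount())
	if math.IsInf(upperBound, +1) {
		// The quantile falls in the +Inf bucket; report the largest finite
		// upper bound instead, as histogram_quantile does.
		return lowerBound
	}
	if upperCount == lowerCount {
		return upperBound
	}
	return lowerBound + (upperBound-lowerBound)*(rank-lowerCount)/(upperCount-lowerCount)
}

func main() {
	// Read a text-format /metrics scrape (as served on the admin-http port)
	// from stdin.
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(os.Stdin)
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to parse metrics: %s\n", err)
		os.Exit(1)
	}
	// GetMetric is nil-safe, so a missing family simply yields no output.
	for _, m := range families["gateway_probe_latency_ms"].GetMetric() {
		h := m.GetHistogram()
		fmt.Printf("p50=%.0fms p95=%.0fms p99=%.0fms\n",
			estimateQuantile(h, 0.5),
			estimateQuantile(h, 0.95),
			estimateQuantile(h, 0.99))
	}
}
```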