diff --git a/cli/cmd/controller-metrics.go b/cli/cmd/controller-metrics.go index 60f4db65519a8..89007e7926271 100644 --- a/cli/cmd/controller-metrics.go +++ b/cli/cmd/controller-metrics.go @@ -50,7 +50,7 @@ func newCmdControllerMetrics() *cobra.Command { return err } - results := getMetrics(k8sAPI, pods.Items, adminHTTPPortName, options.wait, verbose) + results := getMetrics(k8sAPI, pods.Items, k8s.AdminHTTPPortName, options.wait, verbose) var buf bytes.Buffer for i, result := range results { diff --git a/cli/cmd/diagnostics.go b/cli/cmd/diagnostics.go index b19872e2679c4..67dcb1c5c05be 100644 --- a/cli/cmd/diagnostics.go +++ b/cli/cmd/diagnostics.go @@ -4,10 +4,6 @@ import ( "github.com/spf13/cobra" ) -const ( - adminHTTPPortName string = "admin-http" -) - // newCmdDiagnostics creates a new cobra command `diagnostics` which contains commands to fetch Linkerd diagnostics func newCmdDiagnostics() *cobra.Command { @@ -24,7 +20,7 @@ This command provides subcommands to diagnose the functionality of Linkerd.`, # Get metrics from the web deployment in the emojivoto namespace. linkerd diagnostics proxy-metrics -n emojivoto deploy/web - + # Get the endpoints for authorities in Linkerd's control-plane itself linkerd diagnostics endpoints web.linkerd-viz.svc.cluster.local:8084 `, diff --git a/cli/cmd/metrics_diagnostics_util.go b/cli/cmd/metrics_diagnostics_util.go index bbb63362ddf02..9b1a5cc13efa0 100644 --- a/cli/cmd/metrics_diagnostics_util.go +++ b/cli/cmd/metrics_diagnostics_util.go @@ -4,9 +4,6 @@ import ( "bytes" "crypto/sha256" "fmt" - "io/ioutil" - "net/http" - "os" "sort" "sync/atomic" "time" @@ -35,42 +32,6 @@ func (s byResult) Less(i, j int) bool { return s[i].pod < s[j].pod || ((s[i].pod == s[j].pod) && s[i].container < s[j].container) } -// getResponse makes a http Get request to the passed url and returns the response/error -func getResponse(url string) ([]byte, error) { - // url has been constructed by k8s.newPortForward and is not passed in by - // the user. - //nolint:gosec - resp, err := http.Get(url) - if err != nil { - return nil, err - } - defer resp.Body.Close() - return ioutil.ReadAll(resp.Body) -} - -// getContainerMetrics returns the metrics exposed by a container on the passed in portName -func getContainerMetrics( - k8sAPI *k8s.KubernetesAPI, - pod corev1.Pod, - container corev1.Container, - emitLogs bool, - portName string, -) ([]byte, error) { - portForward, err := k8s.NewContainerMetricsForward(k8sAPI, pod, container, emitLogs, portName) - if err != nil { - return nil, err - } - - defer portForward.Stop() - if err = portForward.Init(); err != nil { - fmt.Fprintf(os.Stderr, "Error running port-forward: %s", err) - return nil, err - } - - metricsURL := portForward.URLFor("/metrics") - return getResponse(metricsURL) -} - // getAllContainersWithPort returns all the containers within // a pod which exposes metrics at a port with name portName func getAllContainersWithPort( @@ -119,7 +80,7 @@ func getMetrics( } for _, c := range containers { - bytes, err := getContainerMetrics(k8sAPI, p, c, emitLogs, portName) + bytes, err := k8s.GetContainerMetrics(k8sAPI, p, c, emitLogs, portName) resultChan <- metricsResult{ pod: p.GetName(), diff --git a/multicluster/cmd/gateways.go b/multicluster/cmd/gateways.go index 36901bb82e4e8..f42769f94d7c7 100644 --- a/multicluster/cmd/gateways.go +++ b/multicluster/cmd/gateways.go @@ -1,95 +1,218 @@ package cmd import ( + "bytes" "context" "fmt" "io" "os" + "sync/atomic" + "time" "github.com/linkerd/linkerd2/cli/table" "github.com/linkerd/linkerd2/pkg/k8s" - vizCmd "github.com/linkerd/linkerd2/viz/cmd" - "github.com/linkerd/linkerd2/viz/metrics-api/client" - pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type ( gatewaysOptions struct { - gatewayNamespace string + clusterName string + wait time.Duration + } + + gatewayMetrics struct { + clusterName string + metrics []byte + err error + } + + gatewayStatus struct { clusterName string - timeWindow string + alive bool + numberOfServices int + latency uint64 } ) +func newGatewaysOptions() *gatewaysOptions { + return &gatewaysOptions{ + wait: 30 * time.Second, + } +} + func newGatewaysCommand() *cobra.Command { - opts := gatewaysOptions{} + opts := newGatewaysOptions() cmd := &cobra.Command{ Use: "gateways", Short: "Display stats information about the gateways in target clusters", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { - req := &pb.GatewaysRequest{ - RemoteClusterName: opts.clusterName, - GatewayNamespace: opts.gatewayNamespace, - TimeWindow: opts.timeWindow, - } - k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0) if err != nil { return err } - ctx := cmd.Context() - - vizNs, err := k8sAPI.GetNamespaceWithExtensionLabel(ctx, vizCmd.ExtensionName) + // Get all the service mirror components in the + // linkerd-multicluster namespace which we'll collect gateway + // metrics from. + multiclusterNs, err := k8sAPI.GetNamespaceWithExtensionLabel(cmd.Context(), MulticlusterExtensionName) if err != nil { - return fmt.Errorf("make sure the linkerd-viz extension is installed, using 'linkerd viz install' (%w)", err) + return fmt.Errorf("make sure the linkerd-multicluster extension is installed, using 'linkerd multicluster install' (%w)", err) } - - client, err := client.NewExternalClient(ctx, vizNs.Name, k8sAPI) - if err != nil { - return err + selector := fmt.Sprintf("component=%s", "linkerd-service-mirror") + if opts.clusterName != "" { + selector = fmt.Sprintf("%s,mirror.linkerd.io/cluster-name=%s", selector, opts.clusterName) } - - resp, err := requestGatewaysFromAPI(client, req) + pods, err := k8sAPI.CoreV1().Pods(multiclusterNs.Name).List(cmd.Context(), metav1.ListOptions{LabelSelector: selector}) if err != nil { - fmt.Fprintln(os.Stderr, err) + fmt.Fprintf(os.Stderr, "failed to list pods in namespace %s: %s", multiclusterNs.Name, err) os.Exit(1) } - renderGateways(resp.GetOk().GatewaysTable.Rows, stdout) + var statuses []gatewayStatus + gatewayMetrics := getGatewayMetrics(k8sAPI, pods.Items, opts.wait) + for _, gateway := range gatewayMetrics { + if gateway.err != nil { + fmt.Fprintf(os.Stderr, "Failed to get gateway status for %s: %s\n", gateway.clusterName, gateway.err) + continue + } + gatewayStatus := gatewayStatus{ + clusterName: gateway.clusterName, + } + + // Parse the gateway metrics so that we can extract liveness + // and latency information. + var metricsParser expfmt.TextParser + parsedMetrics, err := metricsParser.TextToMetricFamilies(bytes.NewReader(gateway.metrics)) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to parse metrics for %s: %s\n", gateway.clusterName, err) + continue + } + + // Check if the gateway is alive by using the gateway_alive + // metric and ensuring the label matches the target cluster. + for _, metrics := range parsedMetrics["gateway_alive"].GetMetric() { + if !isTargetClusterMetric(metrics, gateway.clusterName) { + continue + } + if metrics.GetGauge().GetValue() == 1 { + gatewayStatus.alive = true + break + } + } + + // Search the local cluster for mirror services that are + // mirrored from the target cluster. + selector := fmt.Sprintf("%s=%s,%s=%s", + k8s.MirroredResourceLabel, "true", + k8s.RemoteClusterNameLabel, gateway.clusterName, + ) + services, err := k8sAPI.CoreV1().Services(corev1.NamespaceAll).List(context.Background(), metav1.ListOptions{LabelSelector: selector}) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to list services for %s: %s\n", gateway.clusterName, err) + continue + } + gatewayStatus.numberOfServices = len(services.Items) + + // Check the last observed latency by using the + // gateway_latency metric and ensuring the label the target + // cluster. + for _, metrics := range parsedMetrics["gateway_latency"].GetMetric() { + if !isTargetClusterMetric(metrics, gateway.clusterName) { + continue + } + gatewayStatus.latency = uint64(metrics.GetGauge().GetValue()) + break + } + + statuses = append(statuses, gatewayStatus) + } + renderGateways(statuses, stdout) return nil }, } cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "the name of the target cluster") - cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", "", "the namespace in which the gateway resides on the target cluster") - cmd.Flags().StringVarP(&opts.timeWindow, "time-window", "t", "1m", "Time window (for example: \"15s\", \"1m\", \"10m\", \"1h\"). Needs to be at least 15s.") + cmd.Flags().DurationVarP(&opts.wait, "wait", "w", opts.wait, "time allowed to fetch diagnostics") return cmd } -func requestGatewaysFromAPI(client pb.ApiClient, req *pb.GatewaysRequest) (*pb.GatewaysResponse, error) { - resp, err := client.Gateways(context.Background(), req) - if err != nil { - return nil, fmt.Errorf("Gateways API error: %w", err) +func getGatewayMetrics(k8sAPI *k8s.KubernetesAPI, pods []corev1.Pod, wait time.Duration) []gatewayMetrics { + var metrics []gatewayMetrics + metricsChan := make(chan gatewayMetrics) + var activeRoutines int32 + for _, pod := range pods { + atomic.AddInt32(&activeRoutines, 1) + go func(p corev1.Pod) { + defer atomic.AddInt32(&activeRoutines, -1) + name := p.Labels[k8s.RemoteClusterNameLabel] + container, err := getServiceMirrorContainer(p) + if err != nil { + metricsChan <- gatewayMetrics{ + clusterName: name, + err: err, + } + return + } + metrics, err := k8s.GetContainerMetrics(k8sAPI, p, container, false, k8s.AdminHTTPPortName) + metricsChan <- gatewayMetrics{ + clusterName: name, + metrics: metrics, + err: err, + } + }(pod) } - if e := resp.GetError(); e != nil { - return nil, fmt.Errorf("Gateways API response error: %v", e.Error) + timeout := time.NewTimer(wait) + defer timeout.Stop() +wait: + for { + select { + case metric := <-metricsChan: + metrics = append(metrics, metric) + case <-timeout.C: + break wait + } + if atomic.LoadInt32(&activeRoutines) == 0 { + break + } + } + return metrics +} + +func getServiceMirrorContainer(pod corev1.Pod) (corev1.Container, error) { + if pod.Status.Phase != corev1.PodRunning { + return corev1.Container{}, fmt.Errorf("pod not running: %s", pod.GetName()) + } + for _, c := range pod.Spec.Containers { + if c.Name == "service-mirror" { + return c, nil + } + } + return corev1.Container{}, fmt.Errorf("pod %s did not have 'service-mirror' container", pod.GetName()) +} + +func isTargetClusterMetric(metric *io_prometheus_client.Metric, targetClusterName string) bool { + for _, label := range metric.GetLabel() { + if label.GetName() == "target_cluster_name" { + return label.GetValue() == targetClusterName + } } - return resp, nil + return false } -func renderGateways(rows []*pb.GatewaysTable_Row, w io.Writer) { +func renderGateways(statuses []gatewayStatus, w io.Writer) { t := buildGatewaysTable() t.Data = []table.Row{} - for _, row := range rows { - row := row // Copy to satisfy golint. - t.Data = append(t.Data, gatewaysRowToTableRow(row)) + for _, status := range statuses { + status := status + t.Data = append(t.Data, gatewayStatusToTableRow(status)) } t.Render(w) } @@ -98,79 +221,53 @@ var ( clusterNameHeader = "CLUSTER" aliveHeader = "ALIVE" pairedServicesHeader = "NUM_SVC" - latencyP50Header = "LATENCY_P50" - latencyP95Header = "LATENCY_P95" - latencyP99Header = "LATENCY_P99" + latencyHeader = "LATENCY" ) func buildGatewaysTable() table.Table { columns := []table.Column{ - table.Column{ + { Header: clusterNameHeader, Width: 7, Flexible: true, LeftAlign: true, }, - table.Column{ + { Header: aliveHeader, Width: 5, Flexible: true, LeftAlign: true, }, - table.Column{ + { Header: pairedServicesHeader, Width: 9, }, - table.Column{ - Header: latencyP50Header, - Width: 11, - }, - table.Column{ - Header: latencyP95Header, - Width: 11, - }, - table.Column{ - Header: latencyP99Header, + { + Header: latencyHeader, Width: 11, }, } t := table.NewTable(columns, []table.Row{}) - t.Sort = []int{0, 1} // Sort by namespace, then name. + t.Sort = []int{0} // sort by cluster name return t } -func gatewaysRowToTableRow(row *pb.GatewaysTable_Row) []string { +func gatewayStatusToTableRow(status gatewayStatus) []string { valueOrPlaceholder := func(value string) string { - if row.Alive { + if status.alive { return value } return "-" } - alive := "False" - - if row.Alive { + if status.alive { alive = "True" } return []string{ - row.ClusterName, + status.clusterName, alive, - fmt.Sprint(row.PairedServices), - valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP50)), - valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP95)), - valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP99)), + fmt.Sprint(status.numberOfServices), + valueOrPlaceholder(fmt.Sprintf("%dms", status.latency)), } } - -func extractGatewayPort(gateway *corev1.Service) (uint32, error) { - for _, port := range gateway.Spec.Ports { - if port.Name == k8s.GatewayPortName { - if gateway.Spec.Type == "NodePort" { - return uint32(port.NodePort), nil - } - return uint32(port.Port), nil - } - } - return 0, fmt.Errorf("gateway service %s has no gateway port named %s", gateway.Name, k8s.GatewayPortName) -} diff --git a/multicluster/cmd/link.go b/multicluster/cmd/link.go index b838e02a0454f..766356773d430 100644 --- a/multicluster/cmd/link.go +++ b/multicluster/cmd/link.go @@ -415,3 +415,15 @@ func buildServiceMirrorValues(opts *linkOptions) (*multicluster.Values, error) { return defaults, nil } + +func extractGatewayPort(gateway *corev1.Service) (uint32, error) { + for _, port := range gateway.Spec.Ports { + if port.Name == k8s.GatewayPortName { + if gateway.Spec.Type == "NodePort" { + return uint32(port.NodePort), nil + } + return uint32(port.Port), nil + } + } + return 0, fmt.Errorf("gateway service %s has no gateway port named %s", gateway.Name, k8s.GatewayPortName) +} diff --git a/multicluster/service-mirror/metrics.go b/multicluster/service-mirror/metrics.go index ba14732a04249..2652641d3b143 100644 --- a/multicluster/service-mirror/metrics.go +++ b/multicluster/service-mirror/metrics.go @@ -16,6 +16,7 @@ const ( // workers. type ProbeMetricVecs struct { alive *prometheus.GaugeVec + latency *prometheus.GaugeVec latencies *prometheus.HistogramVec enqueues *prometheus.CounterVec dequeues *prometheus.CounterVec @@ -26,6 +27,7 @@ type ProbeMetricVecs struct { // probe worker. type ProbeMetrics struct { alive prometheus.Gauge + latency prometheus.Gauge latencies prometheus.Observer probes *prometheus.CounterVec unregister func() @@ -79,6 +81,14 @@ func NewProbeMetricVecs() ProbeMetricVecs { labelNames, ) + latency := promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "gateway_latency", + Help: "A gauge which is the latency of the last probe to the gateway.", + }, + labelNames, + ) + latencies := promauto.NewHistogramVec( prometheus.HistogramOpts{ Name: "gateway_probe_latency_ms", @@ -95,6 +105,7 @@ func NewProbeMetricVecs() ProbeMetricVecs { return ProbeMetricVecs{ alive: alive, + latency: latency, latencies: latencies, enqueues: enqueues, dequeues: dequeues, @@ -116,6 +127,7 @@ func (mv ProbeMetricVecs) NewWorkerMetrics(remoteClusterName string) (*ProbeMetr } return &ProbeMetrics{ alive: mv.alive.With(labels), + latency: mv.latency.With(labels), latencies: mv.latencies.With(labels), probes: curriedProbes, unregister: func() { diff --git a/multicluster/service-mirror/probe_worker.go b/multicluster/service-mirror/probe_worker.go index a1d0104e2304a..2795b19c53d31 100644 --- a/multicluster/service-mirror/probe_worker.go +++ b/multicluster/service-mirror/probe_worker.go @@ -117,6 +117,7 @@ func (pw *ProbeWorker) doProbe() { } else { pw.log.Debug("Gateway is healthy") pw.metrics.alive.Set(1) + pw.metrics.latency.Set(float64(end.Milliseconds())) pw.metrics.latencies.Observe(float64(end.Milliseconds())) pw.metrics.probes.With(successLabel).Inc() if !pw.alive { diff --git a/pkg/k8s/metrics.go b/pkg/k8s/metrics.go new file mode 100644 index 0000000000000..6caa73e7d6ca5 --- /dev/null +++ b/pkg/k8s/metrics.go @@ -0,0 +1,49 @@ +package k8s + +import ( + "fmt" + "io/ioutil" + "net/http" + "os" + + corev1 "k8s.io/api/core/v1" +) + +// AdminHTTPPortName is the name of the port used by the admin http server. +const AdminHTTPPortName string = "admin-http" + +// GetContainerMetrics returns the metrics exposed by a container on the passed in portName +func GetContainerMetrics( + k8sAPI *KubernetesAPI, + pod corev1.Pod, + container corev1.Container, + emitLogs bool, + portName string, +) ([]byte, error) { + portForward, err := NewContainerMetricsForward(k8sAPI, pod, container, emitLogs, portName) + if err != nil { + return nil, err + } + + defer portForward.Stop() + if err = portForward.Init(); err != nil { + fmt.Fprintf(os.Stderr, "Error running port-forward: %s", err) + return nil, err + } + + metricsURL := portForward.URLFor("/metrics") + return getResponse(metricsURL) +} + +// getResponse makes a http Get request to the passed url and returns the response/error +func getResponse(url string) ([]byte, error) { + // url has been constructed by k8s.newPortForward and is not passed in by + // the user. + //nolint:gosec + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + return ioutil.ReadAll(resp.Body) +} diff --git a/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go b/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go index 7b21d2d5fac28..8127f1c5e3830 100644 --- a/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go +++ b/test/integration/multicluster/multicluster-traffic/mc_traffic_test.go @@ -86,7 +86,7 @@ func TestGateways(t *testing.T) { return errors.New("response is empty") } fields := strings.Fields(rows[1]) - if len(fields) < 6 { + if len(fields) < 4 { return fmt.Errorf("unexpected number of columns: %d", len(fields)) } if fields[0] != "target" {