Skip to content

Commit

Permalink
feat: replace spdy with websocket for portforward and pod exec #21517 (
Browse files Browse the repository at this point in the history
…#21518)

Signed-off-by: xinmei.mao <[email protected]>
Co-authored-by: xinmei.mao <[email protected]>
  • Loading branch information
maoqide and xinmei.mao authored Jan 28, 2025
1 parent e3bcc48 commit 1698370
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
15 changes: 15 additions & 0 deletions server/application/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
cmdutil "k8s.io/kubectl/pkg/cmd/util"

appv1 "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
applisters "github.com/argoproj/argo-cd/v3/pkg/client/listers/application/v1alpha1"
Expand Down Expand Up @@ -318,6 +320,19 @@ func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config, namespace, p
return err
}

// Fallback executor is default, unless feature flag is explicitly disabled.
// Reuse environment variable for kubectl to disable the feature flag, default is enabled.
if !cmdutil.RemoteCommandWebsockets.IsDisabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(cfg, "GET", req.URL().String())
if err != nil {
return err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
if err != nil {
return err
}
}
return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: ptyHandler,
Stdout: ptyHandler,
Expand Down
17 changes: 15 additions & 2 deletions util/kube/portforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
cmdutil "k8s.io/kubectl/pkg/cmd/util"

"github.com/argoproj/argo-cd/v3/util/io"
)
Expand Down Expand Up @@ -71,6 +73,18 @@ func PortForward(targetPort int, namespace string, overrides *clientcmd.ConfigOv
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)

// Reuse environment variable for kubectl to disable the feature flag, default is enabled.
if !cmdutil.PortForwardWebsockets.IsDisabled() {
tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(url, config)
if err != nil {
return -1, fmt.Errorf("could not create tunneling dialer: %w", err)
}
// First attempt tunneling (websocket) dialer, then fallback to spdy dialer.
dialer = portforward.NewFallbackDialer(tunnelingDialer, dialer, func(err error) bool {
return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
})
}

readyChan := make(chan struct{}, 1)
failedChan := make(chan error, 1)
out := new(bytes.Buffer)
Expand All @@ -82,8 +96,7 @@ func PortForward(targetPort int, namespace string, overrides *clientcmd.ConfigOv
}
port := ln.Addr().(*net.TCPAddr).Port
io.Close(ln)

forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", port, targetPort)}, context.Background().Done(), readyChan, out, errOut)
forwarder, err := portforward.NewOnAddresses(dialer, []string{"localhost"}, []string{fmt.Sprintf("%d:%d", port, targetPort)}, context.Background().Done(), readyChan, out, errOut)
if err != nil {
return -1, err
}
Expand Down

0 comments on commit 1698370

Please sign in to comment.