Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor tcpproxy #94

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/containerhelper/handlers/tcpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"net"
"os"
"strings"
"time"
)

Expand All @@ -27,7 +28,13 @@ func tcpProxyHandler(_ string, _ *model.Resp) error {
}

func tcpProxyHandler0(ctx context.Context, addr string, idleTimeout time.Duration, in io.Reader, out io.Writer) error {
conn, err := net.DialTimeout("tcp", addr, defaultDialTimeout)
var conn net.Conn
var err error
if strings.HasSuffix(addr, ".sock") {
conn, err = net.DialTimeout("unix", addr, defaultDialTimeout)
} else {
conn, err = net.DialTimeout("tcp", addr, defaultDialTimeout)
}
if err != nil {
return err
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/cri/cricore/nsenter_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
package cricore

import (
"errors"
"context"
"github.com/traas-stack/holoinsight-agent/pkg/core"
"github.com/traas-stack/holoinsight-agent/pkg/cri"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -49,23 +49,24 @@ func NsEnterAndRunCodes(nsFile string, callback func()) error {
return err2
}

func NsEnterDial(c *cri.Container, network, addr string, dialTimeout time.Duration) (net.Conn, error) {
if c.NetworkMode == "host" {
return net.DialTimeout(network, addr, dialTimeout)
}

func NsEnterContainerAndRunCodes(c *cri.Container, callback func()) error {
if strings.HasPrefix(c.NetworkMode, "netns:") {
netNsFile := filepath.Join(core.GetHostfs(), c.NetworkMode[len("netns:"):])
var conn net.Conn
var err error
err2 := NsEnterAndRunCodes(netNsFile, func() {
conn, err = net.DialTimeout(network, addr, dialTimeout)
})
if err == nil {
err = err2
}
return conn, err
return NsEnterAndRunCodes(netNsFile, callback)
}

return nil, errors.New("invalid NetworkMode: " + c.NetworkMode)
callback()
return nil
}

func NsEnterDial(ctx context.Context, c *cri.Container, network, addr string, dialTimeout time.Duration) (net.Conn, error) {
var conn net.Conn
var err error
err2 := NsEnterContainerAndRunCodes(c, func() {
conn, err = (&net.Dialer{Timeout: dialTimeout}).DialContext(ctx, network, addr)
})
if err == nil {
err = err2
}
return conn, err
}
30 changes: 24 additions & 6 deletions pkg/cri/criutils/tcpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,40 @@ import (
"time"
)

const (
delayCancel = 500 * time.Millisecond
)

// TcpProxy
func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr string, dialTimeout time.Duration) (net.Conn, error) {
if c.Runtime == cri.Runc {
sandbox := c.Pod.Sandbox
if sandbox != nil {
conn, err := cricore.NsEnterDial(c, "tcp", addr, dialTimeout)
conn, err := cricore.NsEnterDial(ctx, c, "tcp", addr, dialTimeout)
logger.Infozc(ctx, "[netproxy] runtime is runc, use nsenter", zap.Error(err))
return conn, err
}
}
return TcpProxyByExec(ctx, i, c, addr)
}

func TcpProxyByExec(ctx context.Context, i cri.Interface, c *cri.Container, addr string) (net.Conn, error) {
pin, pout := io.Pipe()
ctx2, cancel := context.WithCancel(ctx)

// Normally, the stream ends before exec ends.
// If execCtx has been canceled at this time, exec will die from kill.
// Although there is no actual loss (because the stream has been read) this will result in an error.
// So we have to delay cancel execCtx
execCtx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-ctx.Done():
time.AfterFunc(delayCancel, cancel)
case <-execCtx.Done():
}
}()
logger.Infozc(ctx, "[netproxy] use cri exec")
ear, err := i.ExecAsync(ctx2, c, cri.ExecRequest{
ear, err := i.ExecAsync(execCtx, c, cri.ExecRequest{
Cmd: []string{core.HelperToolPath, "tcpProxy"},
Env: []string{"TCPPROXY_ADDR=" + addr, "TCPPROXY_IDLE_TIMEOUT=180s", "NO_JSON_OUTPUT=true"},
Input: pin,
Expand All @@ -53,7 +72,7 @@ func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr strin
var rc cri.ExecAsyncResultCode
hasResult := false
select {
case <-ctx2.Done():
case <-execCtx.Done():
case rc = <-ear.Result:
hasResult = true
}
Expand All @@ -70,11 +89,10 @@ func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr strin
Reader: ear.Stdout,
Writer: pout,
CloseFunc: func() {
// TODO uses a more deterministic strategy
// If we cancel ctx immediately, then the bottom layer has a certain probability to return <-ctx.Done() instead of <-ear.Result, and there is competition here.
//We prefer to leave the opportunity to <-ear.Result, so here is an appropriate delay of 100ms.
// cancel happens before ear.Result
time.AfterFunc(100*time.Millisecond, cancel)
time.AfterFunc(delayCancel, cancel)
},
}, nil
}
2 changes: 1 addition & 1 deletion pkg/cri/impl/netproxy/port_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (t *PortForwardTask) Start(ctx context.Context) (string, error) {
func handlePortForwardRequest(logCtx zap.Option, biz *cri.Container, conn net.Conn, addr string) {
defer conn.Close()

subConn, err := tcpProxy(logger.WithLogCtx(context.Background(), logCtx), ioc.Crii, biz, addr, DefaultDialTimeout)
subConn, err := TcpProxy(logger.WithLogCtx(context.Background(), logCtx), ioc.Crii, biz, addr, DefaultDialTimeout)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cri/impl/netproxy/proxy_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func createNsEnterHttpClient(pod *cri.Pod) (*http.Transport, *http.Client, error
if d, ok := ctx.Deadline(); ok {
timeout = d.Sub(time.Now())
}
return cricore.NsEnterDial(pod.Sandbox, network, addr, timeout)
return cricore.NsEnterDial(ctx, pod.Sandbox, network, addr, timeout)
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cri/impl/netproxy/proxy_socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (h *CriHandle) tcpHandle(s *socks5.Server, c *net.TCPConn, r *socks5.Reques

logCtx := zap.Fields(zap.String("uuid", uuid2), zap.String("protocol", "socks5"), zap.String("cid", biz.ShortContainerID()), zap.String("addr", addr))

proxied, err := tcpProxy(logger.WithLogCtx(context.Background(), logCtx), h.Cri, biz, addr, DefaultDialTimeout)
proxied, err := TcpProxy(logger.WithLogCtx(context.Background(), logCtx), h.Cri, biz, addr, DefaultDialTimeout)
if err != nil {
logger.Infozo(logCtx, "[netproxy] create tcperror error", zap.Error(err))
a, addr, port, _ := socks5.ParseAddress(Socks5ProxyAddr)
Expand Down Expand Up @@ -171,7 +171,7 @@ func (h *CriHandle) UDPHandle(s *socks5.Server, addr *net.UDPAddr, d *socks5.Dat
return errors.New("unsupported")
}

func tcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr string, dialTimeout time.Duration) (net.Conn, error) {
func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr string, dialTimeout time.Duration) (net.Conn, error) {
for _, handler := range tcpHandlers {
if conn, err := handler(ctx, i, c, addr, dialTimeout); conn != nil && err == nil {
return conn, err
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,7 @@ func ReplaceHost(hostport string, host string) string {
}
return host
}

func ReplaceHostToLocalhost(hostport string) string {
return ReplaceHost(hostport, "127.0.0.1")
}
Loading