diff --git a/cmd/containerhelper/handlers/all.go b/cmd/containerhelper/handlers/all.go index f104607..3ad9428 100644 --- a/cmd/containerhelper/handlers/all.go +++ b/cmd/containerhelper/handlers/all.go @@ -18,4 +18,5 @@ func init() { model.RegisterHandler("inputProxy", inputProxyHandler) model.RegisterHandler("httpProxy", httpProxyHandler) model.RegisterHandler("tcpProxy", tcpProxyHandler) + model.RegisterHandler("fixout", fixOutHandler) } diff --git a/cmd/containerhelper/handlers/fixout.go b/cmd/containerhelper/handlers/fixout.go new file mode 100644 index 0000000..7b809b3 --- /dev/null +++ b/cmd/containerhelper/handlers/fixout.go @@ -0,0 +1,52 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ + +package handlers + +import ( + fixout2 "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/handlers/fixout" + "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/model" + "os" + "os/exec" +) + +// fixOutHandler will run another process and encode the stdout/stderr of that process into fixOutHandler's stdout. +func fixOutHandler(action string, resp *model.Resp) error { + // build cmd + cmd := exec.Command(os.Args[2], os.Args[3:]...) + cmd.Stdin = os.Stdin + stdoutr, err := cmd.StdoutPipe() + if err != nil { + return err + } + stderrr, err := cmd.StderrPipe() + if err != nil { + return err + } + + if err := cmd.Start(); err != nil { + return err + } + + errChan := make(chan error, 2) + // encode cmd's stdout and stderr into os.Stdout + go fixout2.CopyStream(fixout2.StdoutFd, stdoutr, errChan) + go fixout2.CopyStream(fixout2.StderrFd, stderrr, errChan) + + wait := 2 +loop: + for { + select { + case <-errChan: + wait-- + if wait == 0 { + cmd.Wait() + // done + break loop + } + } + } + + return nil +} diff --git a/cmd/containerhelper/handlers/fixout/common.go b/cmd/containerhelper/handlers/fixout/common.go new file mode 100644 index 0000000..4db82b4 --- /dev/null +++ b/cmd/containerhelper/handlers/fixout/common.go @@ -0,0 +1,11 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ + +package fixout + +const ( + StdoutFd = 1 + StderrFd = 2 + bufSize = 4096 +) diff --git a/cmd/containerhelper/handlers/fixout/decode.go b/cmd/containerhelper/handlers/fixout/decode.go new file mode 100644 index 0000000..dce0f6f --- /dev/null +++ b/cmd/containerhelper/handlers/fixout/decode.go @@ -0,0 +1,45 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ + +package fixout + +import ( + "encoding/binary" + "io" +) + +func Decode(hr *io.PipeReader, stdoutW io.WriteCloser, stderrW io.WriteCloser) error { + var err error + activeFdCount := 2 + for activeFdCount > 0 { + var fd byte + var size int16 + if err = binary.Read(hr, binary.LittleEndian, &fd); err != nil { + break + } + if err = binary.Read(hr, binary.LittleEndian, &size); err != nil { + break + } + if size == -1 { + activeFdCount-- + switch fd { + case StdoutFd: + stdoutW.Close() + case StderrFd: + stderrW.Close() + } + continue + } + switch fd { + case StdoutFd: + _, err = io.CopyN(stdoutW, hr, int64(size)) + case StderrFd: + _, err = io.CopyN(stderrW, hr, int64(size)) + } + if err != nil { + return err + } + } + return nil +} diff --git a/cmd/containerhelper/handlers/fixout/encode.go b/cmd/containerhelper/handlers/fixout/encode.go new file mode 100644 index 0000000..7ea9309 --- /dev/null +++ b/cmd/containerhelper/handlers/fixout/encode.go @@ -0,0 +1,71 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ + +package fixout + +import ( + "encoding/binary" + "io" + "os" + "sync" +) + +var ( + writeLock sync.Mutex +) + +func write(fd int, payload []byte) error { + writeLock.Lock() + defer writeLock.Unlock() + + // fd 1 byte + // size 2 bytes + // payload bytes (optional) + if err := binary.Write(os.Stdout, binary.LittleEndian, byte(fd)); err != nil { + return err + } + if err := binary.Write(os.Stdout, binary.LittleEndian, int16(len(payload))); err != nil { + return err + } + return binary.Write(os.Stdout, binary.LittleEndian, payload) +} + +func writeClose(fd int) error { + writeLock.Lock() + defer writeLock.Unlock() + + // fd 1 byte + // -1 2 bytes (const) + if err := binary.Write(os.Stdout, binary.LittleEndian, byte(fd)); err != nil { + return err + } + return binary.Write(os.Stdout, binary.LittleEndian, int16(-1)) +} + +// copyStream reads bytes from in, and encodes bytes into os.Stdout +func CopyStream(fd int, in io.Reader, errChan chan error) { + buf := make([]byte, bufSize) + for { + n, err := in.Read(buf) + var err2 error + if n > 0 { + err2 = write(fd, buf[:n]) + } + if err == io.EOF { + if err2 == nil { + err2 = writeClose(fd) + } + } + if err == nil { + err = err2 + } + if err != nil { + errChan <- err + if err != io.EOF { + io.Copy(io.Discard, in) + } + break + } + } +} diff --git a/pkg/cri/api.go b/pkg/cri/api.go index 476ed4b..4c2bd09 100644 --- a/pkg/cri/api.go +++ b/pkg/cri/api.go @@ -135,7 +135,8 @@ type ( WorkingDir string `json:"workingDir"` Input io.Reader // User is the user passed to docker exec, defaults to 'root' - User string + User string + FixOut bool } ) diff --git a/pkg/cri/criutils/tcpproxy.go b/pkg/cri/criutils/tcpproxy.go index 88a6d19..79402bf 100644 --- a/pkg/cri/criutils/tcpproxy.go +++ b/pkg/cri/criutils/tcpproxy.go @@ -22,23 +22,26 @@ func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr strin if c.Runtime == cri.Runc { sandbox := c.Pod.Sandbox if sandbox != nil { - return cricore.NsEnterDial(c, "tcp", addr, dialTimeout) + conn, err := cricore.NsEnterDial(c, "tcp", addr, dialTimeout) + logger.Infozc(ctx, "[netproxy] runtime is runc, use nsenter", zap.Error(err)) + return conn, err } } pin, pout := io.Pipe() ctx2, cancel := context.WithCancel(ctx) + logger.Infozc(ctx, "[netproxy] use cri exec") ear, err := i.ExecAsync(ctx2, c, cri.ExecRequest{ - Cmd: []string{core.HelperToolPath, "tcpProxy"}, - Env: []string{"TCPPROXY_ADDR=" + addr, "TCPPROXY_IDLE_TIMEOUT=180s", "NO_JSON_OUTPUT=true"}, - Input: pin, + Cmd: []string{core.HelperToolPath, "tcpProxy"}, + Env: []string{"TCPPROXY_ADDR=" + addr, "TCPPROXY_IDLE_TIMEOUT=180s", "NO_JSON_OUTPUT=true"}, + Input: pin, + FixOut: true, }) if err != nil { cancel() pout.CloseWithError(err) return nil, err } - stderrCh := make(chan string, 1) go func() { bs, _ := io.ReadAll(ear.Stderr) @@ -47,19 +50,19 @@ func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr strin go func() { defer cancel() - var rc cri.ExecAsyncResultCode hasResult := false select { - case <-ctx.Done(): + case <-ctx2.Done(): case rc = <-ear.Result: hasResult = true } + if !hasResult { rc = <-ear.Result } stderr := <-stderrCh - logger.Infozc(ctx, "[netproxy] tcpproxy exec finished", zap.Int("code", rc.Code), zap.String("stderr", stderr), zap.Error(rc.Err)) + logger.Infozc(ctx, "[netproxy] cri exec finished", zap.Int("code", rc.Code), zap.String("stderr", stderr), zap.Error(rc.Err)) pout.CloseWithError(rc.Err) }() @@ -67,8 +70,10 @@ func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr strin Reader: ear.Stdout, Writer: pout, CloseFunc: func() { + // TODO uses a more deterministic strategy // If we cancel ctx immediately, then the bottom layer has a certain probability to return <-ctx.Done() instead of <-ear.Result, and there is competition here. //We prefer to leave the opportunity to <-ear.Result, so here is an appropriate delay of 100ms. + // cancel happens before ear.Result time.AfterFunc(100*time.Millisecond, cancel) }, }, nil diff --git a/pkg/cri/impl/default_cri.go b/pkg/cri/impl/default_cri.go index c8dcb42..f122a80 100644 --- a/pkg/cri/impl/default_cri.go +++ b/pkg/cri/impl/default_cri.go @@ -956,5 +956,11 @@ func (e *defaultCri) ExecAsync(ctx context.Context, c *cri.Container, req cri.Ex if req.User == "" { req.User = defaultExecUser } + logger.Criz("[digest] exec async", + zap.String("engine", e.engine.Type()), + zap.String("cid", c.ShortContainerID()), + zap.String("runtime", c.Runtime), + zap.Strings("cmd", req.Cmd), + zap.Strings("env", req.Env)) return e.engine.ExecAsync(ctx, c, req) } diff --git a/pkg/cri/impl/engine/docker_engine.go b/pkg/cri/impl/engine/docker_engine.go index a07a72d..e7a373c 100644 --- a/pkg/cri/impl/engine/docker_engine.go +++ b/pkg/cri/impl/engine/docker_engine.go @@ -11,6 +11,8 @@ import ( "github.com/docker/docker/api/types" dockersdk "github.com/docker/docker/client" "github.com/docker/docker/pkg/stdcopy" + "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/handlers/fixout" + "github.com/traas-stack/holoinsight-agent/pkg/core" "github.com/traas-stack/holoinsight-agent/pkg/cri" "github.com/traas-stack/holoinsight-agent/pkg/cri/dockerutils" "github.com/traas-stack/holoinsight-agent/pkg/k8s/k8slabels" @@ -223,6 +225,10 @@ func (e *DockerContainerEngine) Supports(feature cri.ContainerEngineFeature) boo func (e *DockerContainerEngine) ExecAsync(ctx context.Context, c *cri.Container, req cri.ExecRequest) (cri.ExecAsyncResult, error) { resultCh := make(chan cri.ExecAsyncResultCode) + hackedCmd := req.Cmd + if req.FixOut { + hackedCmd = append([]string{core.HelperToolPath, "fixout"}, req.Cmd...) + } invalidResult := cri.ExecAsyncResult{Cmd: strings.Join(req.Cmd, " "), Result: resultCh} create, err := e.Client.ContainerExecCreate(ctx, c.Id, types.ExecConfig{ User: req.User, @@ -235,7 +241,7 @@ func (e *DockerContainerEngine) ExecAsync(ctx context.Context, c *cri.Container, DetachKeys: "", Env: req.Env, WorkingDir: req.WorkingDir, - Cmd: req.Cmd, + Cmd: hackedCmd, }) if err != nil { return invalidResult, err @@ -263,14 +269,32 @@ func (e *DockerContainerEngine) ExecAsync(ctx context.Context, c *cri.Container, respReaderDone := false go func() { - _, err := stdcopy.StdCopy(stdoutW, stderrW, resp.Reader) - respReaderDone = true - if err != nil && err != io.EOF { - stdoutW.CloseWithError(err) - stderrW.CloseWithError(err) + if req.FixOut { + hr, hw := io.Pipe() + go func() { + // decode docker resp to stdout and stderr + // stderr is useless wo put it into Discard + stdcopy.StdCopy(hw, io.Discard, resp.Reader) + hw.Close() + }() + err := fixout.Decode(hr, stdoutW, stderrW) + respReaderDone = true + if err != nil && err != io.EOF { + stdoutW.CloseWithError(err) + stderrW.CloseWithError(err) + } + errCh <- err + io.Copy(io.Discard, hr) + } else { + _, err := stdcopy.StdCopy(stdoutW, stderrW, resp.Reader) + respReaderDone = true + if err != nil && err != io.EOF { + stdoutW.CloseWithError(err) + stderrW.CloseWithError(err) + } + errCh <- err + io.Copy(io.Discard, resp.Reader) } - errCh <- err - io.Copy(io.Discard, resp.Reader) }() wait := 2 diff --git a/pkg/cri/impl/netproxy/common.go b/pkg/cri/impl/netproxy/common.go index 1c8f3ed..e2bbb8a 100644 --- a/pkg/cri/impl/netproxy/common.go +++ b/pkg/cri/impl/netproxy/common.go @@ -7,6 +7,7 @@ package netproxy import ( "context" "github.com/traas-stack/holoinsight-agent/pkg/cri" + "net" "net/http" "time" ) @@ -17,13 +18,19 @@ const ( ) type ( - Handler func(ctx context.Context, pod *cri.Pod, req *http.Request) (*http.Request, *http.Response, error) + HttpHandler func(ctx context.Context, pod *cri.Pod, req *http.Request) (*http.Request, *http.Response, error) + TcpHandler func(ctx context.Context, i cri.Interface, c *cri.Container, addr string, dialTimeout time.Duration) (net.Conn, error) ) var ( - handlers []Handler + handlers []HttpHandler + tcpHandlers []TcpHandler ) -func AddHttpProxyHandler(handler Handler) { +func AddHttpProxyHandler(handler HttpHandler) { handlers = append(handlers, handler) } + +func AddTcpProxyHandler(handler TcpHandler) { + tcpHandlers = append(tcpHandlers, handler) +} diff --git a/pkg/cri/impl/netproxy/port_forward.go b/pkg/cri/impl/netproxy/port_forward.go index 5683cb3..ddbfd72 100644 --- a/pkg/cri/impl/netproxy/port_forward.go +++ b/pkg/cri/impl/netproxy/port_forward.go @@ -80,7 +80,7 @@ func (t *PortForwardTask) Start(ctx context.Context) (string, error) { func handlePortForwardRequest(logCtx zap.Option, biz *cri.Container, conn net.Conn, addr string) { defer conn.Close() - subConn, err := criutils.TcpProxy(logger.WithLogCtx(context.Background(), logCtx), ioc.Crii, biz, addr, DefaultDialTimeout) + subConn, err := tcpProxy(logger.WithLogCtx(context.Background(), logCtx), ioc.Crii, biz, addr, DefaultDialTimeout) if err != nil { panic(err) } diff --git a/pkg/cri/impl/netproxy/proxy_socks5.go b/pkg/cri/impl/netproxy/proxy_socks5.go index ca6fbc2..8b1a121 100644 --- a/pkg/cri/impl/netproxy/proxy_socks5.go +++ b/pkg/cri/impl/netproxy/proxy_socks5.go @@ -80,6 +80,10 @@ func (h *CriHandle) tcpHandle(s *socks5.Server, c *net.TCPConn, r *socks5.Reques } pod := criutils.FindFirstPodByIp(h.Cri, host) if pod == nil { + a, addr, port, _ := socks5.ParseAddress(Socks5ProxyAddr) + rep := socks5.RepHostUnreachable + socks5.NewReply(rep, a, addr, port).WriteTo(c) + logger.Errorz("no pod when proxy", zap.String("ip", host)) return errors.New("no pod") } if len(pod.Biz) == 0 { @@ -93,7 +97,7 @@ func (h *CriHandle) tcpHandle(s *socks5.Server, c *net.TCPConn, r *socks5.Reques logCtx := zap.Fields(zap.String("uuid", uuid2), zap.String("protocol", "socks5"), zap.String("cid", biz.ShortContainerID()), zap.String("addr", addr)) - proxied, err := criutils.TcpProxy(logger.WithLogCtx(context.Background(), logCtx), h.Cri, biz, addr, DefaultDialTimeout) + proxied, err := tcpProxy(logger.WithLogCtx(context.Background(), logCtx), h.Cri, biz, addr, DefaultDialTimeout) if err != nil { logger.Infozo(logCtx, "[netproxy] create tcperror error", zap.Error(err)) a, addr, port, _ := socks5.ParseAddress(Socks5ProxyAddr) @@ -107,8 +111,6 @@ func (h *CriHandle) tcpHandle(s *socks5.Server, c *net.TCPConn, r *socks5.Reques } return err } - logger.Infozo(logCtx, "[netproxy] create") - defer proxied.Close() // handshake { @@ -118,6 +120,8 @@ func (h *CriHandle) tcpHandle(s *socks5.Server, c *net.TCPConn, r *socks5.Reques return err } } + logger.Infozo(logCtx, "[netproxy] stream created") + defer proxied.Close() // copy streams errCh := make(chan error, 2) @@ -166,3 +170,12 @@ func (h *CriHandle) tcpHandle(s *socks5.Server, c *net.TCPConn, r *socks5.Reques func (h *CriHandle) UDPHandle(s *socks5.Server, addr *net.UDPAddr, d *socks5.Datagram) error { return errors.New("unsupported") } + +func tcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr string, dialTimeout time.Duration) (net.Conn, error) { + for _, handler := range tcpHandlers { + if conn, err := handler(ctx, i, c, addr, dialTimeout); conn != nil && err == nil { + return conn, err + } + } + return criutils.TcpProxy(ctx, i, c, addr, dialTimeout) +} diff --git a/pkg/util/net.go b/pkg/util/net.go index 34925c1..4cf9151 100644 --- a/pkg/util/net.go +++ b/pkg/util/net.go @@ -201,3 +201,11 @@ func (h *DnsCacheHelper) NewHttpClient() *http.Client { func dial(ctx context.Context, network, addr string) (net.Conn, error) { return NewDnsCacheHelper().Dial(ctx, network, addr) } + +func ReplaceHost(hostport string, host string) string { + _, port, err := net.SplitHostPort(hostport) + if err == nil { + return net.JoinHostPort(host, port) + } + return host +} diff --git a/pkg/util/net_test.go b/pkg/util/net_test.go new file mode 100644 index 0000000..ff1e4f7 --- /dev/null +++ b/pkg/util/net_test.go @@ -0,0 +1,14 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ + +package util + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestReplaceHost(t *testing.T) { + assert.Equal(t, "2.2.2.2:2222", ReplaceHost("1.1.1.1:2222", "2.2.2.2")) +}