Skip to content

Commit

Permalink
fix: fix docker exec stdout EOF missing (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo authored Mar 13, 2024
1 parent 7ad2670 commit 3ccbd6b
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 24 deletions.
1 change: 1 addition & 0 deletions cmd/containerhelper/handlers/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ func init() {
model.RegisterHandler("inputProxy", inputProxyHandler)
model.RegisterHandler("httpProxy", httpProxyHandler)
model.RegisterHandler("tcpProxy", tcpProxyHandler)
model.RegisterHandler("fixout", fixOutHandler)
}
52 changes: 52 additions & 0 deletions cmd/containerhelper/handlers/fixout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package handlers

import (
fixout2 "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/handlers/fixout"
"github.com/traas-stack/holoinsight-agent/cmd/containerhelper/model"
"os"
"os/exec"
)

// fixOutHandler will run another process and encode the stdout/stderr of that process into fixOutHandler's stdout.
func fixOutHandler(action string, resp *model.Resp) error {
// build cmd
cmd := exec.Command(os.Args[2], os.Args[3:]...)
cmd.Stdin = os.Stdin
stdoutr, err := cmd.StdoutPipe()
if err != nil {
return err
}
stderrr, err := cmd.StderrPipe()
if err != nil {
return err
}

if err := cmd.Start(); err != nil {
return err
}

errChan := make(chan error, 2)
// encode cmd's stdout and stderr into os.Stdout
go fixout2.CopyStream(fixout2.StdoutFd, stdoutr, errChan)
go fixout2.CopyStream(fixout2.StderrFd, stderrr, errChan)

wait := 2
loop:
for {
select {
case <-errChan:
wait--
if wait == 0 {
cmd.Wait()
// done
break loop
}
}
}

return nil
}
11 changes: 11 additions & 0 deletions cmd/containerhelper/handlers/fixout/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package fixout

const (
StdoutFd = 1
StderrFd = 2
bufSize = 4096
)
45 changes: 45 additions & 0 deletions cmd/containerhelper/handlers/fixout/decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package fixout

import (
"encoding/binary"
"io"
)

func Decode(hr *io.PipeReader, stdoutW io.WriteCloser, stderrW io.WriteCloser) error {
var err error
activeFdCount := 2
for activeFdCount > 0 {
var fd byte
var size int16
if err = binary.Read(hr, binary.LittleEndian, &fd); err != nil {
break
}
if err = binary.Read(hr, binary.LittleEndian, &size); err != nil {
break
}
if size == -1 {
activeFdCount--
switch fd {
case StdoutFd:
stdoutW.Close()
case StderrFd:
stderrW.Close()
}
continue
}
switch fd {
case StdoutFd:
_, err = io.CopyN(stdoutW, hr, int64(size))
case StderrFd:
_, err = io.CopyN(stderrW, hr, int64(size))
}
if err != nil {
return err
}
}
return nil
}
71 changes: 71 additions & 0 deletions cmd/containerhelper/handlers/fixout/encode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package fixout

import (
"encoding/binary"
"io"
"os"
"sync"
)

var (
writeLock sync.Mutex
)

func write(fd int, payload []byte) error {
writeLock.Lock()
defer writeLock.Unlock()

// fd 1 byte
// size 2 bytes
// payload <size> bytes (optional)
if err := binary.Write(os.Stdout, binary.LittleEndian, byte(fd)); err != nil {
return err
}
if err := binary.Write(os.Stdout, binary.LittleEndian, int16(len(payload))); err != nil {
return err
}
return binary.Write(os.Stdout, binary.LittleEndian, payload)
}

func writeClose(fd int) error {
writeLock.Lock()
defer writeLock.Unlock()

// fd 1 byte
// -1 2 bytes (const)
if err := binary.Write(os.Stdout, binary.LittleEndian, byte(fd)); err != nil {
return err
}
return binary.Write(os.Stdout, binary.LittleEndian, int16(-1))
}

// copyStream reads bytes from in, and encodes bytes into os.Stdout
func CopyStream(fd int, in io.Reader, errChan chan error) {
buf := make([]byte, bufSize)
for {
n, err := in.Read(buf)
var err2 error
if n > 0 {
err2 = write(fd, buf[:n])
}
if err == io.EOF {
if err2 == nil {
err2 = writeClose(fd)
}
}
if err == nil {
err = err2
}
if err != nil {
errChan <- err
if err != io.EOF {
io.Copy(io.Discard, in)
}
break
}
}
}
3 changes: 2 additions & 1 deletion pkg/cri/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type (
WorkingDir string `json:"workingDir"`
Input io.Reader
// User is the user passed to docker exec, defaults to 'root'
User string
User string
FixOut bool
}
)

Expand Down
21 changes: 13 additions & 8 deletions pkg/cri/criutils/tcpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,26 @@ func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr strin
if c.Runtime == cri.Runc {
sandbox := c.Pod.Sandbox
if sandbox != nil {
return cricore.NsEnterDial(c, "tcp", addr, dialTimeout)
conn, err := cricore.NsEnterDial(c, "tcp", addr, dialTimeout)
logger.Infozc(ctx, "[netproxy] runtime is runc, use nsenter", zap.Error(err))
return conn, err
}
}

pin, pout := io.Pipe()
ctx2, cancel := context.WithCancel(ctx)
logger.Infozc(ctx, "[netproxy] use cri exec")
ear, err := i.ExecAsync(ctx2, c, cri.ExecRequest{
Cmd: []string{core.HelperToolPath, "tcpProxy"},
Env: []string{"TCPPROXY_ADDR=" + addr, "TCPPROXY_IDLE_TIMEOUT=180s", "NO_JSON_OUTPUT=true"},
Input: pin,
Cmd: []string{core.HelperToolPath, "tcpProxy"},
Env: []string{"TCPPROXY_ADDR=" + addr, "TCPPROXY_IDLE_TIMEOUT=180s", "NO_JSON_OUTPUT=true"},
Input: pin,
FixOut: true,
})
if err != nil {
cancel()
pout.CloseWithError(err)
return nil, err
}

stderrCh := make(chan string, 1)
go func() {
bs, _ := io.ReadAll(ear.Stderr)
Expand All @@ -47,28 +50,30 @@ func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr strin

go func() {
defer cancel()

var rc cri.ExecAsyncResultCode
hasResult := false
select {
case <-ctx.Done():
case <-ctx2.Done():
case rc = <-ear.Result:
hasResult = true
}

if !hasResult {
rc = <-ear.Result
}
stderr := <-stderrCh
logger.Infozc(ctx, "[netproxy] tcpproxy exec finished", zap.Int("code", rc.Code), zap.String("stderr", stderr), zap.Error(rc.Err))
logger.Infozc(ctx, "[netproxy] cri exec finished", zap.Int("code", rc.Code), zap.String("stderr", stderr), zap.Error(rc.Err))
pout.CloseWithError(rc.Err)
}()

return &util.ReadWriterConn{
Reader: ear.Stdout,
Writer: pout,
CloseFunc: func() {
// TODO uses a more deterministic strategy
// If we cancel ctx immediately, then the bottom layer has a certain probability to return <-ctx.Done() instead of <-ear.Result, and there is competition here.
//We prefer to leave the opportunity to <-ear.Result, so here is an appropriate delay of 100ms.
// cancel happens before ear.Result
time.AfterFunc(100*time.Millisecond, cancel)
},
}, nil
Expand Down
6 changes: 6 additions & 0 deletions pkg/cri/impl/default_cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,5 +956,11 @@ func (e *defaultCri) ExecAsync(ctx context.Context, c *cri.Container, req cri.Ex
if req.User == "" {
req.User = defaultExecUser
}
logger.Criz("[digest] exec async",
zap.String("engine", e.engine.Type()),
zap.String("cid", c.ShortContainerID()),
zap.String("runtime", c.Runtime),
zap.Strings("cmd", req.Cmd),
zap.Strings("env", req.Env))
return e.engine.ExecAsync(ctx, c, req)
}
40 changes: 32 additions & 8 deletions pkg/cri/impl/engine/docker_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/docker/docker/api/types"
dockersdk "github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/traas-stack/holoinsight-agent/cmd/containerhelper/handlers/fixout"
"github.com/traas-stack/holoinsight-agent/pkg/core"
"github.com/traas-stack/holoinsight-agent/pkg/cri"
"github.com/traas-stack/holoinsight-agent/pkg/cri/dockerutils"
"github.com/traas-stack/holoinsight-agent/pkg/k8s/k8slabels"
Expand Down Expand Up @@ -223,6 +225,10 @@ func (e *DockerContainerEngine) Supports(feature cri.ContainerEngineFeature) boo

func (e *DockerContainerEngine) ExecAsync(ctx context.Context, c *cri.Container, req cri.ExecRequest) (cri.ExecAsyncResult, error) {
resultCh := make(chan cri.ExecAsyncResultCode)
hackedCmd := req.Cmd
if req.FixOut {
hackedCmd = append([]string{core.HelperToolPath, "fixout"}, req.Cmd...)
}
invalidResult := cri.ExecAsyncResult{Cmd: strings.Join(req.Cmd, " "), Result: resultCh}
create, err := e.Client.ContainerExecCreate(ctx, c.Id, types.ExecConfig{
User: req.User,
Expand All @@ -235,7 +241,7 @@ func (e *DockerContainerEngine) ExecAsync(ctx context.Context, c *cri.Container,
DetachKeys: "",
Env: req.Env,
WorkingDir: req.WorkingDir,
Cmd: req.Cmd,
Cmd: hackedCmd,
})
if err != nil {
return invalidResult, err
Expand Down Expand Up @@ -263,14 +269,32 @@ func (e *DockerContainerEngine) ExecAsync(ctx context.Context, c *cri.Container,

respReaderDone := false
go func() {
_, err := stdcopy.StdCopy(stdoutW, stderrW, resp.Reader)
respReaderDone = true
if err != nil && err != io.EOF {
stdoutW.CloseWithError(err)
stderrW.CloseWithError(err)
if req.FixOut {
hr, hw := io.Pipe()
go func() {
// decode docker resp to stdout and stderr
// stderr is useless wo put it into Discard
stdcopy.StdCopy(hw, io.Discard, resp.Reader)
hw.Close()
}()
err := fixout.Decode(hr, stdoutW, stderrW)
respReaderDone = true
if err != nil && err != io.EOF {
stdoutW.CloseWithError(err)
stderrW.CloseWithError(err)
}
errCh <- err
io.Copy(io.Discard, hr)
} else {
_, err := stdcopy.StdCopy(stdoutW, stderrW, resp.Reader)
respReaderDone = true
if err != nil && err != io.EOF {
stdoutW.CloseWithError(err)
stderrW.CloseWithError(err)
}
errCh <- err
io.Copy(io.Discard, resp.Reader)
}
errCh <- err
io.Copy(io.Discard, resp.Reader)
}()

wait := 2
Expand Down
13 changes: 10 additions & 3 deletions pkg/cri/impl/netproxy/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package netproxy
import (
"context"
"github.com/traas-stack/holoinsight-agent/pkg/cri"
"net"
"net/http"
"time"
)
Expand All @@ -17,13 +18,19 @@ const (
)

type (
Handler func(ctx context.Context, pod *cri.Pod, req *http.Request) (*http.Request, *http.Response, error)
HttpHandler func(ctx context.Context, pod *cri.Pod, req *http.Request) (*http.Request, *http.Response, error)
TcpHandler func(ctx context.Context, i cri.Interface, c *cri.Container, addr string, dialTimeout time.Duration) (net.Conn, error)
)

var (
handlers []Handler
handlers []HttpHandler
tcpHandlers []TcpHandler
)

func AddHttpProxyHandler(handler Handler) {
func AddHttpProxyHandler(handler HttpHandler) {
handlers = append(handlers, handler)
}

func AddTcpProxyHandler(handler TcpHandler) {
tcpHandlers = append(tcpHandlers, handler)
}
2 changes: 1 addition & 1 deletion pkg/cri/impl/netproxy/port_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (t *PortForwardTask) Start(ctx context.Context) (string, error) {
func handlePortForwardRequest(logCtx zap.Option, biz *cri.Container, conn net.Conn, addr string) {
defer conn.Close()

subConn, err := criutils.TcpProxy(logger.WithLogCtx(context.Background(), logCtx), ioc.Crii, biz, addr, DefaultDialTimeout)
subConn, err := tcpProxy(logger.WithLogCtx(context.Background(), logCtx), ioc.Crii, biz, addr, DefaultDialTimeout)
if err != nil {
panic(err)
}
Expand Down
Loading

0 comments on commit 3ccbd6b

Please sign in to comment.