Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix docker exec zombies #99

Merged
merged 1 commit into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions cmd/containerhelper/handlers/countZombies.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,12 @@ func countZombiesHandler(_ string, resp *model.Resp) error {
if _, err := strconv.ParseInt(info.Name(), 10, 32); err != nil {
return nil
}
content, err := os.ReadFile(filepath.Join(path, "/status"))
content, err := os.ReadFile(filepath.Join(path, "/stat"))
if err != nil {
return nil
}
lines := strings.Split(string(content), "\n")
for _, line := range lines {
if strings.HasPrefix(line, "State:") {
if strings.Contains(line, "zombie") {
count++
}
break
}
if strings.Contains(string(content), ") Z 1 ") {
count++
}
if count > countLimit {
return filepath.SkipAll
Expand Down
1 change: 1 addition & 0 deletions pkg/cri/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type (
Hostname string
Runtime string
NetworkMode string
PidMode string
MergedDir string
LogPath string
Mounts []*MountPoint
Expand Down
2 changes: 1 addition & 1 deletion pkg/cri/criutils/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func CopyToContainerByChunk(ctx context.Context, i cri.Interface, c *cri.Contain
})

if err != nil {
logger.Criz("[digest] copy chunk error", zap.String("cid", c.ShortContainerID()), zap.Int("accSize", accSize), zap.Error(err))
logger.Criz("[digest] copy chunk error", zap.String("cid", c.ShortID()), zap.Int("accSize", accSize), zap.Error(err))
return err
}

Expand Down
96 changes: 70 additions & 26 deletions pkg/cri/impl/default_cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ func (e *defaultCri) chunkCpLoop() {
cost := time.Since(begin)

if err == nil {
logger.Metaz("[local] retry hack success", zap.String("cid", c.ShortContainerID()), zap.Duration("cost", cost))
logger.Metaz("[local] retry hack success", zap.String("cid", c.ShortID()), zap.Duration("cost", cost))
c.Hacked = cri.HackOk
} else {
logger.Metaz("[local] retry hack error", zap.String("cid", c.ShortContainerID()), zap.Duration("cost", cost), zap.Error(err))
logger.Metaz("[local] retry hack error", zap.String("cid", c.ShortID()), zap.Duration("cost", cost), zap.Error(err))
c.Hacked = cri.HackRetryError
}
}()
Expand Down Expand Up @@ -229,7 +229,7 @@ func (e *defaultCri) CopyToContainer(ctx context.Context, c *cri.Container, srcP
cost := time.Now().Sub(begin)
logger.Criz("[digest] copy to container",
zap.String("engine", e.engine.Type()),
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("runtime", c.Runtime),
zap.String("method", method),
zap.String("src", srcPath),
Expand All @@ -250,7 +250,7 @@ func (e *defaultCri) CopyFromContainer(ctx context.Context, c *cri.Container, sr
defer func() {
logger.Criz("[digest] copy from container",
zap.String("engine", e.engine.Type()),
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("runtime", c.Runtime),
zap.String("method", method),
zap.String("src", srcPath),
Expand Down Expand Up @@ -281,7 +281,7 @@ func (e *defaultCri) Exec(ctx context.Context, c *cri.Container, req cri.ExecReq

logger.Criz("[digest] exec",
zap.String("engine", e.engine.Type()),
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("runtime", c.Runtime),
zap.Strings("cmd", req.Cmd),
zap.Int("code", r.ExitCode),
Expand Down Expand Up @@ -512,13 +512,13 @@ func (e *defaultCri) checkHelperMd5(ctx context.Context, c *cri.Container) bool
}
if md5, err := criutils.Md5sum(ctx, e, c, core.HelperToolPath); err == nil {
logger.Metaz("[local] helper exists",
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("md5", md5),
zap.String("local-md5", e.helperToolLocalMd5sum),
)
if md5 == e.helperToolLocalMd5sum {
logger.Metaz("[local] already hack",
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("ns", c.Pod.Namespace),
zap.String("pod", c.Pod.Name))
return true
Expand Down Expand Up @@ -551,8 +551,9 @@ func (e *defaultCri) buildCriContainer(criPod *cri.Pod, dc *cri.EngineDetailCont
SandboxID: dc.SandboxId,
Runtime: dc.Runtime,
NetworkMode: dc.NetworkMode,
PidMode: dc.PidMode,
Hacked: cri.HackInit,
IsAlpine: cri.AlpineStatusUnknown,
ZombieCount: -1,
}

if dc.LogPath != "" {
Expand Down Expand Up @@ -611,27 +612,24 @@ func (e *defaultCri) buildCriContainer(criPod *cri.Pod, dc *cri.EngineDetailCont
logger.Metaz("[local] fail to get hostname",
zap.String("ns", criPod.Namespace), //
zap.String("pod", criPod.Name), //
zap.String("cid", criContainer.ShortContainerID()),
zap.String("cid", criContainer.ShortID()),
zap.Error(err))
}
}

// skip kube-system containers
if !strings.HasPrefix(criPod.Namespace, "kube-") {

// check alpine based container
if bs, err := criutils.ReadContainerFileUsingExecCat(ctx, e, criContainer, "/etc/alpine-release"); err == nil && len(bs) > 0 {
criContainer.IsAlpine = cri.AlpineStatusYes
logger.Metaz("find alpine based container", zap.String("cid", criContainer.ShortContainerID()))
} else {
if strings.Contains(err.Error(), "No such file or directory") {
criContainer.IsAlpine = cri.AlpineStatusNo
}
logger.Metaz("find alpine based container", zap.String("cid", criContainer.ShortID()))
}

// check pid 1 process
if bs, err := criutils.ReadContainerFileUsingExecCat(ctx, e, criContainer, "/proc/1/cmdline"); err == nil && len(bs) > 0 {
// separated by \0
criContainer.Pid1Name = filepath.Base(string(bytes.Split(bs, []byte{0})[0]))
logger.Metaz("find pid1Name", zap.String("cid", criContainer.ShortID()), zap.String("pid1Name", criContainer.Pid1Name))
}

alreadyExists := false
Expand All @@ -645,7 +643,7 @@ func (e *defaultCri) buildCriContainer(criPod *cri.Pod, dc *cri.EngineDetailCont
copyCount++
}

criContainer.ZombiesCount, _ = criutils.CountZombies(e, ctx, criContainer)
go e.updateZombieCheck(criContainer)

if copyCount == 2 {
alreadyExists = true
Expand All @@ -654,7 +652,7 @@ func (e *defaultCri) buildCriContainer(criPod *cri.Pod, dc *cri.EngineDetailCont

if alreadyExists {
logger.Metaz("[local] hack success",
zap.String("cid", criContainer.ShortContainerID()),
zap.String("cid", criContainer.ShortID()),
zap.String("ns", criPod.Namespace),
zap.String("pod", criPod.Name),
zap.Error(err))
Expand Down Expand Up @@ -933,9 +931,7 @@ func (e *defaultCri) buildPod(pod *v1.Pod, oldState *internalState, newState *in
cached.criContainer.Pod = criPod

if forceUpdateContainerInfo {
ctx, cancel := context.WithTimeout(context.Background(), defaultOpTimeout)
cached.criContainer.ZombiesCount, _ = criutils.CountZombies(e, ctx, cached.criContainer)
cancel()
go e.updateZombieCheck(cached.criContainer)
}

newStateLock.Lock()
Expand All @@ -947,7 +943,7 @@ func (e *defaultCri) buildPod(pod *v1.Pod, oldState *internalState, newState *in

} else {
if cached != nil {
logger.Metaz("container changed", zap.String("cid", cached.criContainer.ShortContainerID()))
logger.Metaz("container changed", zap.String("cid", cached.criContainer.ShortID()))
}
changed = true
criContainer := e.buildCriContainer(criPod, container)
Expand Down Expand Up @@ -1003,7 +999,7 @@ func (e *defaultCri) ExecAsync(ctx context.Context, c *cri.Container, req cri.Ex
}
logger.Criz("[digest] exec async",
zap.String("engine", e.engine.Type()),
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("runtime", c.Runtime),
zap.Strings("cmd", req.Cmd),
zap.Strings("env", req.Env))
Expand All @@ -1013,11 +1009,15 @@ func (e *defaultCri) ExecAsync(ctx context.Context, c *cri.Container, req cri.Ex
func (e *defaultCri) ensureHelperCopied(ctx context.Context, c *cri.Container, from string, to string) (ret error) {
begin := time.Now()
hitCache := false
var fromMd5 string
var toMd5 string
defer func() {
logger.Metaz("copy helper",
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("from", from),
zap.String("to", to),
//zap.String("fromMd5", fromMd5),
//zap.String("toMd5", toMd5),
zap.Bool("hitCache", hitCache),
zap.Duration("cost", time.Since(begin)),
zap.Error(ret),
Expand All @@ -1026,7 +1026,6 @@ func (e *defaultCri) ensureHelperCopied(ctx context.Context, c *cri.Container, f

i, ok := e.localFileMd5.Load(from)

var fromMd5 string
if !ok {
if calc, err := calcMd5(from); err == nil {
fromMd5 = calc
Expand All @@ -1043,8 +1042,9 @@ func (e *defaultCri) ensureHelperCopied(ctx context.Context, c *cri.Container, f
return nil
}

if cmd5, err := criutils.Md5sum(ctx, e, c, core.HelperToolPath); err == nil {
if fromMd5 == cmd5 {
var err error
if toMd5, err = criutils.Md5sum(ctx, e, c, to); err == nil {
if fromMd5 == toMd5 {
hitCache = true
return nil
}
Expand All @@ -1053,6 +1053,50 @@ func (e *defaultCri) ensureHelperCopied(ctx context.Context, c *cri.Container, f
return e.CopyToContainer(ctx, c, from, to)
}

// updateZombieCheck refreshes c.ZombieCount and probes whether pid 1 of the
// container is able to recycle zombie processes, storing the verdict in
// c.Pid1CanRecycleZombieProcesses.
//
// This method will be executed asynchronously, and there will be a little data
// inconsistency problem in a short period of time, but it is not critical.
func (e *defaultCri) updateZombieCheck(c *cri.Container) {
	// Containers sharing the host pid namespace, or running a well-known pid 1
	// process, are treated as able to recycle zombies without probing.
	if c.PidMode == "host" || c.Pid1Name == "systemd" || c.Pid1Name == "init" || c.Pid1Name == "tini" {
		c.Pid1CanRecycleZombieProcesses = true
		c.ZombieCount = 0
		return
	}

	// NOTE(review): this single deadline also covers the 2s sleep below; confirm
	// that defaultOpTimeout leaves enough room for the second count.
	ctx, cancel := context.WithTimeout(context.Background(), defaultOpTimeout)
	defer cancel()

	baseline, err := criutils.CountZombies(e, ctx, c)
	if err != nil {
		logger.Errorz("count zombies error", zap.String("cid", c.ShortID()), zap.Error(err))
		return
	}
	c.ZombieCount = baseline
	if baseline > 0 {
		// Zombies already exist: do not run the probe and leave the flag untouched.
		return
	}

	// already checked
	if c.Pid1CanRecycleZombieProcesses {
		return
	}

	// Run a short-lived `timeout` command; the zombie count is re-read afterwards
	// to see whether it left a zombie behind.
	if _, err := e.Exec(ctx, c, cri.ExecRequest{Cmd: []string{core.BusyboxPath, "timeout", "1", "true"}}); err != nil {
		logger.Errorz("check timeout error", zap.String("cid", c.ShortID()), zap.Error(err))
		return
	}

	// We have to wait long enough for zombie processes to appear.
	// This func will be executed by a separate goroutine, so sleep does not matter.
	time.Sleep(2 * time.Second)

	after, err := criutils.CountZombies(e, ctx, c)
	if err != nil {
		// Without a second measurement nothing can be concluded. Leave the flag
		// false rather than claiming pid 1 recycles zombies, because that claim
		// enables the timeout wrapping that may itself generate zombies.
		logger.Errorz("count zombies error", zap.String("cid", c.ShortID()), zap.Error(err))
		return
	}
	c.ZombieCount = after

	// Compare against the locally captured baseline instead of re-reading the
	// shared field, which may have been written by another goroutine meanwhile.
	c.Pid1CanRecycleZombieProcesses = after == baseline
}

func calcMd5(path string) (string, error) {
if file, err := os.Open(path); err == nil {
defer file.Close()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cri/impl/default_cri_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (e *defaultCri) registerHttpHandlers() {
continue
}
if container.Hacked == 1 || container.Hacked == 5 {
ret = append(ret, container.ShortContainerID())
ret = append(ret, container.ShortID())
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cri/impl/engine/containerd_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (e *ContainerdContainerEngine) GetContainerDetail(ctx context.Context, cid

if detail.IsSandbox {
detail.NetworkMode = "netns:" + sandboxMeta.NetNSPath
// TODO detail.PidMode
}

// I don't know how to get containerd's state dir.
Expand Down
1 change: 1 addition & 0 deletions pkg/cri/impl/engine/docker_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (e *DockerContainerEngine) GetContainerDetail(ctx context.Context, cid stri
Hostname: i.Config.Hostname,
Runtime: i.HostConfig.Runtime,
NetworkMode: string(i.HostConfig.NetworkMode),
PidMode: string(i.HostConfig.PidMode),
MergedDir: "",
Mounts: nil,
State: cri.ContainerState{
Expand Down
12 changes: 9 additions & 3 deletions pkg/cri/impl/engine/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package engine

import (
"github.com/spf13/cast"
"github.com/traas-stack/holoinsight-agent/pkg/core"
"github.com/traas-stack/holoinsight-agent/pkg/cri"
"os"
"time"
)

var (
Expand All @@ -23,14 +25,18 @@ func init() {

// wrapTimeout wraps cmd with timeout -s KILL <seconds> to prevent the process from hanging and not exiting for any reason.
// The wrapping is applied only when pid 1 of the container can recycle zombie processes; otherwise cmd is returned unchanged.
//
// Note:
// Different busybox versions have different timeout command formats.
// In alpine based containers, timeout will generate zombie processes.
func wrapTimeout(c *cri.Container, cmd []string) []string {
	if !c.Pid1CanRecycleZombieProcesses {
		return cmd
	}
	// Resulting command line: <busybox> timeout -s KILL <seconds> cmd...
	prefix := []string{core.BusyboxPath, "timeout", "-s", "KILL", timeout}
	return append(prefix, cmd...)
}

// wrapEnv wraps envs with _FROM=holoinsight-agent. This env is used to mark the source of the call.
func wrapEnv(envs []string) []string {
return append(envs, "_FROM=holoinsight-agent")
return append(envs, "_FROM=holoinsight-agent", "_TS="+cast.ToString(time.Now().UnixMilli()))
}
2 changes: 1 addition & 1 deletion pkg/cri/impl/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func newInternalState() *internalState {

func (s *internalState) build() {
for _, c := range s.containerMap {
s.shortCidContainerMap[c.criContainer.ShortContainerID()] = c
s.shortCidContainerMap[c.criContainer.ShortID()] = c
}
for _, pod := range s.pods {
if pod.IsRunning() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cri/impl/netproxy/port_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (t *PortForwardTask) Start(ctx context.Context) (string, error) {
stopCh <- struct{}{}
}
}()
logCtx := zap.Fields(zap.String("uuid", uuid.New().String()), zap.String("cid", biz.ShortContainerID()), zap.String("listenAddr", listener.Addr().String()), zap.String("toAddr", t.Addr))
logCtx := zap.Fields(zap.String("uuid", uuid.New().String()), zap.String("cid", biz.ShortID()), zap.String("listenAddr", listener.Addr().String()), zap.String("toAddr", t.Addr))
logger.Infozo(logCtx, "[netproxy] create port forward")

go func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cri/impl/netproxy/proxy_socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (h *CriHandle) tcpHandle(s *socks5.Server, c *net.TCPConn, r *socks5.Reques

uuid2 := uuid.New().String()

logCtx := zap.Fields(zap.String("uuid", uuid2), zap.String("protocol", "socks5"), zap.String("cid", biz.ShortContainerID()), zap.String("addr", addr))
logCtx := zap.Fields(zap.String("uuid", uuid2), zap.String("protocol", "socks5"), zap.String("cid", biz.ShortID()), zap.String("addr", addr))

proxied, err := TcpProxy(logger.WithLogCtx(context.Background(), logCtx), h.Cri, biz, addr, DefaultDialTimeout)
if err != nil {
Expand Down
18 changes: 10 additions & 8 deletions pkg/cri/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,23 @@ type (
// NetworkMode
NetworkMode string

// If PidMode is "host", it means that the container uses the pid namespace of the physical machine.
PidMode string

// docker json log: https://docs.docker.com/config/containers/logging/json-file/
LogPath string

// Attributes can be used to prevent arbitrary extension fields
Attributes sync.Map

// name of pid 1
// tini systemd java python
Pid1Name string
// The number of zombie processes inside the container
ZombieCount int

// Is this container based on alpine?
IsAlpine AlpineStatus
// pid 1 process name
Pid1Name string

// The number of zombie processes inside the container
ZombiesCount int
// Whether pid 1 has the ability to recycle zombie processes
Pid1CanRecycleZombieProcesses bool
}
ContainerState struct {
Pid int
Expand Down Expand Up @@ -211,7 +213,7 @@ func (c *Container) IsRunning() bool {
return c.State.Pid > 0 && c.State.Status == "running"
}

func (c *Container) ShortContainerID() string {
func (c *Container) ShortID() string {
return ShortContainerId(c.Id)
}

Expand Down
Loading