Skip to content

Commit

Permalink
feat: pktmon client recreation + remove ltsc2019 in azp (#562)
Browse files Browse the repository at this point in the history
# Description
Occasionally the grpc stream experiences transient errors, which are
mitigated by recreating the client observer stream. Requires further
RCA, but this avoids crashloopbackoff, and gracefully logs to continue

## Related Issue

If this pull request is related to any issue, please mention it here.
Additionally, make sure that the issue is assigned to you before
submitting this pull request.

## Checklist

- [ ] I have read the [contributing
documentation](https://retina.sh/docs/contributing).
- [ ] I signed and signed-off the commits (`git commit -S -s ...`). See
[this
documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/about-commit-signature-verification)
on signing commits.
- [ ] I have correctly attributed the author(s) of the code.
- [ ] I have tested the changes locally.
- [ ] I have followed the project's style guidelines.
- [ ] I have updated the documentation, if necessary.
- [ ] I have added tests, if applicable.

## Screenshots (if applicable) or Testing Completed

Please add any relevant screenshots or GIFs to showcase the changes
made.

## Additional Notes

Add any additional notes or context about the pull request here.

---

Please refer to the [CONTRIBUTING.md](../CONTRIBUTING.md) file for more
information on how to contribute to this project.
  • Loading branch information
matmerr authored Aug 1, 2024
1 parent e62bae2 commit 792cb27
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 105 deletions.
60 changes: 22 additions & 38 deletions .pipelines/cg-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,39 +135,6 @@ stages:
pathtoPublish: "$(Build.ArtifactStagingDirectory)"
condition: succeeded()

# windows 2019 only in buildx, windows server 2022 requires native windows container build because of cgo
- job: retinaagentimageswin2019
displayName: Build Retina Windows Images (buildx)
pool:
name: "$(BUILD_POOL_NAME_DEFAULT)"
strategy:
matrix:
windows-ltsc2019:
platform: "windows"
arch: "amd64"
year: "2019"

steps:
- checkout: self
fetchTags: true
- script: |
set -euo pipefail
echo "VERSION=$(make version)"
export VERSION=$(make version)
mkdir -p ./output/images/$(platform)/$(arch)/$(year)
make retina-image-win \
TARGET=final \
WINDOWS_YEARS=$(year) \
TAG=$(make version) \
BUILDX_ACTION="-o type=docker,dest=./output/images/$(platform)/$(arch)/$(year)/retina-agent-$VERSION-windows-ltsc$(year)-$(arch).tar"
displayName: "Build Retina Windows Image"
- task: PublishBuildArtifacts@1
inputs:
artifactName: output
pathtoPublish: ./output
condition: succeeded()

- job: windowsnative
displayName: Build Retina Windows Images (native)
pool:
Expand All @@ -176,12 +143,29 @@ stages:
- checkout: self
fetchTags: true

- task: Docker@2
displayName: Docker Login


- task: DownloadPipelineArtifact@2
inputs:
containerRegistry: $(WINDOWS_BUILDER_REGISTRY)
command: "login"
addPipelineData: false
buildType: 'specific'
project: $(BUILDER_ADO_PROECT)
definition: $(BUILDER_ADO_DEFINITION_ID) # Replace with your build definition ID
buildId: $(BUILDER_ADO_BUILD_ID)
artifactName: $(BUILDER_ADO_ARTIFACTE_NAME) # Replace with your artifact name
itemPattern: '**/*builder*.tar'
downloadPath: '$(Pipeline.Workspace)\artifacts'

- task: PowerShell@2
displayName: "Load Builder Image"
inputs:
targetType: "inline"
script: |
$rootDir = "$(Pipeline.Workspace)\artifacts"
$dockerImages = Get-ChildItem -Path $rootDir -Recurse -Filter *.tar
foreach ($image in $dockerImages) {
Write-Host "Loading Docker image: $($image.FullName)"
docker load -i $image.FullName
}
- task: PowerShell@2
displayName: "Build Retina Windows Image (LTSC2022)"
Expand Down
3 changes: 1 addition & 2 deletions controller/Dockerfile.windows-native
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ FROM --platform=windows/amd64 mcr.microsoft.com/oss/go/microsoft/golang:1.22-wi
WORKDIR C:\\retina
COPY go.mod .
COPY go.sum .
ENV CGO_ENABLED=1
ENV CGO_ENABLED=0
RUN go mod download
RUN go mod verify
ADD . .
ARG VERSION
ARG APP_INSIGHTS_ID
Expand Down
1 change: 1 addition & 0 deletions controller/Dockerfile.windows-native.dockerignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pkg/plugin/windows/pktmon/packetmonitorsupport/*
*.tar

154 changes: 89 additions & 65 deletions pkg/plugin/windows/pktmon/pktmon_plugin_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"os/exec"

"github.com/cilium/cilium/api/v1/flow"
observerv1 "github.com/cilium/cilium/api/v1/observer"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
kcfg "github.com/microsoft/retina/pkg/config"
Expand All @@ -21,18 +20,21 @@ import (
"go.uber.org/zap/zapio"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)

var (
ErrNilEnricher = errors.New("enricher is nil")
ErrUnexpectedExit = errors.New("unexpected exit")
ErrNilGrpcClient = errors.New("grpc client is nil")

socket = "/temp/retina-pktmon.sock"
)

const (
Name = "pktmon"
connectionRetryAttempts = 3
connectionRetryAttempts = 5
eventChannelSize = 1000
)

type Plugin struct {
Expand All @@ -42,6 +44,9 @@ type Plugin struct {
pktmonCmd *exec.Cmd
stdWriter *zapio.Writer
errWriter *zapio.Writer

grpcClient *GRPCClient
stream observerv1.Observer_GetFlowsClient
}

func (p *Plugin) Init() error {
Expand All @@ -52,11 +57,11 @@ func (p *Plugin) Name() string {
return "pktmon"
}

type Client struct {
type GRPCClient struct {
observerv1.ObserverClient
}

func NewClient() (*Client, error) {
func newGRPCClient() (*GRPCClient, error) {
retryPolicy := map[string]any{
"methodConfig": []map[string]any{
{
Expand Down Expand Up @@ -84,15 +89,15 @@ func NewClient() (*Client, error) {
return nil, fmt.Errorf("failed to dial pktmon server: %w", err)
}

return &Client{observerv1.NewObserverClient(conn)}, nil
return &GRPCClient{observerv1.NewObserverClient(conn)}, nil
}

func (p *Plugin) RunPktMonServer() error {
func (p *Plugin) RunPktMonServer(ctx context.Context) error {
p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel}
defer p.stdWriter.Close()
p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel}
defer p.errWriter.Close()
p.pktmonCmd = exec.Command("controller-pktmon.exe")
p.pktmonCmd = exec.CommandContext(ctx, "controller-pktmon.exe")
p.pktmonCmd.Args = append(p.pktmonCmd.Args, "--socketpath", socket)
p.pktmonCmd.Env = os.Environ()
p.pktmonCmd.Stdout = p.stdWriter
Expand All @@ -111,103 +116,122 @@ func (p *Plugin) RunPktMonServer() error {
}

func (p *Plugin) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

p.enricher = enricher.Instance()
if p.enricher == nil {
return ErrNilEnricher
}

go func() {
err := p.RunPktMonServer()
err := p.RunPktMonServer(ctx)
if err != nil {
p.l.Error("failed to run pktmon server", zap.Error(err))
p.l.Error("pktmon server exited", zap.Error(err))
}

// if the pktmon server process exits, cancel the context, we need to crash
cancel()
}()

var str observerv1.Observer_GetFlowsClient
err := p.SetupStream()
if err != nil {
return fmt.Errorf("failed to setup initial pktmon stream: %w", err)
}

// run the getflows loop
for {
err := p.GetFlow(ctx)
if _, ok := status.FromError(err); ok {
p.l.Error("failed to get flow, retriable:", zap.Error(err))
continue
}
return fmt.Errorf("failed to get flow, unrecoverable: %w", err)
}
}

func (p *Plugin) SetupStream() error {
var err error
fn := func() error {
p.l.Info("creating pktmon client")
client, err := NewClient()
p.grpcClient, err = newGRPCClient()
if err != nil {
return fmt.Errorf("failed to create pktmon client before getting flows: %w", err)
}

str, err = client.GetFlows(ctx, &observerv1.GetFlowsRequest{})
return nil
}
err = utils.Retry(fn, connectionRetryAttempts)
if err != nil {
return fmt.Errorf("failed to create pktmon client: %w", err)
}

return nil
}

func (p *Plugin) StartStream(ctx context.Context) error {
if p.grpcClient == nil {
return fmt.Errorf("unable to start stream: %w", ErrNilGrpcClient)
}

var err error
fn := func() error {
p.stream, err = p.grpcClient.GetFlows(ctx, &observerv1.GetFlowsRequest{})
if err != nil {
return fmt.Errorf("failed to open pktmon stream: %w", err)
}
return nil
}
err := utils.Retry(fn, connectionRetryAttempts)
err = utils.Retry(fn, connectionRetryAttempts)
if err != nil {
return fmt.Errorf("failed to create pktmon client: %w", err)
}

return nil
}

func (p *Plugin) GetFlow(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

err := p.StartStream(ctx)
if err != nil {
return fmt.Errorf("failed to setup pktmon stream: %w", err)
}

for {
select {
case <-ctx.Done():
return fmt.Errorf("pktmon context cancelled: %w", ctx.Err())
return fmt.Errorf("pktmon plugin context done: %w", ctx.Err())
default:
err := p.GetFlow(str)
event, err := p.stream.Recv()
if err != nil {
return fmt.Errorf("failed to get flow from observer: %w", err)
return fmt.Errorf("failed to receive pktmon event: %w", err)
}
}
}
}

func (p *Plugin) GetFlow(str observerv1.Observer_GetFlowsClient) error {
event, err := str.Recv()
if err != nil {
return fmt.Errorf("failed to receive pktmon event: %w", err)
}

fl := event.GetFlow()
if fl == nil {
p.l.Error("received nil flow, flow proto mismatch from client/server?")
return nil
}
fl := event.GetFlow()
if fl == nil {
p.l.Error("received nil flow, flow proto mismatch from client/server?")
return nil
}

ev := &v1.Event{
Event: fl,
Timestamp: fl.GetTime(),
}
ev := &v1.Event{
Event: fl,
Timestamp: fl.GetTime(),
}

if fl.GetType() == flow.FlowType_L7 {
dns := fl.GetL7().GetDns()
if dns != nil {
query := dns.GetQuery()
ans := dns.GetIps()
if dns.GetQtypes()[0] == "Q" {
p.l.Sugar().Debugf("query from %s to %s: request %s\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), query)
if p.enricher != nil {
p.enricher.Write(ev)
} else {
p.l.Sugar().Debugf("answer from %s to %s: result: %+v\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), ans)
p.l.Error("enricher is nil when writing event")
}
}
}

if p.enricher != nil {
p.enricher.Write(ev)
} else {
p.l.Error("enricher is nil when writing event")
}

// Write the event to the external channel.
if p.externalChannel != nil {
select {
case p.externalChannel <- ev:
default:
// Channel is full, drop the event.
// We shouldn't slow down the reader.
metrics.LostEventsCounter.WithLabelValues(utils.ExternalChannel, string(Name)).Inc()
// Write the event to the external channel.
if p.externalChannel != nil {
select {
case p.externalChannel <- ev:
default:
// Channel is full, drop the event.
// We shouldn't slow down the reader.
metrics.LostEventsCounter.WithLabelValues(utils.ExternalChannel, string(Name)).Inc()
}
}
}
}
return nil
}

func (p *Plugin) SetupChannel(ch chan *v1.Event) error {
Expand Down

0 comments on commit 792cb27

Please sign in to comment.