Skip to content

Commit 5c56bb3

Browse files
Add initial process tree support
The process tree gives you context about the processes running at a given time. It is used by the CAST AI backend to provide more context to the user.
1 parent f2c4401 commit 5c56bb3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2427
-713
lines changed

Tiltfile

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ local_resource(
2626
'./pkg/cgroup',
2727
'./pkg/containers',
2828
'./pkg/bucketcache',
29+
'./pkg/processtree',
30+
'./pkg/proc',
31+
'./pkg/system',
2932
],
3033
)
3134

api/v1/runtime/runtime_agent_api.pb.go

+590-229
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v1/runtime/runtime_agent_api.proto

+30
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ service RuntimeSecurityAgentAPI {
1414
rpc LogsWriteStream(stream LogEvent) returns (WriteStreamResponse) {}
1515
rpc ContainerStatsWriteStream(stream ContainerStatsBatch) returns (WriteStreamResponse) {}
1616
rpc NetflowWriteStream(stream Netflow) returns (WriteStreamResponse) {}
17+
rpc ProcessEventsWriteStream(stream ProcessTreeEvent) returns (WriteStreamResponse) {}
1718

1819
rpc GetSyncState(GetSyncStateRequest) returns (GetSyncStateResponse) {}
1920
rpc UpdateSyncState(UpdateSyncStateRequest) returns (UpdateSyncStateResponse) {}
@@ -359,3 +360,32 @@ message KubeLinterCheck {
359360
uint64 passed = 2; // Represented as bitmasks of passed checks.
360361
uint64 failed = 3; // Represented as bitmasks of failed checks.
361362
}
363+
364+
message Process {
365+
uint32 pid = 1;
366+
uint32 ppid = 2;
367+
uint64 start_time = 3;
368+
uint64 parent_start_time = 4;
369+
repeated string args = 5;
370+
string filepath = 6;
371+
uint64 exit_time = 7;
372+
}
373+
374+
enum ProcessAction {
375+
PROCESS_ACTION_UNKNOWN = 0;
376+
PROCESS_ACTION_EXEC = 1;
377+
PROCESS_ACTION_FORK = 2;
378+
PROCESS_ACTION_EXIT = 3;
379+
}
380+
381+
message ProcessEvent {
382+
uint64 timestamp = 1; // Stored as unix timestamp in nanoseconds.
383+
string container_id = 2;
384+
Process process = 3;
385+
ProcessAction action = 4;
386+
}
387+
388+
message ProcessTreeEvent {
389+
bool initial = 1;
390+
repeated ProcessEvent events = 2;
391+
}

api/v1/runtime/runtime_agent_api_grpc.pb.go

+74-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

charts/kvisor/values-local.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ agent:
2525
netflow-sample-submit-interval-seconds: 1
2626
netflow-export-interval: 5s
2727
ebpf-events-stdio-exporter-enabled: false
28+
process-tree-enabled: true
2829

2930
prometheusScrape:
3031
enabled: true
@@ -51,6 +52,8 @@ controller:
5152

5253
containerSecurityContext:
5354
readOnlyRootFilesystem: false
55+
securityContext:
56+
runAsNonRoot: false
5457

5558
prometheusScrape:
5659
enabled: true

cmd/agent/daemon/app/app.go

+32-4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/castai/kvisor/pkg/ebpftracer/types"
2929
"github.com/castai/kvisor/pkg/logging"
3030
"github.com/castai/kvisor/pkg/proc"
31+
"github.com/castai/kvisor/pkg/processtree"
3132
"github.com/go-playground/validator/v10"
3233
"github.com/grafana/pyroscope-go"
3334
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -59,6 +60,7 @@ type Config struct {
5960
Castai castai.Config
6061
EnricherConfig EnricherConfig
6162
Netflow NetflowConfig
63+
ProcessTree ProcessTreeConfig
6264
Clickhouse ClickhouseConfig
6365
KubeAPIServiceAddr string
6466
ExportersQueueSize int `validate:"required"`
@@ -133,6 +135,10 @@ type ClickhouseConfig struct {
133135
Password string
134136
}
135137

138+
type ProcessTreeConfig struct {
139+
Enabled bool
140+
}
141+
136142
func New(cfg *Config) *App {
137143
if err := validator.New().Struct(cfg); err != nil {
138144
panic(fmt.Errorf("invalid config: %w", err).Error())
@@ -187,6 +193,10 @@ func (a *App) Run(ctx context.Context) error {
187193
if cfg.Netflow.Enabled {
188194
exporters.Netflow = append(exporters.Netflow, state.NewCastaiNetflowExporter(log, castaiClient, a.cfg.ExportersQueueSize))
189195
}
196+
if cfg.ProcessTree.Enabled {
197+
exporter := state.NewCastaiProcessTreeExporter(log, castaiClient, a.cfg.ExportersQueueSize)
198+
exporters.ProcessTree = append(exporters.ProcessTree, exporter)
199+
}
190200
} else {
191201
log = logging.New(logCfg)
192202
exporters = state.NewExporters(log)
@@ -224,6 +234,11 @@ func (a *App) Run(ctx context.Context) error {
224234
clickhouseNetflowExporter := state.NewClickhouseNetflowExporter(log, storageConn, a.cfg.ExportersQueueSize)
225235
exporters.Netflow = append(exporters.Netflow, clickhouseNetflowExporter)
226236
}
237+
238+
if cfg.ProcessTree.Enabled {
239+
exporter := state.NewClickhouseProcessTreeExporter(log, storageConn, a.cfg.ExportersQueueSize)
240+
exporters.ProcessTree = append(exporters.ProcessTree, exporter)
241+
}
227242
}
228243

229244
if cfg.EBPFEventsStdioExporterEnabled {
@@ -245,10 +260,19 @@ func (a *App) Run(ctx context.Context) error {
245260
if err != nil {
246261
return err
247262
}
248-
containersClient, err := containers.NewClient(log, cgroupClient, a.cfg.ContainerdSockPath)
263+
procHandler := proc.New()
264+
containersClient, err := containers.NewClient(log, cgroupClient, a.cfg.ContainerdSockPath, procHandler)
249265
if err != nil {
250266
return err
251267
}
268+
processTreeCollector, err := processtree.New(log, procHandler, containersClient)
269+
if err != nil {
270+
return fmt.Errorf("process tree: %w", err)
271+
}
272+
err = processTreeCollector.Init(ctx)
273+
if err != nil {
274+
return fmt.Errorf("process tree: %w", err)
275+
}
252276
ct, err := conntrack.NewClient(log)
253277
if err != nil {
254278
return fmt.Errorf("conntrack: %w", err)
@@ -262,7 +286,6 @@ func (a *App) Run(ctx context.Context) error {
262286

263287
signatureEngine := signature.NewEngine(activeSignatures, log, a.cfg.SignatureEngineConfig)
264288

265-
procHandler := proc.New()
266289
mountNamespacePIDStore, err := getInitializedMountNamespaceStore(procHandler)
267290
if err != nil {
268291
return fmt.Errorf("mount namespace PID store: %w", err)
@@ -292,6 +315,7 @@ func (a *App) Run(ctx context.Context) error {
292315
NetflowSampleSubmitIntervalSeconds: a.cfg.Netflow.SampleSubmitIntervalSeconds,
293316
NetflowGrouping: a.cfg.Netflow.Grouping,
294317
TrackSyscallStats: cfg.ContainerStatsEnabled,
318+
ProcessTreeCollector: processTreeCollector,
295319
})
296320
if err := tracer.Load(); err != nil {
297321
return fmt.Errorf("loading tracer: %w", err)
@@ -300,8 +324,11 @@ func (a *App) Run(ctx context.Context) error {
300324

301325
policy := &ebpftracer.Policy{
302326
SystemEvents: []events.ID{
303-
events.SignalCgroupMkdir,
304-
events.SignalCgroupRmdir,
327+
events.CgroupMkdir,
328+
events.CgroupRmdir,
329+
events.SchedProcessExec,
330+
events.SchedProcessExit,
331+
events.SchedProcessFork,
305332
},
306333
Events: []*ebpftracer.EventPolicy{},
307334
}
@@ -366,6 +393,7 @@ func (a *App) Run(ctx context.Context) error {
366393
signatureEngine,
367394
enrichmentService,
368395
kubeAPIServerClient,
396+
processTreeCollector,
369397
)
370398

371399
errg, ctx := errgroup.WithContext(ctx)

cmd/agent/daemon/clickouse_init.go

+3
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ func NewClickhouseInitCommand() *cobra.Command {
5252
if err := conn.Exec(ctx, state.ClickhouseNetflowSchema()); err != nil {
5353
return fmt.Errorf("creating clickhouse netflow schema: %w", err)
5454
}
55+
if err := conn.Exec(ctx, state.ClickhouseProcessTreeSchema()); err != nil {
56+
return fmt.Errorf("creating clickhouse process tree schema: %w", err)
57+
}
5558

5659
return nil
5760
}

cmd/agent/daemon/daemon.go

+5
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ func NewRunCommand(version string) *cobra.Command {
7373
netflowExportInterval = pflag.Duration("netflow-export-interval", 15*time.Second, "Netflow export interval")
7474
netflowGrouping = ebpftracer.NetflowGroupingDropSrcPort
7575

76+
processTreeEnabled = pflag.Bool("process-tree-enabled", false, "Enables process tree tracking")
77+
7678
clickhouseAddr = pflag.String("clickhouse-addr", "", "Clickhouse address to send events to")
7779
clickhouseDatabase = pflag.String("clickhouse-database", "", "Clickhouse database name")
7880
clickhouseUsername = pflag.String("clickhouse-username", "", "Clickhouse username")
@@ -160,6 +162,9 @@ func NewRunCommand(version string) *cobra.Command {
160162
Username: *clickhouseUsername,
161163
Password: os.Getenv("CLICKHOUSE_PASSWORD"),
162164
},
165+
ProcessTree: app.ProcessTreeConfig{
166+
Enabled: *processTreeEnabled,
167+
},
163168
KubeAPIServiceAddr: *kubeAPIServiceAddr,
164169
ExportersQueueSize: *exportersQueueSize,
165170
}).Run(ctx); err != nil && !errors.Is(err, context.Canceled) {

cmd/agent/daemon/state/castai_events_exporter.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (c *CastaiEventsExporter) Run(ctx context.Context) error {
4343
defer ws.Close()
4444
ws.ReopenDelay = c.writeStreamCreateRetryDelay
4545

46-
sender := &sender{
46+
sender := &eventSender{
4747
ws: ws,
4848
retryQueue: c.retryQueue,
4949
sendMetric: metrics.AgentExporterSendTotal.WithLabelValues("castai_events"),
@@ -62,15 +62,15 @@ func (c *CastaiEventsExporter) Run(ctx context.Context) error {
6262
}
6363
}
6464

65-
type sender struct {
65+
type eventSender struct {
6666
ws *castai.WriteStream[*castpb.Event, *castpb.WriteStreamResponse]
6767
retryQueue chan *castpb.Event
6868

6969
sendMetric prometheus.Counter
7070
sendErrorMetric prometheus.Counter
7171
}
7272

73-
func (s *sender) send(e *castpb.Event, retry bool) {
73+
func (s *eventSender) send(e *castpb.Event, retry bool) {
7474
if err := s.ws.Send(e); err != nil {
7575
if retry {
7676
s.retryQueue <- e

0 commit comments

Comments
 (0)