Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/cmd/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var RelayCommand = &cobra.Command{
Short: "Allows relaying calls from Cortex to the local environment",
Run: func(cmd *cobra.Command, args []string) {

config := config.NewAgentEnvConfig()
config := config.NewAgentEnvConfig().ApplyFlags(cmd.Flags())
if config.CortexApiToken == "" {
fmt.Println("Cortex API token (CORTEX_API_TOKEN) must be provided")
os.Exit(1)
Expand Down
2 changes: 2 additions & 0 deletions agent/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func init() {
rootCmd.AddCommand(initCmd)
rootCmd.AddCommand(handlersRootCmd)
rootCmd.AddCommand(RelayCommand)

rootCmd.Flags().BoolP("verbose", "v", false, "Verbose mode")
}

func Execute() {
Expand Down
3 changes: 1 addition & 2 deletions agent/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var serveCmd = &cobra.Command{
os.Setenv("DRYRUN", "true")
}

config := config.NewAgentEnvConfig()
config := config.NewAgentEnvConfig().ApplyFlags(cmd.Flags())

if id, _ := cmd.Flags().GetString("alias"); id != "" {
config.IntegrationAlias = id
Expand All @@ -55,6 +55,5 @@ var serveCmd = &cobra.Command{

func init() {
serveCmd.Flags().Bool("dry-run", false, "Dry run mode")
serveCmd.Flags().BoolP("verbose", "v", false, "Verbose mode")
serveCmd.Flags().StringP("alias", "a", "customer-agent", "Alias (identifier) for this agent type")
}
15 changes: 6 additions & 9 deletions agent/cmd/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,8 @@ func startAgent(opts fx.Option) {
}

var AgentModule = fx.Module("agent",
fx.Provide(cortexHttp.NewPrometheusRegistry),
fx.Provide(createHttpTransport),
fx.Provide(createHttpClient),
fx.Provide(cortexHttp.NewAxonHandler),
fx.Provide(server.NewMainHttpServer),
fx.Provide(func(config config.AgentConfig) *zap.Logger {

if config.VerboseOutput {
return zap.NewNop()
}

cfg := zap.NewDevelopmentConfig()

loggingLevel := zap.InfoLevel
Expand All @@ -52,6 +43,12 @@ var AgentModule = fx.Module("agent",
}
return logger
}),
fx.Provide(cortexHttp.NewPrometheusRegistry),
fx.Provide(createHttpTransport),
fx.Provide(createHttpClient),
fx.Provide(cortexHttp.NewAxonHandler),
fx.Provide(server.NewMainHttpServer),

fx.Invoke(func(config config.AgentConfig, logger *zap.Logger) {
if config.CortexApiToken == "" && !config.DryRun {
logger.Fatal("Cannot start agent: either CORTEX_API_TOKEN or DRYRUN is required")
Expand Down
8 changes: 8 additions & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/spf13/pflag"
)

const DefaultGrpcPort = 50051
Expand Down Expand Up @@ -261,3 +262,10 @@ func NewAgentEnvConfig() AgentConfig {

return cfg
}

func (ac AgentConfig) ApplyFlags(flags *pflag.FlagSet) AgentConfig {
if enabled, _ := flags.GetBool("verbose"); enabled {
ac.VerboseOutput = true
}
return ac
}
2 changes: 1 addition & 1 deletion agent/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/prometheus/client_golang v1.22.0
github.com/robfig/cron/v3 v3.0.0
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.10.0
go.uber.org/fx v1.23.0
go.uber.org/mock v0.5.0
Expand All @@ -26,7 +27,6 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/dig v1.18.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.33.0 // indirect
Expand Down
34 changes: 32 additions & 2 deletions agent/server/http/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -119,6 +120,7 @@ func NewHttpServer(p HttpServerParams, opts ...ServerOption) Server {
},
[]string{"method", "path", "status"},
),
config: p.Config,
}

server.mux.Use(server.requestMiddleware)
Expand Down Expand Up @@ -162,6 +164,7 @@ type httpServer struct {
mux *mux.Router
requestCounter *prometheus.CounterVec
requestLatency *prometheus.HistogramVec
config config.AgentConfig
}

func (h *httpServer) RegisterHandler(handler RegisterableHandler) {
Expand Down Expand Up @@ -195,11 +198,34 @@ func (h *httpServer) requestMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
rec := &responseRecorder{ResponseWriter: w, statusCode: http.StatusOK}

fields := []zap.Field{
zap.String("method", r.Method),
zap.String("path", r.URL.Path),
zap.Int("content-length", int(r.ContentLength)),
}

if h.config.VerboseOutput && r.ContentLength > 0 && r.Body != nil {

br := bufio.NewReader(r.Body)
bodyBytes, err := io.ReadAll(br)
if err != nil {

h.logger.Error("Failed to read request body", zap.Error(err))
return
}
body := string(bodyBytes)
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
fields = append(fields, zap.String("body", body))
}
h.logger.Debug("HTTP request ==>",
fields...,
)
next.ServeHTTP(rec, r)
duration := time.Since(start)
h.requestCounter.WithLabelValues(r.Method, r.URL.Path, fmt.Sprintf("%d", rec.statusCode)).Inc()
h.requestLatency.WithLabelValues(r.Method, r.URL.Path, fmt.Sprintf("%d", rec.statusCode)).Observe(duration.Seconds())
h.logger.Info("HTTP incoming request",
h.logger.Info("<== HTTP request",
zap.String("method", r.Method),
zap.String("path", r.URL.Path),
zap.Int("status", rec.statusCode),
Expand All @@ -209,6 +235,9 @@ func (h *httpServer) requestMiddleware(next http.Handler) http.Handler {
)
})
}

var defaultReadTimeout = time.Second

func (h *httpServer) Start() (int, error) {

if h.server != nil {
Expand All @@ -222,7 +251,8 @@ func (h *httpServer) Start() (int, error) {
go func() {

h.server = &http.Server{
Handler: h.mux,
Handler: h.mux,
ReadTimeout: defaultReadTimeout,
}

err := h.server.Serve(ln)
Expand Down
1 change: 1 addition & 0 deletions agent/server/http/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func createWebhookHttpServer(lifecycle fx.Lifecycle, config config.AgentConfig,
Handlers: []RegisterableHandler{
NewWebhookHandler(config, logger, handlerManager, registry),
},
Config: config,
}
httpServer := NewHttpServer(params, WithName("webhook"), WithPort(config.WebhookServerPort))

Expand Down
1 change: 1 addition & 0 deletions agent/server/main_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func NewMainHttpServer(p MainHttpServerParams) cortexHttp.Server {
Logger: p.Logger,
Registry: p.Registry,
Handlers: []cortexHttp.RegisterableHandler{},
Config: p.Config,
}

config := p.Config
Expand Down
1 change: 1 addition & 0 deletions agent/server/snykbroker/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func NewRegistrationReflector(p RegistrationReflectorParams) *RegistrationReflec

httpParams := cortexHttp.HttpServerParams{
Logger: p.Logger.Named("relay-reflector"),
Config: p.Config,
}

if p.Registry != nil {
Expand Down
Loading