Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix gRPC logger to log structured fields #141

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func newHealthHandler(checkFunc func() bool) http.HandlerFunc {
}

// bindEnv is a helper function that binds an environment variable to a key and handles errors.
func bindEnv(logger zap.SugaredLogger, key, envVar string) {
func bindEnv(logger *zap.Logger, key, envVar string) {
if err := viper.BindEnv(key, envVar); err != nil {
logger.Errorw("Error binding environment variable",
"error", errors.New("error binding environment variable"),
"variable", envVar,
logger.Error("Error binding environment variable",
zap.Error(errors.New("error binding environment variable")),
zap.String("variable", envVar),
)
}
}
Expand All @@ -58,19 +58,19 @@ func main() {
// Create a buffered grpc write syncer without a valid gRPC connection initially
// Using nil for the `pb.KubernetesInfoService_KubernetesLogsClient`.
bufferedGrpcSyncer := controller.NewBufferedGrpcWriteSyncer()
logger := controller.NewGRPClogger(bufferedGrpcSyncer)
logger := controller.NewProductionGRPCLogger(bufferedGrpcSyncer)
defer logger.Sync() //nolint:errcheck

viper.AutomaticEnv()

// Bind specific environment variables to keys
bindEnv(*logger, "cluster_creds", "CLUSTER_CREDS_SECRET")
bindEnv(*logger, "cilium_namespace", "CILIUM_NAMESPACE")
bindEnv(*logger, "onboarding_client_id", "ONBOARDING_CLIENT_ID")
bindEnv(*logger, "onboarding_client_secret", "ONBOARDING_CLIENT_SECRET")
bindEnv(*logger, "onboarding_endpoint", "ONBOARDING_ENDPOINT")
bindEnv(*logger, "token_endpoint", "TOKEN_ENDPOINT")
bindEnv(*logger, "tls_skip_verify", "TLS_SKIP_VERIFY")
bindEnv(logger, "cluster_creds", "CLUSTER_CREDS_SECRET")
bindEnv(logger, "cilium_namespace", "CILIUM_NAMESPACE")
bindEnv(logger, "onboarding_client_id", "ONBOARDING_CLIENT_ID")
bindEnv(logger, "onboarding_client_secret", "ONBOARDING_CLIENT_SECRET")
bindEnv(logger, "onboarding_endpoint", "ONBOARDING_ENDPOINT")
bindEnv(logger, "token_endpoint", "TOKEN_ENDPOINT")
bindEnv(logger, "tls_skip_verify", "TLS_SKIP_VERIFY")

// Set default values
viper.SetDefault("cluster_creds", "clustercreds")
Expand All @@ -89,18 +89,18 @@ func main() {
TlsSkipVerify: viper.GetBool("tls_skip_verify"),
}

logger.Infow("Starting application",
"cluster_creds_secret", envConfig.ClusterCreds,
"cilium_namespace", envConfig.CiliumNamespace,
"onboarding_client_id", envConfig.OnboardingClientId,
"onboarding_endpoint", envConfig.OnboardingEndpoint,
"token_endpoint", envConfig.TokenEndpoint,
"tls_skip_verify", envConfig.TlsSkipVerify,
logger.Info("Starting application",
zap.String("cluster_creds_secret", envConfig.ClusterCreds),
zap.String("cilium_namespace", envConfig.CiliumNamespace),
zap.String("onboarding_client_id", envConfig.OnboardingClientId),
zap.String("onboarding_endpoint", envConfig.OnboardingEndpoint),
zap.String("token_endpoint", envConfig.TokenEndpoint),
zap.Bool("tls_skip_verify", envConfig.TlsSkipVerify),
)

// Start the gops agent and listen on a specific address and port
if err := agent.Listen(agent.Options{}); err != nil {
logger.Errorw("Failed to start gops agent", "error", err)
logger.Error("Failed to start gops agent", zap.Error(err))
}
http.HandleFunc("/healthz", newHealthHandler(controller.ServerIsHealthy))
healthChecker := &http.Server{Addr: ":8080"}
Expand Down
20 changes: 10 additions & 10 deletions internal/controller/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var kacp = keepalive.ClientParameters{

// Authenticator keeps a logger for its own methods.
type Authenticator struct {
Logger *zap.SugaredLogger
Logger *zap.Logger
}

// GetOnboardingCredentials returns credentials to onboard this cluster with CloudSecure.
Expand All @@ -52,14 +52,14 @@ func (authn *Authenticator) ReadCredentialsK8sSecrets(ctx context.Context, secre
// Create a new clientset
clientset, err := NewClientSet()
if err != nil {
authn.Logger.Errorw("Failed to create clientSet", "error", err)
authn.Logger.Error("Failed to create clientSet", zap.Error(err))
return "", "", err
}

// Get the secret
secret, err := clientset.CoreV1().Secrets("illumio-cloud").Get(ctx, secretName, metav1.GetOptions{})
if err != nil {
authn.Logger.Errorw("Failed to get secret", "error", err)
authn.Logger.Error("Failed to get secret", zap.Error(err))
return "", "", err
}

Expand All @@ -78,7 +78,7 @@ func (authn *Authenticator) ReadCredentialsK8sSecrets(ctx context.Context, secre
func (authn *Authenticator) DoesK8sSecretExist(ctx context.Context, secretName string) bool {
clientset, err := NewClientSet()
if err != nil {
authn.Logger.Errorw("Failed to create clientSet", "error", err)
authn.Logger.Error("Failed to create clientSet", zap.Error(err))
}

// Get the secret -> illumio-cloud will need to be configurable
Expand All @@ -90,7 +90,7 @@ func (authn *Authenticator) DoesK8sSecretExist(ctx context.Context, secretName s
func (authn *Authenticator) WriteK8sSecret(ctx context.Context, keyData OnboardResponse, ClusterCreds string) error {
clientset, err := NewClientSet()
if err != nil {
authn.Logger.Errorw("Failed to create clientSet", "error", err)
authn.Logger.Error("Failed to create clientSet", zap.Error(err))
return err
}

Expand All @@ -109,7 +109,7 @@ func (authn *Authenticator) WriteK8sSecret(ctx context.Context, keyData OnboardR

_, err = clientset.CoreV1().Secrets(namespace).Update(ctx, secret, metav1.UpdateOptions{})
if err != nil {
authn.Logger.Errorw("Failed to update secret", "error", err)
authn.Logger.Error("Failed to update secret", zap.Error(err))
return err
}
return nil
Expand Down Expand Up @@ -145,7 +145,7 @@ func IsRunningInCluster() bool {
// SetUpOAuthConnection establishes a gRPC connection using OAuth credentials and logging the process.
func SetUpOAuthConnection(
ctx context.Context,
logger *zap.SugaredLogger,
logger *zap.Logger,
tokenURL string,
tlsSkipVerify bool,
clientID string,
Expand All @@ -164,19 +164,19 @@ func SetUpOAuthConnection(

token, err := tokenSource.Token()
if err != nil {
logger.Errorw("Error retrieving a valid token", "error", err)
logger.Error("Error retrieving a valid token", zap.Error(err))
return nil, err
}

claims, err := ParseToken(token.AccessToken)
if err != nil {
logger.Errorw("Error parsing token", "error", err)
logger.Error("Error parsing token", zap.Error(err))
return nil, err
}

aud, err := getFirstAudience(logger, claims)
if err != nil {
logger.Errorw("Error pulling audience out of token", "error", err)
logger.Error("Error pulling audience out of token", zap.Error(err))
return nil, err
}
tokenSource = GetTokenSource(ctx, oauthConfig, tlsConfig)
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/authenticator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (suite *ControllerTestSuite) TestGetOnboardingCredentials() {
core := zapcore.NewTee(
zapcore.NewCore(encoder, consoleSyncer, zapcore.InfoLevel),
)
logger := zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1)).Sugar()
logger := zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1))
logger = logger.With(zap.String("name", "test"))

tests := map[string]struct {
Expand Down Expand Up @@ -78,7 +78,7 @@ func (suite *ControllerTestSuite) TestReadCredentialsK8sSecrets() {
core := zapcore.NewTee(
zapcore.NewCore(encoder, consoleSyncer, zapcore.InfoLevel),
)
logger := zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1)).Sugar()
logger := zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1))
logger = logger.With(zap.String("name", "test"))

tests := map[string]struct {
Expand Down
12 changes: 6 additions & 6 deletions internal/controller/flow_cilium.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

// CiliumFlowCollector collects flows from Cilium Hubble Relay running in this cluster.
type CiliumFlowCollector struct {
logger *zap.SugaredLogger
logger *zap.Logger
client observer.ObserverClient
}

Expand Down Expand Up @@ -49,7 +49,7 @@ func discoverCiliumHubbleRelayAddress(ctx context.Context, ciliumNamespace strin
}

// newCiliumCollector connects to Ciilium Hubble Relay, sets up an Observer client, and returns a new Collector using it.
func newCiliumFlowCollector(ctx context.Context, logger *zap.SugaredLogger, ciliumNamespace string) (*CiliumFlowCollector, error) {
func newCiliumFlowCollector(ctx context.Context, logger *zap.Logger, ciliumNamespace string) (*CiliumFlowCollector, error) {
config, err := NewClientSet()
if err != nil {
return nil, fmt.Errorf("failed to create new client set: %w", err)
Expand Down Expand Up @@ -163,13 +163,13 @@ func (fm *CiliumFlowCollector) exportCiliumFlows(ctx context.Context, sm streamM
observerClient := fm.client
stream, err := observerClient.GetFlows(ctx, req)
if err != nil {
fm.logger.Errorw("Error getting network flows", "error", err)
fm.logger.Error("Error getting network flows", zap.Error(err))
return err
}
defer func() {
err = stream.CloseSend()
if err != nil {
fm.logger.Errorw("Error closing observerClient stream", "error", err)
fm.logger.Error("Error closing observerClient stream", zap.Error(err))
}
}()
for {
Expand All @@ -180,7 +180,7 @@ func (fm *CiliumFlowCollector) exportCiliumFlows(ctx context.Context, sm streamM
}
flow, err := stream.Recv()
if err != nil {
fm.logger.Warnw("Failed to get flow log from stream", "error", err)
fm.logger.Warn("Failed to get flow log from stream", zap.Error(err))
return err
}
ciliumFlow := convertCiliumFlow(flow)
Expand All @@ -189,7 +189,7 @@ func (fm *CiliumFlowCollector) exportCiliumFlows(ctx context.Context, sm streamM
}
err = sendNetworkFlowRequest(&sm, ciliumFlow)
if err != nil {
fm.logger.Errorw("Cannot send cilium flow", "error", err)
fm.logger.Error("Cannot send cilium flow", zap.Error(err))
return err
}
}
Expand Down
Loading
Loading