diff --git a/go.mod b/go.mod
index b1778d5..0e1cae7 100644
--- a/go.mod
+++ b/go.mod
@@ -22,6 +22,7 @@ require (
 	github.com/knadh/koanf/v2 v2.3.3
 	github.com/lestrrat-go/httprc/v3 v3.0.4
 	github.com/lestrrat-go/jwx/v3 v3.0.13
+	github.com/rs/zerolog v1.34.0
 	github.com/spf13/cobra v1.10.2
 	github.com/spf13/pflag v1.0.10
 	github.com/stretchr/testify v1.11.1
@@ -62,6 +63,8 @@ require (
 	github.com/lestrrat-go/dsig-secp256k1 v1.0.0 // indirect
 	github.com/lestrrat-go/httpcc v1.0.1 // indirect
 	github.com/lestrrat-go/option/v2 v2.0.0 // indirect
+	github.com/mattn/go-colorable v0.1.13 // indirect
+	github.com/mattn/go-isatty v0.0.19 // indirect
 	github.com/mitchellh/copystructure v1.2.0 // indirect
 	github.com/mitchellh/reflectwalk v1.0.2 // indirect
 	github.com/pelletier/go-toml/v2 v2.2.4 // indirect
diff --git a/go.sum b/go.sum
index 8368fa9..8aa5703 100644
--- a/go.sum
+++ b/go.sum
@@ -36,6 +36,7 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
 github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 h1:aBangftG7EVZoUb69Os8IaYg++6uMOdKK83QtkkvJik=
 github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2/go.mod h1:qwXFYgsP6T7XnJtbKlf1HP8AjxZZyzxMmc+Lq5GjlU4=
+github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
 github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -58,6 +59,7 @@ github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
 github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
 github.com/goccy/go-yaml v1.19.2 h1:PmFC1S6h8ljIz6gMRBopkjP1TVT7xuwrButHID66PoM=
 github.com/goccy/go-yaml v1.19.2/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
+github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ=
 github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8/go.mod h1:wcDNUvekVysuuOpQKo3191zZyTpiI6se1N1ULghS0sw=
 github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
@@ -108,12 +110,18 @@ github.com/lestrrat-go/jwx/v3 v3.0.13 h1:AdHKiPIYeCSnOJtvdpipPg/0SuFh9rdkN+HF3O0
 github.com/lestrrat-go/jwx/v3 v3.0.13/go.mod h1:2m0PV1A9tM4b/jVLMx8rh6rBl7F6WGb3EG2hufN9OQU=
 github.com/lestrrat-go/option/v2 v2.0.0 h1:XxrcaJESE1fokHy3FpaQ/cXW8ZsIdWcdFzzLOcID3Ss=
 github.com/lestrrat-go/option/v2 v2.0.0/go.mod h1:oSySsmzMoR0iRzCDCaUfsCzxQHUEuhOViQObyy7S6Vg=
+github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
+github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
+github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
+github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
+github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
 github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
 github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
 github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
 github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
 github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/planetscale/vtprotobuf v0.6.1-0.20250313105119-ba97887b0a25 h1:S1hI5JiKP7883xBzZAr1ydcxrKNSVNm7+3+JwjxZEsg=
 github.com/planetscale/vtprotobuf v0.6.1-0.20250313105119-ba97887b0a25/go.mod h1:ZQntvDG8TkPgljxtA0R9frDoND4QORU1VXz015N5Ks4=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -121,6 +129,9 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
 github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
+github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
+github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY=
+github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0=
 github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
@@ -157,6 +168,9 @@ golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa h1:Zt3DZoOFFYkKhDT3v7Lm9FDME
 golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa/go.mod h1:K79w1Vqn7PoiZn+TkNpx3BUWUQksGO3JcVX6qIjytmA=
 golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo=
 golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
 golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
 golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
diff --git a/internal/cli/serve.go b/internal/cli/serve.go
index 66378f0..1bee950 100644
--- a/internal/cli/serve.go
+++ b/internal/cli/serve.go
@@ -11,9 +11,104 @@ import (
 	"github.com/spf13/cobra"
 
 	"github.com/project-kessel/parsec/internal/config"
+	"github.com/project-kessel/parsec/internal/datasource"
+	"github.com/project-kessel/parsec/internal/keys"
+	"github.com/project-kessel/parsec/internal/probe"
 	"github.com/project-kessel/parsec/internal/server"
+	"github.com/project-kessel/parsec/internal/service"
+	"github.com/project-kessel/parsec/internal/trust"
 )
 
+type infraEventConfigs struct {
+	configReload    *config.EventLoggingConfig
+	dataSourceCache *config.EventLoggingConfig
+	keyRotation     *config.EventLoggingConfig
+	keyProvider     *config.EventLoggingConfig
+	trustValidation *config.EventLoggingConfig
+	jwksCache       *config.EventLoggingConfig
+	serverLifecycle *config.EventLoggingConfig
+}
+
+type wiredInfraObservers struct {
+	configReload    config.ConfigReloadObserver
+	keyRotation     keys.KeyRotationObserver
+	keyProvider     keys.KeyProviderObserver
+	trustValidation trust.TrustValidationObserver
+	dataSourceCache datasource.DataSourceCacheObserver
+	jwksCache       server.JWKSObserver
+	serverLifecycle server.ServerLifecycleObserver
+}
+
+type runtimeComponents struct {
+	trustStore           trust.Store
+	tokenService         *service.TokenService
+	authzTokenTypes      []server.TokenTypeSpec
+	claimsFilterRegistry server.ClaimsFilterRegistry
+	issuerRegistry       service.Registry
+}
+
+func buildInfraEventConfigs(obs *config.ObservabilityConfig) infraEventConfigs {
+	if obs == nil {
+		return infraEventConfigs{}
+	}
+	return infraEventConfigs{
+		configReload:    obs.ConfigReload,
+		dataSourceCache: obs.DataSourceCache,
+		keyRotation:     obs.KeyRotation,
+		keyProvider:     obs.KeyProvider,
+		trustValidation: obs.TrustValidation,
+		jwksCache:       obs.JWKSCache,
+		serverLifecycle: obs.ServerLifecycle,
+	}
+}
+
+func wireInfraObservers(logCtx config.LoggerContext, infraCfg infraEventConfigs) wiredInfraObservers {
+	return wiredInfraObservers{
+		configReload:    probe.NewLoggingConfigReloadObserver(config.EventLogger(logCtx, "config_reload", infraCfg.configReload)),
+		keyRotation:     probe.NewLoggingKeyRotationObserver(config.EventLogger(logCtx, "key_rotation", infraCfg.keyRotation)),
+		keyProvider:     probe.NewLoggingKeyProviderObserver(config.EventLogger(logCtx, "key_provider", infraCfg.keyProvider)),
+		trustValidation: probe.NewLoggingTrustValidationObserver(config.EventLogger(logCtx, "trust_validation", infraCfg.trustValidation)),
+		dataSourceCache: probe.NewLoggingDataSourceCacheObserver(config.EventLogger(logCtx, "datasource_cache", infraCfg.dataSourceCache)),
+		jwksCache:       probe.NewLoggingJWKSObserver(config.EventLogger(logCtx, "jwks_cache", infraCfg.jwksCache)),
+		serverLifecycle: probe.NewLoggingServerLifecycleObserver(config.EventLogger(logCtx, "server_lifecycle", infraCfg.serverLifecycle)),
+	}
+}
+
+func buildRuntimeComponents(provider *config.Provider) (*runtimeComponents, error) {
+	trustStore, err := provider.TrustStore()
+	if err != nil {
+		return nil, fmt.Errorf("failed to create trust store: %w", err)
+	}
+
+	tokenService, err := provider.TokenService()
+	if err != nil {
+		return nil, fmt.Errorf("failed to create token service: %w", err)
+	}
+
+	authzTokenTypes, err := provider.AuthzServerTokenTypes()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get authz token types: %w", err)
+	}
+
+	claimsFilterRegistry, err := provider.ExchangeServerClaimsFilterRegistry()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get exchange server claims filter registry: %w", err)
+	}
+
+	issuerRegistry, err := provider.IssuerRegistry()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get issuer registry: %w", err)
+	}
+
+	return &runtimeComponents{
+		trustStore:           trustStore,
+		tokenService:         tokenService,
+		authzTokenTypes:      authzTokenTypes,
+		claimsFilterRegistry: claimsFilterRegistry,
+		issuerRegistry:       issuerRegistry,
+	}, nil
+}
+
 // NewServeCmd creates the serve command
 func NewServeCmd() *cobra.Command {
 	cmd := &cobra.Command{
@@ -63,10 +158,8 @@ func runServe(cmd *cobra.Command, args []string) error {
 	// 1. Determine config file path
 	configPath := configFile
 	if configPath == "" {
-		// Check environment variable
 		configPath = os.Getenv("PARSEC_CONFIG")
 	}
-	// If still empty, configPath remains empty and we'll use env vars/flags only
 
 	// 2. Load configuration (file + env vars + flags)
 	loader, err := config.NewLoaderWithFlags(configPath, cmd.Flags())
@@ -79,52 +172,44 @@ func runServe(cmd *cobra.Command, args []string) error {
 		return fmt.Errorf("failed to parse config: %w", err)
 	}
 
-	// 3. Create provider to build all components from config
-	provider := config.NewProvider(cfg)
+	// 2a. Validate observability config early so typos surface at startup
+	if err := config.ValidateObservabilityConfig(cfg.Observability); err != nil {
+		return fmt.Errorf("invalid observability config: %w", err)
+	}
 
-	// 4. Create logger and observer — single instance shared across all components
-	logger := config.NewLogger(cfg.Observability)
+	// 3. Create logger, observers, then provider (observers must exist before provider)
+	logCtx := config.NewLoggerContext(cfg.Observability)
+	logger := logCtx.Logger
+	infraCfg := buildInfraEventConfigs(cfg.Observability)
+	infraObservers := wireInfraObservers(logCtx, infraCfg)
+	loader.SetObserver(infraObservers.configReload)
 
-	observer, err := config.NewObserverWithLogger(cfg.Observability, logger)
+	observer, err := config.NewObserverWithLogger(cfg.Observability, logCtx)
 	if err != nil {
 		return fmt.Errorf("failed to create observer: %w", err)
 	}
 
-	// Inject into provider so TokenService and other internal components use the same observer
-	provider.SetObserver(observer)
+	// 4. Create provider with all dependencies upfront — no temporal coupling
+	provider := config.NewProvider(cfg, config.ProviderDeps{
+		Observer:            observer,
+		KeyRotationObserver: infraObservers.keyRotation,
+		KeyProviderObserver: infraObservers.keyProvider,
+		TrustObserver:       infraObservers.trustValidation,
+		CacheObserver:       infraObservers.dataSourceCache,
+	})
 
 	// 5. Build components via provider
-	trustStore, err := provider.TrustStore()
-	if err != nil {
-		return fmt.Errorf("failed to create trust store: %w", err)
-	}
-
-	tokenService, err := provider.TokenService()
-	if err != nil {
-		return fmt.Errorf("failed to create token service: %w", err)
-	}
-
-	authzTokenTypes, err := provider.AuthzServerTokenTypes()
-	if err != nil {
-		return fmt.Errorf("failed to get authz token types: %w", err)
-	}
-
-	claimsFilterRegistry, err := provider.ExchangeServerClaimsFilterRegistry()
-	if err != nil {
-		return fmt.Errorf("failed to get exchange server claims filter registry: %w", err)
-	}
-
-	issuerRegistry, err := provider.IssuerRegistry()
+	components, err := buildRuntimeComponents(provider)
 	if err != nil {
-		return fmt.Errorf("failed to get issuer registry: %w", err)
+		return err
 	}
 
 	// 6. Create service handlers with observability
-	authzServer := server.NewAuthzServer(trustStore, tokenService, authzTokenTypes, observer)
-	exchangeServer := server.NewExchangeServer(trustStore, tokenService, claimsFilterRegistry, observer)
+	authzServer := server.NewAuthzServer(components.trustStore, components.tokenService, components.authzTokenTypes, observer)
+	exchangeServer := server.NewExchangeServer(components.trustStore, components.tokenService, components.claimsFilterRegistry, observer)
 	jwksServer := server.NewJWKSServer(server.JWKSServerConfig{
-		IssuerRegistry: issuerRegistry,
-		Logger:         logger,
+		IssuerRegistry: components.issuerRegistry,
+		Observer:       infraObservers.jwksCache,
 	})
 
 	// Start JWKS background refresh
@@ -153,40 +238,40 @@ func runServe(cmd *cobra.Command, args []string) error {
 		AuthzServer:    authzServer,
 		ExchangeServer: exchangeServer,
 		JWKSServer:     jwksServer,
+		Observer:       infraObservers.serverLifecycle,
 	})
 	if err := srv.Start(ctx); err != nil {
 		return fmt.Errorf("failed to start server: %w", err)
 	}
 
 	// 8a. All components initialized — signal readiness via gRPC health service.
-	// Per-service statuses transition from NOT_SERVING to SERVING.
 	srv.SetReady()
 
 	grpcAddr := grpcListener.Addr().String()
 	httpAddr := httpListener.Addr().String()
-	fmt.Println("parsec is running")
-	fmt.Printf("  gRPC (ext_authz):      %s\n", grpcAddr)
-	fmt.Printf("  HTTP (token exchange): http://%s/v1/token\n", httpAddr)
-	fmt.Printf("  HTTP (JWKS):           http://%s/v1/jwks.json\n", httpAddr)
-	fmt.Printf("                         http://%s/.well-known/jwks.json\n", httpAddr)
-	fmt.Printf("  Health (gRPC):         %s (grpc.health.v1.Health)\n", grpcAddr)
-	fmt.Printf("  Health (HTTP live):    http://%s/healthz/live\n", httpAddr)
-	fmt.Printf("  Health (HTTP ready):   http://%s/healthz/ready\n", httpAddr)
-	fmt.Printf("  Trust Domain:          %s\n", provider.TrustDomain())
-	fmt.Printf("  Config:                %s\n", configPath)
+	logger.Info().
+		Str("grpc_addr", grpcAddr).
+		Str("http_addr", httpAddr).
+		Str("token_exchange_url", "http://"+httpAddr+"/v1/token").
+		Str("jwks_url", "http://"+httpAddr+"/v1/jwks.json").
+		Str("jwks_wellknown_url", "http://"+httpAddr+"/.well-known/jwks.json").
+		Str("health_grpc", grpcAddr+" (grpc.health.v1.Health)").
+		Str("health_http_live", "http://"+httpAddr+"/healthz/live").
+		Str("health_http_ready", "http://"+httpAddr+"/healthz/ready").
+		Str("trust_domain", provider.TrustDomain()).
+		Str("config", configPath).
+		Msg("parsec is running")
 
 	// 9. Wait for interrupt signal
 	sigCh := make(chan os.Signal, 1)
 	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
 	<-sigCh
 
-	fmt.Println("\nShutting down...")
+	logger.Info().Msg("Shutting down")
 
 	// 10. Graceful shutdown
 	if err := srv.Stop(ctx); err != nil {
 		return fmt.Errorf("error during shutdown: %w", err)
 	}
 
-	fmt.Println("Shutdown complete")
+	logger.Info().Msg("Shutdown complete")
 	return nil
 }
diff --git a/internal/config/config.go b/internal/config/config.go
index 15f84bc..813079e 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -345,9 +345,16 @@ type ObservabilityConfig struct {
 	LogFormat string `koanf:"log_format" usage:"log format: json, text"`
 
 	// Event-specific logging configuration
-	TokenIssuance *EventLoggingConfig `koanf:"token_issuance"`
-	TokenExchange *EventLoggingConfig `koanf:"token_exchange"`
-	AuthzCheck    *EventLoggingConfig `koanf:"authz_check"`
+	TokenIssuance   *EventLoggingConfig `koanf:"token_issuance"`
+	TokenExchange   *EventLoggingConfig `koanf:"token_exchange"`
+	AuthzCheck      *EventLoggingConfig `koanf:"authz_check"`
+	ConfigReload    *EventLoggingConfig `koanf:"config_reload"`
+	DataSourceCache *EventLoggingConfig `koanf:"datasource_cache"`
+	KeyRotation     *EventLoggingConfig `koanf:"key_rotation"`
+	KeyProvider     *EventLoggingConfig `koanf:"key_provider"`
+	TrustValidation *EventLoggingConfig `koanf:"trust_validation"`
+	JWKSCache       *EventLoggingConfig `koanf:"jwks_cache"`
+	ServerLifecycle *EventLoggingConfig `koanf:"server_lifecycle"`
 
 	// Composite observer fields - allows multiple observers
 	Observers []ObservabilityConfig `koanf:"observers"`
diff --git a/internal/config/datasources.go b/internal/config/datasources.go
index 26f278c..c9da044 100644
--- a/internal/config/datasources.go
+++ b/internal/config/datasources.go
@@ -12,11 +12,11 @@ import (
 )
 
 // NewDataSourceRegistry creates a data source registry from configuration
-func NewDataSourceRegistry(cfg []DataSourceConfig, transport http.RoundTripper) (*service.DataSourceRegistry, error) {
+func NewDataSourceRegistry(cfg []DataSourceConfig, transport http.RoundTripper, cacheObs datasource.DataSourceCacheObserver) (*service.DataSourceRegistry, error) {
 	registry := service.NewDataSourceRegistry()
 
 	for _, dsCfg := range cfg {
-		ds, err := newDataSource(dsCfg, transport)
+		ds, err := newDataSource(dsCfg, transport, cacheObs)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create data source %s: %w", dsCfg.Name, err)
 		}
@@ -27,17 +27,17 @@ func NewDataSourceRegistry(cfg []DataSourceConfig, transport http.RoundTripper)
 }
 
 // newDataSource creates a data source from configuration
-func newDataSource(cfg DataSourceConfig, transport http.RoundTripper) (service.DataSource, error) {
+func newDataSource(cfg DataSourceConfig, transport http.RoundTripper, cacheObs datasource.DataSourceCacheObserver) (service.DataSource, error) {
 	switch cfg.Type {
 	case "lua":
-		return newLuaDataSource(cfg, transport)
+		return newLuaDataSource(cfg, transport, cacheObs)
 	default:
 		return nil, fmt.Errorf("unknown data source type: %s (supported: lua)", cfg.Type)
 	}
 }
 
 // newLuaDataSource creates a Lua data source with optional caching
-func newLuaDataSource(cfg DataSourceConfig, transport http.RoundTripper) (service.DataSource, error) {
+func newLuaDataSource(cfg DataSourceConfig, transport http.RoundTripper, cacheObs datasource.DataSourceCacheObserver) (service.DataSource, error) {
 	if cfg.Name == "" {
 		return nil, fmt.Errorf("data source name is required")
 	}
@@ -87,7 +87,7 @@ func newLuaDataSource(cfg DataSourceConfig, transport http.RoundTripper) (servic
 
 	// Wrap with caching if configured
 	if cfg.Caching != nil {
-		return wrapWithCaching(baseDS, *cfg.Caching)
+		return wrapWithCaching(baseDS, *cfg.Caching, cacheObs)
 	}
 
 	return baseDS, nil
@@ -117,11 +117,10 @@ func buildHTTPConfig(cfg *HTTPConfig, transport http.RoundTripper) (*luaservices
 }
 
 // wrapWithCaching wraps a data source with the configured caching layer
-func wrapWithCaching(ds service.DataSource, cfg CachingConfig) (service.DataSource, error) {
+func wrapWithCaching(ds service.DataSource, cfg CachingConfig, cacheObs datasource.DataSourceCacheObserver) (service.DataSource, error) {
 	switch cfg.Type {
 	case "in_memory":
-		// In-memory caching uses the Cacheable interface from the data source
-		return datasource.NewInMemoryCachingDataSource(ds), nil
+		return datasource.NewInMemoryCachingDataSource(ds, datasource.WithCacheObserver(cacheObs)), nil
 
 	case "distributed":
 		groupName := cfg.GroupName
diff --git a/internal/config/issuers.go b/internal/config/issuers.go
index 00f8707..46ed211 100644
--- a/internal/config/issuers.go
+++ b/internal/config/issuers.go
@@ -14,12 +14,18 @@ import (
 	"github.com/project-kessel/parsec/internal/service"
 )
 
+// IssuerRegistryConfig groups optional dependencies for NewIssuerRegistry.
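+// Both observers flow through to the signer and key-provider constructors,
+// which call them without nil checks, so wire real or no-op implementations
+// rather than leaving them nil. A minimal wiring sketch (logger is assumed
+// to be an existing zerolog.Logger):
+//
+//	registry, err := NewIssuerRegistry(cfg, IssuerRegistryConfig{
+//		KeyRotationObserver: probe.NewLoggingKeyRotationObserver(logger),
+//		KeyProviderObserver: probe.NewLoggingKeyProviderObserver(logger),
+//	})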
+type IssuerRegistryConfig struct {
+	KeyRotationObserver keys.KeyRotationObserver
+	KeyProviderObserver keys.KeyProviderObserver
+}
+
 // NewIssuerRegistry creates an issuer registry from configuration
-func NewIssuerRegistry(cfg Config) (service.Registry, error) {
+func NewIssuerRegistry(cfg Config, opts IssuerRegistryConfig) (service.Registry, error) {
 	registry := service.NewSimpleRegistry()
 
 	// Build key provider registry from global config
-	providerRegistry, err := buildKeyProviderRegistry(cfg.KeyProviders)
+	providerRegistry, err := buildKeyProviderRegistry(cfg.KeyProviders, opts.KeyProviderObserver)
 	if err != nil {
 		return nil, fmt.Errorf("failed to build key provider registry: %w", err)
 	}
@@ -28,7 +34,7 @@ func NewIssuerRegistry(cfg Config) (service.Registry, error) {
 	slotStore := keys.NewInMemoryKeySlotStore()
 
 	// Build signer registry from global config
-	signerRegistry, err := buildSignerRegistry(cfg.Signers, cfg.TrustDomain, providerRegistry, slotStore)
+	signerRegistry, err := buildSignerRegistry(cfg.Signers, cfg.TrustDomain, providerRegistry, slotStore, opts.KeyRotationObserver)
 	if err != nil {
 		return nil, fmt.Errorf("failed to build signer registry: %w", err)
 	}
@@ -61,7 +67,7 @@ func NewIssuerRegistry(cfg Config) (service.Registry, error) {
 }
 
 // buildKeyProviderRegistry creates a map of KeyProvider instances from configuration
-func buildKeyProviderRegistry(configs []KeyProviderConfig) (map[string]keys.KeyProvider, error) {
+func buildKeyProviderRegistry(configs []KeyProviderConfig, providerObs keys.KeyProviderObserver) (map[string]keys.KeyProvider, error) {
 	registry := make(map[string]keys.KeyProvider)
 
 	for _, cfg := range configs {
@@ -111,6 +117,7 @@ func buildKeyProviderRegistry(configs []KeyProviderConfig) (map[string]keys.KeyP
 			Algorithm:   cfg.Algorithm,
 			Region:      cfg.Region,
 			AliasPrefix: cfg.AliasPrefix,
+			Observer:    providerObs,
 		})
 		if err != nil {
 			return nil, fmt.Errorf("failed to create aws_kms key provider %s: %w", cfg.ID, err)
@@ -127,7 +134,7 @@ func buildKeyProviderRegistry(configs []KeyProviderConfig) (map[string]keys.KeyP
 }
 
 // buildSignerRegistry creates a SignerRegistry from configuration
-func buildSignerRegistry(configs []SignerConfig, trustDomain string, providerRegistry map[string]keys.KeyProvider, slotStore keys.KeySlotStore) (*keys.SignerRegistry, error) {
+func buildSignerRegistry(configs []SignerConfig, trustDomain string, providerRegistry map[string]keys.KeyProvider, slotStore keys.KeySlotStore, rotObs keys.KeyRotationObserver) (*keys.SignerRegistry, error) {
 	registry := keys.NewSignerRegistry()
 
 	for _, cfg := range configs {
@@ -211,6 +218,7 @@ func buildSignerRegistry(configs []SignerConfig, trustDomain string, providerReg
 			GracePeriod:    gracePeriod,
 			CheckInterval:  checkInterval,
 			PrepareTimeout: prepareTimeout,
+			Observer:       rotObs,
 		})
 	default:
 		return nil, fmt.Errorf("unknown signer type for %s: %s (supported: dual_slot)", cfg.ID, cfg.Type)
diff --git a/internal/config/loader.go b/internal/config/loader.go
index daacad3..3948d03 100644
--- a/internal/config/loader.go
+++ b/internal/config/loader.go
@@ -22,6 +22,7 @@ import (
 type Loader struct {
 	k          *koanf.Koanf
 	configPath string
+	observer   ConfigReloadObserver
 }
 
 // NewLoader creates a new configuration loader that reads from a file
@@ -126,6 +127,11 @@ func newLoader(configPath string, flags *pflag.FlagSet) (*Loader, error) {
 	}, nil
 }
 
+// SetObserver sets the observer for config reload events.
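+// Call it before Watch: reload failures are reported only through the
+// observer, and Watch invokes it without a nil check, so an unset observer
+// panics on the first reload error.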
+func (l *Loader) SetObserver(observer ConfigReloadObserver) {
+	l.observer = observer
+}
+
 // Get unmarshals the configuration into a Config struct
 func (l *Loader) Get() (*Config, error) {
 	var cfg Config
@@ -153,53 +159,46 @@ func (l *Loader) Watch(ctx context.Context, onChange func(*Config) error) error
 	// Set up file watcher
 	if err := fp.Watch(func(event interface{}, err error) {
 		if err != nil {
-			// Log error but continue watching
-			fmt.Printf("config watch error: %v\n", err)
+			l.observer.ConfigReloadFailed("watch", err)
 			return
 		}
 
-		// Reload the config
 		parser, err := getParserForFile(l.configPath)
 		if err != nil {
-			fmt.Printf("config parser error: %v\n", err)
+			l.observer.ConfigReloadFailed("parser", err)
 			return
 		}
 
-		// Create new koanf instance for reload (same precedence as startup)
 		k := koanf.New(".")
 		if err := k.Load(confmap.Provider(getDefaults(), "."), nil); err != nil {
-			fmt.Printf("config defaults reload error: %v\n", err)
+			l.observer.ConfigReloadFailed("defaults", err)
 			return
 		}
 		if err := k.Load(fp, parser); err != nil {
-			fmt.Printf("config reload error: %v\n", err)
+			l.observer.ConfigReloadFailed("reload", err)
 			return
 		}
 
-		// Reload env vars
 		if err := k.Load(env.Provider(".", env.Opt{
 			Prefix: "PARSEC_",
 			TransformFunc: func(k, v string) (string, any) {
 				return envTransform(k), v
 			},
 		}), nil); err != nil {
-			fmt.Printf("env reload error: %v\n", err)
+			l.observer.ConfigReloadFailed("env", err)
 			return
 		}
 
-		// Unmarshal new config
 		var cfg Config
 		if err := k.Unmarshal("", &cfg); err != nil {
-			fmt.Printf("config unmarshal error: %v\n", err)
+			l.observer.ConfigReloadFailed("unmarshal", err)
 			return
 		}
 
-		// Update loader's koanf instance
 		l.k = k
 
-		// Call onChange callback
 		if err := onChange(&cfg); err != nil {
-			fmt.Printf("config onChange error: %v\n", err)
+			l.observer.ConfigReloadFailed("onChange", err)
 		}
 	}); err != nil {
 		return fmt.Errorf("failed to watch config file: %w", err)
diff --git a/internal/config/observability.go b/internal/config/observability.go
index 4c68650..de2e67a 100644
--- a/internal/config/observability.go
+++ b/internal/config/observability.go
@@ -1,65 +1,96 @@
 package config
 
 import (
-	"context"
 	"fmt"
-	"log/slog"
+	"io"
 	"os"
 	"strings"
 
+	"github.com/rs/zerolog"
+
 	"github.com/project-kessel/parsec/internal/probe"
 	"github.com/project-kessel/parsec/internal/service"
 )
 
+// LoggerContext couples a zerolog logger with its destination writer so
+// per-event formatting overrides can preserve the original sink.
+type LoggerContext struct {
+	Logger zerolog.Logger
+	Writer io.Writer
+}
+
 // NewObserver creates an application observer from configuration.
 // This is a convenience wrapper that creates its own logger from cfg.
 func NewObserver(cfg *ObservabilityConfig) (service.ApplicationObserver, error) {
-	return NewObserverWithLogger(cfg, NewLogger(cfg))
+	return NewObserverWithLogger(cfg, NewLoggerContext(cfg))
 }
 
 // NewObserverWithLogger creates an application observer using the provided logger.
 // Use this when you want the observer to share a logger with other components.
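+//
+// A minimal wiring sketch, assuming cfg is a loaded *Config:
+//
+//	logCtx := NewLoggerContext(cfg.Observability)
+//	observer, err := NewObserverWithLogger(cfg.Observability, logCtx)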
-func NewObserverWithLogger(cfg *ObservabilityConfig, logger *slog.Logger) (service.ApplicationObserver, error) {
+func NewObserverWithLogger(cfg *ObservabilityConfig, logCtx LoggerContext) (service.ApplicationObserver, error) {
 	if cfg == nil {
-		// Default to no-op observer if not configured
-		return &service.NoOpApplicationObserver{}, nil
+		return newNoopObserver(), nil
 	}
 
 	switch cfg.Type {
 	case "logging":
-		return probe.NewLoggingObserverWithConfig(probe.LoggingObserverConfig{
-			Logger: logger,
-		}), nil
+		return newLoggingObserver(cfg, logCtx), nil
 	case "noop", "":
-		return &service.NoOpApplicationObserver{}, nil
+		return newNoopObserver(), nil
 	case "composite":
-		return newCompositeObserver(cfg)
+		return newCompositeObserver(cfg, logCtx)
 	default:
 		return nil, fmt.Errorf("unknown observability type: %s (supported: logging, noop, composite)", cfg.Type)
 	}
 }
 
-// NewLogger creates a structured logger from the observability configuration.
-// Returns slog.Default() if cfg is nil.
-func NewLogger(cfg *ObservabilityConfig) *slog.Logger {
+// NewLogger creates a structured zerolog logger from the observability configuration.
+func NewLogger(cfg *ObservabilityConfig) zerolog.Logger {
+	return NewLoggerContext(cfg).Logger
+}
+
+func newLoggingObserver(cfg *ObservabilityConfig, logCtx LoggerContext) service.ApplicationObserver {
+	return probe.NewLoggingObserverWithConfig(probe.LoggingObserverConfig{
+		TokenIssuanceLogger: EventLogger(logCtx, "token_issuance", cfg.TokenIssuance),
+		TokenExchangeLogger: EventLogger(logCtx, "token_exchange", cfg.TokenExchange),
+		AuthzCheckLogger:    EventLogger(logCtx, "authz_check", cfg.AuthzCheck),
+	})
+}
+
+func newNoopObserver() service.ApplicationObserver {
+	return &service.NoOpApplicationObserver{}
+}
+
+// NewLoggerContext creates a structured zerolog logger and the writer used as
+// its sink. Writer holds the raw destination (e.g. os.Stdout), never a
+// format wrapper, so EventLogger can re-wrap it with a different format.
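+//
+// With LogFormat "text", for example, Logger writes through a ConsoleWriter
+// while Writer still points at the raw os.Stdout sink.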
+func NewLoggerContext(cfg *ObservabilityConfig) LoggerContext {
 	if cfg == nil {
-		return slog.Default()
+		return LoggerContext{
+			Logger: zerolog.New(os.Stdout).With().Timestamp().Logger(),
+			Writer: os.Stdout,
+		}
 	}
 
+	rawSink := os.Stdout
 	defaultLevel := parseLogLevel(cfg.LogLevel)
-	handler := createEventFilteringHandler(cfg, defaultLevel)
-	return slog.New(handler)
+	writer := createWriter(cfg.LogFormat, rawSink)
+	return LoggerContext{
+		Logger: zerolog.New(writer).With().Timestamp().Logger().Level(defaultLevel),
+		Writer: rawSink,
+	}
 }
 
 // newCompositeObserver creates a composite observer that delegates to multiple observers
-func newCompositeObserver(cfg *ObservabilityConfig) (service.ApplicationObserver, error) {
+func newCompositeObserver(cfg *ObservabilityConfig, logCtx LoggerContext) (service.ApplicationObserver, error) {
 	if len(cfg.Observers) == 0 {
 		return nil, fmt.Errorf("composite observer requires at least one sub-observer")
 	}
 
 	var observers []service.ApplicationObserver
 	for i, subCfg := range cfg.Observers {
-		observer, err := NewObserver(&subCfg)
+		childLogCtx := deriveLoggerContext(logCtx, &subCfg)
+		observer, err := NewObserverWithLogger(&subCfg, childLogCtx)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create observer %d: %w", i, err)
 		}
@@ -69,127 +100,147 @@ func newCompositeObserver(cfg *ObservabilityConfig) (service.ApplicationObserver
 	return service.NewCompositeObserver(observers...), nil
 }
 
-// createEventFilteringHandler creates a handler that filters log events based on the event attribute
-func createEventFilteringHandler(cfg *ObservabilityConfig, defaultLevel slog.Level) slog.Handler {
-	// Create base handler
-	baseHandler := createHandler(cfg.LogFormat, defaultLevel)
+// deriveLoggerContext builds a child LoggerContext that shares the parent's
+// raw sink but applies the child config's LogLevel and/or LogFormat overrides.
+// If the child specifies neither, the parent context is returned as-is.
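+//
+// For example, a child config carrying only log_level "debug" keeps the
+// parent's format and sink and just lowers the level threshold.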
+func deriveLoggerContext(parent LoggerContext, cfg *ObservabilityConfig) LoggerContext {
+	if cfg == nil || (cfg.LogLevel == "" && cfg.LogFormat == "") {
+		return parent
+	}
 
-	// Build event-specific level map
-	eventLevels := make(map[string]slog.Level)
+	logger := parent.Logger
 
-	if cfg.TokenIssuance != nil {
-		if cfg.TokenIssuance.Enabled != nil && !*cfg.TokenIssuance.Enabled {
-			eventLevels["token_issuance"] = slog.Level(1000) // Effectively disabled
-		} else if cfg.TokenIssuance.LogLevel != "" {
-			eventLevels["token_issuance"] = parseLogLevel(cfg.TokenIssuance.LogLevel)
-		}
+	if cfg.LogFormat != "" {
+		logger = logger.Output(createWriter(cfg.LogFormat, parent.Writer))
 	}
-
-	if cfg.TokenExchange != nil {
-		if cfg.TokenExchange.Enabled != nil && !*cfg.TokenExchange.Enabled {
-			eventLevels["token_exchange"] = slog.Level(1000) // Effectively disabled
-		} else if cfg.TokenExchange.LogLevel != "" {
-			eventLevels["token_exchange"] = parseLogLevel(cfg.TokenExchange.LogLevel)
-		}
+	if cfg.LogLevel != "" {
+		logger = logger.Level(parseLogLevel(cfg.LogLevel))
 	}
 
-	if cfg.AuthzCheck != nil {
-		if cfg.AuthzCheck.Enabled != nil && !*cfg.AuthzCheck.Enabled {
-			eventLevels["authz_check"] = slog.Level(1000) // Effectively disabled
-		} else if cfg.AuthzCheck.LogLevel != "" {
-			eventLevels["authz_check"] = parseLogLevel(cfg.AuthzCheck.LogLevel)
-		}
+	return LoggerContext{
+		Logger: logger,
+		Writer: parent.Writer,
 	}
+}
 
-	return &eventFilteringHandler{
-		next:         baseHandler,
-		eventLevels:  eventLevels,
-		defaultLevel: defaultLevel,
+// EventLogger creates a pre-configured sub-logger for a specific event type.
+// The returned logger has the "event" field baked in.
+//
+// Override precedence (applied in order):
+//  1. LogFormat -- output format is always applied first
+//  2. Enabled=false -- disables the event entirely (zerolog.Disabled), overrides LogLevel
+//  3. LogLevel -- sets the minimum severity threshold
+//
+// If eventCfg is nil the logger inherits all base settings unchanged.
+func EventLogger(logCtx LoggerContext, eventName string, eventCfg *EventLoggingConfig) zerolog.Logger {
+	logger := logCtx.Logger.With().Str("event", eventName).Logger()
+	if eventCfg == nil {
+		return logger
 	}
+	if eventCfg.LogFormat != "" {
+		logger = logger.Output(createWriter(eventCfg.LogFormat, logCtx.Writer))
+	}
+	if eventCfg.Enabled != nil && !*eventCfg.Enabled {
+		return logger.Level(zerolog.Disabled)
+	}
+	if eventCfg.LogLevel != "" {
+		return logger.Level(parseLogLevel(eventCfg.LogLevel))
+	}
+	return logger
 }
 
-// eventFilteringHandler wraps a handler and filters based on the event attribute
-type eventFilteringHandler struct {
-	next         slog.Handler
-	eventLevels  map[string]slog.Level
-	defaultLevel slog.Level
+// createWriter creates a zerolog writer based on the format string.
+// It preserves the destination by using fallback as the sink.
+func createWriter(format string, fallback io.Writer) io.Writer {
+	if fallback == nil {
+		fallback = os.Stdout
+	}
+	switch strings.ToLower(format) {
+	case "text":
+		return zerolog.ConsoleWriter{Out: fallback}
+	case "json", "":
+		return fallback
+	default:
+		// Unknown formats fall back to the raw JSON sink; validation rejects them at startup.
+		return fallback
+	}
 }
 
-func (h *eventFilteringHandler) Enabled(ctx context.Context, level slog.Level) bool {
-	// For now, use default level check
-	// The actual filtering happens in Handle
-	return level >= h.defaultLevel
-}
+// ValidateObservabilityConfig checks that all log_level and log_format values
+// in the config (both base and per-event) are recognized. Returns an error on
+// the first unrecognized value so operators get fast feedback on typos.
+func ValidateObservabilityConfig(cfg *ObservabilityConfig) error {
+	if cfg == nil {
+		return nil
+	}
+	if err := validateLogLevel("observability.log_level", cfg.LogLevel); err != nil {
+		return err
+	}
+	if err := validateLogFormat("observability.log_format", cfg.LogFormat); err != nil {
+		return err
+	}
 
-func (h *eventFilteringHandler) Handle(ctx context.Context, record slog.Record) error {
-	// Extract event attribute if present
-	var eventName string
-	record.Attrs(func(attr slog.Attr) bool {
-		if attr.Key == "event" {
-			eventName = attr.Value.String()
-			return false // Stop iteration
+	events := map[string]*EventLoggingConfig{
+		"token_issuance":   cfg.TokenIssuance,
+		"token_exchange":   cfg.TokenExchange,
+		"authz_check":      cfg.AuthzCheck,
+		"config_reload":    cfg.ConfigReload,
+		"datasource_cache": cfg.DataSourceCache,
+		"key_rotation":     cfg.KeyRotation,
+		"key_provider":     cfg.KeyProvider,
+		"trust_validation": cfg.TrustValidation,
+		"jwks_cache":       cfg.JWKSCache,
+		"server_lifecycle": cfg.ServerLifecycle,
+	}
+	for name, ecfg := range events {
+		if ecfg == nil {
+			continue
 		}
-		return true
-	})
-
-	// Check event-specific level
-	if eventName != "" {
-		if eventLevel, ok := h.eventLevels[eventName]; ok {
-			if record.Level < eventLevel {
-				return nil // Filter out
-			}
+		if err := validateLogLevel("observability."+name+".log_level", ecfg.LogLevel); err != nil {
+			return err
+		}
+		if err := validateLogFormat("observability."+name+".log_format", ecfg.LogFormat); err != nil {
+			return err
 		}
 	}
-	return h.next.Handle(ctx, record)
-}
-
-func (h *eventFilteringHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
-	return &eventFilteringHandler{
-		next:         h.next.WithAttrs(attrs),
-		eventLevels:  h.eventLevels,
-		defaultLevel: h.defaultLevel,
+	for i := range cfg.Observers {
+		if err := ValidateObservabilityConfig(&cfg.Observers[i]); err != nil {
+			return err
+		}
 	}
+	return nil
 }
 
-func (h *eventFilteringHandler) WithGroup(name string) slog.Handler {
-	return &eventFilteringHandler{
-		next:         h.next.WithGroup(name),
-		eventLevels:  h.eventLevels,
-		defaultLevel: h.defaultLevel,
+func validateLogLevel(field, value string) error {
+	switch strings.ToLower(value) {
+	case "", "debug", "info", "warn", "warning", "error":
+		return nil
+	default:
+		return fmt.Errorf("invalid %s %q (valid: debug, info, warn, error)", field, value)
 	}
 }
 
-// createHandler creates a slog handler based on format and level
-func createHandler(format string, level slog.Level) slog.Handler {
-	opts := &slog.HandlerOptions{
-		Level: level,
-	}
-
-	switch strings.ToLower(format) {
-	case "text":
-		return slog.NewTextHandler(os.Stdout, opts)
-	case "json", "":
-		return slog.NewJSONHandler(os.Stdout, opts)
+func validateLogFormat(field, value string) error {
+	switch strings.ToLower(value) {
+	case "", "json", "text":
+		return nil
 	default:
-		// Default to JSON
-		return slog.NewJSONHandler(os.Stdout, opts)
+		return fmt.Errorf("invalid %s %q (valid: json, text)", field, value)
 	}
 }
 
-// parseLogLevel parses a log level string
-func parseLogLevel(levelStr string) slog.Level {
+// parseLogLevel parses a log level string into a zerolog.Level.
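+// Unrecognized values fall back to info; ValidateObservabilityConfig rejects
+// them at startup before this fallback can hide a typo.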
+func parseLogLevel(levelStr string) zerolog.Level {
 	switch strings.ToLower(levelStr) {
 	case "debug":
-		return slog.LevelDebug
+		return zerolog.DebugLevel
 	case "info", "":
-		return slog.LevelInfo
+		return zerolog.InfoLevel
 	case "warn", "warning":
-		return slog.LevelWarn
+		return zerolog.WarnLevel
 	case "error":
-		return slog.LevelError
+		return zerolog.ErrorLevel
 	default:
-		// Default to info
-		return slog.LevelInfo
+		return zerolog.InfoLevel
 	}
 }
diff --git a/internal/config/observability_test.go b/internal/config/observability_test.go
new file mode 100644
index 0000000..82f777e
--- /dev/null
+++ b/internal/config/observability_test.go
@@ -0,0 +1,204 @@
+package config
+
+import (
+	"bytes"
+	"encoding/json"
+	"testing"
+
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func boolPtr(b bool) *bool { return &b }
+
+// jsonLogCtx builds a LoggerContext that writes JSON to buf.
+// Writer is set to the raw buf so format overrides work correctly.
+func jsonLogCtx(buf *bytes.Buffer) LoggerContext {
+	return LoggerContext{
+		Logger: zerolog.New(buf).With().Timestamp().Logger().Level(zerolog.InfoLevel),
+		Writer: buf,
+	}
+}
+
+func TestEventLogger_NilConfig_InheritsBase(t *testing.T) {
+	var buf bytes.Buffer
+	logCtx := jsonLogCtx(&buf)
+
+	logger := EventLogger(logCtx, "test_event", nil)
+	logger.Info().Msg("hello")
+
+	assert.Contains(t, buf.String(), `"event":"test_event"`)
+	assert.Contains(t, buf.String(), `"message":"hello"`)
+}
+
+func TestEventLogger_LevelAndEnabled(t *testing.T) {
+	tests := []struct {
+		name      string
+		baseLevel zerolog.Level
+		eventCfg  *EventLoggingConfig
+		emitLevel zerolog.Level
+		wantEmpty bool
+	}{
+		{
+			name:      "nil config inherits base level",
+			baseLevel: zerolog.InfoLevel,
+			eventCfg:  nil,
+			emitLevel: zerolog.DebugLevel,
+			wantEmpty: true,
+		},
+		{
+			name:      "level override widens to debug",
+			baseLevel: zerolog.InfoLevel,
+			eventCfg:  &EventLoggingConfig{LogLevel: "debug"},
+			emitLevel: zerolog.DebugLevel,
+			wantEmpty: false,
+		},
+		{
+			name:      "level override restricts to error",
+			baseLevel: zerolog.DebugLevel,
+			eventCfg:  &EventLoggingConfig{LogLevel: "error"},
+			emitLevel: zerolog.InfoLevel,
+			wantEmpty: true,
+		},
+		{
+			name:      "enabled false suppresses all",
+			baseLevel: zerolog.InfoLevel,
+			eventCfg:  &EventLoggingConfig{Enabled: boolPtr(false)},
+			emitLevel: zerolog.ErrorLevel,
+			wantEmpty: true,
+		},
+		{
+			name:      "enabled true no suppression",
+			baseLevel: zerolog.InfoLevel,
+			eventCfg:  &EventLoggingConfig{Enabled: boolPtr(true)},
+			emitLevel: zerolog.InfoLevel,
+			wantEmpty: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			var buf bytes.Buffer
+			logCtx := LoggerContext{
+				Logger: zerolog.New(&buf).With().Timestamp().Logger().Level(tt.baseLevel),
+				Writer: &buf,
+			}
+			logger := EventLogger(logCtx, "evt", tt.eventCfg)
+			logger.WithLevel(tt.emitLevel).Msg("test msg")
+
+			if tt.wantEmpty {
+				assert.Empty(t, buf.String())
+			} else {
+				assert.Contains(t, buf.String(), "test msg")
+			}
+		})
+	}
+}
+
+func TestEventLogger_FormatOverride_JSONToText(t *testing.T) {
+	var buf bytes.Buffer
+	logCtx := jsonLogCtx(&buf)
+
+	logger := EventLogger(logCtx, "text_event", &EventLoggingConfig{
+		LogFormat: "text",
+	})
+	logger.Info().Msg("text output")
+
+	output := buf.String()
+	require.NotEmpty(t, output)
+	assert.False(t, json.Valid([]byte(output)),
+		"output should NOT be valid JSON when format overridden to text; got: %s", output)
+	assert.Contains(t, output, "text output")
+}
+
+func TestEventLogger_FormatOverride_TextToJSON(t *testing.T) {
+	var rawBuf bytes.Buffer
+	textWriter := zerolog.ConsoleWriter{Out: &rawBuf, NoColor: true}
+	logCtx := LoggerContext{
+		Logger: zerolog.New(textWriter).With().Timestamp().Logger().Level(zerolog.InfoLevel),
+		Writer: &rawBuf,
+	}
+
+	logger := EventLogger(logCtx, "json_event", &EventLoggingConfig{
+		LogFormat: "json",
+	})
+	logger.Info().Msg("json output")
+
+	output := rawBuf.String()
+	require.NotEmpty(t, output)
+	assert.True(t, json.Valid([]byte(output)),
+		"output should be valid JSON when format overridden to json; got: %s", output)
+}
+
+func TestEventLogger_FormatAndLevel_Combined(t *testing.T) {
+	var buf bytes.Buffer
+	logCtx := jsonLogCtx(&buf)
+
+	logger := EventLogger(logCtx, "combo", &EventLoggingConfig{
+		LogFormat: "text",
+		LogLevel:  "debug",
+	})
+
+	logger.Debug().Msg("combo debug")
+	output := buf.String()
+
+	require.NotEmpty(t, output)
+	assert.Contains(t, output, "combo debug")
+	assert.False(t, json.Valid([]byte(output)),
+		"should be text format, not JSON; got: %s", output)
+}
+
+func TestDeriveLoggerContext(t *testing.T) {
+	t.Run("child level override applies", func(t *testing.T) {
+		var buf bytes.Buffer
+		parent := LoggerContext{
+			Logger: zerolog.New(&buf).With().Timestamp().Logger().Level(zerolog.InfoLevel),
+			Writer: &buf,
+		}
+		child := deriveLoggerContext(parent, &ObservabilityConfig{LogLevel: "debug"})
+
+		child.Logger.Debug().Msg("child debug")
+		assert.Contains(t, buf.String(), "child debug",
+			"child log_level=debug should widen the parent's info level")
+	})
+
+	t.Run("child format override applies", func(t *testing.T) {
+		var buf bytes.Buffer
+		parent := LoggerContext{
+			Logger: zerolog.New(&buf).With().Timestamp().Logger().Level(zerolog.InfoLevel),
+			Writer: &buf,
+		}
+		child := deriveLoggerContext(parent, &ObservabilityConfig{LogFormat: "text"})
+
+		child.Logger.Info().Msg("text child")
+		output := buf.String()
+		require.NotEmpty(t, output)
+		assert.False(t, json.Valid([]byte(output)),
+			"child log_format=text should override parent JSON; got: %s", output)
+	})
+
+	t.Run("no overrides returns parent as-is", func(t *testing.T) {
+		var buf bytes.Buffer
+		parent := LoggerContext{
+			Logger: zerolog.New(&buf).With().Timestamp().Logger().Level(zerolog.WarnLevel),
+			Writer: &buf,
+		}
+		child := deriveLoggerContext(parent, &ObservabilityConfig{})
+
+		child.Logger.Info().Msg("should not appear")
+		assert.Empty(t, buf.String(), "child with no overrides should inherit parent warn level")
+	})
+
+	t.Run("shares parent raw sink", func(t *testing.T) {
+		var buf bytes.Buffer
+		parent := LoggerContext{
+			Logger: zerolog.New(&buf).With().Timestamp().Logger().Level(zerolog.InfoLevel),
+			Writer: &buf,
+		}
+		child := deriveLoggerContext(parent, &ObservabilityConfig{LogLevel: "debug"})
+
+		assert.Equal(t, parent.Writer, child.Writer,
+			"child must share the parent's raw sink")
+	})
+}
diff --git a/internal/config/observer_probe_test.go b/internal/config/observer_probe_test.go
new file mode 100644
index 0000000..a93a755
--- /dev/null
+++ b/internal/config/observer_probe_test.go
@@ -0,0 +1,11 @@
+package config_test
+
+import (
+	"github.com/project-kessel/parsec/internal/config"
+	"github.com/project-kessel/parsec/internal/probe"
+)
+
+// Compile-time check that LoggingConfigReloadObserver satisfies
+// config.ConfigReloadObserver. The production code can't assert this
+// directly because probe already imports config (circular import).
+var _ config.ConfigReloadObserver = (*probe.LoggingConfigReloadObserver)(nil)
diff --git a/internal/config/provider.go b/internal/config/provider.go
index 1ca8929..c102c85 100644
--- a/internal/config/provider.go
+++ b/internal/config/provider.go
@@ -4,14 +4,27 @@ import (
 	"fmt"
 	"net/http"
 
+	"github.com/project-kessel/parsec/internal/datasource"
 	"github.com/project-kessel/parsec/internal/httpfixture"
+	"github.com/project-kessel/parsec/internal/keys"
 	"github.com/project-kessel/parsec/internal/server"
 	"github.com/project-kessel/parsec/internal/service"
 	"github.com/project-kessel/parsec/internal/trust"
 )
 
-// Provider constructs all application components from configuration
-// This is the main entry point for building a configured parsec instance
+// ProviderDeps groups the external dependencies injected into a Provider at
+// construction time. All observer fields must be non-nil; domain constructors
+// call observer methods unconditionally.
+type ProviderDeps struct {
+	Observer            service.ApplicationObserver
+	KeyRotationObserver keys.KeyRotationObserver
+	KeyProviderObserver keys.KeyProviderObserver
+	TrustObserver       trust.TrustValidationObserver
+	CacheObserver       datasource.DataSourceCacheObserver
+}
+
+// Provider constructs all application components from configuration.
+// Create one with NewProvider.
 type Provider struct {
 	config *Config
 
@@ -24,30 +37,37 @@ type Provider struct {
 	httpFixtureProvider httpfixture.FixtureProvider
 	httpFixtureBuilt    bool
 	observer            service.ApplicationObserver
+	keyRotationObserver keys.KeyRotationObserver
+	keyProviderObserver keys.KeyProviderObserver
+	trustObserver       trust.TrustValidationObserver
+	cacheObserver       datasource.DataSourceCacheObserver
}
 
-// NewProvider creates a new provider from configuration
-func NewProvider(config *Config) *Provider {
-	return &Provider{
-		config: config,
+// NewProvider creates a new provider from configuration and optional deps.
+// When deps are supplied, every observer field must be non-nil (see
+// ProviderDeps). Without deps, the provider builds the application observer
+// from config on demand.
+func NewProvider(config *Config, deps ...ProviderDeps) *Provider {
+	p := &Provider{config: config}
+	if len(deps) > 0 {
+		d := deps[0]
+		p.observer = d.Observer
+		p.keyRotationObserver = d.KeyRotationObserver
+		p.keyProviderObserver = d.KeyProviderObserver
+		p.trustObserver = d.TrustObserver
+		p.cacheObserver = d.CacheObserver
 	}
-}
-
-// SetObserver sets the application observer for all components built by this provider.
-// Must be called before TokenService() or any method that depends on the observer.
-func (p *Provider) SetObserver(observer service.ApplicationObserver) {
-	p.observer = observer
+	return p
 }
 
 // Observer returns the configured application observer.
-// If SetObserver was called, returns that observer.
-// Otherwise, creates a default observer from config.
+// If one was provided via ProviderDeps, it is returned directly.
+// Otherwise, a default observer is built from config.
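+//
+// A usage sketch, assuming cfg is a loaded *Config:
+//
+//	p := NewProvider(cfg) // no deps: falls back to the config-built observer
+//	observer, err := p.Observer()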
 func (p *Provider) Observer() (service.ApplicationObserver, error) {
 	if p.observer != nil {
 		return p.observer, nil
 	}
 
-	// Build from config (fallback when SetObserver was not called)
 	observer, err := NewObserver(p.config.Observability)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create observer: %w", err)
@@ -64,7 +84,7 @@ func (p *Provider) TrustStore() (trust.Store, error) {
 	}
 
 	transport := p.HTTPTransport()
-	store, err := NewTrustStore(p.config.TrustStore, transport)
+	store, err := NewTrustStore(p.config.TrustStore, transport, p.trustObserver)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create trust store: %w", err)
 	}
@@ -80,7 +100,7 @@ func (p *Provider) DataSourceRegistry() (*service.DataSourceRegistry, error) {
 	}
 
 	transport := p.HTTPTransport()
-	registry, err := NewDataSourceRegistry(p.config.DataSources, transport)
+	registry, err := NewDataSourceRegistry(p.config.DataSources, transport, p.cacheObserver)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create data source registry: %w", err)
 	}
@@ -95,7 +115,10 @@ func (p *Provider) IssuerRegistry() (service.Registry, error) {
 		return p.issuerRegistry, nil
 	}
 
-	registry, err := NewIssuerRegistry(*p.config)
+	registry, err := NewIssuerRegistry(*p.config, IssuerRegistryConfig{
+		KeyRotationObserver: p.keyRotationObserver,
+		KeyProviderObserver: p.keyProviderObserver,
+	})
 	if err != nil {
 		return nil, fmt.Errorf("failed to create issuer registry: %w", err)
 	}
diff --git a/internal/config/reload_observer.go b/internal/config/reload_observer.go
new file mode 100644
index 0000000..e03df36
--- /dev/null
+++ b/internal/config/reload_observer.go
@@ -0,0 +1,12 @@
+package config
+
+// ConfigReloadObserver receives configuration reload events from Loader.
+type ConfigReloadObserver interface {
+	ConfigReloadFailed(step string, err error)
+}
+
+// NoopConfigReloadObserver satisfies ConfigReloadObserver with empty methods.
+// Useful in tests that don't care about observer events.
+type NoopConfigReloadObserver struct{}
+
+func (NoopConfigReloadObserver) ConfigReloadFailed(string, error) {}
diff --git a/internal/config/validators.go b/internal/config/validators.go
index 9efefde..70e2617 100644
--- a/internal/config/validators.go
+++ b/internal/config/validators.go
@@ -10,12 +10,12 @@ import (
 )
 
 // NewTrustStore creates a trust store from configuration
-func NewTrustStore(cfg TrustStoreConfig, transport http.RoundTripper) (trust.Store, error) {
+func NewTrustStore(cfg TrustStoreConfig, transport http.RoundTripper, trustObs trust.TrustValidationObserver) (trust.Store, error) {
 	switch cfg.Type {
 	case "stub_store":
 		return newStubStore(cfg, transport)
 	case "filtered_store":
-		return newFilteredStore(cfg, transport)
+		return newFilteredStore(cfg, transport, trustObs)
 	default:
 		return nil, fmt.Errorf("unknown trust store type: %s (supported: stub_store, filtered_store)", cfg.Type)
 	}
@@ -38,7 +38,7 @@ func newStubStore(cfg TrustStoreConfig, transport http.RoundTripper) (trust.Stor
 }
 
 // newFilteredStore creates a filtered trust store with validator filtering
-func newFilteredStore(cfg TrustStoreConfig, transport http.RoundTripper) (trust.Store, error) {
+func newFilteredStore(cfg TrustStoreConfig, transport http.RoundTripper, trustObs trust.TrustValidationObserver) (trust.Store, error) {
 	var opts []trust.FilteredStoreOption
 
 	// Add validator filter if configured
@@ -50,6 +50,8 @@ func newFilteredStore(cfg TrustStoreConfig, transport http.RoundTripper) (trust.
 		opts = append(opts, trust.WithValidatorFilter(filter))
 	}
 
+	opts = append(opts, trust.WithTrustValidationObserver(trustObs))
+
 	store, err := trust.NewFilteredStore(opts...)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create filtered store: %w", err)
diff --git a/internal/datasource/in_memory_caching_datasource.go b/internal/datasource/in_memory_caching_datasource.go
index 2cd99df..d0b7f4e 100644
--- a/internal/datasource/in_memory_caching_datasource.go
+++ b/internal/datasource/in_memory_caching_datasource.go
@@ -18,6 +18,7 @@ type InMemoryCachingDataSource struct {
 	source    service.DataSource
 	cacheable service.Cacheable
 	clock     clock.Clock
+	observer  DataSourceCacheObserver
 	mu        sync.RWMutex
 	entries   map[string]*cacheEntry
 }
@@ -38,23 +39,28 @@ func WithClock(clk clock.Clock) InMemoryCachingDataSourceOption {
 	}
 }
 
-// NewInMemoryCachingDataSource wraps a data source with in-memory caching if it implements Cacheable
-// Returns the original source if it doesn't implement Cacheable
+// WithCacheObserver sets the observer for cache lifecycle events.
+// A nil observer is ignored so the NoopObserver default stays in place.
+func WithCacheObserver(obs DataSourceCacheObserver) InMemoryCachingDataSourceOption {
+	return func(ds *InMemoryCachingDataSource) {
+		if obs != nil {
+			ds.observer = obs
+		}
+	}
+}
+
+// NewInMemoryCachingDataSource wraps a data source with in-memory caching if it implements Cacheable.
+// Returns the original source if it doesn't implement Cacheable.
 func NewInMemoryCachingDataSource(source service.DataSource, opts ...InMemoryCachingDataSourceOption) service.DataSource {
 	cacheable, ok := source.(service.Cacheable)
 	if !ok {
-		// Source is not cacheable, return as-is
 		return source
 	}
 
 	ds := &InMemoryCachingDataSource{
 		source:    source,
 		cacheable: cacheable,
-		clock:     clock.NewSystemClock(), // Default to system clock
+		clock:     clock.NewSystemClock(),
+		observer:  NoopObserver{}, // default so Fetch never calls through a nil interface
 		entries:   make(map[string]*cacheEntry),
 	}
 
-	// Apply options
 	for _, opt := range opts {
 		opt(ds)
 	}
@@ -87,17 +93,21 @@ func (c *InMemoryCachingDataSource) Fetch(ctx context.Context, input *service.Da
 	if found {
 		// Check if entry has expired
 		if entry.expiresAt.IsZero() || c.clock.Now().Before(entry.expiresAt) {
+			c.observer.CacheHit(c.source.Name())
 			return entry.result, nil
 		}
-		// Entry expired, remove it
+		c.observer.CacheExpired(c.source.Name())
 		c.mu.Lock()
 		delete(c.entries, cacheKeyStr)
 		c.mu.Unlock()
 	}
 
+	c.observer.CacheMiss(c.source.Name())
+
 	// Cache miss - fetch from source using the original (full) input
 	result, err := c.source.Fetch(ctx, input)
 	if err != nil {
+		c.observer.FetchFailed(c.source.Name(), err)
 		return nil, err
 	}
diff --git a/internal/datasource/in_memory_caching_datasource_test.go b/internal/datasource/in_memory_caching_datasource_test.go
index b7da403..1032630 100644
--- a/internal/datasource/in_memory_caching_datasource_test.go
+++ b/internal/datasource/in_memory_caching_datasource_test.go
@@ -63,6 +63,14 @@ func (m *mockNonCacheableDataSource) Fetch(ctx context.Context, input *service.D
 	}, nil
 }
 
+// newTestCachingDataSource wraps a source with in-memory caching and a
+// NoopObserver. Callers can pass additional options (e.g. WithClock).
+func newTestCachingDataSource(t *testing.T, source service.DataSource, opts ...InMemoryCachingDataSourceOption) service.DataSource {
+	t.Helper()
+	allOpts := append([]InMemoryCachingDataSourceOption{WithCacheObserver(NoopObserver{})}, opts...)
+	return NewInMemoryCachingDataSource(source, allOpts...)
+}
+
 func TestInMemoryCachingDataSource(t *testing.T) {
 	ctx := context.Background()
@@ -72,7 +80,7 @@ func TestInMemoryCachingDataSource(t *testing.T) {
 		ttl:  1 * time.Hour,
 	}
 
-	cached := NewInMemoryCachingDataSource(source)
+	cached := newTestCachingDataSource(t, source)
 
 	input := &service.DataSourceInput{
 		Subject: &trust.Result{
@@ -114,7 +122,7 @@ func TestInMemoryCachingDataSource(t *testing.T) {
 		ttl:  50 * time.Millisecond,
 	}
 
-	cached := NewInMemoryCachingDataSource(source, WithClock(clk))
+	cached := newTestCachingDataSource(t, source, WithClock(clk))
 
 	input := &service.DataSourceInput{
 		Subject: &trust.Result{
@@ -150,7 +158,7 @@ func TestInMemoryCachingDataSource(t *testing.T) {
 		ttl:  1 * time.Hour,
 	}
 
-	cached := NewInMemoryCachingDataSource(source)
+	cached := newTestCachingDataSource(t, source)
 
 	input1 := &service.DataSourceInput{
 		Subject: &trust.Result{
@@ -204,7 +212,7 @@ func TestInMemoryCachingDataSource(t *testing.T) {
 		ttl:  50 * time.Millisecond,
 	}
 
-	cached := NewInMemoryCachingDataSource(source, WithClock(clk)).(*InMemoryCachingDataSource)
+	cached := newTestCachingDataSource(t, source, WithClock(clk)).(*InMemoryCachingDataSource)
 
 	input := &service.DataSourceInput{
 		Subject: &trust.Result{
diff --git a/internal/datasource/observer.go b/internal/datasource/observer.go
new file mode 100644
index 0000000..3051697
--- /dev/null
+++ b/internal/datasource/observer.go
@@ -0,0 +1,25 @@
+package datasource
+
+// DataSourceCacheObserver receives cache lifecycle events from InMemoryCachingDataSource.
+type DataSourceCacheObserver interface {
+	// CacheHit is called when a fetch is served from cache.
+	CacheHit(dataSourceName string)
+
+	// CacheMiss is called when a cache miss triggers a fetch from the underlying source.
+	CacheMiss(dataSourceName string)
+
+	// CacheExpired is called when a cache entry is found but has expired.
+	CacheExpired(dataSourceName string)
+
+	// FetchFailed is called when the underlying data source fetch fails on a cache miss.
+	FetchFailed(dataSourceName string, err error)
+}
+
+// NoopObserver satisfies DataSourceCacheObserver with empty methods.
+// Useful in tests that don't care about observer events.
+type NoopObserver struct{}
+
+func (NoopObserver) CacheHit(string)           {}
+func (NoopObserver) CacheMiss(string)          {}
+func (NoopObserver) CacheExpired(string)       {}
+func (NoopObserver) FetchFailed(string, error) {}
diff --git a/internal/keys/awskms.go b/internal/keys/awskms.go
index 1aa2ed1..4adad64 100644
--- a/internal/keys/awskms.go
+++ b/internal/keys/awskms.go
@@ -10,7 +10,7 @@ import (
 	"strings"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
-	"github.com/aws/aws-sdk-go-v2/config"
+	awsconfig "github.com/aws/aws-sdk-go-v2/config"
 	"github.com/aws/aws-sdk-go-v2/service/kms"
 	"github.com/aws/aws-sdk-go-v2/service/kms/types"
 )
@@ -21,6 +21,7 @@ type AWSKMSKeyProvider struct {
 	keyType     KeyType
 	algorithm   string
 	aliasPrefix string
+	observer    KeyProviderObserver
 }
 
 // AWSKMSConfig configures the AWS KMS key provider
@@ -30,9 +31,11 @@ type AWSKMSConfig struct {
 	Algorithm   string
 	Region      string
 	AliasPrefix string
 	Client      *kms.Client
+	// Observer must be non-nil; use NoopObserver{} in tests.
+	Observer KeyProviderObserver
 }
 
-// NewAWSKMSKeyProvider creates a new AWS KMS key provider
+// NewAWSKMSKeyProvider creates a new AWS KMS key provider.
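+// KeyType is required, and Observer must be non-nil (pass NoopObserver{}
+// when no logging observer is wired), since rotation failures are reported
+// through it without a nil check.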
func NewAWSKMSKeyProvider(ctx context.Context, cfg AWSKMSConfig) (*AWSKMSKeyProvider, error) { if cfg.KeyType == "" { return nil, fmt.Errorf("key_type is required") @@ -54,7 +57,7 @@ func NewAWSKMSKeyProvider(ctx context.Context, cfg AWSKMSConfig) (*AWSKMSKeyProv client = cfg.Client } else { // Load AWS config - awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(cfg.Region)) + awsCfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(cfg.Region)) if err != nil { return nil, fmt.Errorf("failed to load AWS config: %w", err) } @@ -74,6 +77,7 @@ func NewAWSKMSKeyProvider(ctx context.Context, cfg AWSKMSConfig) (*AWSKMSKeyProv keyType: cfg.KeyType, algorithm: algorithm, aliasPrefix: cfg.AliasPrefix, + observer: cfg.Observer, }, nil } @@ -138,7 +142,7 @@ func (m *AWSKMSKeyProvider) rotateKey(ctx context.Context, trustDomain, namespac PendingWindowInDays: aws.Int32(7), }) if err != nil { - fmt.Printf("Warning: failed to schedule old key %s for deletion: %v\n", oldKeyID, err) + m.observer.OldKeyDeletionFailed(oldKeyID, err) } } @@ -150,7 +154,7 @@ func (m *AWSKMSKeyProvider) getKeyIDFromAlias(ctx context.Context, aliasName str KeyId: aws.String(aliasName), }) if err != nil { - return "", nil // Assume not found if error + return "", err } return aws.ToString(resp.KeyMetadata.KeyId), nil } diff --git a/internal/keys/dual_slot_signer.go b/internal/keys/dual_slot_signer.go index c6f3f0c..b44e345 100644 --- a/internal/keys/dual_slot_signer.go +++ b/internal/keys/dual_slot_signer.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "log" "sync" "time" @@ -59,8 +58,9 @@ type DualSlotRotatingSigner struct { activeAlg Algorithm // JWT Algorithm publicKeys []service.PublicKey // All non-expired public keys - clock clock.Clock - ticker clock.Ticker + clock clock.Clock + ticker clock.Ticker + observer KeyRotationObserver } // DualSlotRotatingSignerConfig configures the DualSlotRotatingSigner @@ -78,9 +78,12 @@ type DualSlotRotatingSignerConfig struct { GracePeriod time.Duration CheckInterval time.Duration PrepareTimeout time.Duration // How long to wait before retrying a stuck "preparing" state (default: 1 minute) + + // Observer must be non-nil; use NoopObserver{} in tests. + Observer KeyRotationObserver } -// NewDualSlotRotatingSigner creates a new dual-slot rotating signer +// NewDualSlotRotatingSigner creates a new dual-slot rotating signer. 
func NewDualSlotRotatingSigner(cfg DualSlotRotatingSignerConfig) *DualSlotRotatingSigner { clk := cfg.Clock if clk == nil { @@ -124,6 +127,7 @@ func NewDualSlotRotatingSigner(cfg DualSlotRotatingSignerConfig) *DualSlotRotati checkInterval: checkInterval, prepareTimeout: prepareTimeout, clock: clk, + observer: cfg.Observer, } } @@ -166,11 +170,10 @@ func (r *DualSlotRotatingSigner) Stop() { // doRotationCheck is called periodically by the ticker to check for rotation needs func (r *DualSlotRotatingSigner) doRotationCheck(ctx context.Context) { if err := r.checkAndRotate(ctx); err != nil { - log.Printf("Error during key rotation check: %v", err) + r.observer.RotationCheckFailed(err) } - // Update active key cache after each check (whether rotation happened or not) if err := r.updateActiveKeyCache(ctx); err != nil { - log.Printf("Error updating active key cache: %v", err) + r.observer.ActiveKeyCacheUpdateFailed(err) } } @@ -357,14 +360,14 @@ func (r *DualSlotRotatingSigner) checkAndRotate(ctx context.Context) error { _, err = r.slotStore.SaveSlot(ctx, targetSlot, storeVersion) if errors.Is(err, ErrVersionMismatch) { - log.Printf("Another process completed rotation for slot %s, skipping", targetSlot.Position) + r.observer.RotationSkippedVersionRace(string(targetSlot.Position)) return nil } if err != nil { return fmt.Errorf("failed to save slot: %w", err) } - log.Printf("Completed rotation for slot %s", targetSlot.Position) + r.observer.RotationCompleted(string(targetSlot.Position)) return nil } @@ -502,26 +505,26 @@ func (r *DualSlotRotatingSigner) updateActiveKeyCache(ctx context.Context) error // Get the KeyProvider that created this key provider, ok := r.keyProviderRegistry[slot.KeyProviderID] if !ok { - log.Printf("Warning: key provider %s not found for slot %s, skipping", slot.KeyProviderID, slot.Position) + r.observer.KeyProviderNotFound(slot.KeyProviderID, string(slot.Position)) continue } keyName := r.keyName(slot.Position) handle, err := provider.GetKeyHandle(ctx, r.trustDomain, r.namespace, keyName) if err != nil { - log.Printf("Warning: failed to get handle %s from key provider: %v", slot.Position, err) + r.observer.KeyHandleFailed(string(slot.Position), err) continue } pubKey, err := handle.Public(ctx) if err != nil { - log.Printf("Warning: failed to get public key for %s: %v", slot.Position, err) + r.observer.PublicKeyFailed(string(slot.Position), err) continue } thumbprintStr, err := ComputeThumbprint(pubKey) if err != nil { - log.Printf("Warning: failed to compute thumbprint for key %s: %v", slot.Position, err) + r.observer.ThumbprintFailed(string(slot.Position), err) continue } thumbprint := KeyID(thumbprintStr) @@ -529,7 +532,7 @@ func (r *DualSlotRotatingSigner) updateActiveKeyCache(ctx context.Context) error _, algStr, err := handle.Metadata(ctx) if err != nil { - log.Printf("Warning: failed to get metadata for %s: %v", slot.Position, err) + r.observer.MetadataFailed(string(slot.Position), err) continue } alg := Algorithm(algStr) diff --git a/internal/keys/dual_slot_signer_test.go b/internal/keys/dual_slot_signer_test.go index c3bbf46..d388545 100644 --- a/internal/keys/dual_slot_signer_test.go +++ b/internal/keys/dual_slot_signer_test.go @@ -68,17 +68,17 @@ func newTestDualSlotRotatingSigner(t *testing.T, clk clock.Clock, slotStore KeyS // Create rotating signer with short timings for testing rs := NewDualSlotRotatingSigner(DualSlotRotatingSignerConfig{ - Namespace: testTokenType, // Test namespace + Namespace: testTokenType, KeyProviderID: "test-provider", 
KeyProviderRegistry: kpRegistry, SlotStore: slotStore, Clock: clk, - // Short timings for faster tests - KeyTTL: 30 * time.Minute, // Longer to avoid premature expiration - RotationThreshold: 8 * time.Minute, // Rotate when 8m remaining - GracePeriod: 2 * time.Minute, - CheckInterval: 10 * time.Second, - PrepareTimeout: 1 * time.Minute, + KeyTTL: 30 * time.Minute, + RotationThreshold: 8 * time.Minute, + GracePeriod: 2 * time.Minute, + CheckInterval: 10 * time.Second, + PrepareTimeout: 1 * time.Minute, + Observer: NoopObserver{}, }) return rs, keyProvider @@ -631,6 +631,7 @@ func TestDualSlotRotatingSigner_Namespacing(t *testing.T) { SlotStore: slotStore, Clock: clk, PrepareTimeout: 1 * time.Minute, + Observer: NoopObserver{}, }) ctx := context.Background() diff --git a/internal/keys/observer.go b/internal/keys/observer.go new file mode 100644 index 0000000..3e04e36 --- /dev/null +++ b/internal/keys/observer.go @@ -0,0 +1,34 @@ +package keys + +// KeyRotationObserver receives key rotation lifecycle events from DualSlotRotatingSigner. +type KeyRotationObserver interface { + RotationCheckFailed(err error) + ActiveKeyCacheUpdateFailed(err error) + RotationCompleted(slot string) + RotationSkippedVersionRace(slot string) + KeyProviderNotFound(provider, slot string) + KeyHandleFailed(slot string, err error) + PublicKeyFailed(slot string, err error) + ThumbprintFailed(slot string, err error) + MetadataFailed(slot string, err error) +} + +// KeyProviderObserver receives key provider lifecycle events from AWSKMSKeyProvider. +type KeyProviderObserver interface { + OldKeyDeletionFailed(keyID string, err error) +} + +// NoopObserver satisfies both KeyRotationObserver and KeyProviderObserver +// with empty methods. Useful in tests that don't care about observer events. +type NoopObserver struct{} + +func (NoopObserver) RotationCheckFailed(error) {} +func (NoopObserver) ActiveKeyCacheUpdateFailed(error) {} +func (NoopObserver) RotationCompleted(string) {} +func (NoopObserver) RotationSkippedVersionRace(string) {} +func (NoopObserver) KeyProviderNotFound(string, string) {} +func (NoopObserver) KeyHandleFailed(string, error) {} +func (NoopObserver) PublicKeyFailed(string, error) {} +func (NoopObserver) ThumbprintFailed(string, error) {} +func (NoopObserver) MetadataFailed(string, error) {} +func (NoopObserver) OldKeyDeletionFailed(string, error) {} diff --git a/internal/probe/logging.go b/internal/probe/logging.go index 9797037..28321eb 100644 --- a/internal/probe/logging.go +++ b/internal/probe/logging.go @@ -2,133 +2,149 @@ package probe import ( "context" - "log/slog" + + "github.com/rs/zerolog" "github.com/project-kessel/parsec/internal/request" "github.com/project-kessel/parsec/internal/service" "github.com/project-kessel/parsec/internal/trust" ) -// loggingObserver creates request-scoped logging probes +// loggingObserver creates request-scoped logging probes. +// Each event type has a pre-built sub-logger with its event name and +// per-event log level baked in at construction time. type loggingObserver struct { service.NoOpApplicationObserver - logger *slog.Logger + tokenIssuanceLogger zerolog.Logger + tokenExchangeLogger zerolog.Logger + authzCheckLogger zerolog.Logger } -// LoggingObserverConfig configures the logging observer +// LoggingObserverConfig configures the logging observer. +// Each field is a pre-configured zerolog.Logger for one event type, +// with the "event" field and per-event level already applied. type LoggingObserverConfig struct { - // Logger is the base logger to use. 
If nil, uses slog.Default() - Logger *slog.Logger + TokenIssuanceLogger zerolog.Logger + TokenExchangeLogger zerolog.Logger + AuthzCheckLogger zerolog.Logger } // NewLoggingObserver creates an application observer that logs all observability events -// using structured logging with slog. -func NewLoggingObserver(logger *slog.Logger) service.ApplicationObserver { - if logger == nil { - logger = slog.Default() - } +// using structured logging with zerolog. All events inherit the base logger's level. +func NewLoggingObserver(logger zerolog.Logger) service.ApplicationObserver { return NewLoggingObserverWithConfig(LoggingObserverConfig{ - Logger: logger, + TokenIssuanceLogger: logger.With().Str("event", "token_issuance").Logger(), + TokenExchangeLogger: logger.With().Str("event", "token_exchange").Logger(), + AuthzCheckLogger: logger.With().Str("event", "authz_check").Logger(), }) } -// NewLoggingObserverWithConfig creates a logging observer with custom configuration +// NewLoggingObserverWithConfig creates a logging observer with pre-configured per-event loggers. func NewLoggingObserverWithConfig(cfg LoggingObserverConfig) service.ApplicationObserver { - logger := cfg.Logger - if logger == nil { - logger = slog.Default() - } - return &loggingObserver{ - logger: logger, + tokenIssuanceLogger: cfg.TokenIssuanceLogger, + tokenExchangeLogger: cfg.TokenExchangeLogger, + authzCheckLogger: cfg.AuthzCheckLogger, } } -func (o *loggingObserver) TokenIssuanceStarted( - ctx context.Context, +func tokenIssuanceRequestLogger( + base zerolog.Logger, subject *trust.Result, actor *trust.Result, scope string, tokenTypes []service.TokenType, -) (context.Context, service.TokenIssuanceProbe) { - // Create scoped logger for this probe type - probeLogger := o.logger.With("event", "token_issuance") - - attrs := []slog.Attr{ - slog.String("scope", scope), - slog.Any("token_types", tokenTypes), - } +) zerolog.Logger { + loggerCtx := base.With(). + Str("scope", scope). + Interface("token_types", tokenTypes) if subject != nil { - attrs = append(attrs, - slog.String("subject_id", subject.Subject), - slog.String("subject_trust_domain", subject.TrustDomain), - ) + loggerCtx = loggerCtx. + Str("subject_id", subject.Subject). + Str("subject_trust_domain", subject.TrustDomain) } if actor != nil { - attrs = append(attrs, - slog.String("actor_id", actor.Subject), - slog.String("actor_trust_domain", actor.TrustDomain), - ) + loggerCtx = loggerCtx. + Str("actor_id", actor.Subject). + Str("actor_trust_domain", actor.TrustDomain) } - probeLogger.LogAttrs(ctx, slog.LevelDebug, "Starting token issuance", attrs...) + return loggerCtx.Logger() +} + +func tokenExchangeRequestLogger( + base zerolog.Logger, + grantType string, + requestedTokenType string, + audience string, + scope string, +) zerolog.Logger { + return base.With(). + Str("grant_type", grantType). + Str("requested_token_type", requestedTokenType). + Str("audience", audience). + Str("scope", scope). 
+ Logger() +} + +func (o *loggingObserver) TokenIssuanceStarted( + ctx context.Context, + subject *trust.Result, + actor *trust.Result, + scope string, + tokenTypes []service.TokenType, +) (context.Context, service.TokenIssuanceProbe) { + requestLogger := tokenIssuanceRequestLogger(o.tokenIssuanceLogger, subject, actor, scope, tokenTypes) + requestLogger.Debug().Msg("Starting token issuance") - // Return a request-scoped probe that captures the context return ctx, &loggingTokenIssuanceProbe{ - ctx: ctx, - logger: probeLogger, + logger: requestLogger, } } // loggingTokenIssuanceProbe is a request-scoped probe that logs events for a single token issuance type loggingTokenIssuanceProbe struct { service.NoOpTokenIssuanceProbe - ctx context.Context - logger *slog.Logger + logger zerolog.Logger } func (p *loggingTokenIssuanceProbe) TokenTypeIssuanceStarted(tokenType service.TokenType) { - p.logger.LogAttrs(p.ctx, slog.LevelDebug, - "Issuing token", - slog.String("token_type", string(tokenType)), - ) + p.logger.Debug(). + Str("token_type", string(tokenType)). + Msg("Issuing token") } func (p *loggingTokenIssuanceProbe) TokenTypeIssuanceSucceeded(tokenType service.TokenType, token *service.Token) { - attrs := []slog.Attr{ - slog.String("token_type", string(tokenType)), - } + event := p.logger.Debug(). + Str("token_type", string(tokenType)) if token != nil { - attrs = append(attrs, - slog.Time("issued_at", token.IssuedAt), - slog.Time("expires_at", token.ExpiresAt), - ) + event = event. + Time("issued_at", token.IssuedAt). + Time("expires_at", token.ExpiresAt) } - p.logger.LogAttrs(p.ctx, slog.LevelDebug, "Token issued successfully", attrs...) + event.Msg("Token issued successfully") } func (p *loggingTokenIssuanceProbe) TokenTypeIssuanceFailed(tokenType service.TokenType, err error) { - p.logger.LogAttrs(p.ctx, slog.LevelError, - "Token issuance failed", - slog.String("token_type", string(tokenType)), - slog.String("error", err.Error()), - ) + p.logger.Error(). + Str("token_type", string(tokenType)). + Err(err). + Msg("Token issuance failed") } func (p *loggingTokenIssuanceProbe) IssuerNotFound(tokenType service.TokenType, err error) { - p.logger.LogAttrs(p.ctx, slog.LevelError, - "No issuer found for token type", - slog.String("token_type", string(tokenType)), - slog.String("error", err.Error()), - ) + p.logger.Error(). + Str("token_type", string(tokenType)). + Err(err). 
+ Msg("No issuer found for token type") } func (p *loggingTokenIssuanceProbe) End() { - p.logger.LogAttrs(p.ctx, slog.LevelDebug, "Token issuance completed") + p.logger.Debug().Msg("Token issuance completed") } // TokenExchangeStarted implements service.TokenExchangeObserver @@ -139,164 +155,140 @@ func (o *loggingObserver) TokenExchangeStarted( audience string, scope string, ) (context.Context, service.TokenExchangeProbe) { - // Create scoped logger for this probe type - probeLogger := o.logger.With("event", "token_exchange") - - probeLogger.LogAttrs(ctx, slog.LevelDebug, - "Starting token exchange", - slog.String("grant_type", grantType), - slog.String("requested_token_type", requestedTokenType), - slog.String("audience", audience), - slog.String("scope", scope), - ) + requestLogger := tokenExchangeRequestLogger(o.tokenExchangeLogger, grantType, requestedTokenType, audience, scope) + requestLogger.Debug().Msg("Starting token exchange") return ctx, &loggingTokenExchangeProbe{ - ctx: ctx, - logger: probeLogger, + logger: requestLogger, } } // loggingTokenExchangeProbe is a request-scoped probe that logs token exchange events type loggingTokenExchangeProbe struct { service.NoOpTokenExchangeProbe - ctx context.Context - logger *slog.Logger + logger zerolog.Logger } func (p *loggingTokenExchangeProbe) ActorValidationSucceeded(actor *trust.Result) { - attrs := []slog.Attr{} + event := p.logger.Debug() if actor != nil { - attrs = append(attrs, - slog.String("actor_id", actor.Subject), - slog.String("actor_trust_domain", actor.TrustDomain), - ) + event = event. + Str("actor_id", actor.Subject). + Str("actor_trust_domain", actor.TrustDomain) } - p.logger.LogAttrs(p.ctx, slog.LevelDebug, "Actor validation succeeded", attrs...) + event.Msg("Actor validation succeeded") } func (p *loggingTokenExchangeProbe) ActorValidationFailed(err error) { - p.logger.LogAttrs(p.ctx, slog.LevelError, - "Actor validation failed", - slog.String("error", err.Error()), - ) + p.logger.Error(). + Err(err). + Msg("Actor validation failed") } func (p *loggingTokenExchangeProbe) RequestContextParsed(attrs *request.RequestAttributes) { - p.logger.LogAttrs(p.ctx, slog.LevelDebug, "Request context parsed") + p.logger.Debug().Msg("Request context parsed") } func (p *loggingTokenExchangeProbe) RequestContextParseFailed(err error) { - p.logger.LogAttrs(p.ctx, slog.LevelError, - "Request context parse failed", - slog.String("error", err.Error()), - ) + p.logger.Error(). + Err(err). + Msg("Request context parse failed") } func (p *loggingTokenExchangeProbe) SubjectTokenValidationSucceeded(subject *trust.Result) { - attrs := []slog.Attr{} + event := p.logger.Debug() if subject != nil { - attrs = append(attrs, - slog.String("subject_id", subject.Subject), - slog.String("subject_trust_domain", subject.TrustDomain), - ) + event = event. + Str("subject_id", subject.Subject). + Str("subject_trust_domain", subject.TrustDomain) } - p.logger.LogAttrs(p.ctx, slog.LevelDebug, "Subject token validation succeeded", attrs...) + event.Msg("Subject token validation succeeded") } func (p *loggingTokenExchangeProbe) SubjectTokenValidationFailed(err error) { - p.logger.LogAttrs(p.ctx, slog.LevelError, - "Subject token validation failed", - slog.String("error", err.Error()), - ) + p.logger.Error(). + Err(err). 
+ Msg("Subject token validation failed") } func (p *loggingTokenExchangeProbe) End() { - p.logger.LogAttrs(p.ctx, slog.LevelDebug, "Token exchange completed") + p.logger.Debug().Msg("Token exchange completed") } // AuthzCheckStarted implements service.AuthzCheckObserver func (o *loggingObserver) AuthzCheckStarted( ctx context.Context, ) (context.Context, service.AuthzCheckProbe) { - // Create scoped logger for this probe type - probeLogger := o.logger.With("event", "authz_check") - - probeLogger.LogAttrs(ctx, slog.LevelDebug, "Starting authorization check") + // AuthzCheckStarted receives no request-scoped parameters (unlike + // token issuance/exchange), so we use the base event logger directly. + requestLogger := o.authzCheckLogger + requestLogger.Debug().Msg("Starting authorization check") return ctx, &loggingAuthzCheckProbe{ - ctx: ctx, - logger: probeLogger, + logger: requestLogger, } } // loggingAuthzCheckProbe is a request-scoped probe that logs authorization check events type loggingAuthzCheckProbe struct { service.NoOpAuthzCheckProbe - ctx context.Context - logger *slog.Logger + logger zerolog.Logger } func (p *loggingAuthzCheckProbe) RequestAttributesParsed(attrs *request.RequestAttributes) { - logAttrs := []slog.Attr{} + event := p.logger.Debug() if attrs != nil { - logAttrs = append(logAttrs, - slog.String("method", attrs.Method), - slog.String("path", attrs.Path), - ) + event = event. + Str("method", attrs.Method). + Str("path", attrs.Path) } - p.logger.LogAttrs(p.ctx, slog.LevelDebug, "Request attributes parsed", logAttrs...) + event.Msg("Request attributes parsed") } func (p *loggingAuthzCheckProbe) ActorValidationSucceeded(actor *trust.Result) { - attrs := []slog.Attr{} + event := p.logger.Debug() if actor != nil { - attrs = append(attrs, - slog.String("actor_id", actor.Subject), - slog.String("actor_trust_domain", actor.TrustDomain), - ) + event = event. + Str("actor_id", actor.Subject). + Str("actor_trust_domain", actor.TrustDomain) } - p.logger.LogAttrs(p.ctx, slog.LevelDebug, "Actor validation succeeded", attrs...) + event.Msg("Actor validation succeeded") } func (p *loggingAuthzCheckProbe) ActorValidationFailed(err error) { - p.logger.LogAttrs(p.ctx, slog.LevelError, - "Actor validation failed", - slog.String("error", err.Error()), - ) + p.logger.Error(). + Err(err). + Msg("Actor validation failed") } func (p *loggingAuthzCheckProbe) SubjectCredentialExtracted(cred trust.Credential, headersUsed []string) { - p.logger.LogAttrs(p.ctx, slog.LevelDebug, - "Subject credential extracted", - slog.String("credential_type", string(cred.Type())), - ) + p.logger.Debug(). + Str("credential_type", string(cred.Type())). + Msg("Subject credential extracted") } func (p *loggingAuthzCheckProbe) SubjectCredentialExtractionFailed(err error) { - p.logger.LogAttrs(p.ctx, slog.LevelError, - "Subject credential extraction failed", - slog.String("error", err.Error()), - ) + p.logger.Error(). + Err(err). + Msg("Subject credential extraction failed") } func (p *loggingAuthzCheckProbe) SubjectValidationSucceeded(subject *trust.Result) { - attrs := []slog.Attr{} + event := p.logger.Debug() if subject != nil { - attrs = append(attrs, - slog.String("subject_id", subject.Subject), - slog.String("subject_trust_domain", subject.TrustDomain), - ) + event = event. + Str("subject_id", subject.Subject). + Str("subject_trust_domain", subject.TrustDomain) } - p.logger.LogAttrs(p.ctx, slog.LevelDebug, "Subject validation succeeded", attrs...) 
+ event.Msg("Subject validation succeeded") } func (p *loggingAuthzCheckProbe) SubjectValidationFailed(err error) { - p.logger.LogAttrs(p.ctx, slog.LevelError, - "Subject validation failed", - slog.String("error", err.Error()), - ) + p.logger.Error(). + Err(err). + Msg("Subject validation failed") } func (p *loggingAuthzCheckProbe) End() { - p.logger.LogAttrs(p.ctx, slog.LevelDebug, "Authorization check completed") + p.logger.Debug().Msg("Authorization check completed") } diff --git a/internal/probe/logging_config.go b/internal/probe/logging_config.go new file mode 100644 index 0000000..39f4916 --- /dev/null +++ b/internal/probe/logging_config.go @@ -0,0 +1,20 @@ +package probe + +import ( + "github.com/rs/zerolog" +) + +// LoggingConfigReloadObserver logs configuration reload events via zerolog. +// It satisfies config.ConfigReloadObserver via structural typing to avoid +// a circular import (config already imports probe). +type LoggingConfigReloadObserver struct { + logger zerolog.Logger +} + +func NewLoggingConfigReloadObserver(logger zerolog.Logger) *LoggingConfigReloadObserver { + return &LoggingConfigReloadObserver{logger: logger} +} + +func (o *LoggingConfigReloadObserver) ConfigReloadFailed(step string, err error) { + o.logger.Error().Err(err).Str("step", step).Msg("config reload failed") +} diff --git a/internal/probe/logging_datasource.go b/internal/probe/logging_datasource.go new file mode 100644 index 0000000..9fcf744 --- /dev/null +++ b/internal/probe/logging_datasource.go @@ -0,0 +1,35 @@ +package probe + +import ( + "github.com/rs/zerolog" + + "github.com/project-kessel/parsec/internal/datasource" +) + +// Compile-time interface check. +var _ datasource.DataSourceCacheObserver = (*LoggingDataSourceCacheObserver)(nil) + +// LoggingDataSourceCacheObserver logs data source cache events via zerolog. +type LoggingDataSourceCacheObserver struct { + logger zerolog.Logger +} + +func NewLoggingDataSourceCacheObserver(logger zerolog.Logger) *LoggingDataSourceCacheObserver { + return &LoggingDataSourceCacheObserver{logger: logger} +} + +func (o *LoggingDataSourceCacheObserver) CacheHit(dataSourceName string) { + o.logger.Debug().Str("datasource", dataSourceName).Msg("cache hit") +} + +func (o *LoggingDataSourceCacheObserver) CacheMiss(dataSourceName string) { + o.logger.Debug().Str("datasource", dataSourceName).Msg("cache miss") +} + +func (o *LoggingDataSourceCacheObserver) CacheExpired(dataSourceName string) { + o.logger.Debug().Str("datasource", dataSourceName).Msg("cache entry expired") +} + +func (o *LoggingDataSourceCacheObserver) FetchFailed(dataSourceName string, err error) { + o.logger.Warn().Err(err).Str("datasource", dataSourceName).Msg("data source fetch failed") +} diff --git a/internal/probe/logging_infra_test.go b/internal/probe/logging_infra_test.go new file mode 100644 index 0000000..a98e0e0 --- /dev/null +++ b/internal/probe/logging_infra_test.go @@ -0,0 +1,245 @@ +package probe + +import ( + "bytes" + "errors" + "fmt" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + + "github.com/project-kessel/parsec/internal/trust" +) + +func testLogger(buf *bytes.Buffer) zerolog.Logger { + return zerolog.New(buf).Level(zerolog.DebugLevel) +} + +// assertLog verifies the log output contains the expected level, message, +// and every additional field string (e.g. `"key":"value"`). 
+func assertLog(t *testing.T, out, level, msg string, fields ...string) { + t.Helper() + assert.Contains(t, out, fmt.Sprintf(`"level":"%s"`, level)) + assert.Contains(t, out, msg) + for _, f := range fields { + assert.Contains(t, out, f) + } +} + +// --- ConfigReload --- + +func TestLoggingConfigReloadObserver_ConfigReloadFailed(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingConfigReloadObserver(testLogger(&buf)) + + obs.ConfigReloadFailed("unmarshal", errors.New("bad yaml")) + + assertLog(t, buf.String(), "error", "config reload failed", + `"step":"unmarshal"`, `"error":"bad yaml"`) +} + +// --- DataSourceCache --- + +func TestLoggingDataSourceCacheObserver_DebugEvents(t *testing.T) { + tests := []struct { + name string + call func(*LoggingDataSourceCacheObserver) + msg string + }{ + {"CacheHit", func(o *LoggingDataSourceCacheObserver) { o.CacheHit("ds") }, "cache hit"}, + {"CacheMiss", func(o *LoggingDataSourceCacheObserver) { o.CacheMiss("ds") }, "cache miss"}, + {"CacheExpired", func(o *LoggingDataSourceCacheObserver) { o.CacheExpired("ds") }, "cache entry expired"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingDataSourceCacheObserver(testLogger(&buf)) + tt.call(obs) + assertLog(t, buf.String(), "debug", tt.msg, `"datasource":"ds"`) + }) + } +} + +func TestLoggingDataSourceCacheObserver_FetchFailed(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingDataSourceCacheObserver(testLogger(&buf)) + + obs.FetchFailed("my_ds", errors.New("timeout")) + + assertLog(t, buf.String(), "warn", "data source fetch failed", + `"datasource":"my_ds"`, `"error":"timeout"`) +} + +// --- KeyRotation --- + +func TestLoggingKeyRotationObserver_RotationCheckFailed(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingKeyRotationObserver(testLogger(&buf)) + + obs.RotationCheckFailed(errors.New("slot locked")) + + assertLog(t, buf.String(), "error", "key rotation check failed", + `"error":"slot locked"`) +} + +func TestLoggingKeyRotationObserver_InfoEvents(t *testing.T) { + tests := []struct { + name string + call func(*LoggingKeyRotationObserver) + msg string + slot string + }{ + {"RotationCompleted", func(o *LoggingKeyRotationObserver) { o.RotationCompleted("primary") }, + "key rotation completed", "primary"}, + {"RotationSkippedVersionRace", func(o *LoggingKeyRotationObserver) { o.RotationSkippedVersionRace("secondary") }, + "another process completed rotation, skipping", "secondary"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingKeyRotationObserver(testLogger(&buf)) + tt.call(obs) + assertLog(t, buf.String(), "info", tt.msg, + fmt.Sprintf(`"slot":"%s"`, tt.slot)) + }) + } +} + +func TestLoggingKeyRotationObserver_KeyProviderNotFound(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingKeyRotationObserver(testLogger(&buf)) + + obs.KeyProviderNotFound("aws_kms", "primary") + + assertLog(t, buf.String(), "warn", "key provider not found, skipping", + `"provider":"aws_kms"`, `"slot":"primary"`) +} + +func TestLoggingKeyRotationObserver_WarningMethods(t *testing.T) { + tests := []struct { + name string + call func(*LoggingKeyRotationObserver) + msg string + }{ + {"KeyHandleFailed", func(o *LoggingKeyRotationObserver) { o.KeyHandleFailed("s1", errors.New("e")) }, "failed to get key handle"}, + {"PublicKeyFailed", func(o *LoggingKeyRotationObserver) { o.PublicKeyFailed("s1", errors.New("e")) }, "failed to get public key"}, + {"ThumbprintFailed", func(o 
*LoggingKeyRotationObserver) { o.ThumbprintFailed("s1", errors.New("e")) }, "failed to compute thumbprint"}, + {"MetadataFailed", func(o *LoggingKeyRotationObserver) { o.MetadataFailed("s1", errors.New("e")) }, "failed to get key metadata"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingKeyRotationObserver(testLogger(&buf)) + tt.call(obs) + assertLog(t, buf.String(), "warn", tt.msg, `"slot":"s1"`) + }) + } +} + +// --- KeyProvider --- + +func TestLoggingKeyProviderObserver_OldKeyDeletionFailed(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingKeyProviderObserver(testLogger(&buf)) + + obs.OldKeyDeletionFailed("key-123", errors.New("access denied")) + + assertLog(t, buf.String(), "warn", "failed to schedule old key for deletion", + `"key_id":"key-123"`, `"error":"access denied"`) +} + +// --- TrustValidation --- + +func TestLoggingTrustValidationObserver_ValidatorFailed(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingTrustValidationObserver(testLogger(&buf)) + + obs.ValidatorFailed("oidc_v1", trust.CredentialTypeJWT, errors.New("expired")) + + assertLog(t, buf.String(), "debug", "validator rejected credential", + `"validator":"oidc_v1"`, `"credential_type":"jwt"`) +} + +func TestLoggingTrustValidationObserver_AllValidatorsFailed(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingTrustValidationObserver(testLogger(&buf)) + + obs.AllValidatorsFailed(trust.CredentialTypeBearer, 3, errors.New("no match")) + + assertLog(t, buf.String(), "warn", "all validators failed for credential type", + `"credential_type":"bearer"`, `"attempted":3`) +} + +func TestLoggingTrustValidationObserver_ValidatorFiltered(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingTrustValidationObserver(testLogger(&buf)) + + obs.ValidatorFiltered("v1", "actor-xyz") + + assertLog(t, buf.String(), "debug", "validator filtered out for actor", + `"validator":"v1"`, `"actor":"actor-xyz"`) +} + +func TestLoggingTrustValidationObserver_FilterEvaluationFailed(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingTrustValidationObserver(testLogger(&buf)) + + obs.FilterEvaluationFailed("v2", errors.New("cel error")) + + assertLog(t, buf.String(), "error", "filter evaluation failed", + `"validator":"v2"`, `"error":"cel error"`) +} + +// --- JWKS --- + +func TestLoggingJWKSObserver(t *testing.T) { + tests := []struct { + name string + call func(*LoggingJWKSObserver) + msg string + fields []string + }{ + {"InitialCachePopulationFailed", + func(o *LoggingJWKSObserver) { o.InitialCachePopulationFailed(errors.New("no issuers")) }, + "initial cache population failed, will retry", + []string{`"error":"no issuers"`}}, + {"CacheRefreshFailed", + func(o *LoggingJWKSObserver) { o.CacheRefreshFailed(errors.New("network")) }, + "background cache refresh failed", + []string{`"error":"network"`}}, + {"KeyConversionFailed", + func(o *LoggingJWKSObserver) { o.KeyConversionFailed("kid-1", errors.New("unsupported alg")) }, + "skipping key: conversion failed", + []string{`"key_id":"kid-1"`, `"error":"unsupported alg"`}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingJWKSObserver(testLogger(&buf)) + tt.call(obs) + assertLog(t, buf.String(), "warn", tt.msg, tt.fields...) 
+ }) + } +} + +// --- ServerLifecycle --- + +func TestLoggingServerLifecycleObserver(t *testing.T) { + tests := []struct { + name string + call func(*LoggingServerLifecycleObserver) + msg string + }{ + {"GRPCServeFailed", func(o *LoggingServerLifecycleObserver) { o.GRPCServeFailed(errors.New("bind error")) }, "gRPC server error"}, + {"HTTPServeFailed", func(o *LoggingServerLifecycleObserver) { o.HTTPServeFailed(errors.New("port in use")) }, "HTTP server error"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + obs := NewLoggingServerLifecycleObserver(testLogger(&buf)) + tt.call(obs) + assertLog(t, buf.String(), "error", tt.msg) + }) + } +} diff --git a/internal/probe/logging_jwks.go b/internal/probe/logging_jwks.go new file mode 100644 index 0000000..a94ab3e --- /dev/null +++ b/internal/probe/logging_jwks.go @@ -0,0 +1,30 @@ +package probe + +import ( + "github.com/rs/zerolog" + + "github.com/project-kessel/parsec/internal/server" +) + +var _ server.JWKSObserver = (*LoggingJWKSObserver)(nil) + +// LoggingJWKSObserver logs JWKS cache lifecycle events via zerolog. +type LoggingJWKSObserver struct { + logger zerolog.Logger +} + +func NewLoggingJWKSObserver(logger zerolog.Logger) *LoggingJWKSObserver { + return &LoggingJWKSObserver{logger: logger} +} + +func (o *LoggingJWKSObserver) InitialCachePopulationFailed(err error) { + o.logger.Warn().Err(err).Msg("initial cache population failed, will retry") +} + +func (o *LoggingJWKSObserver) CacheRefreshFailed(err error) { + o.logger.Warn().Err(err).Msg("background cache refresh failed") +} + +func (o *LoggingJWKSObserver) KeyConversionFailed(keyID string, err error) { + o.logger.Warn().Err(err).Str("key_id", keyID).Msg("skipping key: conversion failed") +} diff --git a/internal/probe/logging_keys.go b/internal/probe/logging_keys.go new file mode 100644 index 0000000..b4523c2 --- /dev/null +++ b/internal/probe/logging_keys.go @@ -0,0 +1,71 @@ +package probe + +import ( + "github.com/rs/zerolog" + + "github.com/project-kessel/parsec/internal/keys" +) + +// Compile-time interface checks. +var ( + _ keys.KeyRotationObserver = (*LoggingKeyRotationObserver)(nil) + _ keys.KeyProviderObserver = (*LoggingKeyProviderObserver)(nil) +) + +// LoggingKeyRotationObserver logs key rotation lifecycle events via zerolog. 
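+//
+// A wiring sketch (illustrative, not part of this change; baseLogger and the
+// "component" field name are assumptions):
+//
+//	obs := NewLoggingKeyRotationObserver(
+//		baseLogger.With().Str("component", "key_rotation").Logger(),
+//	)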
+type LoggingKeyRotationObserver struct { + logger zerolog.Logger +} + +func NewLoggingKeyRotationObserver(logger zerolog.Logger) *LoggingKeyRotationObserver { + return &LoggingKeyRotationObserver{logger: logger} +} + +func (o *LoggingKeyRotationObserver) RotationCheckFailed(err error) { + o.logger.Error().Err(err).Msg("key rotation check failed") +} + +func (o *LoggingKeyRotationObserver) ActiveKeyCacheUpdateFailed(err error) { + o.logger.Error().Err(err).Msg("active key cache update failed") +} + +func (o *LoggingKeyRotationObserver) RotationCompleted(slot string) { + o.logger.Info().Str("slot", slot).Msg("key rotation completed") +} + +func (o *LoggingKeyRotationObserver) RotationSkippedVersionRace(slot string) { + o.logger.Info().Str("slot", slot).Msg("another process completed rotation, skipping") +} + +func (o *LoggingKeyRotationObserver) KeyProviderNotFound(provider, slot string) { + o.logger.Warn().Str("provider", provider).Str("slot", slot).Msg("key provider not found, skipping") +} + +func (o *LoggingKeyRotationObserver) KeyHandleFailed(slot string, err error) { + o.logger.Warn().Err(err).Str("slot", slot).Msg("failed to get key handle") +} + +func (o *LoggingKeyRotationObserver) PublicKeyFailed(slot string, err error) { + o.logger.Warn().Err(err).Str("slot", slot).Msg("failed to get public key") +} + +func (o *LoggingKeyRotationObserver) ThumbprintFailed(slot string, err error) { + o.logger.Warn().Err(err).Str("slot", slot).Msg("failed to compute thumbprint") +} + +func (o *LoggingKeyRotationObserver) MetadataFailed(slot string, err error) { + o.logger.Warn().Err(err).Str("slot", slot).Msg("failed to get key metadata") +} + +// LoggingKeyProviderObserver logs key provider lifecycle events via zerolog. +type LoggingKeyProviderObserver struct { + logger zerolog.Logger +} + +func NewLoggingKeyProviderObserver(logger zerolog.Logger) *LoggingKeyProviderObserver { + return &LoggingKeyProviderObserver{logger: logger} +} + +func (o *LoggingKeyProviderObserver) OldKeyDeletionFailed(keyID string, err error) { + o.logger.Warn().Err(err).Str("key_id", keyID).Msg("failed to schedule old key for deletion") +} diff --git a/internal/probe/logging_server.go b/internal/probe/logging_server.go new file mode 100644 index 0000000..c2dc1e6 --- /dev/null +++ b/internal/probe/logging_server.go @@ -0,0 +1,26 @@ +package probe + +import ( + "github.com/rs/zerolog" + + "github.com/project-kessel/parsec/internal/server" +) + +var _ server.ServerLifecycleObserver = (*LoggingServerLifecycleObserver)(nil) + +// LoggingServerLifecycleObserver logs server lifecycle events via zerolog. +type LoggingServerLifecycleObserver struct { + logger zerolog.Logger +} + +func NewLoggingServerLifecycleObserver(logger zerolog.Logger) *LoggingServerLifecycleObserver { + return &LoggingServerLifecycleObserver{logger: logger} +} + +func (o *LoggingServerLifecycleObserver) GRPCServeFailed(err error) { + o.logger.Error().Err(err).Msg("gRPC server error") +} + +func (o *LoggingServerLifecycleObserver) HTTPServeFailed(err error) { + o.logger.Error().Err(err).Msg("HTTP server error") +} diff --git a/internal/probe/logging_trust.go b/internal/probe/logging_trust.go new file mode 100644 index 0000000..4c4bb94 --- /dev/null +++ b/internal/probe/logging_trust.go @@ -0,0 +1,49 @@ +package probe + +import ( + "github.com/rs/zerolog" + + "github.com/project-kessel/parsec/internal/trust" +) + +// Compile-time interface check. 
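+// The blank-identifier assignment below fails to compile if
+// *LoggingTrustValidationObserver stops satisfying the interface.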
+var _ trust.TrustValidationObserver = (*LoggingTrustValidationObserver)(nil) + +// LoggingTrustValidationObserver logs trust validation events via zerolog. +type LoggingTrustValidationObserver struct { + logger zerolog.Logger +} + +func NewLoggingTrustValidationObserver(logger zerolog.Logger) *LoggingTrustValidationObserver { + return &LoggingTrustValidationObserver{logger: logger} +} + +func (o *LoggingTrustValidationObserver) ValidatorFailed(validatorName string, credType trust.CredentialType, err error) { + o.logger.Debug(). + Err(err). + Str("validator", validatorName). + Str("credential_type", string(credType)). + Msg("validator rejected credential") +} + +func (o *LoggingTrustValidationObserver) AllValidatorsFailed(credType trust.CredentialType, attempted int, lastErr error) { + o.logger.Warn(). + Err(lastErr). + Str("credential_type", string(credType)). + Int("attempted", attempted). + Msg("all validators failed for credential type") +} + +func (o *LoggingTrustValidationObserver) ValidatorFiltered(validatorName string, actorSubject string) { + o.logger.Debug(). + Str("validator", validatorName). + Str("actor", actorSubject). + Msg("validator filtered out for actor") +} + +func (o *LoggingTrustValidationObserver) FilterEvaluationFailed(validatorName string, err error) { + o.logger.Error(). + Err(err). + Str("validator", validatorName). + Msg("filter evaluation failed") +} diff --git a/internal/server/authz_test.go b/internal/server/authz_test.go index d582f19..c751be2 100644 --- a/internal/server/authz_test.go +++ b/internal/server/authz_test.go @@ -382,6 +382,7 @@ func TestAuthzServer_WithActorFiltering(t *testing.T) { // Setup filtered trust store with CEL-based filtering filteredStore, err := trust.NewFilteredStore( trust.WithCELFilter(`actor.trust_domain == "gateway.example.com" && validator_name in ["external-validator"]`), + trust.WithTrustValidationObserver(trust.NoopObserver{}), ) if err != nil { t.Fatalf("failed to create filtered store: %v", err) @@ -479,6 +480,7 @@ func TestAuthzServer_WithActorFiltering(t *testing.T) { // Create a new store with the gateway validator storeWithGateway, err := trust.NewFilteredStore( trust.WithCELFilter(`actor.trust_domain == "gateway.example.com" && validator_name in ["external-validator"]`), + trust.WithTrustValidationObserver(trust.NoopObserver{}), ) if err != nil { t.Fatalf("failed to create store: %v", err) @@ -601,6 +603,7 @@ func TestAuthzServer_WithActorFilteringByRequestPath(t *testing.T) { (validator_name == "admin-validator" && request.path.startsWith("/admin")) || (validator_name == "user-validator" && request.path.startsWith("/api")) `), + trust.WithTrustValidationObserver(trust.NoopObserver{}), ) if err != nil { t.Fatalf("failed to create filtered store: %v", err) diff --git a/internal/server/exchange_test.go b/internal/server/exchange_test.go index 05e3cac..469c878 100644 --- a/internal/server/exchange_test.go +++ b/internal/server/exchange_test.go @@ -25,6 +25,7 @@ func TestExchangeServer_WithActorFiltering(t *testing.T) { // Setup filtered trust store with CEL-based filtering filteredStore, err := trust.NewFilteredStore( trust.WithCELFilter(`actor.trust_domain == "client.example.com" && validator_name in ["external-validator"]`), + trust.WithTrustValidationObserver(trust.NoopObserver{}), ) if err != nil { t.Fatalf("failed to create filtered store: %v", err) @@ -107,6 +108,7 @@ func TestExchangeServer_WithActorFiltering(t *testing.T) { // Create a new store with the client validator storeWithClient, err := 
trust.NewFilteredStore( trust.WithCELFilter(`actor.trust_domain == "client.example.com" && validator_name in ["external-validator"]`), + trust.WithTrustValidationObserver(trust.NoopObserver{}), ) if err != nil { t.Fatalf("failed to create store: %v", err) @@ -187,6 +189,7 @@ func TestExchangeServer_WithActorFiltering(t *testing.T) { (has(actor.claims.role) && actor.claims.role == "admin" && validator_name == "admin-validator") || (has(actor.claims.role) && actor.claims.role == "user" && validator_name == "user-validator") `), + trust.WithTrustValidationObserver(trust.NoopObserver{}), ) if err != nil { t.Fatalf("failed to create store: %v", err) @@ -262,6 +265,7 @@ func TestExchangeServer_WithActorFilteringByAudience(t *testing.T) { (validator_name == "prod-validator" && has(request.additional.requested_audience) && request.additional.requested_audience == "prod.example.com") || (validator_name == "dev-validator" && has(request.additional.requested_audience) && request.additional.requested_audience == "dev.example.com") `), + trust.WithTrustValidationObserver(trust.NoopObserver{}), ) if err != nil { t.Fatalf("failed to create filtered store: %v", err) diff --git a/internal/server/jwks.go b/internal/server/jwks.go index ef7ebdb..0d67695 100644 --- a/internal/server/jwks.go +++ b/internal/server/jwks.go @@ -7,7 +7,6 @@ import ( "crypto/rsa" "encoding/base64" "fmt" - "log/slog" "math/big" "sync" "time" @@ -26,7 +25,7 @@ type JWKSServer struct { issuerRegistry service.Registry clock clock.Clock refreshInterval time.Duration - logger *slog.Logger + observer JWKSObserver // Cached response mu sync.RWMutex @@ -49,11 +48,11 @@ type JWKSServerConfig struct { // Clock is used for time operations (defaults to system clock) Clock clock.Clock - // Logger is the structured logger to use (required) - Logger *slog.Logger + // Observer must be non-nil; use NoopObserver{} in tests. + Observer JWKSObserver } -// NewJWKSServer creates a new JWKS server with caching +// NewJWKSServer creates a new JWKS server with caching. 
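+//
+// A minimal construction sketch (illustrative, not part of this change;
+// RefreshInterval and Clock fall back to their defaults):
+//
+//	jwks := NewJWKSServer(JWKSServerConfig{
+//		IssuerRegistry: issuerRegistry,
+//		Observer:       NoopObserver{},
+//	})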
func NewJWKSServer(cfg JWKSServerConfig) *JWKSServer { if cfg.RefreshInterval == 0 { cfg.RefreshInterval = 1 * time.Minute @@ -65,7 +64,7 @@ func NewJWKSServer(cfg JWKSServerConfig) *JWKSServer { issuerRegistry: cfg.IssuerRegistry, clock: cfg.Clock, refreshInterval: cfg.RefreshInterval, - logger: cfg.Logger, + observer: cfg.Observer, } } @@ -73,14 +72,14 @@ func NewJWKSServer(cfg JWKSServerConfig) *JWKSServer { func (s *JWKSServer) Start(ctx context.Context) error { // Populate cache immediately if err := s.refreshCache(ctx); err != nil { - s.logger.Warn("initial cache population failed, will retry", "error", err) + s.observer.InitialCachePopulationFailed(err) } // Start background refresh s.ticker = s.clock.Ticker(s.refreshInterval) return s.ticker.Start(func(ctx context.Context) { if err := s.refreshCache(ctx); err != nil { - s.logger.Warn("background cache refresh failed", "error", err) + s.observer.CacheRefreshFailed(err) } }) } @@ -156,7 +155,7 @@ func (s *JWKSServer) buildJWKSResponse(ctx context.Context) (*parsecv1.GetJWKSRe for _, pk := range publicKeys { jwk, err := convertToJSONWebKey(pk) if err != nil { - // Skip keys that can't be converted + s.observer.KeyConversionFailed(pk.KeyID, err) continue } allKeys = append(allKeys, jwk) diff --git a/internal/server/jwks_cache_test.go b/internal/server/jwks_cache_test.go index 4aceb11..269c426 100644 --- a/internal/server/jwks_cache_test.go +++ b/internal/server/jwks_cache_test.go @@ -5,7 +5,6 @@ import ( "crypto/ecdsa" "crypto/elliptic" "crypto/rand" - "log/slog" "testing" "time" @@ -38,7 +37,7 @@ func TestJWKSServerCaching(t *testing.T) { IssuerRegistry: registry, RefreshInterval: 1 * time.Minute, Clock: clk, - Logger: slog.Default(), + Observer: NoopObserver{}, }) // Start should populate the cache @@ -82,7 +81,7 @@ func TestJWKSServerCaching(t *testing.T) { jwksServer := NewJWKSServer(JWKSServerConfig{ IssuerRegistry: registry, - Logger: slog.Default(), + Observer: NoopObserver{}, }) // First request should populate cache @@ -118,9 +117,9 @@ func TestJWKSServerCaching(t *testing.T) { clk := clock.NewFixtureClock(time.Now()) jwksServer := NewJWKSServer(JWKSServerConfig{ IssuerRegistry: registry, - RefreshInterval: 1 * time.Hour, // Long interval so it doesn't refresh during test + RefreshInterval: 1 * time.Hour, Clock: clk, - Logger: slog.Default(), + Observer: NoopObserver{}, }) // Start populates cache @@ -170,7 +169,7 @@ func TestJWKSServerCaching(t *testing.T) { IssuerRegistry: registry, RefreshInterval: 1 * time.Minute, Clock: clk, - Logger: slog.Default(), + Observer: NoopObserver{}, }) // Start populates cache and begins background refresh @@ -223,7 +222,7 @@ func TestJWKSServerCaching(t *testing.T) { IssuerRegistry: registry, RefreshInterval: 1 * time.Minute, Clock: clk, - Logger: slog.Default(), + Observer: NoopObserver{}, }) // Start populates cache with good data @@ -269,7 +268,7 @@ func TestJWKSServerCaching(t *testing.T) { jwksServer := NewJWKSServer(JWKSServerConfig{ IssuerRegistry: registry, - Logger: slog.Default(), + Observer: NoopObserver{}, }) // Start will fail to populate cache but shouldn't error diff --git a/internal/server/jwks_server_test.go b/internal/server/jwks_server_test.go index 161637d..b9995d4 100644 --- a/internal/server/jwks_server_test.go +++ b/internal/server/jwks_server_test.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "io" - "log/slog" "net/http" "testing" "time" @@ -15,34 +14,49 @@ import ( "github.com/project-kessel/parsec/internal/trust" ) -// TestJWKSEndpoint tests that the JWKS 
endpoint returns valid JSON Web Key Sets -// via the HTTP server. -func TestJWKSEndpoint(t *testing.T) { - ctx := context.Background() - +// startJWKSTestServer creates a full test server with the given issuer +// registry, wiring up a stub trust store, data source registry, and token +// service so tests only need to set up issuers. +func startJWKSTestServer(t *testing.T, issuerRegistry service.Registry) *testEnv { + t.Helper() trustStore := trust.NewStubStore() - stubValidator := trust.NewStubValidator(trust.CredentialTypeBearer) - trustStore.AddValidator(stubValidator) + trustStore.AddValidator(trust.NewStubValidator(trust.CredentialTypeBearer)) dataSourceRegistry := service.NewDataSourceRegistry() - issuerRegistry := service.NewSimpleRegistry() + tokenService := service.NewTokenService("parsec.test", dataSourceRegistry, issuerRegistry, nil) + claimsFilterRegistry := NewStubClaimsFilterRegistry() - kp := keys.NewInMemoryKeyProvider(keys.KeyTypeECP256, "ES256") - slotStore := keys.NewInMemoryKeySlotStore() - providerRegistry := map[string]keys.KeyProvider{ - "test-provider": kp, - } + return startTestServer(t, Config{ + AuthzServer: NewAuthzServer(trustStore, tokenService, nil, nil), + ExchangeServer: NewExchangeServer(trustStore, tokenService, claimsFilterRegistry, nil), + JWKSServer: NewJWKSServer(JWKSServerConfig{IssuerRegistry: issuerRegistry, Observer: NoopObserver{}}), + Observer: NoopObserver{}, + }) +} + +// newTestSigner creates and starts a DualSlotRotatingSigner with an in-memory +// key provider of the given type. Fails the test if the signer cannot start. +func newTestSigner(t *testing.T, namespace string, keyType keys.KeyType, algo string) *keys.DualSlotRotatingSigner { + t.Helper() + kp := keys.NewInMemoryKeyProvider(keyType, algo) signer := keys.NewDualSlotRotatingSigner(keys.DualSlotRotatingSignerConfig{ - Namespace: string(service.TokenTypeTransactionToken), + Namespace: namespace, KeyProviderID: "test-provider", - KeyProviderRegistry: providerRegistry, - SlotStore: slotStore, + KeyProviderRegistry: map[string]keys.KeyProvider{"test-provider": kp}, + SlotStore: keys.NewInMemoryKeySlotStore(), + Observer: keys.NoopObserver{}, }) - - if err := signer.Start(ctx); err != nil { + if err := signer.Start(context.Background()); err != nil { t.Fatalf("Failed to start signer: %v", err) } + return signer +} +// TestJWKSEndpoint tests that the JWKS endpoint returns valid JSON Web Key Sets +// via the HTTP server. 
+func TestJWKSEndpoint(t *testing.T) { + issuerRegistry := service.NewSimpleRegistry() + signer := newTestSigner(t, string(service.TokenTypeTransactionToken), keys.KeyTypeECP256, "ES256") txnIssuer := issuer.NewTransactionTokenIssuer(issuer.TransactionTokenIssuerConfig{ IssuerURL: "https://parsec.test", TTL: 5 * time.Minute, @@ -50,18 +64,9 @@ func TestJWKSEndpoint(t *testing.T) { TransactionContextMappers: []service.ClaimMapper{service.NewPassthroughSubjectMapper()}, RequestContextMappers: []service.ClaimMapper{service.NewRequestAttributesMapper()}, }) - issuerRegistry.Register(service.TokenTypeTransactionToken, txnIssuer) - trustDomain := "parsec.test" - tokenService := service.NewTokenService(trustDomain, dataSourceRegistry, issuerRegistry, nil) - claimsFilterRegistry := NewStubClaimsFilterRegistry() - - env := startTestServer(t, Config{ - AuthzServer: NewAuthzServer(trustStore, tokenService, nil, nil), - ExchangeServer: NewExchangeServer(trustStore, tokenService, claimsFilterRegistry, nil), - JWKSServer: NewJWKSServer(JWKSServerConfig{IssuerRegistry: issuerRegistry, Logger: slog.Default()}), - }) + env := startJWKSTestServer(t, issuerRegistry) t.Run("GET /v1/jwks.json", func(t *testing.T) { assertValidJWKS(t, env.HTTPClient, env.HTTPBaseURL+"/v1/jwks.json") @@ -74,76 +79,29 @@ func TestJWKSEndpoint(t *testing.T) { // TestJWKSWithMultipleIssuers tests that JWKS returns keys from multiple issuers. func TestJWKSWithMultipleIssuers(t *testing.T) { - ctx := context.Background() - - trustStore := trust.NewStubStore() - stubValidator := trust.NewStubValidator(trust.CredentialTypeBearer) - trustStore.AddValidator(stubValidator) - - dataSourceRegistry := service.NewDataSourceRegistry() issuerRegistry := service.NewSimpleRegistry() - kp1 := keys.NewInMemoryKeyProvider(keys.KeyTypeECP256, "ES256") - slotStore1 := keys.NewInMemoryKeySlotStore() - providerRegistry1 := map[string]keys.KeyProvider{ - "test-provider-1": kp1, - } - rotatingSigner1 := keys.NewDualSlotRotatingSigner(keys.DualSlotRotatingSignerConfig{ - Namespace: string(service.TokenTypeTransactionToken), - KeyProviderID: "test-provider-1", - KeyProviderRegistry: providerRegistry1, - SlotStore: slotStore1, - }) - - if err := rotatingSigner1.Start(ctx); err != nil { - t.Fatalf("Failed to start rotating signer 1: %v", err) - } - + signer1 := newTestSigner(t, string(service.TokenTypeTransactionToken), keys.KeyTypeECP256, "ES256") txnIssuer := issuer.NewTransactionTokenIssuer(issuer.TransactionTokenIssuerConfig{ IssuerURL: "https://parsec.test", TTL: 5 * time.Minute, - Signer: rotatingSigner1, + Signer: signer1, TransactionContextMappers: []service.ClaimMapper{service.NewPassthroughSubjectMapper()}, RequestContextMappers: []service.ClaimMapper{service.NewRequestAttributesMapper()}, }) - issuerRegistry.Register(service.TokenTypeTransactionToken, txnIssuer) - kp2 := keys.NewInMemoryKeyProvider(keys.KeyTypeECP384, "ES384") - slotStore2 := keys.NewInMemoryKeySlotStore() - providerRegistry2 := map[string]keys.KeyProvider{ - "test-provider-2": kp2, - } - rotatingSigner2 := keys.NewDualSlotRotatingSigner(keys.DualSlotRotatingSignerConfig{ - Namespace: string(service.TokenTypeAccessToken), - KeyProviderID: "test-provider-2", - KeyProviderRegistry: providerRegistry2, - SlotStore: slotStore2, - }) - - if err := rotatingSigner2.Start(ctx); err != nil { - t.Fatalf("Failed to start rotating signer 2: %v", err) - } - + signer2 := newTestSigner(t, string(service.TokenTypeAccessToken), keys.KeyTypeECP384, "ES384") accessIssuer := 
issuer.NewTransactionTokenIssuer(issuer.TransactionTokenIssuerConfig{ IssuerURL: "https://parsec.test", TTL: 15 * time.Minute, - Signer: rotatingSigner2, + Signer: signer2, TransactionContextMappers: []service.ClaimMapper{service.NewPassthroughSubjectMapper()}, RequestContextMappers: []service.ClaimMapper{}, }) - issuerRegistry.Register(service.TokenTypeAccessToken, accessIssuer) - trustDomain := "parsec.test" - tokenService := service.NewTokenService(trustDomain, dataSourceRegistry, issuerRegistry, nil) - claimsFilterRegistry := NewStubClaimsFilterRegistry() - - env := startTestServer(t, Config{ - AuthzServer: NewAuthzServer(trustStore, tokenService, nil, nil), - ExchangeServer: NewExchangeServer(trustStore, tokenService, claimsFilterRegistry, nil), - JWKSServer: NewJWKSServer(JWKSServerConfig{IssuerRegistry: issuerRegistry, Logger: slog.Default()}), - }) + env := startJWKSTestServer(t, issuerRegistry) resp, err := env.HTTPClient.Get(env.HTTPBaseURL + "/v1/jwks.json") if err != nil { @@ -182,29 +140,14 @@ func TestJWKSWithMultipleIssuers(t *testing.T) { // TestJWKSWithUnsignedIssuer tests that unsigned issuers don't contribute keys to JWKS. func TestJWKSWithUnsignedIssuer(t *testing.T) { - trustStore := trust.NewStubStore() - stubValidator := trust.NewStubValidator(trust.CredentialTypeBearer) - trustStore.AddValidator(stubValidator) - - dataSourceRegistry := service.NewDataSourceRegistry() issuerRegistry := service.NewSimpleRegistry() - unsignedIssuer := issuer.NewUnsignedIssuer(issuer.UnsignedIssuerConfig{ TokenType: string(service.TokenTypeTransactionToken), ClaimMappers: []service.ClaimMapper{service.NewPassthroughSubjectMapper()}, }) - issuerRegistry.Register(service.TokenTypeTransactionToken, unsignedIssuer) - trustDomain := "parsec.test" - tokenService := service.NewTokenService(trustDomain, dataSourceRegistry, issuerRegistry, nil) - claimsFilterRegistry := NewStubClaimsFilterRegistry() - - env := startTestServer(t, Config{ - AuthzServer: NewAuthzServer(trustStore, tokenService, nil, nil), - ExchangeServer: NewExchangeServer(trustStore, tokenService, claimsFilterRegistry, nil), - JWKSServer: NewJWKSServer(JWKSServerConfig{IssuerRegistry: issuerRegistry, Logger: slog.Default()}), - }) + env := startJWKSTestServer(t, issuerRegistry) resp, err := env.HTTPClient.Get(env.HTTPBaseURL + "/v1/jwks.json") if err != nil { diff --git a/internal/server/jwks_test.go b/internal/server/jwks_test.go index a944d3d..e30ac3e 100644 --- a/internal/server/jwks_test.go +++ b/internal/server/jwks_test.go @@ -5,7 +5,6 @@ import ( "crypto/ecdsa" "crypto/elliptic" "crypto/rand" - "log/slog" "testing" "github.com/project-kessel/parsec/internal/service" @@ -18,7 +17,7 @@ func TestJWKSServer(t *testing.T) { emptyRegistry := service.NewSimpleRegistry() jwksServer := NewJWKSServer(JWKSServerConfig{ IssuerRegistry: emptyRegistry, - Logger: slog.Default(), + Observer: NoopObserver{}, }) resp, err := jwksServer.GetJWKS(ctx, nil) @@ -54,7 +53,7 @@ func TestJWKSServer(t *testing.T) { jwksServer := NewJWKSServer(JWKSServerConfig{ IssuerRegistry: registry, - Logger: slog.Default(), + Observer: NoopObserver{}, }) resp, err := jwksServer.GetJWKS(ctx, nil) @@ -122,7 +121,7 @@ func TestJWKSServer(t *testing.T) { jwksServer := NewJWKSServer(JWKSServerConfig{ IssuerRegistry: registry, - Logger: slog.Default(), + Observer: NoopObserver{}, }) resp, err := jwksServer.GetJWKS(ctx, nil) @@ -159,7 +158,7 @@ func TestJWKSServer(t *testing.T) { jwksServer := NewJWKSServer(JWKSServerConfig{ IssuerRegistry: registry, - Logger: 
slog.Default(), + Observer: NoopObserver{}, }) resp, err := jwksServer.GetJWKS(ctx, nil) @@ -194,7 +193,7 @@ func TestJWKSServer(t *testing.T) { jwksServer := NewJWKSServer(JWKSServerConfig{ IssuerRegistry: registry, - Logger: slog.Default(), + Observer: NoopObserver{}, }) resp, err := jwksServer.GetJWKS(ctx, nil) @@ -220,7 +219,7 @@ func TestJWKSServer(t *testing.T) { jwksServer := NewJWKSServer(JWKSServerConfig{ IssuerRegistry: registry, - Logger: slog.Default(), + Observer: NoopObserver{}, }) resp, err := jwksServer.GetJWKS(ctx, nil) diff --git a/internal/server/observer.go b/internal/server/observer.go new file mode 100644 index 0000000..645230b --- /dev/null +++ b/internal/server/observer.go @@ -0,0 +1,24 @@ +package server + +// JWKSObserver receives JWKS cache lifecycle events from JWKSServer. +type JWKSObserver interface { + InitialCachePopulationFailed(err error) + CacheRefreshFailed(err error) + KeyConversionFailed(keyID string, err error) +} + +// ServerLifecycleObserver receives server lifecycle events from Server. +type ServerLifecycleObserver interface { + GRPCServeFailed(err error) + HTTPServeFailed(err error) +} + +// NoopObserver satisfies both JWKSObserver and ServerLifecycleObserver +// with empty methods. Useful in tests that don't care about observer events. +type NoopObserver struct{} + +func (NoopObserver) InitialCachePopulationFailed(error) {} +func (NoopObserver) CacheRefreshFailed(error) {} +func (NoopObserver) KeyConversionFailed(string, error) {} +func (NoopObserver) GRPCServeFailed(error) {} +func (NoopObserver) HTTPServeFailed(error) {} diff --git a/internal/server/server.go b/internal/server/server.go index 5616135..92a506f 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -3,6 +3,7 @@ package server import ( "context" "encoding/json" + "errors" "fmt" "net" "net/http" @@ -42,6 +43,7 @@ type Server struct { grpcListener net.Listener httpListener net.Listener grpcDialOptions []grpc.DialOption + observer ServerLifecycleObserver authzServer *AuthzServer exchangeServer *ExchangeServer @@ -62,14 +64,18 @@ type Config struct { AuthzServer *AuthzServer ExchangeServer *ExchangeServer JWKSServer *JWKSServer + + // Observer must be non-nil; use NoopObserver{} in tests. + Observer ServerLifecycleObserver } -// New creates a new server with the given configuration +// New creates a new server with the given configuration. func New(cfg Config) *Server { return &Server{ grpcListener: cfg.GRPCListener, httpListener: cfg.HTTPListener, grpcDialOptions: cfg.GRPCDialOptions, + observer: cfg.Observer, authzServer: cfg.AuthzServer, exchangeServer: cfg.ExchangeServer, jwksServer: cfg.JWKSServer, @@ -142,13 +148,13 @@ func (s *Server) Start(ctx context.Context) error { // All fallible setup is complete. Launch the serve goroutines last so // that an early-return error never orphans a running goroutine. 
diff --git a/internal/server/server.go b/internal/server/server.go
index 5616135..92a506f 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -3,6 +3,7 @@ package server
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"net"
 	"net/http"
@@ -42,6 +43,7 @@ type Server struct {
 	grpcListener    net.Listener
 	httpListener    net.Listener
 	grpcDialOptions []grpc.DialOption
+	observer        ServerLifecycleObserver

 	authzServer    *AuthzServer
 	exchangeServer *ExchangeServer
@@ -62,14 +64,18 @@ type Config struct {
 	AuthzServer    *AuthzServer
 	ExchangeServer *ExchangeServer
 	JWKSServer     *JWKSServer
+
+	// Observer must be non-nil; use NoopObserver{} in tests.
+	Observer ServerLifecycleObserver
 }

-// New creates a new server with the given configuration
+// New creates a new server with the given configuration.
 func New(cfg Config) *Server {
 	return &Server{
 		grpcListener:    cfg.GRPCListener,
 		httpListener:    cfg.HTTPListener,
 		grpcDialOptions: cfg.GRPCDialOptions,
+		observer:        cfg.Observer,
 		authzServer:     cfg.AuthzServer,
 		exchangeServer:  cfg.ExchangeServer,
 		jwksServer:      cfg.JWKSServer,
@@ -142,13 +148,13 @@ func (s *Server) Start(ctx context.Context) error {
 	// All fallible setup is complete. Launch the serve goroutines last so
 	// that an early-return error never orphans a running goroutine.
 	go func() {
-		if err := s.grpcServer.Serve(s.grpcListener); err != nil {
-			fmt.Printf("gRPC server error: %v\n", err)
+		if err := s.grpcServer.Serve(s.grpcListener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
+			s.observer.GRPCServeFailed(err)
 		}
 	}()

 	go func() {
 		if err := s.httpServer.Serve(s.httpListener); err != nil && err != http.ErrServerClosed {
-			fmt.Printf("HTTP server error: %v\n", err)
+			s.observer.HTTPServeFailed(err)
 		}
 	}()
diff --git a/internal/server/server_test_helper_test.go b/internal/server/server_test_helper_test.go
index 0c21ea1..58b5827 100644
--- a/internal/server/server_test_helper_test.go
+++ b/internal/server/server_test_helper_test.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"encoding/json"
 	"io"
-	"log/slog"
 	"net"
 	"net/http"
 	"testing"
@@ -134,7 +133,8 @@ func stubServerConfig() Config {
 	return Config{
 		AuthzServer:    NewAuthzServer(trustStore, tokenService, nil, nil),
 		ExchangeServer: NewExchangeServer(trustStore, tokenService, claimsFilterRegistry, nil),
-		JWKSServer:     NewJWKSServer(JWKSServerConfig{IssuerRegistry: issuerRegistry, Logger: slog.Default()}),
+		JWKSServer:     NewJWKSServer(JWKSServerConfig{IssuerRegistry: issuerRegistry, Observer: NoopObserver{}}),
+		Observer:       NoopObserver{},
 	}
 }
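Also illustrative, not part of the diff: wiring the new Config.Observer field from a caller such as the serve command. The sketch reuses ZerologServerObserver from above and elides listener setup plus the authz/exchange servers:

package observability // same illustrative package as the previous sketch

import (
	"net"
	"os"

	"github.com/rs/zerolog"

	"github.com/project-kessel/parsec/internal/server"
	"github.com/project-kessel/parsec/internal/service"
)

// NewObservedServer is hypothetical; AuthzServer, ExchangeServer, and real
// issuer registration are omitted to keep the sketch short.
func NewObservedServer(grpcLn, httpLn net.Listener) *server.Server {
	obs := ZerologServerObserver{Log: zerolog.New(os.Stderr).With().Timestamp().Logger()}

	issuerRegistry := service.NewSimpleRegistry()
	jwks := server.NewJWKSServer(server.JWKSServerConfig{
		IssuerRegistry: issuerRegistry,
		Observer:       obs,
	})

	return server.New(server.Config{
		GRPCListener: grpcLn,
		HTTPListener: httpLn,
		JWKSServer:   jwks,
		Observer:     obs, // must be non-nil; tests use NoopObserver{}
	})
}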
diff --git a/internal/trust/filtered_store.go b/internal/trust/filtered_store.go
index b2bee1f..95ce9cf 100644
--- a/internal/trust/filtered_store.go
+++ b/internal/trust/filtered_store.go
@@ -30,7 +30,8 @@ type FilteredStore struct {
 	// All named validators in order
 	validators []NamedValidator
 	// Filter for determining validator access
-	filter ValidatorFilter
+	filter   ValidatorFilter
+	observer TrustValidationObserver
 }

 // FilteredStoreOption is a functional option for configuring a FilteredStore
@@ -44,6 +45,14 @@ func WithValidatorFilter(filter ValidatorFilter) FilteredStoreOption {
 	}
 }

+// WithTrustValidationObserver sets the observer for trust validation events.
+func WithTrustValidationObserver(obs TrustValidationObserver) FilteredStoreOption {
+	return func(s *FilteredStore) error {
+		s.observer = obs
+		return nil
+	}
+}
+
 // WithCELFilter sets a CEL-based filter expression for the store
 // The expression should evaluate to a boolean indicating whether a validator is allowed
 // It has access to:
@@ -60,7 +69,7 @@ func WithCELFilter(script string) FilteredStoreOption {
 	}
 }

-// NewFilteredStore creates a new filtered store
+// NewFilteredStore creates a new filtered store.
 func NewFilteredStore(opts ...FilteredStoreOption) (*FilteredStore, error) {
 	s := &FilteredStore{
 		validatorsByType: make(map[CredentialType][]NamedValidator),
@@ -103,19 +112,20 @@ func (s *FilteredStore) Validate(ctx context.Context, credential Credential) (*R
 	}

 	// Try validators in order until one succeeds
-	var errors []error
+	var errs []error
 	for _, nv := range validators {
 		result, err := nv.Validator.Validate(ctx, credential)
 		if err == nil {
 			return result, nil
 		}
-		// Collect errors
-		errors = append(errors, err)
+		s.observer.ValidatorFailed(nv.Name, credType, err)
+		errs = append(errs, err)
 	}

-	// All validators failed
-	return nil, fmt.Errorf("all validators failed for credential type %s: %w", credType, errors[len(errors)-1])
+	lastErr := errs[len(errs)-1]
+	s.observer.AllValidatorsFailed(credType, len(errs), lastErr)
+	return nil, fmt.Errorf("all validators failed for credential type %s: %w", credType, lastErr)
 }

 // ForActor implements the Store interface
@@ -131,22 +141,26 @@ func (s *FilteredStore) ForActor(ctx context.Context, actor *Result, requestAttr
 		return s, nil
 	}

-	// Create a new filtered store with the same filter
+	// Create a new filtered store inheriting filter and observer
 	filtered := &FilteredStore{
 		validatorsByType: make(map[CredentialType][]NamedValidator),
 		validators:       make([]NamedValidator, 0),
 		filter:           s.filter,
+		observer:         s.observer,
 	}

 	// Evaluate the filter for each validator
 	for _, nv := range s.validators {
 		allowed, err := s.filter.IsAllowed(actor, nv.Name, requestAttrs)
 		if err != nil {
+			s.observer.FilterEvaluationFailed(nv.Name, err)
 			return nil, fmt.Errorf("failed to evaluate filter for validator %s: %w", nv.Name, err)
 		}

 		if allowed {
 			filtered.AddValidator(nv.Name, nv.Validator)
+		} else {
+			s.observer.ValidatorFiltered(nv.Name, actor.Subject)
 		}
 	}
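A consequence worth flagging for reviewers: Validate and ForActor dereference the observer unconditionally, so every FilteredStore construction site must pass WithTrustValidationObserver (the tests below standardize on NoopObserver through a helper). A minimal construction sketch with a placeholder policy:

package trust

// newObservedStore is illustrative only: the observer is just another
// functional option and composes with filters. A real caller would pass
// a logging observer instead of NoopObserver.
func newObservedStore() (*FilteredStore, error) {
	return NewFilteredStore(
		WithCELFilter(`true`), // placeholder policy expression, as in the tests
		WithTrustValidationObserver(NoopObserver{}),
	)
}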
diff --git a/internal/trust/filtered_store_test.go b/internal/trust/filtered_store_test.go
index b898737..b4fdb85 100644
--- a/internal/trust/filtered_store_test.go
+++ b/internal/trust/filtered_store_test.go
@@ -9,6 +9,18 @@ import (
 	"github.com/project-kessel/parsec/internal/claims"
 )

+// newTestFilteredStore creates a FilteredStore pre-wired with a NoopObserver.
+// Callers can pass additional options (e.g. WithCELFilter) as needed.
+func newTestFilteredStore(t *testing.T, opts ...FilteredStoreOption) *FilteredStore {
+	t.Helper()
+	allOpts := append([]FilteredStoreOption{WithTrustValidationObserver(NoopObserver{})}, opts...)
+	store, err := NewFilteredStore(allOpts...)
+	if err != nil {
+		t.Fatalf("failed to create filtered store: %v", err)
+	}
+	return store
+}
+
 func TestFilteredStore_ForActor(t *testing.T) {
 	ctx := context.Background()
@@ -142,13 +154,8 @@ func TestFilteredStore_ForActor(t *testing.T) {

 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			// Create filtered store with CEL filter
-			store, err := NewFilteredStore(WithCELFilter(tt.filterScript))
-			if err != nil {
-				t.Fatalf("failed to create filtered store: %v", err)
-			}
+			store := newTestFilteredStore(t, WithCELFilter(tt.filterScript))

-			// Add validators
 			for name, validator := range tt.validators {
 				store.AddValidator(name, validator)
 			}
@@ -202,14 +209,9 @@ func TestFilteredStore_Validate(t *testing.T) {
 		TrustDomain: "test-domain",
 	})

-	store, err := NewFilteredStore()
-	if err != nil {
-		t.Fatalf("failed to create store: %v", err)
-	}
-
+	store := newTestFilteredStore(t)
 	store.AddValidator("test-validator", validator)

-	// Test validation
 	cred := &BearerCredential{Token: "test-token"}
 	result, err := store.Validate(ctx, cred)
 	if err != nil {
@@ -231,12 +233,7 @@ func TestFilteredStore_NoFilterReturnsAllValidators(t *testing.T) {

 	validator := NewStubValidator(CredentialTypeBearer)

-	// Create store without filter
-	store, err := NewFilteredStore()
-	if err != nil {
-		t.Fatalf("failed to create store: %v", err)
-	}
-
+	store := newTestFilteredStore(t)
 	store.AddValidator("test-validator", validator)

 	actor := &Result{
@@ -258,12 +255,9 @@ func TestFilteredStore_NilActorError(t *testing.T) {
 	ctx := context.Background()

-	store, err := NewFilteredStore(WithCELFilter(`true`))
-	if err != nil {
-		t.Fatalf("failed to create store: %v", err)
-	}
+	store := newTestFilteredStore(t, WithCELFilter(`true`))

-	_, err = store.ForActor(ctx, nil, nil)
+	_, err := store.ForActor(ctx, nil, nil)
 	if err == nil {
 		t.Errorf("expected error for nil actor, got nil")
 	}
@@ -344,9 +338,9 @@ func TestFilteredStore_InvalidCELScript(t *testing.T) {

 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_, err := NewFilteredStore(WithCELFilter(tt.script))
+			_, err := NewFilteredStore(WithCELFilter(tt.script), WithTrustValidationObserver(NoopObserver{}))
 			if err == nil {
-				t.Errorf("expected error for invalid script, got nil")
+				t.Error("expected error for invalid script, got nil")
 			}
 		})
 	}
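The test helper above deliberately discards events. Because options apply in order and a later WithTrustValidationObserver overwrites the default, a test that wants to assert on events could append a recording double; the type below is hypothetical and not part of this change:

package trust

// recordingObserver is a hypothetical test double that captures event data,
// e.g. newTestFilteredStore(t, WithTrustValidationObserver(rec)) would route
// events here instead of to the default NoopObserver.
type recordingObserver struct {
	failed   []string // names passed to ValidatorFailed
	filtered []string // names passed to ValidatorFiltered
}

func (r *recordingObserver) ValidatorFailed(name string, _ CredentialType, _ error) {
	r.failed = append(r.failed, name)
}

func (r *recordingObserver) AllValidatorsFailed(CredentialType, int, error) {}

func (r *recordingObserver) ValidatorFiltered(name string, _ string) {
	r.filtered = append(r.filtered, name)
}

func (r *recordingObserver) FilterEvaluationFailed(string, error) {}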
diff --git a/internal/trust/observer.go b/internal/trust/observer.go
new file mode 100644
index 0000000..15c629f
--- /dev/null
+++ b/internal/trust/observer.go
@@ -0,0 +1,29 @@
+package trust
+
+// TrustValidationObserver receives events from FilteredStore during credential validation.
+type TrustValidationObserver interface {
+	// ValidatorFailed is called when an individual validator fails during
+	// multi-validator validation. These intermediate errors are otherwise lost
+	// since only the last error is returned to callers.
+	ValidatorFailed(validatorName string, credType CredentialType, err error)
+
+	// AllValidatorsFailed is called when all validators fail for a credential type.
+	AllValidatorsFailed(credType CredentialType, attempted int, lastErr error)
+
+	// ValidatorFiltered is called when ForActor's policy filter removes a
+	// validator from the set available to an actor.
+	ValidatorFiltered(validatorName string, actorSubject string)
+
+	// FilterEvaluationFailed is called when the policy filter returns an error
+	// for a specific validator.
+	FilterEvaluationFailed(validatorName string, err error)
+}
+
+// NoopObserver satisfies TrustValidationObserver with empty methods.
+// Useful in tests that don't care about observer events.
+type NoopObserver struct{}
+
+func (NoopObserver) ValidatorFailed(string, CredentialType, error)  {}
+func (NoopObserver) AllValidatorsFailed(CredentialType, int, error) {}
+func (NoopObserver) ValidatorFiltered(string, string)               {}
+func (NoopObserver) FilterEvaluationFailed(string, error)           {}
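Finally, a production counterpart for the trust package, again only a sketch: per-validator failures logged at debug level (they are expected during multi-validator fallback) and exhaustion at warn. fmt.Sprint is used because CredentialType's underlying type is not visible in this diff:

package observability // hypothetical package, as in the earlier sketches

import (
	"fmt"

	"github.com/rs/zerolog"

	"github.com/project-kessel/parsec/internal/trust"
)

// ZerologTrustObserver logs each trust validation event.
type ZerologTrustObserver struct {
	Log zerolog.Logger
}

func (o ZerologTrustObserver) ValidatorFailed(name string, ct trust.CredentialType, err error) {
	o.Log.Debug().Err(err).Str("validator", name).
		Str("credential_type", fmt.Sprint(ct)).Msg("validator failed, trying next")
}

func (o ZerologTrustObserver) AllValidatorsFailed(ct trust.CredentialType, attempted int, lastErr error) {
	o.Log.Warn().Err(lastErr).Str("credential_type", fmt.Sprint(ct)).
		Int("attempted", attempted).Msg("all validators failed")
}

func (o ZerologTrustObserver) ValidatorFiltered(name, subject string) {
	o.Log.Debug().Str("validator", name).Str("actor_subject", subject).
		Msg("validator filtered out for actor")
}

func (o ZerologTrustObserver) FilterEvaluationFailed(name string, err error) {
	o.Log.Error().Err(err).Str("validator", name).Msg("filter evaluation failed")
}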