Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/knadh/koanf/v2 v2.3.3
github.com/lestrrat-go/httprc/v3 v3.0.4
github.com/lestrrat-go/jwx/v3 v3.0.13
github.com/rs/zerolog v1.34.0
github.com/spf13/cobra v1.10.2
github.com/spf13/pflag v1.0.10
github.com/stretchr/testify v1.11.1
Expand Down Expand Up @@ -62,6 +63,8 @@ require (
github.com/lestrrat-go/dsig-secp256k1 v1.0.0 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
github.com/lestrrat-go/option/v2 v2.0.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 h1:aBangftG7EVZoUb69Os8IaYg++6uMOdKK83QtkkvJik=
github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2/go.mod h1:qwXFYgsP6T7XnJtbKlf1HP8AjxZZyzxMmc+Lq5GjlU4=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand All @@ -58,6 +59,7 @@ github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/goccy/go-yaml v1.19.2 h1:PmFC1S6h8ljIz6gMRBopkjP1TVT7xuwrButHID66PoM=
github.com/goccy/go-yaml v1.19.2/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ=
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8/go.mod h1:wcDNUvekVysuuOpQKo3191zZyTpiI6se1N1ULghS0sw=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
Expand Down Expand Up @@ -108,19 +110,28 @@ github.com/lestrrat-go/jwx/v3 v3.0.13 h1:AdHKiPIYeCSnOJtvdpipPg/0SuFh9rdkN+HF3O0
github.com/lestrrat-go/jwx/v3 v3.0.13/go.mod h1:2m0PV1A9tM4b/jVLMx8rh6rBl7F6WGb3EG2hufN9OQU=
github.com/lestrrat-go/option/v2 v2.0.0 h1:XxrcaJESE1fokHy3FpaQ/cXW8ZsIdWcdFzzLOcID3Ss=
github.com/lestrrat-go/option/v2 v2.0.0/go.mod h1:oSySsmzMoR0iRzCDCaUfsCzxQHUEuhOViQObyy7S6Vg=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/planetscale/vtprotobuf v0.6.1-0.20250313105119-ba97887b0a25 h1:S1hI5JiKP7883xBzZAr1ydcxrKNSVNm7+3+JwjxZEsg=
github.com/planetscale/vtprotobuf v0.6.1-0.20250313105119-ba97887b0a25/go.mod h1:ZQntvDG8TkPgljxtA0R9frDoND4QORU1VXz015N5Ks4=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY=
github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0=
github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
Expand Down Expand Up @@ -157,6 +168,9 @@ golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa h1:Zt3DZoOFFYkKhDT3v7Lm9FDME
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa/go.mod h1:K79w1Vqn7PoiZn+TkNpx3BUWUQksGO3JcVX6qIjytmA=
golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo=
golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
Expand Down
181 changes: 133 additions & 48 deletions internal/cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,104 @@ import (
"github.com/spf13/cobra"

"github.com/project-kessel/parsec/internal/config"
"github.com/project-kessel/parsec/internal/datasource"
"github.com/project-kessel/parsec/internal/keys"
"github.com/project-kessel/parsec/internal/probe"
"github.com/project-kessel/parsec/internal/server"
"github.com/project-kessel/parsec/internal/service"
"github.com/project-kessel/parsec/internal/trust"
)

type infraEventConfigs struct {
configReload *config.EventLoggingConfig
dataSourceCache *config.EventLoggingConfig
keyRotation *config.EventLoggingConfig
keyProvider *config.EventLoggingConfig
trustValidation *config.EventLoggingConfig
jwksCache *config.EventLoggingConfig
serverLifecycle *config.EventLoggingConfig
}

type wiredInfraObservers struct {
configReload config.ConfigReloadObserver
keyRotation keys.KeyRotationObserver
keyProvider keys.KeyProviderObserver
trustValidation trust.TrustValidationObserver
dataSourceCache datasource.DataSourceCacheObserver
jwksCache server.JWKSObserver
serverLifecycle server.ServerLifecycleObserver
}

type runtimeComponents struct {
trustStore trust.Store
tokenService *service.TokenService
authzTokenTypes []server.TokenTypeSpec
claimsFilterRegistry server.ClaimsFilterRegistry
issuerRegistry service.Registry
}

func buildInfraEventConfigs(obs *config.ObservabilityConfig) infraEventConfigs {
if obs == nil {
return infraEventConfigs{}
}
return infraEventConfigs{
configReload: obs.ConfigReload,
dataSourceCache: obs.DataSourceCache,
keyRotation: obs.KeyRotation,
keyProvider: obs.KeyProvider,
trustValidation: obs.TrustValidation,
jwksCache: obs.JWKSCache,
serverLifecycle: obs.ServerLifecycle,
}
}

func wireInfraObservers(logCtx config.LoggerContext, infraCfg infraEventConfigs) wiredInfraObservers {
return wiredInfraObservers{
configReload: probe.NewLoggingConfigReloadObserver(config.EventLogger(logCtx, "config_reload", infraCfg.configReload)),
keyRotation: probe.NewLoggingKeyRotationObserver(config.EventLogger(logCtx, "key_rotation", infraCfg.keyRotation)),
keyProvider: probe.NewLoggingKeyProviderObserver(config.EventLogger(logCtx, "key_provider", infraCfg.keyProvider)),
trustValidation: probe.NewLoggingTrustValidationObserver(config.EventLogger(logCtx, "trust_validation", infraCfg.trustValidation)),
dataSourceCache: probe.NewLoggingDataSourceCacheObserver(config.EventLogger(logCtx, "datasource_cache", infraCfg.dataSourceCache)),
jwksCache: probe.NewLoggingJWKSObserver(config.EventLogger(logCtx, "jwks_cache", infraCfg.jwksCache)),
serverLifecycle: probe.NewLoggingServerLifecycleObserver(config.EventLogger(logCtx, "server_lifecycle", infraCfg.serverLifecycle)),
}
}

func buildRuntimeComponents(provider *config.Provider) (*runtimeComponents, error) {
trustStore, err := provider.TrustStore()
if err != nil {
return nil, fmt.Errorf("failed to create trust store: %w", err)
}

tokenService, err := provider.TokenService()
if err != nil {
return nil, fmt.Errorf("failed to create token service: %w", err)
}

authzTokenTypes, err := provider.AuthzServerTokenTypes()
if err != nil {
return nil, fmt.Errorf("failed to get authz token types: %w", err)
}

claimsFilterRegistry, err := provider.ExchangeServerClaimsFilterRegistry()
if err != nil {
return nil, fmt.Errorf("failed to get exchange server claims filter registry: %w", err)
}

issuerRegistry, err := provider.IssuerRegistry()
if err != nil {
return nil, fmt.Errorf("failed to get issuer registry: %w", err)
}

return &runtimeComponents{
trustStore: trustStore,
tokenService: tokenService,
authzTokenTypes: authzTokenTypes,
claimsFilterRegistry: claimsFilterRegistry,
issuerRegistry: issuerRegistry,
}, nil
}

// NewServeCmd creates the serve command
func NewServeCmd() *cobra.Command {
cmd := &cobra.Command{
Expand Down Expand Up @@ -63,10 +158,8 @@ func runServe(cmd *cobra.Command, args []string) error {
// 1. Determine config file path
configPath := configFile
if configPath == "" {
// Check environment variable
configPath = os.Getenv("PARSEC_CONFIG")
}
// If still empty, configPath remains empty and we'll use env vars/flags only

// 2. Load configuration (file + env vars + flags)
loader, err := config.NewLoaderWithFlags(configPath, cmd.Flags())
Expand All @@ -79,52 +172,44 @@ func runServe(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to parse config: %w", err)
}

// 3. Create provider to build all components from config
provider := config.NewProvider(cfg)
// 2a. Validate observability config early so typos surface at startup
if err := config.ValidateObservabilityConfig(cfg.Observability); err != nil {
return fmt.Errorf("invalid observability config: %w", err)
}

// 4. Create logger and observer — single instance shared across all components
logger := config.NewLogger(cfg.Observability)
// 3. Create logger, observers, then provider (observers must exist before provider)
logCtx := config.NewLoggerContext(cfg.Observability)
logger := logCtx.Logger
infraCfg := buildInfraEventConfigs(cfg.Observability)
infraObservers := wireInfraObservers(logCtx, infraCfg)
loader.SetObserver(infraObservers.configReload)

observer, err := config.NewObserverWithLogger(cfg.Observability, logger)
observer, err := config.NewObserverWithLogger(cfg.Observability, logCtx)
if err != nil {
return fmt.Errorf("failed to create observer: %w", err)
}

// Inject into provider so TokenService and other internal components use the same observer
provider.SetObserver(observer)
// 4. Create provider with all dependencies upfront — no temporal coupling
provider := config.NewProvider(cfg, config.ProviderDeps{
Observer: observer,
KeyRotationObserver: infraObservers.keyRotation,
KeyProviderObserver: infraObservers.keyProvider,
TrustObserver: infraObservers.trustValidation,
CacheObserver: infraObservers.dataSourceCache,
})

// 5. Build components via provider
trustStore, err := provider.TrustStore()
if err != nil {
return fmt.Errorf("failed to create trust store: %w", err)
}

tokenService, err := provider.TokenService()
if err != nil {
return fmt.Errorf("failed to create token service: %w", err)
}

authzTokenTypes, err := provider.AuthzServerTokenTypes()
if err != nil {
return fmt.Errorf("failed to get authz token types: %w", err)
}

claimsFilterRegistry, err := provider.ExchangeServerClaimsFilterRegistry()
if err != nil {
return fmt.Errorf("failed to get exchange server claims filter registry: %w", err)
}

issuerRegistry, err := provider.IssuerRegistry()
components, err := buildRuntimeComponents(provider)
if err != nil {
return fmt.Errorf("failed to get issuer registry: %w", err)
return err
}

// 6. Create service handlers with observability
authzServer := server.NewAuthzServer(trustStore, tokenService, authzTokenTypes, observer)
exchangeServer := server.NewExchangeServer(trustStore, tokenService, claimsFilterRegistry, observer)
authzServer := server.NewAuthzServer(components.trustStore, components.tokenService, components.authzTokenTypes, observer)
exchangeServer := server.NewExchangeServer(components.trustStore, components.tokenService, components.claimsFilterRegistry, observer)
jwksServer := server.NewJWKSServer(server.JWKSServerConfig{
IssuerRegistry: issuerRegistry,
Logger: logger,
IssuerRegistry: components.issuerRegistry,
Observer: infraObservers.jwksCache,
})

// Start JWKS background refresh
Expand Down Expand Up @@ -153,40 +238,40 @@ func runServe(cmd *cobra.Command, args []string) error {
AuthzServer: authzServer,
ExchangeServer: exchangeServer,
JWKSServer: jwksServer,
Observer: infraObservers.serverLifecycle,
})
if err := srv.Start(ctx); err != nil {
return fmt.Errorf("failed to start server: %w", err)
}

// 8a. All components initialized — signal readiness via gRPC health service.
// Per-service statuses transition from NOT_SERVING to SERVING.
srv.SetReady()

grpcAddr := grpcListener.Addr().String()
httpAddr := httpListener.Addr().String()
fmt.Println("parsec is running")
fmt.Printf(" gRPC (ext_authz): %s\n", grpcAddr)
fmt.Printf(" HTTP (token exchange): http://%s/v1/token\n", httpAddr)
fmt.Printf(" HTTP (JWKS): http://%s/v1/jwks.json\n", httpAddr)
fmt.Printf(" http://%s/.well-known/jwks.json\n", httpAddr)
fmt.Printf(" Health (gRPC): %s (grpc.health.v1.Health)\n", grpcAddr)
fmt.Printf(" Health (HTTP live): http://%s/healthz/live\n", httpAddr)
fmt.Printf(" Health (HTTP ready): http://%s/healthz/ready\n", httpAddr)
fmt.Printf(" Trust Domain: %s\n", provider.TrustDomain())
fmt.Printf(" Config: %s\n", configPath)
logger.Info().
Str("grpc_addr", grpcAddr).
Str("http_addr", httpAddr).
Str("token_exchange_url", "http://"+httpAddr+"/v1/token").
Str("jwks_url", "http://"+httpAddr+"/v1/jwks.json").
Str("jwks_wellknown_url", "http://"+httpAddr+"/.well-known/jwks.json").
Str("health_grpc", grpcAddr+" (grpc.health.v1.Health)").
Str("trust_domain", provider.TrustDomain()).
Str("config", configPath).
Msg("parsec is running")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check for some of missing print statements --> logger statements


// 9. Wait for interrupt signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh

fmt.Println("\nShutting down...")
logger.Info().Msg("Shutting down")

// 10. Graceful shutdown
if err := srv.Stop(ctx); err != nil {
return fmt.Errorf("error during shutdown: %w", err)
}

fmt.Println("Shutdown complete")
logger.Info().Msg("Shutdown complete")
return nil
}
13 changes: 10 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,16 @@ type ObservabilityConfig struct {
LogFormat string `koanf:"log_format" usage:"log format: json, text"`

// Event-specific logging configuration
TokenIssuance *EventLoggingConfig `koanf:"token_issuance"`
TokenExchange *EventLoggingConfig `koanf:"token_exchange"`
AuthzCheck *EventLoggingConfig `koanf:"authz_check"`
TokenIssuance *EventLoggingConfig `koanf:"token_issuance"`
TokenExchange *EventLoggingConfig `koanf:"token_exchange"`
AuthzCheck *EventLoggingConfig `koanf:"authz_check"`
ConfigReload *EventLoggingConfig `koanf:"config_reload"`
DataSourceCache *EventLoggingConfig `koanf:"datasource_cache"`
KeyRotation *EventLoggingConfig `koanf:"key_rotation"`
KeyProvider *EventLoggingConfig `koanf:"key_provider"`
TrustValidation *EventLoggingConfig `koanf:"trust_validation"`
JWKSCache *EventLoggingConfig `koanf:"jwks_cache"`
ServerLifecycle *EventLoggingConfig `koanf:"server_lifecycle"`

// Composite observer fields - allows multiple observers
Observers []ObservabilityConfig `koanf:"observers"`
Expand Down
17 changes: 8 additions & 9 deletions internal/config/datasources.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
)

// NewDataSourceRegistry creates a data source registry from configuration
func NewDataSourceRegistry(cfg []DataSourceConfig, transport http.RoundTripper) (*service.DataSourceRegistry, error) {
func NewDataSourceRegistry(cfg []DataSourceConfig, transport http.RoundTripper, cacheObs datasource.DataSourceCacheObserver) (*service.DataSourceRegistry, error) {
registry := service.NewDataSourceRegistry()

for _, dsCfg := range cfg {
ds, err := newDataSource(dsCfg, transport)
ds, err := newDataSource(dsCfg, transport, cacheObs)
if err != nil {
return nil, fmt.Errorf("failed to create data source %s: %w", dsCfg.Name, err)
}
Expand All @@ -27,17 +27,17 @@ func NewDataSourceRegistry(cfg []DataSourceConfig, transport http.RoundTripper)
}

// newDataSource creates a data source from configuration
func newDataSource(cfg DataSourceConfig, transport http.RoundTripper) (service.DataSource, error) {
func newDataSource(cfg DataSourceConfig, transport http.RoundTripper, cacheObs datasource.DataSourceCacheObserver) (service.DataSource, error) {
switch cfg.Type {
case "lua":
return newLuaDataSource(cfg, transport)
return newLuaDataSource(cfg, transport, cacheObs)
default:
return nil, fmt.Errorf("unknown data source type: %s (supported: lua)", cfg.Type)
}
}

// newLuaDataSource creates a Lua data source with optional caching
func newLuaDataSource(cfg DataSourceConfig, transport http.RoundTripper) (service.DataSource, error) {
func newLuaDataSource(cfg DataSourceConfig, transport http.RoundTripper, cacheObs datasource.DataSourceCacheObserver) (service.DataSource, error) {
if cfg.Name == "" {
return nil, fmt.Errorf("data source name is required")
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func newLuaDataSource(cfg DataSourceConfig, transport http.RoundTripper) (servic

// Wrap with caching if configured
if cfg.Caching != nil {
return wrapWithCaching(baseDS, *cfg.Caching)
return wrapWithCaching(baseDS, *cfg.Caching, cacheObs)
}

return baseDS, nil
Expand Down Expand Up @@ -117,11 +117,10 @@ func buildHTTPConfig(cfg *HTTPConfig, transport http.RoundTripper) (*luaservices
}

// wrapWithCaching wraps a data source with the configured caching layer
func wrapWithCaching(ds service.DataSource, cfg CachingConfig) (service.DataSource, error) {
func wrapWithCaching(ds service.DataSource, cfg CachingConfig, cacheObs datasource.DataSourceCacheObserver) (service.DataSource, error) {
switch cfg.Type {
case "in_memory":
// In-memory caching uses the Cacheable interface from the data source
return datasource.NewInMemoryCachingDataSource(ds), nil
return datasource.NewInMemoryCachingDataSource(ds, datasource.WithCacheObserver(cacheObs)), nil

case "distributed":
groupName := cfg.GroupName
Expand Down
Loading
Loading