From 74c7acec23f0a7df7e90234ebcc936885cc26fd4 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:32 +0800 Subject: [PATCH 01/22] feat(redisclient): add cluster endpoint mode Managed providers can expose Redis Cluster rather than a direct or Sentinel endpoint. Add Cluster client construction, seed URL parsing, redirect and topology refresh settings, and DB zero validation to backend Redis config. URL schemes and credentials now survive into Cluster options while direct and Sentinel modes keep their existing path. --- backend/internal/redisclient/redisclient.go | 138 +++++++++++++++++--- 1 file changed, 117 insertions(+), 21 deletions(-) diff --git a/backend/internal/redisclient/redisclient.go b/backend/internal/redisclient/redisclient.go index cd276f8fa..df8a9f478 100644 --- a/backend/internal/redisclient/redisclient.go +++ b/backend/internal/redisclient/redisclient.go @@ -39,20 +39,25 @@ var ( ) const ( - endpointModeDirect = "direct" - endpointModeSentinel = "sentinel" - sentinelMasterDefault = "mpp-redis-ha" + endpointModeDirect = "direct" + endpointModeSentinel = "sentinel" + endpointModeCluster = "cluster" + sentinelMasterDefault = "mpp-redis-ha" + clusterMaxRedirects = 8 + clusterStateReloadInterval = 10 * time.Second ) type Config struct { EndpointMode string Addr string + Username string Password string DB int TLS bool TLSCACert string TLSCAFile string TLSServerName string + ClusterAddrs []string SentinelAddrs []string SentinelMasterName string tls *tls.Config @@ -70,11 +75,11 @@ const ( ) type ClientSet struct { - Default *redis.Client - Coordination *redis.Client - Cache *redis.Client - Queue *redis.Client - Session *redis.Client + Default redis.UniversalClient + Coordination redis.UniversalClient + Cache redis.UniversalClient + Queue redis.UniversalClient + Session redis.UniversalClient } type roleConfig struct { @@ -97,11 +102,11 @@ type poolConfig struct { ConnMaxLifetime time.Duration } -func NewFromEnv(ctx context.Context) (*redis.Client, error) { +func NewFromEnv(ctx context.Context) (redis.UniversalClient, error) { return NewRoleFromEnv(ctx, RoleDefault) } -func NewRoleFromEnv(ctx context.Context, role Role) (*redis.Client, error) { +func NewRoleFromEnv(ctx context.Context, role Role) (redis.UniversalClient, error) { config, err := ConfigFromEnv() if err != nil { return nil, err @@ -119,7 +124,7 @@ func NewClientSetFromEnv(ctx context.Context) (*ClientSet, error) { clientSet := &ClientSet{} builders := []struct { role Role - dst **redis.Client + dst *redis.UniversalClient }{ {role: RoleDefault, dst: &clientSet.Default}, {role: RoleCoordination, dst: &clientSet.Coordination}, @@ -158,6 +163,13 @@ func ConfigFromEnv() (Config, error) { if config.Addr == "" { return Config{}, ErrNotConfigured } + case endpointModeCluster: + if err := config.configureClusterFromEnv(); err != nil { + return Config{}, err + } + if len(config.ClusterAddrs) == 0 { + return Config{}, fmt.Errorf("%s must be set when %s=cluster", addrEnv, endpointModeEnv) + } case endpointModeSentinel: config.SentinelAddrs = csvEnv(sentinelAddrsEnv) config.SentinelMasterName = strings.TrimSpace(os.Getenv(sentinelMasterEnv)) @@ -173,6 +185,9 @@ func ConfigFromEnv() (Config, error) { if err != nil { return Config{}, err } + if endpointMode == endpointModeCluster && db != 0 { + return Config{}, fmt.Errorf("invalid %s: must be 0 when %s=cluster", dbEnv, endpointModeEnv) + } pool, err := poolConfigFromEnv() if err != nil { return Config{}, err @@ -186,7 +201,7 @@ func ConfigFromEnv() (Config, error) { return config, nil } -func New(ctx context.Context, config Config, role Role) (*redis.Client, error) { +func New(ctx context.Context, config Config, role Role) (redis.UniversalClient, error) { if err := config.configureTLS(); err != nil { return nil, err } @@ -199,15 +214,21 @@ func New(ctx context.Context, config Config, role Role) (*redis.Client, error) { return client, nil } -func newClient(config Config, role Role) *redis.Client { - if config.EndpointMode == endpointModeSentinel { +func newClient(config Config, role Role) redis.UniversalClient { + switch config.EndpointMode { + case endpointModeCluster: + client := redis.NewClusterClient(clusterOptions(config, role)) + client.AddHook(newLoggingHook(role)) + return client + case endpointModeSentinel: client := redis.NewFailoverClient(failoverOptions(config, role)) client.AddHook(newLoggingHook(role)) return client + default: + client := redis.NewClient(options(config, role)) + client.AddHook(newLoggingHook(role)) + return client } - client := redis.NewClient(options(config, role)) - client.AddHook(newLoggingHook(role)) - return client } func options(config Config, role Role) *redis.Options { @@ -253,6 +274,29 @@ func failoverOptions(config Config, role Role) *redis.FailoverOptions { return options } +func clusterOptions(config Config, role Role) *redis.ClusterOptions { + roleSettings := roleSettings(role) + options := &redis.ClusterOptions{ + Addrs: append([]string(nil), config.ClusterAddrs...), + Username: config.Username, + Password: config.Password, + TLSConfig: config.tlsConfig(), + MaxRedirects: clusterMaxRedirects, + ClusterStateReloadInterval: clusterStateReloadInterval, + DialTimeout: roleSettings.DialTimeout, + ReadTimeout: roleSettings.ReadTimeout, + WriteTimeout: roleSettings.WriteTimeout, + PoolTimeout: roleSettings.PoolTimeout, + MaxRetries: roleSettings.MaxRetries, + MinRetryBackoff: roleSettings.MinRetryBackoff, + MaxRetryBackoff: roleSettings.MaxRetryBackoff, + DialerRetries: roleSettings.DialerRetries, + DialerRetryTimeout: roleSettings.DialerRetryTimeout, + } + applyClusterPoolConfig(options, config.pool) + return options +} + func roleSettings(role Role) roleConfig { switch role { case RoleCoordination: @@ -340,14 +384,27 @@ func applyFailoverPoolConfig(options *redis.FailoverOptions, config poolConfig) options.ConnMaxLifetime = config.ConnMaxLifetime } +func applyClusterPoolConfig(options *redis.ClusterOptions, config poolConfig) { + if options == nil { + return + } + options.PoolSize = config.PoolSize + options.MinIdleConns = config.MinIdleConns + options.MaxIdleConns = config.MaxIdleConns + options.ConnMaxIdleTime = config.ConnMaxIdleTime + options.ConnMaxLifetime = config.ConnMaxLifetime +} + func endpointModeFromEnv() (string, error) { switch strings.ToLower(strings.TrimSpace(os.Getenv(endpointModeEnv))) { case "", endpointModeDirect: return endpointModeDirect, nil case endpointModeSentinel: return endpointModeSentinel, nil + case endpointModeCluster: + return endpointModeCluster, nil default: - return "", fmt.Errorf("%s must be one of: %s, %s", endpointModeEnv, endpointModeDirect, endpointModeSentinel) + return "", fmt.Errorf("%s must be one of: %s, %s, %s", endpointModeEnv, endpointModeDirect, endpointModeSentinel, endpointModeCluster) } } @@ -385,7 +442,7 @@ func poolConfigFromEnv() (poolConfig, error) { }, nil } -func pingWithRetry(ctx context.Context, client *redis.Client) error { +func pingWithRetry(ctx context.Context, client redis.UniversalClient) error { var lastErr error for range 10 { pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -408,14 +465,14 @@ func (c *ClientSet) Close() error { if c == nil { return nil } - clients := []*redis.Client{ + clients := []redis.UniversalClient{ c.Default, c.Coordination, c.Cache, c.Queue, c.Session, } - seen := make(map[*redis.Client]struct{}, len(clients)) + seen := make(map[redis.UniversalClient]struct{}, len(clients)) var firstErr error for _, client := range clients { if client == nil { @@ -535,6 +592,45 @@ func csvEnv(name string) []string { return values } +func (c *Config) configureClusterFromEnv() error { + addrs := csvEnv(addrEnv) + normalized := make([]string, 0, len(addrs)) + var urlUsername string + var urlPassword string + for _, addr := range addrs { + if !strings.Contains(addr, "://") { + normalized = append(normalized, addr) + continue + } + options, err := redis.ParseClusterURL(addr) + if err != nil { + return fmt.Errorf("invalid %s cluster URL: %w", addrEnv, err) + } + normalized = append(normalized, options.Addrs...) + if options.Username != "" { + if urlUsername != "" && urlUsername != options.Username { + return fmt.Errorf("invalid %s cluster URL: conflicting usernames", addrEnv) + } + urlUsername = options.Username + } + if options.Password != "" { + if urlPassword != "" && urlPassword != options.Password { + return fmt.Errorf("invalid %s cluster URL: conflicting passwords", addrEnv) + } + urlPassword = options.Password + } + if options.TLSConfig != nil { + c.TLS = true + } + } + c.ClusterAddrs = normalized + c.Username = urlUsername + if c.Password == "" { + c.Password = urlPassword + } + return nil +} + func redisTLSConfig(config Config) (*tls.Config, error) { tlsConfig := &tls.Config{ MinVersion: tls.VersionTLS12, From 3771bd2c983aacaa211c388c5a3d5dfa2bebbc1e Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:32 +0800 Subject: [PATCH 02/22] refactor(app): carry universal redis clients Cluster clients do not fit the concrete standalone Redis client type used by app wiring. Widen server, runtime, and health Redis fields to redis.UniversalClient while preserving duplicate-close handling. Application startup can pass direct, Sentinel, or Cluster clients through the same runtime slots. --- backend/cmd/api/server.go | 4 ++-- backend/internal/app/health.go | 2 +- backend/internal/app/health_server.go | 2 +- backend/internal/app/runtime.go | 14 +++++++------- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/backend/cmd/api/server.go b/backend/cmd/api/server.go index 8bc11f059..66c0f852f 100644 --- a/backend/cmd/api/server.go +++ b/backend/cmd/api/server.go @@ -23,8 +23,8 @@ import ( type serverConfig struct { runtimeConfig app.RuntimeConfig jwtSigningKey []byte - redisClient *redis.Client - rateLimitRedis *redis.Client + redisClient redis.UniversalClient + rateLimitRedis redis.UniversalClient mockLogin bool ready *atomic.Bool sqlDB *gorm.DB diff --git a/backend/internal/app/health.go b/backend/internal/app/health.go index 166322e60..b56a92770 100644 --- a/backend/internal/app/health.go +++ b/backend/internal/app/health.go @@ -11,7 +11,7 @@ import ( "gorm.io/gorm" ) -func RegisterHealthRoutes(e *echo.Echo, ready *atomic.Bool, sqlDB *gorm.DB, redisClient *redis.Client) { +func RegisterHealthRoutes(e *echo.Echo, ready *atomic.Bool, sqlDB *gorm.DB, redisClient redis.UniversalClient) { e.GET("/health", func(c echo.Context) error { return c.JSON(http.StatusOK, map[string]string{"status": "healthy"}) }) diff --git a/backend/internal/app/health_server.go b/backend/internal/app/health_server.go index ef69e7fa2..210662eb6 100644 --- a/backend/internal/app/health_server.go +++ b/backend/internal/app/health_server.go @@ -17,7 +17,7 @@ const defaultHTTPPort = "8080" type HealthServerConfig struct { Ready *atomic.Bool - RedisClient *redis.Client + RedisClient redis.UniversalClient ObservabilitySuite *observability.Suite DBRouter *db.Router SQLDB *gorm.DB diff --git a/backend/internal/app/runtime.go b/backend/internal/app/runtime.go index b1d341677..f52137c1c 100644 --- a/backend/internal/app/runtime.go +++ b/backend/internal/app/runtime.go @@ -42,11 +42,11 @@ type Runtime struct { Config RuntimeConfig JWTSigningKey []byte MockLogin bool - RedisClient *redis.Client - RedisCoordination *redis.Client - RedisCache *redis.Client - RedisQueue *redis.Client - RedisSessionContinuity *redis.Client + RedisClient redis.UniversalClient + RedisCoordination redis.UniversalClient + RedisCache redis.UniversalClient + RedisQueue redis.UniversalClient + RedisSessionContinuity redis.UniversalClient ObservabilitySuite *observability.Suite DashboardService *dashboardsvc.DashboardService CollabDocumentService *collabdoc.Service @@ -214,14 +214,14 @@ func (r *Runtime) Close() error { if r == nil { return nil } - clients := []*redis.Client{ + clients := []redis.UniversalClient{ r.RedisClient, r.RedisCoordination, r.RedisCache, r.RedisQueue, r.RedisSessionContinuity, } - seen := map[*redis.Client]struct{}{} + seen := map[redis.UniversalClient]struct{}{} var firstErr error for _, client := range clients { if client == nil { From 965c829585bb095903ea01c907dd790093dc21d2 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:32 +0800 Subject: [PATCH 03/22] refactor(api): accept universal redis dependents HTTP edge code shares Redis clients with middleware and streaming coordination paths. Widen auth, rate limiting, and streamgate entry points from concrete clients to redis.UniversalClient. These callers can now use Cluster clients without changing command behavior. --- backend/internal/handlers/auth.go | 4 ++-- backend/internal/middleware/rate_limit.go | 12 ++++++------ backend/internal/pkg/streamgate/streamgate.go | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/backend/internal/handlers/auth.go b/backend/internal/handlers/auth.go index 0dedd51f5..ebd1c8516 100644 --- a/backend/internal/handlers/auth.go +++ b/backend/internal/handlers/auth.go @@ -26,13 +26,13 @@ import ( type AuthHandler struct { db *gorm.DB - redis *redis.Client + redis redis.UniversalClient email email.EmailService jwtSigningKey []byte usernameLoginEnabled bool } -func NewAuthHandler(db *gorm.DB, redis *redis.Client, email email.EmailService, jwtSigningKey []byte) *AuthHandler { +func NewAuthHandler(db *gorm.DB, redis redis.UniversalClient, email email.EmailService, jwtSigningKey []byte) *AuthHandler { return &AuthHandler{ db: db, redis: redis, diff --git a/backend/internal/middleware/rate_limit.go b/backend/internal/middleware/rate_limit.go index ebe23815b..621fa656c 100644 --- a/backend/internal/middleware/rate_limit.go +++ b/backend/internal/middleware/rate_limit.go @@ -32,7 +32,7 @@ var defaultRateLimitPolicyYAML []byte type RateLimitConfig struct { Enabled bool - RedisClient *redis.Client + RedisClient redis.UniversalClient KeyPrefix string FailOpen bool guard *redisdegrade.Guard @@ -111,7 +111,7 @@ local ttl = redis.call("PTTL", KEYS[1]) return { current, ttl } ` -func DefaultRateLimitConfig(client *redis.Client) RateLimitConfig { +func DefaultRateLimitConfig(client redis.UniversalClient) RateLimitConfig { policy, err := rateLimitPolicyFromYAML(defaultRateLimitPolicyYAML) if err != nil { panic(err) @@ -119,7 +119,7 @@ func DefaultRateLimitConfig(client *redis.Client) RateLimitConfig { return rateLimitConfigFromPolicy(client, policy) } -func rateLimitConfigFromPolicy(client *redis.Client, policy rateLimitPolicy) RateLimitConfig { +func rateLimitConfigFromPolicy(client redis.UniversalClient, policy rateLimitPolicy) RateLimitConfig { return RateLimitConfig{ Enabled: client != nil, RedisClient: client, @@ -149,7 +149,7 @@ func rateLimitConfigFromPolicy(client *redis.Client, policy rateLimitPolicy) Rat } } -func RateLimitConfigFromEnv(client *redis.Client) (RateLimitConfig, error) { +func RateLimitConfigFromEnv(client redis.UniversalClient) (RateLimitConfig, error) { policy, err := rateLimitPolicyFromYAML(defaultRateLimitPolicyYAML) if err != nil { return RateLimitConfig{}, err @@ -342,7 +342,7 @@ func rateLimitCategory(method, route string) string { } } -func checkRateLimitBuckets(ctx context.Context, client *redis.Client, prefix string, guard *redisdegrade.Guard, buckets []rateLimitBucket) (rateLimitResult, error) { +func checkRateLimitBuckets(ctx context.Context, client redis.UniversalClient, prefix string, guard *redisdegrade.Guard, buckets []rateLimitBucket) (rateLimitResult, error) { var selected rateLimitResult for _, bucket := range buckets { current, ttl, err := incrementRateLimitBucket(ctx, client, guard, rateLimitRedisKey(prefix, bucket), bucket.Window) @@ -370,7 +370,7 @@ func checkRateLimitBuckets(ctx context.Context, client *redis.Client, prefix str return selected, nil } -func incrementRateLimitBucket(ctx context.Context, client *redis.Client, guard *redisdegrade.Guard, key string, window time.Duration) (int64, time.Duration, error) { +func incrementRateLimitBucket(ctx context.Context, client redis.UniversalClient, guard *redisdegrade.Guard, key string, window time.Duration) (int64, time.Duration, error) { raw, err := redisdegrade.CallWork(guard, "rate_limit", func() (any, error) { return client.Eval(ctx, redisRateLimitScript, []string{key}, window.Milliseconds()).Result() }) diff --git a/backend/internal/pkg/streamgate/streamgate.go b/backend/internal/pkg/streamgate/streamgate.go index dbe55be4b..cb1d414e5 100644 --- a/backend/internal/pkg/streamgate/streamgate.go +++ b/backend/internal/pkg/streamgate/streamgate.go @@ -75,7 +75,7 @@ func (l *Lease) Release(ctx context.Context) error { type Limiter struct { config Config - redis *redis.Client + redis redis.UniversalClient memory *memoryStore } @@ -94,7 +94,7 @@ func (e *LimitError) Unwrap() error { return ErrLimitExceeded } -func New(redisClient *redis.Client, config Config) *Limiter { +func New(redisClient redis.UniversalClient, config Config) *Limiter { config = normalizeConfig(config) if !config.Enabled { return nil From 789e7d3ae57f3564a06918bc3ef9b569d88a9f74 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:32 +0800 Subject: [PATCH 04/22] refactor(browser-session): accept universal redis clients Browser session coordination uses the shared Redis client set from runtime wiring. Widen coordination and continuity Redis fields and setters to redis.UniversalClient. Session locks, live state, and cleanup paths can now run on Cluster-capable clients. --- backend/internal/services/browser_session/service.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/backend/internal/services/browser_session/service.go b/backend/internal/services/browser_session/service.go index eddfa9c98..252b4fcf8 100644 --- a/backend/internal/services/browser_session/service.go +++ b/backend/internal/services/browser_session/service.go @@ -73,8 +73,8 @@ type BrowserSessionService struct { workerClient publisher.BrowserWorkerClient cookieStore *publisher.CookieStore adapters map[string]publisher.RemoteBrowserPlatformAdapter - coordinationRedisClient *redis.Client - continuityRedisClient *redis.Client + coordinationRedisClient redis.UniversalClient + continuityRedisClient redis.UniversalClient quotaConfig BrowserSessionQuotaConfig dashboardAccountCacheInvalidator DashboardAccountCacheInvalidator } @@ -160,19 +160,19 @@ func (s *BrowserSessionService) RegisterSession(ctx context.Context, session *mo return nil } -func (s *BrowserSessionService) UseRedis(client *redis.Client) { +func (s *BrowserSessionService) UseRedis(client redis.UniversalClient) { s.UseRedisCoordination(client) s.UseRedisContinuity(client) } -func (s *BrowserSessionService) UseRedisCoordination(client *redis.Client) { +func (s *BrowserSessionService) UseRedisCoordination(client redis.UniversalClient) { if client == nil { return } s.coordinationRedisClient = client } -func (s *BrowserSessionService) UseRedisContinuity(client *redis.Client) { +func (s *BrowserSessionService) UseRedisContinuity(client redis.UniversalClient) { if client == nil { return } From 357f3595b9f93f8d33a6abfd1d22a21f475e1593 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:32 +0800 Subject: [PATCH 05/22] refactor(dashboard): accept universal redis clients Dashboard services fan Redis clients into cache and queue collaborators. Widen dashboard Redis setters so cache and queue clients can be direct, Sentinel, or Cluster. The facade no longer pins downstream services to standalone Redis clients. --- backend/internal/services/dashboard/facade.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/internal/services/dashboard/facade.go b/backend/internal/services/dashboard/facade.go index 8ef7b66b4..e7b7a4efc 100644 --- a/backend/internal/services/dashboard/facade.go +++ b/backend/internal/services/dashboard/facade.go @@ -205,12 +205,12 @@ func (s *DashboardService) SetPublishQueue(queue publishsvc.PublishQueue) { s.SetQueue(queue) } -func (s *DashboardService) UseRedis(client *redis.Client) { +func (s *DashboardService) UseRedis(client redis.UniversalClient) { s.UseRedisCache(client) s.UseRedisQueue(client) } -func (s *DashboardService) UseRedisCache(client *redis.Client) { +func (s *DashboardService) UseRedisCache(client redis.UniversalClient) { if client == nil { return } @@ -220,7 +220,7 @@ func (s *DashboardService) UseRedisCache(client *redis.Client) { s.Stats.UseRedisCache(client) } -func (s *DashboardService) UseRedisQueue(client *redis.Client) { +func (s *DashboardService) UseRedisQueue(client redis.UniversalClient) { if client == nil { return } From fa92e47f41c8b707156960756da11ed96a14814d Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:32 +0800 Subject: [PATCH 06/22] refactor(platform-account): accept universal redis clients Platform account state and cache stores share the managed Redis connection surface. Widen OAuth state and cache Redis clients to redis.UniversalClient. Account cache and OAuth flows can now use Cluster-aware clients transparently. --- .../services/platform_account/oauth2_state_store.go | 4 ++-- backend/internal/services/platform_account/service.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/backend/internal/services/platform_account/oauth2_state_store.go b/backend/internal/services/platform_account/oauth2_state_store.go index 005c776f0..3030fad46 100644 --- a/backend/internal/services/platform_account/oauth2_state_store.go +++ b/backend/internal/services/platform_account/oauth2_state_store.go @@ -57,7 +57,7 @@ func (s *MemoryXOAuth2StateStore) Consume(ctx context.Context, state string) (xO } type RedisXOAuth2StateStore struct { - client *redis.Client + client redis.UniversalClient prefix string } @@ -69,7 +69,7 @@ type redisXOAuth2PendingState struct { ExpiresAt time.Time `json:"expires_at"` } -func NewRedisXOAuth2StateStore(client *redis.Client) *RedisXOAuth2StateStore { +func NewRedisXOAuth2StateStore(client redis.UniversalClient) *RedisXOAuth2StateStore { return &RedisXOAuth2StateStore{ client: client, prefix: xOAuth2StateKeyPrefix, diff --git a/backend/internal/services/platform_account/service.go b/backend/internal/services/platform_account/service.go index c76f66d7a..03dfc21cc 100644 --- a/backend/internal/services/platform_account/service.go +++ b/backend/internal/services/platform_account/service.go @@ -28,8 +28,8 @@ type Service struct { xTester XConnectionTester xOAuth2Provider XOAuth2Provider xOAuth2States XOAuth2StateStore - stateStoreClient *redis.Client - cache *redis.Client + stateStoreClient redis.UniversalClient + cache redis.UniversalClient cacheTTL time.Duration cacheGroup *singleflight.Group cacheGuard *redisdegrade.Guard @@ -110,12 +110,12 @@ func (s *Service) strongReadDB() *gorm.DB { return s.router.Reader(s.requestContext(), dbrouter.StrongRead) } -func (s *Service) UseRedis(client *redis.Client) { +func (s *Service) UseRedis(client redis.UniversalClient) { s.UseRedisStateStore(client) s.UseRedisCache(client) } -func (s *Service) UseRedisStateStore(client *redis.Client) { +func (s *Service) UseRedisStateStore(client redis.UniversalClient) { if client == nil { return } @@ -123,7 +123,7 @@ func (s *Service) UseRedisStateStore(client *redis.Client) { s.xOAuth2States = NewRedisXOAuth2StateStore(client) } -func (s *Service) UseRedisCache(client *redis.Client) { +func (s *Service) UseRedisCache(client redis.UniversalClient) { if client == nil { return } From 2b5ec0ac0ee1e030a66470460573ec872f8c31db Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 07/22] refactor(mediaasset): accept universal redis clients Media asset resolution stores cache entries and invalidates key families through Redis. Widen the media asset cache client and cache deletion helper to redis.UniversalClient. Resolve caching can use Cluster clients while keeping the existing command flow. --- backend/internal/services/mediaasset/resolve_cache.go | 2 +- backend/internal/services/mediaasset/service.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/internal/services/mediaasset/resolve_cache.go b/backend/internal/services/mediaasset/resolve_cache.go index 5057ed404..1b27e69da 100644 --- a/backend/internal/services/mediaasset/resolve_cache.go +++ b/backend/internal/services/mediaasset/resolve_cache.go @@ -142,7 +142,7 @@ func (s *Service) invalidateResolvedMediaAssetCache(assetID uuid.UUID) { deleteResolvedMediaAssetCacheKeys(ctx, s.cache, s.cacheGuard, assetID) } -func deleteResolvedMediaAssetCacheKeys(ctx context.Context, client *redis.Client, guard *redisdegrade.Guard, assetID uuid.UUID) { +func deleteResolvedMediaAssetCacheKeys(ctx context.Context, client redis.UniversalClient, guard *redisdegrade.Guard, assetID uuid.UUID) { var cursor uint64 pattern := resolvedMediaAssetCachePrefix + ":" + rediskey.Tag("asset", assetID.String()) + ":*" for { diff --git a/backend/internal/services/mediaasset/service.go b/backend/internal/services/mediaasset/service.go index 16176ef93..0a2ab896f 100644 --- a/backend/internal/services/mediaasset/service.go +++ b/backend/internal/services/mediaasset/service.go @@ -29,7 +29,7 @@ type Service struct { projects *projectsvc.Service objectStorage objectstorage.Client storageConfig objectstorage.Config - cache *redis.Client + cache redis.UniversalClient cacheTTL time.Duration cacheGuard *redisdegrade.Guard cacheGroup *singleflight.Group @@ -67,11 +67,11 @@ func (s *Service) UseObjectStorage(client objectstorage.Client, config objectsto s.storageConfig = config } -func (s *Service) UseRedis(client *redis.Client) { +func (s *Service) UseRedis(client redis.UniversalClient) { s.UseRedisCache(client) } -func (s *Service) UseRedisCache(client *redis.Client) { +func (s *Service) UseRedisCache(client redis.UniversalClient) { if client == nil { return } From d3656021cb1ac84f13d32e73673383bc516ac126 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 08/22] refactor(project): accept universal redis clients Project cache helpers scan and bump Redis generations across several cache families. Widen project cache fields and helpers to redis.UniversalClient. Project list and setup option caches can now run against Cluster-capable clients. --- backend/internal/services/project/list_cache.go | 2 +- backend/internal/services/project/service.go | 6 +++--- backend/internal/services/project/setup_options_cache.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/backend/internal/services/project/list_cache.go b/backend/internal/services/project/list_cache.go index c6beb7f78..ca78ff343 100644 --- a/backend/internal/services/project/list_cache.go +++ b/backend/internal/services/project/list_cache.go @@ -245,7 +245,7 @@ func (s *Service) refreshProjectReadModel(projectID uuid.UUID) { s.readModels.RefreshProjectAsync(s.requestContext(), projectID) } -func deleteDashboardProjectListCacheKeys(ctx context.Context, client *redis.Client, guard *redisdegrade.Guard) { +func deleteDashboardProjectListCacheKeys(ctx context.Context, client redis.UniversalClient, guard *redisdegrade.Guard) { var cursor uint64 for { type scanResult struct { diff --git a/backend/internal/services/project/service.go b/backend/internal/services/project/service.go index f8e850f1b..6e24e09dc 100644 --- a/backend/internal/services/project/service.go +++ b/backend/internal/services/project/service.go @@ -43,7 +43,7 @@ type Service struct { db *gorm.DB router *dbrouter.Router collabDocuments *collabdoc.Service - cache *redis.Client + cache redis.UniversalClient cacheTTL time.Duration cacheGroup *singleflight.Group projectListGuard *redisdegrade.Guard @@ -80,11 +80,11 @@ func (s *Service) SetCollabDocumentService(svc *collabdoc.Service) { s.collabDocuments = svc } -func (s *Service) UseRedis(client *redis.Client) { +func (s *Service) UseRedis(client redis.UniversalClient) { s.UseRedisCache(client) } -func (s *Service) UseRedisCache(client *redis.Client) { +func (s *Service) UseRedisCache(client redis.UniversalClient) { if client == nil { return } diff --git a/backend/internal/services/project/setup_options_cache.go b/backend/internal/services/project/setup_options_cache.go index 524f732ef..f09084cff 100644 --- a/backend/internal/services/project/setup_options_cache.go +++ b/backend/internal/services/project/setup_options_cache.go @@ -240,7 +240,7 @@ func (s *Service) invalidateBrandProfileOptionsCache(workspaceID uuid.UUID) { deleteContentSetupOptionsCacheKeys(ctx, s.cache, s.contentSetupGuard, contentSetupOptionsWorkspacePattern(contentSetupResourceBrandProfiles, workspaceID)) } -func deleteContentSetupOptionsCacheKeys(ctx context.Context, client *redis.Client, guard *redisdegrade.Guard, pattern string) { +func deleteContentSetupOptionsCacheKeys(ctx context.Context, client redis.UniversalClient, guard *redisdegrade.Guard, pattern string) { var cursor uint64 for { type scanResult struct { @@ -295,7 +295,7 @@ func (s *Service) contentSetupOptionsCacheGeneration(ctx context.Context, resour return "workspace:" + workspaceGeneration, nil } -func contentSetupOptionsGeneration(ctx context.Context, client *redis.Client, guard *redisdegrade.Guard, key string) (string, error) { +func contentSetupOptionsGeneration(ctx context.Context, client redis.UniversalClient, guard *redisdegrade.Guard, key string) (string, error) { generation, err := redisdegrade.CallWork(guard, "cache_read", func() (string, error) { return client.Get(ctx, key).Result() }) From 033675a62180c7bc423ef77e3d4fddd31e80d982 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 09/22] refactor(queue): accept universal redis clients Asynq-backed queues need to share the same Redis client abstraction as runtime roles. Widen email, publish, and dashboard rebuild queue constructors to redis.UniversalClient. Queue workers can now be constructed from direct, Sentinel, or Cluster Redis clients. --- backend/internal/services/email/queue.go | 4 ++-- backend/internal/services/publish/queue.go | 4 ++-- backend/internal/services/readmodel/queue.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/backend/internal/services/email/queue.go b/backend/internal/services/email/queue.go index d2108c2db..5343fecdf 100644 --- a/backend/internal/services/email/queue.go +++ b/backend/internal/services/email/queue.go @@ -32,11 +32,11 @@ type codeEmailJob struct { } type AsyncEmailService struct { - redisClient *redis.Client + redisClient redis.UniversalClient asynqClient *asynq.Client } -func NewAsyncEmailService(client *redis.Client) *AsyncEmailService { +func NewAsyncEmailService(client redis.UniversalClient) *AsyncEmailService { return &AsyncEmailService{ redisClient: client, asynqClient: asynq.NewClientFromRedisClient(client), diff --git a/backend/internal/services/publish/queue.go b/backend/internal/services/publish/queue.go index 46e5bf4d1..f0d247700 100644 --- a/backend/internal/services/publish/queue.go +++ b/backend/internal/services/publish/queue.go @@ -73,11 +73,11 @@ type PublishQueue interface { } type RedisPublishQueue struct { - redisClient *redis.Client + redisClient redis.UniversalClient asynqClient *asynq.Client } -func NewRedisPublishQueue(client *redis.Client) *RedisPublishQueue { +func NewRedisPublishQueue(client redis.UniversalClient) *RedisPublishQueue { return &RedisPublishQueue{ redisClient: client, asynqClient: asynq.NewClientFromRedisClient(client), diff --git a/backend/internal/services/readmodel/queue.go b/backend/internal/services/readmodel/queue.go index 028e7aa65..cc5a0e374 100644 --- a/backend/internal/services/readmodel/queue.go +++ b/backend/internal/services/readmodel/queue.go @@ -34,11 +34,11 @@ type DashboardRebuildTaskInfo struct { } type RedisDashboardRebuildQueue struct { - redisClient *redis.Client + redisClient redis.UniversalClient asynqClient *asynq.Client } -func NewRedisDashboardRebuildQueue(client *redis.Client) *RedisDashboardRebuildQueue { +func NewRedisDashboardRebuildQueue(client redis.UniversalClient) *RedisDashboardRebuildQueue { if client == nil { return nil } From 9d705247b8e9031ee6cb14ad0435aa790c9d6fb7 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 10/22] refactor(publish): accept universal redis clients Publish coordination and queue clients are supplied from the runtime Redis roles. Widen publish service Redis setters to redis.UniversalClient. Publishing can share Cluster-capable coordination and queue clients without extra wiring. --- backend/internal/services/publish/service.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/internal/services/publish/service.go b/backend/internal/services/publish/service.go index 502f2bf47..156c5bc24 100644 --- a/backend/internal/services/publish/service.go +++ b/backend/internal/services/publish/service.go @@ -184,12 +184,12 @@ func (s *Service) SetDashboardReadModelUpdater(updater DashboardReadModelUpdater s.readModels = updater } -func (s *Service) UseRedis(client *redis.Client) { +func (s *Service) UseRedis(client redis.UniversalClient) { s.UseRedisCoordination(client) s.UseRedisQueue(client) } -func (s *Service) UseRedisCoordination(client *redis.Client) { +func (s *Service) UseRedisCoordination(client redis.UniversalClient) { if client == nil { return } @@ -199,7 +199,7 @@ func (s *Service) UseRedisCoordination(client *redis.Client) { } } -func (s *Service) UseRedisQueue(client *redis.Client) { +func (s *Service) UseRedisQueue(client redis.UniversalClient) { if client == nil { return } From eb8ecbee3a13784feeb8696ae989d78ad6823220 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 11/22] refactor(stats): accept universal redis clients Stats overview caching uses Redis through the dashboard service graph. Widen the stats cache client and setter to redis.UniversalClient. Stats cache calls can now use the same Cluster-capable clients as other services. --- backend/internal/services/stats/service.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/internal/services/stats/service.go b/backend/internal/services/stats/service.go index 6b4e41f39..eed4be4e9 100644 --- a/backend/internal/services/stats/service.go +++ b/backend/internal/services/stats/service.go @@ -19,7 +19,7 @@ type Service struct { db *gorm.DB router *dbrouter.Router projects *projectsvc.Service - cache *redis.Client + cache redis.UniversalClient cacheTTL time.Duration cacheGroup *singleflight.Group cacheGuard *redisdegrade.Guard @@ -54,11 +54,11 @@ func (s *Service) WithContext(ctx context.Context) *Service { return &scoped } -func (s *Service) UseRedis(client *redis.Client) { +func (s *Service) UseRedis(client redis.UniversalClient) { s.UseRedisCache(client) } -func (s *Service) UseRedisCache(client *redis.Client) { +func (s *Service) UseRedisCache(client redis.UniversalClient) { if client == nil { return } From 5365637646dc52af5e5b82afff14a9acbc3e55f0 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 12/22] feat(browser-session): add cluster redis state client Browser workers need the same managed Redis endpoint modes as backend session continuity. Add Cluster endpoint parsing, URL TLS and auth merging, redirect handling, topology refresh, and DB zero validation. Worker session state can now use direct, Sentinel, or Cluster Redis providers. --- .../internal/session/redis_state.go | 90 +++++++++++++++++-- 1 file changed, 85 insertions(+), 5 deletions(-) diff --git a/browser-worker/internal/session/redis_state.go b/browser-worker/internal/session/redis_state.go index a2aadd012..aee681186 100644 --- a/browser-worker/internal/session/redis_state.go +++ b/browser-worker/internal/session/redis_state.go @@ -28,7 +28,9 @@ const ( redisSentinelMasterEnv = "REDIS_SENTINEL_MASTER_NAME" redisEndpointModeDirect = "direct" redisEndpointModeSentinel = "sentinel" + redisEndpointModeCluster = "cluster" redisSentinelMasterDefault = "mpp-redis-ha" + redisClusterMaxRedirects = 8 browserSessionKeyPrefix = "mpp:browser:session:" browserSessionHeartbeatPrefix = "mpp:browser:worker-heartbeat:" @@ -45,6 +47,7 @@ const ( redisDialerRetries = 2 redisDialerRetryTimeout = 75 * time.Millisecond redisCommandRetries = 1 + redisClusterReloadEvery = 10 * time.Second ) var ( @@ -53,18 +56,20 @@ var ( ) type RedisStateStore struct { - client *redis.Client + client redis.UniversalClient } type redisConnectionConfig struct { EndpointMode string Addr string + Username string Password string DB int TLS bool TLSCACert string TLSCAFile string TLSServerName string + ClusterAddrs []string SentinelAddrs []string SentinelMasterName string tls *tls.Config @@ -136,6 +141,13 @@ func redisConnectionConfigFromEnv() (redisConnectionConfig, error) { if config.Addr == "" { return redisConnectionConfig{}, errRedisNotConfigured } + case redisEndpointModeCluster: + if err := config.configureClusterFromEnv(); err != nil { + return redisConnectionConfig{}, err + } + if len(config.ClusterAddrs) == 0 { + return redisConnectionConfig{}, fmt.Errorf("%s must be set when %s=cluster", redisAddrEnv, redisEndpointModeEnv) + } case redisEndpointModeSentinel: config.SentinelAddrs = redisCSVEnv(redisSentinelAddrsEnv) config.SentinelMasterName = strings.TrimSpace(os.Getenv(redisSentinelMasterEnv)) @@ -151,6 +163,9 @@ func redisConnectionConfigFromEnv() (redisConnectionConfig, error) { if err != nil { return redisConnectionConfig{}, err } + if endpointMode == redisEndpointModeCluster && db != 0 { + return redisConnectionConfig{}, fmt.Errorf("invalid %s: must be 0 when %s=cluster", redisDBEnv, redisEndpointModeEnv) + } config.DB = db if err := config.configureTLS(); err != nil { return redisConnectionConfig{}, err @@ -159,12 +174,16 @@ func redisConnectionConfigFromEnv() (redisConnectionConfig, error) { return config, nil } -func newRedisClient(config redisConnectionConfig) *redis.Client { +func newRedisClient(config redisConnectionConfig) redis.UniversalClient { _ = config.configureTLS() - if config.EndpointMode == redisEndpointModeSentinel { + switch config.EndpointMode { + case redisEndpointModeCluster: + return redis.NewClusterClient(redisClusterOptions(config)) + case redisEndpointModeSentinel: return redis.NewFailoverClient(redisFailoverOptions(config)) + default: + return redis.NewClient(redisOptions(config)) } - return redis.NewClient(redisOptions(config)) } func redisOptions(config redisConnectionConfig) *redis.Options { @@ -204,6 +223,26 @@ func redisFailoverOptions(config redisConnectionConfig) *redis.FailoverOptions { } } +func redisClusterOptions(config redisConnectionConfig) *redis.ClusterOptions { + return &redis.ClusterOptions{ + Addrs: append([]string(nil), config.ClusterAddrs...), + Username: config.Username, + Password: config.Password, + TLSConfig: config.tlsConfig(), + MaxRedirects: redisClusterMaxRedirects, + ClusterStateReloadInterval: redisClusterReloadEvery, + DialTimeout: redisDialTimeout, + ReadTimeout: redisReadTimeout, + WriteTimeout: redisWriteTimeout, + PoolTimeout: redisPoolTimeout, + MaxRetries: redisCommandRetries, + MinRetryBackoff: redisMinRetryBackoff, + MaxRetryBackoff: redisMaxRetryBackoff, + DialerRetries: redisDialerRetries, + DialerRetryTimeout: redisDialerRetryTimeout, + } +} + func (s *RedisStateStore) Close() error { if s == nil || s.client == nil { return nil @@ -353,8 +392,10 @@ func redisEndpointModeFromEnv() (string, error) { return redisEndpointModeDirect, nil case redisEndpointModeSentinel: return redisEndpointModeSentinel, nil + case redisEndpointModeCluster: + return redisEndpointModeCluster, nil default: - return "", fmt.Errorf("%s must be one of: %s, %s", redisEndpointModeEnv, redisEndpointModeDirect, redisEndpointModeSentinel) + return "", fmt.Errorf("%s must be one of: %s, %s, %s", redisEndpointModeEnv, redisEndpointModeDirect, redisEndpointModeSentinel, redisEndpointModeCluster) } } @@ -370,6 +411,45 @@ func redisCSVEnv(name string) []string { return values } +func (c *redisConnectionConfig) configureClusterFromEnv() error { + addrs := redisCSVEnv(redisAddrEnv) + normalized := make([]string, 0, len(addrs)) + var urlUsername string + var urlPassword string + for _, addr := range addrs { + if !strings.Contains(addr, "://") { + normalized = append(normalized, addr) + continue + } + options, err := redis.ParseClusterURL(addr) + if err != nil { + return fmt.Errorf("invalid %s cluster URL: %w", redisAddrEnv, err) + } + normalized = append(normalized, options.Addrs...) + if options.Username != "" { + if urlUsername != "" && urlUsername != options.Username { + return fmt.Errorf("invalid %s cluster URL: conflicting usernames", redisAddrEnv) + } + urlUsername = options.Username + } + if options.Password != "" { + if urlPassword != "" && urlPassword != options.Password { + return fmt.Errorf("invalid %s cluster URL: conflicting passwords", redisAddrEnv) + } + urlPassword = options.Password + } + if options.TLSConfig != nil { + c.TLS = true + } + } + c.ClusterAddrs = normalized + c.Username = urlUsername + if c.Password == "" { + c.Password = urlPassword + } + return nil +} + func redisTLSConfig(config redisConnectionConfig) (*tls.Config, error) { tlsConfig := &tls.Config{ MinVersion: tls.VersionTLS12, From d5c72f610abe7d8236a94821f796a4f34f8ae16c Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 13/22] feat(collab): add cluster redis client mode Collab sync previously supported direct and Sentinel Redis endpoints only. Add cluster config validation and node-redis createCluster options with seed nodes, TLS, auth, and redirection limits. Collab pubsub can now target Redis Cluster while keeping standalone and Sentinel rollback paths. --- collab-service/src/collab/redis-pubsub.ts | 58 +++++++++++++++++++++-- collab-service/src/config.ts | 20 +++++++- 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/collab-service/src/collab/redis-pubsub.ts b/collab-service/src/collab/redis-pubsub.ts index a42b1c3e4..08b203c4e 100644 --- a/collab-service/src/collab/redis-pubsub.ts +++ b/collab-service/src/collab/redis-pubsub.ts @@ -1,14 +1,16 @@ -import { createClient, createSentinel } from "redis"; +import { createClient, createCluster, createSentinel } from "redis"; import { applyUpdate } from "yjs"; import { createHash, randomUUID } from "node:crypto"; import { readFileSync } from "node:fs"; import type { Document } from "@hocuspocus/server"; -import type { RedisClientOptions } from "redis"; +import type { RedisClientOptions, RedisClusterOptions } from "redis"; import type { ConnectionOptions as TLSConnectionOptions } from "node:tls"; import type { CollabConfig } from "../config.js"; type RedisSentinelOptions = Parameters[0]; +type RedisClusterRootNode = RedisClusterOptions["rootNodes"][number]; +type RedisClusterDefaults = NonNullable; type RedisSocketOptions = NonNullable; const remoteUpdateHashTTLMS = 60_000; @@ -20,6 +22,7 @@ const redisCommandQueueMaxLength = 256; const redisMaxReconnectDelayMS = 2_000; const redisMaxReconnectRetries = 3; const redisReconnectJitterMS = 50; +const redisClusterMaxCommandRedirections = 8; interface CollabUpdateEnvelope { actorUserId?: string; @@ -206,6 +209,11 @@ export function createRedisCollabPubSub( } export function createRedisClientFromConfig(config: CollabConfig): RedisClient { + if (config.REDIS_ENDPOINT_MODE === "cluster") { + return createCluster( + redisClusterOptionsFromConfig(config), + ) as unknown as RedisClient; + } if (config.REDIS_ENDPOINT_MODE === "sentinel") { return createSentinel( redisSentinelOptionsFromConfig(config), @@ -230,6 +238,43 @@ export function redisClientOptionsFromConfig( }; } +export function redisClusterOptionsFromConfig( + config: CollabConfig, +): RedisClusterOptions { + return { + defaults: redisClusterDefaultsFromConfig(config), + maxCommandRedirections: redisClusterMaxCommandRedirections, + rootNodes: redisClusterRootNodesFromConfig(config), + }; +} + +function redisClusterDefaultsFromConfig( + config: CollabConfig, +): RedisClusterDefaults { + return { + commandsQueueMaxLength: redisCommandQueueMaxLength, + disableOfflineQueue: true, + database: config.REDIS_DB, + pingInterval: redisPingIntervalMS, + password: config.REDIS_PASSWORD || undefined, + socket: redisSocketOptionsFromConfig(config), + }; +} + +export function redisClusterRootNodesFromConfig( + config: CollabConfig, +): RedisClusterRootNode[] { + return redisSeedAddressesFromConfig(config).map((address) => ({ + url: redisUrlFromAddress(address, config.REDIS_TLS), + })); +} + +function redisSeedAddressesFromConfig(config: CollabConfig): string[] { + return config.REDIS_ADDR.split(",") + .map((value) => value.trim()) + .filter(Boolean); +} + function redisSocketOptionsFromConfig( config: CollabConfig, ): RedisSocketOptions { @@ -308,17 +353,20 @@ export function redisSentinelRootNodesFromConfig( } function redisUrlFromConfig(config: CollabConfig): string { - const raw = config.REDIS_ADDR.trim(); + return redisUrlFromAddress(config.REDIS_ADDR.trim(), config.REDIS_TLS); +} + +function redisUrlFromAddress(raw: string, tls: boolean): string { if (raw.startsWith("rediss://")) { return raw; } if (raw.startsWith("redis://")) { - if (!config.REDIS_TLS) { + if (!tls) { return raw; } return `rediss://${raw.slice("redis://".length)}`; } - const scheme = config.REDIS_TLS ? "rediss" : "redis"; + const scheme = tls ? "rediss" : "redis"; return `${scheme}://${raw}`; } diff --git a/collab-service/src/config.ts b/collab-service/src/config.ts index e177d4d81..28de53cf5 100644 --- a/collab-service/src/config.ts +++ b/collab-service/src/config.ts @@ -70,7 +70,9 @@ const BaseEnvSchema = z.object({ ), DB_CONN_MAX_LIFETIME: EnvDurationMillis("30m"), DB_CONN_MAX_IDLE_TIME: EnvDurationMillis("5m"), - REDIS_ENDPOINT_MODE: z.enum(["direct", "sentinel"]).default("direct"), + REDIS_ENDPOINT_MODE: z + .enum(["direct", "sentinel", "cluster"]) + .default("direct"), REDIS_ADDR: z.string().default("redis:6379"), REDIS_PASSWORD: z.string().default(""), REDIS_DB: z.coerce.number().int().nonnegative().default(0), @@ -100,6 +102,22 @@ const EnvSchema = BaseEnvSchema.superRefine((config, ctx) => { message: "must be set when REDIS_ENDPOINT_MODE=sentinel", }); } + if (config.REDIS_ENDPOINT_MODE === "cluster") { + if (config.REDIS_ADDR.trim() === "") { + ctx.addIssue({ + code: "custom", + path: ["REDIS_ADDR"], + message: "must be set when REDIS_ENDPOINT_MODE=cluster", + }); + } + if (config.REDIS_DB !== 0) { + ctx.addIssue({ + code: "custom", + path: ["REDIS_DB"], + message: "must be 0 when REDIS_ENDPOINT_MODE=cluster", + }); + } + } }); export type CollabConfig = z.infer; From 0ca9d258900f334cee12dcdd5ceb910f550101ac Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 14/22] test(redisclient): cover cluster client modes Backend Redis config needs coverage for Cluster mode and URL-derived settings. Add tests for Cluster options, seed URL parsing, URL TLS and auth, DB validation, and direct-client assertions. The tests lock the managed Cluster contract while preserving existing direct and Sentinel expectations. --- .../internal/redisclient/redisclient_test.go | 108 +++++++++++++++++- 1 file changed, 103 insertions(+), 5 deletions(-) diff --git a/backend/internal/redisclient/redisclient_test.go b/backend/internal/redisclient/redisclient_test.go index e4129471e..408f62055 100644 --- a/backend/internal/redisclient/redisclient_test.go +++ b/backend/internal/redisclient/redisclient_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" ) @@ -41,7 +42,9 @@ func TestNewFromEnvAppliesPoolOverrides(t *testing.T) { require.NoError(t, client.Close()) }) - options := client.Options() + directClient, ok := client.(*redis.Client) + require.True(t, ok) + options := directClient.Options() require.Equal(t, redisServer.Addr(), options.Addr) require.Equal(t, 2, options.DB) require.Equal(t, 32, options.PoolSize) @@ -110,6 +113,82 @@ func TestConfigFromEnvBuildsSentinelEndpoint(t *testing.T) { require.Equal(t, 9, options.MaxIdleConns) } +func TestConfigFromEnvBuildsClusterEndpoint(t *testing.T) { + clearRedisEnv(t) + t.Setenv(endpointModeEnv, endpointModeCluster) + t.Setenv(addrEnv, " redis-cluster-0:6379,redis-cluster-1:6379 ") + t.Setenv(passwordEnv, "redis-secret") + t.Setenv(tlsEnv, "true") + t.Setenv(tlsCACertEnv, testRedisCACertPEM) + t.Setenv(tlsServerNameEnv, "redis.internal.example") + t.Setenv(poolSizeEnv, "24") + t.Setenv(minIdleConnsEnv, "3") + t.Setenv(maxIdleConnsEnv, "9") + + config, err := ConfigFromEnv() + require.NoError(t, err) + + require.Equal(t, endpointModeCluster, config.EndpointMode) + require.Equal(t, []string{"redis-cluster-0:6379", "redis-cluster-1:6379"}, config.ClusterAddrs) + require.Equal(t, "redis-secret", config.Password) + require.Zero(t, config.DB) + require.True(t, config.TLS) + + options := clusterOptions(config, RoleQueue) + require.Equal(t, []string{"redis-cluster-0:6379", "redis-cluster-1:6379"}, options.Addrs) + require.Equal(t, "redis-secret", options.Password) + require.Equal(t, clusterMaxRedirects, options.MaxRedirects) + require.Equal(t, clusterStateReloadInterval, options.ClusterStateReloadInterval) + require.Equal(t, 2, options.MaxRetries) + require.Equal(t, 50*time.Millisecond, options.MinRetryBackoff) + require.Equal(t, 250*time.Millisecond, options.MaxRetryBackoff) + require.Equal(t, 1*time.Second, options.DialTimeout) + require.Equal(t, 2*time.Second, options.ReadTimeout) + require.Equal(t, 2*time.Second, options.WriteTimeout) + require.NotNil(t, options.TLSConfig) + require.Equal(t, "redis.internal.example", options.TLSConfig.ServerName) + require.Equal(t, 24, options.PoolSize) + require.Equal(t, 3, options.MinIdleConns) + require.Equal(t, 9, options.MaxIdleConns) + + client := newClient(config, RoleQueue) + t.Cleanup(func() { + require.NoError(t, client.Close()) + }) + _, ok := client.(*redis.ClusterClient) + require.True(t, ok) +} + +func TestConfigFromEnvBuildsClusterEndpointFromURLs(t *testing.T) { + clearRedisEnv(t) + t.Setenv(endpointModeEnv, endpointModeCluster) + t.Setenv(addrEnv, "rediss://cluster-user:cluster-pass@redis-cluster-0:6380?addr=redis-cluster-1:6379") + + config, err := ConfigFromEnv() + require.NoError(t, err) + + require.Equal(t, []string{"redis-cluster-0:6380", "redis-cluster-1:6379"}, config.ClusterAddrs) + require.True(t, config.TLS) + require.Equal(t, "cluster-user", config.Username) + require.Equal(t, "cluster-pass", config.Password) + + options := clusterOptions(config, RoleDefault) + require.Equal(t, "cluster-user", options.Username) + require.Equal(t, "cluster-pass", options.Password) + require.NotNil(t, options.TLSConfig) +} + +func TestConfigFromEnvRejectsConflictingClusterURLPasswords(t *testing.T) { + clearRedisEnv(t) + t.Setenv(endpointModeEnv, endpointModeCluster) + t.Setenv(addrEnv, "redis://:one@redis-cluster-0:6379,redis://:two@redis-cluster-1:6379") + + _, err := ConfigFromEnv() + + require.Error(t, err) + require.Contains(t, err.Error(), "conflicting passwords") +} + func TestConfigFromEnvBuildsTLSOptions(t *testing.T) { clearRedisEnv(t) t.Setenv(addrEnv, "redis.example.invalid:6379") @@ -274,9 +353,15 @@ func TestNewClientSetFromEnvBuildsDistinctRoleClients(t *testing.T) { require.NotNil(t, clients.Session) require.NotSame(t, clients.Default, clients.Coordination) require.NotSame(t, clients.Default, clients.Queue) - require.Equal(t, 500*time.Millisecond, clients.Coordination.Options().DialTimeout) - require.Equal(t, 2*time.Second, clients.Queue.Options().ReadTimeout) - require.Equal(t, 1*time.Second, clients.Session.Options().ReadTimeout) + coordinationClient, ok := clients.Coordination.(*redis.Client) + require.True(t, ok) + queueClient, ok := clients.Queue.(*redis.Client) + require.True(t, ok) + sessionClient, ok := clients.Session.(*redis.Client) + require.True(t, ok) + require.Equal(t, 500*time.Millisecond, coordinationClient.Options().DialTimeout) + require.Equal(t, 2*time.Second, queueClient.Options().ReadTimeout) + require.Equal(t, 1*time.Second, sessionClient.Options().ReadTimeout) } func TestConfigFromEnvUsesSentinelMasterOverride(t *testing.T) { @@ -301,9 +386,22 @@ func TestConfigFromEnvRejectsMissingSentinelAddrs(t *testing.T) { require.Contains(t, err.Error(), endpointModeEnv) } -func TestConfigFromEnvRejectsUnknownEndpointMode(t *testing.T) { +func TestConfigFromEnvRejectsClusterWithNonZeroDB(t *testing.T) { clearRedisEnv(t) t.Setenv(endpointModeEnv, "cluster") + t.Setenv(addrEnv, "redis-cluster-0:6379") + t.Setenv(dbEnv, "1") + + _, err := ConfigFromEnv() + + require.Error(t, err) + require.Contains(t, err.Error(), dbEnv) + require.Contains(t, err.Error(), endpointModeCluster) +} + +func TestConfigFromEnvRejectsUnknownEndpointMode(t *testing.T) { + clearRedisEnv(t) + t.Setenv(endpointModeEnv, "unknown") _, err := ConfigFromEnv() From 462c9d6ae427076b33fcc07b8d8be7dbd73f9b6c Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 15/22] test(app): adapt redis role client assertion Runtime tests still inspect direct client options for role-specific Redis baselines. Assert the configured coordination client is a direct redis.Client before reading its options. The test remains precise after runtime wiring moves to redis.UniversalClient. --- backend/internal/app/runtime_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/backend/internal/app/runtime_test.go b/backend/internal/app/runtime_test.go index 7fc39f93b..de5159759 100644 --- a/backend/internal/app/runtime_test.go +++ b/backend/internal/app/runtime_test.go @@ -7,6 +7,7 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/glebarez/sqlite" + "github.com/redis/go-redis/v9" "gorm.io/gorm" ) @@ -90,8 +91,12 @@ func TestNewRuntimeBuildsRedisRoleClientsWhenConfigured(t *testing.T) { if runtime.RedisClient == nil || runtime.RedisCoordination == nil || runtime.RedisCache == nil || runtime.RedisQueue == nil || runtime.RedisSessionContinuity == nil { t.Fatal("expected all redis role clients to be wired") } - if runtime.RedisCoordination.Options().DialTimeout.String() != "500ms" { - t.Fatalf("expected coordination client baseline, got %s", runtime.RedisCoordination.Options().DialTimeout) + coordinationClient, ok := runtime.RedisCoordination.(*redis.Client) + if !ok { + t.Fatalf("expected coordination client to be a direct redis client, got %T", runtime.RedisCoordination) + } + if coordinationClient.Options().DialTimeout.String() != "500ms" { + t.Fatalf("expected coordination client baseline, got %s", coordinationClient.Options().DialTimeout) } } From 829c4c44ac6e3b626c02e6078561aab1928ca0ee Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 16/22] test(browser-session): cover cluster state config Browser worker Redis state config now supports Cluster mode. Add tests for Cluster options, URL TLS and auth, DB validation, and conflicting URL credentials. The worker state store contract is covered for direct, Sentinel, and Cluster endpoints. --- .../internal/session/redis_state_test.go | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/browser-worker/internal/session/redis_state_test.go b/browser-worker/internal/session/redis_state_test.go index 57d6ac735..aea44839e 100644 --- a/browser-worker/internal/session/redis_state_test.go +++ b/browser-worker/internal/session/redis_state_test.go @@ -5,6 +5,7 @@ import ( "os" "testing" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" ) @@ -145,6 +146,63 @@ func TestRedisConnectionConfigFromEnvUsesSentinelEndpoint(t *testing.T) { require.Equal(t, redisDialerRetryTimeout, options.DialerRetryTimeout) } +func TestRedisConnectionConfigFromEnvUsesClusterEndpoint(t *testing.T) { + clearRedisEnv(t) + t.Setenv(redisEndpointModeEnv, redisEndpointModeCluster) + t.Setenv(redisAddrEnv, " redis-cluster-0:6379,redis-cluster-1:6379 ") + t.Setenv(redisPasswordEnv, "redis-secret") + t.Setenv(redisTLSEnv, "true") + t.Setenv(redisTLSCACertEnv, testRedisCACertPEM) + t.Setenv(redisTLSServerNameEnv, "redis.internal.example") + + config, err := redisConnectionConfigFromEnv() + require.NoError(t, err) + + require.Equal(t, redisEndpointModeCluster, config.EndpointMode) + require.Equal(t, []string{"redis-cluster-0:6379", "redis-cluster-1:6379"}, config.ClusterAddrs) + require.Equal(t, "redis-secret", config.Password) + require.Zero(t, config.DB) + + options := redisClusterOptions(config) + require.Equal(t, []string{"redis-cluster-0:6379", "redis-cluster-1:6379"}, options.Addrs) + require.Equal(t, "redis-secret", options.Password) + require.Equal(t, redisClusterMaxRedirects, options.MaxRedirects) + require.Equal(t, redisClusterReloadEvery, options.ClusterStateReloadInterval) + require.Equal(t, redisCommandRetries, options.MaxRetries) + require.Equal(t, redisMinRetryBackoff, options.MinRetryBackoff) + require.Equal(t, redisMaxRetryBackoff, options.MaxRetryBackoff) + require.Equal(t, redisDialerRetries, options.DialerRetries) + require.Equal(t, redisDialerRetryTimeout, options.DialerRetryTimeout) + require.NotNil(t, options.TLSConfig) + require.Equal(t, "redis.internal.example", options.TLSConfig.ServerName) + + client := newRedisClient(config) + t.Cleanup(func() { + require.NoError(t, client.Close()) + }) + _, ok := client.(*redis.ClusterClient) + require.True(t, ok) +} + +func TestRedisConnectionConfigFromEnvUsesClusterEndpointURLs(t *testing.T) { + clearRedisEnv(t) + t.Setenv(redisEndpointModeEnv, redisEndpointModeCluster) + t.Setenv(redisAddrEnv, "rediss://cluster-user:cluster-pass@redis-cluster-0:6380?addr=redis-cluster-1:6379") + + config, err := redisConnectionConfigFromEnv() + require.NoError(t, err) + + require.Equal(t, []string{"redis-cluster-0:6380", "redis-cluster-1:6379"}, config.ClusterAddrs) + require.True(t, config.TLS) + require.Equal(t, "cluster-user", config.Username) + require.Equal(t, "cluster-pass", config.Password) + + options := redisClusterOptions(config) + require.Equal(t, "cluster-user", options.Username) + require.Equal(t, "cluster-pass", options.Password) + require.NotNil(t, options.TLSConfig) +} + func TestRedisConnectionConfigFromEnvKeepsRedisOptionalForDirectMode(t *testing.T) { clearRedisEnv(t) t.Setenv(redisDBEnv, "not-a-number") @@ -176,6 +234,30 @@ func TestRedisConnectionConfigFromEnvRejectsMissingSentinelAddrs(t *testing.T) { require.Contains(t, err.Error(), redisEndpointModeEnv) } +func TestRedisConnectionConfigFromEnvRejectsClusterWithNonZeroDB(t *testing.T) { + clearRedisEnv(t) + t.Setenv(redisEndpointModeEnv, redisEndpointModeCluster) + t.Setenv(redisAddrEnv, "redis-cluster-0:6379") + t.Setenv(redisDBEnv, "1") + + _, err := redisConnectionConfigFromEnv() + + require.Error(t, err) + require.Contains(t, err.Error(), redisDBEnv) + require.Contains(t, err.Error(), redisEndpointModeCluster) +} + +func TestRedisConnectionConfigFromEnvRejectsConflictingClusterURLPasswords(t *testing.T) { + clearRedisEnv(t) + t.Setenv(redisEndpointModeEnv, redisEndpointModeCluster) + t.Setenv(redisAddrEnv, "redis://:one@redis-cluster-0:6379,redis://:two@redis-cluster-1:6379") + + _, err := redisConnectionConfigFromEnv() + + require.Error(t, err) + require.Contains(t, err.Error(), "conflicting passwords") +} + func clearRedisEnv(t *testing.T) { t.Helper() From 43e86ba07e61486e7c60469deca49845bbd2b7ae Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 17/22] test(collab): cover cluster redis options Collab Cluster mode needs option-level coverage before integration environments are available. Add tests for cluster root nodes, TLS and auth defaults, redirection limits, and DB validation. The tests verify node-redis receives the same provider settings as direct and Sentinel modes. --- .../src/collab/redis-pubsub.test.ts | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/collab-service/src/collab/redis-pubsub.test.ts b/collab-service/src/collab/redis-pubsub.test.ts index 322b57e02..c3eebefc1 100644 --- a/collab-service/src/collab/redis-pubsub.test.ts +++ b/collab-service/src/collab/redis-pubsub.test.ts @@ -7,6 +7,8 @@ import { encodeStateAsUpdate } from "yjs"; import { loadConfig } from "../config.js"; import { + redisClusterOptionsFromConfig, + redisClusterRootNodesFromConfig, redisClientOptionsFromConfig, redisSentinelOptionsFromConfig, redisSentinelRootNodesFromConfig, @@ -222,6 +224,66 @@ describe("redisClientOptionsFromConfig", () => { }); }); +describe("redisClusterOptionsFromConfig", () => { + it("builds cluster options from config", () => { + const config = loadConfig({ + NODE_ENV: "test", + REDIS_ENDPOINT_MODE: "cluster", + REDIS_ADDR: " redis-cluster-0:6379,redis-cluster-1:6379 ", + REDIS_PASSWORD: "redis-secret", + REDIS_TLS: "true", + REDIS_TLS_CA_CERT: testRedisCACertPEM, + REDIS_TLS_SERVER_NAME: "redis.internal.example", + }); + + const options = redisClusterOptionsFromConfig(config); + + expect(options.maxCommandRedirections).toBe(8); + expect(options.rootNodes).toEqual([ + { url: "rediss://redis-cluster-0:6379" }, + { url: "rediss://redis-cluster-1:6379" }, + ]); + expect(options.defaults).toMatchObject({ + commandsQueueMaxLength: 256, + disableOfflineQueue: true, + database: 0, + pingInterval: 15_000, + password: "redis-secret", + socket: { + ca: testRedisCACertPEM, + connectTimeout: 1_000, + servername: "redis.internal.example", + socketTimeout: 1_000, + tls: true, + }, + }); + }); + + it("rejects cluster mode with non-zero db", () => { + expect(() => + loadConfig({ + NODE_ENV: "test", + REDIS_ENDPOINT_MODE: "cluster", + REDIS_ADDR: "redis-cluster-0:6379", + REDIS_DB: "1", + }), + ).toThrow(/REDIS_DB/); + }); + + it("parses cluster seed nodes", () => { + const config = loadConfig({ + NODE_ENV: "test", + REDIS_ENDPOINT_MODE: "cluster", + REDIS_ADDR: "redis-cluster-0:6379,redis-cluster-1:6379", + }); + + expect(redisClusterRootNodesFromConfig(config)).toEqual([ + { url: "redis://redis-cluster-0:6379" }, + { url: "redis://redis-cluster-1:6379" }, + ]); + }); +}); + describe("redisSentinelOptionsFromConfig", () => { it("builds sentinel options from config", () => { const config = loadConfig({ From 2f0dce8ccb6a34e63f6e58b35bce82b9bc12a29a Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:33 +0800 Subject: [PATCH 18/22] chore(env): validate cluster redis seed lists The shared env contract now advertises Redis Cluster seed lists. Add the cluster endpoint mode and validate REDIS_ADDR as comma-separated hostport or URL entries. Env doctor can now check direct, Sentinel, and Cluster Redis deployment inputs from the same schema. --- contracts/env.schema.yaml | 6 ++++-- script/env/doctor.rb | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/contracts/env.schema.yaml b/contracts/env.schema.yaml index 04d710f0a..768603bf2 100644 --- a/contracts/env.schema.yaml +++ b/contracts/env.schema.yaml @@ -40,6 +40,7 @@ profiles: items: - name: REDIS_ENDPOINT_MODE value: direct + - comment: Use REDIS_ENDPOINT_MODE=cluster for Redis Cluster; in that mode REDIS_ADDR is a comma-separated seed-node list and REDIS_DB must stay 0. - name: REDIS_ADDR value: redis:6379 - comment: If your Redis instance requires a password, set it here. Otherwise, leave it blank. Both situations are supported. @@ -316,6 +317,7 @@ profiles: items: - name: REDIS_ENDPOINT_MODE value: direct + - comment: Use REDIS_ENDPOINT_MODE=cluster for Redis Cluster; in that mode REDIS_ADDR is a comma-separated seed-node list and REDIS_DB must stay 0. - name: REDIS_ADDR value: redis:6379 - comment: If your Redis instance requires a password, set it here. Otherwise, leave it blank. Both situations are supported. @@ -761,11 +763,11 @@ variables: REDIS_ENDPOINT_MODE: category: redis type: enum - values: [direct, sentinel] + values: [direct, sentinel, cluster] services: [backend, publish-worker, browser-worker, collab-service] REDIS_ADDR: category: redis - type: hostport_or_url + type: csv_hostport_or_url required_in: [dev, deploy] services: [backend, publish-worker, browser-worker, collab-service, redis] REDIS_PASSWORD: diff --git a/script/env/doctor.rb b/script/env/doctor.rb index e815956d8..1319380c8 100755 --- a/script/env/doctor.rb +++ b/script/env/doctor.rb @@ -177,6 +177,8 @@ def validate_type(path, name, value, spec) add_error(path, name, "must be a three-letter currency code") unless value.match?(/\A[A-Za-z]{3}\z/) when "hostport_or_url" validate_hostport_or_url(path, name, value) + when "csv_hostport_or_url" + csv_values(value).each { |entry| validate_hostport_or_url(path, name, entry) } when "address" validate_hostport(path, name, value) when "csv" @@ -340,7 +342,7 @@ def example_value(name, spec) "admin@example.com" when "currency" "USD" - when "hostport_or_url", "csv_hostport" + when "hostport_or_url", "csv_hostport", "csv_hostport_or_url" "redis:6379" when "address" "0.0.0.0:8080" From e0bf64489afb99d98a7e6db59ea77d04039b5d22 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:34 +0800 Subject: [PATCH 19/22] chore(env): refresh cluster redis examples Generated env examples must reflect the updated Redis endpoint contract. Regenerate dev and deploy examples with the Cluster seed-list guidance comment. Operators see the Cluster mode requirement without hand-editing generated files. --- deploy/docker/.env.deploy.example | 1 + deploy/docker/.env.dev.example | 1 + 2 files changed, 2 insertions(+) diff --git a/deploy/docker/.env.deploy.example b/deploy/docker/.env.deploy.example index 959a6cb02..9381e31b2 100644 --- a/deploy/docker/.env.deploy.example +++ b/deploy/docker/.env.deploy.example @@ -20,6 +20,7 @@ DB_READER_LAG_CHECK_INTERVAL=5s # Redis REDIS_ENDPOINT_MODE=direct +# Use REDIS_ENDPOINT_MODE=cluster for Redis Cluster; in that mode REDIS_ADDR is a comma-separated seed-node list and REDIS_DB must stay 0. REDIS_ADDR=redis:6379 # If your Redis instance requires a password, set it here. Otherwise, leave it blank. Both situations are supported. REDIS_PASSWORD=replace-with-a-strong-redis-password diff --git a/deploy/docker/.env.dev.example b/deploy/docker/.env.dev.example index 3e1b58b88..3f6936efc 100644 --- a/deploy/docker/.env.dev.example +++ b/deploy/docker/.env.dev.example @@ -20,6 +20,7 @@ DB_READER_LAG_CHECK_INTERVAL=5s # Redis REDIS_ENDPOINT_MODE=direct +# Use REDIS_ENDPOINT_MODE=cluster for Redis Cluster; in that mode REDIS_ADDR is a comma-separated seed-node list and REDIS_DB must stay 0. REDIS_ADDR=redis:6379 # If your Redis instance requires a password, set it here. Otherwise, leave it blank. Both situations are supported. REDIS_PASSWORD= From 4b017edaf2aa471a52a99cc6f923f6f93d771f23 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 21:52:34 +0800 Subject: [PATCH 20/22] docs(redis): document cluster client mode Redis deployment docs still described only direct and Sentinel endpoint modes. Document Cluster seed-node configuration and the REDIS_DB zero requirement in module and Kubernetes baseline docs. Readers can configure Cluster providers while keeping direct and Sentinel rollback paths clear. --- deploy/kubernetes/app-baseline/README.md | 4 +++- doc/module-interface.md | 12 ++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/deploy/kubernetes/app-baseline/README.md b/deploy/kubernetes/app-baseline/README.md index d2694d3e8..141bd180f 100644 --- a/deploy/kubernetes/app-baseline/README.md +++ b/deploy/kubernetes/app-baseline/README.md @@ -16,7 +16,9 @@ Required overlay inputs: provider or private CA requires a custom root certificate path. - A reachable Redis endpoint through `REDIS_ENDPOINT_MODE=direct` and `REDIS_ADDR`, or through `REDIS_ENDPOINT_MODE=sentinel` with - `REDIS_SENTINEL_ADDRS` and `REDIS_SENTINEL_MASTER_NAME`. + `REDIS_SENTINEL_ADDRS` and `REDIS_SENTINEL_MASTER_NAME`, or through + `REDIS_ENDPOINT_MODE=cluster` with `REDIS_ADDR` set to one or more + comma-separated seed nodes. Cluster mode requires `REDIS_DB=0`. - Redis TLS policy through `REDIS_TLS`; set `REDIS_TLS_CA_CERT`, `REDIS_TLS_CA_FILE`, or `REDIS_TLS_SERVER_NAME` only when the provider requires custom trust material or a documented SNI override. diff --git a/doc/module-interface.md b/doc/module-interface.md index 4852b4619..90f6b5551 100644 --- a/doc/module-interface.md +++ b/doc/module-interface.md @@ -243,7 +243,7 @@ This endpoint lets `content-pipeline-service` exchange an object ref for a short | `DB_SSLMODE`, `DB_SSLROOTCERT` | No | `disable`, empty | PostgreSQL TLS | | `DB_MAX_OPEN_CONNS`, `DB_MAX_IDLE_CONNS`, `DB_CONN_MAX_LIFETIME`, `DB_CONN_MAX_IDLE_TIME` | No | `10`, `5`, `30m`, `5m` | Database pool settings | | `DB_READER_HOST` and other `DB_READER_*` values | No | Empty | Optional read replica | -| `REDIS_ENDPOINT_MODE`, `REDIS_ADDR`, `REDIS_PASSWORD`, `REDIS_DB`, `REDIS_TLS` | Depends on feature | `direct`, Redis DB defaults to `0` | Queues, locks, verification codes, rate limits, browser sessions. The backend applies role-specific timeout/retry defaults: `R0` coordination 500ms fail-fast with no command retry, `R1` continuity 750ms/1s with 1 bounded retry, `R2` caches 750ms with 1 bounded retry, `R4` queues 1-2s with 2 bounded retries | +| `REDIS_ENDPOINT_MODE`, `REDIS_ADDR`, `REDIS_PASSWORD`, `REDIS_DB`, `REDIS_TLS` | Depends on feature | `direct`, Redis DB defaults to `0` | Queues, locks, verification codes, rate limits, browser sessions. Modes are `direct`, `sentinel`, or `cluster`; in Cluster mode `REDIS_ADDR` is a comma-separated seed-node list and `REDIS_DB` must be `0`. The backend applies role-specific timeout/retry defaults: `R0` coordination 500ms fail-fast with no command retry, `R1` continuity 750ms/1s with 1 bounded retry, `R2` caches 750ms with 1 bounded retry, `R4` queues 1-2s with 2 bounded retries | | `REDIS_TLS_CA_CERT`, `REDIS_TLS_CA_FILE`, `REDIS_TLS_SERVER_NAME` | No | Empty | Optional managed Redis TLS trust material and SNI/certificate hostname override | | `REDIS_SENTINEL_ADDRS`, `REDIS_SENTINEL_MASTER_NAME` | Required when `REDIS_ENDPOINT_MODE=sentinel` | Empty, `mpp-redis-ha` | Sentinel seed endpoints and monitored master name for HA Redis | | `REDIS_POOL_SIZE`, `REDIS_MIN_IDLE_CONNS`, `REDIS_MAX_IDLE_CONNS`, `REDIS_CONN_MAX_IDLE_TIME`, `REDIS_CONN_MAX_LIFETIME` | No | go-redis defaults or template values | Redis pool settings | @@ -397,7 +397,7 @@ Key body shape: | Parameter | Required | Default | Description | | --- | --- | --- | --- | | `BROWSER_WORKER_INTERNAL_TOKEN` | Yes | None | Internal API bearer token | -| `REDIS_ENDPOINT_MODE`, `REDIS_ADDR`, `REDIS_PASSWORD`, `REDIS_DB`, `REDIS_TLS` | No | Empty direct address disables Redis client, but `/ready` can still pass with an empty store | Session state storage. Browser Worker uses the same `R1` continuity baseline as backend live-session continuity: 750ms dial, 1s read/write, 1 bounded retry | +| `REDIS_ENDPOINT_MODE`, `REDIS_ADDR`, `REDIS_PASSWORD`, `REDIS_DB`, `REDIS_TLS` | No | Empty direct address disables Redis client, but `/ready` can still pass with an empty store | Session state storage. Modes are `direct`, `sentinel`, or `cluster`; in Cluster mode `REDIS_ADDR` is a comma-separated seed-node list and `REDIS_DB` must be `0`. Browser Worker uses the same `R1` continuity baseline as backend live-session continuity: 750ms dial, 1s read/write, 1 bounded retry | | `REDIS_TLS_CA_CERT`, `REDIS_TLS_CA_FILE`, `REDIS_TLS_SERVER_NAME` | No | Empty | Optional managed Redis TLS trust material and SNI/certificate hostname override | | `REDIS_SENTINEL_ADDRS`, `REDIS_SENTINEL_MASTER_NAME` | Required when `REDIS_ENDPOINT_MODE=sentinel` | Empty, `mpp-redis-ha` | Sentinel seed endpoints and monitored master name for HA Redis | | `BROWSER_WORKER_POOL_SIZE` | No | `4` | Concurrent browser session limit; `0` means unlimited | @@ -449,7 +449,7 @@ Collab session JWTs must include: | `DATABASE_URL` | No | None | PostgreSQL URL. When set, it can replace split DB parameters | | `DB_HOST`, `DB_PORT`, `DB_USER`, `DB_PASSWORD`, `DB_NAME`, `DB_SSLMODE`, `DB_SSLROOTCERT` | Yes | Template values | PostgreSQL connection | | `DB_MAX_OPEN_CONNS`, `DB_CONN_MAX_LIFETIME`, `DB_CONN_MAX_IDLE_TIME` | No | `10`, `30m`, `5m` | Database pool settings | -| `REDIS_ENDPOINT_MODE`, `REDIS_ADDR`, `REDIS_PASSWORD`, `REDIS_DB`, `REDIS_TLS` | Yes when Redis sync is enabled | `direct`, `redis:6379`, empty, `0`, `false` | Multi-instance collaboration sync. Collab Redis pub/sub uses 1s connect/socket timeout, disabled offline queue, bounded reconnect backoff up to 2s, and a max command queue length of 256 | +| `REDIS_ENDPOINT_MODE`, `REDIS_ADDR`, `REDIS_PASSWORD`, `REDIS_DB`, `REDIS_TLS` | Yes when Redis sync is enabled | `direct`, `redis:6379`, empty, `0`, `false` | Multi-instance collaboration sync. Modes are `direct`, `sentinel`, or `cluster`; in Cluster mode `REDIS_ADDR` is a comma-separated seed-node list and `REDIS_DB` must be `0`. Collab Redis pub/sub uses 1s connect/socket timeout, disabled offline queue, bounded reconnect backoff up to 2s, and a max command queue length of 256 | | `REDIS_TLS_CA_CERT`, `REDIS_TLS_CA_FILE`, `REDIS_TLS_SERVER_NAME` | No | Empty | Optional managed Redis TLS trust material and SNI/certificate hostname override | | `REDIS_SENTINEL_ADDRS`, `REDIS_SENTINEL_MASTER_NAME` | Required when `REDIS_ENDPOINT_MODE=sentinel` | Empty, `mpp-redis-ha` | Sentinel seed endpoints and monitored master name for HA Redis | | `COLLAB_REDIS_SYNC_ENABLED` | No | `true` | Enables Redis pub/sub sync | @@ -564,10 +564,10 @@ Compose also includes `pgbouncer`. In Compose, backend and collab-service connec | Parameter | Default | Description | | --- | --- | --- | -| `REDIS_ADDR` | `redis:6379` in Compose | Redis address | -| `REDIS_ENDPOINT_MODE` | `direct` | `direct` uses `REDIS_ADDR`; `sentinel` uses `REDIS_SENTINEL_ADDRS` and `REDIS_SENTINEL_MASTER_NAME` | +| `REDIS_ADDR` | `redis:6379` in Compose | Redis address; in `cluster` mode this may be a comma-separated seed-node list | +| `REDIS_ENDPOINT_MODE` | `direct` | `direct` uses `REDIS_ADDR`; `sentinel` uses `REDIS_SENTINEL_ADDRS` and `REDIS_SENTINEL_MASTER_NAME`; `cluster` uses `REDIS_ADDR` seed nodes | | `REDIS_PASSWORD` | Empty | Password | -| `REDIS_DB` | `0` | DB index | +| `REDIS_DB` | `0` | DB index; must be `0` when `REDIS_ENDPOINT_MODE=cluster` | | `REDIS_TLS` | `false` | TLS | | `REDIS_TLS_CA_CERT` | Empty | Inline PEM CA bundle when managed Redis requires custom trust material | | `REDIS_TLS_CA_FILE` | Empty | Mounted PEM CA bundle path when managed Redis requires custom trust material | From d45e1d4f7246a1f7e843ab44cff57c0429dcbad8 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 22:28:14 +0800 Subject: [PATCH 21/22] fix(project): tag list cache scan pattern Cluster SCAN routing depends on a hash tag in the match pattern. Reuse the dashboard project-list hash tag for the invalidation scan pattern and cache keys. Project list invalidation now scans the same slot as the cached list entries in Cluster mode. --- backend/internal/services/project/list_cache.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/backend/internal/services/project/list_cache.go b/backend/internal/services/project/list_cache.go index ca78ff343..cb77e599e 100644 --- a/backend/internal/services/project/list_cache.go +++ b/backend/internal/services/project/list_cache.go @@ -16,11 +16,12 @@ import ( "github.com/kurodakayn/mpp-backend/internal/dto" "github.com/kurodakayn/mpp-backend/internal/pkg/cachettl" "github.com/kurodakayn/mpp-backend/internal/pkg/redisdegrade" - "github.com/kurodakayn/mpp-backend/internal/pkg/rediskey" ) const dashboardProjectListCachePrefix = "mpp:dashboard:projects:list:v2" -const dashboardProjectListCacheGenerationKey = "mpp:dashboard:projects:list-generation:v2:{dashboard:projects-list}" +const dashboardProjectListCacheHashTag = "{dashboard:projects-list}" +const dashboardProjectListCacheGenerationKey = "mpp:dashboard:projects:list-generation:v2:" + dashboardProjectListCacheHashTag +const dashboardProjectListCachePattern = dashboardProjectListCachePrefix + ":" + dashboardProjectListCacheHashTag + ":*" const dashboardProjectListDegradedGeneration = "degraded" const dashboardProjectListRefreshTimeout = 15 * time.Second const dashboardProjectListInvalidateTimeout = 2 * time.Second @@ -253,7 +254,7 @@ func deleteDashboardProjectListCacheKeys(ctx context.Context, client redis.Unive next uint64 } result, err := redisdegrade.CallWork(guard, "cache_invalidate", func() (scanResult, error) { - keys, next, err := client.Scan(ctx, cursor, dashboardProjectListCachePrefix+":*", 100).Result() + keys, next, err := client.Scan(ctx, cursor, dashboardProjectListCachePattern, 100).Result() return scanResult{keys: keys, next: next}, err }) if err != nil { @@ -297,7 +298,7 @@ func dashboardProjectListCacheKey(params dashboardProjectListCacheParams) string return fmt.Sprintf("%s:%d:%d", dashboardProjectListCachePrefix, params.Page, params.Limit) } sum := sha256.Sum256(encoded) - return dashboardProjectListCachePrefix + ":" + rediskey.Tag("dashboard", "projects-list") + ":" + hex.EncodeToString(sum[:]) + return dashboardProjectListCachePrefix + ":" + dashboardProjectListCacheHashTag + ":" + hex.EncodeToString(sum[:]) } func uuidStringValue(value *uuid.UUID) string { From f2c34c7571d5298f4a7595e0bd2890d8993075a9 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 19 Jun 2026 22:28:14 +0800 Subject: [PATCH 22/22] test(project): cover tagged list scan pattern The project-list cache key contract needs to include the invalidation pattern. Assert the scan pattern shares the dashboard project-list hash tag with generated cache keys. Future changes cannot accidentally route Cluster invalidation scans to an arbitrary shard. --- .../internal/services/project/list_cache_keys_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/backend/internal/services/project/list_cache_keys_test.go b/backend/internal/services/project/list_cache_keys_test.go index 3604bee93..45e22a799 100644 --- a/backend/internal/services/project/list_cache_keys_test.go +++ b/backend/internal/services/project/list_cache_keys_test.go @@ -20,3 +20,14 @@ func TestDashboardProjectListKeysUseFamilyHashTag(t *testing.T) { require.True(t, ok) require.Equal(t, "dashboard:projects-list", tag) } + +func TestDashboardProjectListScanPatternUsesFamilyHashTag(t *testing.T) { + cacheKey := dashboardProjectListCacheKey(dashboardProjectListCacheParams{ + Generation: "1", + Page: 1, + Limit: 20, + }) + + require.True(t, rediskey.ShareTag(cacheKey, dashboardProjectListCachePattern)) + require.Contains(t, dashboardProjectListCachePattern, dashboardProjectListCacheHashTag) +}