Skip to content

Commit 13e2d01

Browse files
committed
fix: 账号移出分组时清理粘性会话,修复关闭账号后仍能使用的问题
当账号从分组中移除或分组关系发生变更时,之前绑定该账号的粘性会话会继续将流量路由到该账号,导致已禁用/移除的账号仍能继续处理请求。

变更内容:
- GatewayCache 接口新增 DeleteStickySessionsByAccount 方法,并在 gateway_cache.go 中通过反向索引 (sticky_session_index:{groupID}:{accountID}) 实现批量删除
- SchedulerSnapshotService 注入 GatewayCache,当收到 AccountGroupsChanged outbox 事件时自动清理对应的粘性会话
- OpenAI 和 Gemini 网关服务的 tryStickySessionHit 中新增 isAccountInGroup 防御性检查,确保账号仍属于当前分组
- 更新所有相关测试 stub/mock(testutil、openai、gemini、gateway)
- 重新生成 wire_gen.go 以适配新的 ProvideSchedulerSnapshotService 签名

Closes Wei-Shaw#662
1 parent 19ea392 commit 13e2d01

15 files changed

Lines changed: 151 additions & 11 deletions

backend/cmd/server/wire_gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ require (
8787
github.com/goccy/go-json v0.10.2 // indirect
8888
github.com/google/go-cmp v0.7.0 // indirect
8989
github.com/google/go-querystring v1.1.0 // indirect
90+
github.com/google/subcommands v1.2.0 // indirect
9091
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect
9192
github.com/hashicorp/hcl v1.0.0 // indirect
9293
github.com/hashicorp/hcl/v2 v2.18.1 // indirect

backend/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17
136136
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
137137
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
138138
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
139+
github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE=
140+
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
139141
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
140142
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
141143
github.com/google/wire v0.7.0 h1:JxUKI6+CVBgCO2WToKy/nQk0sS+amI9z9EjVmdaocj4=

backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func newTestGatewayHandler(t *testing.T, group *service.Group, accounts []*servi
125125
t.Helper()
126126

127127
schedulerCache := &fakeSchedulerCache{accounts: accounts}
128-
schedulerSnapshot := service.NewSchedulerSnapshotService(schedulerCache, nil, nil, nil, nil)
128+
schedulerSnapshot := service.NewSchedulerSnapshotService(schedulerCache, nil, nil, nil, nil, nil)
129129

130130
gwSvc := service.NewGatewayService(
131131
nil, // accountRepo (not used: scheduler snapshot hit)

backend/internal/repository/gateway_cache.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
)
1111

1212
const stickySessionPrefix = "sticky_session:"
13+
const stickySessionIndexPrefix = "sticky_session_index:"
1314

1415
type gatewayCache struct {
1516
rdb *redis.Client
@@ -25,14 +26,27 @@ func buildSessionKey(groupID int64, sessionHash string) string {
2526
return fmt.Sprintf("%s%d:%s", stickySessionPrefix, groupID, sessionHash)
2627
}
2728

29+
// buildSessionIndexKey 构建反向索引 key,用于从 accountID 查找 sessionHash
30+
// 格式: sticky_session_index:{groupID}:{accountID}
31+
func buildSessionIndexKey(groupID int64, accountID int64) string {
32+
return fmt.Sprintf("%s%d:%d", stickySessionIndexPrefix, groupID, accountID)
33+
}
34+
2835
func (c *gatewayCache) GetSessionAccountID(ctx context.Context, groupID int64, sessionHash string) (int64, error) {
2936
key := buildSessionKey(groupID, sessionHash)
3037
return c.rdb.Get(ctx, key).Int64()
3138
}
3239

3340
func (c *gatewayCache) SetSessionAccountID(ctx context.Context, groupID int64, sessionHash string, accountID int64, ttl time.Duration) error {
3441
key := buildSessionKey(groupID, sessionHash)
35-
return c.rdb.Set(ctx, key, accountID, ttl).Err()
42+
indexKey := buildSessionIndexKey(groupID, accountID)
43+
44+
pipe := c.rdb.Pipeline()
45+
pipe.Set(ctx, key, accountID, ttl)
46+
pipe.SAdd(ctx, indexKey, sessionHash)
47+
pipe.Expire(ctx, indexKey, ttl)
48+
_, err := pipe.Exec(ctx)
49+
return err
3650
}
3751

3852
func (c *gatewayCache) RefreshSessionTTL(ctx context.Context, groupID int64, sessionHash string, ttl time.Duration) error {
@@ -49,5 +63,41 @@ func (c *gatewayCache) RefreshSessionTTL(ctx context.Context, groupID int64, ses
4963
// or unschedulable), allowing subsequent requests to select a new available account.
5064
func (c *gatewayCache) DeleteSessionAccountID(ctx context.Context, groupID int64, sessionHash string) error {
5165
key := buildSessionKey(groupID, sessionHash)
66+
// 注意:这里不清理反向索引,因为不知道对应的 accountID
67+
// 反向索引会在 GetSessionAccountID 返回错误时由调用方清理,或在 DeleteStickySessionsByAccount 中批量清理
5268
return c.rdb.Del(ctx, key).Err()
5369
}
70+
71+
// DeleteStickySessionsByAccount 删除指定账号在指定分组中的所有粘性会话。
72+
// 当账号被移除分组时调用,确保该账号不会继续被 sticky session 使用。
73+
//
74+
// DeleteStickySessionsByAccount deletes all sticky sessions for the given account in the given group.
75+
// Called when account is removed from a group to ensure it won't be used by sticky sessions.
76+
func (c *gatewayCache) DeleteStickySessionsByAccount(ctx context.Context, groupID int64, accountID int64) error {
77+
indexKey := buildSessionIndexKey(groupID, accountID)
78+
79+
// 获取该账号的所有 sessionHash
80+
sessionHashes, err := c.rdb.SMembers(ctx, indexKey).Result()
81+
if err != nil {
82+
return fmt.Errorf("get session index failed: %w", err)
83+
}
84+
85+
if len(sessionHashes) == 0 {
86+
return nil
87+
}
88+
89+
// 批量删除 sticky session
90+
pipe := c.rdb.Pipeline()
91+
for _, sessionHash := range sessionHashes {
92+
sessionKey := buildSessionKey(groupID, sessionHash)
93+
pipe.Del(ctx, sessionKey)
94+
}
95+
// 删除反向索引
96+
pipe.Del(ctx, indexKey)
97+
98+
_, err = pipe.Exec(ctx)
99+
if err != nil {
100+
return fmt.Errorf("delete sticky sessions failed: %w", err)
101+
}
102+
return nil
103+
}

backend/internal/repository/scheduler_snapshot_outbox_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func TestSchedulerSnapshotOutboxReplay(t *testing.T) {
4848
require.NoError(t, accountRepo.Create(ctx, account))
4949
require.NoError(t, cache.SetAccount(ctx, account))
5050

51-
svc := service.NewSchedulerSnapshotService(cache, outboxRepo, accountRepo, nil, cfg)
51+
svc := service.NewSchedulerSnapshotService(cache, outboxRepo, accountRepo, nil, nil, cfg)
5252
svc.Start()
5353
t.Cleanup(svc.Stop)
5454

backend/internal/service/gateway_multiplatform_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,23 @@ func (m *mockGatewayCacheForPlatform) DeleteSessionAccountID(ctx context.Context
221221
return nil
222222
}
223223

224+
func (m *mockGatewayCacheForPlatform) DeleteStickySessionsByAccount(ctx context.Context, groupID int64, accountID int64) error {
225+
if m.sessionBindings == nil {
226+
return nil
227+
}
228+
// 删除所有绑定到该账号的会话
229+
for sessionHash, boundAccountID := range m.sessionBindings {
230+
if boundAccountID == accountID {
231+
delete(m.sessionBindings, sessionHash)
232+
if m.deletedSessions == nil {
233+
m.deletedSessions = make(map[string]int)
234+
}
235+
m.deletedSessions[sessionHash]++
236+
}
237+
}
238+
return nil
239+
}
240+
224241
type mockGroupRepoForGateway struct {
225242
groups map[int64]*Group
226243
getByIDCalls int

backend/internal/service/gateway_service.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,9 @@ type GatewayCache interface {
345345
// DeleteSessionAccountID 删除粘性会话绑定,用于账号不可用时主动清理
346346
// Delete sticky session binding, used to proactively clean up when account becomes unavailable
347347
DeleteSessionAccountID(ctx context.Context, groupID int64, sessionHash string) error
348+
// DeleteStickySessionsByAccount 删除指定账号在指定分组中的所有粘性会话
349+
// Delete all sticky sessions for the given account in the given group
350+
DeleteStickySessionsByAccount(ctx context.Context, groupID int64, accountID int64) error
348351
}
349352

350353
// derefGroupID safely dereferences *int64 to int64, returning 0 if nil
@@ -1403,7 +1406,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
14031406
if clearSticky {
14041407
_ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash)
14051408
}
1406-
if !clearSticky && s.isAccountInGroup(account, groupID) &&
1409+
if !clearSticky && isAccountInGroup(account, groupID) &&
14071410
s.isAccountAllowedForPlatform(account, platform, useMixed) &&
14081411
(requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) &&
14091412
account.IsSchedulableForModelWithContext(ctx, requestedModel) &&
@@ -1857,7 +1860,7 @@ func (s *GatewayService) isAccountAllowedForPlatform(account *Account, platform
18571860

18581861
// isAccountInGroup checks if the account belongs to the specified group.
18591862
// Returns true if groupID is nil (no group restriction) or account belongs to the group.
1860-
func (s *GatewayService) isAccountInGroup(account *Account, groupID *int64) bool {
1863+
func isAccountInGroup(account *Account, groupID *int64) bool {
18611864
if groupID == nil {
18621865
return true // 无分组限制
18631866
}
@@ -2397,7 +2400,7 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context,
23972400
if clearSticky {
23982401
_ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash)
23992402
}
2400-
if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) {
2403+
if !clearSticky && isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) {
24012404
if s.debugModelRoutingEnabled() {
24022405
logger.LegacyPrintf("service.gateway", "[ModelRoutingDebug] legacy routed sticky hit: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), accountID)
24032406
}
@@ -2497,7 +2500,7 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context,
24972500
if clearSticky {
24982501
_ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash)
24992502
}
2500-
if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) {
2503+
if !clearSticky && isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) {
25012504
return account, nil
25022505
}
25032506
}
@@ -2604,7 +2607,7 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g
26042607
if clearSticky {
26052608
_ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash)
26062609
}
2607-
if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) {
2610+
if !clearSticky && isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) {
26082611
if account.Platform == nativePlatform || (account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled()) {
26092612
if s.debugModelRoutingEnabled() {
26102613
logger.LegacyPrintf("service.gateway", "[ModelRoutingDebug] legacy mixed routed sticky hit: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), accountID)
@@ -2706,7 +2709,7 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g
27062709
if clearSticky {
27072710
_ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash)
27082711
}
2709-
if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) {
2712+
if !clearSticky && isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) {
27102713
if account.Platform == nativePlatform || (account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled()) {
27112714
return account, nil
27122715
}

backend/internal/service/gemini_messages_compat_service.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,13 @@ func (s *GeminiMessagesCompatService) tryStickySessionHit(
207207
return nil
208208
}
209209

210+
// 检查账号是否仍在当前分组中(防御性检查:防止账号被移除分组后继续使用)
211+
// Verify account still belongs to the current group
212+
if !isAccountInGroup(account, groupID) {
213+
_ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), cacheKey)
214+
return nil
215+
}
216+
210217
// 验证账号是否可用于当前请求
211218
// Verify account is usable for current request
212219
if !s.isAccountUsableForRequest(ctx, account, requestedModel, platform, useMixedScheduling) {

backend/internal/service/gemini_multiplatform_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,23 @@ func (m *mockGatewayCacheForGemini) DeleteSessionAccountID(ctx context.Context,
274274
return nil
275275
}
276276

277+
func (m *mockGatewayCacheForGemini) DeleteStickySessionsByAccount(ctx context.Context, groupID int64, accountID int64) error {
278+
if m.sessionBindings == nil {
279+
return nil
280+
}
281+
// 删除所有绑定到该账号的会话
282+
for sessionHash, boundAccountID := range m.sessionBindings {
283+
if boundAccountID == accountID {
284+
delete(m.sessionBindings, sessionHash)
285+
if m.deletedSessions == nil {
286+
m.deletedSessions = make(map[string]int)
287+
}
288+
m.deletedSessions[sessionHash]++
289+
}
290+
}
291+
return nil
292+
}
293+
277294
// TestGeminiMessagesCompatService_SelectAccountForModelWithExclusions_GeminiPlatform 测试 Gemini 单平台选择
278295
func TestGeminiMessagesCompatService_SelectAccountForModelWithExclusions_GeminiPlatform(t *testing.T) {
279296
ctx := context.Background()

0 commit comments

Comments
 (0)