Skip to content

Commit e4f1082

Browse files
committed
feature: add simple session affinity plugins in gateway plugin
Signed-off-by: CYJiang <[email protected]>
1 parent 26e1a8e commit e4f1082

File tree

5 files changed

+445
-0
lines changed

5 files changed

+445
-0
lines changed
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
Copyright 2025 The Aibrix Team.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package routingalgorithms
18+
19+
import (
20+
"encoding/base64"
21+
"fmt"
22+
"math/rand"
23+
"net"
24+
"strconv"
25+
26+
"github.com/vllm-project/aibrix/pkg/types"
27+
"github.com/vllm-project/aibrix/pkg/utils"
28+
29+
"k8s.io/klog/v2"
30+
)
31+
32+
const (
33+
RouterSessionAffinity types.RoutingAlgorithm = "session-affinity"
34+
sessionIDHeader string = "x-session-id"
35+
)
36+
37+
func init() {
38+
Register(RouterSessionAffinity, NewSessionAffinityRouter)
39+
}
40+
41+
type sessionAffinityRouter struct{}
42+
43+
func NewSessionAffinityRouter() (types.Router, error) {
44+
return &sessionAffinityRouter{}, nil
45+
}
46+
47+
// Route implements session affinity by attempting to route requests to the same pod
48+
// using a session ID stored in the request header. The session ID encodes the target
49+
// pod's address as "IP:Port". If no valid session exists, it falls back to a randomly selected ready pod.
50+
func (r *sessionAffinityRouter) Route(ctx *types.RoutingContext, readyPodList types.PodList) (string, error) {
51+
if ctx.ReqHeaders == nil {
52+
klog.V(4).InfoS("No request or headers, skipping session affinity",
53+
"request_id", ctx.RequestID)
54+
return r.fallbackRoute(ctx, readyPodList)
55+
}
56+
57+
sessionID := ctx.ReqHeaders[sessionIDHeader]
58+
var targetAddr string
59+
60+
if sessionID != "" {
61+
decoded, err := base64.StdEncoding.DecodeString(sessionID)
62+
if err != nil {
63+
klog.ErrorS(err, "Invalid session ID format",
64+
"request_id", ctx.RequestID, "session_id", sessionID)
65+
} else {
66+
targetAddr = string(decoded)
67+
}
68+
}
69+
70+
// If find a decoded target address, try to match ready pod
71+
if targetAddr != "" {
72+
for _, pod := range readyPodList.All() {
73+
port := utils.GetModelPortForPod(ctx.RequestID, pod)
74+
if port == 0 {
75+
continue
76+
}
77+
78+
addr := net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(int(port)))
79+
if addr == targetAddr {
80+
ctx.SetTargetPod(pod)
81+
r.setSessionHeader(ctx, addr) // refresh or keep same
82+
klog.V(4).InfoS("Session affinity matched address", "request_id", ctx.RequestID, "addr", addr)
83+
return ctx.TargetAddress(), nil
84+
}
85+
}
86+
}
87+
88+
// Session ID missing, invalid, or pod not ready → fallback
89+
klog.V(4).InfoS("Session affinity failed, falling back", "request_id", ctx.RequestID, "session_id", sessionID)
90+
return r.fallbackRoute(ctx, readyPodList)
91+
}
92+
93+
func (r *sessionAffinityRouter) setSessionHeader(ctx *types.RoutingContext, addr string) {
94+
if ctx.RespHeaders == nil {
95+
ctx.RespHeaders = make(map[string]string)
96+
}
97+
ctx.RespHeaders[sessionIDHeader] = base64.StdEncoding.EncodeToString([]byte(addr))
98+
}
99+
100+
// fallbackRoute selects a random ready pod and returns its IP:Port as the target address.
101+
// It also sets the session ID in the response so the client can stick to this pod next time.
102+
func (r *sessionAffinityRouter) fallbackRoute(ctx *types.RoutingContext, readyPodList types.PodList) (string, error) {
103+
pods := readyPodList.All()
104+
105+
selected := pods[rand.Intn(len(pods))]
106+
port := utils.GetModelPortForPod(ctx.RequestID, selected)
107+
if port == 0 || selected.Status.PodIP == "" {
108+
return "", fmt.Errorf("selected pod has no valid network address")
109+
}
110+
addr := net.JoinHostPort(selected.Status.PodIP, strconv.Itoa(int(port)))
111+
112+
ctx.SetTargetPod(selected)
113+
r.setSessionHeader(ctx, addr)
114+
klog.V(5).Infof("Fallback to random pod: %s (%s)", selected.Name, addr)
115+
116+
return ctx.TargetAddress(), nil
117+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
Copyright 2025 The Aibrix Team.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package routingalgorithms
18+
19+
import (
20+
"context"
21+
"encoding/base64"
22+
"testing"
23+
24+
"github.com/stretchr/testify/assert"
25+
"github.com/vllm-project/aibrix/pkg/types"
26+
27+
v1 "k8s.io/api/core/v1"
28+
)
29+
30+
func TestSessionAffinityRouter(t *testing.T) {
31+
tests := []struct {
32+
name string
33+
reqHeaders map[string]string
34+
readyPods []*v1.Pod
35+
expectErr bool
36+
expectPossibleAddrs []string // all valid target addresses (IP:port) that may be selected
37+
}{
38+
{
39+
name: "valid session ID matches ready pod",
40+
reqHeaders: map[string]string{
41+
sessionIDHeader: base64.StdEncoding.EncodeToString([]byte("10.0.0.2:8000")),
42+
},
43+
readyPods: []*v1.Pod{
44+
newPod("pod1", "10.0.0.1", true, map[string]string{"model.aibrix.ai/port": "8000"}),
45+
newPod("pod2", "10.0.0.2", true, map[string]string{"model.aibrix.ai/port": "8000"}),
46+
newPod("pod3", "10.0.0.3", true, map[string]string{"model.aibrix.ai/port": "8000"}),
47+
},
48+
expectErr: false,
49+
expectPossibleAddrs: []string{"10.0.0.2:8000"},
50+
},
51+
{
52+
name: "no session ID → fallback to any ready pod",
53+
reqHeaders: nil,
54+
readyPods: []*v1.Pod{
55+
newPod("pod1", "10.0.0.1", true, map[string]string{"model.aibrix.ai/port": "8000"}),
56+
newPod("pod2", "10.0.0.2", true, map[string]string{"model.aibrix.ai/port": "8000"}),
57+
},
58+
expectErr: false,
59+
expectPossibleAddrs: []string{"10.0.0.1:8000", "10.0.0.2:8000"},
60+
},
61+
{
62+
name: "invalid base64 session ID → fallback",
63+
reqHeaders: map[string]string{
64+
sessionIDHeader: "%%%INVALID_BASE64%%%",
65+
},
66+
readyPods: []*v1.Pod{
67+
newPod("a", "192.168.1.10", true, map[string]string{"model.aibrix.ai/port": "8000"}),
68+
newPod("b", "192.168.1.11", true, map[string]string{"model.aibrix.ai/port": "8000"}),
69+
},
70+
expectErr: false,
71+
expectPossibleAddrs: []string{"192.168.1.10:8000", "192.168.1.11:8000"},
72+
},
73+
{
74+
name: "session ID points to non-existent address → fallback",
75+
reqHeaders: map[string]string{
76+
sessionIDHeader: base64.StdEncoding.EncodeToString([]byte("10.99.99.99:8000")), // 不存在的 IP
77+
},
78+
readyPods: []*v1.Pod{
79+
newPod("x", "10.1.1.1", true, map[string]string{"model.aibrix.ai/port": "8000"}),
80+
newPod("y", "10.1.1.2", true, map[string]string{"model.aibrix.ai/port": "8000"}),
81+
},
82+
expectErr: false,
83+
expectPossibleAddrs: []string{"10.1.1.1:8000", "10.1.1.2:8000"},
84+
},
85+
}
86+
87+
for _, tt := range tests {
88+
t.Run(tt.name, func(t *testing.T) {
89+
router := &sessionAffinityRouter{}
90+
91+
ctx := types.NewRoutingContext(context.Background(), "test", "model1", "", "", "")
92+
ctx.ReqHeaders = tt.reqHeaders
93+
94+
podList := newMockPodList(tt.readyPods, nil)
95+
96+
addr, err := router.Route(ctx, podList)
97+
98+
if tt.expectErr {
99+
assert.Error(t, err)
100+
return
101+
}
102+
103+
assert.NoError(t, err)
104+
assert.NotNil(t, ctx.RespHeaders, "RespHeaders should not be nil")
105+
assert.Contains(t, ctx.RespHeaders, sessionIDHeader, "Response must include session ID header")
106+
107+
// verify the returned address is one of the expected ready pod addresses
108+
assert.Contains(t, tt.expectPossibleAddrs, addr, "selected address must be one of the ready pods' IP:port")
109+
110+
// verify that the session ID in the response decodes to the same address
111+
sessionB64 := ctx.RespHeaders[sessionIDHeader]
112+
sessionBytes, decodeErr := base64.StdEncoding.DecodeString(sessionB64)
113+
assert.NoError(t, decodeErr, "session ID must be valid base64")
114+
actualSessionAddr := string(sessionBytes)
115+
116+
assert.Equal(t, addr, actualSessionAddr, "session ID must encode the same address as returned by Route()")
117+
})
118+
}
119+
}

pkg/plugins/gateway/gateway_rsp_headers.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package gateway
1919
import (
2020
"context"
2121
"strconv"
22+
"strings"
2223

2324
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2425
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -46,6 +47,21 @@ func (s *Server) HandleResponseHeaders(ctx context.Context, requestID string, mo
4647
headers = buildEnvoyProxyHeaders(headers, HeaderTargetPod, routerCtx.TargetAddress())
4748
}
4849

50+
if routerCtx != nil && routerCtx.RespHeaders != nil {
51+
for key, value := range routerCtx.RespHeaders {
52+
// skip HTTP/2 pseudo-header fields (such as :status, :path, etc.) to avoid protocol errors.
53+
if strings.HasPrefix(key, ":") {
54+
continue
55+
}
56+
headers = append(headers, &configPb.HeaderValueOption{
57+
Header: &configPb.HeaderValue{
58+
Key: key,
59+
RawValue: []byte(value),
60+
},
61+
})
62+
}
63+
}
64+
4965
for _, headerValue := range b.ResponseHeaders.Headers.Headers {
5066
if headerValue.Key == ":status" {
5167
code, _ := strconv.Atoi(string(headerValue.RawValue))

0 commit comments

Comments
 (0)