Skip to content

Commit a97b660

Browse files
committed
feature: Add external-filter in Header for advanced routing
Signed-off-by: rayne-Li <[email protected]>
1 parent 0fba891 commit a97b660

File tree

5 files changed

+72
-8
lines changed

5 files changed

+72
-8
lines changed

pkg/plugins/gateway/gateway.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"github.com/redis/go-redis/v9"
2929
"google.golang.org/grpc/codes"
3030
"google.golang.org/grpc/status"
31+
corev1 "k8s.io/api/core/v1"
32+
"k8s.io/apimachinery/pkg/labels"
3133
"k8s.io/client-go/kubernetes"
3234
"k8s.io/klog/v2"
3335

@@ -160,7 +162,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
160162
}
161163
}
162164

163-
func (s *Server) selectTargetPod(ctx *types.RoutingContext, pods types.PodList) (string, error) {
165+
func (s *Server) selectTargetPod(ctx *types.RoutingContext, pods types.PodList, externalFilterExpr string) (string, error) {
164166
router, err := routing.Select(ctx)
165167
if err != nil {
166168
return "", err
@@ -170,6 +172,25 @@ func (s *Server) selectTargetPod(ctx *types.RoutingContext, pods types.PodList)
170172
return "", fmt.Errorf("no pods for routing")
171173
}
172174
readyPods := utils.FilterRoutablePods(pods.All())
175+
176+
// filter pod by header ext-filter
177+
// k8s labelSelector format, eg: "k=v"、"env in (prod,stg)"
178+
if externalFilterExpr != "" {
179+
sel, err := labels.Parse(externalFilterExpr)
180+
if err != nil {
181+
return "", err
182+
}
183+
out := make([]*corev1.Pod, 0, len(readyPods))
184+
for _, p := range readyPods {
185+
klog.V(3).InfoS("filtering pod", "pod", p.Name)
186+
if sel.Matches(labels.Set(p.Labels)) {
187+
out = append(out, p)
188+
klog.V(3).InfoS("filter passed", "pod", p.Name)
189+
}
190+
}
191+
readyPods = out
192+
}
193+
173194
if len(readyPods) == 0 {
174195
return "", fmt.Errorf("no ready pods for routing")
175196
}

pkg/plugins/gateway/gateway_req_body.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestID string, req *e
7474
headers = buildEnvoyProxyHeaders(headers, HeaderModel, model)
7575
klog.InfoS("request start", "requestID", requestID, "requestPath", requestPath, "model", model, "stream", stream)
7676
} else {
77-
targetPodIP, err := s.selectTargetPod(routingCtx, podsArr)
77+
// external filter in header
78+
externalFilter := routingCtx.ReqHeaders[HeaderExternalFilter]
79+
klog.InfoS("found external filter", "filter", externalFilter)
80+
targetPodIP, err := s.selectTargetPod(routingCtx, podsArr, externalFilter)
7881
if targetPodIP == "" || err != nil {
7982
klog.ErrorS(err, "failed to select target pod", "requestID", requestID, "routingStrategy", routingAlgorithm, "model", model, "routingDuration", routingCtx.GetRoutingDelay())
8083
return generateErrorResponse(

pkg/plugins/gateway/gateway_req_headers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ func (s *Server) HandleRequestHeaders(ctx context.Context, requestID string, req
5656
requestPath = string(n.RawValue)
5757
case authorizationKey:
5858
reqHeaders[n.Key] = string(n.RawValue)
59+
case HeaderExternalFilter:
60+
reqHeaders[n.Key] = string(n.RawValue)
5961
}
6062
}
6163

pkg/plugins/gateway/gateway_test.go

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,12 @@ func Test_selectTargetPod(t *testing.T) {
159159

160160
// Define test cases for different pod selection and error scenarios
161161
tests := []struct {
162-
name string
163-
pods types.PodList
164-
mockSetup func(*mockRouter, types.RoutingAlgorithm)
165-
expectedError bool
166-
expectedPodIP string
162+
name string
163+
pods types.PodList
164+
mockSetup func(*mockRouter, types.RoutingAlgorithm)
165+
expectedError bool
166+
expectedPodIP string
167+
externalFilter string
167168
}{
168169
{
169170
name: "routing.Route returns error",
@@ -288,6 +289,42 @@ func Test_selectTargetPod(t *testing.T) {
288289
expectedError: false,
289290
expectedPodIP: "1.2.3.4:8000",
290291
},
292+
{
293+
name: "external filter",
294+
pods: &utils.PodArray{Pods: []*v1.Pod{
295+
{
296+
ObjectMeta: metav1.ObjectMeta{
297+
Labels: map[string]string{
298+
"foo": "bar",
299+
},
300+
},
301+
Status: v1.PodStatus{
302+
PodIP: "1.2.3.4",
303+
Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}},
304+
},
305+
},
306+
{
307+
ObjectMeta: metav1.ObjectMeta{
308+
Labels: map[string]string{
309+
"foo": "sad",
310+
},
311+
},
312+
Status: v1.PodStatus{
313+
PodIP: "5.6.7.8",
314+
Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}},
315+
},
316+
},
317+
}},
318+
mockSetup: func(mockRouter *mockRouter, algo types.RoutingAlgorithm) {
319+
// Register a mock router that selects a pod from multiple ready pods
320+
routing.Register(algo, func() (types.Router, error) {
321+
return mockRouter, nil
322+
})
323+
mockRouter.On("Route", mock.Anything, mock.Anything).Return("1.2.3.4:8000", nil).Once()
324+
},
325+
expectedError: false,
326+
expectedPodIP: "1.2.3.4:8000",
327+
},
291328
}
292329

293330
for _, tt := range tests {
@@ -305,7 +342,7 @@ func Test_selectTargetPod(t *testing.T) {
305342
ctx := types.NewRoutingContext(context.Background(), routingAlgo, "test-model", "test-message", "test-request", "test-user")
306343

307344
// Call selectTargetPod and check the result
308-
podIP, err := server.selectTargetPod(ctx, tt.pods)
345+
podIP, err := server.selectTargetPod(ctx, tt.pods, tt.externalFilter)
309346

310347
if tt.expectedError {
311348
assert.Error(subtest, err)

pkg/plugins/gateway/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ const (
4646
HeaderRoutingStrategy = "routing-strategy"
4747
HeaderRequestID = "request-id"
4848
HeaderModel = "model"
49+
HeaderExternalFilter = "external-filter"
4950

5051
// RPM & TPM Update Errors
5152
HeaderUpdateTPM = "x-update-tpm"

0 commit comments

Comments
 (0)