Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 166 additions & 20 deletions cmd/plugins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,56 @@ limitations under the License.
package main

import (
"context"
"flag"
"net"
"net/http"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/vllm-project/aibrix/pkg/cache"
"github.com/vllm-project/aibrix/pkg/constants"
"github.com/vllm-project/aibrix/pkg/plugins/gateway"
routing "github.com/vllm-project/aibrix/pkg/plugins/gateway/algorithms"
healthserver "github.com/vllm-project/aibrix/pkg/plugins/gateway/health"
"github.com/vllm-project/aibrix/pkg/utils"
"google.golang.org/grpc/health"
healthPb "google.golang.org/grpc/health/grpc_health_v1"
"sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
)

var (
grpcAddr string
metricsAddr string
grpcAddr string
metricsAddr string
profilingAddr string
enableLeaderElection bool
leaderElectionID string
leaderElectionNamespace string
)

func main() {
flag.StringVar(&grpcAddr, "grpc-bind-address", ":50052", "The address the gRPC server binds to.")
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&profilingAddr, "profiling-bind-address", ":6061", "The address the profiling endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for high availability")
flag.StringVar(&leaderElectionID,
"leader-election-id", "gateway-plugin-lock", "Name of the lease resource for leader election")
flag.StringVar(&leaderElectionNamespace, "leader-election-namespace",
"aibrix-system", "Namespace for leader election lease (default: same as pod)")

klog.InitFlags(flag.CommandLine)
defer klog.Flush()
flag.Parse()
Expand Down Expand Up @@ -110,32 +128,160 @@ func main() {
}
klog.Infof("Started metrics server on %s", metricsAddr)

isLeader := &atomic.Bool{}
isLeader.Store(false)

leaderCtx, leaderCancel := context.WithCancel(context.Background())
defer leaderCancel()

if enableLeaderElection {
klog.Info("Leader election enabled")

// Get pod info for lease
podName := os.Getenv("POD_NAME")
if podName == "" {
podName = string(uuid.NewUUID())
}
if leaderElectionNamespace == "" {
podNamespace := os.Getenv("POD_NAMESPACE")
if podNamespace == "" {
// Read from file (in-cluster mode)
nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
klog.Fatalf("Failed to read namespace from file: %v", err)
}
podNamespace = string(nsBytes)
}
leaderElectionNamespace = podNamespace
}

lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaderElectionID,
Namespace: leaderElectionNamespace,
},
Client: k8sClient.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: podName,
},
}

leConfig := leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Info("This instance is now the leader!")
isLeader.Store(true)
},
OnStoppedLeading: func() {
klog.Info("This instance is no longer the leader, initiating graceful shutdown...")
// Cancel the leader context to stop leader-specific operations
leaderCancel()
// Exit the process to let Kubernetes restart it
os.Exit(0)
},
OnNewLeader: func(identity string) {
if identity == podName {
klog.Info("Still the leader")
} else {
klog.Infof("New leader elected: %s", identity)
}
},
},
ReleaseOnCancel: true,
}

leaderElector, err := leaderelection.NewLeaderElector(leConfig)
if err != nil {
klog.Fatalf("Failed to create leader elector: %v", err)
}

leaderElector, err = leaderelection.NewLeaderElector(leConfig)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is duplicated.

if err != nil {
klog.Fatalf("Failed to create leader elector: %v", err)
}

go func() {
leaderElector.Run(leaderCtx)
}()
} else {
// Single instance mode, all instances are leaders
isLeader.Store(true)
klog.Info("Single instance mode enabled, this instance is always the leader")
}

// Setup gRPC server with custom health server
s := grpc.NewServer()
extProcPb.RegisterExternalProcessorServer(s, gatewayServer)

healthCheck := health.NewServer()
healthPb.RegisterHealthServer(s, healthCheck)
healthCheck.SetServingStatus("gateway-plugin", healthPb.HealthCheckResponse_SERVING)
newHealthServer := healthserver.NewHealthServer(isLeader, enableLeaderElection)
healthpb.RegisterHealthServer(s, newHealthServer)

klog.Info("starting gRPC server on " + grpcAddr)

profilingServer := &http.Server{
Addr: profilingAddr,
}
go func() {
if err := http.ListenAndServe("localhost:6060", nil); err != nil {
klog.Fatalf("failed to setup profiling: %v", err)
if err := profilingServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
klog.Fatalf("failed to setup profiling on %s: %v", profilingAddr, err)
}
}()

var gracefulStop = make(chan os.Signal, 1)
signal.Notify(gracefulStop, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-gracefulStop
klog.Warningf("signal received: %v, initiating graceful shutdown...", sig)
gatewayServer.Shutdown()
// Create graceful shutdown function
gracefulShutdown := func() {
klog.Info("Initiating graceful shutdown...")

s.GracefulStop()
os.Exit(0)
klog.Info("gRPC server stopped")

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := profilingServer.Shutdown(ctx); err != nil {
klog.Errorf("Error shutting down profiling server: %v", err)
}
klog.Info("Profiling server stopped")

gatewayServer.Shutdown()
klog.Info("Gateway server stopped")

if err := redisClient.Close(); err != nil {
klog.Warningf("Error closing Redis client during shutdown: %v", err)
}
klog.Info("Redis client closed")

leaderCancel()
klog.Info("Leader context cancelled")
klog.Info("Graceful shutdown completed")
}

go func() {
if err := s.Serve(lis); err != nil {
klog.Errorf("gRPC server error: %v", err)
}
}()

if err := s.Serve(lis); err != nil {
panic(err)
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)

if enableLeaderElection {
// In leader election mode: wait for either signal or losing leadership
select {
case sig := <-signalCh:
klog.Warningf("signal received: %v, initiating graceful shutdown...", sig)
case <-leaderCtx.Done():
klog.Info("Leader context cancelled (lost leadership), initiating shutdown...")
}
gracefulShutdown()
os.Exit(0)
} else {
// In single instance mode: wait for shutdown signal
sig := <-signalCh
klog.Warningf("signal received: %v, initiating graceful shutdown...", sig)
gracefulShutdown()
os.Exit(0)
}
}
6 changes: 6 additions & 0 deletions config/gateway/gateway-plugin/gateway-plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ spec:
command: ['sh', '-c', 'until echo "ping" | nc aibrix-redis-master 6379 -w 1 | grep -c PONG; do echo waiting for service aibrix-redis-master; sleep 2; done']
containers:
- name: gateway-plugin
args:
- --enable-leader-election=false
#- --leader-election-id=gateway-plugin-lock
#- --leader-election-namespace=aibrix-system
image: gateway-plugins:latest
imagePullPolicy: IfNotPresent
ports:
Expand Down Expand Up @@ -120,11 +124,13 @@ spec:
livenessProbe:
grpc:
port: 50052
service: liveness
initialDelaySeconds: 5
periodSeconds: 10
readinessProbe:
grpc:
port: 50052
service: readiness
initialDelaySeconds: 5
periodSeconds: 10
serviceAccountName: aibrix-gateway-plugins
Expand Down
4 changes: 4 additions & 0 deletions dist/chart/templates/gateway-plugin/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ spec:
- name: gateway-plugin
image: {{ .Values.gatewayPlugin.container.image.repository }}:{{ .Values.gatewayPlugin.container.image.tag }}
imagePullPolicy: {{ .Values.gatewayPlugin.container.image.imagePullPolicy | default "IfNotPresent" }}
#args:
# - --enable-leader-election=false
# - --leader-election-id=gateway-plugin-lock
# - --leader-election-namespace=aibrix-system
ports:
- name: gateway
containerPort: 50052
Expand Down
12 changes: 12 additions & 0 deletions dist/chart/templates/gateway-plugin/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ rules:
- patch
- update
- watch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- model.aibrix.ai
resources:
Expand Down
18 changes: 10 additions & 8 deletions dist/chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,19 @@ gatewayPlugin:
liveness:
grpc:
port: 50052
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 3
failureThreshold: 3
service: liveness
initialDelaySeconds: 60
periodSeconds: 15
timeoutSeconds: 10
failureThreshold: 8
readiness:
grpc:
port: 50052
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 3
failureThreshold: 3
service: readiness
initialDelaySeconds: 60
periodSeconds: 15
timeoutSeconds: 10
failureThreshold: 8
envs:
AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS: "50"
AIBRIX_PREFIX_CACHE_TOKENIZER_TYPE: "character"
Expand Down
Loading
Loading