From 805369a6018207febb99d7d52c0b34245ab566bb Mon Sep 17 00:00:00 2001 From: fscnick Date: Tue, 28 Oct 2025 15:36:14 +0800 Subject: [PATCH 01/11] [RayJob] background job info poc --- .../ray_job_submission_service_server.go | 2 +- .../config/v1alpha1/configuration_types.go | 6 +- .../controllers/ray/rayjob_controller.go | 5 + .../dashboardclient/dashboard_cache_client.go | 151 ++++++++++++++++++ .../dashboardclient/dashboard_httpclient.go | 3 +- ray-operator/controllers/ray/utils/util.go | 13 +- ray-operator/test/sampleyaml/support.go | 2 +- 7 files changed, 177 insertions(+), 5 deletions(-) create mode 100644 ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go diff --git a/apiserver/pkg/server/ray_job_submission_service_server.go b/apiserver/pkg/server/ray_job_submission_service_server.go index 4fdead1e50f..596b991821e 100644 --- a/apiserver/pkg/server/ray_job_submission_service_server.go +++ b/apiserver/pkg/server/ray_job_submission_service_server.go @@ -41,7 +41,7 @@ type RayJobSubmissionServiceServer struct { // Create RayJobSubmissionServiceServer func NewRayJobSubmissionServiceServer(clusterServer *ClusterServer, options *RayJobSubmissionServiceServerOptions) *RayJobSubmissionServiceServer { zl := zerolog.New(os.Stdout).Level(zerolog.DebugLevel) - return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false)} + return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, false)} } // Submit Ray job diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go index 910ec0d11ab..1abb7dedab4 100644 --- a/ray-operator/apis/config/v1alpha1/configuration_types.go +++ b/ray-operator/apis/config/v1alpha1/configuration_types.go @@ -86,10 +86,14 @@ type Configuration struct { // EnableMetrics indicates whether KubeRay operator should emit control plane metrics. 
EnableMetrics bool `json:"enableMetrics,omitempty"`
+
+ // UseBackgroundGoroutine indicates that the operator will use a background goroutine to fetch the job info from the Ray dashboard and
+ // store it in the cache.
+ UseBackgroundGoroutine bool `json:"useBackgroundGoroutine,omitempty"`
 }

 func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
- return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
+ return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, config.UseBackgroundGoroutine)
 }

 func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface {
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index 5da57de6491..0cbcf45c444 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -2,6 +2,7 @@ package ray

 import (
 "context"
+ errs "errors"
 "fmt"
 "os"
 "strconv"
@@ -284,6 +285,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)

 jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
 if err != nil {
+ if errs.Is(err, dashboardclient.ErrAgain) {
+ logger.Info("The Ray job Info was not ready. Try again next iteration.", "JobId", rayJobInstance.Status.JobId)
+ return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+ }
 // If the Ray job was not found, GetJobInfo returns a BadRequest error.
 if errors.IsBadRequest(err) {
 if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode {
diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go
new file mode 100644
index 00000000000..40e91d49a82
--- /dev/null
+++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go
@@ -0,0 +1,151 @@
+package dashboardclient
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "time"
+
+ cmap "github.com/orcaman/concurrent-map/v2"
+
+ rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+ utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
+)
+
+var ErrAgain = errors.New("EAGAIN")
+
+var (
+ initWorkPool sync.Once
+ taskQueue chan Task
+
+ // TODO: make queue size and worker size configurable.
+ taskQueueSize = 128
+ workerSize = 8
+
+ cacheStorage *cmap.ConcurrentMap[string, *JobInfoCache]
+
+ queryInterval = 3 * time.Second // TODO: make it configurable
+)
+
+type (
+ Task func() bool
+ JobInfoCache struct {
+ JobInfo *utiltypes.RayJobInfo
+ Err error
+ UpdateAt *time.Time
+ }
+)
+
+var _ RayDashboardClientInterface = (*RayDashboardCacheClient)(nil)
+
+type RayDashboardCacheClient struct {
+ client RayDashboardClientInterface
+}
+
+func (r *RayDashboardCacheClient) InitClient(client RayDashboardClientInterface) {
+ initWorkPool.Do(func() {
+ if taskQueue == nil {
+ taskQueue = make(chan Task, taskQueueSize)
+
+ // TODO: should we have observability for these goroutine?
+ for i := 0; i < workerSize; i++ {
+ // TODO: should we consider the stop ?
+ go func() { + for task := range taskQueue { + again := task() + + if again { + time.AfterFunc(queryInterval, func() { + taskQueue <- task + }) + } + } + }() + } + } + + if cacheStorage == nil { + tmp := cmap.New[*JobInfoCache]() + cacheStorage = &tmp + } + }) + + r.client = client +} + +func (r *RayDashboardCacheClient) UpdateDeployments(ctx context.Context, configJson []byte) error { + // TODO implement me + panic("implement me") +} + +func (r *RayDashboardCacheClient) GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error) { + // TODO implement me + panic("implement me") +} + +func (r *RayDashboardCacheClient) GetMultiApplicationStatus(ctx context.Context) (map[string]*utiltypes.ServeApplicationStatus, error) { + // TODO implement me + panic("implement me") +} + +func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) (*utiltypes.RayJobInfo, error) { + if cached, ok := cacheStorage.Get(jobId); ok { + return cached.JobInfo, cached.Err + } + cached := &JobInfoCache{Err: ErrAgain} + cacheStorage.Set(jobId, cached) + + // send to worker pool + task := func() bool { + jobInfoCache, ok := cacheStorage.Get(jobId) + if !ok { + // TODO: this should not happen + } + jobInfoCache.JobInfo, jobInfoCache.Err = r.client.GetJobInfo(ctx, jobId) + currentTime := time.Now() + jobInfoCache.UpdateAt = ¤tTime + + cacheStorage.Set(jobId, jobInfoCache) + // handle not found(ex: rayjob has deleted) + + if rayv1.IsJobTerminal(jobInfoCache.JobInfo.JobStatus) { + return false + } + + return true + } + + taskQueue <- task + + return nil, ErrAgain +} + +func (r *RayDashboardCacheClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) { + // TODO implement me + panic("implement me") +} + +func (r *RayDashboardCacheClient) SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error) { + // TODO implement me + panic("implement me") +} + +func (r *RayDashboardCacheClient) SubmitJobReq(ctx context.Context, request *utiltypes.RayJobRequest) (string, error) { + // TODO implement me + panic("implement me") +} + +func (r *RayDashboardCacheClient) GetJobLog(ctx context.Context, jobName string) (*string, error) { + // TODO implement me + panic("implement me") +} + +func (r *RayDashboardCacheClient) StopJob(ctx context.Context, jobName string) error { + // TODO implement me + panic("implement me") +} + +func (r *RayDashboardCacheClient) DeleteJob(ctx context.Context, jobName string) error { + // TODO implement me + panic("implement me") +} diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go index d360ebc0af9..98bd8da369c 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go @@ -27,7 +27,8 @@ var ( ) type RayDashboardClientInterface interface { - InitClient(client *http.Client, dashboardURL string) + // Remove InitClient for adapting variable implementation + // InitClient(client *http.Client, dashboardURL string) UpdateDeployments(ctx context.Context, configJson []byte) error // V2/multi-app Rest API GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error) diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 540162bf9f4..9bc463de718 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -877,7 +877,7 @@ func 
FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray return headServiceURL, nil } -func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { +func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, useBackgroundGoroutine bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { dashboardClient := &dashboardclient.RayDashboardClient{} if useKubernetesProxy { @@ -897,12 +897,23 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun mgr.GetHTTPClient(), fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName), ) + if useBackgroundGoroutine { + dashboardCachedClient := &dashboardclient.RayDashboardCacheClient{} + dashboardCachedClient.InitClient(dashboardClient) + return dashboardCachedClient, nil + } return dashboardClient, nil } dashboardClient.InitClient(&http.Client{ Timeout: 2 * time.Second, }, "http://"+url) + + if useBackgroundGoroutine { + dashboardCachedClient := &dashboardclient.RayDashboardCacheClient{} + dashboardCachedClient.InitClient(dashboardClient) + return dashboardCachedClient, nil + } return dashboardClient, nil } } diff --git a/ray-operator/test/sampleyaml/support.go b/ray-operator/test/sampleyaml/support.go index 42ffd19e0a2..f470295001b 100644 --- a/ray-operator/test/sampleyaml/support.go +++ b/ray-operator/test/sampleyaml/support.go @@ -75,7 +75,7 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg g.Expect(err).ToNot(HaveOccurred()) url := fmt.Sprintf("127.0.0.1:%d", localPort) - rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false) + rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false, false) rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url) g.Expect(err).ToNot(HaveOccurred()) serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx()) From 73b14b5c0b794c873236a42b21b0fd069c44d07c Mon Sep 17 00:00:00 2001 From: fscnick Date: Tue, 28 Oct 2025 16:52:34 +0800 Subject: [PATCH 02/11] [RayJob] add implement some methods --- .../dashboardclient/dashboard_cache_client.go | 40 ++++++------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go index 40e91d49a82..f76c887beec 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go @@ -74,18 +74,15 @@ func (r *RayDashboardCacheClient) InitClient(client RayDashboardClientInterface) } func (r *RayDashboardCacheClient) UpdateDeployments(ctx context.Context, configJson []byte) error { - // TODO implement me - panic("implement me") + return r.client.UpdateDeployments(ctx, configJson) } func (r *RayDashboardCacheClient) GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error) { - // TODO implement me - panic("implement me") + return r.client.GetServeDetails(ctx) } func (r *RayDashboardCacheClient) GetMultiApplicationStatus(ctx context.Context) (map[string]*utiltypes.ServeApplicationStatus, error) { - // TODO implement me - panic("implement 
me") + return r.client.GetMultiApplicationStatus(ctx) } func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) (*utiltypes.RayJobInfo, error) { @@ -97,10 +94,9 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) // send to worker pool task := func() bool { - jobInfoCache, ok := cacheStorage.Get(jobId) - if !ok { - // TODO: this should not happen - } + jobInfoCache, _ := cacheStorage.Get(jobId) + // TODO: should we handle cache not exist here, which it shouldn't happen + jobInfoCache.JobInfo, jobInfoCache.Err = r.client.GetJobInfo(ctx, jobId) currentTime := time.Now() jobInfoCache.UpdateAt = ¤tTime @@ -108,11 +104,7 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) cacheStorage.Set(jobId, jobInfoCache) // handle not found(ex: rayjob has deleted) - if rayv1.IsJobTerminal(jobInfoCache.JobInfo.JobStatus) { - return false - } - - return true + return !rayv1.IsJobTerminal(jobInfoCache.JobInfo.JobStatus) } taskQueue <- task @@ -121,31 +113,25 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) } func (r *RayDashboardCacheClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) { - // TODO implement me - panic("implement me") + return r.client.ListJobs(ctx) } func (r *RayDashboardCacheClient) SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error) { - // TODO implement me - panic("implement me") + return r.client.SubmitJob(ctx, rayJob) } func (r *RayDashboardCacheClient) SubmitJobReq(ctx context.Context, request *utiltypes.RayJobRequest) (string, error) { - // TODO implement me - panic("implement me") + return r.client.SubmitJobReq(ctx, request) } func (r *RayDashboardCacheClient) GetJobLog(ctx context.Context, jobName string) (*string, error) { - // TODO implement me - panic("implement me") + return r.client.GetJobLog(ctx, jobName) } func (r *RayDashboardCacheClient) StopJob(ctx context.Context, jobName string) error { - // TODO implement me - panic("implement me") + return r.client.StopJob(ctx, jobName) } func (r *RayDashboardCacheClient) DeleteJob(ctx context.Context, jobName string) error { - // TODO implement me - panic("implement me") + return r.client.DeleteJob(ctx, jobName) } From 4ce23811cb3432898b81e84a146418eca4493785 Mon Sep 17 00:00:00 2001 From: fscnick Date: Wed, 29 Oct 2025 14:27:12 +0800 Subject: [PATCH 03/11] [RayJob] encapsulate the worker pool --- .../dashboardclient/dashboard_cache_client.go | 69 +++++++++++-------- 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go index f76c887beec..6cf0f351eb7 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go @@ -14,17 +14,20 @@ import ( var ErrAgain = errors.New("EAGAIN") -var ( - initWorkPool sync.Once - taskQueue chan Task - +const ( // TODO: make queue size and worker size configurable. 
taskQueueSize = 128 workerSize = 8 - cacheStorage *cmap.ConcurrentMap[string, *JobInfoCache] + queryInterval = 3 * time.Second +) - queryInterval = 3 * time.Second // TODO: make it configurable +var ( + initWorkPool sync.Once + pool workerPool + + initCacheStorage sync.Once + cacheStorage *cmap.ConcurrentMap[string, *JobInfoCache] ) type ( @@ -34,8 +37,36 @@ type ( Err error UpdateAt *time.Time } + + workerPool struct { + taskQueue chan Task + } ) +func (w *workerPool) init(taskQueueSize int, workerSize int, queryInterval time.Duration) { + w.taskQueue = make(chan Task, taskQueueSize) + + // TODO: should we have observability for these goroutine? + for i := 0; i < workerSize; i++ { + // TODO: should we consider the stop ? + go func() { + for task := range w.taskQueue { + again := task() + + if again { + time.AfterFunc(queryInterval, func() { + w.taskQueue <- task + }) + } + } + }() + } +} + +func (w *workerPool) PutTask(task Task) { + w.taskQueue <- task +} + var _ RayDashboardClientInterface = (*RayDashboardCacheClient)(nil) type RayDashboardCacheClient struct { @@ -44,26 +75,10 @@ type RayDashboardCacheClient struct { func (r *RayDashboardCacheClient) InitClient(client RayDashboardClientInterface) { initWorkPool.Do(func() { - if taskQueue == nil { - taskQueue = make(chan Task, taskQueueSize) - - // TODO: should we have observability for these goroutine? - for i := 0; i < workerSize; i++ { - // TODO: should we consider the stop ? - go func() { - for task := range taskQueue { - again := task() - - if again { - time.AfterFunc(queryInterval, func() { - taskQueue <- task - }) - } - } - }() - } - } + pool.init(taskQueueSize, workerSize, queryInterval) + }) + initCacheStorage.Do(func() { if cacheStorage == nil { tmp := cmap.New[*JobInfoCache]() cacheStorage = &tmp @@ -90,7 +105,7 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) return cached.JobInfo, cached.Err } cached := &JobInfoCache{Err: ErrAgain} - cacheStorage.Set(jobId, cached) + cacheStorage.SetIfAbsent(jobId, cached) // send to worker pool task := func() bool { @@ -107,7 +122,7 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) return !rayv1.IsJobTerminal(jobInfoCache.JobInfo.JobStatus) } - taskQueue <- task + pool.PutTask(task) return nil, ErrAgain } From e184e5c6a6fc647f9ab8c769f5e5f06d47b4ad13 Mon Sep 17 00:00:00 2001 From: fscnick Date: Wed, 29 Oct 2025 15:58:32 +0800 Subject: [PATCH 04/11] [RayJob] replace concurrency map with lru cache --- go.mod | 1 + go.sum | 2 ++ .../dashboardclient/dashboard_cache_client.go | 17 +++++++++++------ ray-operator/go.mod | 1 + ray-operator/go.sum | 2 ++ 5 files changed, 17 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index d5dab3bca19..1ce8746cd50 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/go.sum b/go.sum index afff157cfd8..2eb06da7d9e 100644 --- a/go.sum +++ b/go.sum @@ -107,6 +107,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= 
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jarcoal/httpmock v1.4.0 h1:BvhqnH0JAYbNudL2GMJKgOHe2CtKlzJ/5rWKyp+hc2k= diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go index 6cf0f351eb7..c3ee6a906cd 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go @@ -6,7 +6,7 @@ import ( "sync" "time" - cmap "github.com/orcaman/concurrent-map/v2" + lru "github.com/hashicorp/golang-lru/v2" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" @@ -20,6 +20,9 @@ const ( workerSize = 8 queryInterval = 3 * time.Second + + cacheSize = 10000 + cacheExpiry = 10 * time.Minute ) var ( @@ -27,7 +30,7 @@ var ( pool workerPool initCacheStorage sync.Once - cacheStorage *cmap.ConcurrentMap[string, *JobInfoCache] + cacheStorage *lru.Cache[string, *JobInfoCache] ) type ( @@ -80,8 +83,8 @@ func (r *RayDashboardCacheClient) InitClient(client RayDashboardClientInterface) initCacheStorage.Do(func() { if cacheStorage == nil { - tmp := cmap.New[*JobInfoCache]() - cacheStorage = &tmp + // the New() returns error only if the size is less or equal than zero. 
+ cacheStorage, _ = lru.New[string, *JobInfoCache](cacheSize) } }) @@ -105,7 +108,9 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) return cached.JobInfo, cached.Err } cached := &JobInfoCache{Err: ErrAgain} - cacheStorage.SetIfAbsent(jobId, cached) + if cached, existed, _ := cacheStorage.PeekOrAdd(jobId, cached); existed { + return cached.JobInfo, cached.Err + } // send to worker pool task := func() bool { @@ -116,7 +121,7 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) currentTime := time.Now() jobInfoCache.UpdateAt = ¤tTime - cacheStorage.Set(jobId, jobInfoCache) + cacheStorage.Add(jobId, jobInfoCache) // handle not found(ex: rayjob has deleted) return !rayv1.IsJobTerminal(jobInfoCache.JobInfo.JobStatus) diff --git a/ray-operator/go.mod b/ray-operator/go.mod index 907c9fd9e8b..cbe6d63098b 100644 --- a/ray-operator/go.mod +++ b/ray-operator/go.mod @@ -7,6 +7,7 @@ require ( github.com/coder/websocket v1.8.13 github.com/go-logr/logr v1.4.3 github.com/go-logr/zapr v1.3.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jarcoal/httpmock v1.4.0 github.com/onsi/ginkgo/v2 v2.23.4 github.com/onsi/gomega v1.37.0 diff --git a/ray-operator/go.sum b/ray-operator/go.sum index 26b72b129ae..d9a20efaf4e 100644 --- a/ray-operator/go.sum +++ b/ray-operator/go.sum @@ -53,6 +53,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/jarcoal/httpmock v1.4.0 h1:BvhqnH0JAYbNudL2GMJKgOHe2CtKlzJ/5rWKyp+hc2k= github.com/jarcoal/httpmock v1.4.0/go.mod h1:ftW1xULwo+j0R0JJkJIIi7UKigZUXCLLanykgjwBXL0= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= From 859f6a1c9c7acf7409e2092a43f8753df22f6d7d Mon Sep 17 00:00:00 2001 From: fscnick Date: Thu, 30 Oct 2025 15:29:50 +0800 Subject: [PATCH 05/11] [RayJob] remove cache on stop and config flag --- .../config/v1alpha1/configuration_types.go | 4 +++ .../controllers/ray/rayjob_controller.go | 31 +++++++++++++------ ray-operator/controllers/ray/suite_test.go | 4 +++ .../dashboardclient/dashboard_cache_client.go | 12 +++++-- ray-operator/controllers/ray/utils/util.go | 1 + ray-operator/main.go | 3 ++ 6 files changed, 42 insertions(+), 13 deletions(-) diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go index 1abb7dedab4..55e1269c6a6 100644 --- a/ray-operator/apis/config/v1alpha1/configuration_types.go +++ b/ray-operator/apis/config/v1alpha1/configuration_types.go @@ -99,3 +99,7 @@ func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayClus func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface { return utils.GetRayHttpProxyClientFunc(mgr, config.UseKubernetesProxy) } + +func (config Configuration) DoesUseBackgroundGoroutine() bool { + return config.UseBackgroundGoroutine +} diff --git a/ray-operator/controllers/ray/rayjob_controller.go 
b/ray-operator/controllers/ray/rayjob_controller.go index 0cbcf45c444..2cd24da0afa 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -43,11 +43,11 @@ const ( // RayJobReconciler reconciles a RayJob object type RayJobReconciler struct { client.Client - Scheme *runtime.Scheme - Recorder record.EventRecorder - - dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) - options RayJobReconcilerOptions + Recorder record.EventRecorder + options RayJobReconcilerOptions + Scheme *runtime.Scheme + dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) + useBackgroundGoroutine bool } type RayJobReconcilerOptions struct { @@ -59,11 +59,12 @@ type RayJobReconcilerOptions struct { func NewRayJobReconciler(_ context.Context, mgr manager.Manager, options RayJobReconcilerOptions, provider utils.ClientProvider) *RayJobReconciler { dashboardClientFunc := provider.GetDashboardClient(mgr) return &RayJobReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor("rayjob-controller"), - dashboardClientFunc: dashboardClientFunc, - options: options, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("rayjob-controller"), + dashboardClientFunc: dashboardClientFunc, + useBackgroundGoroutine: provider.DoesUseBackgroundGoroutine(), + options: options, } } @@ -758,6 +759,16 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns if !cluster.DeletionTimestamp.IsZero() { logger.Info("The deletion of the associated RayCluster for RayJob is ongoing.", "RayCluster", cluster.Name) } else { + if r.useBackgroundGoroutine { + // clear cache, and it will remove this job from updating loop. 
+ rayDashboardClient, err := r.dashboardClientFunc(&cluster, rayJobInstance.Status.DashboardURL) + if err != nil { + logger.Error(err, "Failed to get dashboard client for RayJob") + } + if err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId); err != nil { + logger.Error(err, "Failed to stop job for RayJob") + } + } if err := r.Delete(ctx, &cluster); err != nil { r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToDeleteRayCluster), "Failed to delete cluster %s/%s: %v", cluster.Namespace, cluster.Name, err) return false, err diff --git a/ray-operator/controllers/ray/suite_test.go b/ray-operator/controllers/ray/suite_test.go index 85c913e7bd6..e2ab5e16196 100644 --- a/ray-operator/controllers/ray/suite_test.go +++ b/ray-operator/controllers/ray/suite_test.go @@ -64,6 +64,10 @@ func (testProvider TestClientProvider) GetHttpProxyClient(_ manager.Manager) fun } } +func (testProvider TestClientProvider) DoesUseBackgroundGoroutine() bool { + return false +} + func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go index c3ee6a906cd..e1616649f45 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go @@ -21,6 +21,7 @@ const ( queryInterval = 3 * time.Second + // TODO: consider a proper size for accommodating the all live job info cacheSize = 10000 cacheExpiry = 10 * time.Minute ) @@ -114,16 +115,19 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) // send to worker pool task := func() bool { - jobInfoCache, _ := cacheStorage.Get(jobId) - // TODO: should we handle cache not exist here, which it shouldn't happen + jobInfoCache, existed := cacheStorage.Get(jobId) + if !existed { + return false + } jobInfoCache.JobInfo, jobInfoCache.Err = r.client.GetJobInfo(ctx, jobId) currentTime := time.Now() jobInfoCache.UpdateAt = ¤tTime - cacheStorage.Add(jobId, jobInfoCache) // handle not found(ex: rayjob has deleted) + cacheStorage.Add(jobId, jobInfoCache) + return !rayv1.IsJobTerminal(jobInfoCache.JobInfo.JobStatus) } @@ -149,9 +153,11 @@ func (r *RayDashboardCacheClient) GetJobLog(ctx context.Context, jobName string) } func (r *RayDashboardCacheClient) StopJob(ctx context.Context, jobName string) error { + cacheStorage.Remove(jobName) return r.client.StopJob(ctx, jobName) } func (r *RayDashboardCacheClient) DeleteJob(ctx context.Context, jobName string) error { + cacheStorage.Remove(jobName) return r.client.DeleteJob(ctx, jobName) } diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 9bc463de718..31a93664b64 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -653,6 +653,7 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool) type ClientProvider interface { GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface + DoesUseBackgroundGoroutine() bool } func ManagedByExternalController(controllerName *string) *string { diff --git a/ray-operator/main.go b/ray-operator/main.go index ceba7d4772e..feb675e717f 100644 --- 
a/ray-operator/main.go +++ b/ray-operator/main.go @@ -73,6 +73,7 @@ func main() { var enableMetrics bool var qps float64 var burst int + var useBackgroundGoroutine bool // TODO: remove flag-based config once Configuration API graduates to v1. flag.StringVar(&metricsAddr, "metrics-addr", configapi.DefaultMetricsAddr, "The address the metric endpoint binds to.") @@ -106,6 +107,7 @@ func main() { flag.BoolVar(&enableMetrics, "enable-metrics", false, "Enable the emission of control plane metrics.") flag.Float64Var(&qps, "qps", float64(configapi.DefaultQPS), "The QPS value for the client communicating with the Kubernetes API server.") flag.IntVar(&burst, "burst", configapi.DefaultBurst, "The maximum burst for throttling requests from this client to the Kubernetes API server.") + flag.BoolVar(&useBackgroundGoroutine, "use-background-goroutine", false, "Enable the background goroutine for fetching job info in RayJob.") opts := k8szap.Options{ TimeEncoder: zapcore.ISO8601TimeEncoder, @@ -138,6 +140,7 @@ func main() { config.EnableMetrics = enableMetrics config.QPS = &qps config.Burst = &burst + config.UseBackgroundGoroutine = useBackgroundGoroutine } stdoutEncoder, err := newLogEncoder(logStdoutEncoder) From 03ce0e96557b914fb0fc7958729f44a1a6b97398 Mon Sep 17 00:00:00 2001 From: fscnick Date: Thu, 30 Oct 2025 15:51:37 +0800 Subject: [PATCH 06/11] [RayJob] expiry cache cleanup goroutine Signed-off-by: fscnick --- .../dashboardclient/dashboard_cache_client.go | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go index e1616649f45..0f33589876b 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go @@ -50,8 +50,8 @@ type ( func (w *workerPool) init(taskQueueSize int, workerSize int, queryInterval time.Duration) { w.taskQueue = make(chan Task, taskQueueSize) - // TODO: should we have observability for these goroutine? for i := 0; i < workerSize; i++ { + // TODO: observability for these goroutine // TODO: should we consider the stop ? go func() { for task := range w.taskQueue { @@ -87,6 +87,25 @@ func (r *RayDashboardCacheClient) InitClient(client RayDashboardClientInterface) // the New() returns error only if the size is less or equal than zero. cacheStorage, _ = lru.New[string, *JobInfoCache](cacheSize) } + + // expiry cache cleanup + go func() { + ticker := time.NewTicker(queryInterval * 10) + defer ticker.Stop() + + // TODO: observability + // TODO: should we consider the stop? 
+ for range ticker.C {
+ keys := cacheStorage.Keys()
+ for _, key := range keys {
+ if cached, ok := cacheStorage.Peek(key); ok {
+ if time.Now().Add(-cacheExpiry).Before(*cached.UpdateAt) {
+ cacheStorage.Remove(key)
+ }
+ }
+ }
+ }
+ }()
 })

 r.client = client

From ac275c29b0497876c3976c14ba744dd622e96c1c Mon Sep 17 00:00:00 2001
From: fscnick
Date: Thu, 30 Oct 2025 22:00:48 +0800
Subject: [PATCH 07/11] [RayJob] code and comment minor fix

Signed-off-by: fscnick
---
 .../controllers/ray/rayjob_controller.go      |  2 +-
 .../dashboardclient/dashboard_cache_client.go | 18 +++++++++++-------
 .../dashboardclient/dashboard_httpclient.go   |  2 --
 3 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index 2cd24da0afa..1d05e625bdc 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -760,11 +760,11 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
 logger.Info("The deletion of the associated RayCluster for RayJob is ongoing.", "RayCluster", cluster.Name)
 } else {
 if r.useBackgroundGoroutine {
- // clear cache, and it will remove this job from updating loop.
 rayDashboardClient, err := r.dashboardClientFunc(&cluster, rayJobInstance.Status.DashboardURL)
 if err != nil {
 logger.Error(err, "Failed to get dashboard client for RayJob")
 }
+ // clear cache, and it will remove this job from the cache updating loop.
 if err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId); err != nil {
 logger.Error(err, "Failed to stop job for RayJob")
 }
diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go
index 0f33589876b..37b7410173d 100644
--- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go
+++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go
@@ -27,9 +27,11 @@ const (
 )

 var (
+ // singleton
 initWorkPool sync.Once
 pool workerPool

+ // singleton
 initCacheStorage sync.Once
 cacheStorage *lru.Cache[string, *JobInfoCache]
 )
@@ -93,13 +95,14 @@ func (r *RayDashboardCacheClient) InitClient(client RayDashboardClientInterface)
 ticker := time.NewTicker(queryInterval * 10)
 defer ticker.Stop()

- // TODO: observability
- // TODO: should we consider the stop?
+ // TODO: observability ?
+ // TODO: should we consider the stop ?
 for range ticker.C {
 keys := cacheStorage.Keys()
+ expiredThreshold := time.Now().Add(-cacheExpiry)
 for _, key := range keys {
 if cached, ok := cacheStorage.Peek(key); ok {
- if time.Now().Add(-cacheExpiry).Before(*cached.UpdateAt) {
+ if cached.UpdateAt.Before(expiredThreshold) {
 cacheStorage.Remove(key)
 }
 }
@@ -127,7 +130,9 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string)
 if cached, ok := cacheStorage.Get(jobId); ok {
 return cached.JobInfo, cached.Err
 }
- cached := &JobInfoCache{Err: ErrAgain}
+ currentTime := time.Now()
+ cached := &JobInfoCache{Err: ErrAgain, UpdateAt: &currentTime}
+
+ // Put a placeholder in storage. The cache will be updated only if the placeholder exists.
+ // The placeholder will be removed when StopJob or DeleteJob.
 if cached, existed, _ := cacheStorage.PeekOrAdd(jobId, cached); existed {
 return cached.JobInfo, cached.Err
 }
- // send to worker pool
 task := func() bool {
 jobInfoCache, existed := cacheStorage.Get(jobId)
 if !existed {
@@ -143,8 +149,6 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string)
 jobInfoCache.JobInfo, jobInfoCache.Err = r.client.GetJobInfo(ctx, jobId)
 currentTime := time.Now()
 jobInfoCache.UpdateAt = &currentTime
- // handle not found(ex: rayjob has deleted)
- cacheStorage.Add(jobId, jobInfoCache)
+ cacheStorage.Add(jobId, jobInfoCache)
 return !rayv1.IsJobTerminal(jobInfoCache.JobInfo.JobStatus)
diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go
index 98bd8da369c..570ce082243 100644
--- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go
+++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go
@@ -27,8 +27,6 @@ var (
 )

 type RayDashboardClientInterface interface {
- // Remove InitClient for adapting variable implementation
- // InitClient(client *http.Client, dashboardURL string)
 UpdateDeployments(ctx context.Context, configJson []byte) error
 // V2/multi-app Rest API
 GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error)

From 0923ef5c1b307c166c3ae2726ee66e20d5fb6e2d Mon Sep 17 00:00:00 2001
From: fscnick
Date: Sat, 1 Nov 2025 22:14:41 +0800
Subject: [PATCH 08/11] [RayJob] check whether the cache entry exists before adding it in the task

Signed-off-by: fscnick
---
 .../ray/utils/dashboardclient/dashboard_cache_client.go | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go
index 37b7410173d..38deb5825b1 100644
--- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go
+++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go
@@ -131,11 +131,11 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string)
 return cached.JobInfo, cached.Err
 }
 currentTime := time.Now()
- cached := &JobInfoCache{Err: ErrAgain, UpdateAt: &currentTime}
+ placeholder := &JobInfoCache{Err: ErrAgain, UpdateAt: &currentTime}

 // Put a placeholder in storage. The cache will be updated only if the placeholder exists.
 // The placeholder will be removed when StopJob or DeleteJob.
- if cached, existed, _ := cacheStorage.PeekOrAdd(jobId, cached); existed {
+ if cached, existed, _ := cacheStorage.PeekOrAdd(jobId, placeholder); existed {
 return cached.JobInfo, cached.Err
 }
@@ -149,7 +149,9 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string)
 jobInfoCache.JobInfo, jobInfoCache.Err = r.client.GetJobInfo(ctx, jobId)
 currentTime := time.Now()
 jobInfoCache.UpdateAt = &currentTime
- cacheStorage.Add(jobId, jobInfoCache)
+ if _, existed := cacheStorage.ContainsOrAdd(jobId, jobInfoCache); !existed {
+ return false
+ }
 return !rayv1.IsJobTerminal(jobInfoCache.JobInfo.JobStatus)
 }

From 9f87da63f2ecb952e8d69a3b5327652dff4c8832 Mon Sep 17 00:00:00 2001
From: fscnick
Date: Tue, 2 Dec 2025 20:09:25 +0800
Subject: [PATCH 09/11] [RayJob] remove delete cache from deleteClusterResources and add lock for cache

Signed-off-by: fscnick
---
 .../controllers/ray/rayjob_controller.go      | 30 ++++++-------------
 .../dashboardclient/dashboard_cache_client.go | 28 ++++++++++++++++-
 ray-operator/controllers/ray/utils/util.go    |  1 -
 3 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index 1d05e625bdc..68920c27455 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -43,11 +43,10 @@ const (
 // RayJobReconciler reconciles a RayJob object
 type RayJobReconciler struct {
 client.Client
- Recorder record.EventRecorder
- options RayJobReconcilerOptions
- Scheme *runtime.Scheme
- dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
- useBackgroundGoroutine bool
+ Recorder record.EventRecorder
+ options RayJobReconcilerOptions
+ Scheme *runtime.Scheme
+ dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
 }

 type RayJobReconcilerOptions struct {
@@ -59,12 +58,11 @@ type RayJobReconcilerOptions struct {
 func NewRayJobReconciler(_ context.Context, mgr manager.Manager, options RayJobReconcilerOptions, provider utils.ClientProvider) *RayJobReconciler {
 dashboardClientFunc := provider.GetDashboardClient(mgr)
 return &RayJobReconciler{
- Client: mgr.GetClient(),
- Scheme: mgr.GetScheme(),
- Recorder: mgr.GetEventRecorderFor("rayjob-controller"),
- dashboardClientFunc: dashboardClientFunc,
- useBackgroundGoroutine: provider.DoesUseBackgroundGoroutine(),
- options: options,
+ Client: mgr.GetClient(),
+ Scheme: mgr.GetScheme(),
+ Recorder: mgr.GetEventRecorderFor("rayjob-controller"),
+ dashboardClientFunc: dashboardClientFunc,
+ options: options,
 }
 }

@@ -759,16 +757,6 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
 if !cluster.DeletionTimestamp.IsZero() {
 logger.Info("The deletion of the associated RayCluster for RayJob is ongoing.", "RayCluster", cluster.Name)
 } else {
- if r.useBackgroundGoroutine {
- rayDashboardClient, err := r.dashboardClientFunc(&cluster, rayJobInstance.Status.DashboardURL)
- if err != nil {
- logger.Error(err, "Failed to get dashboard client for RayJob")
- }
- // clear cache, and it will remove this job from the cache updating loop.
- if err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId); err != nil { - logger.Error(err, "Failed to stop job for RayJob") - } - } if err := r.Delete(ctx, &cluster); err != nil { r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToDeleteRayCluster), "Failed to delete cluster %s/%s: %v", cluster.Namespace, cluster.Name, err) return false, err diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go index 38deb5825b1..945deb97160 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go @@ -34,6 +34,7 @@ var ( // singleton initCacheStorage sync.Once cacheStorage *lru.Cache[string, *JobInfoCache] + cacheLock sync.RWMutex ) type ( @@ -101,11 +102,13 @@ func (r *RayDashboardCacheClient) InitClient(client RayDashboardClientInterface) keys := cacheStorage.Keys() expiredThreshold := time.Now().Add(-cacheExpiry) for _, key := range keys { + cacheLock.Lock() if cached, ok := cacheStorage.Peek(key); ok { if cached.UpdateAt.Before(expiredThreshold) { cacheStorage.Remove(key) } } + cacheLock.Unlock() } } }() @@ -127,32 +130,49 @@ func (r *RayDashboardCacheClient) GetMultiApplicationStatus(ctx context.Context) } func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) (*utiltypes.RayJobInfo, error) { + cacheLock.RLock() if cached, ok := cacheStorage.Get(jobId); ok { + cacheLock.RUnlock() return cached.JobInfo, cached.Err } + cacheLock.RUnlock() + currentTime := time.Now() placeholder := &JobInfoCache{Err: ErrAgain, UpdateAt: ¤tTime} // Put a placeholder in storage. The cache will be updated only if the placeholder exists. // The placeholder will be removed when StopJob or DeleteJob. 
+ cacheLock.Lock() if cached, existed, _ := cacheStorage.PeekOrAdd(jobId, placeholder); existed { + cacheLock.Unlock() return cached.JobInfo, cached.Err } + cacheLock.Unlock() task := func() bool { + cacheLock.RLock() jobInfoCache, existed := cacheStorage.Get(jobId) if !existed { + cacheLock.RUnlock() return false } + cacheLock.RUnlock() jobInfoCache.JobInfo, jobInfoCache.Err = r.client.GetJobInfo(ctx, jobId) currentTime := time.Now() jobInfoCache.UpdateAt = ¤tTime - if _, existed := cacheStorage.ContainsOrAdd(jobId, jobInfoCache); !existed { + cacheLock.Lock() + if existed := cacheStorage.Contains(jobId); !existed { + cacheLock.Unlock() return false } + cacheStorage.Add(jobId, jobInfoCache) + cacheLock.Unlock() + if jobInfoCache.JobInfo == nil { + return true + } return !rayv1.IsJobTerminal(jobInfoCache.JobInfo.JobStatus) } @@ -178,11 +198,17 @@ func (r *RayDashboardCacheClient) GetJobLog(ctx context.Context, jobName string) } func (r *RayDashboardCacheClient) StopJob(ctx context.Context, jobName string) error { + cacheLock.Lock() + defer cacheLock.Unlock() + cacheStorage.Remove(jobName) return r.client.StopJob(ctx, jobName) } func (r *RayDashboardCacheClient) DeleteJob(ctx context.Context, jobName string) error { + cacheLock.Lock() + defer cacheLock.Unlock() + cacheStorage.Remove(jobName) return r.client.DeleteJob(ctx, jobName) } diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 31a93664b64..9bc463de718 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -653,7 +653,6 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool) type ClientProvider interface { GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface - DoesUseBackgroundGoroutine() bool } func ManagedByExternalController(controllerName *string) *string { From 97ab4079929ca99eb580baf02e6827250e5aebf6 Mon Sep 17 00:00:00 2001 From: fscnick Date: Tue, 2 Dec 2025 20:18:52 +0800 Subject: [PATCH 10/11] [Helm] add argument for useBackgroundGoroutine Signed-off-by: fscnick --- helm-chart/kuberay-operator/README.md | 1 + helm-chart/kuberay-operator/templates/deployment.yaml | 3 +++ helm-chart/kuberay-operator/values.yaml | 4 ++++ 3 files changed, 8 insertions(+) diff --git a/helm-chart/kuberay-operator/README.md b/helm-chart/kuberay-operator/README.md index ecc0f8cf988..14e32aaec98 100644 --- a/helm-chart/kuberay-operator/README.md +++ b/helm-chart/kuberay-operator/README.md @@ -191,6 +191,7 @@ spec: | rbacEnable | bool | `true` | If rbacEnable is set to false, no RBAC resources will be created, including the Role for leader election, the Role for Pods and Services, and so on. | | crNamespacedRbacEnable | bool | `true` | When crNamespacedRbacEnable is set to true, the KubeRay operator will create a Role for RayCluster preparation (e.g., Pods, Services) and a corresponding RoleBinding for each namespace listed in the "watchNamespace" parameter. Please note that even if crNamespacedRbacEnable is set to false, the Role and RoleBinding for leader election will still be created. Note: (1) This variable is only effective when rbacEnable and singleNamespaceInstall are both set to true. 
(2) In most cases, it should be set to true, unless you are using a Kubernetes cluster managed by GitOps tools such as ArgoCD. | | singleNamespaceInstall | bool | `false` | When singleNamespaceInstall is true: - Install namespaced RBAC resources such as Role and RoleBinding instead of cluster-scoped ones like ClusterRole and ClusterRoleBinding so that the chart can be installed by users with permissions restricted to a single namespace. (Please note that this excludes the CRDs, which can only be installed at the cluster scope.) - If "watchNamespace" is not set, the KubeRay operator will, by default, only listen to resource events within its own namespace. | +| useBackgroundGoroutine | bool | `false` | When useBackgroundGoroutine is set to true: the KubeRay operator will use background goroutines to fetch the jobInfo from Ray clusters for RayJob status updates. | | env | string | `nil` | Environment variables. | | resources | object | `{"limits":{"cpu":"100m","memory":"512Mi"}}` | Resource requests and limits for containers. | | livenessProbe.initialDelaySeconds | int | `10` | | diff --git a/helm-chart/kuberay-operator/templates/deployment.yaml b/helm-chart/kuberay-operator/templates/deployment.yaml index 78cb0fe944d..14183095576 100644 --- a/helm-chart/kuberay-operator/templates/deployment.yaml +++ b/helm-chart/kuberay-operator/templates/deployment.yaml @@ -152,6 +152,9 @@ spec: {{- $argList = append $argList (printf "--burst=%v" .Values.kubeClient.burst) -}} {{- end -}} {{- end -}} + {{- if hasKey .Values "useBackgroundGoroutine" -}} + {{- $argList = append $argList (printf "--use-background-goroutine=%t" .Values.useBackgroundGoroutine) -}} + {{- end -}} {{- (printf "\n") -}} {{- $argList | toYaml | indent 12 }} ports: diff --git a/helm-chart/kuberay-operator/values.yaml b/helm-chart/kuberay-operator/values.yaml index 1b7b46020b0..a18c914acc5 100644 --- a/helm-chart/kuberay-operator/values.yaml +++ b/helm-chart/kuberay-operator/values.yaml @@ -185,6 +185,10 @@ crNamespacedRbacEnable: true # to resource events within its own namespace. singleNamespaceInstall: false +# -- When useBackgroundGoroutine is set to true: +# the KubeRay operator will use background goroutines to fetch the jobInfo from Ray clusters for RayJob status updates. +useBackgroundGoroutine: false + # The KubeRay operator will watch the custom resources in the namespaces listed in the "watchNamespace" parameter. 
# watchNamespace:
# - n1
# - n2

From a2a0961a5f363394cd7eefe4135ded40d64037bf Mon Sep 17 00:00:00 2001
From: fscnick
Date: Wed, 3 Dec 2025 20:33:55 +0800
Subject: [PATCH 11/11] [RayJob] do not update the cache when the error repeats

Signed-off-by: fscnick
---
 .../utils/dashboardclient/dashboard_cache_client.go | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go
index 945deb97160..59c35db3864 100644
--- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go
+++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go
@@ -7,6 +7,7 @@ import (
 "time"

 lru "github.com/hashicorp/golang-lru/v2"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"

 rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
 utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
@@ -158,7 +159,17 @@ func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string)
 }
 cacheLock.RUnlock()

- jobInfoCache.JobInfo, jobInfoCache.Err = r.client.GetJobInfo(ctx, jobId)
+ var statusErr *k8serrors.StatusError
+ jobInfo, err := r.client.GetJobInfo(ctx, jobId)
+ if err != nil && !errors.As(err, &statusErr) {
+ if jobInfoCache.Err != nil && err.Error() == jobInfoCache.Err.Error() {
+ // The error is the same as last time, so there is no need to update the cache; just requeue the task to run later.
+ // If the error never goes away, the cache entry will eventually expire and be removed.
+ return true
+ }
+ }
+ jobInfoCache.JobInfo = jobInfo
+ jobInfoCache.Err = err
 currentTime := time.Now()
 jobInfoCache.UpdateAt = &currentTime