Skip to content

Commit 95d58a8

Browse files
committed
add worker group
Signed-off-by: Ryan <[email protected]>
1 parent 3145953 commit 95d58a8

File tree

2 files changed

+115
-116
lines changed

2 files changed

+115
-116
lines changed
Lines changed: 88 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -1,135 +1,108 @@
1-
package e2e
1+
package e2erayservice
22

33
import (
4-
"fmt"
54
"testing"
6-
"time"
75

86
. "github.com/onsi/gomega"
97
corev1 "k8s.io/api/core/v1"
8+
"k8s.io/apimachinery/pkg/api/resource"
109
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
1111

1212
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
1313
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
1414
rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
1515
. "github.com/ray-project/kuberay/ray-operator/test/support"
1616
)
1717

18-
// NewRayClusterSpecWithAuth creates a new RayClusterSpec with the specified AuthMode.
19-
func NewRayClusterSpecWithAuth(authMode rayv1.AuthMode) *rayv1ac.RayClusterSpecApplyConfiguration {
20-
return NewRayClusterSpec().
21-
WithAuthOptions(rayv1ac.AuthOptions().WithMode(authMode))
22-
}
23-
24-
func TestRayClusterAuthOptions(t *testing.T) {
18+
func TestRayServiceAuthToken(t *testing.T) {
2519
test := With(t)
2620
g := NewWithT(t)
2721

22+
// Create a namespace
2823
namespace := test.NewTestNamespace()
2924

30-
test.T().Run("RayCluster with token authentication enabled", func(t *testing.T) {
31-
t.Parallel()
32-
33-
rayClusterAC := rayv1ac.RayCluster("raycluster-auth-token", namespace.Name).
34-
WithSpec(NewRayClusterSpecWithAuth(rayv1.AuthModeToken).WithRayVersion("2.52"))
35-
36-
rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
37-
g.Expect(err).NotTo(HaveOccurred())
38-
LogWithTimestamp(test.T(), "Created RayCluster %s/%s successfully with AuthModeToken", rayCluster.Namespace, rayCluster.Name)
39-
40-
LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready", rayCluster.Namespace, rayCluster.Name)
41-
g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
42-
Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
43-
44-
headPod, err := GetHeadPod(test, rayCluster)
45-
g.Expect(err).NotTo(HaveOccurred())
46-
g.Expect(headPod).NotTo(BeNil())
47-
48-
// Verify Ray container has auth token env vars
49-
VerifyContainerAuthTokenEnvVars(test, rayCluster, &headPod.Spec.Containers[utils.RayContainerIndex])
50-
51-
// Verify worker pods have auth token env vars
52-
workerPods, err := GetWorkerPods(test, rayCluster)
53-
g.Expect(err).NotTo(HaveOccurred())
54-
g.Expect(workerPods).ToNot(BeEmpty())
55-
for _, workerPod := range workerPods {
56-
VerifyContainerAuthTokenEnvVars(test, rayCluster, &workerPod.Spec.Containers[utils.RayContainerIndex])
57-
}
58-
59-
// Get auth token for job submission tests
60-
authToken := getAuthTokenFromPod(test, rayCluster, headPod)
61-
g.Expect(authToken).NotTo(BeEmpty(), "Auth token should be present")
62-
63-
// Test job submission with auth token using Ray Job CLI
64-
test.T().Run("Submit job with auth token should succeed", func(_ *testing.T) {
65-
LogWithTimestamp(test.T(), "Testing job submission WITH auth token")
66-
67-
submissionId := fmt.Sprintf("test-job-with-auth-%d", time.Now().Unix())
68-
69-
// Submit job via Ray Job CLI with auth token
70-
// Set RAY_AUTH_TOKEN environment variable for authentication
71-
submitCmd := []string{
72-
"bash", "-c",
73-
fmt.Sprintf("RAY_AUTH_TOKEN=%s ray job submit --address http://127.0.0.1:8265 --submission-id %s --no-wait -- python -c 'import ray; ray.init(); print(\"Job with auth succeeded\")'",
74-
authToken, submissionId),
75-
}
76-
77-
stdout, _ := ExecPodCmd(test, headPod, headPod.Spec.Containers[utils.RayContainerIndex].Name, submitCmd)
78-
79-
// Verify job was submitted successfully
80-
g.Expect(stdout.String()).To(ContainSubstring(submissionId), "Job submission should succeed with valid auth token")
81-
82-
// Verify job status is queryable with auth token (confirms auth works)
83-
g.Eventually(func(g Gomega) {
84-
statusCmd := []string{
85-
"bash", "-c",
86-
fmt.Sprintf("RAY_AUTH_TOKEN=%s ray job status --address http://127.0.0.1:8265 %s", authToken, submissionId),
87-
}
88-
stdout, _ := ExecPodCmd(test, headPod, headPod.Spec.Containers[utils.RayContainerIndex].Name, statusCmd)
89-
g.Expect(stdout.String()).To(ContainSubstring("succeeded"))
90-
}, TestTimeoutShort).Should(Succeed())
91-
92-
LogWithTimestamp(test.T(), "Successfully submitted and verified job with auth token")
93-
})
94-
95-
test.T().Run("Submit job with incorrect auth token should fail", func(_ *testing.T) {
96-
LogWithTimestamp(test.T(), "Testing job submission WITH incorrect auth token (should fail)")
97-
98-
submissionId := fmt.Sprintf("test-job-bad-auth-%d", time.Now().Unix())
99-
100-
// Submit job via Ray Job CLI with INCORRECT auth token
101-
incorrectToken := "incorrect-token-12345"
102-
submitCmd := []string{
103-
"bash", "-c",
104-
fmt.Sprintf("RAY_AUTH_TOKEN=%s ray job submit --address http://127.0.0.1:8265 --submission-id %s --no-wait -- python -c 'print(\"Should not run\")'",
105-
incorrectToken, submissionId),
106-
}
107-
108-
_, stderr := ExecPodCmd(test, headPod, headPod.Spec.Containers[utils.RayContainerIndex].Name, submitCmd, true)
109-
110-
// Verify response indicates authentication failure
111-
g.Expect(stderr.String()).To(ContainSubstring("Unauthenticated"), "Job submission should fail with Unauthorized when auth token is incorrect")
112-
113-
LogWithTimestamp(test.T(), "Job submission correctly rejected with incorrect auth token")
114-
})
115-
})
116-
}
117-
118-
// getAuthTokenFromPod extracts the auth token from the pod's environment variables.
119-
// It reads the token from the secret referenced by the RAY_AUTH_TOKEN environment variable.
120-
func getAuthTokenFromPod(test Test, rayCluster *rayv1.RayCluster, pod *corev1.Pod) string {
121-
test.T().Helper()
122-
g := NewWithT(test.T())
123-
124-
for _, envVar := range pod.Spec.Containers[utils.RayContainerIndex].Env {
125-
if envVar.Name == utils.RAY_AUTH_TOKEN_ENV_VAR {
126-
if envVar.ValueFrom != nil && envVar.ValueFrom.SecretKeyRef != nil {
127-
secret, err := test.Client().Core().CoreV1().Secrets(rayCluster.Namespace).
128-
Get(test.Ctx(), envVar.ValueFrom.SecretKeyRef.Name, metav1.GetOptions{})
129-
g.Expect(err).NotTo(HaveOccurred())
130-
return string(secret.Data[envVar.ValueFrom.SecretKeyRef.Key])
131-
}
132-
}
25+
// Create the RayService for testing with auth token using programmatic configuration
26+
rayServiceName := "rayservice-auth"
27+
rayServiceSpec := RayServiceSampleYamlApplyConfiguration()
28+
rayServiceSpec.RayClusterSpec.WithAuthOptions(rayv1ac.AuthOptions().WithMode(rayv1.AuthModeToken))
29+
30+
// Add a worker group to verify auth token propagation to workers
31+
workerGroupSpec := rayv1ac.WorkerGroupSpec().
32+
WithGroupName("small-group").
33+
WithReplicas(1).
34+
WithMinReplicas(1).
35+
WithMaxReplicas(1).
36+
WithRayStartParams(map[string]string{"num-cpus": "1"}).
37+
WithTemplate(corev1ac.PodTemplateSpec().
38+
WithSpec(corev1ac.PodSpec().
39+
WithContainers(corev1ac.Container().
40+
WithName("ray-worker").
41+
WithImage(GetRayImage()).
42+
WithResources(corev1ac.ResourceRequirements().
43+
WithRequests(corev1.ResourceList{
44+
corev1.ResourceCPU: resource.MustParse("1"),
45+
corev1.ResourceMemory: resource.MustParse("1Gi"),
46+
}).
47+
WithLimits(corev1.ResourceList{
48+
corev1.ResourceCPU: resource.MustParse("1"),
49+
corev1.ResourceMemory: resource.MustParse("2Gi"),
50+
})))))
51+
rayServiceSpec.RayClusterSpec.WithWorkerGroupSpecs(workerGroupSpec)
52+
53+
rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name).WithSpec(rayServiceSpec)
54+
55+
rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions)
56+
g.Expect(err).NotTo(HaveOccurred())
57+
g.Expect(rayService).NotTo(BeNil())
58+
LogWithTimestamp(test.T(), "Created RayService %s/%s successfully with AuthModeToken", rayService.Namespace, rayService.Name)
59+
60+
// Wait for RayService to be ready
61+
LogWithTimestamp(test.T(), "Waiting for RayService %s/%s to be ready", rayService.Namespace, rayService.Name)
62+
g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium).
63+
Should(WithTransform(IsRayServiceReady, BeTrue()))
64+
65+
// Get the RayService
66+
rayService, err = GetRayService(test, namespace.Name, rayServiceName)
67+
g.Expect(err).NotTo(HaveOccurred())
68+
LogWithTimestamp(test.T(), "RayService %s/%s is ready", rayService.Namespace, rayService.Name)
69+
70+
// Get the underlying RayCluster of the RayService
71+
rayClusterName := rayService.Status.ActiveServiceStatus.RayClusterName
72+
g.Expect(rayClusterName).NotTo(BeEmpty(), "RayCluster name should be populated")
73+
LogWithTimestamp(test.T(), "RayService %s/%s has active RayCluster %s", rayService.Namespace, rayService.Name, rayClusterName)
74+
75+
// Wait for the RayCluster to become ready
76+
LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready", namespace.Name, rayClusterName)
77+
g.Eventually(RayCluster(test, namespace.Name, rayClusterName), TestTimeoutMedium).
78+
Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
79+
80+
rayCluster, err := GetRayCluster(test, namespace.Name, rayClusterName)
81+
g.Expect(err).NotTo(HaveOccurred())
82+
83+
// Verify the head pod has auth token environment variables
84+
headPod, err := GetHeadPod(test, rayCluster)
85+
g.Expect(err).NotTo(HaveOccurred())
86+
g.Expect(headPod).NotTo(BeNil())
87+
LogWithTimestamp(test.T(), "Found head pod %s/%s", headPod.Namespace, headPod.Name)
88+
89+
// Verify Ray container has auth token env vars
90+
VerifyContainerAuthTokenEnvVars(test, rayCluster, &headPod.Spec.Containers[utils.RayContainerIndex])
91+
LogWithTimestamp(test.T(), "Verified auth token env vars in head pod Ray container")
92+
93+
// Verify worker pods have auth token env vars
94+
workerPods, err := GetWorkerPods(test, rayCluster)
95+
g.Expect(err).NotTo(HaveOccurred())
96+
g.Expect(workerPods).ToNot(BeEmpty(), "RayCluster should have at least one worker pod")
97+
LogWithTimestamp(test.T(), "Found %d worker pod(s)", len(workerPods))
98+
99+
for _, workerPod := range workerPods {
100+
VerifyContainerAuthTokenEnvVars(test, rayCluster, &workerPod.Spec.Containers[utils.RayContainerIndex])
101+
LogWithTimestamp(test.T(), "Verified auth token env vars in worker pod %s/%s", workerPod.Namespace, workerPod.Name)
133102
}
134-
return ""
135-
}
103+
104+
// Clean up the RayService
105+
err = test.Client().Ray().RayV1().RayServices(namespace.Name).Delete(test.Ctx(), rayService.Name, metav1.DeleteOptions{})
106+
g.Expect(err).NotTo(HaveOccurred())
107+
LogWithTimestamp(test.T(), "Deleted RayService %s/%s successfully", rayService.Namespace, rayService.Name)
108+
}

ray-operator/test/e2erayservice/rayservice_auth_test.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import (
44
"testing"
55

66
. "github.com/onsi/gomega"
7+
corev1 "k8s.io/api/core/v1"
8+
"k8s.io/apimachinery/pkg/api/resource"
79
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
811

912
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
1013
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
@@ -24,6 +27,29 @@ func TestRayServiceAuthToken(t *testing.T) {
2427
rayServiceSpec := RayServiceSampleYamlApplyConfiguration()
2528
rayServiceSpec.RayClusterSpec.WithAuthOptions(rayv1ac.AuthOptions().WithMode(rayv1.AuthModeToken))
2629

30+
// Add a worker group to verify auth token propagation to workers
31+
workerGroupSpec := rayv1ac.WorkerGroupSpec().
32+
WithGroupName("small-group").
33+
WithReplicas(1).
34+
WithMinReplicas(1).
35+
WithMaxReplicas(1).
36+
WithRayStartParams(map[string]string{"num-cpus": "1"}).
37+
WithTemplate(corev1ac.PodTemplateSpec().
38+
WithSpec(corev1ac.PodSpec().
39+
WithContainers(corev1ac.Container().
40+
WithName("ray-worker").
41+
WithImage(GetRayImage()).
42+
WithResources(corev1ac.ResourceRequirements().
43+
WithRequests(corev1.ResourceList{
44+
corev1.ResourceCPU: resource.MustParse("1"),
45+
corev1.ResourceMemory: resource.MustParse("1Gi"),
46+
}).
47+
WithLimits(corev1.ResourceList{
48+
corev1.ResourceCPU: resource.MustParse("1"),
49+
corev1.ResourceMemory: resource.MustParse("2Gi"),
50+
})))))
51+
rayServiceSpec.RayClusterSpec.WithWorkerGroupSpecs(workerGroupSpec)
52+
2753
rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name).WithSpec(rayServiceSpec)
2854

2955
rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions)
@@ -79,4 +105,4 @@ func TestRayServiceAuthToken(t *testing.T) {
79105
err = test.Client().Ray().RayV1().RayServices(namespace.Name).Delete(test.Ctx(), rayService.Name, metav1.DeleteOptions{})
80106
g.Expect(err).NotTo(HaveOccurred())
81107
LogWithTimestamp(test.T(), "Deleted RayService %s/%s successfully", rayService.Namespace, rayService.Name)
82-
}
108+
}

0 commit comments

Comments
 (0)