 from codeflare_sdk import (
     Cluster,
     ClusterConfiguration,
-    TokenAuthentication,
     generate_cert,
 )

 import pytest
 import ray
 import math
+import time
+import subprocess

 from support import *

 class TestRayLocalInteractiveOauth:
     def setup_method(self):
         initialize_kubernetes_client(self)
+        self.port_forward_process = None

     def teardown_method(self):
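+        # Stop any kubectl port-forward started by the test before deleting the namespace.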
+        if self.port_forward_process:
+            self.port_forward_process.terminate()
+            try:
+                self.port_forward_process.wait(timeout=10)
+            except subprocess.TimeoutExpired:
+                self.port_forward_process.kill()
+                self.port_forward_process.wait()
+            self.port_forward_process = None
         delete_namespace(self)
         delete_kueue_resources(self)
@@ -39,6 +49,8 @@ def run_local_interactives(
     ):
         cluster_name = "test-ray-cluster-li"
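+        # Make sure no Ray session from a previous test run is still attached to this driver.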
+        ray.shutdown()
+
         cluster = Cluster(
             ClusterConfiguration(
                 name=cluster_name,
@@ -49,45 +61,124 @@ def run_local_interactives(
                 head_memory_requests=2,
                 head_memory_limits=2,
                 worker_cpu_requests="500m",
-                worker_cpu_limits=1,
+                worker_cpu_limits="500m",
                 worker_memory_requests=1,
                 worker_memory_limits=4,
                 worker_extended_resource_requests={gpu_resource_name: number_of_gpus},
-                write_to_file=True,
                 verify_tls=False,
             )
         )
-        cluster.up()
-        cluster.wait_ready()
-
-        generate_cert.generate_tls_cert(cluster_name, self.namespace)
-        generate_cert.export_env(cluster_name, self.namespace)
-
-        print(cluster.local_client_url())
-
-        ray.shutdown()
-        ray.init(address=cluster.local_client_url(), logging_level="DEBUG")
-
-        @ray.remote(num_gpus=number_of_gpus / 2)
-        def heavy_calculation_part(num_iterations):
-            result = 0.0
-            for i in range(num_iterations):
-                for j in range(num_iterations):
-                    for k in range(num_iterations):
-                        result += math.sin(i) * math.cos(j) * math.tan(k)
-            return result
-
-        @ray.remote(num_gpus=number_of_gpus / 2)
-        def heavy_calculation(num_iterations):
-            results = ray.get(
-                [heavy_calculation_part.remote(num_iterations // 30) for _ in range(30)]
+        try:
+            cluster.up()
+
+            cluster.wait_ready()
+            cluster.status()
+
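+            # Poll until the head and worker pods both exist and report Ready, up to TIMEOUT.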
+            TIMEOUT = 300  # 5 minutes
+            END = time.time() + TIMEOUT
+
+            head_pod_name = None
+            worker_pod_name = None
+
+            while time.time() < END:
+                if not head_pod_name:
+                    head_pod_name = kubectl_get_pod_name_by_substring(
+                        self.namespace, cluster_name, "head"
+                    )
+                if not worker_pod_name:
+                    worker_pod_name = kubectl_get_pod_name_by_substring(
+                        self.namespace, cluster_name, "worker"
+                    )
+
+                head_status = (
+                    kubectl_get_pod_status(self.namespace, head_pod_name)
+                    if head_pod_name
+                    else "NotFound"
+                )
+                worker_status = (
+                    kubectl_get_pod_status(self.namespace, worker_pod_name)
+                    if worker_pod_name
+                    else "NotFound"
+                )
+
+                if (
+                    head_pod_name
+                    and worker_pod_name
+                    and "Running" in head_status
+                    and "Running" in worker_status
+                ):
+                    head_ready = kubectl_get_pod_ready(self.namespace, head_pod_name)
+                    worker_ready = kubectl_get_pod_ready(
+                        self.namespace, worker_pod_name
+                    )
+                    if head_ready and worker_ready:
+                        break
+                time.sleep(10)
+
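+            # Generate client TLS certificates and export the env vars Ray needs for mTLS.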
+            generate_cert.generate_tls_cert(cluster_name, self.namespace)
+            generate_cert.export_env(cluster_name, self.namespace)
+
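+            # Forward a local port to the Ray client port (10001) on the head service so
+            # the test driver can reach the cluster from outside.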
+            local_port = "20001"
+            ray_client_port = "10001"
+            head_service_name = f"{cluster_name}-head-svc"
+
+            port_forward_cmd = [
+                "kubectl",
+                "port-forward",
+                "-n",
+                self.namespace,
+                f"svc/{head_service_name}",
+                f"{local_port}:{ray_client_port}",
+            ]
+            self.port_forward_process = subprocess.Popen(
+                port_forward_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
             )
-            return sum(results)
-
-        ref = heavy_calculation.remote(3000)
-        result = ray.get(ref)
-        assert result == 1789.4644387076714
-        ray.cancel(ref)
-        ray.shutdown()
-
-        cluster.down()
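+            # Give the port-forward a moment to establish before connecting.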
+            time.sleep(5)
+
+            client_url = f"ray://localhost:{local_port}"
+            cluster.status()
+
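+            # Connect the Ray client through the forwarded local port rather than
+            # cluster.local_client_url().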
+            ray.init(address=client_url, logging_level="INFO")
+
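+            # Each remote task requests half of the allocated GPUs.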
+            @ray.remote(num_gpus=number_of_gpus / 2)
+            def heavy_calculation_part(num_iterations):
+                result = 0.0
+                for i in range(num_iterations):
+                    for j in range(num_iterations):
+                        for k in range(num_iterations):
+                            result += math.sin(i) * math.cos(j) * math.tan(k)
+                return result
+
+            @ray.remote(num_gpus=number_of_gpus / 2)
+            def heavy_calculation(num_iterations):
+                results = ray.get(
+                    [
+                        heavy_calculation_part.remote(num_iterations // 30)
+                        for _ in range(30)
+                    ]
+                )
+                return sum(results)
+
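+            # The nested trig loops are deterministic, so an exact result can be asserted.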
+            ref = heavy_calculation.remote(3000)
+
+            try:
+                result = ray.get(ref)
+                assert result == 1789.4644387076714
+            finally:
+                ray.cancel(ref)
+
+            ray.shutdown()
+
+        finally:
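+            # Always stop the port-forward and tear the cluster down, even if the test failed.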
+            if self.port_forward_process:
+                self.port_forward_process.terminate()
+                try:
+                    self.port_forward_process.wait(timeout=10)
+                except subprocess.TimeoutExpired:
+                    self.port_forward_process.kill()
+                    self.port_forward_process.wait()
+                self.port_forward_process = None
+            cluster.down()