 
 from codeflare_sdk import Cluster, ClusterConfiguration
 from codeflare_sdk.ray.rayjobs import RayJob
-from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus
 
 import pytest
 
@@ -105,104 +104,120 @@ def assert_rayjob_submit_against_existing_cluster(
         ), f"Job submission failed, expected {job_name}, got {submission_result}"
         print(f"✅ Successfully submitted RayJob '{job_name}' against existing cluster")
 
-        # Debug: Check if RayJob resource was actually created
-        import subprocess
-        import time
-
-        print("🔍 Checking if RayJob resource exists in Kubernetes...")
-        for attempt in range(6):  # Check for 30 seconds
-            try:
-                # Check if RayJob resource exists
-                result = subprocess.run(
-                    ["kubectl", "get", "rayjobs", "-n", self.namespace, job_name],
-                    capture_output=True,
-                    text=True,
-                    timeout=10,
-                )
-                if result.returncode == 0:
-                    print(f"✅ RayJob resource '{job_name}' found in Kubernetes!")
-                    print(f"RayJob details:\n{result.stdout}")
-                    break
-                else:
-                    print(
-                        f"❌ Attempt {attempt + 1}: RayJob resource '{job_name}' not found"
-                    )
-                    if attempt < 5:
-                        time.sleep(5)
-            except Exception as e:
-                print(f"❌ Error checking RayJob: {e}")
-
-        # Also check what RayJob resources exist in the namespace
-        try:
-            result = subprocess.run(
-                ["kubectl", "get", "rayjobs", "-n", self.namespace],
-                capture_output=True,
-                text=True,
-                timeout=10,
-            )
-            print(f"📋 All RayJobs in namespace '{self.namespace}':\n{result.stdout}")
-        except Exception as e:
-            print(f"❌ Error listing RayJobs: {e}")
-
-        # Monitor the job status until completion
-        self.monitor_rayjob_completion(rayjob, timeout=900)
+        # Monitor the job status until completion using kubectl (Kind-specific workaround)
+        self.monitor_rayjob_completion_kubectl(job_name, timeout=900)
 
         print(f"✅ RayJob '{job_name}' completed successfully against existing cluster!")
 
-    def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900):
+    def monitor_rayjob_completion_kubectl(self, job_name: str, timeout: int = 900):
         """
-        Monitor a RayJob until it completes or fails.
+        Monitor a RayJob until it completes or fails using kubectl directly.
+        This is a workaround for Kind clusters where the SDK status method doesn't work.
 
         Args:
-            rayjob: The RayJob instance to monitor
+            job_name: The name of the RayJob to monitor
            timeout: Maximum time to wait in seconds (default: 15 minutes)
         """
-        print(f"⏳ Monitoring RayJob '{rayjob.name}' status...")
+        import subprocess
+        import time
+
+        print(f"⏳ Monitoring RayJob '{job_name}' status using kubectl...")
 
         elapsed_time = 0
         check_interval = 10  # Check every 10 seconds
-        job_found = False  # Track if we've seen the job at least once
 
         while elapsed_time < timeout:
-            status, ready = rayjob.status(print_to_console=True)
-
-            # Track if we've found the job (not UNKNOWN status)
-            if status != CodeflareRayJobStatus.UNKNOWN:
-                job_found = True
-
-            # Check if job has completed (either successfully or failed)
-            if status == CodeflareRayJobStatus.COMPLETE:
-                print(f"✅ RayJob '{rayjob.name}' completed successfully!")
-                return
-            elif status == CodeflareRayJobStatus.FAILED:
-                raise AssertionError(f"❌ RayJob '{rayjob.name}' failed!")
-            elif status == CodeflareRayJobStatus.RUNNING:
-                print(f"🏃 RayJob '{rayjob.name}' is still running...")
-            elif status == CodeflareRayJobStatus.UNKNOWN:
-                if job_found:
-                    # If we've seen the job before but now it's unknown, that's concerning
-                    print(
-                        f"⚠️ RayJob '{rayjob.name}' status became unknown after being found"
+            try:
+                # Get RayJob status using kubectl
+                result = subprocess.run(
+                    [
+                        "kubectl",
+                        "get",
+                        "rayjobs",
+                        "-n",
+                        self.namespace,
+                        job_name,
+                        "-o",
+                        "jsonpath={.status.jobDeploymentStatus}",
+                    ],
+                    capture_output=True,
+                    text=True,
+                    timeout=10,
+                )
+
+                if result.returncode == 0:
+                    status = result.stdout.strip()
+
+                    # Also get job status for more details
+                    job_status_result = subprocess.run(
+                        [
+                            "kubectl",
+                            "get",
+                            "rayjobs",
+                            "-n",
+                            self.namespace,
+                            job_name,
+                            "-o",
+                            "jsonpath={.status.jobStatus}",
+                        ],
+                        capture_output=True,
+                        text=True,
+                        timeout=10,
                     )
-                else:
-                    # Job hasn't appeared yet, this is normal initially
+
+                    job_status = (
+                        job_status_result.stdout.strip()
+                        if job_status_result.returncode == 0
+                        else "Unknown"
+                    )
+
                     print(
-                        f"⏳ Waiting for RayJob '{rayjob.name}' to appear in Kubernetes..."
+                        f"📊 RayJob '{job_name}' - Deployment Status: {status}, Job Status: {job_status}"
                    )
 
+                    # Check completion status
+                    if status == "Complete" or job_status == "SUCCEEDED":
+                        print(f"✅ RayJob '{job_name}' completed successfully!")
+                        return
+                    elif status == "Failed" or job_status == "FAILED":
+                        # Get error details
+                        try:
+                            error_result = subprocess.run(
+                                [
+                                    "kubectl",
+                                    "get",
+                                    "rayjobs",
+                                    "-n",
+                                    self.namespace,
+                                    job_name,
+                                    "-o",
+                                    "yaml",
+                                ],
+                                capture_output=True,
+                                text=True,
+                                timeout=10,
+                            )
+                            print(
+                                f"❌ RayJob '{job_name}' failed! Details:\n{error_result.stdout}"
+                            )
+                        except:
+                            pass
+                        raise AssertionError(f"❌ RayJob '{job_name}' failed!")
+                    elif status == "Running" or job_status == "RUNNING":
+                        print(f"🏃 RayJob '{job_name}' is still running...")
+                    else:
+                        print(f"⏳ RayJob '{job_name}' status: {status}")
+
+                else:
+                    print(f"❌ Could not get RayJob status: {result.stderr}")
+
+            except Exception as e:
+                print(f"❌ Error checking RayJob status: {e}")
+
             # Wait before next check
             sleep(check_interval)
             elapsed_time += check_interval
 
         # If we reach here, the job has timed out
-        final_status, _ = rayjob.status(print_to_console=True)
-        if not job_found:
-            raise TimeoutError(
-                f"⏰ RayJob '{rayjob.name}' was never found in Kubernetes within {timeout} seconds. "
-                f"Check if the RayJob resource was created successfully."
-            )
-        else:
-            raise TimeoutError(
-                f"⏰ RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
-                f"Final status: {final_status}"
-            )
+        raise TimeoutError(
+            f"⏰ RayJob '{job_name}' did not complete within {timeout} seconds."
+        )
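For reference, the same two fields the new helper polls (`.status.jobDeploymentStatus` and `.status.jobStatus`) can also be read without spawning kubectl by querying the RayJob custom resource through the Kubernetes Python client. A minimal sketch of that alternative, not part of this change, assuming the KubeRay CRD is served at group `ray.io`, version `v1` (older operator releases use `v1alpha1`) and that kubeconfig or in-cluster auth is already configured:

```python
# Sketch only: poll the RayJob CR via the Kubernetes API instead of kubectl.
# Group/version are assumptions; adjust to the installed KubeRay operator.
import time

from kubernetes import client, config


def wait_for_rayjob(job_name: str, namespace: str, timeout: int = 900) -> None:
    config.load_kube_config()  # or config.load_incluster_config() inside a pod
    api = client.CustomObjectsApi()

    elapsed, interval = 0, 10
    while elapsed < timeout:
        try:
            rayjob = api.get_namespaced_custom_object(
                group="ray.io",
                version="v1",
                namespace=namespace,
                plural="rayjobs",
                name=job_name,
            )
        except client.ApiException as exc:
            if exc.status == 404:
                # The RayJob resource may not have been created yet; keep waiting.
                time.sleep(interval)
                elapsed += interval
                continue
            raise

        status = rayjob.get("status", {})
        deployment_status = status.get("jobDeploymentStatus", "")
        job_status = status.get("jobStatus", "")
        print(f"RayJob {job_name}: {deployment_status} / {job_status}")

        if deployment_status == "Complete" or job_status == "SUCCEEDED":
            return
        if deployment_status == "Failed" or job_status == "FAILED":
            raise AssertionError(f"RayJob {job_name} failed")

        time.sleep(interval)
        elapsed += interval

    raise TimeoutError(f"RayJob {job_name} did not complete within {timeout} seconds")
```

Reading the CR directly returns the whole status object in one call and avoids the kubectl/subprocess dependency, which also makes the failure path easier to report.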