[2/2] add fault torlance #1311
Pull request overview
This work-in-progress PR implements instant fault tolerance for rollout engines in slime. When a rollout engine fails during training, the system now immediately kills and restarts it using remote weight loading from active engines, rather than waiting for the current rollout round to complete. This is achieved by querying the router for healthy workers and using SGLang's transfer_engine backend to bootstrap new engines from existing ones, avoiding the overhead of loading weights from storage.
Key changes:
- Implemented instant engine restart with remote weight loading when health checks fail
- Added logic to query the router for active seed instances to use for weight transfer
- Extended engine initialization to support remote_seed_instance parameter for fault tolerance scenarios
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| slime/utils/health_monitor.py | Implements core fault tolerance logic including get_active_seed_instance, instant kill-and-restart mechanism, and port allocation for restarted engines |
| slime/ray/rollout.py | Adds support for remote seed instance during initial engine setup when fault tolerance is enabled |
| slime/backends/sglang_utils/sglang_engine.py | Extends engine initialization to accept remote_seed_instance parameter and configures remote weight loading |
| docs/zh/advanced/fault-torlance.md | Documents the instant restart mechanism and configuration parameters in Chinese |
| docs/en/advanced/fault-tolerance.md | Documents the instant restart mechanism and configuration parameters in English |
slime/utils/health_monitor.py
Outdated
| num_gpu_per_engine = min(args.rollout_num_gpus_per_engine, args.num_gpus_per_node) | ||
| num_engines = args.rollout_num_gpus // num_gpu_per_engine | ||
| addr_and_ports = [{} for _ in range(num_engines)] |
Copilot
AI
Jan 3, 2026
The addr_and_ports list is initialized with num_engines entries (line 307), but new_engines contains only the engines for the rollout_engine_id group currently being restarted. Setting addr_and_ports[rank] when rank is >= num_engines, which can happen in multi-node scenarios, would raise an IndexError. Consider initializing addr_and_ports as an empty dict {} instead of a list, or ensure that only valid indices are accessed.
| - addr_and_ports = [{} for _ in range(num_engines)] | |
| + # Size addr_and_ports based on the maximum rank present in this engine group | |
| + max_rank = max(rank for rank, _ in new_engines) | |
| + addr_and_ports = [{} for _ in range(max_rank + 1)] |
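The other option raised in this comment, keying by rank rather than sizing a list, might look like the sketch below. It assumes new_engines is a list of (rank, engine) pairs as in the reviewed code; the function name, base_port, and the port arithmetic are hypothetical and only there to make the example runnable.

```python
def build_addr_and_ports(new_engines, base_port=15000):
    """Map each restarted engine's rank to its own kwargs dict.

    `new_engines` is a list of (rank, engine) pairs. `base_port` and the
    `base_port + rank` rule are placeholders, not slime's real allocation.
    """
    # A dict keyed by rank cannot go out of range, whatever the ranks are.
    addr_and_ports = {rank: {} for rank, _engine in new_engines}
    for rank, _engine in new_engines:
        addr_and_ports[rank]["port"] = base_port + rank
    return addr_and_ports


# Ranks do not need to start at 0 or be smaller than num_engines.
restarted = build_addr_and_ports([(4, None), (5, None)])
```

With a dict, the later engine.init.remote(**(addr_and_ports[rank])) lookup works unchanged, because indexing by rank is valid for both a list and a dict.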
slime/utils/health_monitor.py
Outdated
| def get_active_seed_instance(args, exclude_urls: list[str] | None = None): | ||
| """Get an active seed instance from the router for fault tolerance restart. | ||
| When restarting failed engines, this function queries the router to find active workers | ||
| and returns the connection info for one of them to be used as a seed instance for | ||
| remote weight loading. | ||
| Args: | ||
| args: The global arguments containing router IP and port. | ||
| exclude_urls: A list of worker URLs to exclude (e.g., the URLs of killed engines). | ||
| Returns: | ||
| A dict with 'ip' and 'port' keys for the seed instance, or None if no active | ||
| workers are found. | ||
| """ | ||
| router_ip = args.sglang_router_ip | ||
| router_port = args.sglang_router_port | ||
| exclude_urls = exclude_urls or [] | ||
| if not router_ip or not router_port: | ||
| logger.warning("Router IP or port not set, cannot get active seed instance.") | ||
| return None | ||
| try: | ||
| # Query the router to get active workers | ||
| if parse(sglang_router.__version__) <= parse("0.2.1") or args.use_slime_router: | ||
| response = requests.get(f"http://{router_ip}:{router_port}/list_workers", timeout=5) | ||
| response.raise_for_status() | ||
| data = response.json() | ||
| worker_urls = data.get("urls", []) | ||
| else: | ||
| response = requests.get(f"http://{router_ip}:{router_port}/workers", timeout=5) | ||
| response.raise_for_status() | ||
| data = response.json() | ||
| workers = data.get("workers", []) | ||
| worker_urls = [w["url"] for w in workers] | ||
| if not worker_urls: | ||
| logger.warning("No active workers found in router.") | ||
| return None | ||
| # Filter out excluded URLs (normalize for comparison) | ||
| def normalize_url(url): | ||
| """Normalize URL for comparison (remove trailing slash, etc.)""" | ||
| return url.rstrip("/").lower() | ||
| exclude_urls_normalized = {normalize_url(u) for u in exclude_urls} | ||
| available_urls = [u for u in worker_urls if normalize_url(u) not in exclude_urls_normalized] | ||
| if not available_urls: | ||
| logger.warning(f"No active workers found after excluding {exclude_urls}. All workers: {worker_urls}") | ||
| return None | ||
| # Parse the first available worker's URL to get IP and port | ||
| seed_url = available_urls[0] | ||
| parsed = urlparse(seed_url) | ||
| # Handle IPv6 addresses (may be wrapped in []) | ||
| host = parsed.hostname or parsed.netloc.rsplit(":", 1)[0] | ||
| port = parsed.port or 30000 | ||
| logger.info(f"Found active seed instance for fault tolerance: {host}:{port} (excluded: {exclude_urls})") | ||
| return {"ip": host, "port": port} | ||
| except Exception as e: | ||
| logger.warning(f"Failed to get active seed instance from router: {e}") | ||
| return None |
Copilot
AI
Jan 3, 2026
The new fault tolerance feature, including the instant restart with remote weight loading and the get_active_seed_instance function, lacks test coverage. Given the complexity of the restart logic and the various edge cases (multi-node engines, prefill/decode workers, port allocation), consider adding tests to verify correct behavior under different failure scenarios.
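As a starting point for such tests, the URL filtering and parsing step could be pulled into a pure function that needs neither a router nor Ray. This is only a sketch: pick_seed is a hypothetical refactoring of the selection logic in get_active_seed_instance, not a function that exists in the PR, and unlike the reviewed code it skips URLs without a hostname instead of falling back to netloc.

```python
from urllib.parse import urlparse


def normalize_url(url: str) -> str:
    """Normalize a worker URL for comparison (trailing slash, case)."""
    return url.rstrip("/").lower()


def pick_seed(worker_urls, exclude_urls=(), default_port=30000):
    """Return {'ip', 'port'} for the first usable worker, or None.

    Hypothetical helper: same exclusion rule as the reviewed code, but
    URLs that yield no hostname are skipped.
    """
    excluded = {normalize_url(u) for u in exclude_urls}
    for url in worker_urls:
        if normalize_url(url) in excluded:
            continue
        parsed = urlparse(url)
        if parsed.hostname is None:
            continue
        return {"ip": parsed.hostname, "port": parsed.port or default_port}
    return None


def test_excludes_killed_engine():
    workers = ["http://10.0.0.1:30000/", "http://10.0.0.2:30001"]
    seed = pick_seed(workers, exclude_urls=["http://10.0.0.1:30000"])
    assert seed == {"ip": "10.0.0.2", "port": 30001}


def test_returns_none_when_all_excluded():
    workers = ["http://10.0.0.1:30000"]
    assert pick_seed(workers, exclude_urls=workers) is None


def test_ipv6_brackets_are_stripped():
    assert pick_seed(["http://[::1]:30002"]) == {"ip": "::1", "port": 30002}


def test_missing_port_uses_default():
    assert pick_seed(["http://worker-a"]) == {"ip": "worker-a", "port": 30000}
```

The multi-node and prefill/decode cases mentioned above would still need tests against the restart path itself; this only covers seed selection.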
| try: | ||
| # Query the router to get active workers | ||
| if parse(sglang_router.__version__) <= parse("0.2.1") or args.use_slime_router: |
Copilot
AI
Jan 3, 2026
The version comparison uses parse(sglang_router.__version__) <= parse("0.2.1") to determine which router endpoint to use. If sglang_router.__version__ is not a valid version string, or the package doesn't define __version__, this will raise an exception. Consider adding error handling for cases where the version cannot be parsed, or check if the attribute exists first.
| - if parse(sglang_router.__version__) <= parse("0.2.1") or args.use_slime_router: | |
| + use_list_workers = False | |
| + if args.use_slime_router: | |
| +     use_list_workers = True | |
| + else: | |
| +     router_version = getattr(sglang_router, "__version__", None) | |
| +     if router_version is None: | |
| +         logger.warning( | |
| +             "sglang_router.__version__ is not set; defaulting to /workers endpoint." | |
| +         ) | |
| +     else: | |
| +         try: | |
| +             use_list_workers = parse(router_version) <= parse("0.2.1") | |
| +         except Exception as version_err: | |
| +             logger.warning( | |
| +                 "Failed to parse sglang_router.__version__ (%r): %s; " | |
| +                 "defaulting to /workers endpoint.", | |
| +                 router_version, | |
| +                 version_err, | |
| +             ) | |
| + if use_list_workers: |
slime/utils/health_monitor.py
Outdated
| "SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK": "true", | ||
| "SGLANG_DISABLE_TP_MEMORY_INBALANCE_CHECK": "true", |
Copilot
AI
Jan 3, 2026
The variable name INBALANCE is misspelled. It should be IMBALANCE to correctly spell "imbalance".
| - "SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK": "true", | |
| - "SGLANG_DISABLE_TP_MEMORY_INBALANCE_CHECK": "true", | |
| + "SGL_DISABLE_TP_MEMORY_IMBALANCE_CHECK": "true", | |
| + "SGLANG_DISABLE_TP_MEMORY_IMBALANCE_CHECK": "true", |
slime/utils/health_monitor.py
Outdated
| server_host = ray.get(engine.get_server_host.remote()) | ||
| server_port = ray.get(engine.get_server_port.remote()) | ||
| killed_url = f"http://{server_host}:{server_port}" | ||
| killed_engine_urls.append(killed_url) | ||
| logger.info(f"Engine at index {i} has URL: {killed_url}") | ||
| except Exception as e: | ||
| logger.warning(f"Could not get URL for engine at index {i}: {e}") |
Copilot
AI
Jan 3, 2026
When the failed engine is already unhealthy, there's a risk that the ray.get calls for get_server_host and get_server_port may also hang or time out, potentially blocking the restart process. Consider passing a timeout to these ray.get calls to ensure the restart process doesn't get stuck.
+1
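A bounded lookup along these lines might be sketched as follows. ray.get accepts a timeout keyword and raises ray.exceptions.GetTimeoutError when it expires; the helper name, the 5-second default, and injecting ray_get as a parameter (so the sketch can be exercised without a Ray cluster) are assumptions made for illustration.

```python
import logging

logger = logging.getLogger(__name__)


def get_engine_url(engine, ray_get, timeout_s=5.0):
    """Return the engine's HTTP URL, or None if it does not answer in time.

    `ray_get` is expected to be `ray.get`; it is passed in so that this
    hypothetical helper can be tested with a stand-in.
    """
    # Submit both calls first so a single timeout bounds the total wait.
    refs = [engine.get_server_host.remote(), engine.get_server_port.remote()]
    try:
        host, port = ray_get(refs, timeout=timeout_s)
    except Exception as e:  # with Ray: ray.exceptions.GetTimeoutError
        logger.warning("Could not get URL for engine: %s", e)
        return None
    return f"http://{host}:{port}"
```

In the health monitor this would be called as get_engine_url(engine, ray.get), and a None result would simply mean the engine's URL is not added to killed_engine_urls.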
| # Handle IPv6 addresses (may be wrapped in []) | ||
| host = parsed.hostname or parsed.netloc.rsplit(":", 1)[0] | ||
| port = parsed.port or 30000 |
Copilot
AI
Jan 3, 2026
The hardcoded default port of 30000 may not be accurate. If parsed.port is None (meaning no port was specified in the URL), it would be better to either log a warning or use a more standard default port like 80 for HTTP or document why 30000 is the appropriate default.
| - port = parsed.port or 30000 | |
| + if parsed.port is not None: | |
| +     port = parsed.port | |
| + else: | |
| +     # No explicit port specified in the worker URL; choose a standard default | |
| +     if parsed.scheme == "https": | |
| +         port = 443 | |
| +     else: | |
| +         port = 80 | |
| +     logger.warning( | |
| +         "No port specified in worker URL '%s'; defaulting to port %d based on scheme '%s'.", | |
| +         seed_url, | |
| +         port, | |
| +         parsed.scheme or "http", | |
| +     ) |
| remote_seed_instance, | ||
| ) |
Copilot
AI
Jan 3, 2026
When using external rollout engines (args.rollout_external), the remote_seed_instance parameter may conflict with the external engine setup. External engines are pre-configured and launched separately, so they cannot load weights from a remote seed instance. This could lead to unexpected behavior or initialization failures. Consider adding a check to ensure remote_seed_instance is only used with non-external engines, or document this limitation.
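One way to express such a check is a small guard applied before the value is passed to engine initialization. This is a sketch only: the helper name is hypothetical, and it assumes the caller has args.rollout_external available as a boolean.

```python
import logging

logger = logging.getLogger(__name__)


def seed_for_engine(rollout_external: bool, remote_seed_instance):
    """Drop the seed instance for externally launched engines.

    Hypothetical guard: external engines are started outside slime, so a
    remote seed instance cannot be applied to them.
    """
    if rollout_external and remote_seed_instance is not None:
        logger.warning(
            "Ignoring remote_seed_instance %s: external rollout engines "
            "cannot load weights from a seed instance.",
            remote_seed_instance,
        )
        return None
    return remote_seed_instance
```

Raising an error instead of logging would also be reasonable if mixing the two options should be treated as a configuration mistake.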
| else: | ||
| logger.info(f"Engine at index {i} is already None") | ||
| self._rollout_manager.all_rollout_engines[i] = None |
Copilot
AI
Jan 3, 2026
After killing failed engines and setting them to None in all_rollout_engines, there's a potential race condition where the health monitor could be checking other engines while the restart is in progress. If another engine fails during restart, both threads could modify all_rollout_engines concurrently without proper synchronization. Consider using rollout_engine_lock or another synchronization mechanism to protect the kill-and-restart operation.
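The serialization suggested here could be sketched like this, under the assumption that the competing monitors are threads in one process, so a threading.Lock is enough. The class and its make_engine callback are hypothetical, and the actual type of rollout_engine_lock in slime is not shown in this PR; if the monitors live in different Ray actors, a different primitive would be needed.

```python
import threading


class EngineRestarter:
    """Serializes kill-and-restart operations on a shared engine list."""

    def __init__(self, engines):
        self.engines = list(engines)
        self._restart_lock = threading.Lock()

    def kill_and_restart(self, indices, make_engine):
        # Hold the lock for the whole operation so that no other thread
        # observes or modifies the list while slots are set to None.
        with self._restart_lock:
            for i in indices:
                self.engines[i] = None  # stands in for killing the engine
            for i in indices:
                self.engines[i] = make_engine(i)


restarter = EngineRestarter(["e0", "e1", "e2", "e3"])
workers = [
    threading.Thread(target=restarter.kill_and_restart, args=([0, 1], lambda i: f"new-{i}")),
    threading.Thread(target=restarter.kill_and_restart, args=([2, 3], lambda i: f"new-{i}")),
]
for t in workers:
    t.start()
for t in workers:
    t.join()
```

Holding the lock across both the kill and the re-initialization is the simple choice; it trades restart concurrency for a guarantee that readers never see a half-restarted group.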
slime/ray/rollout.py
Outdated
| return rollout_data_refs | ||
| def _get_active_seed_instance_for_init(args, all_rollout_engines): |
We can merge this function with get_active_seed_instance.
| logger.info( | ||
| f"Engine {rank} will load weights from remote seed instance at " | ||
| f"{remote_seed_instance['ip']}:{remote_seed_instance['port']}" | ||
| ) |
We can nest this if block inside the if args.use_fault_tolerance: branch.
slime/utils/health_monitor.py
Outdated
| server_host = ray.get(engine.get_server_host.remote()) | ||
| server_port = ray.get(engine.get_server_port.remote()) | ||
| killed_url = f"http://{server_host}:{server_port}" | ||
| killed_engine_urls.append(killed_url) | ||
| logger.info(f"Engine at index {i} has URL: {killed_url}") | ||
| except Exception as e: | ||
| logger.warning(f"Could not get URL for engine at index {i}: {e}") |
+1
slime/utils/health_monitor.py
Outdated
| # Initialize the new engine(s) | ||
| init_handles = [engine.init.remote(**(addr_and_ports[rank])) for rank, engine in new_engines] | ||
| ray.get(init_handles) |
We should extract the functionality this shares with init_rollout_engines.
bd95f6c to def3b64
520bae2 to b3acf21