
Commit 35cc7d6

add task scheduler
1 parent f13fbac commit 35cc7d6

16 files changed

Lines changed: 854 additions & 56 deletions


rock/admin/main.py

Lines changed: 21 additions & 0 deletions
@@ -18,13 +18,15 @@
 from rock.admin.entrypoints.sandbox_proxy_api import sandbox_proxy_router, set_sandbox_proxy_service
 from rock.admin.entrypoints.warmup_api import set_warmup_service, warmup_router
 from rock.admin.gem.api import gem_router, set_env_service
+from rock.admin.scheduler.scheduler import SchedulerProcess
 from rock.config import RockConfig
 from rock.logger import init_logger
 from rock.sandbox.gem_manager import GemManager
 from rock.sandbox.service.sandbox_proxy_service import SandboxProxyService
 from rock.sandbox.service.warmup_service import WarmupService
 from rock.utils import EAGLE_EYE_TRACE_ID, sandbox_id_ctx_var, trace_id_ctx_var
 from rock.utils.providers import RedisProvider
+from rock.utils.system import is_primary_pod

 parser = argparse.ArgumentParser()
 parser.add_argument("--env", type=str, default="local")
@@ -59,6 +61,9 @@ async def lifespan(app: FastAPI):
     )
     await redis_provider.init_pool()

+    # init scheduler process
+    scheduler_process = None
+
     # init sandbox service
     if args.role == "admin":
         # init ray service
@@ -88,6 +93,17 @@ async def lifespan(app: FastAPI):
         set_warmup_service(warmup_service)
         set_env_service(sandbox_manager)

+        if rock_config.scheduler.enabled and is_primary_pod():
+            scheduler_process = SchedulerProcess(
+                scheduler_config=rock_config.scheduler,
+                ray_address=rock_config.ray.address,
+                ray_namespace=rock_config.ray.namespace,
+            )
+            scheduler_process.start()
+            logger.info("Scheduler process started on primary pod")
+        elif rock_config.scheduler.enabled:
+            logger.info("Scheduler process skipped on non-primary pod")
+
     else:
         sandbox_manager = SandboxProxyService(rock_config=rock_config, redis_provider=redis_provider)
         set_sandbox_proxy_service(sandbox_manager)
@@ -96,6 +112,11 @@ async def lifespan(app: FastAPI):

     yield

+    # stop the scheduler process
+    if scheduler_process:
+        scheduler_process.stop()
+        logger.info("Scheduler process stopped")
+
     if redis_provider:
         await redis_provider.close_pool()
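The lifespan hook above only reads `rock_config.scheduler.enabled` (plus `worker_cache_ttl` in the scheduler module below); the actual `SchedulerConfig` definition lives in `rock.config`, which is among the files not shown in this excerpt. A minimal sketch of the shape those two usages imply, assuming a pydantic-style config class with made-up defaults:

from pydantic import BaseModel


class SchedulerConfig(BaseModel):
    # Gates SchedulerProcess startup in lifespan(); the process only runs on the primary pod.
    enabled: bool = False
    # Seconds before the cached worker IP list in scheduler.py is considered stale.
    worker_cache_ttl: int = 60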
rock/admin/scheduler/__init__.py

Whitespace-only changes.

rock/admin/scheduler/scheduler.py

Lines changed: 192 additions & 0 deletions
# rock/admin/scheduler/scheduler.py
import asyncio
import multiprocessing as mp
import signal
import time
from multiprocessing import Process

import ray
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from rock.admin.scheduler.task_base import BaseTask
from rock.admin.scheduler.task_registry import TaskRegistry
from rock.common.constants import SCHEDULER_LOG_NAME
from rock.config import SchedulerConfig
from rock.logger import init_logger

logger = init_logger(name="scheduler", file_name=SCHEDULER_LOG_NAME)


# Worker IP cache (module-level, used in subprocess)
_worker_cache_ips: list[str] = []
_worker_cache_time: float = 0.0
_worker_cache_ttl: int = 60


def _get_alive_workers(ray_address: str, ray_namespace: str, force_refresh: bool = False) -> list[str]:
    """Get alive worker IPs from the Ray cluster, with caching."""
    global _worker_cache_ips, _worker_cache_time

    try:
        current_time = time.time()
        cache_expired = (current_time - _worker_cache_time) > _worker_cache_ttl

        if force_refresh or cache_expired or not _worker_cache_ips:
            logger.info("Refreshing worker IP cache from Ray cluster")

            # Track whether the connection was initialized by this call
            should_shutdown = False
            if not ray.is_initialized():
                logger.info(f"Initializing Ray with address [{ray_address}] and namespace [{ray_namespace}]")
                ray.init(address=ray_address, namespace=ray_namespace)
                should_shutdown = True
            else:
                logger.info("Ray is already initialized")

            try:
                nodes = ray.nodes()
                alive_ips = []
                for node in nodes:
                    if node.get("Alive", False) and node.get("Resources", {}).get("CPU", 0) > 0:
                        ip = node.get("NodeManagerAddress", "").split(":")[0]
                        if ip:
                            alive_ips.append(ip)

                _worker_cache_ips = alive_ips
                _worker_cache_time = current_time
                logger.info(f"Worker cache updated, found {len(_worker_cache_ips)} workers")
            finally:
                # Disconnect if initialized by this call to reduce Ray head load
                if should_shutdown:
                    ray.shutdown()
                    logger.debug("Ray connection closed after fetching worker IPs")

        return _worker_cache_ips
    except Exception as e:
        logger.error(f"Failed to get alive workers: {e}")
        # Ensure connection cleanup on exception
        if ray.is_initialized():
            try:
                ray.shutdown()
            except Exception:
                pass
        return _worker_cache_ips if _worker_cache_ips else []


async def _execute_task(task: BaseTask, ray_address: str, ray_namespace: str):
    """Execute a single task."""
    try:
        worker_ips = _get_alive_workers(ray_address, ray_namespace)
        if worker_ips:
            logger.info(f"Running task '{task.name}' on {len(worker_ips)} workers")
            await task.run(worker_ips)
        else:
            logger.warning(f"No alive workers found for task '{task.name}'")
    except Exception as e:
        logger.error(f"Task '{task.name}' failed: {e}")


async def _run_scheduler_async(
    scheduler_config: SchedulerConfig,
    ray_address: str,
    ray_namespace: str,
):
    """Core async logic for running the scheduler."""
    global _worker_cache_ttl
    _worker_cache_ttl = scheduler_config.worker_cache_ttl

    scheduler = AsyncIOScheduler()

    # Register tasks in the subprocess
    from rock.admin.scheduler.task_factory import TaskFactory

    TaskFactory.register_all_tasks(scheduler_config)

    # Add all tasks to the scheduler
    for task in TaskRegistry.get_all_tasks().values():
        scheduler.add_job(
            _execute_task,
            trigger="interval",
            seconds=task.interval_seconds,
            args=[task, ray_address, ray_namespace],
            id=task.name,
            name=task.name,
            replace_existing=True,
        )
        logger.info(f"Added job '{task.name}' with interval {task.interval_seconds}s")

    # Start scheduler in the event loop
    scheduler.start()
    logger.info("Scheduler started")

    # Create event for graceful shutdown
    stop_event = asyncio.Event()

    def signal_handler(signum, frame):
        logger.info("Received signal, shutting down scheduler")
        stop_event.set()

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    try:
        # Wait for stop signal
        await stop_event.wait()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        scheduler.shutdown(wait=False)
        logger.info("Scheduler stopped")


def _run_scheduler_in_process(
    scheduler_config: SchedulerConfig,
    ray_address: str,
    ray_namespace: str,
):
    """Run the scheduler in a separate process."""
    try:
        asyncio.run(_run_scheduler_async(scheduler_config, ray_address, ray_namespace))
    except (KeyboardInterrupt, SystemExit):
        logger.info("Scheduler process interrupted")


class SchedulerProcess:
    """Scheduler process manager - runs APScheduler in a separate process."""

    def __init__(self, scheduler_config: SchedulerConfig, ray_address: str, ray_namespace: str):
        self.scheduler_config = scheduler_config
        self.ray_address = ray_address
        self.ray_namespace = ray_namespace
        self._process: Process | None = None
        # Use spawn to avoid inheriting Ray connection state from the parent
        self._ctx = mp.get_context("spawn")

    def start(self):
        """Start the scheduler process."""
        if self._process and self._process.is_alive():
            logger.warning("Scheduler process is already running")
            return

        # Use spawn context so the subprocess won't inherit the parent's Ray state
        self._process = self._ctx.Process(
            target=_run_scheduler_in_process,
            args=(self.scheduler_config, self.ray_address, self.ray_namespace),
            daemon=True,
        )
        self._process.start()
        logger.info(f"Scheduler process started with PID: {self._process.pid}")

    def stop(self):
        """Stop the scheduler process."""
        if self._process and self._process.is_alive():
            self._process.terminate()
            self._process.join(timeout=5)
            if self._process.is_alive():
                self._process.kill()
                self._process.join(timeout=2)
            logger.info("Scheduler process stopped")

    def is_alive(self) -> bool:
        """Check if the scheduler process is alive."""
        return self._process is not None and self._process.is_alive()
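The task-side modules (`task_base.py`, `task_registry.py`, `task_factory.py`) are among the files not shown in this excerpt; the scheduler above only relies on each task exposing `name`, `interval_seconds`, and an async `run(worker_ips)`. A hypothetical task conforming to that contract (the class name, class-level attributes, and logging are illustrative assumptions, not part of the commit):

import logging

from rock.admin.scheduler.task_base import BaseTask

logger = logging.getLogger("scheduler")


class HeartbeatTask(BaseTask):
    # Used as both the APScheduler job id and its display name.
    name = "heartbeat"
    # Passed to the scheduler's "interval" trigger as the run frequency.
    interval_seconds = 30

    async def run(self, worker_ips: list[str]) -> None:
        # A real task would fan work out to each alive worker IP.
        for ip in worker_ips:
            logger.info("heartbeat -> %s", ip)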
