Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 158 additions & 3 deletions src/network/nonce_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,161 @@ def _evaluate_routing_topology(self) -> None:

logger.error("CRITICAL FAILURE: Comprehensive Horizon node matrix completely unreachable. No healthy nodes found.")

def get_active_endpoint_url(self) -> str:
"""Returns the currently active, validated node URL for ledger submissions."""
return self.active_node.url

class RPCNodeFailoverSupervisor:
"""Proactive RPC node failover supervisor that monitors node connectivity.

It maintains a list of endpoints and runs a background thread to check their
latency and health using lightweight JSON-RPC requests. If the active node
experiences a latency drop or fails, the supervisor instantly shifts the
active traffic to the fastest available secondary node.

Complexity:
Time: O(1) for active endpoint lookup, O(N) for checking N endpoints.
Space: O(N) to store latency stats for N endpoints.
"""

def __init__(
self,
endpoints: Optional[List[str]] = None,
check_interval_sec: float = 2.0,
latency_threshold_ms: float = 500.0,
ping_timeout_sec: float = 1.0,
) -> None:
self.check_interval_sec = check_interval_sec
self.latency_threshold_ms = latency_threshold_ms
self.ping_timeout_sec = ping_timeout_sec

if endpoints is None:
primary = os.environ.get("RPC_URL")
fallbacks = os.environ.get("FALLBACK_RPC_URLS")
loaded = []
if primary:
loaded.append(primary.strip())
if fallbacks:
for f in fallbacks.split(","):
if f.strip():
loaded.append(f.strip())
if not loaded:
loaded = [
"https://rpc.testnet.stellar.org",
"https://rpc.mainnet.stellar.org",
]
self.endpoints = loaded
else:
self.endpoints = list(endpoints)

self._lock = threading.Lock()
self._active_endpoint = self.endpoints[0] if self.endpoints else ""
self._latencies: Dict[str, float] = {ep: 0.0 for ep in self.endpoints}
self._healthy_endpoints: set = set(self.endpoints)

self._stop_event = threading.Event()
self._monitor_thread: Optional[threading.Thread] = None

def start(self) -> None:
"""Start the background monitoring thread."""
with self._lock:
if self._monitor_thread is not None and self._monitor_thread.is_alive():
return
self._stop_event.clear()
self._monitor_thread = threading.Thread(
target=self._run_monitor,
name="RPCNodeFailoverSupervisor-Monitor",
daemon=True,
)
self._monitor_thread.start()
logger.info("[RPCNodeFailoverSupervisor] Started proactive background monitoring.")

def stop(self) -> None:
"""Stop the background monitoring thread."""
self._stop_event.set()
if self._monitor_thread is not None:
self._monitor_thread.join(timeout=1.0)
self._monitor_thread = None
logger.info("[RPCNodeFailoverSupervisor] Stopped background monitoring.")

def get_active_endpoint(self) -> str:
"""Return the currently selected active RPC endpoint."""
with self._lock:
return self._active_endpoint

def _ping_node(self, endpoint: str) -> Optional[float]:
"""Perform a fast, lightweight check on a single node and return its latency in ms."""
try:
start = time.time()
response = requests.post(
endpoint,
json={"jsonrpc": "2.0", "id": 1, "method": "getHealth"},
timeout=self.ping_timeout_sec,
)
latency_ms = (time.time() - start) * 1000.0
if response.status_code == 200:
data = response.json()
if "result" in data or "error" in data:
return latency_ms
return None
except Exception:
return None

def _run_monitor(self) -> None:
"""Main loop for the background monitoring thread."""
while not self._stop_event.is_set():
temp_latencies = {}
temp_healthy = set()

for ep in self.endpoints:
latency = self._ping_node(ep)
if latency is not None:
temp_latencies[ep] = latency
temp_healthy.add(ep)
else:
temp_latencies[ep] = float("inf")

with self._lock:
self._latencies.update(temp_latencies)
self._healthy_endpoints = temp_healthy

active_ok = False
active_latency = self._latencies.get(self._active_endpoint, float("inf"))

if (
self._active_endpoint in self._healthy_endpoints
and active_latency <= self.latency_threshold_ms
):
active_ok = True

if not active_ok:
best_endpoint = self._active_endpoint
best_latency = active_latency

for ep in self.endpoints:
ep_latency = self._latencies.get(ep, float("inf"))
if ep in self._healthy_endpoints and ep_latency < best_latency:
best_endpoint = ep
best_latency = ep_latency

if best_endpoint != self._active_endpoint:
logger.warning(
"[RPCNodeFailoverSupervisor] Shifted traffic from %s (latency: %.1fms) to %s (latency: %.1fms)",
self._active_endpoint,
active_latency,
best_endpoint,
best_latency,
)
self._active_endpoint = best_endpoint

self._stop_event.wait(self.check_interval_sec)


rpc_supervisor = RPCNodeFailoverSupervisor()


__all__ = [
"NonceTracker",
"NonceWindow",
"nonce_tracker",
"nonce_window",
"RPCNodeFailoverSupervisor",
"rpc_supervisor",
]
49 changes: 37 additions & 12 deletions src/network/rpc_client.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,62 @@
import logging
import requests
from typing import List, Dict, Any
from network.nonce_tracker import RPCNodeFailoverSupervisor

logger = logging.getLogger(__name__)


class FailoverRouter:
"""
Automated RPC Endpoint Switching Routine.
"""Automated RPC Endpoint Switching Routine.

Automatically switches data transmission paths to backup node endpoints
if a target fails to respond within a 3500ms window.
using a proactive RPC supervisor to avoid connection timeouts.
"""

def __init__(self, primary_endpoint: str, backup_endpoints: List[str]):
self.primary_endpoint = primary_endpoint
self.backup_endpoints = backup_endpoints
self.timeout_sec = 3.5 # 3500ms window
self.supervisor = RPCNodeFailoverSupervisor(
endpoints=[primary_endpoint] + backup_endpoints,
check_interval_sec=2.0,
latency_threshold_ms=500.0,
ping_timeout_sec=1.0,
)
self.supervisor.start()

def transmit(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
endpoints = [self.primary_endpoint] + self.backup_endpoints

active_url = self.supervisor.get_active_endpoint()
endpoints = [active_url] + [
ep for ep in self.supervisor.endpoints if ep != active_url
]

for url in endpoints:
target_url = f"{url.rstrip('/')}/{path.lstrip('/')}"
try:
response = requests.post(
target_url,
json=payload,
timeout=self.timeout_sec
target_url, json=payload, timeout=self.timeout_sec
)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
logger.warning(f"Node {target_url} timed out after {self.timeout_sec}s. Switching to backup.")
logger.warning(
f"Node {target_url} timed out after {self.timeout_sec}s. Switching to backup."
)
except requests.exceptions.RequestException as e:
logger.warning(f"Node {target_url} failed: {e}. Switching to backup.")

logger.warning(
f"Node {target_url} failed: {e}. Switching to backup."
)

raise ConnectionError("All RPC endpoints failed to respond.")

def close(self) -> None:
"""Stop the proactive supervisor thread."""
try:
self.supervisor.stop()
except Exception:
pass

def __del__(self) -> None:
self.close()

Loading