11 changes: 8 additions & 3 deletions tests/conftest.py
@@ -17,6 +17,7 @@
import os
import shutil
import tempfile
from datetime import datetime

import pytest

@@ -154,10 +155,14 @@ def predownload_tokenizers(pytestconfig):

@pytest.fixture(autouse=True)
def logger(request):
log_path = os.path.join(request.node.name, "test.log.txt")
timestamp = datetime.now().strftime("%m-%d-%Y_%H-%M-%S")
log_dir = f"{request.node.name}_{timestamp}"
request.node.log_dir = log_dir
log_path = os.path.join(log_dir, "test.log.txt")

logger = logging.getLogger()
shutil.rmtree(request.node.name, ignore_errors=True)
os.makedirs(request.node.name, exist_ok=True)
shutil.rmtree(log_dir, ignore_errors=True)
os.makedirs(log_dir, exist_ok=True)
handler = logging.FileHandler(log_path, mode="w")
formatter = logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT)
handler.setFormatter(formatter)
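
The logger fixture now writes each test's log into a uniquely timestamped directory and records that path on the pytest item as request.node.log_dir, so repeated runs of the same test no longer overwrite each other's artifacts. A minimal sketch of the naming scheme (the helper below is illustrative, not part of the change):

```python
import os
from datetime import datetime


def make_test_log_path(test_name: str) -> str:
    """Illustrative helper mirroring the fixture's directory naming."""
    timestamp = datetime.now().strftime("%m-%d-%Y_%H-%M-%S")
    log_dir = f"{test_name}_{timestamp}"  # e.g. "test_worker_failure_01-15-2025_10-30-45"
    os.makedirs(log_dir, exist_ok=True)
    return os.path.join(log_dir, "test.log.txt")  # path handed to logging.FileHandler
```
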
123 changes: 101 additions & 22 deletions tests/fault_tolerance/deploy/client.py
@@ -18,6 +18,7 @@
import json
import logging
import os
import signal
import subprocess
import time
from pathlib import Path
@@ -60,7 +61,7 @@ def get_frontend_port(
Returns:
Tuple of (pod_name, local_port, pod_instance) or (None, None, None) if failed
"""
pods = managed_deployment.get_pods(managed_deployment.frontend_service_name)
pods = managed_deployment.get_pods([managed_deployment.frontend_service_name])

port = 0
pod_name = None
@@ -270,6 +271,7 @@ def run_aiperf(
logger: logging.Logger,
max_retries: int = 1,
retry_delay: float = 1,
continuous_load: bool = False,
) -> bool:
"""
Execute AI-Perf with specified parameters.
@@ -280,13 +282,14 @@
model: Model name
pod_name: Selected pod name for logging
port: Local port number
requests_per_client: Number of requests to send
requests_per_client: Number of requests to send (used if continuous load not enabled)
input_token_length: Input token count
output_token_length: Output token count
output_dir: Directory for AI-Perf artifacts
logger: Logger instance
max_retries: Maximum number of retry attempts (default: 1)
retry_delay: Delay in seconds between retries (default: 1)
continuous_load: If True, use continuous load instead of fixed request count

Returns:
True if successful, False otherwise
@@ -315,8 +318,6 @@ def run_aiperf(
# Enable streaming for TTFT and ITL metrics
"--streaming",
# Request parameters
"--request-count",
str(requests_per_client), # Required: how many requests
"--concurrency",
"1", # Optional: we set to 1 for sequential
# Token configuration
@@ -338,8 +339,13 @@
"100", # For reproducible results
]

# Calculate timeout (same as legacy would for all requests)
timeout = max(requests_per_client * 2 + 60, 300) # At least 5 minutes
if continuous_load:
cmd.extend(["--benchmark-duration", "1800"]) # 30 minutes for continuous load
logger.info("Using continuous load with duration: 30 minutes")
timeout = 1860 # 31 minutes default for duration-based tests (30 minutes + 1 minute buffer)
else:
cmd.extend(["--request-count", str(requests_per_client)])
timeout = max(requests_per_client * 2 + 60, 300) # At least 5 minutes

# Log execution
logger.info(f"Starting AI-Perf for Pod {pod_name} Local Port {port}")
@@ -354,15 +360,20 @@ def run_aiperf(
logger.info(f"Command: {' '.join(cmd)}")

# Retry logic for fault tolerance - retry FULL request count until success

max_attempts = max_retries if max_retries > 0 else 1
# Note: For continuous load, we only run once and expect SIGINT to stop it
max_attempts = 1 if continuous_load else (max_retries if max_retries > 0 else 1)
success = False
all_results = []

for attempt in range(max_attempts):
logger.info(
f"AI-Perf attempt {attempt + 1}/{max_attempts} with {requests_per_client} requests"
)
if continuous_load:
logger.info(
"AI-Perf continuous load (will run until interrupted by SIGINT)"
)
else:
logger.info(
f"AI-Perf attempt {attempt + 1}/{max_attempts} with {requests_per_client} requests"
)

# Update output directory for this attempt
attempt_dir = output_dir / f"attempt_{attempt}"
@@ -374,13 +385,7 @@ def run_aiperf(
cmd_attempt[artifact_dir_idx] = str(attempt_dir)

try:
result = subprocess.run(
cmd_attempt,
capture_output=True,
text=True,
timeout=timeout,
stdin=subprocess.DEVNULL, # Prevent stdin reading which can cause process suspension
)
result = run_aiperf_with_signal_handling(cmd_attempt, logger, timeout)

# Save logs for this attempt
with open(attempt_dir / "genai_perf.log", "w") as f:
@@ -398,6 +403,7 @@ def run_aiperf(
}
)

# Even under continuous load, aiperf should exit with return code 0 after a SIGINT and create the profile_export_aiperf.json file
if result.returncode == 0:
# AI-Perf returns 0 even if all requests failed, so we need to check the output
json_path = attempt_dir / "profile_export_aiperf.json"
@@ -412,6 +418,19 @@ def run_aiperf(
)
if success:
break # Success - exit the retry loop
## TODO: bug with aiperf git+https://github.com/ai-dynamo/aiperf.git@4d3fa29403c8f75da22a14f1f7b3aeb27db9288f
## where sending a SIGINT on Mac can sometimes yield a return code of -9 (SIGKILL), which results in profile_export_aiperf.json not being created
elif result.returncode == -9 and continuous_load:
logger.warning(
f"""
Attempt {attempt + 1} failed with return code {result.returncode}
This is a known bug with aiperf on Mac where sending a SIGINT can sometimes yield a return code of -9 (SIGKILL)
which results in profile_export_aiperf.json not being created
"""
)
logger.debug(
f"Stderr: {result.stderr[:500] if result.stderr else 'No stderr'}"
)
else:
logger.warning(
f"Attempt {attempt + 1} failed with return code {result.returncode}"
@@ -423,20 +442,77 @@ def run_aiperf(
logger.error(f"Error in attempt {attempt + 1}: {str(e)}")
all_results.append({"attempt": attempt + 1, "error": str(e)})

# Sleep before next attempt (if not the last attempt)
if not success and attempt < max_attempts - 1:
# Sleep before next attempt (if not the last attempt and not continuous load)
if not success and attempt < max_attempts - 1 and not continuous_load:
time.sleep(retry_delay)

if success:
if success and not continuous_load:
logger.info(
f"AI-Perf successfully completed all {requests_per_client} requests for {pod_name}"
)
elif success and continuous_load:
logger.info(
f"AI-Perf sustained continuous load for {pod_name} and existed succesfully"
)
else:
logger.error(f"AI-Perf failed all {max_attempts} attempts for {pod_name}")

return success
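
Condensed, the new branch above swaps the fixed --request-count for a 30-minute --benchmark-duration and widens the subprocess timeout to match; under continuous load the retry loop also collapses to a single attempt, since the run is expected to be stopped by SIGINT rather than to finish on its own. A sketch of just that decision (values mirror the code above):

```python
from typing import List, Tuple


def build_load_args(continuous_load: bool, requests_per_client: int) -> Tuple[List[str], int]:
    """Sketch of the continuous-load vs. fixed-count branch in run_aiperf."""
    if continuous_load:
        extra_args = ["--benchmark-duration", "1800"]  # run for 30 minutes
        timeout = 1860  # 30 minutes plus a 1-minute buffer
    else:
        extra_args = ["--request-count", str(requests_per_client)]
        timeout = max(requests_per_client * 2 + 60, 300)  # at least 5 minutes
    return extra_args, timeout
```
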


def run_aiperf_with_signal_handling(
cmd_attempt: List[str],
logger: logging.Logger,
timeout: int,
) -> subprocess.CompletedProcess:
"""
Run aiperf with signal handling for graceful shutdown.

Handles SIGINT forwarding and timeout when running with subprocess.Popen.
This ensures that Ctrl-C and SIGINT are properly forwarded to the subprocess
so it can clean up gracefully and write results files.
"""
proc = subprocess.Popen(
cmd_attempt,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
stdin=subprocess.DEVNULL,
)

# Set up signal handler to forward SIGINT to subprocess
def signal_handler(signum, frame):
logger.info(f"Received signal {signum}, forwarding to aiperf subprocess")
try:
proc.send_signal(signal.SIGINT)
except ProcessLookupError:
pass # Process already terminated

signal.signal(signal.SIGINT, signal_handler)

try:
stdout, stderr = proc.communicate(timeout=timeout)
returncode = proc.returncode
except subprocess.TimeoutExpired:
logger.warning(f"AI-Perf subprocess timed out after {timeout}s")
proc.kill()
stdout, stderr = proc.communicate()
returncode = proc.returncode
except KeyboardInterrupt:
logger.info("Received KeyboardInterrupt, sending SIGINT to aiperf subprocess")
proc.send_signal(signal.SIGINT)
try:
stdout, stderr = proc.communicate(timeout=30) # Give it time to clean up
returncode = proc.returncode
except subprocess.TimeoutExpired:
logger.warning("Subprocess didn't terminate gracefully, killing it")
proc.kill()
stdout, stderr = proc.communicate()
returncode = proc.returncode

return subprocess.CompletedProcess(cmd_attempt, returncode, stdout, stderr)
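
This helper is what lets a continuous-load run end cleanly: a SIGINT delivered to the test process is forwarded to aiperf, which can then flush profile_export_aiperf.json before exiting, while a timeout falls back to killing the process. A minimal usage sketch with a stand-in command (the real caller passes the full aiperf command line; assumes the helper above is in scope):

```python
import logging

logging.basicConfig(level=logging.INFO)
demo_logger = logging.getLogger("aiperf-demo")

# Stand-in workload; in run_aiperf() this is cmd_attempt, the full aiperf invocation.
result = run_aiperf_with_signal_handling(
    ["python", "-c", "print('stand-in workload')"], demo_logger, timeout=30
)
demo_logger.info("return code: %s", result.returncode)  # 0 on clean exit, negative if killed by a signal
```

One design note: the helper installs a process-wide SIGINT handler and does not restore the previous one after the subprocess exits, which is fine for these one-shot client processes but worth keeping in mind if it were reused in longer-lived code.
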


def log_summary_metrics(
output_dir: Path, logger: logging.Logger, pod_name: str, port: int
) -> None:
@@ -513,6 +589,7 @@ def client(
output_token_length: int,
max_retries: int,
retry_delay: float = 1,
continuous_load: bool = False,
):
"""
Generate load using AI-Perf for fault tolerance testing.
@@ -527,11 +604,12 @@ def client(
model: Model name
log_dir: Directory for output logs and AI-Perf artifacts
index: Client index used for round-robin pod selection
requests_per_client: Number of requests to generate
requests_per_client: Number of requests to generate (used if continuous load not enabled)
input_token_length: Number of input tokens per request
output_token_length: Number of output tokens per request
max_retries: Maximum retry attempts for AI-Perf execution
retry_delay: Delay in seconds between retry attempts
continuous_load: If True, use continuous load instead of fixed request count
"""
logger = logging.getLogger(f"CLIENT: {index}")
logging.getLogger("httpx").setLevel(logging.WARNING)
@@ -578,6 +656,7 @@ def client(
logger=logger,
max_retries=max_retries,
retry_delay=retry_delay,
continuous_load=continuous_load,
)

if not success:
1 change: 1 addition & 0 deletions tests/fault_tolerance/deploy/client_factory.py
@@ -42,6 +42,7 @@ def get_client_function(client_type: str) -> Callable:
output_token_length,
max_retries,
retry_delay_or_rate, # Differs between implementations
continuous_load,
)

Raises:
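
Because continuous_load is now part of the shared positional signature, the factory can return either implementation without the caller changing its argument list; only the legacy client rejects the flag. A toy sketch of that contract (the stub functions and registry below are illustrative stand-ins, not the real client functions):

```python
from typing import Callable, Dict


def _aiperf_client_stub(*args, continuous_load: bool = False) -> None:
    print("aiperf client, continuous_load =", continuous_load)


def _legacy_client_stub(*args, continuous_load: bool = False) -> None:
    if continuous_load:
        raise ValueError("Continuous load is not supported for legacy client")
    print("legacy client, fixed request count")


_REGISTRY: Dict[str, Callable] = {"aiperf": _aiperf_client_stub, "legacy": _legacy_client_stub}


def get_client_function_stub(client_type: str) -> Callable:
    """Toy stand-in for get_client_function(): same look-up-by-name idea."""
    return _REGISTRY[client_type]


get_client_function_stub("aiperf")(continuous_load=True)    # runs in continuous-load mode
# get_client_function_stub("legacy")(continuous_load=True)  # would raise ValueError
```
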
13 changes: 13 additions & 0 deletions tests/fault_tolerance/deploy/conftest.py
@@ -35,6 +35,13 @@ def pytest_addoption(parser):
help="Include tests that require custom builds (e.g., MoE models). "
"By default, these tests are excluded.",
)
parser.addoption(
"--skip-restart-services",
action="store_true",
default=False,
help="Skip restarting NATS and etcd services before deployment. "
"By default, these services are restarted.",
)


def pytest_generate_tests(metafunc):
@@ -109,3 +116,9 @@ def namespace(request):
def client_type(request):
"""Get client type from command line or use scenario default."""
return request.config.getoption("--client-type")


@pytest.fixture
def skip_restart_services(request):
"""Get skip restart services flag from command line."""
return request.config.getoption("--skip-restart-services")
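
The new flag is passed on the command line as --skip-restart-services and reaches tests through the skip_restart_services fixture; a hedged sketch of how a test might consume it (the test body and restart stub below are hypothetical, not part of this PR):

```python
def test_worker_failure(skip_restart_services):  # hypothetical test using the new fixture
    def restart_nats_and_etcd():  # stand-in for the real restart logic
        print("restarting NATS and etcd before deployment")

    if not skip_restart_services:  # default behavior: services are restarted
        restart_nats_and_etcd()
    # ... deploy the graph, inject faults, run clients ...
```
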
7 changes: 6 additions & 1 deletion tests/fault_tolerance/deploy/legacy_client.py
@@ -192,6 +192,7 @@ def client(
max_retries,
max_request_rate,
retry_delay=1,
continuous_load=False,
):
"""Legacy custom client for fault tolerance testing.

@@ -211,7 +212,11 @@
max_retries: Maximum retry attempts per request
max_request_rate: Maximum requests per second (for rate limiting)
retry_delay: Delay in seconds between retries
continuous_load: If True, use continuous load instead of fixed request count
"""
if continuous_load:
raise ValueError("Continuous load is not supported for legacy client")

logger = logging.getLogger(f"CLIENT: {index}")
logging.getLogger("httpx").setLevel(logging.WARNING)

@@ -228,7 +233,7 @@
for i in range(requests_per_client):
# Get available pods
pods = managed_deployment.get_pods(
managed_deployment.frontend_service_name
[managed_deployment.frontend_service_name]
)
port = 0
pod_name = None
31 changes: 23 additions & 8 deletions tests/fault_tolerance/deploy/parse_results.py
@@ -341,6 +341,7 @@ def parse_aiperf_client_results(log_dir: str) -> Dict[str, Any]:
Returns:
Dictionary with aggregated metrics and client count
"""
logger = logging.getLogger(__name__)
all_metrics: Dict[str, Any] = {
"total_requests": 0,
"successful_requests": 0,
@@ -382,22 +383,28 @@
with open(profile_json) as f:
client_metrics = json.load(f)

# AI-Perf format has "records" dictionary at the top level
# AI-Perf format can have "records" dictionary or metrics at top level
# Try records first (older format), then fall back to top level (newer format)
records = client_metrics.get("records", {})

# Extract successful request count
request_count_record = records.get("request_count", {})
# Extract successful request count - check both locations
request_count_record = records.get(
"request_count"
) or client_metrics.get("request_count", {})
successful_count = (
int(request_count_record.get("avg", 0))
if request_count_record
if request_count_record and isinstance(request_count_record, dict)
else 0
)

# Extract error request count
error_request_count_record = records.get("error_request_count", {})
# Extract error request count - check both locations
error_request_count_record = records.get(
"error_request_count"
) or client_metrics.get("error_request_count", {})
error_request_count = (
int(error_request_count_record.get("avg", 0))
if error_request_count_record
and isinstance(error_request_count_record, dict)
else 0
)

@@ -418,9 +425,17 @@ def parse_aiperf_client_results(log_dir: str) -> Dict[str, Any]:
# Sum up actual error counts from each error type
error_count = sum(error.get("count", 0) for error in error_summary)

# Check if test was cancelled
# Log if test was cancelled (expected for continuous load mode)
if client_metrics.get("was_cancelled", False):
error_count = request_count # Mark all as failed if cancelled
logger.info(
f"AI-Perf client {item} was cancelled - anticipated if running with continuous load mode. "
f"Completed {request_count} requests before cancellation."
)

# Note: If test was cancelled (was_cancelled=True), we still count the requests
# that were successfully completed before cancellation. The request_count
# represents successful requests, and error_count represents actual errors.
# We don't mark cancelled requests as failed - they were just interrupted.

# Validate data consistency
if request_count < error_count:
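
The parser now accepts both layouts of profile_export_aiperf.json: metrics nested under a top-level "records" key (older aiperf output) and metrics at the top level (newer output), and a cancelled run no longer has all of its completed requests marked as failed. A condensed sketch of the dual lookup, with made-up counts:

```python
older = {"records": {"request_count": {"avg": 100}, "error_request_count": {"avg": 2}}}
newer = {"request_count": {"avg": 100}, "error_request_count": {"avg": 2}, "was_cancelled": True}


def extract_counts(client_metrics: dict) -> tuple:
    """Mirror of the lookup above: prefer "records", then fall back to the top level."""
    records = client_metrics.get("records", {})
    req = records.get("request_count") or client_metrics.get("request_count", {})
    err = records.get("error_request_count") or client_metrics.get("error_request_count", {})

    def to_int(rec):
        return int(rec.get("avg", 0)) if rec and isinstance(rec, dict) else 0

    return to_int(req), to_int(err)


for payload in (older, newer):
    print(extract_counts(payload))  # (100, 2) for both layouts
```
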