diff --git a/tools/harness/src/nv_ingest_harness/cases/e2e.py b/tools/harness/src/nv_ingest_harness/cases/e2e.py index 8e2eb4e4d..0e636b5fc 100644 --- a/tools/harness/src/nv_ingest_harness/cases/e2e.py +++ b/tools/harness/src/nv_ingest_harness/cases/e2e.py @@ -8,7 +8,14 @@ from nv_ingest_client.util.document_analysis import analyze_document_chunks from nv_ingest_client.util.milvus import nvingest_retrieval -from nv_ingest_harness.utils.interact import embed_info, kv_event_log, segment_results +from nv_ingest_harness.utils.interact import ( + embed_info, + get_embed_task_endpoint, + get_embedding_api_base, + is_embedding_endpoint_reachable, + kv_event_log, + segment_results, +) from nv_ingest_harness.utils.milvus import milvus_chunks from nv_ingest_harness.utils.pdf import pdf_page_count from nv_ingest_harness.utils.vdb import get_lancedb_path @@ -78,7 +85,8 @@ def main(config=None, log_path: str = "test_results") -> int: split_chunk_size = config.split_chunk_size split_chunk_overlap = config.split_chunk_overlap - model_name, dense_dim = embed_info() + model_name, dense_dim = embed_info(config.embedding_model) + embedding_endpoint = get_embedding_api_base(hostname) # Log configuration for transparency print("=== Test Configuration ===") @@ -181,7 +189,7 @@ def main(config=None, log_path: str = "test_results") -> int: ) # Embed (must come before storage per pipeline ordering) - ingestor = ingestor.embed(model_name=model_name) + ingestor = ingestor.embed(model_name=model_name, endpoint_url=get_embed_task_endpoint()) # Store images to disk (server-side image storage) - optional # Note: Supports both MinIO (s3://) and local disk (file://) via storage_uri @@ -275,32 +283,42 @@ def main(config=None, log_path: str = "test_results") -> int: "How many dollars does a power drill cost?", ] querying_start = time.time() - if vdb_backend == "lancedb": - try: - from nv_ingest_client.util.vdb.lancedb import LanceDB - except ImportError as exc: - print(f"Warning: LanceDB retrieval 
not available ({exc}). Skipping retrieval sanity check.") - else: - lancedb_client = LanceDB(uri=lancedb_path, table_name=collection_name, hybrid=hybrid) - _ = lancedb_client.retrieval( - queries, - hybrid=hybrid, - embedding_endpoint=f"http://{hostname}:8012/v1", - model_name=model_name, - top_k=5, - ) + if not is_embedding_endpoint_reachable(hostname): + print("Warning: Embedding endpoint is not reachable from host; skipping retrieval sanity check.") + retrieval_time = 0.0 else: - _ = nvingest_retrieval( - queries, - collection_name, - hybrid=sparse, - embedding_endpoint=f"http://{hostname}:8012/v1", - model_name=model_name, - top_k=5, - gpu_search=gpu_search, - nv_ranker=False, - ) - retrieval_time = time.time() - querying_start + if vdb_backend == "lancedb": + try: + from nv_ingest_client.util.vdb.lancedb import LanceDB + except ImportError as exc: + print(f"Warning: LanceDB retrieval not available ({exc}). Skipping retrieval sanity check.") + else: + lancedb_client = LanceDB(uri=lancedb_path, table_name=collection_name, hybrid=hybrid) + try: + _ = lancedb_client.retrieval( + queries, + hybrid=hybrid, + embedding_endpoint=embedding_endpoint, + model_name=model_name, + top_k=5, + ) + except Exception as exc: + print(f"Warning: LanceDB retrieval sanity check failed; continuing test run ({exc})") + else: + try: + _ = nvingest_retrieval( + queries, + collection_name, + hybrid=sparse, + embedding_endpoint=embedding_endpoint, + model_name=model_name, + top_k=5, + gpu_search=gpu_search, + nv_ranker=False, + ) + except Exception as exc: + print(f"Warning: Milvus retrieval sanity check failed; continuing test run ({exc})") + retrieval_time = time.time() - querying_start kv_event_log("retrieval_time_s", retrieval_time, log_path) # Summarize - Build comprehensive results dict diff --git a/tools/harness/src/nv_ingest_harness/cases/e2e_with_llm_summary.py b/tools/harness/src/nv_ingest_harness/cases/e2e_with_llm_summary.py index 66b6bfbcc..6503099d5 100644 --- 
a/tools/harness/src/nv_ingest_harness/cases/e2e_with_llm_summary.py +++ b/tools/harness/src/nv_ingest_harness/cases/e2e_with_llm_summary.py @@ -12,6 +12,9 @@ from nv_ingest_harness.utils.cases import get_repo_root from nv_ingest_harness.utils.interact import ( embed_info, + get_embed_task_endpoint, + get_embedding_api_base, + is_embedding_endpoint_reachable, segment_results, kv_event_log, ) # noqa: E402 @@ -84,7 +87,8 @@ def main(config=None, log_path: str = "test_results") -> int: print(f"Path to User-Defined Function: {str(udf_path)}") llm_model = config.llm_summarization_model - model_name, dense_dim = embed_info() + model_name, dense_dim = embed_info(config.embedding_model) + embedding_endpoint = get_embedding_api_base(hostname) # Log configuration for transparency print("=== Configuration ===") @@ -134,7 +138,7 @@ def main(config=None, log_path: str = "test_results") -> int: # Embed and upload (core pipeline) print("Uploading to collection:", collection_name) ingestor = ( - ingestor.embed(model_name=model_name) + ingestor.embed(model_name=model_name, endpoint_url=get_embed_task_endpoint()) .vdb_upload( collection_name=collection_name, dense_dim=dense_dim, @@ -190,17 +194,24 @@ def main(config=None, log_path: str = "test_results") -> int: "How many dollars does a power drill cost?", ] querying_start = time.time() - _ = nvingest_retrieval( - queries, - collection_name, - hybrid=sparse, - embedding_endpoint=f"http://{hostname}:8012/v1", - embedding_model_name=model_name, - model_name=model_name, - top_k=5, - gpu_search=gpu_search, - ) - kv_event_log("retrieval_time_s", time.time() - querying_start, log_path) + if not is_embedding_endpoint_reachable(hostname): + print("Warning: Embedding endpoint is not reachable from host; skipping retrieval sanity check.") + kv_event_log("retrieval_time_s", 0.0, log_path) + else: + try: + _ = nvingest_retrieval( + queries, + collection_name, + hybrid=sparse, + embedding_endpoint=embedding_endpoint, + 
import os
import urllib.request
from urllib.parse import urlparse

# Embedding model name -> dense vector dimension for the models the harness knows about.
MODEL_DIMENSIONS = {
    "nvidia/llama-3.2-nemoretriever-1b-vlm-embed-v1": 2048,
    "nvidia/llama-3.2-nv-embedqa-1b-v2": 2048,
    "nvidia/llama-3.2-nemoretriever-300m-embed-v1": 2048,
    "nvidia/nv-embedqa-e5-v5": 1024,
}
# Fallback model name, and the dimension assumed for models missing from MODEL_DIMENSIONS.
DEFAULT_MODEL = "nvidia/llama-3.2-nv-embedqa-1b-v2"
DEFAULT_DIMENSION = 2048

# Hostnames that mean "the harness runs directly on the host machine".
_LOOPBACK_HOSTNAMES = frozenset({"localhost", "127.0.0.1"})
# Endpoint hosts that are ignored when the harness runs on the host.
# NOTE(review): presumably the compose service name, resolvable only inside the
# container network - confirm against the deployment files.
_CONTAINER_ONLY_HOSTNAMES = frozenset({"embedding"})


def _env_value(name: str) -> str:
    """Return environment variable *name* with whitespace stripped ("" if unset or blank)."""
    return (os.getenv(name) or "").strip()


def _ensure_v1_base(url: str) -> str:
    """Normalize *url* into an API base URL.

    Surrounding whitespace and trailing slashes on the path are removed. A
    value without a scheme (``host:port``) is treated as ``http://``. When the
    URL has no path, ``/v1`` is used as the path; any other path (including
    one already ending in ``/v1``) is kept as given. Query strings and
    fragments are preserved, because the suffix is applied to the path
    component rather than to the raw string.

    Args:
        url: Non-blank endpoint string, typically read from the environment.

    Returns:
        The normalized base URL.
    """
    cleaned = url.strip()
    if "://" not in cleaned:
        # urlparse() would read "host:8000" as scheme="host"; make the scheme explicit.
        cleaned = f"http://{cleaned}"
    parsed = urlparse(cleaned)
    path = parsed.path.rstrip("/") or "/v1"
    return parsed._replace(path=path).geturl()


def get_embedding_api_base(hostname: str = "localhost") -> str:
    """Resolve the embedding API base URL the harness should call.

    Resolution order:

    1. ``HARNESS_EMBEDDING_ENDPOINT`` if set and non-blank.
    2. ``EMBEDDING_NIM_ENDPOINT`` if set and non-blank, unless *hostname* is a
       loopback name and the endpoint's host is in ``_CONTAINER_ONLY_HOSTNAMES``.
    3. ``http://{hostname}:8012/v1``.

    Args:
        hostname: Host used to build the fallback URL and to decide whether
            the harness is running on the host machine.

    Returns:
        The base URL, normalized by ``_ensure_v1_base`` when taken from the
        environment.
    """
    explicit = _env_value("HARNESS_EMBEDDING_ENDPOINT")
    if explicit:
        return _ensure_v1_base(explicit)

    env_endpoint = _env_value("EMBEDDING_NIM_ENDPOINT")
    if env_endpoint:
        normalized = _ensure_v1_base(env_endpoint)
        # Inspect the normalized URL so scheme-less values expose their host too.
        endpoint_host = (urlparse(normalized).hostname or "").lower()
        running_on_host = hostname.strip().lower() in _LOOPBACK_HOSTNAMES
        if not (running_on_host and endpoint_host in _CONTAINER_ONLY_HOSTNAMES):
            return normalized

    return f"http://{hostname}:8012/v1"


def get_embedding_models_url(hostname: str = "localhost") -> str:
    """Return the embedding API base for *hostname* with ``/models`` appended."""
    return f"{get_embedding_api_base(hostname).rstrip('/')}/models"


def get_embedding_health_url(hostname: str = "localhost") -> str:
    """Return the embedding API base for *hostname* with ``/health/ready`` appended."""
    return f"{get_embedding_api_base(hostname).rstrip('/')}/health/ready"


def is_embedding_endpoint_reachable(hostname: str = "localhost", timeout_s: float = 1.5) -> bool:
    """Probe the embedding health URL and report whether it answered HTTP 200.

    Args:
        hostname: Forwarded to ``get_embedding_health_url``.
        timeout_s: Timeout in seconds passed to ``urllib.request.urlopen``.

    Returns:
        ``True`` only when the request succeeds with status 200; ``False`` for
        any other status or any failure while building or issuing the request.
    """
    try:
        with urllib.request.urlopen(get_embedding_health_url(hostname), timeout=timeout_s) as response:
            return response.status == 200
    except Exception:
        # Deliberately best-effort: callers use this only to decide whether to
        # skip an optional check, so every failure mode means "not reachable".
        return False


def get_embed_task_endpoint() -> str:
    """Return the endpoint URL handed to the ingestor's embed task.

    Uses ``EMBEDDING_NIM_ENDPOINT`` (normalized by ``_ensure_v1_base``) when it
    is set and non-blank, otherwise ``http://embedding:8000/v1``.
    """
    env_endpoint = _env_value("EMBEDDING_NIM_ENDPOINT")
    if env_endpoint:
        return _ensure_v1_base(env_endpoint)
    return "http://embedding:8000/v1"
""" - # Model name to embedding dimension mapping - MODEL_DIMENSIONS = { - "nvidia/llama-3.2-nemoretriever-1b-vlm-embed-v1": 2048, - "nvidia/llama-3.2-nv-embedqa-1b-v2": 2048, - "nvidia/llama-3.2-nemoretriever-300m-embed-v1": 2048, - "nvidia/nv-embedqa-e5-v5": 1024, - } - - # Default model - DEFAULT_MODEL = "nvidia/nv-embedqa-e5-v5" - DEFAULT_DIMENSION = 1024 - - url = "http://localhost:8012/v1/models" + if preferred_model and preferred_model.lower() != "auto": + return preferred_model, MODEL_DIMENSIONS.get(preferred_model, DEFAULT_DIMENSION) + + env_model = os.getenv("EMBEDDING_NIM_MODEL_NAME") + if env_model: + return env_model, MODEL_DIMENSIONS.get(env_model, DEFAULT_DIMENSION) + + url = get_embedding_models_url("localhost") # Try to fetch model info from embedding service API with retry/backoff for attempt in range(max_retries): diff --git a/tools/harness/src/nv_ingest_harness/utils/recall.py b/tools/harness/src/nv_ingest_harness/utils/recall.py index d5b20d863..d6689019c 100644 --- a/tools/harness/src/nv_ingest_harness/utils/recall.py +++ b/tools/harness/src/nv_ingest_harness/utils/recall.py @@ -15,6 +15,7 @@ from nv_ingest_client.util.milvus import nvingest_retrieval from nv_ingest_harness.utils.cases import get_repo_root +from nv_ingest_harness.utils.interact import get_embedding_api_base def _get_retrieval_func( @@ -123,7 +124,7 @@ def get_recall_scores( if vdb_backend == "lancedb": batch_answers = retrieval_func( batch_queries, - embedding_endpoint=f"http://{hostname}:8012/v1", + embedding_endpoint=get_embedding_api_base(hostname), model_name=model_name, top_k=top_k, nv_ranker=nv_ranker, @@ -135,7 +136,7 @@ def get_recall_scores( batch_queries, collection_name, hybrid=sparse, - embedding_endpoint=f"http://{hostname}:8012/v1", + embedding_endpoint=get_embedding_api_base(hostname), model_name=model_name, top_k=top_k, gpu_search=gpu_search, @@ -242,7 +243,7 @@ def get_recall_scores_pdf_only( if vdb_backend == "lancedb": batch_answers = retrieval_func( 
batch_queries, - embedding_endpoint=f"http://{hostname}:8012/v1", + embedding_endpoint=get_embedding_api_base(hostname), model_name=model_name, top_k=top_k, nv_ranker=nv_ranker, @@ -254,7 +255,7 @@ def get_recall_scores_pdf_only( batch_queries, collection_name, hybrid=sparse, - embedding_endpoint=f"http://{hostname}:8012/v1", + embedding_endpoint=get_embedding_api_base(hostname), model_name=model_name, top_k=top_k, gpu_search=gpu_search,