diff --git a/src/aiperf/common/enums/metric_enums.py b/src/aiperf/common/enums/metric_enums.py index b585a2c90..69b3db5ea 100644 --- a/src/aiperf/common/enums/metric_enums.py +++ b/src/aiperf/common/enums/metric_enums.py @@ -188,6 +188,10 @@ class GenericMetricUnit(BaseMetricUnit): RATIO = _unit("ratio") USER = _unit("user") PERCENT = _unit("%") + IMAGE = _unit("image") + IMAGES = _unit("images") + VIDEO = _unit("video") + VIDEOS = _unit("videos") class PowerMetricUnitInfo(BaseMetricUnitInfo): @@ -289,7 +293,11 @@ class MetricOverTimeUnitInfo(BaseMetricUnitInfo): @model_validator(mode="after") def _set_tag(self: Self) -> Self: """Set the tag based on the existing units. ie. requests/sec, tokens/sec, etc.""" - self.tag = f"{self.primary_unit}/{self.time_unit}" + self.tag = ( + f"{self.primary_unit}/{self.time_unit}" + if not self.inverted + else f"{self.time_unit}/{self.primary_unit}" + ) if self.third_unit: # If there is a third unit, add it to the tag. ie. tokens/sec/user self.tag += f"/{self.third_unit}" @@ -302,6 +310,7 @@ def _set_tag(self: Self) -> Self: primary_unit: "MetricUnitT" time_unit: MetricTimeUnit | MetricTimeUnitInfo third_unit: "MetricUnitT | None" = None + inverted: bool = False def convert_to(self, other_unit: "MetricUnitT", value: int | float) -> float: """Convert a value from this unit to another unit.""" @@ -342,6 +351,24 @@ class MetricOverTimeUnit(BaseMetricUnit): time_unit=MetricTimeUnit.SECONDS, third_unit=GenericMetricUnit.USER, ) + IMAGES_PER_SECOND = MetricOverTimeUnitInfo( + primary_unit=GenericMetricUnit.IMAGES, + time_unit=MetricTimeUnit.SECONDS, + ) + MS_PER_IMAGE = MetricOverTimeUnitInfo( + time_unit=MetricTimeUnit.MILLISECONDS, + primary_unit=GenericMetricUnit.IMAGE, + inverted=True, + ) + VIDEOS_PER_SECOND = MetricOverTimeUnitInfo( + primary_unit=GenericMetricUnit.VIDEOS, + time_unit=MetricTimeUnit.SECONDS, + ) + MS_PER_VIDEO = MetricOverTimeUnitInfo( + time_unit=MetricTimeUnit.MILLISECONDS, + primary_unit=GenericMetricUnit.VIDEO, + inverted=True, + ) @cached_property def info(self) -> MetricOverTimeUnitInfo: @@ -363,6 +390,11 @@ def third_unit(self) -> "MetricUnitT | None": """Get the third unit (if applicable).""" return self.info.third_unit + @cached_property + def inverted(self) -> bool: + """Whether the metric is inverted (e.g. time / metric).""" + return self.info.inverted + class MetricType(CaseInsensitiveStrEnum): """Defines the possible types of metrics.""" @@ -643,6 +675,9 @@ class MetricFlags(Flag): TOKENIZES_INPUT_ONLY = 1 << 12 """Metrics that are only applicable when the endpoint tokenizes input text.""" + SUPPORTS_VIDEO_ONLY = 1 << 13 + """Metrics that are only applicable to video-based endpoints.""" + def has_flags(self, flags: "MetricFlags") -> bool: """Return True if the metric has ALL of the given flag(s) (regardless of other flags).""" # Bitwise AND will return the input flags only if all of the given flags are present. 
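The `inverted` flag added here flips the order in which `_set_tag` composes the unit tag, so time-per-item units such as MS_PER_IMAGE render as time/primary rather than primary/time. A minimal sketch of the expected tags, assuming MetricTimeUnit renders SECONDS as "sec" and MILLISECONDS as "ms" (inferred from the "requests/sec" example in the docstring, not confirmed by this diff):

    from aiperf.common.enums.metric_enums import MetricOverTimeUnit

    # Non-inverted unit: primary unit first, as before this change.
    assert MetricOverTimeUnit.IMAGES_PER_SECOND.info.tag == "images/sec"

    # Inverted unit: time unit first, matching the MS_PER_IMAGE name.
    assert MetricOverTimeUnit.MS_PER_IMAGE.info.tag == "ms/image"
    assert MetricOverTimeUnit.MS_PER_IMAGE.inverted is True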
diff --git a/src/aiperf/common/enums/plugin_enums.py b/src/aiperf/common/enums/plugin_enums.py index 0264fd07f..33e3f4c4e 100644 --- a/src/aiperf/common/enums/plugin_enums.py +++ b/src/aiperf/common/enums/plugin_enums.py @@ -32,6 +32,7 @@ class EndpointType(CaseInsensitiveStrEnum): NIM_RANKINGS = "nim_rankings" SOLIDO_RAG = "solido_rag" TEMPLATE = "template" + IMAGE_RETRIEVAL = "image_retrieval" class TransportType(CaseInsensitiveStrEnum): diff --git a/src/aiperf/common/models/__init__.py b/src/aiperf/common/models/__init__.py index 9ca8d8a49..33dd50e88 100644 --- a/src/aiperf/common/models/__init__.py +++ b/src/aiperf/common/models/__init__.py @@ -73,6 +73,7 @@ BaseInferenceServerResponse, BaseResponseData, EmbeddingResponseData, + ImageRetrievalResponseData, MetricRecordInfo, MetricRecordMetadata, MetricResult, @@ -149,6 +150,7 @@ "GpuTelemetrySnapshot", "IOCounters", "Image", + "ImageRetrievalResponseData", "InputsFile", "JsonExportData", "JsonMetricResult", diff --git a/src/aiperf/common/models/record_models.py b/src/aiperf/common/models/record_models.py index 5a5b562f6..e738239d5 100644 --- a/src/aiperf/common/models/record_models.py +++ b/src/aiperf/common/models/record_models.py @@ -625,6 +625,18 @@ class RankingsResponseData(BaseResponseData): ) +class ImageRetrievalResponseData(BaseResponseData): + """Parsed image retrieval response data.""" + + data: list[dict[str, Any]] = Field( + ..., description="The image retrieval data from the response." + ) + + def get_text(self) -> str: + """Get the text of the response (empty for image retrieval).""" + return "" + + class ParsedResponse(AIPerfBaseModel): """Parsed response from an inference client.""" @@ -636,6 +648,7 @@ class ParsedResponse(AIPerfBaseModel): | TextResponseData | EmbeddingResponseData | RankingsResponseData + | ImageRetrievalResponseData | BaseResponseData | None ] = Field( diff --git a/src/aiperf/dataset/__init__.py b/src/aiperf/dataset/__init__.py index 3819a4dfd..8363251fd 100644 --- a/src/aiperf/dataset/__init__.py +++ b/src/aiperf/dataset/__init__.py @@ -46,8 +46,12 @@ ) from aiperf.dataset.utils import ( check_file_exists, + encode_audio, encode_image, + encode_video, + open_audio, open_image, + open_video, ) __all__ = [ @@ -84,7 +88,11 @@ "SyntheticDatasetComposer", "VideoGenerator", "check_file_exists", + "encode_audio", "encode_image", + "encode_video", "main", + "open_audio", "open_image", + "open_video", ] diff --git a/src/aiperf/dataset/dataset_manager.py b/src/aiperf/dataset/dataset_manager.py index b0cbc4a5e..fc7bbe4be 100644 --- a/src/aiperf/dataset/dataset_manager.py +++ b/src/aiperf/dataset/dataset_manager.py @@ -14,6 +14,7 @@ CommAddress, CommandType, ComposerType, + CustomDatasetType, MessageType, ServiceType, ) @@ -21,6 +22,7 @@ from aiperf.common.factories import ( ComposerFactory, DatasetSamplingStrategyFactory, + EndpointFactory, ServiceFactory, ) from aiperf.common.hooks import on_command, on_request @@ -82,11 +84,21 @@ async def _profile_configure_command( ) -> None: """Configure the dataset.""" - self.info("Configuring tokenizer(s) for dataset manager") - begin = time.perf_counter() - await self._configure_tokenizer() - duration = time.perf_counter() - begin - self.info(lambda: f"Tokenizer(s) configured in {duration:.2f} seconds") + endpoint_meta = EndpointFactory.get_metadata(self.user_config.endpoint.type) + if ( + endpoint_meta.tokenizes_input + or self.user_config.input.custom_dataset_type + == CustomDatasetType.MOONCAKE_TRACE + ): + self.info("Configuring tokenizer(s) for dataset manager") +
begin = time.perf_counter() + await self._configure_tokenizer() + duration = time.perf_counter() - begin + self.info(lambda: f"Tokenizer(s) configured in {duration:.2f} seconds") + else: + self.info( + "Endpoint does not tokenize input, skipping tokenizer configuration" + ) self.info(lambda: f"Configuring dataset for {self.service_id}") begin = time.perf_counter() diff --git a/src/aiperf/dataset/generator/prompt.py b/src/aiperf/dataset/generator/prompt.py index a9c0e92b0..5d18c38d0 100644 --- a/src/aiperf/dataset/generator/prompt.py +++ b/src/aiperf/dataset/generator/prompt.py @@ -49,7 +49,7 @@ def __init__(self, config: PromptConfig, tokenizer: Tokenizer, **kwargs): # TODO: move this under initialize() method # Initialize corpus if not already done - if self._tokenized_corpus is None: + if self._tokenized_corpus is None and tokenizer: self._initialize_corpus() # Initialize prefix prompts pool if the pool size > 0 diff --git a/src/aiperf/dataset/loader/mixins.py b/src/aiperf/dataset/loader/mixins.py index d803fab43..34e06b976 100644 --- a/src/aiperf/dataset/loader/mixins.py +++ b/src/aiperf/dataset/loader/mixins.py @@ -2,9 +2,13 @@ # SPDX-License-Identifier: Apache-2.0 from collections.abc import Iterable +from urllib.parse import urlparse +from aiperf.common.enums.dataset_enums import AudioFormat +from aiperf.common.enums.media_enums import MediaType from aiperf.common.models import Media -from aiperf.common.types import MediaT +from aiperf.common.types import MediaT, MediaTypeT +from aiperf.dataset import utils from aiperf.dataset.loader.models import CustomDatasetT @@ -51,8 +55,8 @@ def _convert_to_media_objects( Args: data: The custom dataset to construct media objects from. - media_class: The target media class (Text, Image, or Audio). - field: The name of the field (e.g., 'text', 'image', 'audio'). + media_class: The target media class (Text, Image, Audio, or Video). + field: The name of the field (e.g., 'text', 'image', 'audio', 'video'). name: The name of the media field. Returns: @@ -61,6 +65,9 @@ def _convert_to_media_objects( # Check singular field first value = getattr(data, field, None) if value is not None: + # Handle media content (encode local files to base64) + if field in [MediaType.IMAGE, MediaType.VIDEO, MediaType.AUDIO]: + value = self._handle_media_content(value, media_type=MediaType(field)) return [media_class(name=name, contents=[value])] # Check plural field @@ -72,4 +79,124 @@ def _convert_to_media_objects( if all(isinstance(v, media_class) for v in values): return values + # Handle media content (encode local files to base64) + if field in [MediaType.IMAGE, MediaType.VIDEO, MediaType.AUDIO]: + values = [ + self._handle_media_content(v, media_type=MediaType(field)) + for v in values + ] + return [media_class(name=name, contents=values)] + + def _is_url(self, content: str) -> bool: + """Check if content is a valid URL with scheme and netloc. + + Args: + content: The content to check. + + Returns: + True if content is a URL, False otherwise. + + Raises: + ValueError: If URL has only scheme or only netloc (invalid). + """ + url = urlparse(content) + + # Valid URL with both scheme and netloc + if url.scheme and url.netloc: + return True + + # Invalid URL - has one but not both + if url.scheme or url.netloc: + raise ValueError(f"Valid URL must have both a scheme and netloc: {content}") + + # Not a URL + return False + + def _is_already_encoded(self, content: str, media_type: MediaTypeT) -> bool: + """Check if content is already encoded in the expected format. 
+ + Args: + content: The content to check. + media_type: The media type (MediaType.IMAGE, MediaType.AUDIO, MediaType.VIDEO). + + Returns: + True if content is already encoded, False otherwise. + """ + url = urlparse(content) + + if media_type in [MediaType.IMAGE, MediaType.VIDEO]: + # Check for data URL format + return url.scheme == "data" + + elif media_type == MediaType.AUDIO: + # Check for "format,base64" format + if "," in content and not url.scheme: + parts = content.split(",", 1) + return len(parts) == 2 and parts[0].lower() in [ + AudioFormat.WAV, + AudioFormat.MP3, + ] + return False + + return False + + def _encode_media_file(self, content: str, media_type: MediaTypeT) -> str: + """Encode a local media file to base64. + + Args: + content: The file path to encode. + media_type: The media type (MediaType.IMAGE, MediaType.AUDIO, MediaType.VIDEO). + + Returns: + The base64-encoded content in the appropriate format. + + Raises: + FileNotFoundError: If the file doesn't exist. + RuntimeError: If the format is unsupported. + """ + if media_type == MediaType.IMAGE: + img = utils.open_image(content) + img_base64 = utils.encode_image(img, img.format) + return f"data:image/{img.format.lower()};base64,{img_base64}" + + elif media_type == MediaType.AUDIO: + audio_bytes, audio_format = utils.open_audio(content) + return utils.encode_audio(audio_bytes, audio_format) + + elif media_type == MediaType.VIDEO: + video_bytes, video_format = utils.open_video(content) + return utils.encode_video(video_bytes, video_format) + + raise ValueError(f"Unsupported media type: {media_type}") + + def _handle_media_content(self, content: str, media_type: MediaTypeT) -> str: + """Generic handler for media content encoding. + + If the content is a URL, it's returned as-is. + If it's already encoded, it's returned as-is. + If it's a local file path, it's loaded and encoded to base64. + + Args: + content: The media content - URL, encoded string, or local file path. + media_type: The media type (MediaType.IMAGE, MediaType.AUDIO, MediaType.VIDEO). + + Returns: + The processed media content. + + Raises: + FileNotFoundError: If the local file doesn't exist. + RuntimeError: If the media format is unsupported. + ValueError: If URL format is invalid. + """ + # Check if it's already encoded first (before URL check) + # This handles data URLs which have a scheme but no netloc + if self._is_already_encoded(content, media_type): + return content + + # Check if it's a URL + if self._is_url(content): + return content + + # Otherwise, it's a local file path - encode it + return self._encode_media_file(content, media_type) diff --git a/src/aiperf/dataset/loader/models.py b/src/aiperf/dataset/loader/models.py index faba3516c..d5f12acc4 100644 --- a/src/aiperf/dataset/loader/models.py +++ b/src/aiperf/dataset/loader/models.py @@ -6,7 +6,7 @@ from pydantic import Field, model_validator from aiperf.common.enums import CustomDatasetType -from aiperf.common.models import AIPerfBaseModel, Audio, Image, Text +from aiperf.common.models import AIPerfBaseModel, Audio, Image, Text, Video class SingleTurn(AIPerfBaseModel): @@ -16,7 +16,7 @@ class SingleTurn(AIPerfBaseModel): Each line in the file will be treated as a single turn conversation. The single turn type - - supports multi-modal (e.g. text, image, audio) + - supports multi-modal (e.g. text, image, audio, video) - supports client-side batching for each data (e.g. batch_size > 1) - DOES NOT support multi-turn features (e.g. 
session_id) """ @@ -39,6 +39,11 @@ class SingleTurn(AIPerfBaseModel): None, description="List of audio strings or Audio objects format", ) + video: str | None = Field(None, description="Simple video string content") + videos: list[str] | list[Video] | None = Field( + None, + description="List of video strings or Video objects format", + ) timestamp: int | None = Field( default=None, description="Timestamp of the turn in milliseconds." ) @@ -57,6 +62,8 @@ def validate_mutually_exclusive_fields(self) -> "SingleTurn": raise ValueError("image and images cannot be set together") if self.audio and self.audios: raise ValueError("audio and audios cannot be set together") + if self.video and self.videos: + raise ValueError("video and videos cannot be set together") if self.timestamp and self.delay: raise ValueError("timestamp and delay cannot be set together") return self @@ -65,7 +72,16 @@ def validate_mutually_exclusive_fields(self) -> "SingleTurn": def validate_at_least_one_modality(self) -> "SingleTurn": """Ensure at least one modality is provided""" if not any( - [self.text, self.texts, self.image, self.images, self.audio, self.audios] + [ + self.text, + self.texts, + self.image, + self.images, + self.audio, + self.audios, + self.video, + self.videos, + ] ): raise ValueError("At least one modality must be provided") return self @@ -75,7 +91,7 @@ class MultiTurn(AIPerfBaseModel): """Defines the schema for multi-turn conversations. The multi-turn custom dataset - - supports multi-modal data (e.g. text, image, audio) + - supports multi-modal data (e.g. text, image, audio, video) - supports multi-turn features (e.g. delay, sessions, etc.) - supports client-side batching for each data (e.g. batch size > 1) """ @@ -101,7 +117,7 @@ class RandomPool(AIPerfBaseModel): """Defines the schema for random pool data entry. The random pool custom dataset - - supports multi-modal data (e.g. text, image, audio) + - supports multi-modal data (e.g. text, image, audio, video) - supports client-side batching for each data (e.g. batch size > 1) - supports named fields for each modality (e.g. text_field_a, text_field_b, etc.) - DOES NOT support multi-turn or its features (e.g. delay, sessions, etc.) 
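The media helpers added to the loader mixin earlier in this diff (`_is_already_encoded`, `_is_url`, `_encode_media_file`) are checked in a deliberate order: the already-encoded check runs before the URL check because a data URL has a scheme but no netloc and would otherwise trip the half-formed-URL guard in `_is_url`. A runnable sketch of that dispatch for the image/video case (audio instead matches the "format,base64" prefix; `classify` is an illustrative name, not part of the mixin):

    from urllib.parse import urlparse

    def classify(content: str) -> str:
        """Mirror the dispatch order of _handle_media_content for images/videos."""
        url = urlparse(content)
        if url.scheme == "data":  # already-encoded data URL: pass through
            return "pass-through (data URL)"
        if url.scheme and url.netloc:  # remote URL: pass through
            return "pass-through (URL)"
        if url.scheme or url.netloc:  # half-formed URL: reject, as _is_url does
            raise ValueError(f"Valid URL must have both a scheme and netloc: {content}")
        return "encode local file"  # handed to _encode_media_file

    assert classify("data:image/png;base64,AAAA") == "pass-through (data URL)"
    assert classify("https://example.com/cat.png") == "pass-through (URL)"
    assert classify("/data/cat.png") == "encode local file"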
@@ -124,6 +140,11 @@ class RandomPool(AIPerfBaseModel): None, description="List of audio strings or Audio objects format", ) + video: str | None = Field(None, description="Simple video string content") + videos: list[str] | list[Video] | None = Field( + None, + description="List of video strings or Video objects format", + ) @model_validator(mode="after") def validate_mutually_exclusive_fields(self) -> "RandomPool": @@ -134,13 +155,24 @@ def validate_mutually_exclusive_fields(self) -> "RandomPool": raise ValueError("image and images cannot be set together") if self.audio and self.audios: raise ValueError("audio and audios cannot be set together") + if self.video and self.videos: + raise ValueError("video and videos cannot be set together") return self @model_validator(mode="after") def validate_at_least_one_modality(self) -> "RandomPool": """Ensure at least one modality is provided""" if not any( - [self.text, self.texts, self.image, self.images, self.audio, self.audios] + [ + self.text, + self.texts, + self.image, + self.images, + self.audio, + self.audios, + self.video, + self.videos, + ] ): raise ValueError("At least one modality must be provided") return self diff --git a/src/aiperf/dataset/loader/multi_turn.py b/src/aiperf/dataset/loader/multi_turn.py index 06802739d..f5cb11bbc 100644 --- a/src/aiperf/dataset/loader/multi_turn.py +++ b/src/aiperf/dataset/loader/multi_turn.py @@ -138,6 +138,7 @@ def convert_to_conversations( texts=media[MediaType.TEXT], images=media[MediaType.IMAGE], audios=media[MediaType.AUDIO], + videos=media[MediaType.VIDEO], timestamp=single_turn.timestamp, delay=single_turn.delay, role=single_turn.role, diff --git a/src/aiperf/dataset/loader/random_pool.py b/src/aiperf/dataset/loader/random_pool.py index 46188dc7f..e55cc689d 100644 --- a/src/aiperf/dataset/loader/random_pool.py +++ b/src/aiperf/dataset/loader/random_pool.py @@ -173,6 +173,7 @@ def convert_to_conversations( texts=media[MediaType.TEXT], images=media[MediaType.IMAGE], audios=media[MediaType.AUDIO], + videos=media[MediaType.VIDEO], ) ) sampled_dataset[filename] = turns diff --git a/src/aiperf/dataset/loader/single_turn.py b/src/aiperf/dataset/loader/single_turn.py index a5070e108..0513352a0 100644 --- a/src/aiperf/dataset/loader/single_turn.py +++ b/src/aiperf/dataset/loader/single_turn.py @@ -116,6 +116,7 @@ def convert_to_conversations( texts=media[MediaType.TEXT], images=media[MediaType.IMAGE], audios=media[MediaType.AUDIO], + videos=media[MediaType.VIDEO], timestamp=single_turn.timestamp, delay=single_turn.delay, role=single_turn.role, diff --git a/src/aiperf/dataset/utils.py b/src/aiperf/dataset/utils.py index 12983bf99..db24843b3 100644 --- a/src/aiperf/dataset/utils.py +++ b/src/aiperf/dataset/utils.py @@ -7,7 +7,7 @@ from PIL import Image -from aiperf.common.enums import ImageFormat +from aiperf.common.enums import AudioFormat, ImageFormat, VideoFormat def check_file_exists(filename: Path) -> None: @@ -76,3 +76,99 @@ def encode_image(img: Image, format: str) -> str: else: img.save(buffer, format=format) return base64.b64encode(buffer.getvalue()).decode("utf-8") + + +def open_audio(filename: str) -> tuple[bytes, str]: + """Opens an audio file and returns its bytes and format. + + Args: + filename: The file path to open. + + Returns: + A tuple of (audio_bytes, format) where format is 'wav' or 'mp3'. + + Raises: + FileNotFoundError: If the file does not exist. + RuntimeError: If the audio format is unsupported. 
+ """ + file_path = Path(filename) + check_file_exists(file_path) + + # Determine format from extension + suffix = file_path.suffix.lower() + if suffix == ".wav": + audio_format = AudioFormat.WAV + elif suffix == ".mp3": + audio_format = AudioFormat.MP3 + else: + raise RuntimeError( + f"'{suffix}' is not one of the supported audio formats: " + f"{', '.join([f.value for f in AudioFormat])}" + ) + + # Read file bytes + with open(filename, "rb") as f: + audio_bytes = f.read() + + return audio_bytes, audio_format.value + + +def encode_audio(audio_bytes: bytes, format: AudioFormat) -> str: + """Encodes audio bytes into base64 string with format prefix. + + Args: + audio_bytes: The audio data as bytes. + format: The audio format (e.g., AudioFormat.WAV, AudioFormat.MP3). + + Returns: + A string in the format "format,base64_encoded_data". + """ + base64_data = base64.b64encode(audio_bytes).decode("utf-8") + return f"{format.lower()},{base64_data}" + + +def open_video(filename: str) -> tuple[bytes, VideoFormat]: + """Opens a video file and returns its bytes and format. + + Args: + filename: The file path to open. + + Returns: + A tuple of (video_bytes, format) where format is VideoFormat.MP4. + + Raises: + FileNotFoundError: If the file does not exist. + RuntimeError: If the video format is unsupported. + """ + file_path = Path(filename) + check_file_exists(file_path) + + # Determine format from extension + suffix = file_path.suffix.lower() + if suffix == ".mp4": + video_format = VideoFormat.MP4 + else: + raise RuntimeError( + f"'{suffix}' is not one of the supported video formats: " + f"{', '.join([f for f in VideoFormat])}" + ) + + # Read file bytes + with open(filename, "rb") as f: + video_bytes = f.read() + + return video_bytes, video_format + + +def encode_video(video_bytes: bytes, format: VideoFormat) -> str: + """Encodes video bytes into base64 data URL. + + Args: + video_bytes: The video data as bytes. + format: The video format (e.g., VideoFormat.MP4). + + Returns: + A data URL string in the format "data:video/format;base64,encoded_data". + """ + base64_data = base64.b64encode(video_bytes).decode("utf-8") + return f"data:video/{format.lower()};base64,{base64_data}" diff --git a/src/aiperf/endpoints/__init__.py b/src/aiperf/endpoints/__init__.py index b81e39f88..12aa20178 100644 --- a/src/aiperf/endpoints/__init__.py +++ b/src/aiperf/endpoints/__init__.py @@ -16,6 +16,9 @@ from aiperf.endpoints.huggingface_generate import ( HuggingFaceGenerateEndpoint, ) +from aiperf.endpoints.nim_image_retrieval import ( + ImageRetrievalEndpoint, +) from aiperf.endpoints.nim_rankings import ( NIMRankingsEndpoint, ) @@ -44,6 +47,7 @@ "EmbeddingsEndpoint", "HFTeiRankingsEndpoint", "HuggingFaceGenerateEndpoint", + "ImageRetrievalEndpoint", "NIMRankingsEndpoint", "SolidoEndpoint", "TemplateEndpoint", diff --git a/src/aiperf/endpoints/nim_image_retrieval.py b/src/aiperf/endpoints/nim_image_retrieval.py new file mode 100644 index 000000000..33b384da5 --- /dev/null +++ b/src/aiperf/endpoints/nim_image_retrieval.py @@ -0,0 +1,88 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +from typing import Any + +from aiperf.common.enums import EndpointType +from aiperf.common.factories import EndpointFactory +from aiperf.common.models import ParsedResponse +from aiperf.common.models.metadata import EndpointMetadata +from aiperf.common.models.record_models import ( + ImageRetrievalResponseData, + RequestInfo, +) +from aiperf.common.protocols import InferenceServerResponse +from aiperf.endpoints.base_endpoint import BaseEndpoint + + +@EndpointFactory.register(EndpointType.IMAGE_RETRIEVAL) +class ImageRetrievalEndpoint(BaseEndpoint): + """NIM Image Retrieval endpoint.""" + + @classmethod + def metadata(cls) -> EndpointMetadata: + """Return Image Retrieval endpoint metadata.""" + return EndpointMetadata( + endpoint_path="/v1/infer", + supports_images=True, + metrics_title="Image Retrieval Metrics", + ) + + def format_payload(self, request_info: RequestInfo) -> dict[str, Any]: + """Format payload for an image retrieval request.""" + if len(request_info.turns) != 1: + raise ValueError("Image Retrieval endpoint only supports one turn.") + + turn = request_info.turns[0] + model_endpoint = request_info.model_endpoint + + if turn.max_tokens: + self.warning( + "max_tokens is provided but not supported for Image Retrieval." + ) + + if not turn.images: + raise ValueError("Image Retrieval request requires at least one image.") + + payload = { + "input": [ + {"type": "image_url", "url": content} + for img in turn.images + if img.contents + for content in img.contents + if content + ], + } + + if not payload["input"]: + raise ValueError( + "No valid image content found. All images have empty contents or " + "empty content values." + ) + + if model_endpoint.endpoint.extra: + payload.update(model_endpoint.endpoint.extra) + + self.debug(lambda: f"Formatted Image Retrieval payload: {payload}") + return payload + + def parse_response( + self, response: InferenceServerResponse + ) -> ParsedResponse | None: + """Parse NIM Image Retrieval response.""" + json_obj = response.get_json() + if not json_obj: + self.debug( + lambda: f"No JSON object found in response: {response.get_raw()}" + ) + return None + + data = json_obj.get("data", None) + if not data: + self.debug(lambda: f"No data found in response: {json_obj}") + return None + + return ParsedResponse( + perf_ns=response.perf_ns, data=ImageRetrievalResponseData(data=data) + ) diff --git a/src/aiperf/metrics/types/image_metrics.py b/src/aiperf/metrics/types/image_metrics.py new file mode 100644 index 000000000..ac1caec3b --- /dev/null +++ b/src/aiperf/metrics/types/image_metrics.py @@ -0,0 +1,84 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0 +from aiperf.common.enums import MetricFlags +from aiperf.common.enums.metric_enums import GenericMetricUnit, MetricOverTimeUnit +from aiperf.common.exceptions import NoMetricValue +from aiperf.common.models import ParsedResponseRecord +from aiperf.metrics.base_record_metric import BaseRecordMetric +from aiperf.metrics.metric_dicts import MetricRecordDict +from aiperf.metrics.types.request_latency_metric import RequestLatencyMetric + + +class NumImagesMetric(BaseRecordMetric[int]): + """Number of images metric.""" + + tag = "num_images" + header = "Number of Images" + short_header = "Num Images" + unit = GenericMetricUnit.IMAGES + flags = MetricFlags.SUPPORTS_IMAGE_ONLY | MetricFlags.NO_CONSOLE + + def _parse_record( + self, record: ParsedResponseRecord, record_metrics: MetricRecordDict + ) -> int: + """Parse the number of images from the record by summing the number of images in each turn.""" + num_images = sum( + len(image.contents) + for turn in record.request.turns + for image in turn.images + ) + if num_images == 0: + raise NoMetricValue( + "Record must have at least one image in at least one turn." + ) + return num_images + + +class ImageThroughputMetric(BaseRecordMetric[float]): + """Image throughput metric.""" + + tag = "image_throughput" + header = "Image Throughput" + short_header = "Image Throughput" + display_order = 860 + unit = MetricOverTimeUnit.IMAGES_PER_SECOND + flags = MetricFlags.SUPPORTS_IMAGE_ONLY + required_metrics = { + NumImagesMetric.tag, + RequestLatencyMetric.tag, + } + + def _parse_record( + self, record: ParsedResponseRecord, record_metrics: MetricRecordDict + ) -> float: + """Parse the image throughput from the record by dividing the number of images by the request latency.""" + num_images = record_metrics.get_or_raise(NumImagesMetric) + request_latency_sec = record_metrics.get_converted_or_raise( + RequestLatencyMetric, self.unit.time_unit + ) + return num_images / request_latency_sec + + +class ImageLatencyMetric(BaseRecordMetric[float]): + """Image latency metric.""" + + tag = "image_latency" + header = "Image Latency" + short_header = "Image Latency" + display_order = 861 + unit = MetricOverTimeUnit.MS_PER_IMAGE + flags = MetricFlags.SUPPORTS_IMAGE_ONLY + required_metrics = { + NumImagesMetric.tag, + RequestLatencyMetric.tag, + } + + def _parse_record( + self, record: ParsedResponseRecord, record_metrics: MetricRecordDict + ) -> float: + """Parse the image latency from the record by dividing the request latency by the number of images.""" + num_images = record_metrics.get_or_raise(NumImagesMetric) + request_latency_ms = record_metrics.get_converted_or_raise( + RequestLatencyMetric, self.unit.time_unit + ) + return request_latency_ms / num_images diff --git a/src/aiperf/metrics/types/video_metrics.py b/src/aiperf/metrics/types/video_metrics.py new file mode 100644 index 000000000..c157ae3c2 --- /dev/null +++ b/src/aiperf/metrics/types/video_metrics.py @@ -0,0 +1,83 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 +from aiperf.common.enums import MetricFlags +from aiperf.common.enums.metric_enums import GenericMetricUnit, MetricOverTimeUnit +from aiperf.common.exceptions import NoMetricValue +from aiperf.common.models import ParsedResponseRecord +from aiperf.metrics.base_record_metric import BaseRecordMetric +from aiperf.metrics.metric_dicts import MetricRecordDict +from aiperf.metrics.types.request_latency_metric import RequestLatencyMetric + + +class NumVideosMetric(BaseRecordMetric[int]): + """Number of videos metric.""" + + tag = "num_videos" + header = "Number of Videos" + short_header = "Num Videos" + unit = GenericMetricUnit.VIDEOS + flags = MetricFlags.SUPPORTS_VIDEO_ONLY | MetricFlags.NO_CONSOLE + + def _parse_record( + self, record: ParsedResponseRecord, record_metrics: MetricRecordDict + ) -> int: + """Parse the number of videos from the record by summing the number of videos in each turn.""" + num_videos = sum( + len(video.contents) + for turn in record.request.turns + for video in turn.videos + ) + if num_videos == 0: + raise NoMetricValue( + "Record must have at least one video in at least one turn." + ) + return num_videos + + +class VideoThroughputMetric(BaseRecordMetric[float]): + """Video throughput metric.""" + + tag = "video_throughput" + header = "Video Throughput" + display_order = 870 + unit = MetricOverTimeUnit.VIDEOS_PER_SECOND + flags = MetricFlags.SUPPORTS_VIDEO_ONLY + required_metrics = { + NumVideosMetric.tag, + RequestLatencyMetric.tag, + } + + def _parse_record( + self, record: ParsedResponseRecord, record_metrics: MetricRecordDict + ) -> float: + """Parse the video throughput from the record by dividing the number of videos by the request latency.""" + num_videos = record_metrics.get_or_raise(NumVideosMetric) + request_latency_sec = record_metrics.get_converted_or_raise( + RequestLatencyMetric, self.unit.time_unit + ) + return num_videos / request_latency_sec + + +class VideoLatencyMetric(BaseRecordMetric[float]): + """Video latency metric.""" + + tag = "video_latency" + header = "Video Latency" + short_header = "Video Latency" + display_order = 871 + unit = MetricOverTimeUnit.MS_PER_VIDEO + flags = MetricFlags.SUPPORTS_VIDEO_ONLY + required_metrics = { + NumVideosMetric.tag, + RequestLatencyMetric.tag, + } + + def _parse_record( + self, record: ParsedResponseRecord, record_metrics: MetricRecordDict + ) -> float: + """Parse the video latency from the record by dividing the request latency by the number of videos.""" + num_videos = record_metrics.get_or_raise(NumVideosMetric) + request_latency_ms = record_metrics.get_converted_or_raise( + RequestLatencyMetric, self.unit.time_unit + ) + return request_latency_ms / num_videos diff --git a/src/aiperf/records/inference_result_parser.py b/src/aiperf/records/inference_result_parser.py index 5216454c9..c8a7fb0e3 100644 --- a/src/aiperf/records/inference_result_parser.py +++ b/src/aiperf/records/inference_result_parser.py @@ -14,7 +14,6 @@ from aiperf.common.tokenizer import Tokenizer -# TODO: Should we create non-tokenizer based parsers? 
class InferenceResultParser(CommunicationMixin): """InferenceResultParser is responsible for parsing the inference results.""" @@ -47,8 +46,15 @@ async def _initialize(self) -> None: """Initialize inference result parser-specific components.""" self.debug("Initializing inference result parser") - async def configure(self) -> None: + async def _configure_tokenizers(self) -> None: """Configure the tokenizers.""" + endpoint_meta = self.endpoint.metadata() + if not endpoint_meta.tokenizes_input and not endpoint_meta.produces_tokens: + self.info( + "Endpoint does not tokenize input or produce tokens, skipping tokenizer configuration" + ) + return + self.info("Configuring tokenizers for inference result parser") begin = time.perf_counter() async with self.tokenizer_lock: @@ -93,14 +99,17 @@ async def parse_request_record( # Make sure any invalid request records are converted to error records for combined processing. request_record.create_error_from_invalid() - if request_record.has_error: - # Even for error records, compute input token count if possible - try: + try: + if self.endpoint.metadata().tokenizes_input: input_token_count = await self.compute_input_token_count(request_record) - except Exception as e: - self.warning(f"Error computing input token count for error record: {e}") + else: input_token_count = None + except Exception as e: + self.warning(f"Error computing input token count: {e!r}") + input_token_count = None + if request_record.has_error: + # Even for error records, we still store the input token count if possible. return ParsedResponseRecord( request=request_record, responses=[], @@ -109,7 +118,9 @@ async def parse_request_record( else: try: - record = await self.process_valid_record(request_record) + record = await self.process_valid_record( + request_record, input_token_count + ) self.debug( lambda: f"Received {len(record.request.responses)} responses, input_token_count: {record.input_token_count}, " f"output_token_count: {record.output_token_count}, reasoning_token_count: {record.reasoning_token_count}" @@ -120,13 +131,6 @@ async def parse_request_record( self.exception(f"Error processing valid record: {e}") request_record.error = ErrorDetails.from_exception(e) - try: - input_token_count = await self.compute_input_token_count( - request_record - ) - except Exception: - input_token_count = None - return ParsedResponseRecord( request=request_record, responses=[], @@ -134,7 +138,7 @@ async def parse_request_record( ) async def process_valid_record( - self, request_record: RequestRecord + self, request_record: RequestRecord, input_token_count: int | None ) -> ParsedResponseRecord: """Process a valid request record.""" if request_record.model_name is None: @@ -144,12 +148,11 @@ async def process_valid_record( return ParsedResponseRecord( request=request_record, responses=[], - input_token_count=None, + input_token_count=input_token_count, output_token_count=None, ) resp = self.endpoint.extract_response_data(request_record) - input_token_count = await self.compute_input_token_count(request_record) output_texts: list[str] = [] reasoning_texts: list[str] = [] @@ -164,13 +167,19 @@ async def process_valid_record( else: output_texts.append(response.data.get_text()) - tokenizer = await self.get_tokenizer(request_record.model_name) - output_token_count = ( - len(tokenizer.encode("".join(output_texts))) if output_texts else None - ) - reasoning_token_count = ( - len(tokenizer.encode("".join(reasoning_texts))) if reasoning_texts else None - ) + if self.endpoint.metadata().produces_tokens: 
+ tokenizer = await self.get_tokenizer(request_record.model_name) + output_token_count = ( + len(tokenizer.encode("".join(output_texts))) if output_texts else None + ) + reasoning_token_count = ( + len(tokenizer.encode("".join(reasoning_texts))) + if reasoning_texts + else None + ) + else: + output_token_count = None + reasoning_token_count = None return ParsedResponseRecord( request=request_record, diff --git a/src/aiperf/records/record_processor_service.py b/src/aiperf/records/record_processor_service.py index d4e0d92ee..2b4a94ee4 100644 --- a/src/aiperf/records/record_processor_service.py +++ b/src/aiperf/records/record_processor_service.py @@ -113,7 +113,7 @@ async def _profile_configure_command( self, message: ProfileConfigureCommand ) -> None: """Configure the tokenizers.""" - await self.inference_result_parser.configure() + await self.inference_result_parser._configure_tokenizers() async def get_tokenizer(self, model: str) -> Tokenizer: """Get the tokenizer for a given model.""" diff --git a/tests/endpoints/test_nim_image_retrieval_endpoint.py b/tests/endpoints/test_nim_image_retrieval_endpoint.py new file mode 100644 index 000000000..dd22f15d4 --- /dev/null +++ b/tests/endpoints/test_nim_image_retrieval_endpoint.py @@ -0,0 +1,52 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import pytest + +from aiperf.common.enums import EndpointType +from aiperf.common.models import Image, Turn +from aiperf.common.models.record_models import RequestInfo +from aiperf.endpoints.nim_image_retrieval import ImageRetrievalEndpoint +from tests.endpoints.conftest import ( + create_endpoint_with_mock_transport, + create_model_endpoint, +) + +BASE64_PNG = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg==" + + +class TestImageRetrievalEndpoint: + @pytest.fixture + def model_endpoint(self): + return create_model_endpoint( + EndpointType.IMAGE_RETRIEVAL, model_name="image-retrieval-model" + ) + + @pytest.fixture + def endpoint(self, model_endpoint): + return create_endpoint_with_mock_transport( + ImageRetrievalEndpoint, model_endpoint + ) + + def test_format_payload_basic(self, endpoint, model_endpoint): + """Test basic format_payload with valid image.""" + turn = Turn( + images=[Image(contents=[BASE64_PNG])], model="image-retrieval-model" + ) + request_info = RequestInfo(model_endpoint=model_endpoint, turns=[turn]) + + payload = endpoint.format_payload(request_info) + + assert len(payload["input"]) == 1 + assert payload["input"][0]["type"] == "image_url" + assert payload["input"][0]["url"] == BASE64_PNG + + def test_format_payload_validation_error(self, endpoint, model_endpoint): + """Test that empty images raises ValueError.""" + turn = Turn(images=[], model="image-retrieval-model") + request_info = RequestInfo(model_endpoint=model_endpoint, turns=[turn]) + + with pytest.raises( + ValueError, match="Image Retrieval request requires at least one image" + ): + endpoint.format_payload(request_info) diff --git a/tests/endpoints/test_nim_image_retrieval_endpoint_parse_response.py b/tests/endpoints/test_nim_image_retrieval_endpoint_parse_response.py new file mode 100644 index 000000000..1ee38bdc0 --- /dev/null +++ b/tests/endpoints/test_nim_image_retrieval_endpoint_parse_response.py @@ -0,0 +1,78 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from aiperf.common.enums import EndpointType, ModelSelectionStrategy +from aiperf.common.models.model_endpoint_info import ( + EndpointInfo, + ModelEndpointInfo, + ModelInfo, + ModelListInfo, +) +from aiperf.common.models.record_models import ImageRetrievalResponseData +from aiperf.endpoints.nim_image_retrieval import ImageRetrievalEndpoint + + +class TestImageRetrievalEndpointParseResponse: + @pytest.fixture + def endpoint(self): + model_endpoint = ModelEndpointInfo( + models=ModelListInfo( + models=[ModelInfo(name="image-retrieval-model")], + model_selection_strategy=ModelSelectionStrategy.ROUND_ROBIN, + ), + endpoint=EndpointInfo( + type=EndpointType.IMAGE_RETRIEVAL, + base_url="http://localhost:8000", + ), + ) + with patch( + "aiperf.common.factories.TransportFactory.create_instance" + ) as mock_transport: + mock_transport.return_value = MagicMock() + return ImageRetrievalEndpoint(model_endpoint=model_endpoint) + + def test_parse_response_basic(self, endpoint): + """Test basic parse_response with valid bounding box data.""" + mock_response = Mock() + mock_response.perf_ns = 123456789 + mock_response.get_json.return_value = { + "data": [ + { + "index": 0, + "bounding_boxes": { + "chart": [ + { + "x_min": 10, + "y_min": 20, + "x_max": 100, + "y_max": 120, + "confidence": 0.95, + } + ] + }, + } + ], + "usage": {"images_size_mb": 0.5}, + } + + parsed = endpoint.parse_response(mock_response) + + assert parsed is not None + assert parsed.perf_ns == 123456789 + assert isinstance(parsed.data, ImageRetrievalResponseData) + assert len(parsed.data.data) == 1 + assert "chart" in parsed.data.data[0]["bounding_boxes"] + + def test_parse_response_invalid(self, endpoint): + """Test parse_response returns None for invalid/empty data.""" + mock_response = Mock() + mock_response.perf_ns = 123456789 + mock_response.get_json.return_value = None + + parsed = endpoint.parse_response(mock_response) + + assert parsed is None diff --git a/tests/loaders/conftest.py b/tests/loaders/conftest.py index 6faf06025..45469fb14 100644 --- a/tests/loaders/conftest.py +++ b/tests/loaders/conftest.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +import shutil import tempfile from pathlib import Path @@ -33,3 +34,170 @@ def _create_file(content_lines): def default_user_config(): """Create a default UserConfig for testing.""" return UserConfig(endpoint=EndpointConfig(model_names=["test-model"])) + + +@pytest.fixture +def test_images(tmp_path): + """Create temporary test images copied from source assets. + + Returns: + A dictionary mapping image names to their temporary file paths. 
+ """ + # Get the source images directory + source_images_dir = Path("src/aiperf/dataset/generator/assets/source_images") + + # Get some actual image files + source_images = list(source_images_dir.glob("*.jpg"))[:4] + + if not source_images: + # Create a minimal synthetic JPEG image if no source images found + from PIL import Image + + synthetic_path = tmp_path / "image1.jpg" + img = Image.new("RGB", (1, 1), color="red") + img.save(synthetic_path, format="JPEG") + return {"image1.jpg": str(synthetic_path)} + + # Create temporary copies preserving original file extensions + image_map = {} + for i, source_img in enumerate(source_images, 1): + # Preserve the original file extension to avoid MIME/encoder mismatches + dest_filename = f"image{i}{source_img.suffix}" + dest_path = tmp_path / dest_filename + shutil.copy(source_img, dest_path) + image_map[dest_filename] = str(dest_path) + + return image_map + + +@pytest.fixture +def create_test_image(tmp_path): + """Create a single test image copied from source assets. + + Returns: + A function that creates a test image with the given name. + """ + source_images_dir = Path("src/aiperf/dataset/generator/assets/source_images") + source_images = list(source_images_dir.glob("*.jpg")) + + def _create_image(name: str = "test_image.jpg"): + from PIL import Image + + dest_path = tmp_path / name + requested_ext = Path(name).suffix.lower() + + if source_images: + # Load the source image and save it in the requested format + img = Image.open(source_images[0]) + if requested_ext in [".jpg", ".jpeg"]: + img.save(dest_path, format="JPEG") + elif requested_ext == ".png": + img.save(dest_path, format="PNG") + else: + # Default to JPEG + img.save(dest_path, format="JPEG") + else: + # Create a minimal synthetic image matching the requested format + img = Image.new("RGB", (1, 1), color="red") + if requested_ext in [".jpg", ".jpeg"]: + img.save(dest_path, format="JPEG") + elif requested_ext == ".png": + img.save(dest_path, format="PNG") + else: + # Default to JPEG + img.save(dest_path, format="JPEG") + + return str(dest_path) + + return _create_image + + +@pytest.fixture +def create_test_audio(tmp_path): + """Create test audio files (WAV and MP3). + + Returns: + A function that creates a test audio file with the given name. + """ + import wave + + import numpy as np + + def _create_audio(name: str = "test_audio.wav"): + dest_path = tmp_path / name + + # Generate simple sine wave audio + sample_rate = 16000 + duration = 0.1 # 100ms + frequency = 440 # A4 note + + t = np.linspace(0, duration, int(sample_rate * duration)) + audio_data = np.sin(2 * np.pi * frequency * t) + + # Convert to 16-bit PCM + audio_data = (audio_data * 32767).astype(np.int16) + + # Write WAV file + with wave.open(str(dest_path), "wb") as wav_file: + wav_file.setnchannels(1) # mono + wav_file.setsampwidth(2) # 16-bit + wav_file.setframerate(sample_rate) + wav_file.writeframes(audio_data.tobytes()) + + return str(dest_path) + + return _create_audio + + +@pytest.fixture +def create_test_video(tmp_path): + """Create test video files (MP4). + + Returns: + A function that creates a test video file with the given name. 
+ """ + from PIL import Image, ImageDraw + + def _create_video(name: str = "test_video.mp4"): + dest_path = tmp_path / name + + # Try using ffmpeg-python if available, otherwise create a minimal MP4 + try: + import tempfile + + import ffmpeg + + # Create a few simple frames + temp_frame_dir = tempfile.mkdtemp(prefix="video_frames_") + for i in range(3): + img = Image.new("RGB", (64, 64), (i * 80, 0, 0)) + draw = ImageDraw.Draw(img) + draw.text((10, 25), f"F{i}", fill=(255, 255, 255)) + img.save(f"{temp_frame_dir}/frame_{i:03d}.png") + + # Use ffmpeg to create video + ( + ffmpeg.input(f"{temp_frame_dir}/frame_%03d.png", framerate=1) + .output(str(dest_path), vcodec="libx264", pix_fmt="yuv420p", t=1) + .overwrite_output() + .run(quiet=True) + ) + + for file in Path(temp_frame_dir).glob("*.png"): + file.unlink() + Path(temp_frame_dir).rmdir() + + except (ImportError, Exception): + # Fallback: create a minimal valid MP4 file + # This is a minimal MP4 with just headers (won't play but is valid for testing) + minimal_mp4 = bytes.fromhex( + "000000186674797069736f6d0000020069736f6d69736f32617663310000" + "0008667265650000002c6d6461740000001c6d6f6f7600000000006d7668" + "6400000000000000000000000000000001000000" + ) + with open(dest_path, "wb") as f: + f.write(minimal_mp4) + + return str(dest_path) + + return _create_video diff --git a/tests/loaders/test_multi_turn.py b/tests/loaders/test_multi_turn.py index 3119b2cb1..d6fd1c8e7 100644 --- a/tests/loaders/test_multi_turn.py +++ b/tests/loaders/test_multi_turn.py @@ -485,15 +485,17 @@ def test_convert_multiple_multi_turn_entries_same_session( assert conversation.turns[0].texts[0].contents == ["First"] assert conversation.turns[1].texts[0].contents == ["Second"] - def test_convert_multimodal_multi_turn_data(self, default_user_config): + def test_convert_multimodal_multi_turn_data(self, test_images, default_user_config): """Test converting multimodal multi-turn data.""" data = { "session_1": [ MultiTurn( session_id="session_1", turns=[ - SingleTurn(text="What's this?", image="image1.png"), - SingleTurn(text="Follow up", image="image2.png"), + SingleTurn( + text="What's this?", image=test_images["image1.jpg"] + ), + SingleTurn(text="Follow up", image=test_images["image2.jpg"]), ], ) ] @@ -508,15 +510,19 @@ def test_convert_multimodal_multi_turn_data(self, default_user_config): conversation = conversations[0] assert len(conversation.turns) == 2 - # First turn + # First turn - image should be base64 encoded first_turn = conversation.turns[0] assert first_turn.texts[0].contents == ["What's this?"] - assert first_turn.images[0].contents == ["image1.png"] + assert len(first_turn.images[0].contents) == 1 + assert first_turn.images[0].contents[0].startswith("data:image/") + assert ";base64," in first_turn.images[0].contents[0] - # Second turn + # Second turn - image should be base64 encoded second_turn = conversation.turns[1] assert second_turn.texts[0].contents == ["Follow up"] - assert second_turn.images[0].contents == ["image2.png"] + assert len(second_turn.images[0].contents) == 1 + assert second_turn.images[0].contents[0].startswith("data:image/") + assert ";base64," in second_turn.images[0].contents[0] def test_convert_structured_objects_in_turns(self, default_user_config): """Test converting MultiTurn with structured Text objects.""" diff --git a/tests/loaders/test_random_pool.py b/tests/loaders/test_random_pool.py index 52d85d495..f48a50571 100644 --- a/tests/loaders/test_random_pool.py +++ b/tests/loaders/test_random_pool.py @@ -232,14 +232,16 @@ def 
test_convert_simple_pool_data(self, default_user_config): assert len(conversations[0].turns) == 1 assert conversations[0].turns[0].texts[0].contents == ["Hello world"] - def test_convert_multimodal_pool_data(self, default_user_config): + def test_convert_multimodal_pool_data(self, create_test_image, default_user_config): """Test converting multimodal random pool data.""" + test_image = create_test_image("test_image.jpg") + data = { "multimodal.jsonl": [ RandomPool( text="What's in this image?", - image="/path/to/image.png", - audio="/path/to/audio.wav", + image=test_image, + audio="https://example.com/audio.wav", ) ] } @@ -254,17 +256,19 @@ def test_convert_multimodal_pool_data(self, default_user_config): assert len(turn.texts) == 1 assert turn.texts[0].contents == ["What's in this image?"] assert len(turn.images) == 1 - assert turn.images[0].contents == ["/path/to/image.png"] + # Image should be base64 encoded + assert turn.images[0].contents[0].startswith("data:image/") + assert ";base64," in turn.images[0].contents[0] assert len(turn.audios) == 1 - assert turn.audios[0].contents == ["/path/to/audio.wav"] + assert turn.audios[0].contents == ["https://example.com/audio.wav"] - def test_convert_batched_pool_data(self, default_user_config): + def test_convert_batched_pool_data(self, test_images, default_user_config): """Test converting pool data with batched content.""" data = { "batched.jsonl": [ RandomPool( texts=["First question", "Second question"], - images=["/image1.png", "/image2.png"], + images=[test_images["image1.jpg"], test_images["image2.jpg"]], ) ] } @@ -279,7 +283,11 @@ def test_convert_batched_pool_data(self, default_user_config): assert len(turn.texts) == 1 assert turn.texts[0].contents == ["First question", "Second question"] assert len(turn.images) == 1 - assert turn.images[0].contents == ["/image1.png", "/image2.png"] + # Both images should be base64 encoded + assert len(turn.images[0].contents) == 2 + for img_content in turn.images[0].contents: + assert img_content.startswith("data:image/") + assert ";base64," in img_content def test_convert_multiple_files_no_name_specified(self, default_user_config): """Test converting data from multiple files without name specified.""" @@ -334,16 +342,18 @@ def test_convert_multiple_files_with_name_specified(self, default_user_config): assert turn.texts[1].name == "def456" # uses name from Text object assert turn.texts[1].contents == ["AI is artificial intelligence"] - def test_convert_multiple_files_with_multiple_samples(self, default_user_config): + def test_convert_multiple_files_with_multiple_samples( + self, test_images, default_user_config + ): """Test converting data from multiple files with multiple samples.""" data = { "queries.jsonl": [ - RandomPool(text="text1", image="image1.png"), - RandomPool(text="text2", image="image2.png"), + RandomPool(text="text1", image=test_images["image1.jpg"]), + RandomPool(text="text2", image=test_images["image2.jpg"]), ], "contexts.jsonl": [ - RandomPool(text="text3", image="image3.png"), - RandomPool(text="text4", image="image4.png"), + RandomPool(text="text3", image=test_images["image3.jpg"]), + RandomPool(text="text4", image=test_images["image4.jpg"]), ], } @@ -373,19 +383,17 @@ def test_convert_multiple_files_with_multiple_samples(self, default_user_config) ("text2", "text3"), ("text2", "text4"), } - possible_image_contents = { - ("image1.png", "image3.png"), - ("image1.png", "image4.png"), - ("image2.png", "image3.png"), - ("image2.png", "image4.png"), - } text_contents = 
tuple(t.contents[0] for t in turn1.texts)
-        image_contents = tuple(i.contents[0] for i in turn1.images)
         assert text_contents in possible_text_contents
-        assert image_contents in possible_image_contents
+        # Images should be base64 encoded
+        for img in turn1.images:
+            assert img.contents[0].startswith("data:image/")
+            assert ";base64," in img.contents[0]
 
         text_contents = tuple(t.contents[0] for t in turn2.texts)
-        image_contents = tuple(i.contents[0] for i in turn2.images)
         assert text_contents in possible_text_contents
-        assert image_contents in possible_image_contents
+        # Images should be base64 encoded
+        for img in turn2.images:
+            assert img.contents[0].startswith("data:image/")
+            assert ";base64," in img.contents[0]
diff --git a/tests/loaders/test_single_turn.py b/tests/loaders/test_single_turn.py
index 1329ddd83..996d128e7 100644
--- a/tests/loaders/test_single_turn.py
+++ b/tests/loaders/test_single_turn.py
@@ -1,7 +1,9 @@
 # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0
 
+import base64
 import json
+from pathlib import Path
 
 import pytest
@@ -329,8 +331,8 @@ def test_convert_multimodal_data(self, default_user_config):
             "session_1": [
                 SingleTurn(
                     text="What's in this image?",
-                    image="/path/to/image.png",
-                    audio="/path/to/audio.wav",
+                    image="https://example.com/image.png",
+                    audio="https://example.com/audio.wav",
                 )
             ]
         }
@@ -342,9 +344,9 @@
         assert len(turn.texts) == 1
         assert turn.texts[0].contents == ["What's in this image?"]
         assert len(turn.images) == 1
-        assert turn.images[0].contents == ["/path/to/image.png"]
+        assert turn.images[0].contents == ["https://example.com/image.png"]
         assert len(turn.audios) == 1
-        assert turn.audios[0].contents == ["/path/to/audio.wav"]
+        assert turn.audios[0].contents == ["https://example.com/audio.wav"]
 
     def test_convert_batched_data(self, default_user_config):
         """Test converting batched data to conversations."""
@@ -355,7 +357,7 @@
             "session_1": [
                 SingleTurn(
                     texts=["First message", "Second message"],
-                    images=["/path/1.png", "/path/2.png"],
+                    images=["https://example.com/1.png", "https://example.com/2.png"],
                 )
             ]
         }
@@ -367,7 +369,10 @@
         assert len(turn.texts) == 1
         assert turn.texts[0].contents == ["First message", "Second message"]
         assert len(turn.images) == 1
-        assert turn.images[0].contents == ["/path/1.png", "/path/2.png"]
+        assert turn.images[0].contents == [
+            "https://example.com/1.png",
+            "https://example.com/2.png",
+        ]
 
     def test_convert_with_timing_data(self, default_user_config):
         """Test converting data with timestamp and delay."""
@@ -416,3 +421,315 @@ def test_convert_structured_text_objects(self, default_user_config):
         assert turn.texts[0].contents == ["What is AI?"]
         assert turn.texts[1].name == "context"
         assert turn.texts[1].contents == ["AI stands for artificial intelligence"]
+
+
+class TestSingleTurnDatasetLoaderImageEncoding:
+    """Test base64 encoding for local image files."""
+
+    def test_convert_local_image_to_base64(self, create_jsonl_file):
+        """Test that local image files are encoded to base64 data URLs."""
+        # Use an actual test image from the source_images directory
+        test_image = Path(
+            "src/aiperf/dataset/generator/assets/source_images/0bfd8fdf-457f-43c8-9253-a2346d37d26a_1024.jpg"
+        )
+
+        # Skip if the image doesn't exist
+        if not test_image.exists():
+            pytest.skip("Test image not found")
+
+        content = [
+            json.dumps({"text": "What is in this image?", "image": str(test_image)})
+        ]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+        conversations = loader.convert_to_conversations(data)
+
+        assert len(conversations) == 1
+        turn = conversations[0].turns[0]
+
+        # Check that the image was encoded
+        assert len(turn.images) == 1
+        image_content = turn.images[0].contents[0]
+
+        # Verify it's a data URL with base64 encoding
+        assert image_content.startswith("data:image/")
+        assert ";base64," in image_content
+
+        # Extract and verify the base64 content is valid
+        base64_part = image_content.split(";base64,")[1]
+        try:
+            base64.b64decode(base64_part)
+        except Exception as e:
+            pytest.fail(f"Invalid base64 encoding: {e}")
+
+    def test_url_images_not_encoded(self, create_jsonl_file):
+        """Test that URLs are not encoded and passed through as-is."""
+        content = [
+            json.dumps(
+                {"text": "What is this?", "image": "https://example.com/image.png"}
+            )
+        ]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+        conversations = loader.convert_to_conversations(data)
+
+        assert len(conversations) == 1
+        turn = conversations[0].turns[0]
+
+        # URL should remain unchanged
+        assert turn.images[0].contents[0] == "https://example.com/image.png"
+
+    def test_data_url_not_reencoded(self, create_jsonl_file):
+        """Test that existing data URLs are not re-encoded."""
+        data_url = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="
+        content = [json.dumps({"text": "Already encoded", "image": data_url})]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+        conversations = loader.convert_to_conversations(data)
+
+        assert len(conversations) == 1
+        turn = conversations[0].turns[0]
+
+        # Data URL should remain unchanged
+        assert turn.images[0].contents[0] == data_url
+
+    def test_multiple_images_encoded(self, create_jsonl_file):
+        """Test that multiple local images are all encoded."""
+        test_images = [
+            Path(
+                "src/aiperf/dataset/generator/assets/source_images/0bfd8fdf-457f-43c8-9253-a2346d37d26a_1024.jpg"
+            ),
+            Path(
+                "src/aiperf/dataset/generator/assets/source_images/119544eb-9bbf-47d1-8d93-a51de6370295_861.jpg"
+            ),
+        ]
+
+        # Skip if images don't exist
+        for img in test_images:
+            if not img.exists():
+                pytest.skip("Test images not found")
+
+        content = [
+            json.dumps(
+                {
+                    "text": "What are in these images?",
+                    "images": [str(img) for img in test_images],
+                }
+            )
+        ]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+        conversations = loader.convert_to_conversations(data)
+
+        assert len(conversations) == 1
+        turn = conversations[0].turns[0]
+
+        # Check that both images were encoded
+        assert len(turn.images) == 1
+        assert len(turn.images[0].contents) == 2
+
+        for image_content in turn.images[0].contents:
+            assert image_content.startswith("data:image/")
+            assert ";base64," in image_content
+
+    def test_mixed_image_sources(self, create_jsonl_file):
+        """Test handling mixed image sources (local files, URLs, data URLs)."""
+        test_image = Path(
+            "src/aiperf/dataset/generator/assets/source_images/0bfd8fdf-457f-43c8-9253-a2346d37d26a_1024.jpg"
+        )
+
+        if not test_image.exists():
+            pytest.skip("Test image not found")
+
+        url = "https://example.com/image.png"
+        data_url = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="
+
+        content = [
+            json.dumps(
+                {"text": "Mixed sources", "images": [str(test_image), url, data_url]}
+            )
+        ]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+        conversations = loader.convert_to_conversations(data)
+
+        assert len(conversations) == 1
+        turn = conversations[0].turns[0]
+
+        # Check that we have 3 images
+        assert len(turn.images) == 1
+        assert len(turn.images[0].contents) == 3
+
+        # First one (local file) should be encoded
+        assert turn.images[0].contents[0].startswith("data:image/")
+        assert ";base64," in turn.images[0].contents[0]
+
+        # Second one (URL) should be unchanged
+        assert turn.images[0].contents[1] == url
+
+        # Third one (data URL) should be unchanged
+        assert turn.images[0].contents[2] == data_url
+
+    def test_invalid_image_path_raises_error(self, create_jsonl_file):
+        """Test that invalid local file paths raise appropriate errors."""
+        content = [
+            json.dumps(
+                {"text": "Invalid image", "image": "/nonexistent/path/image.png"}
+            )
+        ]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+
+        # The error should be raised during conversion
+        with pytest.raises(FileNotFoundError):
+            loader.convert_to_conversations(data)
+
+
+class TestSingleTurnDatasetLoaderAudioEncoding:
+    """Test base64 encoding for local audio files."""
+
+    def test_convert_local_audio_to_base64(self, create_jsonl_file, create_test_audio):
+        """Test that local audio files are encoded to format,base64 strings."""
+        test_audio = create_test_audio("test_audio.wav")
+
+        content = [json.dumps({"text": "Transcribe this audio", "audio": test_audio})]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+        conversations = loader.convert_to_conversations(data)
+
+        assert len(conversations) == 1
+        turn = conversations[0].turns[0]
+
+        # Check that the audio was encoded
+        assert len(turn.audios) == 1
+        audio_content = turn.audios[0].contents[0]
+
+        # Verify it's in "format,base64" format
+        assert "," in audio_content
+        format_part, base64_part = audio_content.split(",", 1)
+        assert format_part == "wav"
+
+        # Verify the base64 content is valid
+        try:
+            base64.b64decode(base64_part)
+        except Exception as e:
+            pytest.fail(f"Invalid base64 encoding: {e}")
+
+    def test_audio_url_not_encoded(self, create_jsonl_file):
+        """Test that audio URLs are not encoded and passed through as-is."""
+        content = [
+            json.dumps(
+                {"text": "Audio from URL", "audio": "https://example.com/audio.wav"}
+            )
+        ]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+        conversations = loader.convert_to_conversations(data)
+
+        assert len(conversations) == 1
+        turn = conversations[0].turns[0]
+
+        # URL should remain unchanged
+        assert turn.audios[0].contents[0] == "https://example.com/audio.wav"
+
+    def test_audio_already_encoded_not_reencoded(self, create_jsonl_file):
+        """Test that existing format,base64 audio strings are not re-encoded."""
+        encoded_audio = "wav,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="
+        content = [json.dumps({"text": "Already encoded", "audio": encoded_audio})]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+        conversations = loader.convert_to_conversations(data)
+
+        assert len(conversations) == 1
+        turn = conversations[0].turns[0]
+
+        # Should remain unchanged
+        assert turn.audios[0].contents[0] == encoded_audio
+
+
+class TestSingleTurnDatasetLoaderVideoEncoding:
+    """Test base64 encoding for local video files."""
+
+    def test_convert_local_video_to_base64(self, create_jsonl_file, create_test_video):
+        """Test that local video files are encoded to data URL format."""
+        test_video = create_test_video("test_video.mp4")
+
+        content = [json.dumps({"text": "Describe this video", "video": test_video})]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+        conversations = loader.convert_to_conversations(data)
+
+        assert len(conversations) == 1
+        turn = conversations[0].turns[0]
+
+        # Check that the video was encoded
+        assert len(turn.videos) == 1
+        video_content = turn.videos[0].contents[0]
+
+        # Verify it's a data URL with base64 encoding
+        assert video_content.startswith("data:video/")
+        assert ";base64," in video_content
+
+        # Extract and verify the base64 content is valid
+        base64_part = video_content.split(";base64,")[1]
+        try:
+            base64.b64decode(base64_part)
+        except Exception as e:
+            pytest.fail(f"Invalid base64 encoding: {e}")
+
+    def test_video_url_not_encoded(self, create_jsonl_file):
+        """Test that video URLs are not encoded and passed through as-is."""
+        content = [
+            json.dumps(
+                {"text": "Video from URL", "video": "https://example.com/video.mp4"}
+            )
+        ]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+        conversations = loader.convert_to_conversations(data)
+
+        assert len(conversations) == 1
+        turn = conversations[0].turns[0]
+
+        # URL should remain unchanged
+        assert turn.videos[0].contents[0] == "https://example.com/video.mp4"
+
+    def test_video_data_url_not_reencoded(self, create_jsonl_file):
+        """Test that existing data URL videos are not re-encoded."""
+        data_url = (
+            "data:video/mp4;base64,AAAAIGZ0eXBpc29tAAACAGlzb21pc28yYXZjMQAAAAAABW1kYXQ="
+        )
+        content = [json.dumps({"text": "Already encoded", "video": data_url})]
+        filename = create_jsonl_file(content)
+
+        loader = SingleTurnDatasetLoader(filename)
+        data = loader.load_dataset()
+        conversations = loader.convert_to_conversations(data)
+
+        assert len(conversations) == 1
+        turn = conversations[0].turns[0]
+
+        # Data URL should remain unchanged
+        assert turn.videos[0].contents[0] == data_url
diff --git a/tests/parsers/test_usage_passthrough.py b/tests/parsers/test_usage_passthrough.py
index 9e1e79c28..39f46cf3a 100644
--- a/tests/parsers/test_usage_passthrough.py
+++ b/tests/parsers/test_usage_passthrough.py
@@ -98,7 +98,9 @@ async def test_passthrough_single_response(self, parser, mock_tokenizer):
         ]
         parser.tokenizers = {"test-model": mock_tokenizer}
 
-        result = await parser.process_valid_record(create_test_record())
+        result = await parser.process_valid_record(
+            create_test_record(), input_token_count=10
+        )
 
         assert len(result.responses) == 1
         assert result.responses[0].usage.prompt_tokens == 10
@@ -126,7 +128,9 @@ async def test_passthrough_streaming_cumulative(self, parser, mock_tokenizer):
         ]
         parser.tokenizers = {"test-model": mock_tokenizer}
 
-        result = await parser.process_valid_record(create_test_record())
+        result = await parser.process_valid_record(
+            create_test_record(), input_token_count=10
+        )
 
         # Each response has cumulative values - parser doesn't aggregate
         assert len(result.responses) == 3
@@ -150,7 +154,9 @@ async def test_passthrough_nested_reasoning(self, parser, mock_tokenizer):
         ]
         parser.tokenizers = {"test-model": mock_tokenizer}
 
-        result = await parser.process_valid_record(create_test_record())
+        result = await parser.process_valid_record(
+            create_test_record(), input_token_count=10
+        )
 
         assert result.responses[0].usage.reasoning_tokens == 50
@@ -206,7 +212,9 @@ async def test_passthrough_various_scenarios(
         parser.endpoint.extract_response_data.return_value = responses
         parser.tokenizers = {"test-model": mock_tokenizer}
 
-        result = await parser.process_valid_record(create_test_record())
+        result = await parser.process_valid_record(
+            create_test_record(), input_token_count=10
+        )
 
         assert len(result.responses) == len(expected_usage)
         for i, expected in enumerate(expected_usage):
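The single-turn loader tests above all pin down one dispatch rule for media sources: http(s) URLs, data URLs, and already-encoded "format,base64" audio pairs pass through untouched; local paths are read and base64-encoded (data URLs for images and videos, a bare "format,base64" pair for audio); and nonexistent local paths raise FileNotFoundError. A minimal sketch of that rule, for orientation only — the encode_media helper, its regex check for pre-encoded audio, and the mimetypes-based type detection are assumptions for illustration, not the loader's actual implementation:

import base64
import mimetypes
import re
from pathlib import Path


def encode_media(source: str) -> str:
    """Sketch: pass URLs and pre-encoded payloads through; base64-encode local files."""
    if source.startswith(("http://", "https://", "data:")):
        # URLs and existing data URLs are forwarded as-is
        return source
    if re.fullmatch(r"[A-Za-z0-9]+,[A-Za-z0-9+/=]+", source):
        # Already-encoded "format,base64" audio pairs also pass through
        return source
    path = Path(source)
    if not path.exists():
        # Mirrors test_invalid_image_path_raises_error
        raise FileNotFoundError(f"Media file not found: {source}")
    mime, _ = mimetypes.guess_type(path.name)
    payload = base64.b64encode(path.read_bytes()).decode("ascii")
    if mime and mime.startswith("audio/"):
        # Audio is asserted as a bare "format,base64" pair, e.g. "wav,UklGR..."
        return f"{path.suffix.lstrip('.')},{payload}"
    # Images and videos are asserted as data URLs, e.g. "data:image/jpeg;base64,..."
    return f"data:{mime or 'application/octet-stream'};base64,{payload}"

Whatever the real helpers look like, the tests only constrain these output shapes, which is why URLs and pre-encoded strings must short-circuit before any filesystem access.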
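Consumers can invert either shape mechanically, which is all the base64.b64decode assertions in the tests rely on; a small usage sketch (decode_media is a hypothetical name, not part of the codebase):

import base64


def decode_media(content: str) -> bytes:
    """Sketch: recover raw bytes from either encoded shape."""
    if content.startswith("data:"):
        # Data URL: "data:<mime>;base64,<payload>"
        return base64.b64decode(content.split(";base64,", 1)[1])
    # Bare audio pair: "<format>,<payload>"
    _format, payload = content.split(",", 1)
    return base64.b64decode(payload)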