diff --git a/aiperf/common/config/config_defaults.py b/aiperf/common/config/config_defaults.py index c6ac7216b..740dba932 100644 --- a/aiperf/common/config/config_defaults.py +++ b/aiperf/common/config/config_defaults.py @@ -11,6 +11,7 @@ AudioFormat, CommunicationBackend, CustomDatasetType, + DatasetType, EndpointType, ImageFormat, ModelSelectionStrategy, @@ -48,6 +49,7 @@ class InputDefaults: FIXED_SCHEDULE_AUTO_OFFSET = False FIXED_SCHEDULE_START_OFFSET = None FIXED_SCHEDULE_END_OFFSET = None + DATASET_TYPE = DatasetType.SYNTHETIC CUSTOM_DATASET_TYPE = CustomDatasetType.MOONCAKE_TRACE RANDOM_SEED = None NUM_DATASET_ENTRIES = 100 @@ -142,6 +144,7 @@ class ServiceDefaults: EXTRA_VERBOSE = False LOG_PATH = None RECORD_PROCESSOR_SERVICE_COUNT = None + DATASET_PROCESSOR_SERVICE_COUNT = 1 PROGRESS_REPORT_INTERVAL = 1.0 UI_TYPE = AIPerfUIType.DASHBOARD diff --git a/aiperf/common/config/input_config.py b/aiperf/common/config/input_config.py index 2c19acf10..f31569617 100644 --- a/aiperf/common/config/input_config.py +++ b/aiperf/common/config/input_config.py @@ -19,7 +19,7 @@ from aiperf.common.config.groups import Groups from aiperf.common.config.image_config import ImageConfig from aiperf.common.config.prompt_config import PromptConfig -from aiperf.common.enums import CustomDatasetType +from aiperf.common.enums import CustomDatasetType, DatasetType logger = AIPerfLogger(__name__) @@ -66,6 +66,24 @@ def validate_fixed_schedule_start_and_end_offset(self) -> Self: ) return self + @model_validator(mode="after") + def validate_dataset_type(self) -> Self: + """Validate the dataset type configuration.""" + if self.dataset_type == DatasetType.SYNTHETIC and self.file is not None: + self.dataset_type = DatasetType.CUSTOM + logger.warning( + "Dataset type is set to CUSTOM because a file or custom dataset type is " + "provided for synthetic dataset" + ) + if self.dataset_type == DatasetType.CUSTOM: + if self.custom_dataset_type is None: + raise ValueError( + "A custom dataset 
type requires a custom dataset type to be provided" + ) + if self.file is None: + raise ValueError("A custom dataset type requires a file to be provided") + return self + extra: Annotated[ Any, Field( @@ -175,6 +193,15 @@ def validate_fixed_schedule_start_and_end_offset(self) -> Self: ), ] = InputDefaults.FIXED_SCHEDULE_END_OFFSET + dataset_type: Annotated[ + DatasetType, + Field(description="The type of dataset to generate for the requests."), + Parameter( + name=("--dataset-type",), + group=_CLI_GROUP, + ), + ] = InputDefaults.DATASET_TYPE + # NEW AIPerf Option custom_dataset_type: Annotated[ CustomDatasetType, diff --git a/aiperf/common/config/service_config.py b/aiperf/common/config/service_config.py index 288763e94..eb29fdcb5 100644 --- a/aiperf/common/config/service_config.py +++ b/aiperf/common/config/service_config.py @@ -192,6 +192,17 @@ def validate_comm_config(self) -> Self: ), ] = ServiceDefaults.RECORD_PROCESSOR_SERVICE_COUNT + dataset_processor_service_count: Annotated[ + int, + Field( + description="Number of services to spawn for processing dataset generation.", + ), + Parameter( + name=("--dataset-processor-service-count", "--dataset-processors"), + group=_CLI_GROUP, + ), + ] = ServiceDefaults.DATASET_PROCESSOR_SERVICE_COUNT + progress_report_interval: Annotated[ float, Field( diff --git a/aiperf/common/config/zmq_config.py b/aiperf/common/config/zmq_config.py index 2d88219e8..4324f72da 100644 --- a/aiperf/common/config/zmq_config.py +++ b/aiperf/common/config/zmq_config.py @@ -56,6 +56,16 @@ def credit_drop_address(self) -> str: def credit_return_address(self) -> str: """Get the credit return address based on protocol configuration.""" + @property + @abstractmethod + def dataset_job_address(self) -> str: + """Get the dataset job address based on protocol configuration.""" + + @property + @abstractmethod + def dataset_result_address(self) -> str: + """Get the dataset result address based on protocol configuration.""" + def get_address(self, 
address_type: CommAddress) -> str: """Get the actual address based on the address type.""" address_map = { @@ -65,6 +75,8 @@ def get_address(self, address_type: CommAddress) -> str: CommAddress.DATASET_MANAGER_PROXY_BACKEND: self.dataset_manager_proxy_config.backend_address, CommAddress.CREDIT_DROP: self.credit_drop_address, CommAddress.CREDIT_RETURN: self.credit_return_address, + CommAddress.DATASET_JOB: self.dataset_job_address, + CommAddress.DATASET_RESULT: self.dataset_result_address, CommAddress.RECORDS: self.records_push_pull_address, CommAddress.RAW_INFERENCE_PROXY_FRONTEND: self.raw_inference_proxy_config.frontend_address, CommAddress.RAW_INFERENCE_PROXY_BACKEND: self.raw_inference_proxy_config.backend_address, @@ -170,6 +182,12 @@ class ZMQTCPConfig(BaseZMQCommunicationConfig): credit_return_port: int = Field( default=5563, description="Port for credit return operations" ) + dataset_job_port: int = Field( + default=5665, description="Port for dataset job operations" + ) + dataset_result_port: int = Field( + default=5666, description="Port for dataset result operations" + ) dataset_manager_proxy_config: ZMQTCPProxyConfig = Field( # type: ignore default=ZMQTCPProxyConfig( frontend_port=5661, @@ -239,3 +257,13 @@ def credit_drop_address(self) -> str: def credit_return_address(self) -> str: """Get the credit return address based on protocol configuration.""" return f"ipc://{self.path}/credit_return.ipc" + + @property + def dataset_job_address(self) -> str: + """Get the dataset job address based on protocol configuration.""" + return f"ipc://{self.path}/dataset_job.ipc" + + @property + def dataset_result_address(self) -> str: + """Get the dataset result address based on protocol configuration.""" + return f"ipc://{self.path}/dataset_result.ipc" diff --git a/aiperf/common/enums/__init__.py b/aiperf/common/enums/__init__.py index 27d63bc48..416214e6d 100644 --- a/aiperf/common/enums/__init__.py +++ b/aiperf/common/enums/__init__.py @@ -29,8 +29,8 @@ ) from 
aiperf.common.enums.dataset_enums import ( AudioFormat, - ComposerType, CustomDatasetType, + DatasetType, ImageFormat, PromptSource, ) @@ -109,11 +109,11 @@ "CommandResponseStatus", "CommandType", "CommunicationBackend", - "ComposerType", "ConsoleExporterType", "CreditPhase", "CustomDatasetType", "DataExporterType", + "DatasetType", "EndpointType", "EndpointTypeInfo", "GenericMetricUnit", diff --git a/aiperf/common/enums/command_enums.py b/aiperf/common/enums/command_enums.py index 0b7416459..b4ba83cc0 100644 --- a/aiperf/common/enums/command_enums.py +++ b/aiperf/common/enums/command_enums.py @@ -14,6 +14,8 @@ class CommandType(CaseInsensitiveStrEnum): SHUTDOWN = "shutdown" SHUTDOWN_WORKERS = "shutdown_workers" SPAWN_WORKERS = "spawn_workers" + SPAWN_DATASET_PROCESSORS = "spawn_dataset_processors" + SHUTDOWN_DATASET_PROCESSORS = "shutdown_dataset_processors" class CommandResponseStatus(CaseInsensitiveStrEnum): diff --git a/aiperf/common/enums/communication_enums.py b/aiperf/common/enums/communication_enums.py index dc931e9a5..42a8b374f 100644 --- a/aiperf/common/enums/communication_enums.py +++ b/aiperf/common/enums/communication_enums.py @@ -49,6 +49,12 @@ class CommAddress(CaseInsensitiveStrEnum): RAW_INFERENCE_PROXY_BACKEND = "raw_inference_proxy_backend" """Backend address for the InferenceParser to receive raw inference messages from Workers.""" + DATASET_JOB = "dataset_job" + """Address for sending dataset generation jobs to the DatasetProcessor.""" + + DATASET_RESULT = "dataset_result" + """Address for sending dataset generation results to the DatasetManager.""" + class ZMQProxyType(CaseInsensitiveStrEnum): DEALER_ROUTER = "dealer_router" diff --git a/aiperf/common/enums/dataset_enums.py b/aiperf/common/enums/dataset_enums.py index 968c0f87f..1b44e860a 100644 --- a/aiperf/common/enums/dataset_enums.py +++ b/aiperf/common/enums/dataset_enums.py @@ -4,10 +4,10 @@ from aiperf.common.enums.base_enums import CaseInsensitiveStrEnum -class 
ComposerType(CaseInsensitiveStrEnum): +class DatasetType(CaseInsensitiveStrEnum): SYNTHETIC = "synthetic" CUSTOM = "custom" - PUBLIC_DATASET = "public_dataset" + PUBLIC = "public" class CustomDatasetType(CaseInsensitiveStrEnum): diff --git a/aiperf/common/enums/message_enums.py b/aiperf/common/enums/message_enums.py index b35bcf264..b9d2a9263 100644 --- a/aiperf/common/enums/message_enums.py +++ b/aiperf/common/enums/message_enums.py @@ -30,6 +30,12 @@ class MessageType(CaseInsensitiveStrEnum): DATASET_CONFIGURED_NOTIFICATION = "dataset_configured_notification" DATASET_TIMING_REQUEST = "dataset_timing_request" DATASET_TIMING_RESPONSE = "dataset_timing_response" + DATASET_RESULT = "dataset_result" + PROCESS_SYNTHETIC_DATASET = "process_synthetic_dataset" + PROCESS_MOONCAKE_TRACE_DATASET = "process_mooncake_trace_dataset" + PROCESS_MULTI_TURN_DATASET = "process_multi_turn_dataset" + PROCESS_SINGLE_TURN_DATASET = "process_single_turn_dataset" + PROCESS_RANDOM_POOL_DATASET = "process_random_pool_dataset" ERROR = "error" HEARTBEAT = "heartbeat" INFERENCE_RESULTS = "inference_results" diff --git a/aiperf/common/enums/service_enums.py b/aiperf/common/enums/service_enums.py index ca54b68a0..63b5461f8 100644 --- a/aiperf/common/enums/service_enums.py +++ b/aiperf/common/enums/service_enums.py @@ -42,6 +42,7 @@ class ServiceType(CaseInsensitiveStrEnum): TIMING_MANAGER = "timing_manager" RECORD_PROCESSOR = "record_processor" RECORDS_MANAGER = "records_manager" + DATASET_PROCESSOR = "dataset_processor" WORKER_MANAGER = "worker_manager" WORKER = "worker" diff --git a/aiperf/common/factories.py b/aiperf/common/factories.py index 2b2329196..f48a6d1d8 100644 --- a/aiperf/common/factories.py +++ b/aiperf/common/factories.py @@ -10,7 +10,6 @@ AIPerfUIType, CommClientType, CommunicationBackend, - ComposerType, ConsoleExporterType, CustomDatasetType, DataExporterType, @@ -58,7 +57,6 @@ from aiperf.dataset import ( CustomDatasetLoaderProtocol, ) - from aiperf.dataset.composer.base 
import BaseDatasetComposer from aiperf.exporters.exporter_config import ExporterConfig from aiperf.zmq.zmq_proxy_base import BaseZMQProxy @@ -377,20 +375,6 @@ def create_instance( # type: ignore[override] return super().create_instance(class_type, config=config, **kwargs) -class ComposerFactory(AIPerfFactory[ComposerType, "BaseDatasetComposer"]): - """Factory for registering and creating BaseDatasetComposer instances based on the specified composer type. - see: :class:`aiperf.common.factories.AIPerfFactory` for more details. - """ - - @classmethod - def create_instance( # type: ignore[override] - cls, - class_type: ComposerType | str, - **kwargs, - ) -> "BaseDatasetComposer": - return super().create_instance(class_type, **kwargs) - - class ConsoleExporterFactory( AIPerfFactory[ConsoleExporterType, "ConsoleExporterProtocol"] ): diff --git a/aiperf/common/messages/__init__.py b/aiperf/common/messages/__init__.py index 922692581..b97c7553b 100644 --- a/aiperf/common/messages/__init__.py +++ b/aiperf/common/messages/__init__.py @@ -50,6 +50,13 @@ DatasetConfiguredNotification, DatasetTimingRequest, DatasetTimingResponse, + ProcessDatasetMessage, + ProcessDatasetResponseMessage, + ProcessMooncakeTraceDatasetMessage, + ProcessMultiTurnDatasetMessage, + ProcessRandomPoolDatasetMessage, + ProcessSingleTurnDatasetMessage, + ProcessSyntheticDatasetMessage, ) from aiperf.common.messages.inference_messages import ( InferenceResultsMessage, @@ -110,9 +117,16 @@ "Message", "MetricRecordsMessage", "ParsedInferenceResultsMessage", + "ProcessDatasetMessage", + "ProcessDatasetResponseMessage", + "ProcessMooncakeTraceDatasetMessage", + "ProcessMultiTurnDatasetMessage", + "ProcessRandomPoolDatasetMessage", "ProcessRecordsCommand", "ProcessRecordsResponse", "ProcessRecordsResultMessage", + "ProcessSingleTurnDatasetMessage", + "ProcessSyntheticDatasetMessage", "ProcessingStatsMessage", "ProfileCancelCommand", "ProfileConfigureCommand", diff --git 
a/aiperf/common/messages/dataset_messages.py b/aiperf/common/messages/dataset_messages.py index c69d7a1ba..f792ecff8 100644 --- a/aiperf/common/messages/dataset_messages.py +++ b/aiperf/common/messages/dataset_messages.py @@ -1,6 +1,8 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +from typing import Any + from pydantic import Field from aiperf.common.enums import CreditPhase, MessageType @@ -9,6 +11,72 @@ from aiperf.common.types import MessageTypeT +class ProcessDatasetMessage(BaseServiceMessage): + """Message for sending dataset processing requests to processors.""" + + random_seed: int | None = Field( + default=None, description="Random seed for the dataset generation" + ) + + +class ProcessSyntheticDatasetMessage(ProcessDatasetMessage): + """Message for processing synthetic data.""" + + message_type: MessageTypeT = MessageType.PROCESS_SYNTHETIC_DATASET + num_conversations: int = Field( + ..., description="Number of conversation to generate" + ) + + +class ProcessMooncakeTraceDatasetMessage(ProcessDatasetMessage): + """Message for processing mooncake trace data.""" + + message_type: MessageTypeT = MessageType.PROCESS_MOONCAKE_TRACE_DATASET + dataset: list[tuple[str, Any]] = Field( + ..., description="The Mooncake trace dataset" + ) + + +class ProcessMultiTurnDatasetMessage(ProcessDatasetMessage): + """Message for processing multi-turn data.""" + + message_type: MessageTypeT = MessageType.PROCESS_MULTI_TURN_DATASET + dataset: list[tuple[str, Any]] = Field(..., description="The multi-turn dataset") + + +class ProcessSingleTurnDatasetMessage(ProcessDatasetMessage): + """Message for processing single-turn data.""" + + message_type: MessageTypeT = MessageType.PROCESS_SINGLE_TURN_DATASET + dataset: list[tuple[str, Any]] = Field(..., description="The single-turn dataset") + + +class ProcessRandomPoolDatasetMessage(ProcessDatasetMessage): + """Message for processing random pool 
data.""" + + message_type: MessageTypeT = MessageType.PROCESS_RANDOM_POOL_DATASET + dataset: list[tuple[str, Any]] = Field(..., description="The random pool dataset") + num_conversations: int = Field( + ..., description="Number of conversations to generate" + ) + + +class ProcessDatasetResponseMessage(ProcessDatasetMessage): + """Message for returning dataset processing responses.""" + + message_type: MessageTypeT = MessageType.DATASET_RESULT + + generated_data: list[Conversation] = Field( + default_factory=list, description="Generated conversations" + ) + error_message: str | None = Field( + default=None, description="Error message if job failed" + ) + processing_time_ms: float | None = Field( + default=None, description="Time taken to process the job in milliseconds" + ) + + class ConversationRequestMessage(BaseServiceMessage): """Message to request a full conversation by ID.""" diff --git a/aiperf/controller/system_controller.py b/aiperf/controller/system_controller.py index 09430042f..35c816b3b 100644 --- a/aiperf/controller/system_controller.py +++ b/aiperf/controller/system_controller.py @@ -92,6 +92,11 @@ def __init__( else: self.scale_record_processors_with_workers = True + if self.service_config.dataset_processor_service_count is not None: + self.required_services[ServiceType.DATASET_PROCESSOR] = ( + self.service_config.dataset_processor_service_count + ) + self.proxy_manager: ProxyManager = ProxyManager( service_config=self.service_config ) diff --git a/aiperf/dataset/__init__.py b/aiperf/dataset/__init__.py index 7205804f1..7af2215a2 100644 --- a/aiperf/dataset/__init__.py +++ b/aiperf/dataset/__init__.py @@ -1,15 +1,16 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 - -from aiperf.dataset.composer import ( - BaseDatasetComposer, - CustomDatasetComposer, - SyntheticDatasetComposer, -) +######################################################################## +## 🚩 mkinit flags 🚩 ## +######################################################################## +__ignore__ = ["main"] +######################################################################## +## ⚠️ This file is auto-generated by mkinit ⚠️ ## +## ⚠️ Do not edit below this line ⚠️ ## +######################################################################## from aiperf.dataset.dataset_manager import ( DATASET_CONFIGURATION_TIMEOUT, DatasetManager, - main, ) from aiperf.dataset.generator import ( DEFAULT_CORPUS_FILE, @@ -33,6 +34,9 @@ SingleTurn, SingleTurnDatasetLoader, ) +from aiperf.dataset.processor import ( + DatasetProcessor, +) from aiperf.dataset.utils import ( check_file_exists, encode_image, @@ -45,14 +49,13 @@ __all__ = [ "AudioGenerator", - "BaseDatasetComposer", "BaseGenerator", - "CustomDatasetComposer", "CustomDatasetLoaderProtocol", "CustomDatasetT", "DATASET_CONFIGURATION_TIMEOUT", "DEFAULT_CORPUS_FILE", "DatasetManager", + "DatasetProcessor", "ImageGenerator", "MP3_SUPPORTED_SAMPLE_RATES", "MediaConversionMixin", @@ -66,11 +69,9 @@ "SUPPORTED_BIT_DEPTHS", "SingleTurn", "SingleTurnDatasetLoader", - "SyntheticDatasetComposer", "check_file_exists", "encode_image", "load_json_str", - "main", "open_image", "sample_normal", "sample_positive_normal", diff --git a/aiperf/dataset/composer/__init__.py b/aiperf/dataset/composer/__init__.py deleted file mode 100644 index 04892cc8e..000000000 --- a/aiperf/dataset/composer/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
-# SPDX-License-Identifier: Apache-2.0 -######################################################################## -## 🚩 mkinit flags 🚩 ## -######################################################################## -__ignore__ = [] -######################################################################## -## ⚠️ This file is auto-generated by mkinit ⚠️ ## -## ⚠️ Do not edit below this line ⚠️ ## -######################################################################## -from aiperf.dataset.composer.base import ( - BaseDatasetComposer, -) -from aiperf.dataset.composer.custom import ( - CustomDatasetComposer, -) -from aiperf.dataset.composer.synthetic import ( - SyntheticDatasetComposer, -) - -__all__ = ["BaseDatasetComposer", "CustomDatasetComposer", "SyntheticDatasetComposer"] diff --git a/aiperf/dataset/composer/base.py b/aiperf/dataset/composer/base.py deleted file mode 100644 index 1c6b42ac7..000000000 --- a/aiperf/dataset/composer/base.py +++ /dev/null @@ -1,88 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
-# SPDX-License-Identifier: Apache-2.0 - -import random -from abc import ABC, abstractmethod - -from aiperf.common.config import UserConfig -from aiperf.common.enums import ModelSelectionStrategy -from aiperf.common.mixins import AIPerfLoggerMixin -from aiperf.common.models import Conversation, Turn -from aiperf.common.tokenizer import Tokenizer -from aiperf.dataset import utils -from aiperf.dataset.generator import ( - AudioGenerator, - ImageGenerator, - PromptGenerator, -) - - -class BaseDatasetComposer(AIPerfLoggerMixin, ABC): - def __init__(self, config: UserConfig, tokenizer: Tokenizer, **kwargs): - self.config = config - super().__init__(config=config, tokenizer=tokenizer, **kwargs) - self.prompt_generator = PromptGenerator(config.input.prompt, tokenizer) - self.image_generator = ImageGenerator(config.input.image) - self.audio_generator = AudioGenerator(config.input.audio) - self.turn_count = 0 - - @abstractmethod - def create_dataset(self) -> list[Conversation]: - """ - Create a set of conversation objects from the given configuration. - - Returns: - list[Conversation]: A list of conversation objects. - """ - ... - - def _select_model_name(self) -> str: - if ( - self.config.endpoint.model_selection_strategy - == ModelSelectionStrategy.RANDOM - ): - return random.choice(self.config.endpoint.model_names) - elif ( - self.config.endpoint.model_selection_strategy - == ModelSelectionStrategy.ROUND_ROBIN - ): - model_name = self.config.endpoint.model_names[ - self.turn_count % len(self.config.endpoint.model_names) - ] - self.turn_count += 1 - return model_name - else: - raise ValueError( - f"Invalid model selection strategy: {self.config.endpoint.model_selection_strategy}." - ) - - def _set_max_tokens(self, turn: Turn) -> None: - """Set max_tokens for the turn based on the output configuration. - - Args: - turn: The turn object to finalize. 
- """ - output_tokens_config = self.config.input.prompt.output_tokens - if output_tokens_config.mean is not None: - stddev = output_tokens_config.stddev - turn.max_tokens = utils.sample_positive_normal_integer( - output_tokens_config.mean, stddev - ) - - def _finalize_turn(self, turn: Turn) -> None: - """Finalize a turn by populating all required metadata fields. - - This method handles: - - Model name selection - - Max tokens sampling based on output configuration - - Any other turn-level metadata that needs to be set - - Args: - turn: The turn object to finalize. - """ - turn.model = self._select_model_name() - self._set_max_tokens(turn) - - @property - def prefix_prompt_enabled(self) -> bool: - return self.config.input.prompt.prefix_prompt.length > 0 diff --git a/aiperf/dataset/composer/custom.py b/aiperf/dataset/composer/custom.py deleted file mode 100644 index 35bced43e..000000000 --- a/aiperf/dataset/composer/custom.py +++ /dev/null @@ -1,55 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -from aiperf.common.config import UserConfig -from aiperf.common.decorators import implements_protocol -from aiperf.common.enums import ComposerType, CustomDatasetType -from aiperf.common.factories import ComposerFactory, CustomDatasetFactory -from aiperf.common.models import Conversation -from aiperf.common.protocols import ServiceProtocol -from aiperf.common.tokenizer import Tokenizer -from aiperf.dataset import utils -from aiperf.dataset.composer.base import BaseDatasetComposer - - -@implements_protocol(ServiceProtocol) -@ComposerFactory.register(ComposerType.CUSTOM) -class CustomDatasetComposer(BaseDatasetComposer): - def __init__(self, config: UserConfig, tokenizer: Tokenizer): - super().__init__(config, tokenizer) - - def create_dataset(self) -> list[Conversation]: - """Create conversations from a file or directory. 
- - Returns: - list[Conversation]: A list of conversation objects. - """ - # TODO: (future) for K8s, we need to transfer file data from SC (across node) - utils.check_file_exists(self.config.input.file) - - self._create_loader_instance(self.config.input.custom_dataset_type) - dataset = self.loader.load_dataset() - conversations = self.loader.convert_to_conversations(dataset) - self._finalize_conversations(conversations) - return conversations - - def _create_loader_instance(self, dataset_type: CustomDatasetType) -> None: - """Initializes the dataset loader based on the custom dataset type. - - Args: - dataset_type: The type of custom dataset to create. - """ - kwargs = {"filename": self.config.input.file} - if dataset_type == CustomDatasetType.MOONCAKE_TRACE: - kwargs["prompt_generator"] = self.prompt_generator - kwargs["user_config"] = self.config - elif dataset_type == CustomDatasetType.RANDOM_POOL: - kwargs["num_conversations"] = self.config.input.conversation.num - - self.loader = CustomDatasetFactory.create_instance(dataset_type, **kwargs) - - def _finalize_conversations(self, conversations: list[Conversation]) -> None: - """Finalize all turns in conversations by adding metadata.""" - for conversation in conversations: - for turn in conversation.turns: - self._finalize_turn(turn) diff --git a/aiperf/dataset/composer/synthetic.py b/aiperf/dataset/composer/synthetic.py deleted file mode 100644 index b5c8557cb..000000000 --- a/aiperf/dataset/composer/synthetic.py +++ /dev/null @@ -1,161 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
-# SPDX-License-Identifier: Apache-2.0 - -import uuid - -from aiperf.common.config import UserConfig -from aiperf.common.enums import ComposerType -from aiperf.common.factories import ComposerFactory -from aiperf.common.models import Audio, Conversation, Image, Text, Turn -from aiperf.common.tokenizer import Tokenizer -from aiperf.dataset import utils -from aiperf.dataset.composer.base import BaseDatasetComposer - - -@ComposerFactory.register(ComposerType.SYNTHETIC) -class SyntheticDatasetComposer(BaseDatasetComposer): - def __init__(self, config: UserConfig, tokenizer: Tokenizer): - super().__init__(config, tokenizer) - - if ( - not self.include_prompt - and not self.include_image - and not self.include_audio - ): - raise ValueError( - "All synthetic data are disabled. " - "Please enable at least one of prompt, image, or audio by " - "setting the mean to a positive value." - ) - - def create_dataset(self) -> list[Conversation]: - """Create a synthetic conversation dataset from the given configuration. - - It generates a set of conversations with a varying number of turns, - where each turn contains synthetic text, image, and audio payloads. - - Returns: - list[Conversation]: A list of conversation objects. - """ - conversations = [] - for _ in range(self.config.input.conversation.num): - conversation = Conversation(session_id=str(uuid.uuid4())) - - num_turns = utils.sample_positive_normal_integer( - self.config.input.conversation.turn.mean, - self.config.input.conversation.turn.stddev, - ) - self.logger.debug("Creating conversation with %d turns", num_turns) - - for turn_idx in range(num_turns): - turn = self._create_turn(is_first=(turn_idx == 0)) - conversation.turns.append(turn) - conversations.append(conversation) - return conversations - - def _create_turn(self, is_first: bool) -> Turn: - """Create a turn object that contains synthetic payloads to send. - - It generates multi-modal data (e.g. 
text, image, audio) using synthetic - generators and also the delay between turns. - - Args: - is_first: Whether the turn is the first turn in the conversation. - - Returns: - Turn: A dataset representation of a single turn. - """ - turn = Turn() - - if self.include_prompt: - turn.texts.append(self._generate_text_payloads(is_first)) - if self.include_image: - turn.images.append(self._generate_image_payloads()) - if self.include_audio: - turn.audios.append(self._generate_audio_payloads()) - - # Add randomized delays between each turn. Skip if first turn. - if not is_first: - turn.delay = utils.sample_positive_normal_integer( - self.config.input.conversation.turn.delay.mean, - self.config.input.conversation.turn.delay.stddev, - ) - - if not turn.texts and not turn.images and not turn.audios: - self.logger.warning( - "There were no synthetic payloads generated. " - "Please enable at least one of prompt, image, or audio by " - "setting the mean to a positive value." - ) - - self._finalize_turn(turn) - - return turn - - def _generate_text_payloads(self, is_first: bool) -> Text: - """Generate synthetic text payloads. - - If the turn is the first turn in the conversation, it could add a prefix prompt - to the prompt. - - Args: - is_first: Whether the turn is the first turn in the conversation. - - Returns: - Text: A text payload object. - """ - text = Text(name="text") - for _ in range(self.config.input.prompt.batch_size): - prompt = self.prompt_generator.generate( - mean=self.config.input.prompt.input_tokens.mean, - stddev=self.config.input.prompt.input_tokens.stddev, - ) - - if self.prefix_prompt_enabled and is_first: - # TODO: Rename - prefix_prompt = self.prompt_generator.get_random_prefix_prompt() - prompt = f"{prefix_prompt} {prompt}" - - text.contents.append(prompt) - return text - - def _generate_image_payloads(self) -> Image: - """ - Generate synthetic images if the image width and height are specified. - - Returns: - Image: An image payload object. 
- """ - image = Image(name="image_url") - for _ in range(self.config.input.image.batch_size): - data = self.image_generator.generate() - image.contents.append(data) - return image - - def _generate_audio_payloads(self) -> Audio: - """ - Generate synthetic audios if the audio length is specified. - - Returns: - Audio: An audio payload object. - """ - audio = Audio(name="input_audio") - for _ in range(self.config.input.audio.batch_size): - data = self.audio_generator.generate() - audio.contents.append(data) - return audio - - @property - def include_prompt(self) -> bool: - return self.config.input.prompt.input_tokens.mean > 0 - - @property - def include_image(self) -> bool: - return ( - self.config.input.image.width.mean > 0 - and self.config.input.image.height.mean > 0 - ) - - @property - def include_audio(self) -> bool: - return self.config.input.audio.length.mean > 0 diff --git a/aiperf/dataset/dataset_manager.py b/aiperf/dataset/dataset_manager.py index 769cbf994..552c58cde 100644 --- a/aiperf/dataset/dataset_manager.py +++ b/aiperf/dataset/dataset_manager.py @@ -10,12 +10,16 @@ from aiperf.common.enums import ( CommAddress, CommandType, - ComposerType, + CustomDatasetType, + DatasetType, MessageType, ServiceType, ) -from aiperf.common.factories import ComposerFactory, ServiceFactory -from aiperf.common.hooks import on_command, on_request +from aiperf.common.factories import ( + CustomDatasetFactory, + ServiceFactory, +) +from aiperf.common.hooks import on_command, on_pull_message, on_request from aiperf.common.messages import ( ConversationRequestMessage, ConversationResponseMessage, @@ -24,9 +28,16 @@ DatasetConfiguredNotification, DatasetTimingRequest, DatasetTimingResponse, + ProcessDatasetMessage, + ProcessDatasetResponseMessage, + ProcessMooncakeTraceDatasetMessage, + ProcessMultiTurnDatasetMessage, + ProcessRandomPoolDatasetMessage, + ProcessSingleTurnDatasetMessage, + ProcessSyntheticDatasetMessage, ProfileConfigureCommand, ) -from aiperf.common.mixins 
import ReplyClientMixin +from aiperf.common.mixins import PullClientMixin, ReplyClientMixin from aiperf.common.models import Conversation from aiperf.common.protocols import ServiceProtocol from aiperf.common.tokenizer import Tokenizer @@ -36,7 +47,7 @@ @implements_protocol(ServiceProtocol) @ServiceFactory.register(ServiceType.DATASET_MANAGER) -class DatasetManager(ReplyClientMixin, BaseComponentService): +class DatasetManager(ReplyClientMixin, PullClientMixin, BaseComponentService): """ The DatasetManager primary responsibility is to manage the data generation or acquisition. For synthetic generation, it contains the code to generate the prompts or tokens. @@ -55,6 +66,8 @@ def __init__( service_id=service_id, reply_client_address=CommAddress.DATASET_MANAGER_PROXY_BACKEND, reply_client_bind=False, + pull_client_address=CommAddress.DATASET_RESULT, + pull_client_bind=True, ) self.debug("Dataset manager __init__") self.user_config = user_config @@ -66,6 +79,14 @@ def __init__( ) self.dataset_configured = asyncio.Event() + self.jobs_push_client = self.comms.create_push_client( + address=CommAddress.DATASET_JOB, + bind=True, + ) + self.num_processors = self.service_config.dataset_processor_service_count + self._custom_dataset_type = self.user_config.input.custom_dataset_type + self.total_dataset_size: int | None = None + @on_command(CommandType.PROFILE_CONFIGURE) async def _profile_configure_command( self, message: ProfileConfigureCommand @@ -73,53 +94,116 @@ async def _profile_configure_command( """Configure the dataset.""" self.info(lambda: f"Configuring dataset for {self.service_id}") begin = time.perf_counter() - await self._configure_dataset() - duration = time.perf_counter() - begin - self.info(lambda: f"Dataset configured in {duration:.2f} seconds") - async def _configure_dataset(self) -> None: - if self.user_config is None: - raise self._service_error("User config is required for dataset manager") - - self.dataset_configured.clear() - if 
self.user_config.input.file: - composer_type = ComposerType.CUSTOM - self.debug( - lambda: f"Detected input file '{self.user_config.input.file}'. Setting the composer type to {ComposerType.CUSTOM}." - ) + if self.user_config.input.dataset_type == DatasetType.SYNTHETIC: + await self._configure_synthetic_dataset() + elif self.user_config.input.dataset_type == DatasetType.CUSTOM: + await self._configure_custom_dataset() else: - composer_type = ComposerType.SYNTHETIC - self.debug( - lambda: f"No input file detected. Setting the composer type to {ComposerType.SYNTHETIC}." + raise NotImplementedError( + f"Dataset type {self.user_config.input.dataset_type} is not supported yet." ) - tokenizer_name = self.user_config.tokenizer.name - if tokenizer_name is None: - # TODO: What do we do if there are multiple models? - # How will we know which tokenizer to use? - tokenizer_name = self.user_config.endpoint.model_names[0] + await self._wait_for_dataset_configuration() + self._session_ids_cache = list(self.dataset.keys()) + duration = time.perf_counter() - begin + self.info( + lambda: f"Dataset configured in {duration:.2f} seconds with {len(self.dataset)} conversations" + ) + + async def _configure_synthetic_dataset(self) -> None: + """Configure a synthetic dataset. - tokenizer = Tokenizer.from_pretrained( - tokenizer_name, - trust_remote_code=self.user_config.tokenizer.trust_remote_code, - revision=self.user_config.tokenizer.revision, + This will generate a synthetic dataset and distribute the work across the + dataset processors. 
+ """ + self.total_dataset_size = self.user_config.input.conversation.num + self.info( + f"Distributing {self.total_dataset_size} conversations " + f"across {self.num_processors} processors " ) - composer = ComposerFactory.create_instance( - composer_type, - config=self.user_config, - tokenizer=tokenizer, + + remaining_dataset_size = self.total_dataset_size + chunk_size = max(self.total_dataset_size // self.num_processors, 1) + while remaining_dataset_size > 0: + await self.jobs_push_client.push( + message=ProcessSyntheticDatasetMessage( + service_id=self.service_id, + num_conversations=chunk_size, + ) + ) + + remaining_dataset_size -= chunk_size + if remaining_dataset_size < chunk_size: + chunk_size = remaining_dataset_size + + async def _configure_custom_dataset(self) -> None: + """Configure a custom dataset. + + This will load a custom dataset from a file and distribute the work across the + dataset processors. + """ + process_messages: dict[CustomDatasetType, type[ProcessDatasetMessage]] = { + CustomDatasetType.MOONCAKE_TRACE: ProcessMooncakeTraceDatasetMessage, + CustomDatasetType.MULTI_TURN: ProcessMultiTurnDatasetMessage, + CustomDatasetType.SINGLE_TURN: ProcessSingleTurnDatasetMessage, + CustomDatasetType.RANDOM_POOL: ProcessRandomPoolDatasetMessage, + } + process_message = process_messages[self._custom_dataset_type] + + custom_kwargs = {} + if self._custom_dataset_type == CustomDatasetType.RANDOM_POOL: + custom_kwargs["num_conversations"] = self.user_config.input.conversation.num + + loader = CustomDatasetFactory.create_instance( + self._custom_dataset_type, + user_config=self.user_config, + ) + dataset_list = list(loader.load_dataset().items()) + self.total_dataset_size = len(dataset_list) + + self.info( + f"Distributing {self.total_dataset_size} dataset items " + f"across {self.num_processors} processors" ) - conversations = composer.create_dataset() - self.dataset = {conv.session_id: conv for conv in conversations} - self._session_ids_cache = 
list(self.dataset.keys()) - self.dataset_configured.set() - await self.publish( - DatasetConfiguredNotification( - service_id=self.service_id, - ), + remaining_dataset_size = self.total_dataset_size + chunk_size = max(self.total_dataset_size // self.num_processors, 1) + start = 0 + while remaining_dataset_size > 0: + await self.jobs_push_client.push( + message=process_message( + service_id=self.service_id, + dataset=dataset_list[start : start + chunk_size], + **custom_kwargs, + ) + ) + start += chunk_size + remaining_dataset_size -= chunk_size + if remaining_dataset_size < chunk_size: + chunk_size = remaining_dataset_size + + @on_pull_message(MessageType.DATASET_RESULT) + async def _on_result(self, message: ProcessDatasetResponseMessage) -> None: + """Handle a dataset job result.""" + self.debug( + lambda: f"Received processed dataset response from {message.service_id}" ) + for conversation in message.generated_data: + self.dataset[conversation.session_id] = conversation + + if len(self.dataset) == self.total_dataset_size: + self.debug( + lambda: f"Dataset configured with {len(self.dataset)} conversations" + ) + self.dataset_configured.set() + await self.publish( + DatasetConfiguredNotification( + service_id=self.service_id, + ), + ) + @on_request(MessageType.CONVERSATION_REQUEST) async def _handle_conversation_request( self, message: ConversationRequestMessage diff --git a/aiperf/dataset/loader/mooncake_trace.py b/aiperf/dataset/loader/mooncake_trace.py index 410202196..f434e797b 100644 --- a/aiperf/dataset/loader/mooncake_trace.py +++ b/aiperf/dataset/loader/mooncake_trace.py @@ -9,8 +9,6 @@ from aiperf.common.enums import CustomDatasetType from aiperf.common.factories import CustomDatasetFactory from aiperf.common.mixins import AIPerfLoggerMixin -from aiperf.common.models import Conversation, Text, Turn -from aiperf.dataset.generator import PromptGenerator from aiperf.dataset.loader.models import MooncakeTrace from aiperf.dataset.loader.protocol import 
CustomDatasetLoaderProtocol @@ -33,20 +31,13 @@ class MooncakeTraceDatasetLoader(AIPerfLoggerMixin): ``` """ - def __init__( - self, - filename: str, - prompt_generator: PromptGenerator, - user_config: UserConfig, - **kwargs, - ): - self.filename = filename - self.prompt_generator = prompt_generator - self.user_config = user_config - self._skipped_traces = 0 + def __init__(self, user_config: UserConfig, **kwargs) -> None: + super().__init__(user_config=user_config, **kwargs) + self.debug("MooncakeTraceDatasetLoader __init__") + self.filename = user_config.input.file self._start_offset = user_config.input.fixed_schedule_start_offset self._end_offset = user_config.input.fixed_schedule_end_offset - super().__init__(user_config=user_config, **kwargs) + self._skipped_traces = 0 def load_dataset(self) -> dict[str, list[MooncakeTrace]]: """Load Mooncake trace data from a file. @@ -82,32 +73,3 @@ def _timestamp_within_offsets(self, timestamp: int) -> bool: return (self._start_offset is None or timestamp >= self._start_offset) and ( self._end_offset is None or timestamp <= self._end_offset ) - - def convert_to_conversations( - self, data: dict[str, list[MooncakeTrace]] - ) -> list[Conversation]: - """Convert all the Mooncake trace data to conversation objects. - - Args: - data: A dictionary of session_id and list of Mooncake trace data. - - Returns: - A list of conversations. 
- """ - conversations = [] - for session_id, traces in data.items(): - conversation = Conversation(session_id=session_id) - for trace in traces: - prompt = self.prompt_generator.generate( - mean=trace.input_length, - stddev=0, - hash_ids=trace.hash_ids, - ) - turn = Turn( - timestamp=trace.timestamp, - texts=[Text(name="text", contents=[prompt])], - max_tokens=trace.output_length, - ) - conversation.turns.append(turn) - conversations.append(conversation) - return conversations diff --git a/aiperf/dataset/loader/multi_turn.py b/aiperf/dataset/loader/multi_turn.py index 94fc8775e..428c47c9c 100644 --- a/aiperf/dataset/loader/multi_turn.py +++ b/aiperf/dataset/loader/multi_turn.py @@ -4,15 +4,15 @@ import uuid from collections import defaultdict -from aiperf.common.enums import CustomDatasetType, MediaType +from aiperf.common.config import UserConfig +from aiperf.common.enums import CustomDatasetType from aiperf.common.factories import CustomDatasetFactory -from aiperf.common.models import Conversation, Turn -from aiperf.dataset.loader.mixins import MediaConversionMixin +from aiperf.common.mixins import AIPerfLoggerMixin from aiperf.dataset.loader.models import MultiTurn @CustomDatasetFactory.register(CustomDatasetType.MULTI_TURN) -class MultiTurnDatasetLoader(MediaConversionMixin): +class MultiTurnDatasetLoader(AIPerfLoggerMixin): """A dataset loader that loads multi-turn data from a file. The multi-turn type @@ -90,8 +90,10 @@ class MultiTurnDatasetLoader(MediaConversionMixin): ``` """ - def __init__(self, filename: str): - self.filename = filename + def __init__(self, user_config: UserConfig, **kwargs) -> None: + super().__init__(user_config=user_config, **kwargs) + self.debug("MultiTurnDatasetLoader __init__") + self.filename = user_config.input.file def load_dataset(self) -> dict[str, list[MultiTurn]]: """Load multi-turn data from a JSONL file. 
@@ -114,35 +116,3 @@ def load_dataset(self) -> dict[str, list[MultiTurn]]: data[session_id].append(multi_turn_data) return data - - def convert_to_conversations( - self, data: dict[str, list[MultiTurn]] - ) -> list[Conversation]: - """Convert multi-turn data to conversation objects. - - Args: - data: A dictionary mapping session_id to list of MultiTurn objects. - - Returns: - A list of conversations. - """ - conversations = [] - for session_id, multi_turns in data.items(): - conversation = Conversation(session_id=session_id) - - # Process all MultiTurn objects for this session - for multi_turn in multi_turns: - for single_turn in multi_turn.turns: - media = self.convert_to_media_objects(single_turn) - conversation.turns.append( - Turn( - texts=media[MediaType.TEXT], - images=media[MediaType.IMAGE], - audios=media[MediaType.AUDIO], - timestamp=single_turn.timestamp, - delay=single_turn.delay, - role=single_turn.role, - ) - ) - conversations.append(conversation) - return conversations diff --git a/aiperf/dataset/loader/random_pool.py b/aiperf/dataset/loader/random_pool.py index 97fa40812..f5d3f8cf2 100644 --- a/aiperf/dataset/loader/random_pool.py +++ b/aiperf/dataset/loader/random_pool.py @@ -1,16 +1,14 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 -import random -import uuid from collections import defaultdict from pathlib import Path from typing import TypeAlias -from aiperf.common.enums import CustomDatasetType, MediaType +from aiperf.common.config import UserConfig +from aiperf.common.enums import CustomDatasetType from aiperf.common.factories import CustomDatasetFactory -from aiperf.common.models import Conversation, Turn -from aiperf.dataset.loader.mixins import MediaConversionMixin +from aiperf.common.mixins import AIPerfLoggerMixin from aiperf.dataset.loader.models import RandomPool # Type aliases @@ -18,7 +16,7 @@ @CustomDatasetFactory.register(CustomDatasetType.RANDOM_POOL) -class RandomPoolDatasetLoader(MediaConversionMixin): +class RandomPoolDatasetLoader(AIPerfLoggerMixin): """A dataset loader that loads data from a single file or a directory. Each line in the file represents single-turn conversation data, @@ -69,9 +67,10 @@ class RandomPoolDatasetLoader(MediaConversionMixin): and loader will later sample from these two pools to create conversations. """ - def __init__(self, filename: str, num_conversations: int = 1): - self.filename = filename - self.num_conversations = num_conversations + def __init__(self, user_config: UserConfig, **kwargs) -> None: + super().__init__(user_config=user_config, **kwargs) + self.debug("RandomPoolDatasetLoader __init__") + self.filename = user_config.input.file def load_dataset(self) -> dict[Filename, list[RandomPool]]: """Load random pool data from a file or directory. @@ -131,62 +130,3 @@ def _load_dataset_from_dir( data[file_path.name].extend(dataset_pool) return data - - def convert_to_conversations( - self, data: dict[Filename, list[RandomPool]] - ) -> list[Conversation]: - """Convert random pool data to conversation objects. - - Each RandomPool entry becomes a single-turn conversation with a unique session ID. - - Args: - data: A dictionary mapping filename to list of RandomPool objects. 
- - Returns: - A list of conversations. - """ - conversations = [ - Conversation(session_id=str(uuid.uuid4())) - for _ in range(self.num_conversations) - ] - - # F x N (F: num of files, N: num of conversations) - sampled_dataset: dict[Filename, list[Turn]] = {} - - # Randomly sample (with replacement) from each dataset pool - for filename, dataset_pool in data.items(): - samples = random.choices(dataset_pool, k=self.num_conversations) - turns: list[Turn] = [] - for sample in samples: - media = self.convert_to_media_objects(sample, name=Path(filename).stem) - turns.append( - Turn( - texts=media[MediaType.TEXT], - images=media[MediaType.IMAGE], - audios=media[MediaType.AUDIO], - ) - ) - sampled_dataset[filename] = turns - - # Merge turns for each conversation - for i, batched_turns in enumerate(zip(*sampled_dataset.values(), strict=False)): - turn = self._merge_turns(batched_turns) - conversations[i].turns.append(turn) - - return conversations - - def _merge_turns(self, turns: list[Turn]) -> Turn: - """Merge turns into a single turn. - - Args: - turns: A list of turns. - - Returns: - A single turn. 
- """ - merged_turn = Turn( - texts=[text for turn in turns for text in turn.texts], - images=[image for turn in turns for image in turn.images], - audios=[audio for turn in turns for audio in turn.audios], - ) - return merged_turn diff --git a/aiperf/dataset/loader/single_turn.py b/aiperf/dataset/loader/single_turn.py index 6ce612f29..2671bbb7b 100644 --- a/aiperf/dataset/loader/single_turn.py +++ b/aiperf/dataset/loader/single_turn.py @@ -4,15 +4,15 @@ import uuid from collections import defaultdict -from aiperf.common.enums import CustomDatasetType, MediaType +from aiperf.common.config import UserConfig +from aiperf.common.enums import CustomDatasetType from aiperf.common.factories import CustomDatasetFactory -from aiperf.common.models import Conversation, Turn -from aiperf.dataset.loader.mixins import MediaConversionMixin +from aiperf.common.mixins import AIPerfLoggerMixin from aiperf.dataset.loader.models import SingleTurn @CustomDatasetFactory.register(CustomDatasetType.SINGLE_TURN) -class SingleTurnDatasetLoader(MediaConversionMixin): +class SingleTurnDatasetLoader(AIPerfLoggerMixin): """A dataset loader that loads single turn data from a file. The single turn type @@ -64,8 +64,10 @@ class SingleTurnDatasetLoader(MediaConversionMixin): ``` """ - def __init__(self, filename: str): - self.filename = filename + def __init__(self, user_config: UserConfig, **kwargs) -> None: + super().__init__(user_config=user_config, **kwargs) + self.debug("SingleTurnDatasetLoader __init__") + self.filename = user_config.input.file def load_dataset(self) -> dict[str, list[SingleTurn]]: """Load single-turn data from a JSONL file. @@ -88,32 +90,3 @@ def load_dataset(self) -> dict[str, list[SingleTurn]]: data[session_id].append(single_turn_data) return data - - def convert_to_conversations( - self, data: dict[str, list[SingleTurn]] - ) -> list[Conversation]: - """Convert single turn data to conversation objects. 
- - Args: - data: A dictionary mapping session_id to list of SingleTurn objects. - - Returns: - A list of conversations. - """ - conversations = [] - for session_id, single_turns in data.items(): - conversation = Conversation(session_id=session_id) - for single_turn in single_turns: - media = self.convert_to_media_objects(single_turn) - conversation.turns.append( - Turn( - texts=media[MediaType.TEXT], - images=media[MediaType.IMAGE], - audios=media[MediaType.AUDIO], - timestamp=single_turn.timestamp, - delay=single_turn.delay, - role=single_turn.role, - ) - ) - conversations.append(conversation) - return conversations diff --git a/aiperf/dataset/processor.py b/aiperf/dataset/processor.py new file mode 100644 index 000000000..b8a45e055 --- /dev/null +++ b/aiperf/dataset/processor.py @@ -0,0 +1,461 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import random +import uuid +from pathlib import Path + +from aiperf.common.base_component_service import BaseComponentService +from aiperf.common.config import ServiceConfig, UserConfig +from aiperf.common.enums import ( + CommAddress, + MediaType, + MessageType, + ModelSelectionStrategy, + ServiceType, +) +from aiperf.common.factories import ServiceFactory +from aiperf.common.hooks import on_init, on_pull_message +from aiperf.common.messages import ( + ProcessDatasetMessage, + ProcessDatasetResponseMessage, + ProcessMooncakeTraceDatasetMessage, + ProcessMultiTurnDatasetMessage, + ProcessRandomPoolDatasetMessage, + ProcessSingleTurnDatasetMessage, + ProcessSyntheticDatasetMessage, +) +from aiperf.common.mixins import PullClientMixin +from aiperf.common.models import Audio, Conversation, Image, Text, Turn +from aiperf.common.protocols import PushClientProtocol +from aiperf.common.tokenizer import Tokenizer +from aiperf.dataset import AudioGenerator, ImageGenerator, PromptGenerator, utils +from aiperf.dataset.loader import 
MediaConversionMixin # TODO: move to common.mixins + + +@ServiceFactory.register(ServiceType.DATASET_PROCESSOR) +class DatasetProcessor(PullClientMixin, BaseComponentService, MediaConversionMixin): + """ + DatasetProcessor is responsible for generating dataset conversations in parallel. + """ + + def __init__( + self, + service_config: ServiceConfig, + user_config: UserConfig, + service_id: str | None = None, + ) -> None: + super().__init__( + service_config=service_config, + user_config=user_config, + service_id=service_id, + pull_client_address=CommAddress.DATASET_JOB, + pull_client_bind=False, + ) + self.debug("Dataset processor __init__") + self.results_push_client: PushClientProtocol = self.comms.create_push_client( + CommAddress.DATASET_RESULT + ) + self.tokenizer: Tokenizer | None = None + self.model_selection_counter: int | None = None + + self.user_config = user_config + self._conversation_config = user_config.input.conversation + self._prompt_config = user_config.input.prompt + self._image_config = user_config.input.image + self._audio_config = user_config.input.audio + self._endpoint_config = user_config.endpoint + + @property + def include_prompt(self) -> bool: + return self._prompt_config.input_tokens.mean > 0 + + @property + def include_image(self) -> bool: + return self._image_config.width.mean > 0 and self._image_config.height.mean > 0 + + @property + def include_audio(self) -> bool: + return self._audio_config.length.mean > 0 + + @property + def prefix_prompt_enabled(self) -> bool: + return self._prompt_config.prefix_prompt.length > 0 + + @on_init + async def _initialize(self) -> None: + """Initialize dataset processor service-specific components.""" + self.debug("Initializing dataset processor service") + tokenizer_name = self.user_config.tokenizer.name + if tokenizer_name is None: + tokenizer_name = self._endpoint_config.model_names[0] + + self.tokenizer = Tokenizer.from_pretrained( + tokenizer_name, + 
trust_remote_code=self.user_config.tokenizer.trust_remote_code, + revision=self.user_config.tokenizer.revision, + ) + + self.prompt_generator = PromptGenerator(self._prompt_config, self.tokenizer) + self.image_generator = ImageGenerator(self._image_config) + self.audio_generator = AudioGenerator(self._audio_config) + + async def _reset_states(self, message: ProcessDatasetMessage) -> None: + """Reset the states of the dataset processor.""" + if message.random_seed is not None: + random.seed(message.random_seed) + self.debug(lambda: f"Setting random seed to {message.random_seed}") + + self.model_selection_counter = 0 + + @on_pull_message(MessageType.PROCESS_SYNTHETIC_DATASET) + async def _on_process_synthetic_dataset( + self, + message: ProcessSyntheticDatasetMessage, + ) -> None: + """Handle a dataset generation job.""" + # TODO: change to debug log + self.info( + lambda: f"#### ({self.service_id}) Received synthetic dataset generation job to process {message.num_conversations} conversations" + ) + await self._reset_states(message) + + conversations = [] + for _ in range(message.num_conversations): + conversation = await self._create_conversation() + conversations.append(conversation) + + await self.results_push_client.push( + ProcessDatasetResponseMessage( + service_id=self.service_id, + generated_data=conversations, + ) + ) + + async def _create_conversation(self) -> Conversation: + """Create a synthetic conversation from the given configuration. + + It generates a set of conversations with a varying number of turns, + where each turn contains synthetic text, image, and audio payloads. + + Returns: + Conversation: a conversation objects. 
+ """ + conversation = Conversation(session_id=str(uuid.uuid4())) + + num_turns = utils.sample_positive_normal_integer( + self._conversation_config.turn.mean, + self._conversation_config.turn.stddev, + ) + self.debug("Creating conversation with %d turns", num_turns) + + for turn_idx in range(num_turns): + turn = self._create_turn(is_first=(turn_idx == 0)) + conversation.turns.append(turn) + return conversation + + def _create_turn(self, is_first: bool) -> Turn: + """Create a turn object that contains synthetic payloads to send. + + It generates multi-modal data (e.g. text, image, audio) using synthetic + generators and also the delay between turns. + + Args: + is_first: Whether the turn is the first turn in the conversation. + + Returns: + Turn: A dataset representation of a single turn. + """ + turn = Turn() + + if self.include_prompt: + turn.texts.append(self._generate_text_payloads(is_first)) + if self.include_image: + turn.images.append(self._generate_image_payloads()) + if self.include_audio: + turn.audios.append(self._generate_audio_payloads()) + + # Add randomized delays between each turn. Skip if first turn. + if not is_first: + turn.delay = utils.sample_positive_normal_integer( + self._conversation_config.turn.delay.mean, + self._conversation_config.turn.delay.stddev, + ) + + if not turn.texts and not turn.images and not turn.audios: + self.logger.warning( + "There were no synthetic payloads generated. " + "Please enable at least one of prompt, image, or audio by " + "setting the mean to a positive value." + ) + + self._finalize_turn(turn) + + return turn + + def _generate_text_payloads(self, is_first: bool) -> Text: + """Generate synthetic text payloads. + + If the turn is the first turn in the conversation, it could add a prefix prompt + to the prompt. + + Args: + is_first: Whether the turn is the first turn in the conversation. + + Returns: + Text: A text payload object. 
+ """ + text = Text(name="text") + for _ in range(self._prompt_config.batch_size): + prompt = self.prompt_generator.generate( + mean=self._prompt_config.input_tokens.mean, + stddev=self._prompt_config.input_tokens.stddev, + ) + + if self.prefix_prompt_enabled and is_first: + # TODO: Rename + prefix_prompt = self.prompt_generator.get_random_prefix_prompt() + prompt = f"{prefix_prompt} {prompt}" + + text.contents.append(prompt) + return text + + def _generate_image_payloads(self) -> Image: + """ + Generate synthetic images if the image width and height are specified. + + Returns: + Image: An image payload object. + """ + image = Image(name="image_url") + for _ in range(self._image_config.batch_size): + data = self.image_generator.generate() + image.contents.append(data) + return image + + def _generate_audio_payloads(self) -> Audio: + """ + Generate synthetic audios if the audio length is specified. + + Returns: + Audio: An audio payload object. + """ + audio = Audio(name="input_audio") + for _ in range(self._audio_config.batch_size): + data = self.audio_generator.generate() + audio.contents.append(data) + return audio + + def _select_model_name(self) -> str: + """Select a model name based on the model selection strategy. + + Returns: + str: The selected model name. + """ + strategy = self._endpoint_config.model_selection_strategy + if strategy == ModelSelectionStrategy.RANDOM: + return random.choice(self._endpoint_config.model_names) + elif strategy == ModelSelectionStrategy.ROUND_ROBIN: + index = self.model_selection_counter % len( + self._endpoint_config.model_names + ) + model_name = self._endpoint_config.model_names[index] + self.model_selection_counter += 1 + return model_name + + def _set_max_tokens(self, turn: Turn) -> None: + """Set max_tokens for the turn based on the output configuration. + + Args: + turn: The turn object to finalize. 
+ """ + if self._prompt_config.output_tokens.mean is not None: + turn.max_tokens = utils.sample_positive_normal_integer( + mean=self._prompt_config.output_tokens.mean, + stddev=self._prompt_config.output_tokens.stddev, + ) + + def _finalize_turn(self, turn: Turn) -> None: + """Finalize a turn by populating all required metadata fields. + + This method handles: + - Model name selection + - Max tokens sampling based on output configuration + - Any other turn-level metadata that needs to be set + + Args: + turn: The turn object to finalize. + """ + turn.model = self._select_model_name() + self._set_max_tokens(turn) + + @on_pull_message(MessageType.PROCESS_MOONCAKE_TRACE_DATASET) + async def _on_process_mooncake_trace_dataset( + self, + message: ProcessMooncakeTraceDatasetMessage, + ) -> None: + """Handle a mooncake trace dataset generation job.""" + self.debug(lambda: "Received mooncake trace dataset generation job") + await self._reset_states(message) + + conversations = [] + for session_id, traces in message.dataset: + conversation = Conversation(session_id=session_id) + for trace in traces: + prompt = self.prompt_generator.generate( + mean=trace["input_length"], + stddev=0, + hash_ids=trace["hash_ids"], + ) + turn = Turn( + timestamp=trace["timestamp"], + texts=[Text(name="text", contents=[prompt])], + ) + self._finalize_turn(turn) + conversation.turns.append(turn) + conversations.append(conversation) + + await self.results_push_client.push( + ProcessDatasetResponseMessage( + service_id=self.service_id, + generated_data=conversations, + ) + ) + + @on_pull_message(MessageType.PROCESS_MULTI_TURN_DATASET) + async def _on_process_multi_turn_dataset( + self, + message: ProcessMultiTurnDatasetMessage, + ) -> None: + """Handle a multi-turn dataset generation job.""" + self.debug(lambda: "Received multi-turn dataset generation job") + await self._reset_states(message) + + conversations = [] + for session_id, multi_turns in message.dataset: + conversation = 
Conversation(session_id=session_id) + for multi_turn in multi_turns: + for single_turn in multi_turn.turns: + media = self.convert_to_media_objects(single_turn) + turn = Turn( + texts=media[MediaType.TEXT], + images=media[MediaType.IMAGE], + audios=media[MediaType.AUDIO], + timestamp=single_turn["timestamp"], + delay=single_turn["delay"], + role=single_turn["role"], + ) + self._finalize_turn(turn) + conversation.turns.append(turn) + conversations.append(conversation) + + await self.results_push_client.push( + ProcessDatasetResponseMessage( + service_id=self.service_id, + generated_data=conversations, + ) + ) + + @on_pull_message(MessageType.PROCESS_SINGLE_TURN_DATASET) + async def _on_process_single_turn_dataset( + self, + message: ProcessSingleTurnDatasetMessage, + ) -> None: + """Handle a single-turn dataset generation job.""" + self.debug(lambda: "Received single-turn dataset generation job") + await self._reset_states(message) + + conversations = [] + for session_id, single_turns in message.dataset: + conversation = Conversation(session_id=session_id) + for single_turn in single_turns: + media = self.convert_to_media_objects(single_turn) + turn = Turn( + texts=media[MediaType.TEXT], + images=media[MediaType.IMAGE], + audios=media[MediaType.AUDIO], + timestamp=single_turn["timestamp"], + delay=single_turn["delay"], + role=single_turn["role"], + ) + self._finalize_turn(turn) + conversation.turns.append(turn) + conversations.append(conversation) + + await self.results_push_client.push( + ProcessDatasetResponseMessage( + service_id=self.service_id, + generated_data=conversations, + ) + ) + + @on_pull_message(MessageType.PROCESS_RANDOM_POOL_DATASET) + async def _on_process_random_pool_dataset( + self, + message: ProcessRandomPoolDatasetMessage, + ) -> None: + """Handle a random pool dataset generation job.""" + self.debug(lambda: "Received random pool dataset generation job") + await self._reset_states(message) + + conversations = [ + 
Conversation(session_id=str(uuid.uuid4())) + for _ in range(message.num_conversations) + ] + + # F x N (F: num of files, N: num of conversations) + sampled_dataset: dict[str, list[Turn]] = {} + + # Randomly sample (with replacement) from each dataset pool + for filename, dataset_pool in message.dataset: + samples = random.choices(dataset_pool, k=message.num_conversations) + turns: list[Turn] = [] + for sample in samples: + media = self.convert_to_media_objects(sample, name=Path(filename).stem) + turns.append( + Turn( + texts=media[MediaType.TEXT], + images=media[MediaType.IMAGE], + audios=media[MediaType.AUDIO], + ) + ) + sampled_dataset[filename] = turns + + # Merge turns for each conversation + for i, batched_turns in enumerate(zip(*sampled_dataset.values(), strict=False)): + turn = self._merge_turns(batched_turns) + self._finalize_turn(turn) + conversations[i].turns.append(turn) + + await self.results_push_client.push( + ProcessDatasetResponseMessage( + service_id=self.service_id, + generated_data=conversations, + ) + ) + + def _merge_turns(self, turns: list[Turn]) -> Turn: + """Merge turns into a single turn. + + Args: + turns: A list of turns. + + Returns: + A single turn. + """ + merged_turn = Turn( + texts=[text for turn in turns for text in turn.texts], + images=[image for turn in turns for image in turn.images], + audios=[audio for turn in turns for audio in turn.audios], + ) + return merged_turn + + +def main() -> None: + from aiperf.common.bootstrap import bootstrap_and_run_service + + bootstrap_and_run_service(DatasetProcessor) + + +if __name__ == "__main__": + main() diff --git a/tests/composers/test_custom_composer.py b/tests/composers/test_custom_composer.py deleted file mode 100644 index 088a79415..000000000 --- a/tests/composers/test_custom_composer.py +++ /dev/null @@ -1,133 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
-# SPDX-License-Identifier: Apache-2.0 - -from unittest.mock import Mock, mock_open, patch - -import pytest - -from aiperf.common.enums import CustomDatasetType -from aiperf.common.models import Conversation, Turn -from aiperf.dataset import ( - MooncakeTraceDatasetLoader, - MultiTurnDatasetLoader, - RandomPoolDatasetLoader, - SingleTurnDatasetLoader, -) -from aiperf.dataset.composer.custom import CustomDatasetComposer - - -class TestInitialization: - """Test class for CustomDatasetComposer basic initialization.""" - - def test_initialization(self, custom_config, mock_tokenizer): - """Test that CustomDatasetComposer can be instantiated with valid config.""" - composer = CustomDatasetComposer(custom_config, mock_tokenizer) - - assert composer is not None - assert isinstance(composer, CustomDatasetComposer) - - def test_config_storage(self, custom_config, mock_tokenizer): - """Test that the config is properly stored.""" - composer = CustomDatasetComposer(custom_config, mock_tokenizer) - - input_config = composer.config.input - assert input_config is custom_config.input - assert input_config.file == "test_data.jsonl" - assert input_config.custom_dataset_type == CustomDatasetType.SINGLE_TURN - - -MOCK_TRACE_CONTENT = """{"timestamp": 0, "input_length": 655, "output_length": 52, "hash_ids": [46, 47]} -{"timestamp": 10535, "input_length": 672, "output_length": 52, "hash_ids": [46, 47]} -{"timestamp": 27482, "input_length": 655, "output_length": 52, "hash_ids": [46, 47]} -""" - - -class TestCoreFunctionality: - """Test class for CustomDatasetComposer core functionality.""" - - @pytest.mark.parametrize( - "dataset_type,expected_instance", - [ - (CustomDatasetType.SINGLE_TURN, SingleTurnDatasetLoader), - (CustomDatasetType.MULTI_TURN, MultiTurnDatasetLoader), - (CustomDatasetType.RANDOM_POOL, RandomPoolDatasetLoader), - (CustomDatasetType.MOONCAKE_TRACE, MooncakeTraceDatasetLoader), - ], - ) - def test_create_loader_instance_dataset_types( - self, custom_config, 
dataset_type, expected_instance, mock_tokenizer - ): - """Test _create_loader_instance with different dataset types.""" - custom_config.input.custom_dataset_type = dataset_type - composer = CustomDatasetComposer(custom_config, mock_tokenizer) - composer._create_loader_instance(dataset_type) - assert isinstance(composer.loader, expected_instance) - - @patch("aiperf.dataset.composer.custom.utils.check_file_exists") - @patch("builtins.open", mock_open(read_data=MOCK_TRACE_CONTENT)) - def test_create_dataset_trace(self, mock_check_file, trace_config, mock_tokenizer): - """Test that create_dataset returns correct type.""" - composer = CustomDatasetComposer(trace_config, mock_tokenizer) - conversations = composer.create_dataset() - - assert len(conversations) == 3 - assert all(isinstance(c, Conversation) for c in conversations) - assert all(isinstance(turn, Turn) for c in conversations for turn in c.turns) - assert all(len(turn.texts) == 1 for c in conversations for turn in c.turns) - - @patch("aiperf.dataset.composer.custom.utils.check_file_exists") - @patch("builtins.open", mock_open(read_data=MOCK_TRACE_CONTENT)) - def test_max_tokens_config(self, mock_check_file, trace_config, mock_tokenizer): - trace_config.input.prompt.output_tokens.mean = 120 - trace_config.input.prompt.output_tokens.stddev = 8.0 - - composer = CustomDatasetComposer(trace_config, mock_tokenizer) - - with patch( - "aiperf.dataset.utils.sample_positive_normal_integer", return_value=20 - ): - conversations = composer.create_dataset() - - assert len(conversations) > 0 - for conversation in conversations: - for turn in conversation.turns: - assert turn.max_tokens == 20 - - @patch("aiperf.dataset.composer.custom.utils.check_file_exists") - @patch("builtins.open", mock_open(read_data=MOCK_TRACE_CONTENT)) - @patch("pathlib.Path.iterdir", return_value=[]) - def test_max_tokens_mooncake( - self, mock_iterdir, mock_check_file, custom_config, mock_tokenizer - ): - """Test that max_tokens can be set from the 
custom file""" - mock_check_file.return_value = None - custom_config.input.custom_dataset_type = CustomDatasetType.MOONCAKE_TRACE - - composer = CustomDatasetComposer(custom_config, mock_tokenizer) - conversations = composer.create_dataset() - - for conversation in conversations: - for turn in conversation.turns: - assert turn.max_tokens == 52 - - -class TestErrorHandling: - """Test class for CustomDatasetComposer error handling scenarios.""" - - @patch("aiperf.dataset.composer.custom.utils.check_file_exists") - @patch("aiperf.dataset.composer.custom.CustomDatasetFactory.create_instance") - def test_create_dataset_empty_result( - self, mock_factory, mock_check_file, custom_config, mock_tokenizer - ): - """Test create_dataset when loader returns empty data.""" - mock_check_file.return_value = None - mock_loader = Mock() - mock_loader.load_dataset.return_value = {} - mock_loader.convert_to_conversations.return_value = [] - mock_factory.return_value = mock_loader - - composer = CustomDatasetComposer(custom_config, mock_tokenizer) - result = composer.create_dataset() - - assert isinstance(result, list) - assert len(result) == 0 diff --git a/tests/composers/test_synthetic_composer.py b/tests/composers/test_synthetic_composer.py index efa635171..f898ee77a 100644 --- a/tests/composers/test_synthetic_composer.py +++ b/tests/composers/test_synthetic_composer.py @@ -1,601 +1,577 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 -import random -from unittest.mock import patch -import numpy as np -import pytest - -from aiperf.common.config import ( - AudioConfig, - AudioLengthConfig, - ConversationConfig, - EndpointConfig, - ImageConfig, - ImageHeightConfig, - ImageWidthConfig, - InputConfig, - InputTokensConfig, - PrefixPromptConfig, - PromptConfig, - TurnConfig, - TurnDelayConfig, - UserConfig, -) -from aiperf.common.models import Audio, Conversation, Image, Text, Turn -from aiperf.dataset.composer.synthetic import SyntheticDatasetComposer - - -class TestSyntheticDatasetComposer: - # ============================================================================ - # Initialization Tests - # ============================================================================ - - def test_initialization_basic_config(self, synthetic_config, mock_tokenizer): - """Test that SyntheticDatasetComposer can be instantiated with basic config.""" - composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) - - assert composer.config == synthetic_config - assert composer.config.input.conversation.num == 5 - assert composer.prompt_generator is not None - assert composer.include_image is False - assert composer.include_audio is False - - def test_initialization_with_images(self, image_config, mock_tokenizer): - """Test initialization with image generation enabled.""" - composer = SyntheticDatasetComposer(image_config, mock_tokenizer) - - assert composer.config.input.image.width.mean == 10 - assert composer.config.input.image.height.mean == 10 - assert composer.include_image is True - assert composer.include_audio is False - - def test_initialization_with_audio(self, audio_config, mock_tokenizer): - """Test initialization with audio generation enabled.""" - composer = SyntheticDatasetComposer(audio_config, mock_tokenizer) - - assert composer.config.input.audio.length.mean == 2 - assert composer.include_image is False - assert composer.include_audio is True - - def 
test_initialization_with_multimodal(self, multimodal_config, mock_tokenizer): - """Test initialization with both image and audio enabled.""" - composer = SyntheticDatasetComposer(multimodal_config, mock_tokenizer) - - assert composer.include_image is True - assert composer.include_audio is True - input_config = composer.config.input - assert input_config.image.batch_size == 2 - assert input_config.audio.batch_size == 2 - assert input_config.image.width.mean == 10 - assert input_config.image.height.mean == 10 - assert input_config.audio.length.mean == 2 - - def test_initialization_with_all_zero_mean(self, mock_tokenizer): - """Test initialization with no generators enabled.""" - config = UserConfig( - endpoint=EndpointConfig( - model_names=["test_model"], - ), - input=InputConfig( - conversation=ConversationConfig(num=5), - prompt=PromptConfig(input_tokens=InputTokensConfig(mean=0)), - image=ImageConfig( - width=ImageWidthConfig(mean=0), height=ImageHeightConfig(mean=0) - ), - audio=AudioConfig(length=AudioLengthConfig(mean=0)), - ), - ) - - with pytest.raises(ValueError): - SyntheticDatasetComposer(config, mock_tokenizer) - - # ============================================================================ - # Create Dataset Method Tests - # ============================================================================ - - @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") - def test_create_dataset_basic(self, mock_sample, synthetic_config, mock_tokenizer): - """Test basic dataset creation with text-only conversations.""" - # Mock the number of turns per conversation - mock_sample.return_value = 2 - - composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) - conversations = composer.create_dataset() - - # Test create_dataset returns correct number of conversations - assert len(conversations) == 5 # num_conversations - - # Test each conversation has correct structure (session_id, turns) - for conversation in conversations: - 
assert isinstance(conversation, Conversation) - assert conversation.session_id is not None - assert len(conversation.turns) == 2 # mocked value - - for turn in conversation.turns: - assert isinstance(turn, Turn) - assert len(turn.texts) == 1 # single text field per turn - assert len(turn.texts[0].contents) == 1 # batch_size = 1 - assert len(turn.images) == 0 # no images - assert len(turn.audios) == 0 # no audio - - @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") - def test_create_dataset_with_images( - self, mock_sample, image_config, mock_tokenizer - ): - """Test dataset creation with image generation enabled.""" - mock_sample.return_value = 1 - - composer = SyntheticDatasetComposer(image_config, mock_tokenizer) - conversations = composer.create_dataset() - - # Test conversations include image payloads - assert len(conversations) == 3 - for conversation in conversations: - for turn in conversation.turns: - assert len(turn.texts) == 1 # single text field per turn - assert len(turn.texts[0].contents) == 1 # batch_size = 1 - assert len(turn.images) == 1 # single image field per turn - assert len(turn.images[0].contents) == 1 # batch_size = 1 - assert len(turn.audios) == 0 # no audio - - # Check image properties - image = turn.images[0] - assert isinstance(image, Image) - assert image.name == "image_url" - - @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") - def test_create_dataset_with_audio(self, mock_sample, audio_config, mock_tokenizer): - """Test dataset creation with audio generation enabled.""" - mock_sample.return_value = 1 - - composer = SyntheticDatasetComposer(audio_config, mock_tokenizer) - conversations = composer.create_dataset() - - # Test conversations include audio payloads - assert len(conversations) == 3 - for conversation in conversations: - for turn in conversation.turns: - assert len(turn.texts) == 1 # single text field per turn - assert len(turn.texts[0].contents) == 1 # batch_size 
= 1 - assert len(turn.images) == 0 # no images - assert len(turn.audios) == 1 # single audio field per turn - assert len(turn.audios[0].contents) == 1 # batch_size = 1 - - # Check audio properties - audio = turn.audios[0] - assert isinstance(audio, Audio) - - @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") - def test_create_dataset_multimodal( - self, mock_sample, multimodal_config, mock_tokenizer - ): - """Test dataset creation with both image and audio enabled.""" - mock_sample.return_value = 1 - - composer = SyntheticDatasetComposer(multimodal_config, mock_tokenizer) - conversations = composer.create_dataset() - - # Test conversations include both image and audio payloads - assert len(conversations) == multimodal_config.input.conversation.num - for conversation in conversations: - for turn in conversation.turns: - # Test correct batch sizes for all modalities - assert len(turn.texts) == 1 # single text field per turn - assert len(turn.texts[0].contents) == 2 # batch_size = 2 - assert len(turn.images) == 1 # single image field per turn - assert len(turn.images[0].contents) == 2 # batch_size = 2 - assert len(turn.audios) == 1 # single audio field per turn - assert len(turn.audios[0].contents) == 2 # batch_size = 2 - - @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") - @patch("aiperf.dataset.generator.prompt.PromptGenerator.get_random_prefix_prompt") - def test_create_dataset_with_prefix_prompts( - self, mock_prefix, mock_sample, prefix_prompt_config, mock_tokenizer - ): - """Test dataset creation with prefix prompts enabled.""" - mock_sample.return_value = 2 # 2 turns per conversation - mock_prefix.return_value = "Prefix prompt:" - - composer = SyntheticDatasetComposer(prefix_prompt_config, mock_tokenizer) - conversations = composer.create_dataset() - - assert len(conversations) == 5 - for conversation in conversations: - # Test first turns include prefix prompts - first_turn = conversation.turns[0] 
- first_text_content = first_turn.texts[0].contents[0] - assert "Prefix prompt:" in first_text_content - - # Test subsequent turns don't include prefix prompts (if they exist) - if len(conversation.turns) > 1: - subsequent_turn = conversation.turns[1] - subsequent_text_content = subsequent_turn.texts[0].contents[0] - assert "Prefix prompt:" not in subsequent_text_content - - def test_create_dataset_multiple_turns(self, multiturn_config, mock_tokenizer): - """Test dataset creation with multiple turns and delays.""" - composer = SyntheticDatasetComposer(multiturn_config, mock_tokenizer) - conversations = composer.create_dataset() - - # Test conversations have multiple turns - assert len(conversations) == 3 - - for conversation in conversations: - assert len(conversation.turns) == 2 - assert conversation.turns[0].delay is None # first turn has no delay - assert conversation.turns[1].delay == 1500 # subsequent turns have delays - - # ============================================================================ - # Create Turn Method Tests - # ============================================================================ - - def test_create_first_turn(self, synthetic_config, mock_tokenizer): - """Test _create_turn method for first turn in conversation.""" - synthetic_config.input.conversation.turn.delay.mean = 1500 - synthetic_config.input.conversation.turn.delay.stddev = 0 - - composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) - - # Test first turn creation - turn = composer._create_turn(is_first=True) - - assert isinstance(turn, Turn) - assert len(turn.texts) == 1 # single text field per turn - assert len(turn.images) == 0 # no images - assert len(turn.audios) == 0 # no audio - assert turn.delay is None # first turn has no delay - - def test_create_turn_subsequent_turn(self, multiturn_config, mock_tokenizer): - """Test _create_turn method for subsequent turns in conversation.""" - composer = SyntheticDatasetComposer(multiturn_config, mock_tokenizer) - 
- # Test subsequent turn creation - turn = composer._create_turn(is_first=False) - - assert isinstance(turn, Turn) - assert len(turn.texts) == 1 - # Test subsequent turns have delays - assert turn.delay == 1500 - - def test_create_turn_with_all_modalities(self, multimodal_config, mock_tokenizer): - """Test _create_turn method with text, image, and audio.""" - multimodal_config.input.conversation.turn.delay.mean = 1500 - multimodal_config.input.conversation.turn.delay.stddev = 0 - - composer = SyntheticDatasetComposer(multimodal_config, mock_tokenizer) - - turn = composer._create_turn(is_first=True) - - assert isinstance(turn, Turn) - assert len(turn.texts) == 1 # single text field per turn - assert len(turn.texts[0].contents) == 2 # batch_size = 2 - assert len(turn.images) == 1 # single image field per turn - assert len(turn.images[0].contents) == 2 # batch_size = 2 - assert len(turn.audios) == 1 # single audio field per turn - assert len(turn.audios[0].contents) == 2 # batch_size = 2 - assert turn.delay is None # first turn has no delay - - # Test subsequent turn creation - turn = composer._create_turn(is_first=False) - - assert isinstance(turn, Turn) - assert turn.delay == 1500 - - # ============================================================================ - # Generate Payload Methods Tests - # ============================================================================ - - @patch("aiperf.dataset.generator.prompt.PromptGenerator.generate") - def test_generate_text_payloads_basic( - self, mock_generate, synthetic_config, mock_tokenizer - ): - """Test _generate_text_payloads method with basic configuration.""" - mock_generate.return_value = "Generated text content" - - composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) - - # Test text payload generation - turn = Turn() - text = composer._generate_text_payloads(is_first=True) - turn.texts.append(text) - - # Test correct number of text payloads based on batch_size - assert len(turn.texts) == 1 # 
batch_size = 1 - - # Test text content is generated using prompt generator - text_payload = turn.texts[0] - assert isinstance(text_payload, Text) - assert text_payload.name == "text" - assert text_payload.contents == ["Generated text content"] - - @patch("aiperf.dataset.generator.prompt.PromptGenerator.get_random_prefix_prompt") - @patch("aiperf.dataset.generator.prompt.PromptGenerator.generate") - def test_generate_text_payloads_first_turn_with_prefix( - self, mock_generate, mock_prefix, prefix_prompt_config, mock_tokenizer - ): - """Test _generate_text_payloads for first turn with prefix prompts.""" - mock_generate.return_value = "User message" - mock_prefix.return_value = "Prefix prompt:" - - composer = SyntheticDatasetComposer(prefix_prompt_config, mock_tokenizer) - - # Test prefix prompt is added to first turn - turn = Turn() - text = composer._generate_text_payloads(is_first=True) - turn.texts.append(text) - - text_payload = turn.texts[0] - # Test prefix prompt format ("prefix prompt") - assert text_payload.contents == ["Prefix prompt: User message"] - - @patch("aiperf.dataset.generator.prompt.PromptGenerator.generate") - def test_generate_text_payloads_subsequent_turn_no_prefix( - self, mock_generate, prefix_prompt_config, mock_tokenizer - ): - """Test _generate_text_payloads for subsequent turns without prefix prompts.""" - mock_generate.return_value = "User message" - - composer = SyntheticDatasetComposer(prefix_prompt_config, mock_tokenizer) - - # Test no prefix prompt is added to subsequent turns - turn = Turn() - text = composer._generate_text_payloads(is_first=False) - turn.texts.append(text) - - text_payload = turn.texts[0] - assert text_payload.contents == ["User message"] # No prefix - - @patch("aiperf.dataset.generator.prompt.PromptGenerator.generate") - def test_generate_text_payloads_multiple_batch_size( - self, mock_generate, synthetic_config, mock_tokenizer - ): - """Test _generate_text_payloads with batch_size > 1.""" - 
mock_generate.return_value = "Generated text" - synthetic_config.input.prompt.batch_size = 3 - - composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) - - # Test multiple text payloads are generated per turn - turn = Turn() - text = composer._generate_text_payloads(is_first=True) - turn.texts.append(text) - - assert len(turn.texts) == 1 # single text field per turn - assert len(turn.texts[0].contents) == 3 # batch_size = 3 - - # Batched text payloads - text_payload = turn.texts[0] - assert text_payload.contents == [ - "Generated text", - "Generated text", - "Generated text", - ] - - @patch("aiperf.dataset.generator.image.ImageGenerator.generate") - def test_generate_image_payloads(self, mock_generate, image_config, mock_tokenizer): - """Test _generate_image_payloads method.""" - mock_generate.return_value = "fake_image_data" - - composer = SyntheticDatasetComposer(image_config, mock_tokenizer) - - # Test image payload generation - turn = Turn() - image = composer._generate_image_payloads() - turn.images.append(image) - - # Test correct number of image payloads based on batch_size - assert len(turn.images) == 1 # batch_size = 1 - - # Test image content is generated using image generator - image_payload = turn.images[0] - assert isinstance(image_payload, Image) - assert image_payload.name == "image_url" - assert image_payload.contents == ["fake_image_data"] - - @patch("aiperf.dataset.generator.audio.AudioGenerator.generate") - def test_generate_audio_payloads(self, mock_generate, audio_config, mock_tokenizer): - """Test _generate_audio_payloads method.""" - mock_generate.return_value = "fake_audio_data" - - composer = SyntheticDatasetComposer(audio_config, mock_tokenizer) - - # Test audio payload generation - turn = Turn() - audio = composer._generate_audio_payloads() - turn.audios.append(audio) - - # Test correct number of audio payloads based on batch_size - assert len(turn.audios) == 1 # batch_size = 1 - - audio_payload = turn.audios[0] - assert 
audio_payload.name == "input_audio" - assert audio_payload.contents == ["fake_audio_data"] - - # ============================================================================ - # Configuration Variations Tests - # ============================================================================ - - def test_zero_conversations(self, synthetic_config, mock_tokenizer): - """Test behavior with zero conversations requested.""" - synthetic_config.input.conversation.num = 0 - - composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) - conversations = composer.create_dataset() - - assert len(conversations) == 0 - - @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") - def test_edge_case_statistical_parameters(self, mock_sample, mock_tokenizer): - """Test behavior with edge case statistical parameters.""" - mock_sample.return_value = 1 - - config = UserConfig( - endpoint=EndpointConfig( - model_names=["test-model"], - ), - input=InputConfig( - conversation=ConversationConfig(num=2), - prompt=PromptConfig( - mean=1, # Very small mean - stddev=0, # Zero stddev - prefix_prompt=PrefixPromptConfig(pool_size=0), - ), - turn=TurnConfig( - mean=100, # Large mean - stddev=50, # Large stddev - ), - ), - ) - - composer = SyntheticDatasetComposer(config, mock_tokenizer) - conversations = composer.create_dataset() - - # Test with very small/large mean and stddev values - assert len(conversations) == 2 - assert all(len(conv.turns) == 1 for conv in conversations) # mocked return - - @pytest.mark.parametrize("num_conversations", [1, 5, 10, 50]) - def test_different_conversation_counts( - self, synthetic_config, num_conversations, mock_tokenizer - ): - """Test dataset creation with different conversation counts.""" - synthetic_config.input.conversation.num = num_conversations - - composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) - conversations = composer.create_dataset() - - # Parametrized test for different num_conversations values 
- assert len(conversations) == num_conversations - - @pytest.mark.parametrize("batch_size", [1, 2, 5, 10]) - @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") - def test_different_batch_sizes( - self, mock_sample, synthetic_config, batch_size, mock_tokenizer - ): - """Test dataset creation with different batch sizes.""" - mock_sample.return_value = 1 - - synthetic_config.input.prompt.batch_size = batch_size - - composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) - conversations = composer.create_dataset() - - # Parametrized test for different batch_size values - turn = conversations[0].turns[0] - assert len(turn.texts) == 1 # single text field per turn - - text_payload = turn.texts[0] - assert len(text_payload.contents) == batch_size - - # ============================================================================ - # Miscellaneous Tests - # ============================================================================ - - def test_missing_required_generators(self, synthetic_config, mock_tokenizer): - """Test behavior when required generators are missing.""" - composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) - - # Test error handling when generators are not properly initialized - # Simulate missing tokenizer in generator - composer.prompt_generator = None - - with pytest.raises(AttributeError): - composer.create_dataset() - - def test_reproducibility_with_fixed_seed(self, multimodal_config, mock_tokenizer): - """Test that dataset generation is reproducible with fixed random seed.""" - multimodal_config.input.prompt.input_tokens.stddev = 2 - multimodal_config.input.image.width.stddev = 2 - multimodal_config.input.image.height.stddev = 2 - multimodal_config.input.audio.length.stddev = 2 - multimodal_config.input.conversation.turn = TurnConfig( - mean=2, stddev=2, delay=TurnDelayConfig(mean=1500, stddev=2) - ) - - random.seed(42) - np.random.seed(42) - composer1 = 
SyntheticDatasetComposer(multimodal_config, mock_tokenizer) - conversations1 = composer1.create_dataset() - - random.seed(42) - np.random.seed(42) - composer2 = SyntheticDatasetComposer(multimodal_config, mock_tokenizer) - conversations2 = composer2.create_dataset() - - # Basic structure should be the same - assert len(conversations1) == len(conversations2) - assert len(conversations1[0].turns) == len(conversations2[0].turns) - - # Both should have generated the same number of conversations and turns - for conv1, conv2 in zip(conversations1, conversations2, strict=True): - assert len(conv1.turns) == len(conv2.turns) - for turn1, turn2 in zip(conv1.turns, conv2.turns, strict=True): - assert len(turn1.texts) == len(turn2.texts) - assert len(turn1.images) == len(turn2.images) - assert len(turn1.audios) == len(turn2.audios) - assert turn1.texts[0].contents == turn2.texts[0].contents - assert turn1.images[0].contents == turn2.images[0].contents - assert turn1.audios[0].contents == turn2.audios[0].contents - assert turn1.delay == turn2.delay - - # ============================================================================ - # Model Selection Strategy Tests - # ============================================================================ - - @patch("random.choice", return_value="test-model-1") - def test_model_selection_random(self, mock_choice, custom_config, mock_tokenizer): - """Test random model selection strategy.""" - - custom_config.endpoint.model_selection_strategy = "random" - composer = SyntheticDatasetComposer(custom_config, mock_tokenizer) - - conversations = composer.create_dataset() - - for conversation in conversations: - for turn in conversation.turns: - assert turn.model == "test-model-1" - - def test_model_selection_round_robin(self, custom_config, mock_tokenizer): - custom_config.endpoint.model_selection_strategy = "round_robin" - custom_config.endpoint.model_names = ["test-model-1", "test-model-2"] - - composer = SyntheticDatasetComposer(custom_config, 
mock_tokenizer) - conversations = composer.create_dataset() - - # Check that models are selected in round-robin fashion - for i, conversation in enumerate(conversations): - for j, turn in enumerate(conversation.turns): - expected_model = "test-model-1" if (i + j) % 2 == 0 else "test-model-2" - assert turn.model == expected_model - - # ============================================================================ - # Max Token Tests - # ============================================================================ - - def test_max_tokens_integration_with_mean(self, custom_config, mock_tokenizer): - custom_config.input.prompt.output_tokens.mean = 100 - custom_config.input.prompt.output_tokens.stddev = 5.0 - - composer = SyntheticDatasetComposer(custom_config, mock_tokenizer) - - with patch( - "aiperf.dataset.utils.sample_positive_normal_integer", return_value=98 - ): - conversations = composer.create_dataset() - - for conversation in conversations: - for turn in conversation.turns: - assert turn.max_tokens == 98 - - def test_max_tokens_not_set_when_mean_none(self, custom_config, mock_tokenizer): - custom_config.input.prompt.output_tokens.mean = None - custom_config.input.prompt.output_tokens.stddev = None - - composer = SyntheticDatasetComposer(custom_config, mock_tokenizer) - conversations = composer.create_dataset() - - for conversation in conversations: - for turn in conversation.turns: - assert turn.max_tokens is None +# TODO: change to dataset processor tests +# class TestSyntheticDatasetComposer: +# # ============================================================================ +# # Initialization Tests +# # ============================================================================ +# +# def test_initialization_basic_config(self, synthetic_config, mock_tokenizer): +# """Test that SyntheticDatasetComposer can be instantiated with basic config.""" +# composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) +# +# assert composer.config == 
synthetic_config +# assert composer.config.input.conversation.num == 5 +# assert composer.prompt_generator is not None +# assert composer.include_image is False +# assert composer.include_audio is False +# +# def test_initialization_with_images(self, image_config, mock_tokenizer): +# """Test initialization with image generation enabled.""" +# composer = SyntheticDatasetComposer(image_config, mock_tokenizer) +# +# assert composer.config.input.image.width.mean == 10 +# assert composer.config.input.image.height.mean == 10 +# assert composer.include_image is True +# assert composer.include_audio is False +# +# def test_initialization_with_audio(self, audio_config, mock_tokenizer): +# """Test initialization with audio generation enabled.""" +# composer = SyntheticDatasetComposer(audio_config, mock_tokenizer) +# +# assert composer.config.input.audio.length.mean == 2 +# assert composer.include_image is False +# assert composer.include_audio is True +# +# def test_initialization_with_multimodal(self, multimodal_config, mock_tokenizer): +# """Test initialization with both image and audio enabled.""" +# composer = SyntheticDatasetComposer(multimodal_config, mock_tokenizer) +# +# assert composer.include_image is True +# assert composer.include_audio is True +# input_config = composer.config.input +# assert input_config.image.batch_size == 2 +# assert input_config.audio.batch_size == 2 +# assert input_config.image.width.mean == 10 +# assert input_config.image.height.mean == 10 +# assert input_config.audio.length.mean == 2 +# +# def test_initialization_with_all_zero_mean(self, mock_tokenizer): +# """Test initialization with no generators enabled.""" +# config = UserConfig( +# endpoint=EndpointConfig( +# model_names=["test_model"], +# ), +# input=InputConfig( +# conversation=ConversationConfig(num=5), +# prompt=PromptConfig(input_tokens=InputTokensConfig(mean=0)), +# image=ImageConfig( +# width=ImageWidthConfig(mean=0), height=ImageHeightConfig(mean=0) +# ), +# 
audio=AudioConfig(length=AudioLengthConfig(mean=0)), +# ), +# ) +# +# with pytest.raises(ValueError): +# SyntheticDatasetComposer(config, mock_tokenizer) +# +# # ============================================================================ +# # Create Dataset Method Tests +# # ============================================================================ +# +# @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") +# def test_create_dataset_basic(self, mock_sample, synthetic_config, mock_tokenizer): +# """Test basic dataset creation with text-only conversations.""" +# # Mock the number of turns per conversation +# mock_sample.return_value = 2 +# +# composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# # Test create_dataset returns correct number of conversations +# assert len(conversations) == 5 # num_conversations +# +# # Test each conversation has correct structure (session_id, turns) +# for conversation in conversations: +# assert isinstance(conversation, Conversation) +# assert conversation.session_id is not None +# assert len(conversation.turns) == 2 # mocked value +# +# for turn in conversation.turns: +# assert isinstance(turn, Turn) +# assert len(turn.texts) == 1 # single text field per turn +# assert len(turn.texts[0].contents) == 1 # batch_size = 1 +# assert len(turn.images) == 0 # no images +# assert len(turn.audios) == 0 # no audio +# +# @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") +# def test_create_dataset_with_images( +# self, mock_sample, image_config, mock_tokenizer +# ): +# """Test dataset creation with image generation enabled.""" +# mock_sample.return_value = 1 +# +# composer = SyntheticDatasetComposer(image_config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# # Test conversations include image payloads +# assert len(conversations) == 3 +# for conversation in conversations: +# for turn in 
conversation.turns: +# assert len(turn.texts) == 1 # single text field per turn +# assert len(turn.texts[0].contents) == 1 # batch_size = 1 +# assert len(turn.images) == 1 # single image field per turn +# assert len(turn.images[0].contents) == 1 # batch_size = 1 +# assert len(turn.audios) == 0 # no audio +# +# # Check image properties +# image = turn.images[0] +# assert isinstance(image, Image) +# assert image.name == "image_url" +# +# @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") +# def test_create_dataset_with_audio(self, mock_sample, audio_config, mock_tokenizer): +# """Test dataset creation with audio generation enabled.""" +# mock_sample.return_value = 1 +# +# composer = SyntheticDatasetComposer(audio_config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# # Test conversations include audio payloads +# assert len(conversations) == 3 +# for conversation in conversations: +# for turn in conversation.turns: +# assert len(turn.texts) == 1 # single text field per turn +# assert len(turn.texts[0].contents) == 1 # batch_size = 1 +# assert len(turn.images) == 0 # no images +# assert len(turn.audios) == 1 # single audio field per turn +# assert len(turn.audios[0].contents) == 1 # batch_size = 1 +# +# # Check audio properties +# audio = turn.audios[0] +# assert isinstance(audio, Audio) +# +# @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") +# def test_create_dataset_multimodal( +# self, mock_sample, multimodal_config, mock_tokenizer +# ): +# """Test dataset creation with both image and audio enabled.""" +# mock_sample.return_value = 1 +# +# composer = SyntheticDatasetComposer(multimodal_config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# # Test conversations include both image and audio payloads +# assert len(conversations) == multimodal_config.input.conversation.num +# for conversation in conversations: +# for turn in conversation.turns: +# # Test correct batch 
sizes for all modalities +# assert len(turn.texts) == 1 # single text field per turn +# assert len(turn.texts[0].contents) == 2 # batch_size = 2 +# assert len(turn.images) == 1 # single image field per turn +# assert len(turn.images[0].contents) == 2 # batch_size = 2 +# assert len(turn.audios) == 1 # single audio field per turn +# assert len(turn.audios[0].contents) == 2 # batch_size = 2 +# +# @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") +# @patch("aiperf.dataset.generator.prompt.PromptGenerator.get_random_prefix_prompt") +# def test_create_dataset_with_prefix_prompts( +# self, mock_prefix, mock_sample, prefix_prompt_config, mock_tokenizer +# ): +# """Test dataset creation with prefix prompts enabled.""" +# mock_sample.return_value = 2 # 2 turns per conversation +# mock_prefix.return_value = "Prefix prompt:" +# +# composer = SyntheticDatasetComposer(prefix_prompt_config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# assert len(conversations) == 5 +# for conversation in conversations: +# # Test first turns include prefix prompts +# first_turn = conversation.turns[0] +# first_text_content = first_turn.texts[0].contents[0] +# assert "Prefix prompt:" in first_text_content +# +# # Test subsequent turns don't include prefix prompts (if they exist) +# if len(conversation.turns) > 1: +# subsequent_turn = conversation.turns[1] +# subsequent_text_content = subsequent_turn.texts[0].contents[0] +# assert "Prefix prompt:" not in subsequent_text_content +# +# def test_create_dataset_multiple_turns(self, multiturn_config, mock_tokenizer): +# """Test dataset creation with multiple turns and delays.""" +# composer = SyntheticDatasetComposer(multiturn_config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# # Test conversations have multiple turns +# assert len(conversations) == 3 +# +# for conversation in conversations: +# assert len(conversation.turns) == 2 +# assert conversation.turns[0].delay is None # 
first turn has no delay +# assert conversation.turns[1].delay == 1500 # subsequent turns have delays +# +# # ============================================================================ +# # Create Turn Method Tests +# # ============================================================================ +# +# def test_create_first_turn(self, synthetic_config, mock_tokenizer): +# """Test _create_turn method for first turn in conversation.""" +# synthetic_config.input.conversation.turn.delay.mean = 1500 +# synthetic_config.input.conversation.turn.delay.stddev = 0 +# +# composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) +# +# # Test first turn creation +# turn = composer._create_turn(is_first=True) +# +# assert isinstance(turn, Turn) +# assert len(turn.texts) == 1 # single text field per turn +# assert len(turn.images) == 0 # no images +# assert len(turn.audios) == 0 # no audio +# assert turn.delay is None # first turn has no delay +# +# def test_create_turn_subsequent_turn(self, multiturn_config, mock_tokenizer): +# """Test _create_turn method for subsequent turns in conversation.""" +# composer = SyntheticDatasetComposer(multiturn_config, mock_tokenizer) +# +# # Test subsequent turn creation +# turn = composer._create_turn(is_first=False) +# +# assert isinstance(turn, Turn) +# assert len(turn.texts) == 1 +# # Test subsequent turns have delays +# assert turn.delay == 1500 +# +# def test_create_turn_with_all_modalities(self, multimodal_config, mock_tokenizer): +# """Test _create_turn method with text, image, and audio.""" +# multimodal_config.input.conversation.turn.delay.mean = 1500 +# multimodal_config.input.conversation.turn.delay.stddev = 0 +# +# composer = SyntheticDatasetComposer(multimodal_config, mock_tokenizer) +# +# turn = composer._create_turn(is_first=True) +# +# assert isinstance(turn, Turn) +# assert len(turn.texts) == 1 # single text field per turn +# assert len(turn.texts[0].contents) == 2 # batch_size = 2 +# assert len(turn.images) == 1 # 
single image field per turn +# assert len(turn.images[0].contents) == 2 # batch_size = 2 +# assert len(turn.audios) == 1 # single audio field per turn +# assert len(turn.audios[0].contents) == 2 # batch_size = 2 +# assert turn.delay is None # first turn has no delay +# +# # Test subsequent turn creation +# turn = composer._create_turn(is_first=False) +# +# assert isinstance(turn, Turn) +# assert turn.delay == 1500 +# +# # ============================================================================ +# # Generate Payload Methods Tests +# # ============================================================================ +# +# @patch("aiperf.dataset.generator.prompt.PromptGenerator.generate") +# def test_generate_text_payloads_basic( +# self, mock_generate, synthetic_config, mock_tokenizer +# ): +# """Test _generate_text_payloads method with basic configuration.""" +# mock_generate.return_value = "Generated text content" +# +# composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) +# +# # Test text payload generation +# turn = Turn() +# text = composer._generate_text_payloads(is_first=True) +# turn.texts.append(text) +# +# # Test correct number of text payloads based on batch_size +# assert len(turn.texts) == 1 # batch_size = 1 +# +# # Test text content is generated using prompt generator +# text_payload = turn.texts[0] +# assert isinstance(text_payload, Text) +# assert text_payload.name == "text" +# assert text_payload.contents == ["Generated text content"] +# +# @patch("aiperf.dataset.generator.prompt.PromptGenerator.get_random_prefix_prompt") +# @patch("aiperf.dataset.generator.prompt.PromptGenerator.generate") +# def test_generate_text_payloads_first_turn_with_prefix( +# self, mock_generate, mock_prefix, prefix_prompt_config, mock_tokenizer +# ): +# """Test _generate_text_payloads for first turn with prefix prompts.""" +# mock_generate.return_value = "User message" +# mock_prefix.return_value = "Prefix prompt:" +# +# composer = 
SyntheticDatasetComposer(prefix_prompt_config, mock_tokenizer) +# +# # Test prefix prompt is added to first turn +# turn = Turn() +# text = composer._generate_text_payloads(is_first=True) +# turn.texts.append(text) +# +# text_payload = turn.texts[0] +# # Test prefix prompt format ("prefix prompt") +# assert text_payload.contents == ["Prefix prompt: User message"] +# +# @patch("aiperf.dataset.generator.prompt.PromptGenerator.generate") +# def test_generate_text_payloads_subsequent_turn_no_prefix( +# self, mock_generate, prefix_prompt_config, mock_tokenizer +# ): +# """Test _generate_text_payloads for subsequent turns without prefix prompts.""" +# mock_generate.return_value = "User message" +# +# composer = SyntheticDatasetComposer(prefix_prompt_config, mock_tokenizer) +# +# # Test no prefix prompt is added to subsequent turns +# turn = Turn() +# text = composer._generate_text_payloads(is_first=False) +# turn.texts.append(text) +# +# text_payload = turn.texts[0] +# assert text_payload.contents == ["User message"] # No prefix +# +# @patch("aiperf.dataset.generator.prompt.PromptGenerator.generate") +# def test_generate_text_payloads_multiple_batch_size( +# self, mock_generate, synthetic_config, mock_tokenizer +# ): +# """Test _generate_text_payloads with batch_size > 1.""" +# mock_generate.return_value = "Generated text" +# synthetic_config.input.prompt.batch_size = 3 +# +# composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) +# +# # Test multiple text payloads are generated per turn +# turn = Turn() +# text = composer._generate_text_payloads(is_first=True) +# turn.texts.append(text) +# +# assert len(turn.texts) == 1 # single text field per turn +# assert len(turn.texts[0].contents) == 3 # batch_size = 3 +# +# # Batched text payloads +# text_payload = turn.texts[0] +# assert text_payload.contents == [ +# "Generated text", +# "Generated text", +# "Generated text", +# ] +# +# @patch("aiperf.dataset.generator.image.ImageGenerator.generate") +# def 
test_generate_image_payloads(self, mock_generate, image_config, mock_tokenizer): +# """Test _generate_image_payloads method.""" +# mock_generate.return_value = "fake_image_data" +# +# composer = SyntheticDatasetComposer(image_config, mock_tokenizer) +# +# # Test image payload generation +# turn = Turn() +# image = composer._generate_image_payloads() +# turn.images.append(image) +# +# # Test correct number of image payloads based on batch_size +# assert len(turn.images) == 1 # batch_size = 1 +# +# # Test image content is generated using image generator +# image_payload = turn.images[0] +# assert isinstance(image_payload, Image) +# assert image_payload.name == "image_url" +# assert image_payload.contents == ["fake_image_data"] +# +# @patch("aiperf.dataset.generator.audio.AudioGenerator.generate") +# def test_generate_audio_payloads(self, mock_generate, audio_config, mock_tokenizer): +# """Test _generate_audio_payloads method.""" +# mock_generate.return_value = "fake_audio_data" +# +# composer = SyntheticDatasetComposer(audio_config, mock_tokenizer) +# +# # Test audio payload generation +# turn = Turn() +# audio = composer._generate_audio_payloads() +# turn.audios.append(audio) +# +# # Test correct number of audio payloads based on batch_size +# assert len(turn.audios) == 1 # batch_size = 1 +# +# audio_payload = turn.audios[0] +# assert audio_payload.name == "input_audio" +# assert audio_payload.contents == ["fake_audio_data"] +# +# # ============================================================================ +# # Configuration Variations Tests +# # ============================================================================ +# +# def test_zero_conversations(self, synthetic_config, mock_tokenizer): +# """Test behavior with zero conversations requested.""" +# synthetic_config.input.conversation.num = 0 +# +# composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# assert len(conversations) == 0 +# +# 
@patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") +# def test_edge_case_statistical_parameters(self, mock_sample, mock_tokenizer): +# """Test behavior with edge case statistical parameters.""" +# mock_sample.return_value = 1 +# +# config = UserConfig( +# endpoint=EndpointConfig( +# model_names=["test-model"], +# ), +# input=InputConfig( +# conversation=ConversationConfig(num=2), +# prompt=PromptConfig( +# mean=1, # Very small mean +# stddev=0, # Zero stddev +# prefix_prompt=PrefixPromptConfig(pool_size=0), +# ), +# turn=TurnConfig( +# mean=100, # Large mean +# stddev=50, # Large stddev +# ), +# ), +# ) +# +# composer = SyntheticDatasetComposer(config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# # Test with very small/large mean and stddev values +# assert len(conversations) == 2 +# assert all(len(conv.turns) == 1 for conv in conversations) # mocked return +# +# @pytest.mark.parametrize("num_conversations", [1, 5, 10, 50]) +# def test_different_conversation_counts( +# self, synthetic_config, num_conversations, mock_tokenizer +# ): +# """Test dataset creation with different conversation counts.""" +# synthetic_config.input.conversation.num = num_conversations +# +# composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# # Parametrized test for different num_conversations values +# assert len(conversations) == num_conversations +# +# @pytest.mark.parametrize("batch_size", [1, 2, 5, 10]) +# @patch("aiperf.dataset.composer.synthetic.utils.sample_positive_normal_integer") +# def test_different_batch_sizes( +# self, mock_sample, synthetic_config, batch_size, mock_tokenizer +# ): +# """Test dataset creation with different batch sizes.""" +# mock_sample.return_value = 1 +# +# synthetic_config.input.prompt.batch_size = batch_size +# +# composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# # 
Parametrized test for different batch_size values +# turn = conversations[0].turns[0] +# assert len(turn.texts) == 1 # single text field per turn +# +# text_payload = turn.texts[0] +# assert len(text_payload.contents) == batch_size +# +# # ============================================================================ +# # Miscellaneous Tests +# # ============================================================================ +# +# def test_missing_required_generators(self, synthetic_config, mock_tokenizer): +# """Test behavior when required generators are missing.""" +# composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer) +# +# # Test error handling when generators are not properly initialized +# # Simulate missing tokenizer in generator +# composer.prompt_generator = None +# +# with pytest.raises(AttributeError): +# composer.create_dataset() +# +# def test_reproducibility_with_fixed_seed(self, multimodal_config, mock_tokenizer): +# """Test that dataset generation is reproducible with fixed random seed.""" +# multimodal_config.input.prompt.input_tokens.stddev = 2 +# multimodal_config.input.image.width.stddev = 2 +# multimodal_config.input.image.height.stddev = 2 +# multimodal_config.input.audio.length.stddev = 2 +# multimodal_config.input.conversation.turn = TurnConfig( +# mean=2, stddev=2, delay=TurnDelayConfig(mean=1500, stddev=2) +# ) +# +# random.seed(42) +# np.random.seed(42) +# composer1 = SyntheticDatasetComposer(multimodal_config, mock_tokenizer) +# conversations1 = composer1.create_dataset() +# +# random.seed(42) +# np.random.seed(42) +# composer2 = SyntheticDatasetComposer(multimodal_config, mock_tokenizer) +# conversations2 = composer2.create_dataset() +# +# # Basic structure should be the same +# assert len(conversations1) == len(conversations2) +# assert len(conversations1[0].turns) == len(conversations2[0].turns) +# +# # Both should have generated the same number of conversations and turns +# for conv1, conv2 in zip(conversations1, 
conversations2, strict=True): +# assert len(conv1.turns) == len(conv2.turns) +# for turn1, turn2 in zip(conv1.turns, conv2.turns, strict=True): +# assert len(turn1.texts) == len(turn2.texts) +# assert len(turn1.images) == len(turn2.images) +# assert len(turn1.audios) == len(turn2.audios) +# assert turn1.texts[0].contents == turn2.texts[0].contents +# assert turn1.images[0].contents == turn2.images[0].contents +# assert turn1.audios[0].contents == turn2.audios[0].contents +# assert turn1.delay == turn2.delay +# +# # ============================================================================ +# # Model Selection Strategy Tests +# # ============================================================================ +# +# @patch("random.choice", return_value="test-model-1") +# def test_model_selection_random(self, mock_choice, custom_config, mock_tokenizer): +# """Test random model selection strategy.""" +# +# custom_config.endpoint.model_selection_strategy = "random" +# composer = SyntheticDatasetComposer(custom_config, mock_tokenizer) +# +# conversations = composer.create_dataset() +# +# for conversation in conversations: +# for turn in conversation.turns: +# assert turn.model == "test-model-1" +# +# def test_model_selection_round_robin(self, custom_config, mock_tokenizer): +# custom_config.endpoint.model_selection_strategy = "round_robin" +# custom_config.endpoint.model_names = ["test-model-1", "test-model-2"] +# +# composer = SyntheticDatasetComposer(custom_config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# # Check that models are selected in round-robin fashion +# for i, conversation in enumerate(conversations): +# for j, turn in enumerate(conversation.turns): +# expected_model = "test-model-1" if (i + j) % 2 == 0 else "test-model-2" +# assert turn.model == expected_model +# +# # ============================================================================ +# # Max Token Tests +# # 
============================================================================ +# +# def test_max_tokens_integration_with_mean(self, custom_config, mock_tokenizer): +# custom_config.input.prompt.output_tokens.mean = 100 +# custom_config.input.prompt.output_tokens.stddev = 5.0 +# +# composer = SyntheticDatasetComposer(custom_config, mock_tokenizer) +# +# with patch( +# "aiperf.dataset.utils.sample_positive_normal_integer", return_value=98 +# ): +# conversations = composer.create_dataset() +# +# for conversation in conversations: +# for turn in conversation.turns: +# assert turn.max_tokens == 98 +# +# def test_max_tokens_not_set_when_mean_none(self, custom_config, mock_tokenizer): +# custom_config.input.prompt.output_tokens.mean = None +# custom_config.input.prompt.output_tokens.stddev = None +# +# composer = SyntheticDatasetComposer(custom_config, mock_tokenizer) +# conversations = composer.create_dataset() +# +# for conversation in conversations: +# for turn in conversation.turns: +# assert turn.max_tokens is None diff --git a/tests/generators/test_image_generator.py b/tests/generators/test_image_generator.py index 999c2b5d5..a0254a757 100644 --- a/tests/generators/test_image_generator.py +++ b/tests/generators/test_image_generator.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import base64 +import random from io import BytesIO from unittest.mock import Mock, patch @@ -164,7 +165,11 @@ def test_generate_multiple_calls_different_results( mock_sample_image.return_value = test_image generator = ImageGenerator(base_config) + + random.seed(123) image1 = generator.generate() + + random.seed(234) image2 = generator.generate() assert image1 != image2 diff --git a/tests/services/conftest.py b/tests/services/conftest.py index 05aa5dcee..b7dc8dbdd 100644 --- a/tests/services/conftest.py +++ b/tests/services/conftest.py @@ -3,6 +3,14 @@ import pytest +from aiperf.common.config import ( + ConversationConfig, + EndpointConfig, + InputConfig, + InputTokensConfig, + 
PromptConfig, + UserConfig, +) from aiperf.common.messages import ( ConversationRequestMessage, DatasetTimingRequest, @@ -10,6 +18,15 @@ from aiperf.common.models import Conversation, Text, Turn from tests.utils.async_test_utils import async_fixture + +@pytest.fixture +def mock_tokenizer(mock_tokenizer_cls): + """Mock tokenizer class.""" + return mock_tokenizer_cls.from_pretrained( + "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" + ) + + # ============================================================================ # Dataset Manager Test Fixtures # ============================================================================ @@ -62,3 +79,23 @@ async def populated_dataset_manager(initialized_service, sample_conversations): manager = await async_fixture(initialized_service) manager.dataset = {conv.session_id: conv for conv in sample_conversations} return manager + + +# ============================================================================ +# Dataset Processor Fixtures +# ============================================================================ + + +@pytest.fixture +def synthetic_user_config() -> UserConfig: + """Basic synthetic configuration for testing.""" + config = UserConfig( + endpoint=EndpointConfig(model_names=["test-model"]), + input=InputConfig( + conversation=ConversationConfig(num=5), + prompt=PromptConfig( + input_tokens=InputTokensConfig(mean=10, stddev=2), + ), + ), + ) + return config diff --git a/tests/services/test_dataset_processor.py b/tests/services/test_dataset_processor.py new file mode 100644 index 000000000..be1827e23 --- /dev/null +++ b/tests/services/test_dataset_processor.py @@ -0,0 +1,84 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +""" +Tests for the dataset processor service. 
+""" + +from unittest.mock import patch + +import pytest + +from aiperf.common.config import EndpointConfig, ServiceConfig, UserConfig +from aiperf.common.enums import ServiceType +from aiperf.common.messages import ProcessSyntheticDatasetMessage +from aiperf.common.models import Conversation, Turn +from aiperf.dataset import DatasetProcessor +from aiperf.dataset.generator.prompt import PromptGenerator + + +def create_service(user_config: UserConfig | None = None, filename: str | None = None): + """Create a dataset processor service.""" + service_config = ServiceConfig( + service_id="test-service-id", + ) + user_config = user_config or UserConfig( + endpoint=EndpointConfig( + model_names=["test-model"], + ) + ) + user_config.input.file = filename + return DatasetProcessor( + service_config=service_config, + user_config=user_config, + ) + + +@pytest.mark.asyncio +class TestDatasetProcessorService: + """ + Tests for dataset processor service functionalities and basic properties. + + This test class adds dataset processor specific tests + for service properties and request handling.
+ """ + + async def test_service_initialization(self): + """Test that the dataset processor initializes properly with service configuration.""" + service = create_service() + + assert service.service_type == ServiceType.DATASET_PROCESSOR + + @patch("aiperf.dataset.processor.utils.sample_positive_normal_integer") + async def test_create_conversations( + self, mock_sample, synthetic_user_config, mock_tokenizer + ): + """Test basic dataset creation with text-only conversations.""" + # Mock the number of turns per conversation + mock_sample.return_value = 2 + + service = create_service(synthetic_user_config) + service.tokenizer = mock_tokenizer + service.prompt_generator = PromptGenerator( + synthetic_user_config.input.prompt, mock_tokenizer + ) + + await service._reset_states( + ProcessSyntheticDatasetMessage( + service_id="test-service-id", + num_conversations=1, + ) + ) + + conversation = await service._create_conversation() + + assert isinstance(conversation, Conversation) + assert conversation.session_id is not None + assert len(conversation.turns) == 2 # mocked value + + for turn in conversation.turns: + assert isinstance(turn, Turn) + assert len(turn.texts) == 1 # single text field per turn + assert len(turn.texts[0].contents) == 1 # batch_size = 1 + assert len(turn.images) == 0 # no images + assert len(turn.audios) == 0 # no audio