diff --git a/docs/concepts/pipeline-wrapper.md b/docs/concepts/pipeline-wrapper.md index 0372d85d..5d9c4c13 100644 --- a/docs/concepts/pipeline-wrapper.md +++ b/docs/concepts/pipeline-wrapper.md @@ -176,6 +176,192 @@ async def run_chat_completion_async(self, model: str, messages: List[dict], body ) ``` +## Streaming from Multiple Components + +!!! info "Smart Streaming Behavior" + By default, Hayhooks streams only the **last** streaming-capable component in your pipeline. This is usually what you want - the final output streaming to users. + +For advanced use cases, you can control which components stream using the `streaming_components` parameter. + +When your pipeline contains multiple components that support streaming (e.g., multiple LLMs), you can control which ones stream their outputs as the pipeline executes. + +### Default Behavior: Stream Only the Last Component + +By default, only the last streaming-capable component will stream: + +```python +class MultiLLMWrapper(BasePipelineWrapper): + def setup(self) -> None: + from haystack.components.builders import ChatPromptBuilder + from haystack.components.generators.chat import OpenAIChatGenerator + from haystack.dataclasses import ChatMessage + + self.pipeline = Pipeline() + + # First LLM - initial answer + self.pipeline.add_component( + "prompt_1", + ChatPromptBuilder( + template=[ + ChatMessage.from_system("You are a helpful assistant."), + ChatMessage.from_user("{{query}}") + ] + ) + ) + self.pipeline.add_component("llm_1", OpenAIChatGenerator(model="gpt-4o-mini")) + + # Second LLM - refines the answer using Jinja2 to access ChatMessage attributes + self.pipeline.add_component( + "prompt_2", + ChatPromptBuilder( + template=[ + ChatMessage.from_system("You are a helpful assistant that refines responses."), + ChatMessage.from_user( + "Previous response: {{previous_response[0].text}}\n\nRefine this." + ) + ] + ) + ) + self.pipeline.add_component("llm_2", OpenAIChatGenerator(model="gpt-4o-mini")) + + # Connect components - LLM 1's replies go directly to prompt_2 + self.pipeline.connect("prompt_1.prompt", "llm_1.messages") + self.pipeline.connect("llm_1.replies", "prompt_2.previous_response") + self.pipeline.connect("prompt_2.prompt", "llm_2.messages") + + def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Generator: + question = get_last_user_message(messages) + + # By default, only llm_2 (the last streaming component) will stream + return streaming_generator( + pipeline=self.pipeline, + pipeline_run_args={"prompt_1": {"query": question}} + ) +``` + +**What happens:** Only `llm_2` (the last streaming-capable component) streams its responses token by token. The first LLM (`llm_1`) executes normally without streaming, and only the final refined output streams to the user. + +### Advanced: Stream Multiple Components with `streaming_components` + +For advanced use cases where you want to see outputs from multiple components, use the `streaming_components` parameter: + +```python +def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Generator: + question = get_last_user_message(messages) + + # Enable streaming for BOTH LLMs + return streaming_generator( + pipeline=self.pipeline, + pipeline_run_args={"prompt_1": {"query": question}}, + streaming_components=["llm_1", "llm_2"] # Stream both components + ) +``` + +**What happens:** Both LLMs stream their responses token by token. First you'll see the initial answer from `llm_1` streaming, then the refined answer from `llm_2` streaming. 
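+When both LLMs stream, their chunks arrive on the same stream, one component after the other. If you want to show users where the refined answer begins, you can inspect `StreamingChunk.component_info` to see which component produced each chunk (it may be `None` for some chunks, so check it defensively). A minimal sketch reusing the wrapper above; `labelled_stream` is just an illustrative helper name:
+
+```python
+from haystack.dataclasses import StreamingChunk
+
+def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Generator:
+    question = get_last_user_message(messages)
+
+    def labelled_stream():
+        llm_2_started = False
+        for chunk in streaming_generator(
+            pipeline=self.pipeline,
+            pipeline_run_args={"prompt_1": {"query": question}},
+            streaming_components=["llm_1", "llm_2"],
+        ):
+            # component_info identifies which component emitted the chunk
+            info = getattr(chunk, "component_info", None)
+            if info and info.name == "llm_2" and not llm_2_started:
+                llm_2_started = True
+                # Inject a visual separator before the refined answer starts
+                yield StreamingChunk(content="\n\n---\n\n")
+            yield chunk
+
+    return labelled_stream()
+```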
+ +You can also selectively enable streaming for specific components: + +```python +# Stream only the first LLM +streaming_components=["llm_1"] + +# Stream only the second LLM (same as default) +streaming_components=["llm_2"] + +# Stream ALL capable components (shorthand) +streaming_components="all" + +# Stream ALL capable components (specific list) +streaming_components=["llm_1", "llm_2"] +``` + +### Using the "all" Keyword + +The `"all"` keyword is a convenient shorthand to enable streaming for all capable components: + +```python +return streaming_generator( + pipeline=self.pipeline, + pipeline_run_args={...}, + streaming_components="all" # Enable all streaming components +) +``` + +This is equivalent to explicitly enabling every streaming-capable component in your pipeline. + +### Global Configuration via Environment Variable + +You can set a global default using the `HAYHOOKS_STREAMING_COMPONENTS` environment variable. This applies to all pipelines unless overridden: + +```bash +# Stream all components by default +export HAYHOOKS_STREAMING_COMPONENTS="all" + +# Stream specific components (comma-separated) +export HAYHOOKS_STREAMING_COMPONENTS="llm_1,llm_2" +``` + +**Priority order:** + +1. Explicit `streaming_components` parameter (highest priority) +2. `HAYHOOKS_STREAMING_COMPONENTS` environment variable +3. Default behavior: stream only last component (lowest priority) + +!!! tip "When to Use Each Approach" + - **Default (last component only)**: Best for most use cases - users see only the final output + - **"all" keyword**: Useful for debugging, demos, or transparent multi-step workflows + - **List of components**: Enable multiple specific components by name + - **Environment variable**: For deployment-wide defaults without code changes + +!!! note "Async Streaming" + All streaming_components options work identically with `async_streaming_generator()` for async pipelines. + +### YAML Pipeline Streaming Configuration + +You can also specify streaming configuration in YAML pipeline definitions: + +```yaml +components: + prompt_1: + type: haystack.components.builders.PromptBuilder + init_parameters: + template: "Answer this question: {{query}}" + llm_1: + type: haystack.components.generators.OpenAIGenerator + prompt_2: + type: haystack.components.builders.PromptBuilder + init_parameters: + template: "Refine this response: {{previous_reply}}" + llm_2: + type: haystack.components.generators.OpenAIGenerator + +connections: + - sender: prompt_1.prompt + receiver: llm_1.prompt + - sender: llm_1.replies + receiver: prompt_2.previous_reply + - sender: prompt_2.prompt + receiver: llm_2.prompt + +inputs: + query: prompt_1.query + +outputs: + replies: llm_2.replies + +# Option 1: List specific components +streaming_components: + - llm_1 + - llm_2 + +# Option 2: Stream all components +# streaming_components: all +``` + +YAML configuration follows the same priority rules: YAML setting > environment variable > default. + +See the [Multi-LLM Streaming Example](https://github.com/deepset-ai/hayhooks/tree/main/examples/pipeline_wrappers/multi_llm_streaming) for a complete working implementation. 
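+As noted above, the same `streaming_components` options can be passed to `async_streaming_generator()`. A minimal sketch, assuming the multi-LLM pipeline from this section is built as an `AsyncPipeline` and exposed via the async chat entry point:
+
+```python
+async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
+    question = get_last_user_message(messages)
+
+    # Same options as the sync version: a list of component names or "all"
+    return async_streaming_generator(
+        pipeline=self.pipeline,
+        pipeline_run_args={"prompt_1": {"query": question}},
+        streaming_components="all",
+    )
+```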
+ ## File Upload Support Hayhooks can handle file uploads by adding a `files` parameter: diff --git a/docs/reference/environment-variables.md b/docs/reference/environment-variables.md index 40eb995f..f665ecda 100644 --- a/docs/reference/environment-variables.md +++ b/docs/reference/environment-variables.md @@ -44,6 +44,32 @@ Hayhooks can be configured via environment variables (loaded with prefix `HAYHOO - Default: `false` - Description: Include tracebacks in error messages (server and MCP) +### HAYHOOKS_STREAMING_COMPONENTS + +- Default: `""` (empty string) +- Description: Global configuration for which pipeline components should stream +- Options: + - `""` (empty): Stream only the last capable component (default) + - `"all"`: Stream all streaming-capable components + - Comma-separated list: `"llm_1,llm_2"` to enable specific components + +!!! note "Priority Order" + Pipeline-specific settings (via `streaming_components` parameter or YAML) override this global default. + +!!! tip "Component-Specific Control" + For component-specific control, use the `streaming_components` parameter in your code or YAML configuration instead of the environment variable to specify exactly which components should stream. + +**Examples:** + +```bash +# Stream all components globally +export HAYHOOKS_STREAMING_COMPONENTS="all" + +# Stream specific components (comma-separated, spaces are trimmed) +export HAYHOOKS_STREAMING_COMPONENTS="llm_1,llm_2" +export HAYHOOKS_STREAMING_COMPONENTS="llm_1, llm_2, llm_3" +``` + ## MCP ### HAYHOOKS_MCP_HOST @@ -154,6 +180,7 @@ HAYHOOKS_ADDITIONAL_PYTHON_PATH=./custom_code HAYHOOKS_USE_HTTPS=false HAYHOOKS_DISABLE_SSL=false HAYHOOKS_SHOW_TRACEBACKS=false +HAYHOOKS_STREAMING_COMPONENTS=all HAYHOOKS_CORS_ALLOW_ORIGINS=["*"] LOG=INFO ``` diff --git a/examples/README.md b/examples/README.md index 2eafe761..c0921929 100644 --- a/examples/README.md +++ b/examples/README.md @@ -6,6 +6,7 @@ This directory contains various examples demonstrating different use cases and f | Example | Description | Key Features | Use Case | |---------|-------------|--------------|----------| +| [multi_llm_streaming](./pipeline_wrappers/multi_llm_streaming/) | Multiple LLM components with automatic streaming | • Two sequential LLMs
• Configurable multi-component streaming<br>
• Uses the `streaming_components` parameter<br>
• Contrasts default (last-component) and multi-component streaming | Demonstrating how to stream from multiple LLM components in a pipeline using `streaming_components` | | [async_question_answer](./pipeline_wrappers/async_question_answer/) | Async question-answering pipeline with streaming support | • Async pipeline execution<br>
• Streaming responses
• OpenAI Chat Generator
• Both API and chat completion interfaces | Building conversational AI systems that need async processing and real-time streaming responses | | [chat_with_website](./pipeline_wrappers/chat_with_website/) | Answer questions about website content | • Web content fetching
• HTML to document conversion
• Content-based Q&A
• Configurable URLs | Creating AI assistants that can answer questions about specific websites or web-based documentation | | [chat_with_website_mcp](./pipeline_wrappers/chat_with_website_mcp/) | MCP-compatible website chat pipeline | • MCP (Model Context Protocol) support
• Website content analysis
• API-only interface
• Simplified deployment | Integrating website analysis capabilities into MCP-compatible AI systems and tools | diff --git a/examples/pipeline_wrappers/multi_llm_streaming/README.md b/examples/pipeline_wrappers/multi_llm_streaming/README.md new file mode 100644 index 00000000..ded817a4 --- /dev/null +++ b/examples/pipeline_wrappers/multi_llm_streaming/README.md @@ -0,0 +1,107 @@ +# Multi-LLM Streaming Example + +This example demonstrates hayhooks' configurable multi-component streaming support. + +## Overview + +The pipeline contains **two LLM components in sequence**: + +1. **LLM 1** (`gpt-5-nano` with `reasoning_effort: low`): Provides a short, concise initial answer to the user's question +2. **LLM 2** (`gpt-5-nano` with `reasoning_effort: medium`): Refines and expands the answer into a detailed, professional response + +This example uses `streaming_components` to enable streaming for **both** LLMs. By default, only the last component would stream. + +![Multi-LLM Streaming Example](./multi_stream.gif) + +## How It Works + +### Streaming Configuration + +By default, hayhooks streams only the **last** streaming-capable component (in this case, LLM 2). However, this example demonstrates using the `streaming_components` parameter to enable streaming for both components: + +```python +streaming_generator( + pipeline=self.pipeline, + pipeline_run_args={...}, + streaming_components=["llm_1", "llm_2"] # or streaming_components="all" +) +``` + +**Available options:** + +- **Default behavior** (no `streaming_components` or `None`): Only the last streaming component streams +- **Stream all components**: `streaming_components=["llm_1", "llm_2"]` (same as `streaming_components="all"`) +- **Stream only first**: `streaming_components=["llm_1"]` +- **Stream only last** (same as default): `streaming_components=["llm_2"]` + +### Pipeline Architecture + +The pipeline connects LLM 1's replies directly to the second prompt builder. Using Jinja2 template syntax, the second prompt builder can access the `ChatMessage` attributes directly: `{{previous_response[0].text}}`. This approach is simple and doesn't require any custom extraction components. + +This example also demonstrates injecting a visual separator (`**[LLM 2 - Refining the response]**`) between the two LLM outputs using `StreamingChunk.component_info` to detect component transitions. + +## Usage + +### Deploy with Hayhooks + +```bash +# Set your OpenAI API key +export OPENAI_API_KEY=your_api_key_here + +# Deploy the pipeline +hayhooks deploy examples/pipeline_wrappers/multi_llm_streaming + +# Test it via OpenAI-compatible API +curl -X POST http://localhost:1416/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "multi_llm_streaming", + "messages": [{"role": "user", "content": "What is machine learning?"}], + "stream": true + }' +``` + +### Use Directly in Code + +```python +from haystack import Pipeline +from haystack.components.builders import ChatPromptBuilder +from haystack.dataclasses import ChatMessage +from hayhooks import streaming_generator + +# Create your pipeline with multiple streaming components +pipeline = Pipeline() +# ... add LLM 1 and prompt_builder_1 ... + +# Add second prompt builder that accesses ChatMessage attributes via Jinja2 +pipeline.add_component( + "prompt_builder_2", + ChatPromptBuilder( + template=[ + ChatMessage.from_system("You are a helpful assistant."), + ChatMessage.from_user("Previous: {{previous_response[0].text}}\n\nRefine this.") + ] + ) +) +# ... add LLM 2 ... 
+ +# Connect: LLM 1 replies directly to prompt_builder_2 +pipeline.connect("llm_1.replies", "prompt_builder_2.previous_response") + +# Enable streaming for both LLMs (by default, only the last would stream) +for chunk in streaming_generator( + pipeline=pipeline, + pipeline_run_args={"prompt_builder_1": {"query": "Your question"}}, + streaming_components=["llm_1", "llm_2"] # Stream both components +): + print(chunk.content, end="", flush=True) +``` + +## Integration with OpenWebUI + +This pipeline works seamlessly with OpenWebUI: + +1. Configure OpenWebUI to connect to hayhooks (see [OpenWebUI Integration docs](https://deepset-ai.github.io/hayhooks/features/openwebui-integration)) +2. Deploy this pipeline +3. Select it as a model in OpenWebUI +4. Watch both LLMs stream their responses in real-time diff --git a/examples/pipeline_wrappers/multi_llm_streaming/multi_stream.gif b/examples/pipeline_wrappers/multi_llm_streaming/multi_stream.gif new file mode 100644 index 00000000..673d1f6b Binary files /dev/null and b/examples/pipeline_wrappers/multi_llm_streaming/multi_stream.gif differ diff --git a/examples/pipeline_wrappers/multi_llm_streaming/pipeline_wrapper.py b/examples/pipeline_wrappers/multi_llm_streaming/pipeline_wrapper.py new file mode 100644 index 00000000..dd0ff11d --- /dev/null +++ b/examples/pipeline_wrappers/multi_llm_streaming/pipeline_wrapper.py @@ -0,0 +1,138 @@ +from collections.abc import Generator +from typing import Any, List, Union # noqa: UP035 + +from haystack import Pipeline +from haystack.components.builders import ChatPromptBuilder +from haystack.components.generators.chat import OpenAIChatGenerator +from haystack.dataclasses import ChatMessage, StreamingChunk +from haystack.utils import Secret + +from hayhooks import BasePipelineWrapper, get_last_user_message, streaming_generator + + +class PipelineWrapper(BasePipelineWrapper): + """ + A pipeline with two sequential LLM components that can both stream. + + The first LLM (low reasoning) provides a concise answer, and the second LLM + (medium reasoning) refines and expands it with more detail. + + This example demonstrates the streaming_components parameter which controls which + components should stream their responses. + """ + + def setup(self) -> None: + """Initialize the pipeline with two streaming LLM components.""" + self.pipeline = Pipeline() + + # First stage: Initial answer + self.pipeline.add_component( + "prompt_builder_1", + ChatPromptBuilder( + template=[ + ChatMessage.from_system( + "You are a helpful assistant. \nAnswer the user's question in a short and concise manner." + ), + ChatMessage.from_user("{{query}}"), + ], + required_variables="*", + ), + ) + self.pipeline.add_component( + "llm_1", + OpenAIChatGenerator( + api_key=Secret.from_env_var("OPENAI_API_KEY"), + model="gpt-5-nano", + generation_kwargs={ + "reasoning_effort": "low", + }, + ), + ) + + # Second stage: Refinement + # The prompt builder can directly access ChatMessage attributes via Jinja2 + self.pipeline.add_component( + "prompt_builder_2", + ChatPromptBuilder( + template=[ + ChatMessage.from_system("You are a helpful assistant that refines and improves responses."), + ChatMessage.from_user( + "Here is the previous response:\n\n{{previous_response[0].text}}\n\n" + "Please refine and improve this response. " + "Make it a bit more detailed, clear, and professional. " + "Please state that you're refining the response in the beginning of your answer." 
+ ), + ], + required_variables="*", + ), + ) + self.pipeline.add_component( + "llm_2", + OpenAIChatGenerator( + api_key=Secret.from_env_var("OPENAI_API_KEY"), + model="gpt-5-nano", + generation_kwargs={ + "reasoning_effort": "medium", + }, + streaming_callback=None, + ), + ) + + # Connect the components + self.pipeline.connect("prompt_builder_1.prompt", "llm_1.messages") + self.pipeline.connect("llm_1.replies", "prompt_builder_2.previous_response") + self.pipeline.connect("prompt_builder_2.prompt", "llm_2.messages") + + def run_api(self, query: str) -> dict[str, Any]: + """Run the pipeline in non-streaming mode.""" + result = self.pipeline.run( + { + "prompt_builder_1": {"query": query}, + } + ) + return {"reply": result["llm_2"]["replies"][0].text if result["llm_2"]["replies"] else ""} + + def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]: # noqa: ARG002, UP006 + """ + Run the pipeline in streaming mode. + + This demonstrates the streaming_components parameter which controls which components stream. + By default (streaming_components=None), only the last streaming-capable component (llm_2) streams. + To enable streaming for both LLMs, use: streaming_components=["llm_1", "llm_2"] + + We inject a visual separator between LLM 1 and LLM 2 outputs. + """ + question = get_last_user_message(messages) + + def custom_streaming(): + """ + Enhanced streaming that injects a visual separator between LLM outputs. + + Uses StreamingChunk.component_info.name to reliably detect which component + is streaming, avoiding fragile chunk counting or heuristics. + + NOTE: This is simply a workaround to inject a visual separator between LLM outputs. + """ + llm2_started = False + + # Enable streaming for both LLM components + # To stream only the last component (default), omit streaming_components or set to None + for chunk in streaming_generator( + pipeline=self.pipeline, + pipeline_run_args={ + "prompt_builder_1": {"query": question}, + }, + streaming_components=["llm_1", "llm_2"], # Or use streaming_components="all" + ): + # Use component_info to detect which LLM is streaming + if hasattr(chunk, "component_info") and chunk.component_info: + component_name = chunk.component_info.name + + # When we see llm_2 for the first time, inject a visual separator + if component_name == "llm_2" and not llm2_started: + llm2_started = True + yield StreamingChunk(content="\n\n**[LLM 2 - Refining the response]**\n\n") + + yield chunk + + return custom_streaming() diff --git a/src/hayhooks/server/pipelines/utils.py b/src/hayhooks/server/pipelines/utils.py index 9d84792a..f1f40520 100644 --- a/src/hayhooks/server/pipelines/utils.py +++ b/src/hayhooks/server/pipelines/utils.py @@ -2,7 +2,7 @@ import threading from collections.abc import AsyncGenerator, Generator from queue import Queue -from typing import Any, Callable, Optional, Union +from typing import Any, Callable, Literal, Optional, Union from haystack import AsyncPipeline, Pipeline from haystack.components.agents import Agent @@ -12,6 +12,7 @@ from hayhooks.open_webui import OpenWebUIEvent from hayhooks.server.logger import log from hayhooks.server.routers.openai import Message +from hayhooks.settings import settings ToolCallbackReturn = Union[OpenWebUIEvent, str, None, list[Union[OpenWebUIEvent, str]]] OnToolCallStart = Optional[Callable[[str, Optional[str], Optional[str]], ToolCallbackReturn]] @@ -40,52 +41,124 @@ def get_last_user_message(messages: list[Union[Message, dict]]) -> Union[str, No return None -def 
find_streaming_component(pipeline: Union[Pipeline, AsyncPipeline]) -> tuple[Component, str]: +def find_all_streaming_components(pipeline: Union[Pipeline, AsyncPipeline]) -> list[tuple[Component, str]]: """ - Finds the component in the pipeline that supports streaming_callback + Finds all components in the pipeline that support streaming_callback. Returns: - The first component that supports streaming + A list of tuples containing (component, component_name) for all streaming components """ - streaming_component = None - streaming_component_name = "" + streaming_components = [] for name, component in pipeline.walk(): if hasattr(component, "streaming_callback"): log.trace(f"Streaming component found in '{name}' with type {type(component)}") - streaming_component = component - streaming_component_name = name - if not streaming_component: - msg = "No streaming-capable component found in the pipeline" + streaming_components.append((component, name)) + + if not streaming_components: + msg = "No streaming-capable components found in the pipeline" raise ValueError(msg) - return streaming_component, streaming_component_name + return streaming_components + + +def _parse_streaming_components_setting(setting_value: str) -> Union[list[str], Literal["all"], None]: + """ + Parse the HAYHOOKS_STREAMING_COMPONENTS environment variable. + + Args: + setting_value: The raw setting value from environment variable + + Returns: + - None if empty string (use default behavior) + - "all" if the value is "all" + - list[str] if it's a comma-separated list of component names + """ + if not setting_value or setting_value.strip() == "": + return None + + setting_value = setting_value.strip() + + # Check for "all" keyword + if setting_value.lower() == "all": + return "all" + + # Parse as comma-separated list + components = [c.strip() for c in setting_value.split(",") if c.strip()] + if components: + return components + + return None def _setup_streaming_callback_for_pipeline( - pipeline: Union[Pipeline, AsyncPipeline], pipeline_run_args: dict[str, Any], streaming_callback: Any + pipeline: Union[Pipeline, AsyncPipeline], + pipeline_run_args: dict[str, Any], + streaming_callback: Any, + streaming_components: Optional[Union[list[str], Literal["all"]]] = None, ) -> dict[str, Any]: """ - Sets up streaming callback for pipeline components. + Sets up streaming callbacks for streaming-capable components in the pipeline. + + By default, only the last streaming-capable component will stream. You can customize this + behavior using the streaming_components parameter or HAYHOOKS_STREAMING_COMPONENTS env var. Args: pipeline: The pipeline to configure pipeline_run_args: Arguments for pipeline execution streaming_callback: The callback function to set + streaming_components: Optional config for which components should stream. 
+ Can be: + - None: use HAYHOOKS_STREAMING_COMPONENTS or default (last only) + - "all": stream all capable components + - list[str]: ["llm_1", "llm_2"] to enable specific components Returns: Updated pipeline run arguments """ - _, streaming_component_name = find_streaming_component(pipeline) - - # Ensure component args exist in pipeline run args - if streaming_component_name not in pipeline_run_args: - pipeline_run_args[streaming_component_name] = {} - - # Set the streaming callback on the component - streaming_component = pipeline.get_component(streaming_component_name) - assert hasattr(streaming_component, "streaming_callback") - streaming_component.streaming_callback = streaming_callback + all_streaming_components = find_all_streaming_components(pipeline) + + # If streaming_components not provided, check environment variable + if streaming_components is None: + streaming_components = _parse_streaming_components_setting(settings.streaming_components) + + # Determine which components should stream + components_to_stream = [] + + # Stream all capable components + if streaming_components == "all": + components_to_stream = all_streaming_components + log.trace("Streaming enabled for all components via 'all' keyword") + + # Default behavior: stream only the last capable component + elif streaming_components is None: + if all_streaming_components: + components_to_stream = [all_streaming_components[-1]] + log.trace(f"Streaming enabled for last component only: {all_streaming_components[-1][1]}") + + # Use explicit list of component names + elif isinstance(streaming_components, list): + enabled_component_names = set(streaming_components) + for component, component_name in all_streaming_components: + if component_name in enabled_component_names: + components_to_stream.append((component, component_name)) + log.trace(f"Streaming enabled for components: {[name for _, name in components_to_stream]}") + + for _, component_name in components_to_stream: + # Pass the streaming callback as a parameter instead of mutating the component + # This ensures thread-safety for concurrent requests + streaming_component = pipeline.get_component(component_name) + assert hasattr(streaming_component, "streaming_callback") + + # Ensure component args exist and make a copy to avoid mutating original + if component_name not in pipeline_run_args: + pipeline_run_args[component_name] = {} + else: + # Create a copy of the existing component args to avoid modifying the original + pipeline_run_args[component_name] = pipeline_run_args[component_name].copy() + + pipeline_run_args[component_name]["streaming_callback"] = streaming_callback + log.trace(f"Streaming callback set for component '{component_name}'") return pipeline_run_args @@ -106,7 +179,10 @@ def _setup_streaming_callback_for_agent(pipeline_run_args: dict[str, Any], strea def _setup_streaming_callback( - pipeline: Union[Pipeline, AsyncPipeline, Agent], pipeline_run_args: dict[str, Any], streaming_callback: Any + pipeline: Union[Pipeline, AsyncPipeline, Agent], + pipeline_run_args: dict[str, Any], + streaming_callback: Any, + streaming_components: Optional[Union[list[str], Literal["all"]]] = None, ) -> dict[str, Any]: """ Configures streaming callback for the given pipeline or agent. 
@@ -115,6 +191,7 @@ def _setup_streaming_callback( pipeline: The pipeline or agent to configure pipeline_run_args: Execution arguments streaming_callback: The callback function + streaming_components: Optional config - list[str], "all", or None (pipelines only) Returns: Updated pipeline run arguments @@ -122,12 +199,96 @@ def _setup_streaming_callback( pipeline_run_args = pipeline_run_args.copy() if isinstance(pipeline, (Pipeline, AsyncPipeline)): - return _setup_streaming_callback_for_pipeline(pipeline, pipeline_run_args, streaming_callback) - elif isinstance(pipeline, Agent): + return _setup_streaming_callback_for_pipeline( + pipeline, pipeline_run_args, streaming_callback, streaming_components + ) + + if isinstance(pipeline, Agent): return _setup_streaming_callback_for_agent(pipeline_run_args, streaming_callback) - else: - msg = f"Unsupported pipeline type: {type(pipeline)}" - raise ValueError(msg) + + msg = f"Unsupported pipeline type: {type(pipeline)}" + raise ValueError(msg) + + +def _yield_callback_results(result: ToolCallbackReturn) -> Generator[Union[OpenWebUIEvent, str], None, None]: + """ + Yields callback results, handling both single values and lists. + + Args: + result: The callback result to yield (can be None, single value, or list) + + Yields: + OpenWebUIEvent or str: The callback results + """ + if result: + if isinstance(result, list): + yield from result + else: + yield result + + +def _process_tool_call_start( + chunk: StreamingChunk, on_tool_call_start: OnToolCallStart +) -> Generator[Union[OpenWebUIEvent, str], None, None]: + """ + Process tool call start events from a streaming chunk. + + Args: + chunk: The streaming chunk that may contain tool calls + on_tool_call_start: Callback function for tool call start + + Yields: + OpenWebUIEvent or str: Results from the callback + """ + if on_tool_call_start and hasattr(chunk, "tool_calls") and chunk.tool_calls: + for tool_call in chunk.tool_calls: + if tool_call.tool_name: + result = on_tool_call_start(tool_call.tool_name, tool_call.arguments, tool_call.id) + yield from _yield_callback_results(result) + + +def _process_tool_call_end( + chunk: StreamingChunk, on_tool_call_end: OnToolCallEnd +) -> Generator[Union[OpenWebUIEvent, str], None, None]: + """ + Process tool call end events from a streaming chunk. + + Args: + chunk: The streaming chunk that may contain tool call results + on_tool_call_end: Callback function for tool call end + + Yields: + OpenWebUIEvent or str: Results from the callback + """ + if on_tool_call_end and hasattr(chunk, "tool_call_result") and chunk.tool_call_result: + result = on_tool_call_end( + chunk.tool_call_result.origin.tool_name, + chunk.tool_call_result.origin.arguments, + chunk.tool_call_result.result, + bool(chunk.tool_call_result.error), + ) + yield from _yield_callback_results(result) + + +def _process_pipeline_end(result: dict[str, Any], on_pipeline_end: OnPipelineEnd) -> Optional[StreamingChunk]: + """ + Process pipeline end callback and return a StreamingChunk if there's content. 
+ + Args: + result: The pipeline execution result + on_pipeline_end: Optional callback function for pipeline end + + Returns: + StreamingChunk with content from callback, or None + """ + if on_pipeline_end: + try: + on_pipeline_end_result = on_pipeline_end(result) + if on_pipeline_end_result: + return StreamingChunk(content=on_pipeline_end_result) + except Exception as e: + log.error(f"Error in on_pipeline_end callback: {e}", exc_info=True) + return None def _execute_pipeline_sync( @@ -146,18 +307,20 @@ def _execute_pipeline_sync( return pipeline.run(data=pipeline_run_args) -def streaming_generator( # noqa: C901, PLR0912 +def streaming_generator( # noqa: PLR0913 pipeline: Union[Pipeline, AsyncPipeline, Agent], *, pipeline_run_args: Optional[dict[str, Any]] = None, on_tool_call_start: OnToolCallStart = None, on_tool_call_end: OnToolCallEnd = None, on_pipeline_end: OnPipelineEnd = None, + streaming_components: Optional[Union[list[str], Literal["all"]]] = None, ) -> Generator[Union[StreamingChunk, OpenWebUIEvent, str], None, None]: """ Creates a generator that yields streaming chunks from a pipeline or agent execution. - Automatically finds the streaming-capable component in pipelines or uses the agent's streaming callback. + By default, only the last streaming-capable component in pipelines will stream. + You can control which components stream using streaming_components or HAYHOOKS_STREAMING_COMPONENTS. Args: pipeline: The Pipeline, AsyncPipeline, or Agent to execute @@ -165,14 +328,20 @@ def streaming_generator( # noqa: C901, PLR0912 on_tool_call_start: Callback for tool call start on_tool_call_end: Callback for tool call end on_pipeline_end: Callback for pipeline end + streaming_components: Optional config for which components should stream. + Can be: + - None: use HAYHOOKS_STREAMING_COMPONENTS or default (last only) + - "all": stream all capable components + - list[str]: ["llm_1", "llm_2"] to enable specific components Yields: StreamingChunk: Individual chunks from the streaming execution OpenWebUIEvent: Event for tool call str: Tool name or stream content - NOTE: This generator works with sync/async pipelines and agents, but pipeline components - which support streaming must have a _sync_ `streaming_callback`. + NOTE: This generator works with sync/async pipelines and agents. Pipeline components + which support streaming must have a _sync_ `streaming_callback`. By default, + only the last streaming-capable component will stream. 
""" if pipeline_run_args is None: pipeline_run_args = {} @@ -182,22 +351,16 @@ def streaming_callback(chunk: StreamingChunk) -> None: queue.put(chunk) # Configure streaming callback - configured_args = _setup_streaming_callback(pipeline, pipeline_run_args, streaming_callback) + configured_args = _setup_streaming_callback(pipeline, pipeline_run_args, streaming_callback, streaming_components) log.trace(f"Streaming pipeline run args: {configured_args}") def run_pipeline() -> None: try: result = _execute_pipeline_sync(pipeline, configured_args) - # Call on_pipeline_end if provided - if on_pipeline_end: - try: - on_pipeline_end_result = on_pipeline_end(result) - # Send final chunk if on_pipeline_end returned content - if on_pipeline_end_result: - queue.put(StreamingChunk(content=on_pipeline_end_result)) - except Exception as e: - # We don't put the error into the queue to avoid breaking the stream - log.error(f"Error in on_pipeline_end callback: {e}", exc_info=True) + # Process pipeline end callback + final_chunk = _process_pipeline_end(result, on_pipeline_end) + if final_chunk: + queue.put(final_chunk) # Signal completion queue.put(None) except Exception as e: @@ -216,30 +379,8 @@ def run_pipeline() -> None: break # Handle tool calls - if on_tool_call_start and hasattr(item, "tool_calls") and item.tool_calls: - for tool_call in item.tool_calls: - if tool_call.tool_name: - res = on_tool_call_start(tool_call.tool_name, tool_call.arguments, tool_call.id) - if res: - if isinstance(res, list): - for r in res: - yield r - else: - yield res - - if on_tool_call_end and hasattr(item, "tool_call_result") and item.tool_call_result: - res = on_tool_call_end( - item.tool_call_result.origin.tool_name, - item.tool_call_result.origin.arguments, - item.tool_call_result.result, - bool(item.tool_call_result.error), - ) - if res: - if isinstance(res, list): - for r in res: - yield r - else: - yield res + yield from _process_tool_call_start(item, on_tool_call_start) + yield from _process_tool_call_end(item, on_tool_call_end) yield item finally: thread.join() @@ -247,26 +388,27 @@ def run_pipeline() -> None: def _validate_async_streaming_support(pipeline: Union[Pipeline, AsyncPipeline]) -> None: """ - Validates that the pipeline supports async streaming callbacks. + Validates that all streaming components in the pipeline support async streaming callbacks. Args: pipeline: The pipeline to validate Raises: - ValueError: If the pipeline doesn't support async streaming - """ - streaming_component, streaming_component_name = find_streaming_component(pipeline) - - # Check if the streaming component supports async streaming callbacks - # We check for run_async method as an indicator of async support - if not hasattr(streaming_component, "run_async"): - component_type = type(streaming_component).__name__ - msg = ( - f"Component '{streaming_component_name}' of type '{component_type}' seems to not support async streaming " - "callbacks. Use the sync 'streaming_generator' function instead, or switch to a component that supports " - "async streaming callbacks (e.g., OpenAIChatGenerator instead of OpenAIGenerator)." 
- ) - raise ValueError(msg) + ValueError: If any streaming component doesn't support async streaming + """ + streaming_components = find_all_streaming_components(pipeline) + + for streaming_component, streaming_component_name in streaming_components: + # Check if the streaming component supports async streaming callbacks + # We check for run_async method as an indicator of async support + if not hasattr(streaming_component, "run_async"): + component_type = type(streaming_component).__name__ + msg = ( + f"Component '{streaming_component_name}' of type '{component_type}' seems to not support async " + "streaming callbacks. Use the sync 'streaming_generator' function instead, or switch to a component " + "that supports async streaming callbacks (e.g., OpenAIChatGenerator instead of OpenAIGenerator)." + ) + raise ValueError(msg) async def _execute_pipeline_async( @@ -341,18 +483,20 @@ async def _cleanup_pipeline_task(pipeline_task: asyncio.Task) -> None: raise e -async def async_streaming_generator( # noqa: C901, PLR0912 +async def async_streaming_generator( # noqa: PLR0913 pipeline: Union[Pipeline, AsyncPipeline, Agent], *, pipeline_run_args: Optional[dict[str, Any]] = None, on_tool_call_start: OnToolCallStart = None, on_tool_call_end: OnToolCallEnd = None, on_pipeline_end: OnPipelineEnd = None, + streaming_components: Optional[Union[list[str], Literal["all"]]] = None, ) -> AsyncGenerator[Union[StreamingChunk, OpenWebUIEvent, str], None]: """ Creates an async generator that yields streaming chunks from a pipeline or agent execution. - Automatically finds the streaming-capable component in pipelines or uses the agent's streaming callback. + By default, only the last streaming-capable component in pipelines will stream. + You can control which components stream using streaming_components or HAYHOOKS_STREAMING_COMPONENTS. Args: pipeline: The Pipeline, AsyncPipeline, or Agent to execute @@ -360,14 +504,20 @@ async def async_streaming_generator( # noqa: C901, PLR0912 on_tool_call_start: Callback for tool call start on_tool_call_end: Callback for tool call end on_pipeline_end: Callback for pipeline end + streaming_components: Optional config for which components should stream. + Can be: + - None: use HAYHOOKS_STREAMING_COMPONENTS or default (last only) + - "all": stream all capable components + - list[str]: ["llm_1", "llm_2"] to enable specific components Yields: StreamingChunk: Individual chunks from the streaming execution OpenWebUIEvent: Event for tool call str: Tool name or stream content - NOTE: This generator works with sync/async pipelines and agents. For pipelines, the streaming component + NOTE: This generator works with sync/async pipelines and agents. For pipelines, the streaming components must support an _async_ `streaming_callback`. Agents have built-in async streaming support. + By default, only the last streaming-capable component will stream. 
""" # Validate async streaming support for pipelines (not needed for agents) if pipeline_run_args is None: @@ -382,7 +532,7 @@ async def streaming_callback(chunk: StreamingChunk) -> None: await queue.put(chunk) # Configure streaming callback - configured_args = _setup_streaming_callback(pipeline, pipeline_run_args, streaming_callback) + configured_args = _setup_streaming_callback(pipeline, pipeline_run_args, streaming_callback, streaming_components) # Start pipeline execution pipeline_task = await _execute_pipeline_async(pipeline, configured_args) @@ -390,40 +540,17 @@ async def streaming_callback(chunk: StreamingChunk) -> None: try: async for chunk in _stream_chunks_from_queue(queue, pipeline_task): # Handle tool calls - if on_tool_call_start and hasattr(chunk, "tool_calls") and chunk.tool_calls: - for tool_call in chunk.tool_calls: - if tool_call.tool_name: - res = on_tool_call_start(tool_call.tool_name, tool_call.arguments, tool_call.id) - if res: - if isinstance(res, list): - for r in res: - yield r - else: - yield res - - if on_tool_call_end and hasattr(chunk, "tool_call_result") and chunk.tool_call_result: - res = on_tool_call_end( - chunk.tool_call_result.origin.tool_name, - chunk.tool_call_result.origin.arguments, - chunk.tool_call_result.result, - bool(chunk.tool_call_result.error), - ) - if res: - if isinstance(res, list): - for r in res: - yield r - else: - yield res + for result in _process_tool_call_start(chunk, on_tool_call_start): + yield result + for result in _process_tool_call_end(chunk, on_tool_call_end): + yield result yield chunk await pipeline_task - if on_pipeline_end: - try: - on_pipeline_end_result = on_pipeline_end(pipeline_task.result()) - if on_pipeline_end_result: - yield StreamingChunk(content=on_pipeline_end_result) - except Exception as e: - log.error(f"Error in on_pipeline_end callback: {e}", exc_info=True) + # Process pipeline end callback + final_chunk = _process_pipeline_end(pipeline_task.result(), on_pipeline_end) + if final_chunk: + yield final_chunk except Exception as e: log.error(f"Unexpected error in async streaming generator: {e}") diff --git a/src/hayhooks/server/utils/deploy_utils.py b/src/hayhooks/server/utils/deploy_utils.py index c0946410..2f766773 100644 --- a/src/hayhooks/server/utils/deploy_utils.py +++ b/src/hayhooks/server/utils/deploy_utils.py @@ -531,11 +531,19 @@ def add_yaml_pipeline_to_registry( clog.error(f"Failed creating request/response models for YAML pipeline '{pipeline_name}': {e!s}") raise + # Extract streaming components configuration if present + from hayhooks.server.utils.yaml_utils import get_streaming_components_from_yaml + + streaming_components = get_streaming_components_from_yaml(source_code) + if streaming_components: + clog.debug(f"Found streaming_components in YAML: {streaming_components}") + metadata = { "description": description or pipeline_name, "request_model": request_model, "response_model": response_model, "skip_mcp": bool(skip_mcp), + "streaming_components": streaming_components, } clog.debug(f"Adding YAML pipeline to registry with metadata: {metadata}") diff --git a/src/hayhooks/server/utils/yaml_utils.py b/src/hayhooks/server/utils/yaml_utils.py index da656e16..2c7b0ddc 100644 --- a/src/hayhooks/server/utils/yaml_utils.py +++ b/src/hayhooks/server/utils/yaml_utils.py @@ -151,3 +151,36 @@ def get_inputs_outputs_from_yaml(yaml_source_code: str) -> ResolvedIO: output_resolutions = _resolve_declared_outputs(declared_outputs, pipeline_outputs) return {"inputs": input_resolutions, "outputs": 
output_resolutions} + + +def get_streaming_components_from_yaml(yaml_source_code: str) -> Union[list[str], str, None]: + """ + Extract streaming components configuration from a Haystack pipeline YAML. + + The streaming_components field is optional and specifies which components should stream. + By default (when not specified), only the last streaming-capable component will stream. + + Args: + yaml_source_code: Pipeline YAML source code. + + Returns: + - None if not specified (use default behavior) + - "all" if streaming_components is set to "all" + - list[str] of component names that should stream + Example: ["llm_1", "llm_2"] + """ + yaml_dict = yaml.safe_load(yaml_source_code) or {} + streaming_components = yaml_dict.get("streaming_components") + + if streaming_components is None: + return None + + # Support "all" keyword + if isinstance(streaming_components, str) and streaming_components.lower() == "all": + return "all" + + if not isinstance(streaming_components, list): + return None + + # Ensure all items are strings + return [str(item) for item in streaming_components if item] diff --git a/src/hayhooks/settings.py b/src/hayhooks/settings.py index 4ee70e63..92effc0b 100644 --- a/src/hayhooks/settings.py +++ b/src/hayhooks/settings.py @@ -50,6 +50,13 @@ class AppSettings(BaseSettings): # Show tracebacks on errors during pipeline execution and deployment show_tracebacks: bool = False + # Default streaming components configuration + # Can be: + # - Empty string (default): enable stream only for the LAST capable component + # - "all": enable stream for ALL capable components + # - Comma-separated list: "llm_1,llm_2" to enable stream for specific components + streaming_components: str = "" + # CORS Settings cors_allow_origins: list[str] = ["*"] cors_allow_methods: list[str] = ["*"] diff --git a/tests/test_deploy_at_startup.py b/tests/test_deploy_at_startup.py index 5a7c04e4..368b0e85 100644 --- a/tests/test_deploy_at_startup.py +++ b/tests/test_deploy_at_startup.py @@ -89,7 +89,7 @@ def test_app_loads_pipeline_from_yaml_directory(test_client_yaml, test_yaml_pipe assert response.status_code == 200 pipelines = response.json()["pipelines"] - assert len(pipelines) == 3 + assert len(pipelines) == 5 def test_app_loads_pipeline_from_mixed_directory(test_client_mixed, test_mixed_pipelines_dir): diff --git a/tests/test_deploy_yaml.py b/tests/test_deploy_yaml.py index 361bf922..1188d1ce 100644 --- a/tests/test_deploy_yaml.py +++ b/tests/test_deploy_yaml.py @@ -97,3 +97,82 @@ def test_deploy_yaml_pipeline_with_utf8_characters(): metadata = registry.get_metadata(pipeline_data["name"]) assert metadata is not None + + +def test_deploy_yaml_pipeline_with_streaming_components(): + """Test that streaming_components field is properly parsed from YAML and stored in metadata.""" + pipeline_file = Path(__file__).parent / "test_files/yaml/multi_llm_streaming_pipeline.yml" + pipeline_data = { + "name": pipeline_file.stem, + "source_code": pipeline_file.read_text(), + } + + # Verify streaming_components is in the YAML + assert "streaming_components:" in pipeline_data["source_code"] + + add_yaml_pipeline_to_registry( + pipeline_name=pipeline_data["name"], + source_code=pipeline_data["source_code"], + ) + + # Verify pipeline was added to registry + assert registry.get(pipeline_data["name"]) is not None + + # Verify metadata contains streaming_components configuration + metadata = registry.get_metadata(pipeline_data["name"]) + assert metadata is not None + assert "streaming_components" in metadata + assert 
metadata["streaming_components"] is not None + assert metadata["streaming_components"] == ["llm_1", "llm_2"] + + # Verify it's an AsyncPipeline + pipeline_instance = registry.get(pipeline_data["name"]) + assert isinstance(pipeline_instance, AsyncPipeline) + + +def test_deploy_yaml_pipeline_without_streaming_components(): + """Test that pipelines without streaming_components field have None in metadata.""" + pipeline_file = Path(__file__).parent / "test_files/yaml/inputs_outputs_pipeline.yml" + pipeline_name = pipeline_file.stem + source_code = pipeline_file.read_text() + + # Verify streaming_components is NOT in this YAML + assert "streaming_components:" not in source_code + + add_yaml_pipeline_to_registry(pipeline_name=pipeline_name, source_code=source_code) + + # Verify metadata contains streaming_components as None (default behavior) + metadata = registry.get_metadata(pipeline_name) + assert metadata is not None + assert "streaming_components" in metadata + assert metadata["streaming_components"] is None + + +def test_deploy_yaml_pipeline_with_streaming_components_all_keyword(): + """Test that streaming_components: all is properly parsed and stored.""" + pipeline_file = Path(__file__).parent / "test_files/yaml/multi_llm_streaming_all_pipeline.yml" + pipeline_data = { + "name": pipeline_file.stem, + "source_code": pipeline_file.read_text(), + } + + # Verify streaming_components: all is in the YAML + assert "streaming_components: all" in pipeline_data["source_code"] + + add_yaml_pipeline_to_registry( + pipeline_name=pipeline_data["name"], + source_code=pipeline_data["source_code"], + ) + + # Verify pipeline was added to registry + assert registry.get(pipeline_data["name"]) is not None + + # Verify metadata contains streaming_components as "all" + metadata = registry.get_metadata(pipeline_data["name"]) + assert metadata is not None + assert "streaming_components" in metadata + assert metadata["streaming_components"] == "all" + + # Verify it's an AsyncPipeline + pipeline_instance = registry.get(pipeline_data["name"]) + assert isinstance(pipeline_instance, AsyncPipeline) diff --git a/tests/test_files/yaml/multi_llm_streaming_all_pipeline.yml b/tests/test_files/yaml/multi_llm_streaming_all_pipeline.yml new file mode 100644 index 00000000..8b61fb65 --- /dev/null +++ b/tests/test_files/yaml/multi_llm_streaming_all_pipeline.yml @@ -0,0 +1,64 @@ +components: + prompt_1: + init_parameters: + required_variables: "*" + template: | + "Answer this question: {{query}} + Answer: + " + type: haystack.components.builders.prompt_builder.PromptBuilder + + llm_1: + init_parameters: + api_base_url: null + api_key: + env_vars: + - OPENAI_API_KEY + strict: true + type: env_var + generation_kwargs: {} + model: gpt-4o-mini + streaming_callback: null + system_prompt: null + type: haystack.components.generators.openai.OpenAIGenerator + + prompt_2: + init_parameters: + required_variables: "*" + template: | + "Refine this response: {{previous_reply}} + Improved answer: + " + type: haystack.components.builders.prompt_builder.PromptBuilder + + llm_2: + init_parameters: + api_base_url: null + api_key: + env_vars: + - OPENAI_API_KEY + strict: true + type: env_var + generation_kwargs: {} + model: gpt-4o-mini + streaming_callback: null + system_prompt: null + type: haystack.components.generators.openai.OpenAIGenerator + +connections: + - receiver: llm_1.prompt + sender: prompt_1.prompt + - receiver: prompt_2.previous_reply + sender: llm_1.replies + - receiver: llm_2.prompt + sender: prompt_2.prompt + +metadata: {} + 
+inputs: + query: prompt_1.query + +outputs: + replies: llm_2.replies + +streaming_components: all diff --git a/tests/test_files/yaml/multi_llm_streaming_pipeline.yml b/tests/test_files/yaml/multi_llm_streaming_pipeline.yml new file mode 100644 index 00000000..8cbe775b --- /dev/null +++ b/tests/test_files/yaml/multi_llm_streaming_pipeline.yml @@ -0,0 +1,66 @@ +components: + prompt_1: + init_parameters: + required_variables: "*" + template: | + "Answer this question: {{query}} + Answer: + " + type: haystack.components.builders.prompt_builder.PromptBuilder + + llm_1: + init_parameters: + api_base_url: null + api_key: + env_vars: + - OPENAI_API_KEY + strict: true + type: env_var + generation_kwargs: {} + model: gpt-4o-mini + streaming_callback: null + system_prompt: null + type: haystack.components.generators.openai.OpenAIGenerator + + prompt_2: + init_parameters: + required_variables: "*" + template: | + "Refine this response: {{previous_reply}} + Improved answer: + " + type: haystack.components.builders.prompt_builder.PromptBuilder + + llm_2: + init_parameters: + api_base_url: null + api_key: + env_vars: + - OPENAI_API_KEY + strict: true + type: env_var + generation_kwargs: {} + model: gpt-4o-mini + streaming_callback: null + system_prompt: null + type: haystack.components.generators.openai.OpenAIGenerator + +connections: + - receiver: llm_1.prompt + sender: prompt_1.prompt + - receiver: prompt_2.previous_reply + sender: llm_1.replies + - receiver: llm_2.prompt + sender: prompt_2.prompt + +metadata: {} + +inputs: + query: prompt_1.query + +outputs: + replies: llm_2.replies + +streaming_components: + - llm_1 + - llm_2 diff --git a/tests/test_it_concurrent_streaming.py b/tests/test_it_concurrent_streaming.py new file mode 100644 index 00000000..67643e1d --- /dev/null +++ b/tests/test_it_concurrent_streaming.py @@ -0,0 +1,180 @@ +import asyncio +import concurrent.futures +import os + +import pytest +from haystack import AsyncPipeline, Pipeline +from haystack.components.builders import ChatPromptBuilder +from haystack.components.generators.chat import OpenAIChatGenerator +from haystack.dataclasses import ChatMessage, StreamingChunk +from haystack.utils import Secret + +from hayhooks.server.pipelines.utils import async_streaming_generator, streaming_generator + +# Test configuration +OPENAI_MODEL = "gpt-4o-mini" +OPENAI_API_KEY_SECRET = Secret.from_env_var("OPENAI_API_KEY") if os.environ.get("OPENAI_API_KEY") else None + +NUM_STRESS_TEST_REQUESTS = 10 +NUM_SYNC_TEST_REQUESTS = 5 + +# Skip tests if OpenAI API key is not available +pytestmark = pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY"), + reason="OPENAI_API_KEY environment variable required for integration tests", +) + + +@pytest.fixture(scope="module") +def async_streaming_pipeline(): + pipeline = AsyncPipeline() + pipeline.add_component("prompt_builder", ChatPromptBuilder()) + pipeline.add_component( + "llm", + OpenAIChatGenerator( + api_key=OPENAI_API_KEY_SECRET, + model=OPENAI_MODEL, + ), + ) + pipeline.connect("prompt_builder.prompt", "llm.messages") + return pipeline + + +@pytest.fixture(scope="module") +def sync_streaming_pipeline(): + pipeline = Pipeline() + pipeline.add_component("prompt_builder", ChatPromptBuilder()) + pipeline.add_component( + "llm", + OpenAIChatGenerator( + api_key=OPENAI_API_KEY_SECRET, + model=OPENAI_MODEL, + ), + ) + pipeline.connect("prompt_builder.prompt", "llm.messages") + return pipeline + + +def _create_test_message(request_id: str) -> list[ChatMessage]: + return [ChatMessage.from_user(f"Say 
only 'Response for request {request_id}' and nothing else.")]
+
+
+def _normalize_text(text: str) -> str:
+    """Normalize text for comparison by removing spaces, underscores and converting to uppercase."""
+    return text.replace(" ", "").replace("_", "").upper()
+
+
+def _verify_chunks_belong_to_request(chunks: list[str], request_id: str) -> None:
+    assert chunks, f"Request {request_id} received no chunks"
+
+    # Reconstruct the full response from chunks
+    # The LLM response will contain the request_id but may be tokenized across chunks
+    full_response = "".join(chunks)
+
+    assert _normalize_text(request_id) in _normalize_text(full_response), (
+        f"Request {request_id} did not receive its own response. "
+        f"Expected to find '{request_id}' in response. Got: {full_response[:200]}..."
+    )
+
+
+async def _consume_async_stream(pipeline, request_name: str) -> list[str]:
+    pipeline_args = {"prompt_builder": {"template": _create_test_message(request_name)}}
+
+    chunks = [
+        chunk.content
+        async for chunk in async_streaming_generator(
+            pipeline,
+            pipeline_run_args=pipeline_args,
+        )
+        if isinstance(chunk, StreamingChunk) and chunk.content
+    ]
+
+    return chunks
+
+
+def _consume_sync_stream(pipeline, request_name: str) -> list[str]:
+    pipeline_args = {"prompt_builder": {"template": _create_test_message(request_name)}}
+
+    chunks = [
+        chunk.content
+        for chunk in streaming_generator(
+            pipeline,
+            pipeline_run_args=pipeline_args,
+        )
+        if isinstance(chunk, StreamingChunk) and chunk.content
+    ]
+
+    return chunks
+
+
+@pytest.mark.integration
+@pytest.mark.asyncio
+async def test_async_concurrent_streaming_no_interference(async_streaming_pipeline):
+    """
+    Test that concurrent async streaming requests don't interfere with each other.
+
+    This test simulates two clients making simultaneous streaming requests to the same
+    pipeline instance and verifies that:
+    1. Each request receives chunks
+    2. Chunks are not cross-contaminated between requests
+    3. Requests complete successfully
+    """
+    # Run both requests concurrently
+    chunks_1, chunks_2 = await asyncio.gather(
+        _consume_async_stream(async_streaming_pipeline, "REQUEST_1"),
+        _consume_async_stream(async_streaming_pipeline, "REQUEST_2"),
+    )
+
+    # Verify both requests completed successfully without interference
+    _verify_chunks_belong_to_request(chunks_1, "REQUEST_1")
+    _verify_chunks_belong_to_request(chunks_2, "REQUEST_2")
+
+
+@pytest.mark.integration
+@pytest.mark.asyncio
+async def test_async_concurrent_streaming_stress_test(async_streaming_pipeline):
+    """
+    Stress test with many concurrent async streaming requests.
+
+    This test simulates realistic load with multiple concurrent users all streaming
+    from the same pipeline instance. It verifies that:
+    1. All requests complete successfully
+    2. No chunks are lost or cross-contaminated
+    3. The system handles high concurrency gracefully
+    """
+    # Run all requests concurrently
+    results = await asyncio.gather(
+        *[_consume_async_stream(async_streaming_pipeline, f"REQ_{i}") for i in range(NUM_STRESS_TEST_REQUESTS)]
+    )
+
+    # Verify all requests completed successfully
+    assert len(results) == NUM_STRESS_TEST_REQUESTS, f"Expected {NUM_STRESS_TEST_REQUESTS} results, got {len(results)}"
+
+    # Verify each request received its own response
+    for request_id, chunks in enumerate(results):
+        _verify_chunks_belong_to_request(chunks, f"REQ_{request_id}")
+
+
+@pytest.mark.integration
+def test_sync_concurrent_streaming_with_threads(sync_streaming_pipeline):
+    """
+    Test concurrent sync streaming requests using threading.
+
+    This test verifies the fix works with the synchronous streaming generator,
+    which uses threading internally. It simulates multiple threads making
+    simultaneous requests and verifies proper isolation.
+    """
+    # Run concurrent requests using thread pool
+    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_SYNC_TEST_REQUESTS) as executor:
+        futures = [
+            executor.submit(_consume_sync_stream, sync_streaming_pipeline, f"SYNC_REQ_{i}")
+            for i in range(NUM_SYNC_TEST_REQUESTS)
+        ]
+        results = [future.result() for future in futures]
+
+    # Verify all requests completed successfully
+    assert len(results) == NUM_SYNC_TEST_REQUESTS, f"Expected {NUM_SYNC_TEST_REQUESTS} results, got {len(results)}"
+
+    # Verify each request received its own response
+    for request_id, chunks in enumerate(results):
+        _verify_chunks_belong_to_request(chunks, f"SYNC_REQ_{request_id}")
diff --git a/tests/test_it_pipeline_utils.py b/tests/test_it_pipeline_utils.py
index 6f238044..c0928f33 100644
--- a/tests/test_it_pipeline_utils.py
+++ b/tests/test_it_pipeline_utils.py
@@ -14,7 +14,12 @@
 from hayhooks import callbacks
 from hayhooks.open_webui import OpenWebUIEvent, create_notification_event
-from hayhooks.server.pipelines.utils import async_streaming_generator, find_streaming_component, streaming_generator
+from hayhooks.server.pipelines.utils import (
+    async_streaming_generator,
+    find_all_streaming_components,
+    streaming_generator,
+)
+from hayhooks.settings import AppSettings
 
 QUESTION = "Is Haystack a framework for developing AI applications? Answer Yes or No"
@@ -138,48 +143,26 @@ def mocked_pipeline_with_streaming_component(mocker):
     pipeline.walk.return_value = [("streaming_component", streaming_component)]
     pipeline.get_component.return_value = streaming_component
 
-    return streaming_component, pipeline
-
-
-def test_find_streaming_component_no_streaming_component():
-    pipeline = Pipeline()
-
-    with pytest.raises(ValueError, match="No streaming-capable component found in the pipeline"):
-        find_streaming_component(pipeline)
-
-
-def test_find_streaming_component_finds_streaming_component(mocker):
-    streaming_component = MockComponent(has_streaming=True)
-    non_streaming_component = MockComponent(has_streaming=False)
-
-    pipeline = mocker.Mock(spec=Pipeline)
-    pipeline.walk.return_value = [
-        ("component1", non_streaming_component),
-        ("streaming_component", streaming_component),
-        ("component2", non_streaming_component),
-    ]
-
-    component, name = find_streaming_component(pipeline)
-    assert component == streaming_component
-    assert name == "streaming_component"
+    return pipeline
 
 
 def test_streaming_generator_no_streaming_component():
     pipeline = Pipeline()
 
-    with pytest.raises(ValueError, match="No streaming-capable component found in the pipeline"):
+    with pytest.raises(ValueError, match="No streaming-capable components found in the pipeline"):
         list(streaming_generator(pipeline))
 
 
 def test_streaming_generator_with_existing_component_args(mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     # Mock the run method to simulate streaming
     def mock_run(data):
         # Simulate calling the streaming callback
-        if streaming_component.streaming_callback:
-            streaming_component.streaming_callback(StreamingChunk(content="chunk1"))
-            streaming_component.streaming_callback(StreamingChunk(content="chunk2"))
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
+            callback(StreamingChunk(content="chunk1"))
+            callback(StreamingChunk(content="chunk2"))
 
     pipeline.run.side_effect = mock_run
@@ -194,7 +177,7 @@ def mock_run(data):
 
 
 def test_streaming_generator_pipeline_exception(mocked_pipeline_with_streaming_component):
-    _, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     # Mock the run method to raise an exception
     expected_error = RuntimeError("Pipeline execution failed")
@@ -207,7 +190,7 @@ def test_streaming_generator_pipeline_exception(mocked_pipeline_with_streaming_c
 
 
 def test_streaming_generator_empty_output(mocked_pipeline_with_streaming_component):
-    _, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     generator = streaming_generator(pipeline)
     chunks = list(generator)
@@ -219,21 +202,22 @@ def test_streaming_generator_empty_output(mocked_pipeline_with_streaming_compone
 async def test_async_streaming_generator_no_streaming_component():
     pipeline = Pipeline()
 
-    with pytest.raises(ValueError, match="No streaming-capable component found in the pipeline"):
+    with pytest.raises(ValueError, match="No streaming-capable components found in the pipeline"):
         _ = [chunk async for chunk in async_streaming_generator(pipeline)]
 
 
 @pytest.mark.asyncio
 async def test_async_streaming_generator_with_existing_component_args(mocker, mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
     mock_chunks = [StreamingChunk(content="async_chunk1"), StreamingChunk(content="async_chunk2")]
 
     # Mock the run_async method to simulate streaming
     async def mock_run_async(data):
         # Simulate calling the streaming callback
-        if streaming_component.streaming_callback:
-            await streaming_component.streaming_callback(mock_chunks[0])
-            await streaming_component.streaming_callback(mock_chunks[1])
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
+            await callback(mock_chunks[0])
+            await callback(mock_chunks[1])
 
     pipeline.run_async = mocker.AsyncMock(side_effect=mock_run_async)
     pipeline_run_args = {"streaming_component": {"existing": "args"}}
@@ -247,7 +231,7 @@ async def mock_run_async(data):
 
 @pytest.mark.asyncio
 async def test_async_streaming_generator_pipeline_exception(mocker, mocked_pipeline_with_streaming_component):
-    _, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     # Mock the run_async method to raise an exception
     expected_error = Exception("Async pipeline execution failed")
@@ -259,7 +243,7 @@ async def test_async_streaming_generator_pipeline_exception(mocker, mocked_pipel
 
 @pytest.mark.asyncio
 async def test_async_streaming_generator_empty_output(mocker, mocked_pipeline_with_streaming_component):
-    _, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     # Mock the run_async method without calling streaming callback
     pipeline.run_async = mocker.AsyncMock(return_value=None)
@@ -271,7 +255,7 @@ async def test_async_streaming_generator_empty_output(mocker, mocked_pipeline_wi
 
 @pytest.mark.asyncio
 async def test_async_streaming_generator_cancellation(mocker, mocked_pipeline_with_streaming_component):
-    _, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     # Mock the run_async method to simulate long-running task
     async def mock_long_running_task(data):
@@ -301,14 +285,15 @@ async def run_and_cancel():
 
 @pytest.mark.asyncio
 async def test_async_streaming_generator_timeout_scenarios(mocker, mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
     mock_chunks = [StreamingChunk(content="delayed_chunk")]
 
     # Mock the run_async method to simulate delayed completion
    async def mock_delayed_task(data):
         await asyncio.sleep(0.5)  # Longer than the timeout in the implementation
-        if streaming_component.streaming_callback:
-            await streaming_component.streaming_callback(mock_chunks[0])
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
+            await callback(mock_chunks[0])
 
     pipeline.run_async = mocker.AsyncMock(side_effect=mock_delayed_task)
@@ -318,7 +303,7 @@ async def mock_delayed_task(data):
 
 
 def test_streaming_generator_modifies_args_copy(mocked_pipeline_with_streaming_component) -> None:
-    _, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     # Mock the run method
     pipeline.run.return_value = None
@@ -340,7 +325,7 @@ def test_streaming_generator_modifies_args_copy(mocked_pipeline_with_streaming_c
 
 @pytest.mark.asyncio
 async def test_async_streaming_generator_modifies_args_copy(mocker, mocked_pipeline_with_streaming_component) -> None:
-    _, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
     pipeline._spec_class = AsyncPipeline
 
     # Mock the run_async method
@@ -362,7 +347,7 @@ async def test_async_streaming_generator_modifies_args_copy(mocker, mocked_pipel
 
 
 def test_streaming_generator_with_tool_calls_and_default_callbacks(mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     tool_call_start = ToolCallDelta(index=0, tool_name="test_tool", arguments="")
     tool_call_end = ToolCallResult(
@@ -375,9 +360,10 @@ def test_streaming_generator_with_tool_calls_and_default_callbacks(mocked_pipeli
     ]
 
     def mock_run(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                streaming_component.streaming_callback(chunk)
+                callback(chunk)
         return {"result": "Final result"}
 
     pipeline.run.side_effect = mock_run
@@ -407,7 +393,7 @@ def mock_run(data):
 
 
 def test_streaming_generator_with_custom_callbacks(mocker, mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     tool_call_start = ToolCallDelta(index=0, tool_name="test_tool", arguments="")
     tool_call_end = ToolCallResult(
@@ -419,9 +405,10 @@ def test_streaming_generator_with_custom_callbacks(mocker, mocked_pipeline_with_
     ]
 
     def mock_run(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                streaming_component.streaming_callback(chunk)
+                callback(chunk)
         return {"result": "Final result"}
 
     pipeline.run.side_effect = mock_run
@@ -475,7 +462,7 @@ def mock_run(data):
 async def test_async_streaming_generator_with_tool_calls_and_default_callbacks(
     mocker, mocked_pipeline_with_streaming_component
 ):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     tool_call_start = ToolCallDelta(index=0, tool_name="test_tool", arguments="")
     tool_call_end = ToolCallResult(
@@ -488,9 +475,10 @@ async def test_async_streaming_generator_with_tool_calls_and_default_callbacks(
     ]
 
     async def mock_run_async(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                await streaming_component.streaming_callback(chunk)
+                await callback(chunk)
         return {"result": "Final result"}
 
     pipeline.run_async = mocker.AsyncMock(side_effect=mock_run_async)
@@ -524,7 +512,7 @@ async def mock_run_async(data):
 
 @pytest.mark.asyncio
 async def test_async_streaming_generator_with_custom_callbacks(mocker, mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     tool_call_start = ToolCallDelta(index=0, tool_name="test_tool", arguments="")
     tool_call_end = ToolCallResult(
@@ -536,9 +524,10 @@ async def test_async_streaming_generator_with_custom_callbacks(mocker, mocked_pi
     ]
 
     async def mock_run_async(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                await streaming_component.streaming_callback(chunk)
+                await callback(chunk)
         return {"result": "Final result"}
 
     pipeline.run_async = mocker.AsyncMock(side_effect=mock_run_async)
@@ -589,7 +578,7 @@ async def mock_run_async(data):
 
 
 def test_streaming_generator_with_custom_callbacks_returning_list(mocker, mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     tool_call_start = ToolCallDelta(index=0, tool_name="test_tool", arguments="")
     tool_call_end = ToolCallResult(
@@ -601,9 +590,10 @@ def test_streaming_generator_with_custom_callbacks_returning_list(mocker, mocked
     ]
 
     def mock_run(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                streaming_component.streaming_callback(chunk)
+                callback(chunk)
 
     pipeline.run.side_effect = mock_run
@@ -665,7 +655,7 @@ def custom_on_tool_call_end(tool_name, arguments, result, error):
 async def test_async_streaming_generator_with_custom_callbacks_returning_list(
     mocker, mocked_pipeline_with_streaming_component
 ):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     tool_call_start = ToolCallDelta(index=0, tool_name="test_tool", arguments="")
     tool_call_end = ToolCallResult(
@@ -677,9 +667,10 @@ async def test_async_streaming_generator_with_custom_callbacks_returning_list(
     ]
 
     async def mock_run_async(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                await streaming_component.streaming_callback(chunk)
+                await callback(chunk)
 
     pipeline.run_async = mocker.AsyncMock(side_effect=mock_run_async)
@@ -738,7 +729,7 @@ def custom_on_tool_call_end(tool_name, arguments, result, error):
 
 
 def test_streaming_generator_with_tool_calls_and_no_callbacks(mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     tool_call_start = ToolCallDelta(index=0, tool_name="test_tool", arguments="")
     tool_call_end = ToolCallResult(
@@ -751,9 +742,10 @@ def test_streaming_generator_with_tool_calls_and_no_callbacks(mocked_pipeline_wi
     ]
 
     def mock_run(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                streaming_component.streaming_callback(chunk)
+                callback(chunk)
 
     pipeline.run.side_effect = mock_run
@@ -767,7 +759,7 @@ def mock_run(data):
 async def test_async_streaming_generator_with_tool_calls_and_no_callbacks(
     mocker, mocked_pipeline_with_streaming_component
 ):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     tool_call_start = ToolCallDelta(index=0, tool_name="test_tool", arguments="")
     tool_call_end = ToolCallResult(
@@ -780,9 +772,10 @@ async def test_async_streaming_generator_with_tool_calls_and_no_callbacks(
     ]
 
     async def mock_run_async(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                await streaming_component.streaming_callback(chunk)
+                await callback(chunk)
 
     pipeline.run_async = mocker.AsyncMock(side_effect=mock_run_async)
@@ -793,7 +786,7 @@ async def mock_run_async(data):
 
 
 def test_sync_streaming_generator_on_pipeline_end_callback(mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     mock_chunks_from_pipeline = [
         StreamingChunk(content="Chunk 1", index=0),
@@ -801,9 +794,10 @@ def test_sync_streaming_generator_on_pipeline_end_callback(mocked_pipeline_with_
     ]
 
     def mock_run(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                streaming_component.streaming_callback(chunk)
+                callback(chunk)
         return {"result": "Final result"}
 
     pipeline.run.side_effect = mock_run
@@ -819,7 +813,7 @@ def mock_run(data):
 
 @pytest.mark.asyncio
 async def test_async_streaming_generator_on_pipeline_end_callback(mocker, mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     mock_chunks_from_pipeline = [
         StreamingChunk(content="Chunk 1", index=0),
@@ -827,9 +821,10 @@ async def test_async_streaming_generator_on_pipeline_end_callback(mocker, mocked
     ]
 
     async def mock_run_async(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                await streaming_component.streaming_callback(chunk)
+                await callback(chunk)
         return {"result": "Final result"}
 
     pipeline.run_async = mocker.AsyncMock(side_effect=mock_run_async)
@@ -844,7 +839,7 @@ async def mock_run_async(data):
 
 
 def test_sync_streaming_generator_on_pipeline_end_callback_no_return(mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     mock_chunks_from_pipeline = [
         StreamingChunk(content="Chunk 1", index=0),
@@ -852,9 +847,10 @@ def test_sync_streaming_generator_on_pipeline_end_callback_no_return(mocked_pipe
     ]
 
     def mock_run(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                streaming_component.streaming_callback(chunk)
+                callback(chunk)
         return {"result": "Final result"}
 
     pipeline.run.side_effect = mock_run
@@ -875,7 +871,7 @@ def custom_on_pipeline_end(output):
 async def test_async_streaming_generator_on_pipeline_end_callback_no_return(
     mocker, mocked_pipeline_with_streaming_component
 ):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     mock_chunks_from_pipeline = [
         StreamingChunk(content="Chunk 1", index=0),
@@ -883,9 +879,10 @@ async def test_async_streaming_generator_on_pipeline_end_callback_no_return(
     ]
 
     async def mock_run_async(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                await streaming_component.streaming_callback(chunk)
+                await callback(chunk)
         return {"result": "Final result"}
 
     pipeline.run_async = mocker.AsyncMock(side_effect=mock_run_async)
@@ -903,7 +900,7 @@ def custom_on_pipeline_end(output):
 
 
 def test_sync_streaming_generator_on_pipeline_end_callback_raises(mocked_pipeline_with_streaming_component):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     mock_chunks_from_pipeline = [
         StreamingChunk(content="Chunk 1", index=0),
@@ -911,9 +908,10 @@ def test_sync_streaming_generator_on_pipeline_end_callback_raises(mocked_pipelin
     ]
 
     def mock_run(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                streaming_component.streaming_callback(chunk)
+                callback(chunk)
         return {"result": "Final result"}
 
     pipeline.run.side_effect = mock_run
@@ -935,7 +933,7 @@ def custom_on_pipeline_end(output):
 async def test_async_streaming_generator_on_pipeline_end_callback_raises(
     mocker, mocked_pipeline_with_streaming_component
 ):
-    streaming_component, pipeline = mocked_pipeline_with_streaming_component
+    pipeline = mocked_pipeline_with_streaming_component
 
     mock_chunks_from_pipeline = [
         StreamingChunk(content="Chunk 1", index=0),
@@ -943,9 +941,10 @@ async def test_async_streaming_generator_on_pipeline_end_callback_raises(
     ]
 
     async def mock_run_async(data):
-        if streaming_component.streaming_callback:
+        callback = data.get("streaming_component", {}).get("streaming_callback")
+        if callback:
             for chunk in mock_chunks_from_pipeline:
-                await streaming_component.streaming_callback(chunk)
+                await callback(chunk)
         return {"result": "Final result"}
 
     pipeline.run_async = mocker.AsyncMock(side_effect=mock_run_async)
@@ -961,3 +960,423 @@ def custom_on_pipeline_end(output):
     logger.add(lambda msg: messages.append(msg), level="ERROR")
     _ = [chunk async for chunk in generator]
     assert "Callback error" in messages[0]
+
+
+def test_find_all_streaming_components_finds_multiple(mocker):
+    streaming_component1 = MockComponent(has_streaming=True)
+    streaming_component2 = MockComponent(has_streaming=True)
+    non_streaming_component = MockComponent(has_streaming=False)
+
+    pipeline = mocker.Mock(spec=Pipeline)
+    pipeline.walk.return_value = [
+        ("component1", streaming_component1),
+        ("non_streaming", non_streaming_component),
+        ("component2", streaming_component2),
+    ]
+
+    components = find_all_streaming_components(pipeline)
+    assert len(components) == 2
+    assert components[0] == (streaming_component1, "component1")
+    assert components[1] == (streaming_component2, "component2")
+
+
+def test_find_all_streaming_components_raises_when_none_found():
+    pipeline = Pipeline()
+
+    with pytest.raises(ValueError, match="No streaming-capable components found in the pipeline"):
+        find_all_streaming_components(pipeline)
+
+
+@pytest.fixture
+def pipeline_with_multiple_streaming_components(mocker):
+    streaming_component1 = MockComponent(has_streaming=True)
+    streaming_component2 = MockComponent(has_streaming=True)
+    non_streaming_component = MockComponent(has_streaming=False)
+
+    pipeline = mocker.Mock(spec=AsyncPipeline)
+    pipeline._spec_class = AsyncPipeline
+    pipeline.walk.return_value = [
+        ("component1", streaming_component1),
+        ("non_streaming", non_streaming_component),
+        ("component2", streaming_component2),
+    ]
+
+    def mock_get_component(name):
+        if name == "component1":
+            return streaming_component1
+        elif name == "component2":
+            return streaming_component2
+        return non_streaming_component
+
+    pipeline.get_component.side_effect = mock_get_component
+
+    return pipeline
+
+
+def test_streaming_generator_with_multiple_components_default_behavior(pipeline_with_multiple_streaming_components):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    mock_chunks = [
+        StreamingChunk(content="chunk1_from_component2"),
+        StreamingChunk(content="chunk2_from_component2"),
+    ]
+
+    def mock_run(data):
+        # Only component2 should stream (it's the last one)
+        callback = data.get("component2", {}).get("streaming_callback")
+        if callback:
+            callback(mock_chunks[0])
+            callback(mock_chunks[1])
+
+    pipeline.run.side_effect = mock_run
+
+    generator = streaming_generator(pipeline)
+    chunks = list(generator)
+
+    assert chunks == mock_chunks
+    # The streaming callback is injected into the pipeline run args; if it were
+    # missing, mock_run would emit no chunks and the assertion above would fail.
+
+
+@pytest.mark.parametrize(
+    "streaming_components",
+    [
+        ["component1", "component2"],  # Explicit list
+        "all",  # "all" keyword
+    ],
+    ids=["list_both", "all_keyword"],
+)
+def test_streaming_generator_with_all_components_enabled(
+    pipeline_with_multiple_streaming_components, streaming_components
+):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    mock_chunks = [
+        StreamingChunk(content="chunk1_from_component1"),
+        StreamingChunk(content="chunk2_from_component1"),
+        StreamingChunk(content="chunk1_from_component2"),
+        StreamingChunk(content="chunk2_from_component2"),
+    ]
+
+    def mock_run(data):
+        callback1 = data.get("component1", {}).get("streaming_callback")
+        if callback1:
+            callback1(mock_chunks[0])
+            callback1(mock_chunks[1])
+        callback2 = data.get("component2", {}).get("streaming_callback")
+        if callback2:
+            callback2(mock_chunks[2])
+            callback2(mock_chunks[3])
+
+    pipeline.run.side_effect = mock_run
+
+    generator = streaming_generator(pipeline, streaming_components=streaming_components)
+    chunks = list(generator)
+
+    assert chunks == mock_chunks
+    # The streaming callbacks are injected into the pipeline run args; if they were
+    # missing, mock_run would emit no chunks and the assertion above would fail.
+
+
+def test_streaming_generator_with_multiple_components_selective(pipeline_with_multiple_streaming_components):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    mock_chunks = [
+        StreamingChunk(content="chunk1_from_component1"),
+        StreamingChunk(content="chunk2_from_component1"),
+    ]
+
+    def mock_run(data):
+        # Only component1 should stream based on config
+        callback = data.get("component1", {}).get("streaming_callback")
+        if callback:
+            callback(mock_chunks[0])
+            callback(mock_chunks[1])
+
+    pipeline.run.side_effect = mock_run
+
+    generator = streaming_generator(pipeline, streaming_components=["component1"])
+    chunks = list(generator)
+
+    assert chunks == mock_chunks
+
+
+@pytest.mark.asyncio
+async def test_async_streaming_generator_with_multiple_components_default_behavior(
+    mocker, pipeline_with_multiple_streaming_components
+):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    mock_chunks = [
+        StreamingChunk(content="async_chunk1_from_component2"),
+        StreamingChunk(content="async_chunk2_from_component2"),
+    ]
+
+    async def mock_run_async(data):
+        # Only component2 should stream (it's the last one)
+        callback = data.get("component2", {}).get("streaming_callback")
+        if callback:
+            await callback(mock_chunks[0])
+            await callback(mock_chunks[1])
+
+    pipeline.run_async = mocker.AsyncMock(side_effect=mock_run_async)
+
+    chunks = [chunk async for chunk in async_streaming_generator(pipeline)]
+
+    assert chunks == mock_chunks
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "streaming_components",
+    [
+        ["component1", "component2"],  # Explicit list
+        "all",  # "all" keyword
+    ],
+    ids=["list_both", "all_keyword"],
+)
+async def test_async_streaming_generator_with_all_components_enabled(
+    mocker, pipeline_with_multiple_streaming_components, streaming_components
+):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    mock_chunks = [
+        StreamingChunk(content="async_chunk1_from_component1"),
+        StreamingChunk(content="async_chunk2_from_component1"),
+        StreamingChunk(content="async_chunk1_from_component2"),
+        StreamingChunk(content="async_chunk2_from_component2"),
+    ]
+
+    async def mock_run_async(data):
+        # Both components should stream
+        callback1 = data.get("component1", {}).get("streaming_callback")
+        if callback1:
+            await callback1(mock_chunks[0])
+            await callback1(mock_chunks[1])
+        callback2 = data.get("component2", {}).get("streaming_callback")
+        if callback2:
+            await callback2(mock_chunks[2])
+            await callback2(mock_chunks[3])
+
+    pipeline.run_async = mocker.AsyncMock(side_effect=mock_run_async)
+
+    chunks = [chunk async for chunk in async_streaming_generator(pipeline, streaming_components=streaming_components)]
+
+    assert chunks == mock_chunks
+
+
+@pytest.mark.asyncio
+async def test_async_streaming_generator_with_multiple_components_selective(
+    mocker, pipeline_with_multiple_streaming_components
+):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    mock_chunks = [
+        StreamingChunk(content="async_chunk1_from_component1"),
+        StreamingChunk(content="async_chunk2_from_component1"),
+    ]
+
+    async def mock_run_async(data):
+        # Only component1 should stream based on config
+        callback = data.get("component1", {}).get("streaming_callback")
+        if callback:
+            await callback(mock_chunks[0])
+            await callback(mock_chunks[1])
+
+    pipeline.run_async = mocker.AsyncMock(side_effect=mock_run_async)
+
+    chunks = [chunk async for chunk in async_streaming_generator(pipeline, streaming_components=["component1"])]
+
+    assert chunks == mock_chunks
+
+
+@pytest.mark.parametrize(
+    "env_var_value",
+    [
+        "all",  # "all" keyword
+        "component1,component2",  # Comma-separated list
+    ],
+    ids=["env_all_keyword", "env_comma_separated"],
+)
+def test_streaming_generator_with_env_var_all_components(
+    monkeypatch, pipeline_with_multiple_streaming_components, env_var_value
+):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    # Set environment variable and reload settings
+    monkeypatch.setenv("HAYHOOKS_STREAMING_COMPONENTS", env_var_value)
+    monkeypatch.setattr("hayhooks.server.pipelines.utils.settings", AppSettings())
+
+    mock_chunks = [
+        StreamingChunk(content="chunk1_from_component1"),
+        StreamingChunk(content="chunk2_from_component1"),
+        StreamingChunk(content="chunk1_from_component2"),
+        StreamingChunk(content="chunk2_from_component2"),
+    ]
+
+    def mock_run(data):
+        # Both components should stream
+        callback1 = data.get("component1", {}).get("streaming_callback")
+        if callback1:
+            callback1(mock_chunks[0])
+            callback1(mock_chunks[1])
+        callback2 = data.get("component2", {}).get("streaming_callback")
+        if callback2:
+            callback2(mock_chunks[2])
+            callback2(mock_chunks[3])
+
+    pipeline.run.side_effect = mock_run
+
+    # Don't pass streaming_components - should use env var
+    generator = streaming_generator(pipeline)
+    chunks = list(generator)
+
+    assert chunks == mock_chunks
+
+
+def test_streaming_generator_param_overrides_env_var(monkeypatch, pipeline_with_multiple_streaming_components):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    # Set environment variable to "all"
+    monkeypatch.setenv("HAYHOOKS_STREAMING_COMPONENTS", "all")
+    monkeypatch.setattr("hayhooks.server.pipelines.utils.settings", AppSettings())
+
+    mock_chunks = [
+        StreamingChunk(content="chunk1_from_component1"),
+        StreamingChunk(content="chunk2_from_component1"),
+    ]
+
+    def mock_run(data):
+        # Only component1 should stream (explicit param overrides env var)
+        callback = data.get("component1", {}).get("streaming_callback")
+        if callback:
+            callback(mock_chunks[0])
+            callback(mock_chunks[1])
+
+    pipeline.run.side_effect = mock_run
+
+    # Explicit parameter should override env var
+    generator = streaming_generator(pipeline, streaming_components=["component1"])
+    chunks = list(generator)
+
+    assert chunks == mock_chunks
+
+
+def test_streaming_generator_with_empty_list(pipeline_with_multiple_streaming_components):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    def mock_run(data):
+        # Neither component should stream
+        pass
+
+    pipeline.run.side_effect = mock_run
+
+    generator = streaming_generator(pipeline, streaming_components=[])
+    chunks = list(generator)
+
+    assert chunks == []
+
+
+def test_streaming_generator_with_nonexistent_component_name(pipeline_with_multiple_streaming_components):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    mock_chunks = [
+        StreamingChunk(content="chunk1_from_component1"),
+        StreamingChunk(content="chunk2_from_component1"),
+    ]
+
+    def mock_run(data):
+        # Only component1 should stream (component3 doesn't exist)
+        callback = data.get("component1", {}).get("streaming_callback")
+        if callback:
+            callback(mock_chunks[0])
+            callback(mock_chunks[1])
+
+    pipeline.run.side_effect = mock_run
+
+    # Include non-existent component3
+    generator = streaming_generator(pipeline, streaming_components=["component1", "component3"])
+    chunks = list(generator)
+
+    assert chunks == mock_chunks
+
+
+def test_streaming_generator_with_single_component_comma_separated(
+    monkeypatch, pipeline_with_multiple_streaming_components
+):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    # Set environment variable with single component
+    monkeypatch.setenv("HAYHOOKS_STREAMING_COMPONENTS", "component1")
+    monkeypatch.setattr("hayhooks.server.pipelines.utils.settings", AppSettings())
+
+    mock_chunks = [
+        StreamingChunk(content="chunk1_from_component1"),
+        StreamingChunk(content="chunk2_from_component1"),
+    ]
+
+    def mock_run(data):
+        callback = data.get("component1", {}).get("streaming_callback")
+        if callback:
+            callback(mock_chunks[0])
+            callback(mock_chunks[1])
+
+    pipeline.run.side_effect = mock_run
+
+    generator = streaming_generator(pipeline)
+    chunks = list(generator)
+
+    assert chunks == mock_chunks
+
+
+def test_parse_streaming_components_with_empty_string(monkeypatch, pipeline_with_multiple_streaming_components):
+    pipeline = pipeline_with_multiple_streaming_components
+
+    # Set environment variable to empty string (default)
+    monkeypatch.setenv("HAYHOOKS_STREAMING_COMPONENTS", "")
+    monkeypatch.setattr("hayhooks.server.pipelines.utils.settings", AppSettings())
+
+    mock_chunks = [
+        StreamingChunk(content="chunk1_from_component2"),
+        StreamingChunk(content="chunk2_from_component2"),
+    ]
+
+    def mock_run(data):
+        # Should use default (last component only)
+        callback = data.get("component2", {}).get("streaming_callback")
+        if callback:
+            callback(mock_chunks[0])
+            callback(mock_chunks[1])
+
+    pipeline.run.side_effect = mock_run
+
+    generator = streaming_generator(pipeline)
+    chunks = list(generator)
+
+    assert chunks == mock_chunks
+
+
+def test_parse_streaming_components_setting_with_all():
+    from hayhooks.server.pipelines.utils import _parse_streaming_components_setting
+
+    assert _parse_streaming_components_setting("all") == "all"
+    assert _parse_streaming_components_setting("ALL") == "all"
+    assert _parse_streaming_components_setting(" all ") == "all"
+
+
+def test_parse_streaming_components_setting_with_comma_list():
+    from hayhooks.server.pipelines.utils import _parse_streaming_components_setting
+
+    result = _parse_streaming_components_setting("llm_1,llm_2,llm_3")
+    assert result == ["llm_1", "llm_2", "llm_3"]
+
+    # Test with spaces
+    result = _parse_streaming_components_setting("llm_1, llm_2 , llm_3")
+    assert result == ["llm_1", "llm_2", "llm_3"]
+
+
+def test_parse_streaming_components_setting_with_empty():
+    from hayhooks.server.pipelines.utils import _parse_streaming_components_setting
+
+    assert _parse_streaming_components_setting("") is None
+    assert _parse_streaming_components_setting(" ") is None
diff --git a/tests/test_yaml_inputs_outputs.py b/tests/test_yaml_inputs_outputs.py
index 2148e67c..1cbffd5f 100644
--- a/tests/test_yaml_inputs_outputs.py
+++ b/tests/test_yaml_inputs_outputs.py
@@ -5,7 +5,12 @@
 import pytest
 
 from hayhooks.server.exceptions import InvalidYamlIOError
-from hayhooks.server.utils.yaml_utils import InputResolution, OutputResolution, get_inputs_outputs_from_yaml
+from hayhooks.server.utils.yaml_utils import (
+    InputResolution,
+    OutputResolution,
+    get_inputs_outputs_from_yaml,
+    get_streaming_components_from_yaml,
+)
 
 
 def test_get_inputs_outputs_from_yaml_matches_pipeline_metadata():
@@ -46,3 +51,118 @@ def test_get_inputs_outputs_from_yaml_raises_when_missing_inputs_outputs():
         InvalidYamlIOError, match=re.escape("YAML pipeline must declare at least one of 'inputs' or 'outputs'.")
     ):
         get_inputs_outputs_from_yaml(yaml_source)
+
+
+def test_get_streaming_components_from_yaml_with_valid_config():
+    """Test parsing streaming_components from YAML with valid configuration."""
+    yaml_source = """
+components:
+  llm_1:
+    type: haystack.components.generators.OpenAIGenerator
+  llm_2:
+    type: haystack.components.generators.OpenAIGenerator
+
+connections:
+  - sender: llm_1.replies
+    receiver: llm_2.prompt
+
+inputs:
+  prompt: llm_1.prompt
+
+outputs:
+  replies: llm_2.replies
+
+streaming_components:
+  - llm_1
+"""
+    result = get_streaming_components_from_yaml(yaml_source)
+
+    assert result is not None
+    assert result == ["llm_1"]
+
+
+def test_get_streaming_components_from_yaml_without_config():
+    """Test parsing streaming_components from YAML when not specified."""
+    yaml_source = """
+components:
+  llm:
+    type: haystack.components.generators.OpenAIGenerator
+
+inputs:
+  prompt: llm.prompt
+
+outputs:
+  replies: llm.replies
+"""
+    result = get_streaming_components_from_yaml(yaml_source)
+
+    assert result is None
+
+
+def test_get_streaming_components_from_yaml_with_invalid_type():
+    """Test parsing streaming_components when it's not a list (should return None)."""
+    yaml_source = """
+components:
+  llm:
+    type: haystack.components.generators.OpenAIGenerator
+
+inputs:
+  prompt: llm.prompt
+
+outputs:
+  replies: llm.replies
+
+streaming_components: 123
+"""
+    result = get_streaming_components_from_yaml(yaml_source)
+
+    assert result is None
+
+
+def test_get_streaming_components_from_yaml_converts_to_str():
+    """Test that streaming_components items are converted to strings."""
+    yaml_source = """
+components:
+  llm_1:
+    type: haystack.components.generators.OpenAIGenerator
+  llm_2:
+    type: haystack.components.generators.OpenAIGenerator
+
+inputs:
+  prompt: llm_1.prompt
+
+outputs:
+  replies: llm_2.replies
+
+streaming_components:
+  - llm_1
+  - llm_2
+"""
+    result = get_streaming_components_from_yaml(yaml_source)
+
+    assert result is not None
+    assert result == ["llm_1", "llm_2"]
+    # Ensure values are actually string type
+    assert all(isinstance(item, str) for item in result)
+
+
+def test_get_streaming_components_from_yaml_with_all_keyword():
+    """Test parsing streaming_components when set to 'all' keyword."""
+    yaml_source = """
+components:
+  llm_1:
+    type: haystack.components.generators.OpenAIGenerator
+  llm_2:
+    type: haystack.components.generators.OpenAIGenerator
+
+inputs:
+  prompt: llm_1.prompt
+
+outputs:
+  replies: llm_2.replies
+
+streaming_components: all
+"""
+    result = get_streaming_components_from_yaml(yaml_source)
+
+    assert result == "all"