
Commit 084042e

mpangrazzian and anakin87 authored
Support streaming from multiple pipeline components (#178)
* Support streaming from multiple pipeline components
* Lint
* Fix example
* Update example (cleaner) and docs
* Support for granular / 'all' / env var based streaming components configuration
* Update src/hayhooks/server/pipelines/utils.py

  Co-authored-by: Stefano Fiorucci <[email protected]>
* Lint
* Fix type issues with Literal ; Make streaming concurrency-safe
* Refactoring
* Refine example README
* Use a list instead of a dict for streaming_components (whitelist)
* Reformat
* remove unneeded 'template_variables' ; add required_variables
* to turn off warnings
* Fix docs

---------

Co-authored-by: Stefano Fiorucci <[email protected]>
1 parent 86d3003 commit 084042e

File tree

17 files changed: +1769 / -207 lines


docs/concepts/pipeline-wrapper.md

Lines changed: 186 additions & 0 deletions
@@ -176,6 +176,192 @@ async def run_chat_completion_async(self, model: str, messages: List[dict], body
    )
```

## Streaming from Multiple Components

!!! info "Smart Streaming Behavior"
    By default, Hayhooks streams only the **last** streaming-capable component in your pipeline. This is usually what you want - the final output streaming to users.

    For advanced use cases, you can control which components stream using the `streaming_components` parameter.

When your pipeline contains multiple components that support streaming (e.g., multiple LLMs), you can control which ones stream their outputs as the pipeline executes.

### Default Behavior: Stream Only the Last Component

By default, only the last streaming-capable component will stream:

```python
class MultiLLMWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.dataclasses import ChatMessage

        self.pipeline = Pipeline()

        # First LLM - initial answer
        self.pipeline.add_component(
            "prompt_1",
            ChatPromptBuilder(
                template=[
                    ChatMessage.from_system("You are a helpful assistant."),
                    ChatMessage.from_user("{{query}}")
                ]
            )
        )
        self.pipeline.add_component("llm_1", OpenAIChatGenerator(model="gpt-4o-mini"))

        # Second LLM - refines the answer using Jinja2 to access ChatMessage attributes
        self.pipeline.add_component(
            "prompt_2",
            ChatPromptBuilder(
                template=[
                    ChatMessage.from_system("You are a helpful assistant that refines responses."),
                    ChatMessage.from_user(
                        "Previous response: {{previous_response[0].text}}\n\nRefine this."
                    )
                ]
            )
        )
        self.pipeline.add_component("llm_2", OpenAIChatGenerator(model="gpt-4o-mini"))

        # Connect components - LLM 1's replies go directly to prompt_2
        self.pipeline.connect("prompt_1.prompt", "llm_1.messages")
        self.pipeline.connect("llm_1.replies", "prompt_2.previous_response")
        self.pipeline.connect("prompt_2.prompt", "llm_2.messages")

    def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Generator:
        question = get_last_user_message(messages)

        # By default, only llm_2 (the last streaming component) will stream
        return streaming_generator(
            pipeline=self.pipeline,
            pipeline_run_args={"prompt_1": {"query": question}}
        )
```

**What happens:** Only `llm_2` (the last streaming-capable component) streams its responses token by token. The first LLM (`llm_1`) executes normally without streaming, and only the final refined output streams to the user.

### Advanced: Stream Multiple Components with `streaming_components`

For advanced use cases where you want to see outputs from multiple components, use the `streaming_components` parameter:

```python
def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Generator:
    question = get_last_user_message(messages)

    # Enable streaming for BOTH LLMs
    return streaming_generator(
        pipeline=self.pipeline,
        pipeline_run_args={"prompt_1": {"query": question}},
        streaming_components=["llm_1", "llm_2"]  # Stream both components
    )
```

**What happens:** Both LLMs stream their responses token by token. First you'll see the initial answer from `llm_1` streaming, then the refined answer from `llm_2` streaming.

You can also selectively enable streaming for specific components:

```python
# Stream only the first LLM
streaming_components=["llm_1"]

# Stream only the second LLM (same as default)
streaming_components=["llm_2"]

# Stream ALL capable components (shorthand)
streaming_components="all"

# Stream ALL capable components (specific list)
streaming_components=["llm_1", "llm_2"]
```

### Using the "all" Keyword

The `"all"` keyword is a convenient shorthand to enable streaming for all capable components:

```python
return streaming_generator(
    pipeline=self.pipeline,
    pipeline_run_args={...},
    streaming_components="all"  # Enable all streaming components
)
```

This is equivalent to explicitly enabling every streaming-capable component in your pipeline.

### Global Configuration via Environment Variable

You can set a global default using the `HAYHOOKS_STREAMING_COMPONENTS` environment variable. This applies to all pipelines unless overridden:

```bash
# Stream all components by default
export HAYHOOKS_STREAMING_COMPONENTS="all"

# Stream specific components (comma-separated)
export HAYHOOKS_STREAMING_COMPONENTS="llm_1,llm_2"
```

**Priority order:**

1. Explicit `streaming_components` parameter (highest priority)
2. `HAYHOOKS_STREAMING_COMPONENTS` environment variable
3. Default behavior: stream only last component (lowest priority)

!!! tip "When to Use Each Approach"
    - **Default (last component only)**: Best for most use cases - users see only the final output
    - **"all" keyword**: Useful for debugging, demos, or transparent multi-step workflows
    - **List of components**: Enable multiple specific components by name
    - **Environment variable**: For deployment-wide defaults without code changes

!!! note "Async Streaming"
    All `streaming_components` options work identically with `async_streaming_generator()` for async pipelines.

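As a minimal sketch of the async variant (assuming `async_streaming_generator` and `get_last_user_message` are importable from `hayhooks` like `streaming_generator` is, and that `async_streaming_generator` accepts the same `pipeline`, `pipeline_run_args`, and `streaming_components` arguments as its sync counterpart):

```python
from typing import AsyncGenerator, List

from hayhooks import async_streaming_generator, get_last_user_message


class MultiLLMWrapperAsync(BasePipelineWrapper):
    # setup() builds the same two-LLM pipeline as in the sync example above

    async def run_chat_completion_async(
        self, model: str, messages: List[dict], body: dict
    ) -> AsyncGenerator:
        question = get_last_user_message(messages)

        # Same streaming_components semantics as with streaming_generator
        return async_streaming_generator(
            pipeline=self.pipeline,
            pipeline_run_args={"prompt_1": {"query": question}},
            streaming_components="all",
        )
```
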
### YAML Pipeline Streaming Configuration

You can also specify streaming configuration in YAML pipeline definitions:

```yaml
components:
  prompt_1:
    type: haystack.components.builders.PromptBuilder
    init_parameters:
      template: "Answer this question: {{query}}"
  llm_1:
    type: haystack.components.generators.OpenAIGenerator
  prompt_2:
    type: haystack.components.builders.PromptBuilder
    init_parameters:
      template: "Refine this response: {{previous_reply}}"
  llm_2:
    type: haystack.components.generators.OpenAIGenerator

connections:
  - sender: prompt_1.prompt
    receiver: llm_1.prompt
  - sender: llm_1.replies
    receiver: prompt_2.previous_reply
  - sender: prompt_2.prompt
    receiver: llm_2.prompt

inputs:
  query: prompt_1.query

outputs:
  replies: llm_2.replies

# Option 1: List specific components
streaming_components:
  - llm_1
  - llm_2

# Option 2: Stream all components
# streaming_components: all
```

YAML configuration follows the same priority rules: YAML setting > environment variable > default.

See the [Multi-LLM Streaming Example](https://github.com/deepset-ai/hayhooks/tree/main/examples/pipeline_wrappers/multi_llm_streaming) for a complete working implementation.
## File Upload Support

Hayhooks can handle file uploads by adding a `files` parameter:

docs/reference/environment-variables.md

Lines changed: 27 additions & 0 deletions
@@ -44,6 +44,32 @@ Hayhooks can be configured via environment variables (loaded with prefix `HAYHOO
- Default: `false`
- Description: Include tracebacks in error messages (server and MCP)

### HAYHOOKS_STREAMING_COMPONENTS

- Default: `""` (empty string)
- Description: Global configuration for which pipeline components should stream
- Options:
    - `""` (empty): Stream only the last capable component (default)
    - `"all"`: Stream all streaming-capable components
    - Comma-separated list: `"llm_1,llm_2"` to enable specific components

!!! note "Priority Order"
    Pipeline-specific settings (via the `streaming_components` parameter or YAML) override this global default.

!!! tip "Component-Specific Control"
    Use the `streaming_components` parameter in your code or YAML configuration, rather than the environment variable, when you need to specify exactly which components should stream for a given pipeline.

**Examples:**

```bash
# Stream all components globally
export HAYHOOKS_STREAMING_COMPONENTS="all"

# Stream specific components (comma-separated, spaces are trimmed)
export HAYHOOKS_STREAMING_COMPONENTS="llm_1,llm_2"
export HAYHOOKS_STREAMING_COMPONENTS="llm_1, llm_2, llm_3"
```

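To make the accepted values concrete, here is an illustrative sketch of how such a value can be interpreted. The helper name and logic are hypothetical, for documentation purposes only, and are not Hayhooks' internal implementation:

```python
from typing import List, Literal, Optional, Union


def parse_streaming_components(value: str) -> Optional[Union[Literal["all"], List[str]]]:
    """Interpret a HAYHOOKS_STREAMING_COMPONENTS-style value (illustrative only)."""
    value = value.strip()
    if not value:
        return None  # default: stream only the last streaming-capable component
    if value == "all":
        return "all"  # stream every streaming-capable component
    # Comma-separated component names; surrounding spaces are trimmed
    return [name.strip() for name in value.split(",") if name.strip()]


assert parse_streaming_components("llm_1, llm_2, llm_3") == ["llm_1", "llm_2", "llm_3"]
```
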
## MCP

### HAYHOOKS_MCP_HOST

@@ -154,6 +180,7 @@ HAYHOOKS_ADDITIONAL_PYTHON_PATH=./custom_code

```bash
HAYHOOKS_USE_HTTPS=false
HAYHOOKS_DISABLE_SSL=false
HAYHOOKS_SHOW_TRACEBACKS=false
HAYHOOKS_STREAMING_COMPONENTS=all
HAYHOOKS_CORS_ALLOW_ORIGINS=["*"]
LOG=INFO
```

examples/README.md

Lines changed: 1 addition & 0 deletions
@@ -6,6 +6,7 @@ This directory contains various examples demonstrating different use cases and f

| Example | Description | Key Features | Use Case |
|---------|-------------|--------------|----------|
| [multi_llm_streaming](./pipeline_wrappers/multi_llm_streaming/) | Two sequential LLMs with configurable multi-component streaming | • Two sequential LLMs<br/>• Multi-component streaming via `streaming_components`<br/>• Visual separator between LLM outputs<br/>• Contrasts with the default last-component streaming | Demonstrating how to stream from multiple streaming-capable components in a single pipeline |
| [async_question_answer](./pipeline_wrappers/async_question_answer/) | Async question-answering pipeline with streaming support | • Async pipeline execution<br/>• Streaming responses<br/>• OpenAI Chat Generator<br/>• Both API and chat completion interfaces | Building conversational AI systems that need async processing and real-time streaming responses |
| [chat_with_website](./pipeline_wrappers/chat_with_website/) | Answer questions about website content | • Web content fetching<br/>• HTML to document conversion<br/>• Content-based Q&A<br/>• Configurable URLs | Creating AI assistants that can answer questions about specific websites or web-based documentation |
| [chat_with_website_mcp](./pipeline_wrappers/chat_with_website_mcp/) | MCP-compatible website chat pipeline | • MCP (Model Context Protocol) support<br/>• Website content analysis<br/>• API-only interface<br/>• Simplified deployment | Integrating website analysis capabilities into MCP-compatible AI systems and tools |
examples/pipeline_wrappers/multi_llm_streaming/README.md

Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
# Multi-LLM Streaming Example

This example demonstrates hayhooks' configurable multi-component streaming support.

## Overview

The pipeline contains **two LLM components in sequence**:

1. **LLM 1** (`gpt-5-nano` with `reasoning_effort: low`): Provides a short, concise initial answer to the user's question
2. **LLM 2** (`gpt-5-nano` with `reasoning_effort: medium`): Refines and expands the answer into a detailed, professional response

This example uses `streaming_components` to enable streaming for **both** LLMs. By default, only the last component would stream.

![Multi-LLM Streaming Example](./multi_stream.gif)

## How It Works

### Streaming Configuration

By default, hayhooks streams only the **last** streaming-capable component (in this case, LLM 2). However, this example demonstrates using the `streaming_components` parameter to enable streaming for both components:

```python
streaming_generator(
    pipeline=self.pipeline,
    pipeline_run_args={...},
    streaming_components=["llm_1", "llm_2"]  # or streaming_components="all"
)
```

**Available options:**

- **Default behavior** (no `streaming_components` or `None`): Only the last streaming component streams
- **Stream all components**: `streaming_components=["llm_1", "llm_2"]` (same as `streaming_components="all"`)
- **Stream only first**: `streaming_components=["llm_1"]`
- **Stream only last** (same as default): `streaming_components=["llm_2"]`

### Pipeline Architecture

The pipeline connects LLM 1's replies directly to the second prompt builder. Using Jinja2 template syntax, the second prompt builder can access the `ChatMessage` attributes directly: `{{previous_response[0].text}}`. This approach is simple and doesn't require any custom extraction components.

This example also demonstrates injecting a visual separator (`**[LLM 2 - Refining the response]**`) between the two LLM outputs using `StreamingChunk.component_info` to detect component transitions.

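To illustrate the transition detection mentioned above, here is a minimal sketch of how a consumer of the chunk stream could inject such a separator. It assumes `StreamingChunk.component_info` exposes the emitting component's name via a `name` attribute; the actual wrapper in this example may differ in detail:

```python
def stream_with_separator(chunks):
    """Yield chunk text, inserting a separator when the emitting component changes.

    Illustrative sketch only: assumes each chunk may carry `component_info`
    with a `name` attribute identifying the component that produced it.
    """
    current_component = None
    for chunk in chunks:
        info = getattr(chunk, "component_info", None)
        name = getattr(info, "name", None) if info else None
        if name and name != current_component:
            if current_component == "llm_1" and name == "llm_2":
                yield "\n\n**[LLM 2 - Refining the response]**\n\n"
            current_component = name
        yield chunk.content
```
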
## Usage

### Deploy with Hayhooks

```bash
# Set your OpenAI API key
export OPENAI_API_KEY=your_api_key_here

# Deploy the pipeline
hayhooks deploy examples/pipeline_wrappers/multi_llm_streaming

# Test it via OpenAI-compatible API
curl -X POST http://localhost:1416/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "multi_llm_streaming",
    "messages": [{"role": "user", "content": "What is machine learning?"}],
    "stream": true
  }'
```

### Use Directly in Code

```python
from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage
from hayhooks import streaming_generator

# Create your pipeline with multiple streaming components
pipeline = Pipeline()
# ... add LLM 1 and prompt_builder_1 ...

# Add second prompt builder that accesses ChatMessage attributes via Jinja2
pipeline.add_component(
    "prompt_builder_2",
    ChatPromptBuilder(
        template=[
            ChatMessage.from_system("You are a helpful assistant."),
            ChatMessage.from_user("Previous: {{previous_response[0].text}}\n\nRefine this.")
        ]
    )
)
# ... add LLM 2 ...

# Connect: LLM 1 replies directly to prompt_builder_2
pipeline.connect("llm_1.replies", "prompt_builder_2.previous_response")

# Enable streaming for both LLMs (by default, only the last would stream)
for chunk in streaming_generator(
    pipeline=pipeline,
    pipeline_run_args={"prompt_builder_1": {"query": "Your question"}},
    streaming_components=["llm_1", "llm_2"]  # Stream both components
):
    print(chunk.content, end="", flush=True)
```

## Integration with OpenWebUI

This pipeline works seamlessly with OpenWebUI:

1. Configure OpenWebUI to connect to hayhooks (see [OpenWebUI Integration docs](https://deepset-ai.github.io/hayhooks/features/openwebui-integration))
2. Deploy this pipeline
3. Select it as a model in OpenWebUI
4. Watch both LLMs stream their responses in real-time
examples/pipeline_wrappers/multi_llm_streaming/multi_stream.gif: binary file added (1.13 MB)
