@mpangrazzi
Contributor

@mpangrazzi mpangrazzi commented Oct 17, 2025

Should close #144 and #133.

Currently, Hayhooks supports streaming only from the last streaming-capable component in a pipeline.

With this PR, we want to support streaming from all streaming-capable components in a pipeline. This enables some specific use cases.

Example where we have a pipeline with 2 LLM-based components:

[Image: multi_stream]

Full example: https://github.com/deepset-ai/hayhooks/tree/multi-component-streaming/examples/pipeline_wrappers/multi_llm_streaming

Sample pipeline and PipelineWrapper: https://github.com/deepset-ai/hayhooks/blob/multi-component-streaming/examples/pipeline_wrappers/multi_llm_streaming/pipeline_wrapper.py

NOTE: I was wondering whether to keep "stream all components" as the default, or whether it's worth letting the user choose between "stream all capable components" and "stream only the last capable one".

Member

@anakin87 anakin87 left a comment

> I was wondering whether to keep "stream all components" as the default, or whether it's worth letting the user choose between "stream all capable components" and "stream only the last capable one".

I'm not sure... Maybe I would let the user choose.

On the other hand, if streaming isn't needed, the user could simply avoid providing a streaming_callback.

Curious to hear @sjrl's thoughts as well...

@sjrl
Contributor

sjrl commented Oct 17, 2025

> > I was wondering whether to keep "stream all components" as the default, or whether it's worth letting the user choose between "stream all capable components" and "stream only the last capable one".
>
> I'm not sure... Maybe I would let the user choose.
>
> On the other hand, if streaming isn't needed, the user could simply avoid providing a streaming_callback.
>
> Curious to hear @sjrl's thoughts as well...

I'd be for allowing a user to choose. E.g. I could imagine a user providing a list or set of component names where streaming should be enabled.

Turning it on for all components by default could cause problems in edge cases where two LLMs in two branches are running simultaneously (as could happen when using AsyncPipeline).

@mpangrazzi
Contributor Author

@sjrl

> Turning it on for all components by default could cause problems in edge cases where two LLMs in two branches are running simultaneously (as could happen when using AsyncPipeline).

Speaking with @tstadel, this usually doesn't happen because of XOR branches (so only one generator streams at a time). But assuming it does happen, we could get the source component from the streaming chunk and "fix" the output stream accordingly.
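
To illustrate the idea of "fixing" the output stream by source component, here is a minimal sketch. It assumes each chunk carries the name of the component that produced it; the `Chunk` class and `group_by_component` helper are hypothetical stand-ins, not Hayhooks or Haystack APIs.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass
class Chunk:
    """Hypothetical stand-in for a streaming chunk tagged with its source."""

    component_name: str
    content: str


def group_by_component(chunks: Iterable[Chunk]) -> Dict[str, str]:
    """Reassemble interleaved chunks into one text per source component."""
    buffers: Dict[str, List[str]] = defaultdict(list)
    for chunk in chunks:
        # Chunks from the same component keep their relative order.
        buffers[chunk.component_name].append(chunk.content)
    return {name: "".join(parts) for name, parts in buffers.items()}


# Two components streaming at the same time produce interleaved chunks.
interleaved = [
    Chunk("llm_1", "Hel"),
    Chunk("llm_2", "Bon"),
    Chunk("llm_1", "lo"),
    Chunk("llm_2", "jour"),
]
grouped = group_by_component(interleaved)
```

A real implementation would more likely route chunks as they arrive rather than buffering them all, but the grouping key would be the same.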

@anakin87 Probably a better solution would be:

  • Stream only the last capable component by default
  • Accept a param to enable streaming on all capable components (assuming they will do it serially)
  • Accept a param to exclude specific components from streaming (e.g. by name)

This should cover all use cases. For YAML pipelines, the same streaming configuration could be read from a streaming_config field (as for inputs / outputs).

WDYT?

@anakin87
Member

For simplicity, I would:

  • Stream only the last capable component by default
  • Accept a param to express streaming_config (streaming_config={"component_a": True, "component_b": False})

Of course, your original idea would also work...

@mpangrazzi
Contributor Author

mpangrazzi commented Oct 20, 2025

OK, in the end I went for a mixed solution:

Both streaming generators and YAML support a streaming_components field / param which can have the following values:

  • all, as a shortcut to enable streaming for all capable components
  • a comma-separated list of component names, like comp1, comp2, to enable streaming for specific components only
  • unset or falsy: the default behaviour, which streams only the last capable component (the most common use case)
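
The three accepted values above could be parsed roughly as follows. This is only a sketch under stated assumptions: `parse_streaming_components` is a hypothetical helper name, and the whitespace handling is a guess rather than the behaviour implemented in this PR.

```python
from typing import List, Literal, Optional, Union

StreamingComponents = Union[Literal["all"], List[str]]


def parse_streaming_components(value: Optional[str]) -> Optional[StreamingComponents]:
    """Hypothetical parser for a streaming_components string.

    Returns "all", a list of component names, or None, where None means
    "fall back to streaming only the last capable component".
    """
    if not value or not value.strip():
        return None
    cleaned = value.strip()
    if cleaned == "all":
        return "all"
    # Assumption: surrounding whitespace and empty entries are ignored.
    names = [name.strip() for name in cleaned.split(",") if name.strip()]
    return names or None
```

For example, `parse_streaming_components("comp1, comp2")` yields a two-element list, while an empty string or `None` selects the default behaviour.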

In addition, the streaming_components value can also be controlled by a global env var, HAYHOOKS_STREAMING_COMPONENTS, which supports the same values. This is useful if, for example, you want to add a streaming component configuration without changing the YAML.

So priority is:

  1. Explicit streaming_components parameter OR YAML streaming_components field
  2. HAYHOOKS_STREAMING_COMPONENTS env var (global default)
  3. Default behaviour: stream only last component (lowest priority)
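
The priority order above can be sketched as a small resolution function. The function name and the `env` parameter are hypothetical and only there to make the example testable; only the HAYHOOKS_STREAMING_COMPONENTS variable name comes from this PR.

```python
import os
from typing import List, Literal, Mapping, Optional, Union

StreamingComponents = Union[Literal["all"], List[str]]


def resolve_streaming_components(
    explicit: Optional[StreamingComponents] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Optional[StreamingComponents]:
    """Hypothetical resolver: explicit value, then env var, then default."""
    env = os.environ if env is None else env

    # 1. An explicit parameter (or YAML field) wins when it is set.
    if explicit:
        return explicit

    # 2. Otherwise use the global env var, which accepts the same values.
    raw = env.get("HAYHOOKS_STREAMING_COMPONENTS", "").strip()
    if raw == "all":
        return "all"
    names = [name.strip() for name in raw.split(",") if name.strip()]
    if names:
        return names

    # 3. None stands for the default: stream only the last capable component.
    return None
```

Passing a plain dict as `env` keeps the sketch independent of the real process environment.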

I've added some more tests and examples.

cc @tstadel @anakin87 @sjrl

@mpangrazzi mpangrazzi requested a review from tstadel October 20, 2025 14:53
@mpangrazzi mpangrazzi marked this pull request as ready for review October 20, 2025 15:02
Member

@anakin87 anakin87 left a comment

Good work!

I've left some minor comments.

Member

@tstadel tstadel left a comment

Looks good to me. Nice descriptive way to configure streaming. I quickly thought about making streaming_components a list instead of a dict, as you currently don't need a dict for boolean values. However, I can totally see the potential for extending streaming configs here and setting additional parameters.

@mpangrazzi
Contributor Author

@tstadel

I quickly thought about making streaming_components a list instead of a dict, as you currently don't need a dict for boolean values

Yeah, good point. I was expecting to add more options at first but in the end I need only a boolean here - will convert to a list!

@mpangrazzi
Contributor Author

OK, so I did the following:

  • Updated the streaming_components format from a boolean dict {"llm_1": True, "llm_2": False} to an allowlist ["llm_1"], for both YAML and method params
  • Fixed an old issue that prevented streaming from working concurrently, caused by overriding the component's streaming_callback. The callback is now passed as a run arg. Also added a concurrency integration test here - cc @sjrl
  • Fixed some minor type issues due to Literal usage
  • Updated docs, examples and tests
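
The concurrency fix above can be illustrated with a small, self-contained sketch. `FakeGenerator` and `handle_request` are hypothetical stand-ins, not Hayhooks or Haystack code; the point is only that a callback passed per run call stays private to each request, whereas one stored on a shared component instance could be overwritten by a concurrent request.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional

Callback = Callable[[str], None]


class FakeGenerator:
    """Hypothetical streaming-capable component shared across requests."""

    def __init__(self) -> None:
        # Mutating this attribute per request is what breaks concurrency.
        self.streaming_callback: Optional[Callback] = None

    def run(self, prompt: str, streaming_callback: Optional[Callback] = None) -> None:
        # Prefer the per-call callback; fall back to the instance attribute.
        callback = streaming_callback or self.streaming_callback
        for token in prompt.split():
            if callback is not None:
                callback(token)


def handle_request(generator: FakeGenerator, prompt: str) -> List[str]:
    """Stream one request, collecting only the tokens meant for it."""
    received: List[str] = []
    # The callback travels with this call and never touches shared state.
    generator.run(prompt, streaming_callback=received.append)
    return received


shared = FakeGenerator()
prompts = ["alpha beta gamma", "one two three"]
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(handle_request, [shared, shared], prompts))
```

Each entry in `results` contains only the tokens of its own prompt, even though both requests used the same component instance.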

cc @tstadel @anakin87

Member

@anakin87 anakin87 left a comment

Looks good

@mpangrazzi mpangrazzi merged commit 084042e into main Oct 21, 2025
5 checks passed
@mpangrazzi mpangrazzi deleted the multi-component-streaming branch October 21, 2025 19:34