Skip to content

Conversation

MrCatters
Copy link

Previously the pipeline utilities expected exactly one streaming component. The new implementation allows any number of components to define a streaming_callback attribute:

  • find_streaming_components() returns a list of tuples containing streaming_component, streaming_component_name
  • _setup_streaming_callback_for_pipeline() iterates over all streaming components and attaches the provided callback
  • _validate_async_streaming_support() checks every streaming component for async capabilities and reports all failures in a single error message

Previously the pipeline utilities expected exactly one streaming component.
The new implementation allows any number of components to define a
`streaming_callback` attribute:

- find_streaming_components() returns a list of tuples containing streaming_component, streaming_component_name
- _setup_streaming_callback_for_pipeline() iterates over all streaming
  components and attaches the provided callback
- _validate_async_streaming_support() checks every streaming component for
  async capabilities and reports all failures in a single error message
@MrCatters
Copy link
Author

tests/test_it_pipeline_utils.py will need to be updated as well. I try not to mess with testcases on projects I am not familiar with and typically leave it up to the maintainers 😀.

@hoschmieder
Copy link

Hi, MrCatters,
do you address the same issue as in my request?
#133

@mpangrazzi
Copy link
Contributor

@MrCatters @hoschmieder Can I ask you about the use case of this PR?

@hoschmieder
Copy link

hoschmieder commented Aug 21, 2025

Hi @mpangrazzi,
Use case of this (PR: 133):
I have multiple compontens within a pipeline, e.g. two llm's, vector store ...
all of the providing streaming output to inform the user about the process.
with the original code i just can stream out one component.
with my patch each component can stream in realtime.
example within a own component:

        if streaming_callback:
            streaming_callback(StreamingChunk(content=f"⏳ Der Ranker is starting ... it takes a while">

Maybe there are better way's to solve this. But that works for my demands today.

@mpangrazzi
Copy link
Contributor

@hoschmieder sorry about the delay. Well, I think that it may be useful if the components do streaming serially (so not e.g. both at the same time).

In my opinion, to merge this or #133:

  • @MrCatters I wouldn't update existing methods, but rather add new ones, so the user can choose depending on use case
  • @hoschmieder @MrCatters I would like to have unit tests for such a feature (current streaming utilities are already tested, you can take inspiration from this code).

You may collaborate on a single PR, WDYT?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants