Conversation

@igordayen igordayen commented Nov 29, 2025

LLM Streaming Integration with Tooling Support

Overview

This PR introduces comprehensive LLM streaming capabilities with full tooling integration, providing reactive streaming APIs that work seamlessly with @Tool-annotated methods.

User Code Usage

Basic Streaming with Tools

// Extension function approach (recommended)
val runner = ai.withLlmByRole("fastest")

// Test fixtures referenced by the callbacks below (logger is the test's SLF4J logger)
val receivedEvents = mutableListOf<String>()
var errorOccurred: Throwable? = null
var completionCalled = false

val results = runner.asStreaming()
    .withPrompt("Test integration streaming")
    .createObjectListWithThinking(SimpleItem::class.java)

results
    .doOnNext { event ->
        when {
            event.isThinking() -> {
                val content = event.getThinking()!!
                receivedEvents.add("THINKING: $content")
                logger.info("Integration test received thinking: {}", content)
            }
            event.isObject() -> {
                val obj = event.getObject()!!
                receivedEvents.add("OBJECT: ${obj.name}")
                logger.info("Integration test received object: {}", obj.name)
            }
        }
    }
    .doOnError { error ->
        errorOccurred = error
        logger.error("Integration test stream error: {}", error.message)
    }
    .doOnComplete {
        completionCalled = true
        logger.info("Integration test stream completed successfully")
    }
    .subscribe() // the doOnX callbacks only fire once the stream is subscribed

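For reference, here is a minimal sketch of the event shape implied by the calls above (isThinking/getThinking, isObject/getObject); the actual StreamingEvent&lt;T&gt; in this PR may differ:

class StreamingEvent<T>(
    private val thinking: String? = null,
    private val obj: T? = null,
) {
    // Exactly one of the two payloads is expected to be present.
    fun isThinking(): Boolean = thinking != null
    fun getThinking(): String? = thinking
    fun isObject(): Boolean = obj != null
    fun getObject(): T? = obj
}
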
Extension Functions API

// Pure casting (fast)
runner.asStreaming()

// Safe conversion with validation
runner.asStreamingWithValidation()

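A hypothetical sketch of how these two could be implemented; only the two function names come from this PR, and the receiver type PromptRunner is an assumption:

fun PromptRunner.asStreaming(): StreamingPromptRunnerOperations =
    // Pure cast: fast, but throws ClassCastException if streaming is unsupported.
    this as StreamingPromptRunnerOperations

fun PromptRunner.asStreamingWithValidation(): StreamingPromptRunnerOperations =
    // Safe conversion: fails with a descriptive error instead of a raw cast failure.
    this as? StreamingPromptRunnerOperations
        ?: error("${this::class.simpleName} does not support streaming")
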
Code Organization

New Components

  • StreamingPromptRunnerOperations - Core streaming operations interface (sketched after this list)
  • StreamingPromptRunnerOperationsImpl - Implementation bridging API to SPI
  • StreamingChatClientOperations - Spring AI integration layer
  • Extension Functions - Clean alternatives to manual casting

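As a rough guide, this is the shape of StreamingPromptRunnerOperations that the usage example above implies; the real interface likely declares more methods:

import reactor.core.publisher.Flux

interface StreamingPromptRunnerOperations {
    fun withPrompt(prompt: String): StreamingPromptRunnerOperations
    fun <T> createObjectListWithThinking(itemClass: Class<T>): Flux<StreamingEvent<T>>
}
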
Architecture Flow

  LLM Raw Chunks → Line Buffering → Content Classification → Event Generation → User Stream
      Flux<String>     LineBuffer       Thinking vs JSON      StreamingEvent<T>    Subscription

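A minimal sketch of the line-buffering stage of this pipeline, assuming raw chunks arrive as a Flux&lt;String&gt; with line breaks at arbitrary positions; the LineBuffer name comes from the diagram, but this implementation is illustrative:

import reactor.core.publisher.Flux

class LineBuffer {
    private val pending = StringBuilder()

    // Append a chunk and return any complete lines it produced.
    fun feed(chunk: String): List<String> {
        pending.append(chunk)
        val lines = mutableListOf<String>()
        var newline = pending.indexOf("\n")
        while (newline >= 0) {
            lines += pending.substring(0, newline)
            pending.delete(0, newline + 1)
            newline = pending.indexOf("\n")
        }
        return lines
    }
}

// Usage: turn a chunk stream into a line stream (single subscriber assumed,
// since the buffer is stateful).
fun toLines(chunks: Flux<String>): Flux<String> {
    val buffer = LineBuffer()
    return chunks.concatMap { Flux.fromIterable(buffer.feed(it)) }
}
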
OperationContextPromptRunner serves as the bridge between existing code and streaming features, enabling seamless transition from traditional blocking operations to reactive streaming
without requiring changes to existing business logic or tool definitions.

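To illustrate the point, here is a typical Spring AI @Tool definition of the kind that would be picked up by the streaming path without modification; the class and method are hypothetical:

import org.springframework.ai.tool.annotation.Tool

class WeatherTools {
    @Tool(description = "Look up the current temperature for a city")
    fun currentTemperature(city: String): String =
        // A real implementation would call a weather service here.
        "21°C in $city"
}
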
Challenge: Minimal Changes to Existing Artifacts

StreamingJacksonOutputConverter

Workflow

  1. LLM streams JSONL → {"name": "item1"}\n{"name": "item2"}
  2. Converter processes each line → real-time object creation (see the sketch after this list)
  3. Objects emitted → Flux for reactive consumption (possible enhancement: conceal the Flux)
  4. Tools invoked

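A minimal sketch of step 2, assuming complete JSONL lines arrive from the line-buffering stage; the real StreamingJacksonOutputConverter also has to handle thinking content and malformed lines:

import com.fasterxml.jackson.databind.ObjectMapper
import reactor.core.publisher.Flux

// For Kotlin data classes, register jackson-module-kotlin on the mapper.
private val mapper = ObjectMapper()

fun <T> convertJsonl(lines: Flux<String>, itemClass: Class<T>): Flux<T> =
    lines
        .filter { it.isNotBlank() }
        // An exception thrown here is signalled as onError on the stream.
        .map { line -> mapper.readValue(line, itemClass) }
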
Testing Coverage

  • Unit Tests - Streaming capability detection and operations
  • Integration Tests - Tool registration and streaming workflow
  • API Tests - Both traditional and extension function approaches

Remaining Major Tasks

  • Combine the WithExampleConverter format with StreamingJacksonOutputConverter for few-shot examples in streaming
  • Create a common PromptFormatBuilder utility to reduce code duplication between JSON and JSONL format instructions
  • Add tests for mixed content, error cases, and edge conditions
  • Incorporate StreamingUtility into the streaming converter
  • Explore streaming metadata (from the model DB?)
  • Discuss classification as {Object | Thinking | String}; currently, content not classified as {Object | Thinking} is swallowed
  • Implement backpressure strategies for high-volume streaming (see the sketch after this list)
  • Add retry/reconnect logic for transient OpenAI failures
  • Clean up APIs

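For the backpressure and retry items, a sketch of what plain Reactor operators offer; the buffer size and retry policy here are illustrative, not decisions made in this PR:

import java.time.Duration
import reactor.core.publisher.Flux
import reactor.util.retry.Retry

fun <T> withResilience(events: Flux<T>): Flux<T> =
    events
        // Buffer up to 256 events when the subscriber is slower than the LLM.
        .onBackpressureBuffer(256)
        // Retry transient failures with exponential backoff: up to 3 attempts,
        // starting at 1 second. Note that retryWhen resubscribes to the source,
        // which for an LLM stream means re-issuing the request.
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
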
@igordayen igordayen marked this pull request as draft November 29, 2025 04:40
@igordayen igordayen requested a review from poutsma November 29, 2025 04:58
@igordayen igordayen (Contributor, Author)

Additional documentation: streaming-design

@igordayen igordayen marked this pull request as ready for review November 30, 2025 05:59
@igordayen igordayen mentioned this pull request Nov 30, 2025
@igordayen igordayen (Contributor, Author)

Please see the write-up on the introduction of the Streaming Capability interface: streaming-capability.md

@poutsma poutsma (Contributor) left a comment

Looks good, I only have a few minor suggestions and one major change that I would like to see: the method name change in StreamingPromptRunnerOperations.

That said, I would like to revisit StreamingCapabilities later, once this PR has been merged, as discussed here: embabel/embabel-common#89 (comment)

  • rebased one more time
  • generalized LLM Thinking detection
  • added more tests on reactive patterns: {onNext, onError, onComplete, ...}
  • enhanced logic on multi-chunked JSONL-based objects
  • rebased to agent-api 0.3.1
  • synchronized with test-domain
  • major update to Streaming Chat Client Operations
  • addition of LLM IT test, to be executed manually

sonarqubecloud bot commented Dec 4, 2025

@poutsma poutsma (Contributor) left a comment

This can be merged from my perspective.
