Conversation

bogwi (Contributor) commented Nov 17, 2025

feat: Add streaming token support to contextwindow for real-time LLM response display while maintaining existing persistence patterns.

Design Principles:

  • Stream tokens as they arrive from providers
  • Persist complete response after stream finishes
  • Maintain backward compatibility with existing Call methods
  • Support all providers (OpenAI, Claude, Gemini)
  • Enable middleware hooks for streaming events

What was implemented

1: Core Interfaces

1.1: Define Streaming Interfaces

File: contextwindow/contextwindow.go

Actions:

  • Add StreamChunk struct with fields:

    • Delta string - token/text delta
    • Done bool - indicates stream completion
    • Metadata map[string]any - provider-specific metadata
    • Error error - streaming errors
  • Add StreamCallback function type:

    • type StreamCallback func(chunk StreamChunk) error
    • Returns error to allow early cancellation
  • Add StreamingCapable interface:

    • CallStreaming(ctx context.Context, inputs []Record, callback StreamCallback) ([]Record, int, error)
    • Returns final events and token count after stream completes
  • Add StreamingOptsCapable interface:

    • CallStreamingWithOpts(ctx context.Context, inputs []Record, opts CallModelOpts, callback StreamCallback) ([]Record, int, error)
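
Taken together, a minimal sketch of these definitions (field and method sets exactly as listed above; assumes the package's existing Record and CallModelOpts types):

    // StreamChunk carries one increment of a streaming response.
    type StreamChunk struct {
        Delta    string         // token/text delta
        Done     bool           // set once the stream has completed
        Metadata map[string]any // provider-specific metadata
        Error    error          // non-nil if the stream failed mid-flight
    }

    // StreamCallback receives each chunk; a non-nil return cancels the stream.
    type StreamCallback func(chunk StreamChunk) error

    // StreamingCapable marks models that can stream responses.
    type StreamingCapable interface {
        CallStreaming(ctx context.Context, inputs []Record, callback StreamCallback) ([]Record, int, error)
    }

    // StreamingOptsCapable adds per-call options to the streaming path.
    type StreamingOptsCapable interface {
        CallStreamingWithOpts(ctx context.Context, inputs []Record, opts CallModelOpts, callback StreamCallback) ([]Record, int, error)
    }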

Testing:

  • Unit test for StreamChunk serialization
  • Verify interface definitions compile

1.2: Add ContextWindow Streaming Methods

File: contextwindow/contextwindow.go

Actions:

  • Add CallModelStreaming method:

    func (cw *ContextWindow) CallModelStreaming(
        ctx context.Context, 
        callback StreamCallback,
    ) (string, error)
    • Check if model implements StreamingCapable
    • Fall back to buffered Call if not supported
    • Accumulate streamed tokens
    • Persist complete response after stream finishes
    • Return final assembled text
  • Add CallModelStreamingWithOpts method:

    • Same as above but with CallModelOpts parameter
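
A hedged sketch of that orchestration; cw.model, cw.liveRecords, cw.persistStreamedResponse, and the buffered CallModel fallback stand in for whatever the package's actual internals are called:

    func (cw *ContextWindow) CallModelStreaming(
        ctx context.Context,
        callback StreamCallback,
    ) (string, error) {
        sm, ok := cw.model.(StreamingCapable)
        if !ok {
            // Model cannot stream: fall back to the buffered path.
            return cw.CallModel(ctx)
        }
        var full strings.Builder
        wrapped := func(chunk StreamChunk) error {
            full.WriteString(chunk.Delta) // accumulate as tokens arrive
            return callback(chunk)        // forward to the caller
        }
        events, tokens, err := sm.CallStreaming(ctx, cw.liveRecords(), wrapped)
        if err != nil {
            return "", err
        }
        // Persist only once the stream has finished.
        if err := cw.persistStreamedResponse(events, tokens); err != nil {
            return "", err
        }
        return full.String(), nil
    }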

Testing:

  • Mock streaming model test
  • Mock non-streaming model fallback test
  • Verify persistence happens after stream completes

2: Middleware Integration

2.1: Streaming Middleware Interface

File: contextwindow/contextwindow.go

Actions:

  • Extend Middleware interface with optional methods:

    • OnStreamStart(ctx context.Context) error
    • OnStreamChunk(ctx context.Context, chunk StreamChunk) error
    • OnStreamComplete(ctx context.Context, fullText string, tokens int) error
  • Make the methods optional via a type assertion pattern (sketched after this list)

  • Add middleware invocation in CallModelStreaming
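
The optional-hook pattern could look like this; streamObserver and cw.middleware are illustrative names, not the package's actual identifiers:

    // streamObserver names the optional chunk hook.
    type streamObserver interface {
        OnStreamChunk(ctx context.Context, chunk StreamChunk) error
    }

    // Inside CallModelStreaming, before forwarding each chunk:
    for _, mw := range cw.middleware {
        if so, ok := mw.(streamObserver); ok { // hook is optional
            if err := so.OnStreamChunk(ctx, chunk); err != nil {
                return err // a middleware error cancels the stream
            }
        }
    }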

Testing:

  • Test middleware called in correct order
  • Test middleware error propagation
  • Test optional method handling

3: OpenAI Streaming

3.1: Implement OpenAI Streaming

File: contextwindow/openai_model.go

Actions:

  • Implement CallStreaming method:

    • Convert Records to messages (reuse existing logic)
    • Build tool parameters (reuse existing logic)
    • Create ChatCompletionNewParams with Stream: true
    • Call client.Chat.Completions.NewStreaming(ctx, params)
    • Iterate over stream chunks
    • Accumulate deltas in buffer
    • Invoke callback for each chunk
    • Handle tool calls in streaming mode (buffer until complete)
    • Return final events and token count
  • Implement CallStreamingWithOpts method:

    • Same as above but respect opts.DisableTools
  • Handle streaming tool calls:

    • Buffer tool call deltas
    • Execute tools after complete
    • Stream tool results
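
The core loop might look like the following, assuming the openai-go iterator-style stream (accessor names can vary across SDK versions):

    stream := m.client.Chat.Completions.NewStreaming(ctx, params)
    var full strings.Builder
    for stream.Next() {
        chunk := stream.Current()
        if len(chunk.Choices) == 0 {
            continue
        }
        delta := chunk.Choices[0].Delta.Content
        full.WriteString(delta) // accumulate for persistence
        if err := callback(StreamChunk{Delta: delta}); err != nil {
            return nil, 0, err // caller requested early cancellation
        }
        // Tool call deltas would be buffered here until complete.
    }
    if err := stream.Err(); err != nil {
        return nil, 0, err
    }
    // Signal completion once the provider closes the stream.
    if err := callback(StreamChunk{Done: true}); err != nil {
        return nil, 0, err
    }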

Testing:

  • Mock OpenAI streaming API
  • Test delta accumulation
  • Test tool call handling in stream
  • Test error handling mid-stream
  • Test cancellation via context
  • Integration test with real API (optional, env-gated)

3.2: OpenAI Streaming with Threading

File: contextwindow/openai_model.go

Actions:

  • Add CallStreamingWithThreadingAndOpts:
    • Return error for server-side threading (not supported)
    • Fall back to client-side streaming

Testing:

  • Verify fallback behavior

4: Claude Streaming

4.1: Implement Claude Streaming

File: contextwindow/claude_model.go

Actions:

  • Implement CallStreaming method:

    • Convert Records to messages (reuse existing logic)
    • Build system blocks (reuse existing logic)
    • Create MessageNewParams with Stream: true
    • Call client.Messages.NewStreaming(ctx, params)
    • Handle Claude's event-based streaming:
      • message_start - initialize
      • content_block_delta - accumulate text
      • content_block_start - handle tool use start
      • message_delta - handle usage updates
      • message_stop - finalize
    • Invoke callback for text deltas
    • Buffer tool calls until complete
    • Return final events and token count
  • Implement CallStreamingWithOpts:

    • Same as above with opts support
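
Sketched as a plain event switch; claudeEvent is a hypothetical stand-in for whatever the Anthropic SDK's stream iterator yields, since exact event types vary by SDK version:

    // claudeEvent is a stand-in for the SDK's stream event type.
    type claudeEvent struct {
        Type string // "message_start", "content_block_delta", ...
        Text string // text delta when Type == "content_block_delta"
    }

    func handleClaudeEvent(ev claudeEvent, full *strings.Builder, callback StreamCallback) error {
        switch ev.Type {
        case "message_start":
            // initialize per-message state
        case "content_block_delta":
            full.WriteString(ev.Text)
            return callback(StreamChunk{Delta: ev.Text})
        case "content_block_start":
            // a tool_use block may begin here; start buffering its input JSON
        case "message_delta":
            // carries usage updates (output token counts)
        case "message_stop":
            return callback(StreamChunk{Done: true})
        }
        return nil
    }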

Testing:

  • Mock Claude streaming events
  • Test event type handling
  • Test tool use in streaming
  • Test multi-block responses
  • Integration test (env-gated)

4.2: Claude Streaming with Threading

File: contextwindow/claude_model.go

Actions:

  • Add CallStreamingWithThreadingAndOpts:
    • Always use client-side (Claude has no server-side threading)
    • Call CallStreamingWithOpts internally

Testing:

  • Verify behavior matches non-streaming threading

5: Gemini Streaming

5.1: Implement Gemini Streaming

File: contextwindow/gemini_model.go

Actions:

  • Implement CallStreaming method:

    • Convert Records to Gemini parts
    • Build GenerateContentRequest
    • Call model.GenerateContentStream(ctx, parts...)
    • Iterate over response chunks
    • Accumulate text deltas
    • Invoke callback for each chunk
    • Handle function calls in streaming mode
    • Return final events and token count
  • Implement CallStreamingWithOpts:

    • Same with opts support
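
A hedged sketch assuming github.com/google/generative-ai-go/genai (the newer google.golang.org/genai package has a different surface):

    iter := m.model.GenerateContentStream(ctx, parts...)
    var full strings.Builder
    for {
        resp, err := iter.Next()
        if errors.Is(err, iterator.Done) {
            break
        }
        if err != nil {
            return nil, 0, err
        }
        for _, cand := range resp.Candidates {
            if cand.Content == nil {
                continue
            }
            for _, part := range cand.Content.Parts {
                if txt, ok := part.(genai.Text); ok {
                    full.WriteString(string(txt))
                    if err := callback(StreamChunk{Delta: string(txt)}); err != nil {
                        return nil, 0, err
                    }
                }
                // genai.FunctionCall parts would be buffered here.
            }
        }
    }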

Testing:

  • Mock Gemini streaming
  • Test part accumulation
  • Test function call handling
  • Integration test (env-gated)

6: Storage Integration

6.1: Streaming Metadata Storage

File: contextwindow/storage.go

Actions:

  • Add optional streamed flag to records table:

    • Migration: ALTER TABLE records ADD COLUMN streamed BOOLEAN DEFAULT 0
    • Add to Record struct: Streamed bool
    • Update InsertRecord to accept streamed flag
  • Update schema initialization:

    • Add column via addColumnIfNotExists
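
The migration could ride on that helper, roughly as follows; the exact signature of addColumnIfNotExists is an assumption:

    // During schema initialization, alongside the existing migrations.
    if err := addColumnIfNotExists(db, "records", "streamed", "BOOLEAN DEFAULT 0"); err != nil {
        return err
    }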

Testing:

  • Test migration on existing database
  • Test new records have streamed flag
  • Test backward compatibility

6.2: Stream Resume Support

File: contextwindow/storage.go

Actions:

  • Add partial response tracking:
    • partial_response_id TEXT
    • accumulated_tokens INT
  • Enable resume after interruption
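
One possible shape for the tracking columns, assuming they land on the records table alongside streamed:

    ALTER TABLE records ADD COLUMN partial_response_id TEXT;
    ALTER TABLE records ADD COLUMN accumulated_tokens INT;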

Testing:

  • Test partial save
  • Test resume from partial

7: Testing Strategy

7.1: Unit Tests

Files:

  • contextwindow/contextwindow_test.go
  • contextwindow/openai_model_test.go
  • contextwindow/claude_model_test.go
  • contextwindow/gemini_model_test.go

Test Cases:

  • Streaming callback invocation
  • Token accumulation accuracy
  • Tool calls during streaming
  • Error handling mid-stream
  • Context cancellation
  • Fallback to non-streaming
  • Persistence after stream completes
  • Middleware integration
  • Concurrent streaming (thread safety)

Actions:

  • Create mock streaming providers
  • Test each provider separately
  • Test ContextWindow orchestration
  • Benchmark streaming vs non-streaming overhead (optional)
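
A mock provider could be as small as this; Record's field names here are assumptions about the existing struct:

    // mockStreamingModel is a test double implementing StreamingCapable.
    type mockStreamingModel struct {
        chunks []string
    }

    func (m *mockStreamingModel) CallStreaming(ctx context.Context, inputs []Record, cb StreamCallback) ([]Record, int, error) {
        var full strings.Builder
        for _, c := range m.chunks {
            if err := cb(StreamChunk{Delta: c}); err != nil {
                return nil, 0, err // exercises early-cancellation paths
            }
            full.WriteString(c)
        }
        if err := cb(StreamChunk{Done: true}); err != nil {
            return nil, 0, err
        }
        // Return a single assistant record; field names are assumed.
        return []Record{{Role: "assistant", Content: full.String()}}, len(m.chunks), nil
    }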

7.2: Integration Tests

File: contextwindow/streaming_integration_test.go

Test Cases:

  • End-to-end streaming with real providers (env-gated)
  • Multi-turn streaming conversations
  • Streaming with tools enabled
  • Streaming with summarization
  • Streaming across context switches

Actions:

  • Add build tag //go:build integration
  • Test with actual API keys
  • Measure latency improvements

8: Examples and Documentation

8.1: Create Streaming Example

File: contextwindow/_examples/streaming/main.go

Actions:

  • Basic streaming example:

    callback := func(chunk contextwindow.StreamChunk) error {
        if !chunk.Done {
            fmt.Print(chunk.Delta)
        }
        return nil
    }
    
    response, err := cw.CallModelStreaming(ctx, callback)
  • Advanced example with progress tracking

  • Tool calls during streaming example
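
The progress-tracking variant might extend the basic callback like so (chunk counting as a cheap proxy for token counting):

    var chunks int
    callback := func(chunk contextwindow.StreamChunk) error {
        if chunk.Error != nil {
            return chunk.Error
        }
        if chunk.Done {
            fmt.Printf("\n[stream complete after %d chunks]\n", chunks)
            return nil
        }
        chunks++
        fmt.Print(chunk.Delta)
        return nil
    }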

8.2: Update Documentation

File: contextwindow/README.md

Actions:

  • Add streaming section after tool calling
  • Document callback pattern
  • Show fallback behavior
  • Document performance characteristics (optional)
  • Note provider-specific behaviors (optional)

File: contextwindow/contextwindow.go (package doc)

Actions:

  • Add streaming usage to package documentation
  • Document StreamChunk structure
  • Document callback error handling

9: Performance Optimization

9.1: Buffer Management

Files: contextwindow/openai_model.go, claude_model.go, gemini_model.go

Actions:

  • Use strings.Builder for accumulation
  • Pre-allocate buffers based on expected size
  • Profile memory allocations
  • Minimize callback overhead
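
For example, pre-sizing the builder avoids repeated reallocation as deltas arrive; 4096 is an assumed typical response size, not a measured one:

    func accumulate(deltas []string) string {
        var buf strings.Builder
        buf.Grow(4096) // one up-front allocation instead of incremental growth
        for _, d := range deltas {
            buf.WriteString(d)
        }
        return buf.String()
    }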

Testing (Optional):

  • Benchmark streaming performance
  • Profile memory usage
  • Compare with non-streaming baseline

9.2: Concurrent Streaming Safety

File: contextwindow/contextwindow.go

Actions:

  • Document thread safety of streaming
  • Add mutex if needed for callback coordination
  • Ensure database writes don't block streaming

Testing:

  • Race detector tests
  • Concurrent streaming tests

10: Edge Cases and Error Handling

10.1: Handle Stream Interruptions

Actions:

  • Network failures mid-stream
  • Context cancellation
  • Callback errors
  • Partial response handling
  • Provider-specific error formats

Testing:

  • Test each error scenario
  • Verify cleanup on error
  • Verify database consistency after errors

File Manifest

New Files

  1. contextwindow/_examples/streaming/gemini/main.go
  2. contextwindow/streaming_integration_test.go
  3. contextwindow/concurrent_streaming_test.go
  4. contextwindow/stream_interruption_test.go

Modified Files

  1. contextwindow/contextwindow.go - Core streaming interfaces and methods
  2. contextwindow/openai_model.go - OpenAI streaming implementation
  3. contextwindow/claude_model.go - Claude streaming implementation
  4. contextwindow/gemini_model.go - Gemini streaming implementation
  5. contextwindow/storage.go - Storage schema updates
  6. contextwindow/contextwindow_test.go - Streaming tests
  7. contextwindow/openai_model_test.go - OpenAI streaming tests
  8. contextwindow/claude_model_test.go - Claude streaming tests
  9. contextwindow/gemini_model_test.go - Gemini streaming tests
  10. contextwindow/README.md - Documentation updates

@tqbf, what do you think?

Most of the changes are tests, really a lot of them. Eventually, somewhere down the road, you would want to implement streaming. Can't go without it, I think. Well, this is it: streaming :)

tqbf (Contributor) commented Nov 18, 2025

Whoah, neat. It'll be a day or two before I can read this, but: you beat me to it. Do you have the bit set for me to push directly to your PR branch?

Thanks for this!

bogwi (Contributor, Author) commented Nov 18, 2025

Absolutely. Always set!
