Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/relay/adaptor/anthropic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ func StreamHandler(
writed bool
)

streamState := NewStreamState()

for scanner.Scan() {
data := scanner.Bytes()
if !render.IsValidSSEData(data) {
Expand All @@ -253,7 +255,7 @@ func StreamHandler(
break
}

response, err := StreamResponse2OpenAI(m, data)
response, err := streamState.StreamResponse2OpenAI(m, data)
if err != nil {
if writed {
log.Errorf("response error: %+v", err)
Expand Down
48 changes: 43 additions & 5 deletions core/relay/adaptor/anthropic/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,42 @@ func batchPatchImage2Base64(ctx context.Context, imageTasks []*relaymodel.Claude
return nil
}

// https://docs.anthropic.com/claude/reference/messages-streaming
func StreamResponse2OpenAI(
// StreamState maintains state during streaming response conversion
type StreamState struct {
// claudeIndexToToolCallIndex maps Claude's content block index to OpenAI tool call index
// Claude's index includes all content blocks (text, thinking, tool_use), but OpenAI only counts tool calls
claudeIndexToToolCallIndex map[int]int
// nextToolCallIndex tracks the next tool call index to assign (0-based)
nextToolCallIndex int
}

func NewStreamState() *StreamState {
return &StreamState{
claudeIndexToToolCallIndex: make(map[int]int),
nextToolCallIndex: 0,
}
}

// getToolCallIndex returns the OpenAI tool call index for a given Claude content block index
// If this is the first time seeing this Claude index for a tool call, assigns a new tool call index
func (s *StreamState) getToolCallIndex(claudeIndex int, isNewToolCall bool) int {
if idx, exists := s.claudeIndexToToolCallIndex[claudeIndex]; exists {
return idx
}

if isNewToolCall {
toolCallIndex := s.nextToolCallIndex
s.claudeIndexToToolCallIndex[claudeIndex] = toolCallIndex
s.nextToolCallIndex++
return toolCallIndex
}

// This shouldn't happen in normal flow, but return a safe default
return 0
}

// StreamResponse2OpenAI converts Claude streaming response to OpenAI format
func (s *StreamState) StreamResponse2OpenAI(
meta *meta.Meta,
respData []byte,
) (*relaymodel.ChatCompletionsStreamResponse, adaptor.Error) {
Expand Down Expand Up @@ -340,8 +374,9 @@ func StreamResponse2OpenAI(
if claudeResponse.ContentBlock != nil {
content = claudeResponse.ContentBlock.Text
if claudeResponse.ContentBlock.Type == toolUseType {
toolCallIndex := s.getToolCallIndex(claudeResponse.Index, true)
tools = append(tools, relaymodel.ToolCall{
Index: claudeResponse.Index,
Index: toolCallIndex,
ID: claudeResponse.ContentBlock.ID,
Type: "function",
Function: relaymodel.Function{
Expand All @@ -354,8 +389,9 @@ func StreamResponse2OpenAI(
if claudeResponse.Delta != nil {
switch claudeResponse.Delta.Type {
case "input_json_delta":
toolCallIndex := s.getToolCallIndex(claudeResponse.Index, false)
tools = append(tools, relaymodel.ToolCall{
Index: claudeResponse.Index,
Index: toolCallIndex,
Type: "function",
Function: relaymodel.Function{
Arguments: claudeResponse.Delta.PartialJSON,
Expand Down Expand Up @@ -514,6 +550,8 @@ func OpenAIStreamHandler(
writed bool
)

streamState := NewStreamState()

for scanner.Scan() {
data := scanner.Bytes()
if !render.IsValidSSEData(data) {
Expand All @@ -525,7 +563,7 @@ func OpenAIStreamHandler(
break
}

response, err := StreamResponse2OpenAI(m, data)
response, err := streamState.StreamResponse2OpenAI(m, data)
if err != nil {
if writed {
log.Errorf("response error: %+v", err)
Expand Down
4 changes: 3 additions & 1 deletion core/relay/adaptor/aws/claude/claude.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ func StreamHandler(meta *meta.Meta, c *gin.Context) (model.Usage, adaptor.Error)
writed bool
)

streamState := anthropic.NewStreamState()

log := common.GetLogger(c)

for event := range stream.Events() {
switch v := event.(type) {
case *types.ResponseStreamMemberChunk:
data := v.Value.Bytes

response, err := anthropic.StreamResponse2OpenAI(meta, v.Value.Bytes)
response, err := streamState.StreamResponse2OpenAI(meta, v.Value.Bytes)
if err != nil {
if writed {
log.Errorf("response error: %+v", err)
Expand Down
4 changes: 3 additions & 1 deletion core/relay/adaptor/aws/claude/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,14 @@ func OpenaiStreamHandler(meta *meta.Meta, c *gin.Context) (model.Usage, adaptor.
writed bool
)

streamState := anthropic.NewStreamState()

log := common.GetLogger(c)

for event := range stream.Events() {
switch v := event.(type) {
case *types.ResponseStreamMemberChunk:
response, err := anthropic.StreamResponse2OpenAI(meta, v.Value.Bytes)
response, err := streamState.StreamResponse2OpenAI(meta, v.Value.Bytes)
if err != nil {
if writed {
log.Errorf("response error: %+v", err)
Expand Down
Loading