Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 0 additions & 29 deletions AGENTS.md

This file was deleted.

6 changes: 6 additions & 0 deletions internal/deepseek/client/client_completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ import (
trans "ds2api/internal/deepseek/transport"
)

// ThinkingCacheInjector is an optional hook that injects previously cached
// thinking content into a completion request payload for the given model and
// returns the payload to use. NOTE(review): assignment and invocation sites
// are outside this view — confirm callers nil-check the hook before calling.
var ThinkingCacheInjector func(payload map[string]any, model string) map[string]any

// ThinkingCacheExtractor is an optional hook that extracts the thinking text
// associated with a response payload for the given model and stores it for
// later turns. NOTE(review): confirm callers nil-check the hook before calling.
var ThinkingCacheExtractor func(payload map[string]any, model string, thinking string)

func (c *Client) CallCompletion(ctx context.Context, a *auth.RequestAuth, payload map[string]any, powResp string, maxAttempts int) (*http.Response, error) {
if maxAttempts <= 0 {
maxAttempts = c.maxRetries
Expand Down
41 changes: 37 additions & 4 deletions internal/httpapi/claude/handler_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"ds2api/internal/httpapi/requestbody"
"ds2api/internal/promptcompat"
"ds2api/internal/responsehistory"
"ds2api/internal/thinkingcache"

Check failure on line 23 in internal/httpapi/claude/handler_messages.go

View workflow job for this annotation

GitHub Actions / Lint and Refactor Gate

File is not properly formatted (gofmt)
streamengine "ds2api/internal/stream"
"ds2api/internal/translatorcliproxy"
"ds2api/internal/util"
Expand Down Expand Up @@ -70,12 +71,32 @@
writeClaudeError(w, http.StatusBadRequest, "invalid json")
return true
}
// Save original messages for thinking cache (before any modifications)
var originalMessages []any
if msgs, ok := req["messages"].([]any); ok {
originalMessages = msgs
}

norm, err := normalizeClaudeRequest(h.Store, req)
if err != nil {
writeClaudeError(w, http.StatusBadRequest, err.Error())
return true
}
exposeThinking := norm.Standard.Thinking

// Entry point: Apply thinking cache to restore assistant reasoning from previous turns
cacheModel := norm.Standard.ResolvedModel
if cacheModel == "" {
cacheModel = norm.Standard.RequestedModel
}
if len(originalMessages) > 0 {
if injectedMessages, changed := thinkingcache.Apply(originalMessages, cacheModel); changed {
req["messages"] = injectedMessages
norm, _ = normalizeClaudeRequest(h.Store, req)
originalMessages = injectedMessages
}
}

a, err := h.Auth.Determine(r)
if err != nil {
writeClaudeError(w, http.StatusUnauthorized, err.Error())
Expand All @@ -96,7 +117,7 @@
Standard: stdReq,
})
if stdReq.Stream {
h.handleClaudeDirectStream(w, r, a, stdReq, historySession)
h.handleClaudeDirectStream(w, r, a, stdReq, historySession, originalMessages, cacheModel)
return true
}
result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, stdReq, completionruntime.Options{
Expand All @@ -113,6 +134,12 @@
if historySession != nil {
historySession.SuccessTurn(http.StatusOK, result.Turn, responsehistory.GenericUsage(result.Turn))
}

// Exit point: Store thinking content for future turns (non-stream)
if thinking := result.Turn.Thinking; thinking != "" {
thinkingcache.Store(originalMessages, cacheModel, thinking)
}

writeJSON(w, http.StatusOK, claudefmt.BuildMessageResponseFromTurn(
fmt.Sprintf("msg_%d", time.Now().UnixNano()),
stdReq.ResponseModel,
Expand All @@ -133,7 +160,7 @@
return history.MapError(err)
}

func (h *Handler) handleClaudeDirectStream(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, stdReq promptcompat.StandardRequest, historySession *responsehistory.Session) {
func (h *Handler) handleClaudeDirectStream(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, stdReq promptcompat.StandardRequest, historySession *responsehistory.Session, originalMessages []any, cacheModel string) {
start, outErr := completionruntime.StartCompletion(r.Context(), h.DS, a, stdReq, completionruntime.Options{
CurrentInputFile: h.Store,
})
Expand All @@ -145,7 +172,7 @@
return
}
streamReq := start.Request
h.handleClaudeStreamRealtime(w, r, start.Response, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, historySession)
h.handleClaudeStreamRealtimeWithCache(w, r, start.Response, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, originalMessages, cacheModel, historySession)
}

func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, store ConfigReader) bool {
Expand Down Expand Up @@ -300,6 +327,10 @@
}

// handleClaudeStreamRealtime streams a Claude-formatted response without
// thinking-cache context: it delegates to the cache-aware variant with a nil
// original-message snapshot and an empty cache model key — presumably making
// the cache store step a no-op (confirm against thinkingcache.Store).
func (h *Handler) handleClaudeStreamRealtime(w http.ResponseWriter, r *http.Request, resp *http.Response, model string, messages []any, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySessions ...*responsehistory.Session) {
	h.handleClaudeStreamRealtimeWithCache(w, r, resp, model, messages, thinkingEnabled, searchEnabled, toolNames, toolsRaw, nil, "", historySessions...)
}

func (h *Handler) handleClaudeStreamRealtimeWithCache(w http.ResponseWriter, r *http.Request, resp *http.Response, model string, messages []any, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, originalMessages []any, cacheModel string, historySessions ...*responsehistory.Session) {
var historySession *responsehistory.Session
if len(historySessions) > 0 {
historySession = historySessions[0]
Expand All @@ -324,7 +355,7 @@
config.Logger.Warn("[claude_stream] response writer does not support flush; streaming may be buffered")
}

streamRuntime := newClaudeStreamRuntime(
streamRuntime := newClaudeStreamRuntimeWithCache(
w,
rc,
canFlush,
Expand All @@ -337,6 +368,8 @@
toolsRaw,
buildClaudePromptTokenText(messages, thinkingEnabled),
historySession,
originalMessages,
cacheModel,
)
streamRuntime.sendMessageStart()

Expand Down
26 changes: 26 additions & 0 deletions internal/httpapi/claude/stream_runtime_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type claudeStreamRuntime struct {
ended bool
upstreamErr string
history *responsehistory.Session

// For thinking cache
originalMessages []any
cacheModel string
}

func newClaudeStreamRuntime(
Expand Down Expand Up @@ -84,6 +88,28 @@ func newClaudeStreamRuntime(
}
}

// newClaudeStreamRuntimeWithCache constructs a claudeStreamRuntime via
// newClaudeStreamRuntime and then attaches the thinking-cache context: the
// pre-normalization message snapshot and the model key used when the runtime
// stores reasoning content at finalization.
func newClaudeStreamRuntimeWithCache(
	w http.ResponseWriter,
	rc *http.ResponseController,
	canFlush bool,
	model string,
	messages []any,
	thinkingEnabled bool,
	searchEnabled bool,
	stripReferenceMarkers bool,
	toolNames []string,
	toolsRaw any,
	promptTokenText string,
	history *responsehistory.Session,
	originalMessages []any,
	cacheModel string,
) *claudeStreamRuntime {
	// Build the base runtime first, then layer the cache fields on top so the
	// base constructor's behavior is untouched.
	runtime := newClaudeStreamRuntime(w, rc, canFlush, model, messages, thinkingEnabled, searchEnabled, stripReferenceMarkers, toolNames, toolsRaw, promptTokenText, history)
	runtime.originalMessages = originalMessages
	runtime.cacheModel = cacheModel
	return runtime
}

func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedDecision {
if !parsed.Parsed {
return streamengine.ParsedDecision{}
Expand Down
6 changes: 6 additions & 0 deletions internal/httpapi/claude/stream_runtime_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"ds2api/internal/assistantturn"
"ds2api/internal/responsehistory"
"ds2api/internal/sse"
"ds2api/internal/thinkingcache"
"ds2api/internal/toolcall"
"ds2api/internal/toolstream"
"encoding/json"
Expand Down Expand Up @@ -186,6 +187,11 @@ func (s *claudeStreamRuntime) finalize(stopReason string) {
)
}

// Exit point: Store thinking content for future turns (stream)
if thinking := turn.Thinking; thinking != "" {
thinkingcache.Store(s.originalMessages, s.cacheModel, thinking)
}

s.send("message_delta", map[string]any{
"type": "message_delta",
"delta": map[string]any{
Expand Down
Loading
Loading