Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/httpapi/openai/chat/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Handler struct {

type streamLease struct {
Auth *auth.RequestAuth
SessionID string
ExpiresAt time.Time
}

Expand Down
95 changes: 92 additions & 3 deletions internal/httpapi/openai/chat/vercel_prepare_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chat

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -64,14 +65,14 @@ func TestVercelInternalSecret(t *testing.T) {

func TestStreamLeaseLifecycle(t *testing.T) {
h := &Handler{}
leaseID := h.holdStreamLease(&auth.RequestAuth{UseConfigToken: false})
leaseID := h.holdStreamLease(&auth.RequestAuth{UseConfigToken: false}, "test-session-id")
if leaseID == "" {
t.Fatalf("expected non-empty lease id")
}
if ok := h.releaseStreamLease(leaseID); !ok {
if ok, _, _ := h.releaseStreamLease(leaseID); !ok {
t.Fatalf("expected lease release success")
}
if ok := h.releaseStreamLease(leaseID); ok {
if ok, _, _ := h.releaseStreamLease(leaseID); ok {
t.Fatalf("expected duplicate release to fail")
}
}
Expand Down Expand Up @@ -141,6 +142,94 @@ func TestHandleVercelStreamPrepareAppliesCurrentInputFile(t *testing.T) {
}
}

type vercelReleaseAutoDeleteDSStub struct {
resp *http.Response
deleteCallCount int
deletedSessionID string
deletedToken string
deleteErr error
}

func (m *vercelReleaseAutoDeleteDSStub) CreateSession(_ context.Context, _ *auth.RequestAuth, _ int) (string, error) {
return "session-id", nil
}

func (m *vercelReleaseAutoDeleteDSStub) GetPow(_ context.Context, _ *auth.RequestAuth, _ int) (string, error) {
return "pow", nil
}

func (m *vercelReleaseAutoDeleteDSStub) UploadFile(_ context.Context, _ *auth.RequestAuth, _ dsclient.UploadFileRequest, _ int) (*dsclient.UploadFileResult, error) {
return &dsclient.UploadFileResult{ID: "file-id", Filename: "file.txt", Bytes: 1, Status: "uploaded"}, nil
}

func (m *vercelReleaseAutoDeleteDSStub) CallCompletion(_ context.Context, _ *auth.RequestAuth, _ map[string]any, _ string, _ int) (*http.Response, error) {
return m.resp, nil
}

func (m *vercelReleaseAutoDeleteDSStub) DeleteSessionForToken(_ context.Context, token string, sessionID string) (*dsclient.DeleteSessionResult, error) {
m.deleteCallCount++
m.deletedSessionID = sessionID
m.deletedToken = token
if m.deleteErr != nil {
return nil, m.deleteErr
}
return &dsclient.DeleteSessionResult{SessionID: sessionID, Success: true}, nil
}

func (m *vercelReleaseAutoDeleteDSStub) DeleteAllSessionsForToken(_ context.Context, _ string) error {
return nil
}

type vercelReleaseAuthStub struct{}

func (a *vercelReleaseAuthStub) Determine(_ *http.Request) (*auth.RequestAuth, error) {
return &auth.RequestAuth{DeepSeekToken: "test-token", AccountID: "test-account"}, nil
}

func (a *vercelReleaseAuthStub) DetermineCaller(_ *http.Request) (*auth.RequestAuth, error) {
return &auth.RequestAuth{DeepSeekToken: "test-token", AccountID: "test-account"}, nil
}

func (a *vercelReleaseAuthStub) Release(_ *auth.RequestAuth) {}

func TestHandleVercelStreamReleaseTriggersAutoDelete(t *testing.T) {
t.Setenv("VERCEL", "1")
t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret")

ds := &vercelReleaseAutoDeleteDSStub{}
h := &Handler{
Store: mockOpenAIConfig{
autoDeleteMode: "single",
},
Auth: &vercelReleaseAuthStub{},
DS: ds,
}

leaseID := h.holdStreamLease(&auth.RequestAuth{DeepSeekToken: "test-token", AccountID: "test-account"}, "session-to-delete")
if leaseID == "" {
t.Fatalf("expected non-empty lease id")
}

reqBody := map[string]any{"lease_id": leaseID}
reqJSON, _ := json.Marshal(reqBody)
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions?__stream_release=1", strings.NewReader(string(reqJSON)))
req.Header.Set("X-Ds2-Internal-Token", "stream-secret")
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()

h.handleVercelStreamRelease(rec, req)

if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
}
if ds.deleteCallCount != 1 {
t.Fatalf("expected auto delete call count=1, got %d", ds.deleteCallCount)
}
if ds.deletedSessionID != "session-to-delete" {
t.Fatalf("expected deleted session id=session-to-delete, got %q", ds.deletedSessionID)
}
}

func TestHandleVercelStreamPrepareMapsCurrentInputFileManagedAuthFailureTo401(t *testing.T) {
t.Setenv("VERCEL", "1")
t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret")
Expand Down
19 changes: 12 additions & 7 deletions internal/httpapi/openai/chat/vercel_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque
}

payload := stdReq.CompletionPayload(sessionID)
leaseID := h.holdStreamLease(a)
leaseID := h.holdStreamLease(a, sessionID)
if leaseID == "" {
writeOpenAIError(w, http.StatusInternalServerError, "failed to create stream lease")
return
Expand Down Expand Up @@ -140,10 +140,14 @@ func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Reque
writeOpenAIError(w, http.StatusBadRequest, "lease_id is required")
return
}
if !h.releaseStreamLease(leaseID) {
ok, leaseAuth, sessionID := h.releaseStreamLease(leaseID)
if !ok {
writeOpenAIError(w, http.StatusNotFound, "stream lease not found")
return
}
if sessionID != "" && leaseAuth != nil {
h.autoDeleteRemoteSession(r.Context(), leaseAuth, sessionID)
}
writeJSON(w, http.StatusOK, map[string]any{"success": true})
}

Expand Down Expand Up @@ -216,7 +220,7 @@ func vercelInternalSecret() string {
return "admin"
}

func (h *Handler) holdStreamLease(a *auth.RequestAuth) string {
func (h *Handler) holdStreamLease(a *auth.RequestAuth, sessionID string) string {
if a == nil {
return ""
}
Expand All @@ -234,6 +238,7 @@ func (h *Handler) holdStreamLease(a *auth.RequestAuth) string {
leaseID := newLeaseID()
h.streamLeases[leaseID] = streamLease{
Auth: a,
SessionID: sessionID,
ExpiresAt: now.Add(ttl),
}
h.leaseMu.Unlock()
Expand All @@ -255,10 +260,10 @@ func (h *Handler) lookupStreamLeaseAuth(leaseID string) *auth.RequestAuth {
return lease.Auth
}

func (h *Handler) releaseStreamLease(leaseID string) bool {
func (h *Handler) releaseStreamLease(leaseID string) (bool, *auth.RequestAuth, string) {
leaseID = strings.TrimSpace(leaseID)
if leaseID == "" {
return false
return false, nil, ""
}

h.leaseMu.Lock()
Expand All @@ -271,12 +276,12 @@ func (h *Handler) releaseStreamLease(leaseID string) bool {
h.releaseExpiredAuths(expired)

if !ok {
return false
return false, nil, ""
}
if h.Auth != nil {
h.Auth.Release(lease.Auth)
}
Comment thread
CJackHwang marked this conversation as resolved.
return true
return true, lease.Auth, lease.SessionID
}

func (h *Handler) popExpiredLeasesLocked(now time.Time) []*auth.RequestAuth {
Expand Down
Loading