Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion cmd/msgvault/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,16 @@ func runServe(cmd *cobra.Command, args []string) error {
return nil
}

// storeAPIAdapter adapts store.Store to api.MessageStore.
// storeAPIAdapter adapts store.Store to the API store interfaces.
// Since api.APIMessage, api.StoreStats, etc. are type aliases for store types,
// the adapter methods are simple pass-throughs with no conversion needed.
type storeAPIAdapter struct {
store *store.Store
}

var _ api.MessageStore = (*storeAPIAdapter)(nil)
var _ api.SourceStatusStore = (*storeAPIAdapter)(nil)

func (a *storeAPIAdapter) GetStats() (*api.StoreStats, error) {
return a.store.GetStats()
}
Expand All @@ -326,6 +329,22 @@ func (a *storeAPIAdapter) SearchMessagesQuery(q *search.Query, offset, limit int
return a.store.SearchMessagesQuery(q, offset, limit)
}

func (a *storeAPIAdapter) ListSources(sourceType string) ([]*store.Source, error) {
return a.store.ListSources(sourceType)
}

func (a *storeAPIAdapter) GetActiveSync(sourceID int64) (*store.SyncRun, error) {
return a.store.GetActiveSync(sourceID)
}

func (a *storeAPIAdapter) GetLatestSync(sourceID int64) (*store.SyncRun, error) {
return a.store.GetLatestSync(sourceID)
}

func (a *storeAPIAdapter) GetLastSuccessfulSync(sourceID int64) (*store.SyncRun, error) {
return a.store.GetLastSuccessfulSync(sourceID)
}

// schedulerAdapter adapts scheduler.Scheduler to api.SyncScheduler.
// Since api.AccountStatus is a type alias for scheduler.AccountStatus,
// the adapter methods are simple pass-throughs.
Expand Down
64 changes: 64 additions & 0 deletions cmd/msgvault/cmd/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@

import (
"context"
"encoding/json"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"

assertpkg "github.com/stretchr/testify/assert"
requirepkg "github.com/stretchr/testify/require"
"go.kenn.io/msgvault/internal/api"
"go.kenn.io/msgvault/internal/config"
imaplib "go.kenn.io/msgvault/internal/imap"
"go.kenn.io/msgvault/internal/oauth"
Expand Down Expand Up @@ -113,6 +119,64 @@
assertpkg.Empty(t, scheduled, "expected no scheduled accounts")
}

func TestStoreAPIAdapterServesSourceStatus(t *testing.T) {
require := requirepkg.New(t)
assert := assertpkg.New(t)
tmpDir := t.TempDir()

s, err := store.Open(filepath.Join(tmpDir, "msgvault.db"))
require.NoError(err, "open store")
defer func() { _ = s.Close() }()
require.NoError(s.InitSchema(), "init schema")

source, err := s.GetOrCreateSource("gmail", "alice@example.com")
require.NoError(err, "create source")
require.NoError(s.UpdateSourceDisplayName(source.ID, "Alice"), "set display name")
require.NoError(s.UpdateSourceSyncCursor(source.ID, "history-1"), "set sync cursor")

completedID, err := s.StartSync(source.ID, "full")
require.NoError(err, "start sync")
require.NoError(s.UpdateSyncCheckpoint(completedID, &store.Checkpoint{
MessagesProcessed: 3,
MessagesAdded: 2,
MessagesUpdated: 1,
}), "update checkpoint")
require.NoError(s.CompleteSync(completedID, "history-2"), "complete sync")

adapter := &storeAPIAdapter{store: s}
srv := api.NewServer(
&config.Config{Server: config.ServerConfig{APIPort: 8080}},
adapter,
nil,
slog.New(slog.NewTextHandler(io.Discard, nil)),

Check failure on line 151 in cmd/msgvault/cmd/serve_test.go

View workflow job for this annotation

GitHub Actions / test

use slog.DiscardHandler instead (sloglint)
)

req := httptest.NewRequest(http.MethodGet, "/api/v1/sources/status?source_type=gmail", nil)
w := httptest.NewRecorder()
srv.Router().ServeHTTP(w, req)

require.Equal(http.StatusOK, w.Code, "body: %s", w.Body.String())

var resp api.SourceStatusResponse
require.NoError(json.NewDecoder(w.Body).Decode(&resp), "decode response")
require.Len(resp.Sources, 1, "sources")

got := resp.Sources[0]
assert.Equal(source.ID, got.ID, "ID")
assert.Equal("gmail", got.SourceType, "SourceType")
assert.Equal("alice@example.com", got.Identifier, "Identifier")
require.NotNil(got.DisplayName, "DisplayName")
assert.Equal("Alice", *got.DisplayName, "DisplayName")
assert.Nil(got.ActiveSync, "ActiveSync")
require.NotNil(got.LatestSync, "LatestSync")
assert.Equal(completedID, got.LatestSync.ID, "LatestSync.ID")
require.NotNil(got.LastSuccessfulSync, "LastSuccessfulSync")
assert.Equal(completedID, got.LastSuccessfulSync.ID, "LastSuccessfulSync.ID")
assert.Equal(store.SyncStatusCompleted, got.LastSuccessfulSync.Status, "LastSuccessfulSync.Status")
require.NotNil(got.LastSuccessfulSync.CursorAfter, "LastSuccessfulSync.CursorAfter")
assert.Equal("history-2", *got.LastSuccessfulSync.CursorAfter, "LastSuccessfulSync.CursorAfter")
}

// TestSetupVectorFeatures_Disabled verifies that when
// cfg.Vector.Enabled is false, setupVectorFeatures returns (nil, nil)
// regardless of build tag. Runs under both tagged and untagged builds.
Expand Down
146 changes: 146 additions & 0 deletions internal/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,40 @@
Enabled bool `json:"enabled"`
}

// SourceStatusResponse represents source sync status for all matching sources.
type SourceStatusResponse struct {
Sources []SourceStatus `json:"sources"`
}

// SourceStatus represents one source and its read-only sync status.
type SourceStatus struct {
ID int64 `json:"id"`
SourceType string `json:"source_type"`
Identifier string `json:"identifier"`
DisplayName *string `json:"display_name"`
LastSyncAt *string `json:"last_sync_at"`
UpdatedAt string `json:"updated_at"`
ActiveSync *SyncRunStatus `json:"active_sync"`
LatestSync *SyncRunStatus `json:"latest_sync"`
LastSuccessfulSync *SyncRunStatus `json:"last_successful_sync"`
}

// SyncRunStatus represents the API-visible details for a sync run.
type SyncRunStatus struct {
ID int64 `json:"id"`
SourceID int64 `json:"source_id"`
StartedAt string `json:"started_at"`
CompletedAt *string `json:"completed_at"`
Status string `json:"status"`
MessagesProcessed int64 `json:"messages_processed"`
MessagesAdded int64 `json:"messages_added"`
MessagesUpdated int64 `json:"messages_updated"`
ErrorsCount int64 `json:"errors_count"`
ErrorMessage *string `json:"error_message"`
CursorBefore *string `json:"cursor_before"`
CursorAfter *string `json:"cursor_after"`
}

// SchedulerStatusResponse represents scheduler status.
type SchedulerStatusResponse struct {
Running bool `json:"running"`
Expand Down Expand Up @@ -177,6 +211,15 @@
writeJSON(w, status, ErrorResponse{Error: err, Message: message})
}

func stringPtr(value string) *string {

Check failure on line 214 in internal/api/handlers.go

View workflow job for this annotation

GitHub Actions / test

newexpr: stringPtr can be an inlinable wrapper around new(expr) (modernize)
return &value
}

func nullableTimePtr(value time.Time) *string {
formatted := value.UTC().Format(time.RFC3339)
return &formatted
}

// messageDetailFromQuery builds a MessageDetail response from a query-engine
// MessageDetail, formatting addresses the same way the store path does and
// emitting body_html alongside body when both are present.
Expand Down Expand Up @@ -683,6 +726,109 @@
})
}

// handleSourceStatus returns read-only sync status for all matching sources.
func (s *Server) handleSourceStatus(w http.ResponseWriter, r *http.Request) {
statusStore, ok := s.store.(SourceStatusStore)
if s.store == nil || !ok {
writeError(w, http.StatusServiceUnavailable, "store_unavailable", "Database not available")
return
}

sourceType := r.URL.Query().Get("source_type")
sources, err := statusStore.ListSources(sourceType)
if err != nil {
s.logger.Error("failed to list sources for status",
"source_type", sourceType,
"error", err,
)
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to retrieve source status")
return
}

statuses := make([]SourceStatus, 0, len(sources))
for _, source := range sources {
status, err := s.sourceStatus(statusStore, source)
if err != nil {
s.logger.Error("failed to build source sync status",
"source_id", source.ID,
"source_type", source.SourceType,
"identifier", source.Identifier,
"error", err,
)
writeError(w, http.StatusInternalServerError, "internal_error", "Failed to retrieve source status")
return
}
statuses = append(statuses, status)
}

writeJSON(w, http.StatusOK, SourceStatusResponse{Sources: statuses})
}

func (s *Server) sourceStatus(statusStore SourceStatusStore, source *store.Source) (SourceStatus, error) {
status := SourceStatus{
ID: source.ID,
SourceType: source.SourceType,
Identifier: source.Identifier,
UpdatedAt: source.UpdatedAt.UTC().Format(time.RFC3339),
}
if source.DisplayName.Valid {
status.DisplayName = stringPtr(source.DisplayName.String)

Check failure on line 775 in internal/api/handlers.go

View workflow job for this annotation

GitHub Actions / test

newexpr: call of stringPtr(x) can be simplified to new(x) (modernize)
}
if source.LastSyncAt.Valid {
status.LastSyncAt = nullableTimePtr(source.LastSyncAt.Time)
}

active, err := statusStore.GetActiveSync(source.ID)
if err != nil && !errors.Is(err, store.ErrSyncRunNotFound) {
return SourceStatus{}, fmt.Errorf("get active sync: %w", err)
}
status.ActiveSync = syncRunStatus(active)

latest, err := statusStore.GetLatestSync(source.ID)
if err != nil && !errors.Is(err, store.ErrSyncRunNotFound) {
return SourceStatus{}, fmt.Errorf("get latest sync: %w", err)
}
status.LatestSync = syncRunStatus(latest)

lastSuccessful, err := statusStore.GetLastSuccessfulSync(source.ID)
if err != nil && !errors.Is(err, store.ErrSyncRunNotFound) {
return SourceStatus{}, fmt.Errorf("get last successful sync: %w", err)
}
status.LastSuccessfulSync = syncRunStatus(lastSuccessful)

return status, nil
}

func syncRunStatus(run *store.SyncRun) *SyncRunStatus {
if run == nil {
return nil
}

status := &SyncRunStatus{
ID: run.ID,
SourceID: run.SourceID,
StartedAt: run.StartedAt.UTC().Format(time.RFC3339),
Status: run.Status,
MessagesProcessed: run.MessagesProcessed,
MessagesAdded: run.MessagesAdded,
MessagesUpdated: run.MessagesUpdated,
ErrorsCount: run.ErrorsCount,
}
if run.CompletedAt.Valid {
status.CompletedAt = nullableTimePtr(run.CompletedAt.Time)
}
if run.ErrorMessage.Valid {
status.ErrorMessage = stringPtr(run.ErrorMessage.String)

Check failure on line 821 in internal/api/handlers.go

View workflow job for this annotation

GitHub Actions / test

newexpr: call of stringPtr(x) can be simplified to new(x) (modernize)
}
if run.CursorBefore.Valid {
status.CursorBefore = stringPtr(run.CursorBefore.String)

Check failure on line 824 in internal/api/handlers.go

View workflow job for this annotation

GitHub Actions / test

newexpr: call of stringPtr(x) can be simplified to new(x) (modernize)
}
if run.CursorAfter.Valid {
status.CursorAfter = stringPtr(run.CursorAfter.String)
}
return status
}

// handleTriggerSync manually triggers a sync for an account.
func (s *Server) handleTriggerSync(w http.ResponseWriter, r *http.Request) {
if s.scheduler == nil {
Expand Down
89 changes: 89 additions & 0 deletions internal/api/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"go.kenn.io/msgvault/internal/query"
"go.kenn.io/msgvault/internal/query/querytest"
"go.kenn.io/msgvault/internal/remote"
"go.kenn.io/msgvault/internal/store"
"go.kenn.io/msgvault/internal/testutil"
"go.kenn.io/msgvault/internal/vector"
"go.kenn.io/msgvault/internal/vector/hybrid"
)
Expand Down Expand Up @@ -160,6 +162,93 @@ func TestHandleListMessagesPagination(t *testing.T) {
assert.InDelta(float64(10), resp["page_size"], 1e-9, "page_size")
}

func TestHandleSourceStatus(t *testing.T) {
require := requirepkg.New(t)
assert := assertpkg.New(t)
st := testutil.NewTestStore(t)
sched := newMockScheduler()
srv := NewServer(&config.Config{Server: config.ServerConfig{APIPort: 8080}}, st, sched, testLogger())

gmail, err := st.GetOrCreateSource("gmail", "alice@example.com")
require.NoError(err, "GetOrCreateSource gmail")
require.NoError(st.UpdateSourceDisplayName(gmail.ID, "Alice"), "UpdateSourceDisplayName")
require.NoError(st.UpdateSourceSyncCursor(gmail.ID, "history-1"), "UpdateSourceSyncCursor")

completedID, err := st.StartSync(gmail.ID, "full")
require.NoError(err, "StartSync completed")
require.NoError(st.UpdateSyncCheckpoint(completedID, &store.Checkpoint{
PageToken: "page-1",
MessagesProcessed: 10,
MessagesAdded: 8,
MessagesUpdated: 2,
}), "UpdateSyncCheckpoint")
require.NoError(st.CompleteSync(completedID, "history-2"), "CompleteSync")

runningID, err := st.StartSync(gmail.ID, "incremental")
require.NoError(err, "StartSync running")

_, err = st.GetOrCreateSource("imap", "imaps://mail.example.com/alice")
require.NoError(err, "GetOrCreateSource imap")

req := httptest.NewRequest(http.MethodGet, "/api/v1/sources/status?source_type=gmail", nil)
w := httptest.NewRecorder()
srv.Router().ServeHTTP(w, req)

assert.Equal(http.StatusOK, w.Code, "status")

var resp SourceStatusResponse
require.NoError(json.NewDecoder(w.Body).Decode(&resp), "decode response")
require.Len(resp.Sources, 1, "sources")

got := resp.Sources[0]
assert.Equal(gmail.ID, got.ID, "ID")
assert.Equal("gmail", got.SourceType, "SourceType")
assert.Equal("alice@example.com", got.Identifier, "Identifier")
require.NotNil(got.DisplayName, "DisplayName")
assert.Equal("Alice", *got.DisplayName, "DisplayName")
require.NotNil(got.LastSyncAt, "LastSyncAt")
assert.NotEmpty(*got.LastSyncAt, "LastSyncAt")
assert.NotEmpty(got.UpdatedAt, "UpdatedAt")

require.NotNil(got.ActiveSync, "ActiveSync")
assert.Equal(runningID, got.ActiveSync.ID, "ActiveSync.ID")
assert.Equal(store.SyncStatusRunning, got.ActiveSync.Status, "ActiveSync.Status")

require.NotNil(got.LatestSync, "LatestSync")
assert.Equal(runningID, got.LatestSync.ID, "LatestSync.ID")

require.NotNil(got.LastSuccessfulSync, "LastSuccessfulSync")
assert.Equal(completedID, got.LastSuccessfulSync.ID, "LastSuccessfulSync.ID")
assert.Equal(store.SyncStatusCompleted, got.LastSuccessfulSync.Status, "LastSuccessfulSync.Status")
assert.Equal(int64(10), got.LastSuccessfulSync.MessagesProcessed, "LastSuccessfulSync.MessagesProcessed")
require.NotNil(got.LastSuccessfulSync.CursorAfter, "LastSuccessfulSync.CursorAfter")
assert.Equal("history-2", *got.LastSuccessfulSync.CursorAfter, "LastSuccessfulSync.CursorAfter")
}

func TestHandleSourceStatusNoSyncRuns(t *testing.T) {
require := requirepkg.New(t)
assert := assertpkg.New(t)
st := testutil.NewTestStore(t)
srv := NewServer(&config.Config{Server: config.ServerConfig{APIPort: 8080}}, st, nil, testLogger())

source, err := st.GetOrCreateSource("gmail", "empty@example.com")
require.NoError(err, "GetOrCreateSource")

req := httptest.NewRequest(http.MethodGet, "/api/v1/sources/status", nil)
w := httptest.NewRecorder()
srv.Router().ServeHTTP(w, req)

assert.Equal(http.StatusOK, w.Code, "status")

var resp SourceStatusResponse
require.NoError(json.NewDecoder(w.Body).Decode(&resp), "decode response")
require.Len(resp.Sources, 1, "sources")
assert.Equal(source.ID, resp.Sources[0].ID, "ID")
assert.Nil(resp.Sources[0].ActiveSync, "ActiveSync")
assert.Nil(resp.Sources[0].LatestSync, "LatestSync")
assert.Nil(resp.Sources[0].LastSuccessfulSync, "LastSuccessfulSync")
}

func TestHandleGetMessage(t *testing.T) {
assert := assertpkg.New(t)
srv, _ := newTestServerWithMockStore(t)
Expand Down
Loading
Loading