diff --git a/.claude/memory.md b/.claude/memory.md index 4e50acc1be..051cb601ef 100644 --- a/.claude/memory.md +++ b/.claude/memory.md @@ -300,3 +300,10 @@ Quick reference for anyone starting with Claude on this project. Updated by the - **Upstream `main` has 5 Vitest failures and 4 TypeScript compile errors** — Caused by missing iOS experimental dependencies: `@noble/ciphers/chacha`, `@noble/ciphers/webcrypto`, `qrcode.react`, `@tauri-apps/plugin-barcode-scanner`. Breaks `pnpm compile`, `pnpm build`, `pnpm test:coverage` on a clean checkout. Always verify by stashing changes and running checks on the base branch before blaming your PR. - **`cargo fmt` must run after codecrusher** — codecrusher does not reliably produce `cargo fmt`-clean Rust. Always run `cargo fmt --manifest-path Cargo.toml` after codecrusher finishes and before committing. + +## Memory Source Sync Indicators (Issue #3295) + +- **`MemorySyncStageChanged` carries two distinct ids** — `connection_id` is the ingest-pipeline document_id (identity for dedup/audit); `source_id: Option<String>` is the memory-source row id used by the UI to match per-source progress. Never conflate them. Frontend (`MemorySourcesRegistry.tsx`) matches on `source_id ?? connection_id` for backward compat. +- **Chunk source_id encoding for mem-src syncs** — folder/RSS/web-page chunks use `mem_src:<source_id>:<item_id>` (`memory_sources/sync.rs`). The `<item_id>` can contain colons (URLs), so extract `<source_id>` by splitting on the **first** colon after the `mem_src:` prefix — use `find(':')`, not `rfind`. See `extract_mem_src_id` in `src/openhuman/memory/sync.rs`. +- **`MemorySyncStageBridge` re-emits stage events from ingestion events** — In `src/openhuman/memory/sync.rs`, converts `DocumentCanonicalized`/`MemoryIngestionStarted` into `MemorySyncStageChanged` with a populated `source_id`. For non-mem-src syncs (channel providers), `source_id` stays `None` — intentional; don't force a value. 
/**
 * Per-stage fallback percentages so the progress bar always advances even
 * when no numeric "N/M" ratio is present in the detail string (RC#4, #3295).
 *
 * Keys are the stage names carried by sync-stage events; values are the
 * baseline percent shown for that stage.
 */
export const STAGE_FALLBACK_PERCENT: Record<string, number> = {
  requested: 2,
  fetching: 5,
  stored: 15,
  queued: 25,
  ingesting: 40,
  completed: 100,
};

/**
 * Parse a sync progress detail string into a 0–100 percent.
 *
 * - Recognises a leading "N/M" ratio (followed by whitespace or "/") and
 *   returns it as a rounded percentage, clamped to at most 100 so an
 *   overshooting producer (e.g. "12/10 ...") cannot overflow the bar.
 * - Falls back to the per-stage baseline from `STAGE_FALLBACK_PERCENT` when
 *   no usable ratio is present (including the "N/0" divide-by-zero case).
 * - Returns `null` when neither a ratio nor a known stage is available.
 *
 * @param detail Free-form detail text from the sync-stage event, or `null`.
 * @param stage  Optional stage name used for the fallback lookup.
 * @returns A percent in the range 0–100, or `null` when indeterminate.
 */
export function parseSyncProgress(detail: string | null, stage?: string): number | null {
  // Try the numeric "N/M ..." ratio first.
  const ratio = detail?.match(/^(\d+)\/(\d+)[\s/]/);
  if (ratio) {
    const current = parseInt(ratio[1], 10);
    const total = parseInt(ratio[2], 10);
    // `total > 0` guards the division; the clamp keeps the result a valid percent.
    if (total > 0) return Math.min(100, Math.round((current / total) * 100));
  }

  // Fall back to the per-stage baseline percentage. Use an own-property check:
  // the `in` operator also matches inherited keys such as "constructor" or
  // "toString", which would leak a function out of a number-typed return.
  if (stage && Object.prototype.hasOwnProperty.call(STAGE_FALLBACK_PERCENT, stage)) {
    return STAGE_FALLBACK_PERCENT[stage];
  }
  return null;
}
'absent'}` + ); if (stage === 'completed' || stage === 'failed') { setSyncProgress(prev => { const next = new Map(prev); - next.delete(sourceId); + next.delete(rowId); + return next; + }); + // RC#1: immutable Set update — remove just this source, keep others syncing. + setSyncingIds(prev => { + const next = new Set(prev); + next.delete(rowId); return next; }); - setSyncingId(prev => (prev === sourceId ? null : prev)); return; } - const percent = parseSyncProgress(data.detail ?? null); + const percent = parseSyncProgress(data?.detail ?? null, stage); setSyncProgress(prev => { const next = new Map(prev); - next.set(sourceId, { stage, detail: data.detail ?? null, percent }); + next.set(rowId, { stage, detail: data?.detail ?? null, percent }); return next; }); - if (stage === 'requested' || stage === 'fetching' || stage === 'ingesting') { - setSyncingId(sourceId); + // RC#1: ADD this source id to the set (immutable update). + if ( + stage === 'requested' || + stage === 'fetching' || + stage === 'stored' || + stage === 'queued' || + stage === 'ingesting' + ) { + setSyncingIds(prev => { + if (prev.has(rowId)) return prev; // no change — avoid re-render + const next = new Set(prev); + next.add(rowId); + return next; + }); } }; window.addEventListener('openhuman:memory-sync-stage', handler); @@ -109,6 +168,11 @@ export function MemorySourcesRegistry({ ]); setSources(list); setStatuses(stats); + // RC#5 (#3295): The 5s poll is the safety net for missed completed/failed events. + // If a source is in syncingIds but the poll shows it's no longer active (no + // in-progress status indicator from the server), we clear it here. In practice + // the event stream covers this; on remount the state rehydrates within ~5s via poll. + // No new RPC needed — reconciliation is best-effort and relies on the existing poll. 
} finally { setLoading(false); } @@ -167,7 +231,14 @@ export function MemorySourcesRegistry({ const handleSync = useCallback( async (source: MemorySourceEntry) => { - setSyncingId(source.id); + // RC#1 (#3295): add immediately on click — event will also fire, but this + // ensures the row lights up before the first sync-stage event arrives. + setSyncingIds(prev => { + const next = new Set(prev); + next.add(source.id); + return next; + }); + console.debug(`[ui-flow][memory-sync] manual sync triggered source_id=${source.id}`); try { await syncMemorySource(source.id); onToast?.({ @@ -183,7 +254,13 @@ export function MemorySourcesRegistry({ message: err instanceof Error ? err.message : String(err), }); } finally { - setSyncingId(prev => (prev === source.id ? null : prev)); + // Remove on RPC completion. The completed/failed stage event will also + // clear it, but this handles the case where the event is missed. + setSyncingIds(prev => { + const next = new Set(prev); + next.delete(source.id); + return next; + }); } }, [onToast, refresh, t] @@ -253,7 +330,7 @@ export function MemorySourcesRegistry({ key={source.id} source={source} status={statusById.get(source.id) ?? null} - isSyncing={syncingId === source.id} + isSyncing={syncingIds.has(source.id)} isBuilding={buildingId === source.id} progress={syncProgress.get(source.id) ?? null} onToggle={handleToggle} @@ -347,7 +424,7 @@ function SourceRow({
diff --git a/app/src/components/intelligence/__tests__/MemorySourcesRegistry.test.tsx b/app/src/components/intelligence/__tests__/MemorySourcesRegistry.test.tsx new file mode 100644 index 0000000000..3916bd62d0 --- /dev/null +++ b/app/src/components/intelligence/__tests__/MemorySourcesRegistry.test.tsx @@ -0,0 +1,315 @@ +/** + * Tests for MemorySourcesRegistry (issue #3295): + * + * RC#1 — concurrent syncs: multiple sources can show "syncing" simultaneously. + * RC#2 — source_id matching: events matched by source_id (preferred) or + * connection_id (fallback) for backward compat. + * RC#4 — tolerant parseSyncProgress: numeric ratio, stage fallback, and + * indeterminate (null) cases. + */ +import { act, screen, waitFor } from '@testing-library/react'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { renderWithProviders } from '../../../test/test-utils'; +import { MemorySourcesRegistry, parseSyncProgress } from '../MemorySourcesRegistry'; + +// ── i18n mock (returns key as the translation) ──────────────────────────────── +vi.mock('../../../lib/i18n/I18nContext', () => ({ useT: () => ({ t: (key: string) => key }) })); + +// ── memorySourcesService mock ───────────────────────────────────────────────── +vi.mock('../../../services/memorySourcesService', () => ({ + listMemorySources: vi.fn().mockResolvedValue([]), + memorySourcesStatusList: vi.fn().mockResolvedValue([]), + syncMemorySource: vi.fn().mockResolvedValue(undefined), + removeMemorySource: vi.fn().mockResolvedValue(undefined), + updateMemorySource: vi + .fn() + .mockImplementation((id: string, patch: Record) => + Promise.resolve({ id, kind: 'folder', label: 'Test', enabled: true, ...patch }) + ), + SOURCE_KIND_ICONS: { + folder: 'F', + composio: 'C', + github_repo: 'G', + rss_feed: 'R', + web_page: 'W', + twitter_query: 'T', + }, + SOURCE_KIND_LABEL_KEYS: { + folder: 'memorySources.kind.folder', + composio: 'memorySources.kind.composio', + github_repo: 
'memorySources.kind.github_repo', + rss_feed: 'memorySources.kind.rss_feed', + web_page: 'memorySources.kind.web_page', + twitter_query: 'memorySources.kind.twitter_query', + }, +})); + +// ── tauriCommands mock ──────────────────────────────────────────────────────── +vi.mock('../../../utils/tauriCommands/memoryTree', () => ({ + memoryTreeFlushSource: vi.fn().mockResolvedValue({ seals_fired: 0 }), +})); + +// ── helpers ─────────────────────────────────────────────────────────────────── + +function makeSyncStageEvent(detail: { + stage: string; + source_id?: string | null; + connection_id?: string | null; + detail?: string; +}): CustomEvent { + return new CustomEvent('openhuman:memory-sync-stage', { detail }); +} + +function makeSource(id: string) { + return { id, kind: 'folder' as const, label: `Source ${id}`, enabled: true }; +} + +// ── parseSyncProgress unit tests ────────────────────────────────────────────── + +describe('parseSyncProgress', () => { + it('returns ratio for "N/M ..." 
numeric pattern', () => { + expect(parseSyncProgress('5/10 processed', 'ingesting')).toBe(50); + expect(parseSyncProgress('3/12 docs', 'fetching')).toBe(25); + expect(parseSyncProgress('1/1 done', 'ingesting')).toBe(100); + }); + + it('returns stage fallback when no numeric ratio is present', () => { + expect(parseSyncProgress('queue_depth=3', 'ingesting')).toBe(40); + expect(parseSyncProgress('listing items', 'fetching')).toBe(5); + expect(parseSyncProgress(null, 'requested')).toBe(2); + expect(parseSyncProgress('canonicalized 3 chunks', 'stored')).toBe(15); + expect(parseSyncProgress('queued chunk extraction', 'queued')).toBe(25); + }); + + it('returns 100 for completed stage', () => { + expect(parseSyncProgress(null, 'completed')).toBe(100); + expect(parseSyncProgress('ingested 5 item(s)', 'completed')).toBe(100); + }); + + it('returns null when no ratio and no recognized stage', () => { + expect(parseSyncProgress('some detail', 'unknown_stage')).toBeNull(); + expect(parseSyncProgress(null, undefined)).toBeNull(); + expect(parseSyncProgress(null)).toBeNull(); + }); + + it('handles non-ratio numeric strings gracefully', () => { + // "N discovered" — no slash — should use stage fallback, not parse as ratio + expect(parseSyncProgress('3 discovered', 'stored')).toBe(15); + // "N/0 ..." — divide by zero guard + expect(parseSyncProgress('5/0 items', 'fetching')).toBe(5); // falls through to stage fallback + }); +}); + +// ── MemorySourcesRegistry integration tests ─────────────────────────────────── + +describe('MemorySourcesRegistry', () => { + // Expose mock so tests can control what listMemorySources returns. 
+ let listMemorySources: ReturnType; + let memorySourcesStatusList: ReturnType; + + beforeEach(async () => { + const svc = await import('../../../services/memorySourcesService'); + listMemorySources = svc.listMemorySources as ReturnType; + memorySourcesStatusList = svc.memorySourcesStatusList as ReturnType; + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('shows two sources as syncing when two concurrent sync-stage events arrive (RC#1)', async () => { + const sources = [makeSource('src-alpha'), makeSource('src-beta')]; + listMemorySources.mockResolvedValue(sources); + memorySourcesStatusList.mockResolvedValue([]); + + renderWithProviders(); + + // Wait for sources to render. + await waitFor(() => { + expect(screen.getByText('Source src-alpha')).toBeInTheDocument(); + expect(screen.getByText('Source src-beta')).toBeInTheDocument(); + }); + + // Dispatch two concurrent "requested" events with different source_ids. + act(() => { + window.dispatchEvent( + makeSyncStageEvent({ + stage: 'requested', + source_id: 'src-alpha', + connection_id: 'src-alpha', + }) + ); + window.dispatchEvent( + makeSyncStageEvent({ stage: 'requested', source_id: 'src-beta', connection_id: 'src-beta' }) + ); + }); + + // Both rows should show the syncing spinner/text (sync.syncing key → "sync.syncing"). + await waitFor(() => { + const syncingButtons = screen.getAllByText('sync.syncing'); + expect(syncingButtons).toHaveLength(2); + }); + }); + + it('clears only one source when completed, leaving the other syncing (RC#1)', async () => { + const sources = [makeSource('src-alpha'), makeSource('src-beta')]; + listMemorySources.mockResolvedValue(sources); + memorySourcesStatusList.mockResolvedValue([]); + + renderWithProviders(); + + await waitFor(() => { + expect(screen.getByText('Source src-alpha')).toBeInTheDocument(); + }); + + // Both start syncing. 
+ act(() => { + window.dispatchEvent( + makeSyncStageEvent({ + stage: 'requested', + source_id: 'src-alpha', + connection_id: 'src-alpha', + }) + ); + window.dispatchEvent( + makeSyncStageEvent({ stage: 'requested', source_id: 'src-beta', connection_id: 'src-beta' }) + ); + }); + + await waitFor(() => { + expect(screen.getAllByText('sync.syncing')).toHaveLength(2); + }); + + // Complete src-alpha — only src-beta should remain syncing. + act(() => { + window.dispatchEvent( + makeSyncStageEvent({ + stage: 'completed', + source_id: 'src-alpha', + connection_id: 'src-alpha', + }) + ); + }); + + await waitFor(() => { + const syncingButtons = screen.getAllByText('sync.syncing'); + expect(syncingButtons).toHaveLength(1); + }); + + // The one remaining syncing button should be for src-beta. + // The sync button for src-alpha should now show "sync.sync". + const syncButtons = screen.getAllByText('sync.sync'); + expect(syncButtons).toHaveLength(1); // src-alpha back to idle + }); + + it('failed event also clears the syncing source (RC#1)', async () => { + const sources = [makeSource('src-gamma')]; + listMemorySources.mockResolvedValue(sources); + memorySourcesStatusList.mockResolvedValue([]); + + renderWithProviders(); + + await waitFor(() => { + expect(screen.getByText('Source src-gamma')).toBeInTheDocument(); + }); + + act(() => { + window.dispatchEvent(makeSyncStageEvent({ stage: 'fetching', source_id: 'src-gamma' })); + }); + + await waitFor(() => { + expect(screen.getByText('sync.syncing')).toBeInTheDocument(); + }); + + act(() => { + window.dispatchEvent(makeSyncStageEvent({ stage: 'failed', source_id: 'src-gamma' })); + }); + + await waitFor(() => { + expect(screen.queryByText('sync.syncing')).not.toBeInTheDocument(); + expect(screen.getByText('sync.sync')).toBeInTheDocument(); + }); + }); + + it('matches events by source_id when present, ignoring connection_id (RC#2)', async () => { + const sources = [makeSource('src-new')]; + 
listMemorySources.mockResolvedValue(sources); + memorySourcesStatusList.mockResolvedValue([]); + + renderWithProviders(); + + await waitFor(() => { + expect(screen.getByText('Source src-new')).toBeInTheDocument(); + }); + + // Event with source_id matching the row but a different connection_id + // (e.g. an intermediate stage from the bridge where connection_id = document_id). + act(() => { + window.dispatchEvent( + makeSyncStageEvent({ + stage: 'ingesting', + source_id: 'src-new', // matches the row + connection_id: 'mem_src:src-new:doc-123', // ingest-pipeline id, NOT the row id + }) + ); + }); + + // Row should light up because source_id matches. + await waitFor(() => { + expect(screen.getByText('sync.syncing')).toBeInTheDocument(); + }); + }); + + it('falls back to connection_id when source_id is absent (RC#2 backward compat)', async () => { + const sources = [makeSource('src-legacy')]; + listMemorySources.mockResolvedValue(sources); + memorySourcesStatusList.mockResolvedValue([]); + + renderWithProviders(); + + await waitFor(() => { + expect(screen.getByText('Source src-legacy')).toBeInTheDocument(); + }); + + // Old-style event: no source_id, connection_id is the row id. 
+ act(() => { + window.dispatchEvent( + makeSyncStageEvent({ + stage: 'fetching', + // source_id absent + connection_id: 'src-legacy', + }) + ); + }); + + await waitFor(() => { + expect(screen.getByText('sync.syncing')).toBeInTheDocument(); + }); + }); + + it('ignores events with neither source_id nor connection_id', async () => { + const sources = [makeSource('src-quiet')]; + listMemorySources.mockResolvedValue(sources); + memorySourcesStatusList.mockResolvedValue([]); + + renderWithProviders(); + + await waitFor(() => { + expect(screen.getByText('Source src-quiet')).toBeInTheDocument(); + }); + + act(() => { + window.dispatchEvent( + new CustomEvent('openhuman:memory-sync-stage', { + detail: { stage: 'fetching' }, // no source_id, no connection_id + }) + ); + }); + + // Row should remain idle. + await waitFor(() => { + expect(screen.queryByText('sync.syncing')).not.toBeInTheDocument(); + }); + }); +}); diff --git a/src/core/event_bus/events.rs b/src/core/event_bus/events.rs index f0e3a402d7..5d584e1c30 100644 --- a/src/core/event_bus/events.rs +++ b/src/core/event_bus/events.rs @@ -157,12 +157,24 @@ pub enum DomainEvent { /// /// Emitted by the `memory` domain so the frontend can surface progress /// across request → fetch → store → queue → ingest → complete. + /// + /// `source_id` is the originating memory-source id (from + /// `memory_sources`) when the event can be attributed to a specific + /// source row. The frontend prefers this over `connection_id` for + /// per-row indicator matching (see RC#2, issue #3295). Set to `None` + /// when the event originates from a non-memory-source sync path (e.g. a + /// channel-provider ingest) — `connection_id` remains unchanged for + /// those callers. MemorySyncStageChanged { trigger: String, stage: String, provider: Option, connection_id: Option, detail: Option, + /// Originating memory-source id for frontend per-row indicator + /// matching. 
`None` when the event is not attributable to a + /// specific `MemorySourceEntry`. + source_id: Option, }, /// A memory ingestion job started running on the local extraction LLM. /// Ingestion is singleton — this fires once, then a matching diff --git a/src/core/socketio.rs b/src/core/socketio.rs index 0894d6679e..d9db11b8d8 100644 --- a/src/core/socketio.rs +++ b/src/core/socketio.rs @@ -865,6 +865,7 @@ pub fn spawn_web_channel_bridge(io: SocketIo) { provider, connection_id, detail, + source_id, } => { let payload = serde_json::json!({ "trigger": trigger, @@ -872,6 +873,10 @@ pub fn spawn_web_channel_bridge(io: SocketIo) { "provider": provider, "connection_id": connection_id, "detail": detail, + // source_id is the memory-source row id for frontend per-row + // indicator matching (RC#2, issue #3295). connection_id is + // preserved unchanged for downstream consumers. + "source_id": source_id, }); let _ = io_memory_sync.emit("memory:sync_stage", &payload); } diff --git a/src/openhuman/memory/ops/sync.rs b/src/openhuman/memory/ops/sync.rs index faf6c48824..5a2a036708 100644 --- a/src/openhuman/memory/ops/sync.rs +++ b/src/openhuman/memory/ops/sync.rs @@ -72,6 +72,7 @@ pub async fn memory_sync_channel( None, Some(¶ms.channel_id), Some("channel-targeted sync requested".to_string()), + None, // channel-level sync — not a memory-source row ); let channel_id_for_spawn = params.channel_id.clone(); tokio::spawn(async move { @@ -105,6 +106,7 @@ pub async fn memory_sync_all() -> Result, String> { None, None, Some("global sync requested".to_string()), + None, // global sync — not a memory-source row ); tokio::spawn(async move { if let Err(e) = spawn_manual_sync(None).await { @@ -144,6 +146,7 @@ async fn spawn_manual_sync(requested_connection: Option) -> Result<(), S None, Some(requested), Some("no active provider-backed sync target matched request".to_string()), + None, // channel-level sync — not a memory-source row ); return Err(format!( "memory sync: no active 
provider-backed target matched `{requested}`" @@ -159,6 +162,7 @@ async fn spawn_manual_sync(requested_connection: Option) -> Result<(), S Some(&target.toolkit), Some(&target.connection_id), Some("provider sync started".to_string()), + None, // provider-level composio sync — not a memory-source row ); match composio::run_connection_sync( @@ -183,6 +187,7 @@ async fn spawn_manual_sync(requested_connection: Option) -> Result<(), S "provider sync completed items_ingested={}", outcome.items_ingested )), + None, // provider-level composio sync — not a memory-source row ); } Err((error, _usage)) => { @@ -192,6 +197,7 @@ async fn spawn_manual_sync(requested_connection: Option) -> Result<(), S Some(&target.toolkit), Some(&target.connection_id), Some(error.clone()), + None, // provider-level composio sync — not a memory-source row ); tracing::warn!( toolkit = %target.toolkit, diff --git a/src/openhuman/memory/sync.rs b/src/openhuman/memory/sync.rs index 4a4110305f..e1eaa28d28 100644 --- a/src/openhuman/memory/sync.rs +++ b/src/openhuman/memory/sync.rs @@ -65,22 +65,64 @@ impl MemorySyncStage { } /// Publish a coarse sync lifecycle event for UI subscribers. +/// +/// `source_id` is the originating `MemorySourceEntry.id` when this event +/// can be attributed to a specific memory-source row. Pass `None` for +/// non-memory-source sync paths (channel-provider syncs, etc.) to avoid +/// corrupting the per-row indicator on the frontend. 
/// Extract the originating memory-source id from a composite `source_id` of
/// the form `"mem_src:<source_id>:<item_id>"` used by the reader-based ingest
/// path (folder, RSS, web-page sources).
///
/// The encoding is: `mem_src:` prefix, followed by the memory-source id
/// (expected to contain no colons), then `:`, then the item id. The item id
/// may itself contain colons (e.g. RSS GUIDs that are URLs like
/// `https://example.com/feed/1`), so the split happens on the **first** colon
/// after the prefix, never the last.
///
/// Returns `None` when:
/// - the input does not start with `mem_src:` (e.g. channel-provider ids such
///   as `"slack:workspace-1"`),
/// - there is no `:` separating a source id from an item id,
/// - the item id is empty (`"mem_src:src-abc:"`), or
/// - the source id is empty (`"mem_src::item"`). An empty id identifies no
///   memory source, and publishing it as `Some("")` would stop consumers from
///   falling back to `connection_id`.
pub fn extract_mem_src_id(composite_source_id: &str) -> Option<&str> {
    let rest = composite_source_id.strip_prefix("mem_src:")?;
    // `split_once` splits on the first colon, so everything after it —
    // including any further colons — belongs to the item id.
    let (source_id, item_id) = rest.split_once(':')?;
    if source_id.is_empty() || item_id.is_empty() {
        return None;
    }
    Some(source_id)
}
+ let mem_src_id = extract_mem_src_id(document_id); + log::debug!( + "[memory-sync] bridge: MemoryIngestionStarted document_id={} mem_src_id={:?}", + document_id, + mem_src_id + ); emit_sync_stage( MemorySyncTrigger::Manual, MemorySyncStage::Ingesting, Some(namespace), Some(document_id), Some(format!("queue_depth={queue_depth}")), + mem_src_id, ); } _ => {} @@ -278,4 +342,239 @@ mod tests { assert_eq!(ingesting.1.as_deref(), Some("doc-123")); assert_eq!(ingesting.2.as_deref(), Some("queue_depth=2")); } + + // ── extract_mem_src_id tests ────────────────────────────────────────── + + #[test] + fn extract_mem_src_id_parses_simple_source() { + // "mem_src::" → source_id + assert_eq!( + extract_mem_src_id("mem_src:src-abc-123:item-1"), + Some("src-abc-123") + ); + } + + #[test] + fn extract_mem_src_id_parses_item_id_with_colons_in_it() { + // item_id may contain colons (e.g. RSS GUIDs that are URLs). + // source_id is the first segment after "mem_src:"; item_id is everything after. + assert_eq!( + extract_mem_src_id("mem_src:src-rss-42:https://example.com/feed/item-7"), + Some("src-rss-42") + ); + // Web-page item ids may also contain colons. + assert_eq!( + extract_mem_src_id("mem_src:src-web-99:https://blog.example.com/2024/post"), + Some("src-web-99") + ); + } + + #[test] + fn extract_mem_src_id_returns_none_for_non_mem_src() { + // Channel-provider syncs like "slack:workspace-1" have no mem_src prefix. + assert_eq!(extract_mem_src_id("slack:workspace-1"), None); + assert_eq!(extract_mem_src_id("gmail:alice-thread-1"), None); + assert_eq!(extract_mem_src_id("no-prefix"), None); + } + + #[test] + fn extract_mem_src_id_returns_none_for_missing_item_id() { + // "mem_src:" with no item_id separator is invalid. + assert_eq!(extract_mem_src_id("mem_src:source-only-no-item"), None); + // "mem_src::" with empty item_id is also invalid. 
+ assert_eq!(extract_mem_src_id("mem_src:src-abc:"), None); + } + + // ── bridge populates source_id for Stored/Queued (DocumentCanonicalized) ── + + #[tokio::test] + async fn bridge_populates_source_id_for_stored_and_queued_from_mem_src() { + let _guard = test_mutex() + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + init_global(event_bus::DEFAULT_CAPACITY); + + let collector = StageCollector::default(); + let _subscription = + subscribe_global(Arc::new(collector.clone())).expect("event bus initialized"); + + let bridge = MemorySyncStageBridge; + bridge + .handle(&DomainEvent::DocumentCanonicalized { + // composite source_id format: mem_src:: + source_id: "mem_src:src-folder-1:file-readme".into(), + source_kind: "folder".into(), + chunks_written: 2, + chunk_ids: vec!["chunk-a".into()], + canonicalized_at: 1_700_000_000.0, + body_preview: None, + }) + .await; + + tokio::task::yield_now().await; + + let source_ids: Vec> = collector + .events + .lock() + .unwrap() + .iter() + .filter_map(|event| match event { + DomainEvent::MemorySyncStageChanged { + stage, source_id, .. + } if stage == "stored" || stage == "queued" => Some(source_id.clone()), + _ => None, + }) + .collect(); + assert_eq!(source_ids.len(), 2, "expected stored + queued events"); + for sid in &source_ids { + assert_eq!( + sid.as_deref(), + Some("src-folder-1"), + "[memory-sync] source_id should be extracted from mem_src prefix" + ); + } + } + + #[tokio::test] + async fn bridge_source_id_is_none_for_non_mem_src_canonicalized() { + let _guard = test_mutex() + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + init_global(event_bus::DEFAULT_CAPACITY); + + let collector = StageCollector::default(); + let _subscription = + subscribe_global(Arc::new(collector.clone())).expect("event bus initialized"); + + let bridge = MemorySyncStageBridge; + // Non-memory-source sync (e.g. 
Slack channel sync) should have source_id=None + bridge + .handle(&DomainEvent::DocumentCanonicalized { + source_id: "slack:workspace-1".into(), + source_kind: "chat".into(), + chunks_written: 5, + chunk_ids: vec!["chunk-b".into()], + canonicalized_at: 1_700_000_000.0, + body_preview: None, + }) + .await; + + tokio::task::yield_now().await; + + let source_ids: Vec> = collector + .events + .lock() + .unwrap() + .iter() + .filter_map(|event| match event { + DomainEvent::MemorySyncStageChanged { + stage, source_id, .. + } if stage == "stored" || stage == "queued" => Some(source_id.clone()), + _ => None, + }) + .collect(); + assert_eq!(source_ids.len(), 2, "expected stored + queued events"); + for sid in &source_ids { + assert!( + sid.is_none(), + "[memory-sync] source_id should be None for non-memory-source syncs" + ); + } + } + + #[tokio::test] + async fn bridge_populates_source_id_for_ingesting_from_mem_src() { + let _guard = test_mutex() + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + init_global(event_bus::DEFAULT_CAPACITY); + + let collector = StageCollector::default(); + let _subscription = + subscribe_global(Arc::new(collector.clone())).expect("event bus initialized"); + + let bridge = MemorySyncStageBridge; + bridge + .handle(&DomainEvent::MemoryIngestionStarted { + document_id: "mem_src:src-rss-42:https://example.com/feed/item-7".into(), + title: "Feed Item".into(), + namespace: "user".into(), + queue_depth: 1, + }) + .await; + + tokio::task::yield_now().await; + + let ingesting = collector + .events + .lock() + .unwrap() + .iter() + .find_map(|event| match event { + DomainEvent::MemorySyncStageChanged { + stage, + connection_id, + source_id, + .. 
+ } if stage == "ingesting" => Some((connection_id.clone(), source_id.clone())), + _ => None, + }) + .expect("ingesting stage should be emitted"); + + // connection_id must still carry the full document_id (unchanged) + assert_eq!( + ingesting.0.as_deref(), + Some("mem_src:src-rss-42:https://example.com/feed/item-7"), + "[memory-sync] connection_id must carry original document_id unchanged" + ); + // source_id extracts just the memory-source id + assert_eq!( + ingesting.1.as_deref(), + Some("src-rss-42"), + "[memory-sync] source_id should be extracted from document_id mem_src prefix" + ); + } + + #[tokio::test] + async fn bridge_source_id_is_none_for_ingesting_non_mem_src() { + let _guard = test_mutex() + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + init_global(event_bus::DEFAULT_CAPACITY); + + let collector = StageCollector::default(); + let _subscription = + subscribe_global(Arc::new(collector.clone())).expect("event bus initialized"); + + let bridge = MemorySyncStageBridge; + // Non-memory-source ingestion (plain document_id, no mem_src prefix) + bridge + .handle(&DomainEvent::MemoryIngestionStarted { + document_id: "doc-plain-uuid".into(), + title: "Vault Note".into(), + namespace: "vault:v-1".into(), + queue_depth: 3, + }) + .await; + + tokio::task::yield_now().await; + + let ingesting = collector + .events + .lock() + .unwrap() + .iter() + .find_map(|event| match event { + DomainEvent::MemorySyncStageChanged { + stage, source_id, .. 
+ } if stage == "ingesting" => Some(source_id.clone()), + _ => None, + }) + .expect("ingesting stage should be emitted"); + + assert!( + ingesting.is_none(), + "[memory-sync] source_id should be None for non-mem_src document_id" + ); + } } diff --git a/src/openhuman/memory/sync_pipeline_e2e_tests.rs b/src/openhuman/memory/sync_pipeline_e2e_tests.rs index adbceb8ea3..08a28418eb 100644 --- a/src/openhuman/memory/sync_pipeline_e2e_tests.rs +++ b/src/openhuman/memory/sync_pipeline_e2e_tests.rs @@ -151,6 +151,7 @@ async fn single_batch_sync_to_tree() { Some("gmail"), Some("conn-1"), None, + None, // channel-level — not a memory-source row ); emit_sync_stage( MemorySyncTrigger::Manual, @@ -158,6 +159,7 @@ async fn single_batch_sync_to_tree() { Some("gmail"), Some("conn-1"), None, + None, // channel-level — not a memory-source row ); let source_id = "gmail:alice-thread-1"; @@ -209,6 +211,7 @@ async fn single_batch_sync_to_tree() { Some("gmail"), Some("conn-1"), None, + None, // channel-level — not a memory-source row ); tokio::task::yield_now().await; diff --git a/src/openhuman/memory_sources/sync.rs b/src/openhuman/memory_sources/sync.rs index fd34c5db56..ad71289efa 100644 --- a/src/openhuman/memory_sources/sync.rs +++ b/src/openhuman/memory_sources/sync.rs @@ -66,6 +66,7 @@ pub async fn sync_source(source: MemorySourceEntry, config: Config) -> Result<() Some(kind_str), Some(&source_id), Some(format!("sync requested for {} source", kind_str)), + Some(&source_id), ); tokio::spawn(async move { @@ -131,6 +132,7 @@ pub async fn sync_source(source: MemorySourceEntry, config: Config) -> Result<() Some(source.kind.as_str()), Some(&source.id), Some(format!("ingested {items} item(s)")), + Some(&source.id), ); // Write audit entry (GitHub writes its own with @@ -205,6 +207,7 @@ pub async fn sync_source(source: MemorySourceEntry, config: Config) -> Result<() Some(source.kind.as_str()), Some(&source.id), Some(error.clone()), + Some(&source.id), ); tracing::warn!( source_id = 
%source.id, @@ -251,6 +254,7 @@ async fn sync_composio( Some("composio"), Some(&source.id), Some(format!("delegating to composio sync for {connection_id}")), + Some(&source.id), ); match composio::run_connection_sync(config, connection_id, SyncReason::Manual).await { @@ -278,6 +282,7 @@ async fn sync_items_individually( Some(source.kind.as_str()), Some(&source.id), Some("listing items".to_string()), + Some(&source.id), ); let items = reader.list_items(source, config).await?; @@ -293,6 +298,7 @@ async fn sync_items_individually( Some(source.kind.as_str()), Some(&source.id), Some(format!("{total} item(s) discovered")), + Some(&source.id), ); let ingested = Arc::new(AtomicUsize::new(0)); @@ -370,6 +376,7 @@ async fn sync_items_individually( Some(&kind_str), Some(&source_id), Some(format!("{done}/{total} processed ({new} new)")), + Some(&source_id), ); } } diff --git a/src/openhuman/memory_sync/sources/github.rs b/src/openhuman/memory_sync/sources/github.rs index df423ad458..0a8a2aa643 100644 --- a/src/openhuman/memory_sync/sources/github.rs +++ b/src/openhuman/memory_sync/sources/github.rs @@ -80,6 +80,7 @@ pub async fn run_github_sync( Some(kind_str), Some(source_id), Some("listing items".to_string()), + Some(source_id), ); let reader = readers::reader_for(&SourceKind::GithubRepo); @@ -109,6 +110,7 @@ pub async fn run_github_sync( Some(kind_str), Some(source_id), Some(format!("reading {total} items")), + Some(source_id), ); let content_root = config.memory_tree_content_root(); @@ -174,6 +176,7 @@ pub async fn run_github_sync( Some(kind_str), Some(source_id), Some(format!("{}/{total} read", idx + 1)), + Some(source_id), ); } } @@ -210,6 +213,7 @@ pub async fn run_github_sync( Some(format!( "summarising {input_count} items in {batch_count} batch(es)" )), + Some(source_id), ); // Token/charge accounting across the run. 
The estimate (`body.len() / 4` @@ -339,6 +343,7 @@ pub async fn run_github_sync( Some(format!( "{input_count} items → {batch_count} summary(ies) ({audit_input_tokens} in / {audit_output_tokens} out tokens, ${display_cost:.4})" )), + Some(source_id), ); Ok(SyncOutcome {