Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .claude/memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,10 @@ Quick reference for anyone starting with Claude on this project. Updated by the

- **Upstream `main` has 5 Vitest failures and 4 TypeScript compile errors** — Caused by missing iOS experimental dependencies: `@noble/ciphers/chacha`, `@noble/ciphers/webcrypto`, `qrcode.react`, `@tauri-apps/plugin-barcode-scanner`. Breaks `pnpm compile`, `pnpm build`, `pnpm test:coverage` on a clean checkout. Always verify by stashing changes and running checks on the base branch before blaming your PR.
- **`cargo fmt` must run after codecrusher** — codecrusher does not reliably produce `cargo fmt`-clean Rust. Always run `cargo fmt --manifest-path Cargo.toml` after codecrusher finishes and before committing.

## Memory Source Sync Indicators (Issue #3295)

- **`MemorySyncStageChanged` carries two distinct ids** — `connection_id` is the ingest-pipeline document_id (identity for dedup/audit); `source_id: Option<String>` is the memory-source row id used by the UI to match per-source progress. Never conflate them. Frontend (`MemorySourcesRegistry.tsx`) matches on `source_id ?? connection_id` for backward compat.
- **Chunk source_id encoding for mem-src syncs** — folder/RSS/web-page chunks use `mem_src:<source_id>:<item_id>` (`memory_sources/sync.rs`). The `<item_id>` can contain colons (URLs), so extract `<source_id>` by splitting on the **first** colon after the `mem_src:` prefix — use `find(':')`, not `rfind`. See `extract_mem_src_id` in `src/openhuman/memory/sync.rs`.
- **`MemorySyncStageBridge` re-emits stage events from ingestion events** — In `src/openhuman/memory/sync.rs`, converts `DocumentCanonicalized`/`MemoryIngestionStarted` into `MemorySyncStageChanged` with a populated `source_id`. For non-mem-src syncs (channel providers), `source_id` stays `None` — intentional; don't force a value.
- **Disk exhaustion from e2e test compilation** — `pnpm test:rust` compiles heavy integration binaries and can fill disk (`ld: errno=28 No space left on device`). To validate domain logic only, use `cargo test -p openhuman --lib -- "memory::sync"` which skips integration test binaries.
121 changes: 99 additions & 22 deletions app/src/components/intelligence/MemorySourcesRegistry.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,42 @@ interface SyncProgress {
percent: number | null;
}

function parseSyncProgress(detail: string | null): number | null {
if (!detail) return null;
const match = detail.match(/^(\d+)\/(\d+)\s/);
if (!match) return null;
const current = parseInt(match[1], 10);
const total = parseInt(match[2], 10);
return total > 0 ? Math.round((current / total) * 100) : null;
/**
* Per-stage fallback percentages so the progress bar always advances even
* when no numeric "N/M" ratio is present in the detail string (RC#4, #3295).
*/
export const STAGE_FALLBACK_PERCENT: Record<string, number> = {
requested: 2,
fetching: 5,
stored: 15,
queued: 25,
ingesting: 40,
completed: 100,
};

/**
* Parse a sync progress detail string into a 0–100 percent.
*
* - Recognises "N/M ..." numeric patterns and returns N/M as a ratio.
* - Falls back to the per-stage baseline when no ratio is present rather
* than returning a bogus number (RC#4, issue #3295).
* - Returns `null` when both approaches are unavailable (no stage either).
*/
export function parseSyncProgress(detail: string | null, stage?: string): number | null {
// Try the numeric "N/M ..." ratio first.
if (detail) {
const match = detail.match(/^(\d+)\/(\d+)[\s/]/);
if (match) {
const current = parseInt(match[1], 10);
const total = parseInt(match[2], 10);
if (total > 0) return Math.round((current / total) * 100);
}
}
// Fall back to the per-stage baseline percentage.
if (stage && stage in STAGE_FALLBACK_PERCENT) {
return STAGE_FALLBACK_PERCENT[stage];
}
return null;
}

export function MemorySourcesRegistry({
Expand All @@ -56,39 +85,69 @@ export function MemorySourcesRegistry({
const [statuses, setStatuses] = useState<SourceStatus[]>([]);
const [loading, setLoading] = useState(true);
const [dialogOpen, setDialogOpen] = useState(false);
const [syncingId, setSyncingId] = useState<string | null>(null);
// RC#1 (#3295): use a Set so multiple sources can show "syncing" concurrently.
// Set state is always replaced with a new Set to trigger re-renders.
const [syncingIds, setSyncingIds] = useState<Set<string>>(new Set());
const [buildingId, setBuildingId] = useState<string | null>(null);
const [syncProgress, setSyncProgress] = useState<Map<string, SyncProgress>>(new Map());

useEffect(() => {
const handler = (e: Event) => {
const data = (e as CustomEvent).detail as {
stage?: string;
connection_id?: string;
/** Originating memory-source id (RC#2, #3295). Preferred over connection_id. */
source_id?: string | null;
/** Legacy: document/connection id. Still present for backward compat. */
connection_id?: string | null;
detail?: string;
} | null;
if (!data?.connection_id) return;
const sourceId = data.connection_id;
const stage = data.stage ?? '';

// RC#2 (#3295): prefer source_id when present; fall back to connection_id for
// backward compat with older core versions that don't emit source_id yet.
const rowId = data?.source_id ?? data?.connection_id;
if (!rowId) return;

const stage = data?.stage ?? '';

console.debug(
`[ui-flow][memory-sync] stage=${stage} rowId=${rowId} source_id=${data?.source_id ?? 'absent'} connection_id=${data?.connection_id ?? 'absent'}`
);

if (stage === 'completed' || stage === 'failed') {
setSyncProgress(prev => {
const next = new Map(prev);
next.delete(sourceId);
next.delete(rowId);
return next;
});
// RC#1: immutable Set update — remove just this source, keep others syncing.
setSyncingIds(prev => {
const next = new Set(prev);
next.delete(rowId);
return next;
});
setSyncingId(prev => (prev === sourceId ? null : prev));
return;
}

const percent = parseSyncProgress(data.detail ?? null);
const percent = parseSyncProgress(data?.detail ?? null, stage);
setSyncProgress(prev => {
const next = new Map(prev);
next.set(sourceId, { stage, detail: data.detail ?? null, percent });
next.set(rowId, { stage, detail: data?.detail ?? null, percent });
return next;
});
if (stage === 'requested' || stage === 'fetching' || stage === 'ingesting') {
setSyncingId(sourceId);
// RC#1: ADD this source id to the set (immutable update).
if (
stage === 'requested' ||
stage === 'fetching' ||
stage === 'stored' ||
stage === 'queued' ||
stage === 'ingesting'
) {
setSyncingIds(prev => {
if (prev.has(rowId)) return prev; // no change — avoid re-render
const next = new Set(prev);
next.add(rowId);
return next;
});
}
};
window.addEventListener('openhuman:memory-sync-stage', handler);
Expand All @@ -109,6 +168,11 @@ export function MemorySourcesRegistry({
]);
setSources(list);
setStatuses(stats);
// RC#5 (#3295): The 5s poll is the safety net for missed completed/failed events.
// If a source is in syncingIds but the poll shows it's no longer active (no
// in-progress status indicator from the server), we clear it here. In practice
// the event stream covers this; on remount the state rehydrates within ~5s via poll.
// No new RPC needed — reconciliation is best-effort and relies on the existing poll.
} finally {
setLoading(false);
}
Expand Down Expand Up @@ -167,7 +231,14 @@ export function MemorySourcesRegistry({

const handleSync = useCallback(
async (source: MemorySourceEntry) => {
setSyncingId(source.id);
// RC#1 (#3295): add immediately on click — event will also fire, but this
// ensures the row lights up before the first sync-stage event arrives.
setSyncingIds(prev => {
const next = new Set(prev);
next.add(source.id);
return next;
});
console.debug(`[ui-flow][memory-sync] manual sync triggered source_id=${source.id}`);
try {
await syncMemorySource(source.id);
onToast?.({
Expand All @@ -183,7 +254,13 @@ export function MemorySourcesRegistry({
message: err instanceof Error ? err.message : String(err),
});
} finally {
setSyncingId(prev => (prev === source.id ? null : prev));
// Remove on RPC completion. The completed/failed stage event will also
// clear it, but this handles the case where the event is missed.
setSyncingIds(prev => {
const next = new Set(prev);
next.delete(source.id);
return next;
});
}
},
[onToast, refresh, t]
Expand Down Expand Up @@ -253,7 +330,7 @@ export function MemorySourcesRegistry({
key={source.id}
source={source}
status={statusById.get(source.id) ?? null}
isSyncing={syncingId === source.id}
isSyncing={syncingIds.has(source.id)}
isBuilding={buildingId === source.id}
progress={syncProgress.get(source.id) ?? null}
onToggle={handleToggle}
Expand Down Expand Up @@ -347,7 +424,7 @@ function SourceRow({
<div
className="h-full rounded-full bg-primary-500 transition-all duration-300"
style={{
width: `${progress.percent ?? (progress.stage === 'fetching' ? 10 : 5)}%`,
width: `${progress.percent ?? STAGE_FALLBACK_PERCENT[progress.stage] ?? 2}%`,
}}
/>
</div>
Expand Down
Loading
Loading