Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 209 additions & 0 deletions claude_code_log/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,105 @@ def get_archived_session_count(self, valid_session_ids: set[str]) -> int:
1 for row in cached_rows if row["session_id"] not in valid_session_ids
)

def get_archived_sessions(
    self, valid_session_ids: set[str]
) -> Dict[str, SessionCacheData]:
    """Return cached session data for archived sessions (JSONL deleted).

    Args:
        valid_session_ids: Session IDs that currently exist in source data.

    Returns:
        Mapping of session_id to SessionCacheData for every cached
        session that is no longer backed by a JSONL file.
    """
    if self._project_id is None:
        return {}

    with self._get_connection() as conn:
        fetched = conn.execute(
            "SELECT * FROM sessions WHERE project_id = ?",
            (self._project_id,),
        ).fetchall()

    # A session is "archived" when it exists in the cache but its ID is
    # absent from the set of IDs still present on disk.
    return {
        record["session_id"]: SessionCacheData(
            session_id=record["session_id"],
            summary=record["summary"],
            first_timestamp=record["first_timestamp"],
            last_timestamp=record["last_timestamp"],
            message_count=record["message_count"],
            first_user_message=record["first_user_message"],
            cwd=record["cwd"],
            total_input_tokens=record["total_input_tokens"],
            total_output_tokens=record["total_output_tokens"],
            total_cache_creation_tokens=record["total_cache_creation_tokens"],
            total_cache_read_tokens=record["total_cache_read_tokens"],
        )
        for record in fetched
        if record["session_id"] not in valid_session_ids
    }

def export_session_to_jsonl(self, session_id: str) -> List[str]:
    """Export all message content JSONs for a session, for JSONL restoration.

    Args:
        session_id: The session ID to export

    Returns:
        List of JSON strings (one per line for JSONL file), compact format
    """
    if self._project_id is None:
        return []

    with self._get_connection() as conn:
        fetched = conn.execute(
            """SELECT content FROM messages
            WHERE project_id = ? AND session_id = ?
            ORDER BY timestamp NULLS LAST""",
            (self._project_id, session_id),
        ).fetchall()

    lines: List[str] = []
    for record in fetched:
        raw = record["content"]
        try:
            # Normalize to compact separators (no spaces after "," / ":")
            # so the output matches the original JSONL file format.
            lines.append(json.dumps(json.loads(raw), separators=(",", ":")))
        except json.JSONDecodeError:
            # Unparseable content is passed through untouched.
            lines.append(raw)
    return lines

def load_session_entries(self, session_id: str) -> List[TranscriptEntry]:
    """Load transcript entries for a session from cache.

    Used for rendering archived sessions to HTML/Markdown when
    the original JSONL file no longer exists.

    Args:
        session_id: The session ID to load

    Returns:
        List of TranscriptEntry objects for the session
    """
    if self._project_id is None:
        return []

    with self._get_connection() as conn:
        fetched = conn.execute(
            """SELECT content FROM messages
            WHERE project_id = ? AND session_id = ?
            ORDER BY timestamp NULLS LAST""",
            (self._project_id, session_id),
        ).fetchall()

    return [self._deserialize_entry(record) for record in fetched]

# ========== Page Cache Methods (Pagination) ==========

def get_page_size_config(self) -> Optional[int]:
Expand Down Expand Up @@ -1224,6 +1323,115 @@ def get_page_count(self) -> int:

return row["cnt"] if row else 0

def delete_session(self, session_id: str) -> bool:
    """Delete a session and its messages from cache.

    Args:
        session_id: The session ID to delete

    Returns:
        True if session was deleted, False if not found
    """
    if self._project_id is None:
        return False

    # All per-session tables are keyed by the same (project, session) pair.
    key = (self._project_id, session_id)

    with self._get_connection() as conn:
        # Bail out early when the session is unknown to the cache.
        existing = conn.execute(
            "SELECT id FROM sessions WHERE project_id = ? AND session_id = ?",
            key,
        ).fetchone()
        if existing is None:
            return False

        # Remove dependent rows first, then the session record itself.
        conn.execute(
            "DELETE FROM messages WHERE project_id = ? AND session_id = ?",
            key,
        )
        conn.execute(
            "DELETE FROM html_cache WHERE project_id = ? AND source_session_id = ?",
            key,
        )
        conn.execute(
            "DELETE FROM sessions WHERE project_id = ? AND session_id = ?",
            key,
        )

        self._update_last_updated(conn)
        conn.commit()

    return True

def delete_project(self) -> bool:
    """Delete this project and all its data from cache.

    Returns:
        True if project was deleted, False if not found
    """
    if self._project_id is None:
        return False

    with self._get_connection() as conn:
        # Cascade delete handles messages, sessions, cached_files,
        # html_cache, html_pages
        conn.execute("DELETE FROM projects WHERE id = ?", (self._project_id,))
        conn.commit()

    # Clear the cached ID so later calls report "not found".
    self._project_id = None
    return True


def get_all_cached_projects(projects_dir: Path) -> List[tuple[str, bool]]:
    """Get all projects from cache, indicating which are archived.

    This is a standalone function that queries the cache.db directly
    to find all project paths, without needing to instantiate CacheManager
    for each project.

    Args:
        projects_dir: Path to the projects directory (e.g., ~/.claude/projects)

    Returns:
        List of (project_path, is_archived) tuples, ordered by project_path.
        is_archived is True if the project has no JSONL files but exists in cache.
    """
    db_path = projects_dir / "cache.db"
    if not db_path.exists():
        return []

    result: List[tuple[str, bool]] = []

    try:
        conn = sqlite3.connect(db_path, timeout=30.0)
        conn.row_factory = sqlite3.Row
        try:
            rows = conn.execute(
                "SELECT project_path FROM projects ORDER BY project_path"
            ).fetchall()

            for row in rows:
                project_path = Path(row["project_path"])
                # any() short-circuits at the first match instead of
                # materializing every *.jsonl path just to test emptiness.
                has_jsonl = project_path.exists() and any(
                    project_path.glob("*.jsonl")
                )
                # is_archived = project exists in cache but has no JSONL files
                result.append((row["project_path"], not has_jsonl))
        finally:
            conn.close()
    except Exception:
        # Deliberately best-effort: a corrupt or locked cache.db must not
        # crash callers; return whatever was collected before the failure.
        pass

    return result


__all__ = [
"CacheManager",
Expand All @@ -1232,5 +1440,6 @@ def get_page_count(self) -> int:
"PageCacheData",
"ProjectCache",
"SessionCacheData",
"get_all_cached_projects",
"get_library_version",
]
Loading