-```
-
-### Phase 12e: Remove css_class
-
-Once all references use modifiers:
-1. Remove `css_class` parameter from `TemplateMessage.__init__`
-2. Remove `self.css_class = css_class`
-3. Clean up all `css_class=...` at creation sites
-4. Update tests to use modifiers
-
-## Files Changed
-
-| File | Changes |
-|------|---------|
-| `models.py` | Add `MessageModifiers` dataclass |
-| `renderer.py` | Update TemplateMessage, populate modifiers, update hierarchy logic |
-| `html_renderer.py` | New file with HTML utilities and css_class_from_message |
-| `templates/transcript.html` | Use css_class_from_message filter |
-| `test_*.py` | Update tests to use modifiers |
-
-## Testing Strategy
-
-1. **Snapshot tests**: Run after each phase to verify HTML output unchanged
-2. **Unit tests for css_class_from_message**: Verify it produces same strings
-3. **Unit tests for modifiers**: Test each modifier flag
-4. **Integration tests**: Full render with real transcripts
-
-## Commit Plan
-
-1. `Add MessageModifiers dataclass to models.py` (12a)
-2. `Add modifiers field to TemplateMessage` (12a)
-3. `Populate modifiers in message processing` (12b part 1)
-4. `Update hierarchy logic to use modifiers` (12b part 2)
-5. `Create html_renderer.py with css_class_from_message` (12c)
-6. `Move escape_html and render_markdown to html_renderer` (12c)
-7. `Update template to use css_class_from_message` (12d)
-8. `Remove css_class field from TemplateMessage` (12e)
-
-## Risk Assessment
-
-- **Low risk**: MessageModifiers is additive, doesn't break existing code
-- **Medium risk**: Moving functions to html_renderer.py requires import updates
-- **High risk**: Template changes and css_class removal need careful testing
-
-## Estimated Scope
-
-- Phase 12a: ~30 lines added to models.py, ~10 lines to renderer.py
-- Phase 12b: ~50 modifications across renderer.py
-- Phase 12c: ~200 lines new file, ~200 lines moved from renderer.py
-- Phase 12d: ~10 lines template changes
-- Phase 12e: ~20 lines removed
-
Total: Moderate refactoring, 8 commits across five phases (12a-12e)
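The plan's end state can be sketched concretely. This is a hypothetical illustration only: the modifier fields (`is_sidechain`, `is_meta`, `level`) and the class-name strings are assumptions for the sketch, not the schema that `models.py` will actually define.

```python
from dataclasses import dataclass

# Hypothetical sketch of the MessageModifiers dataclass and the
# css_class_from_message filter described in the plan. Field names are
# illustrative assumptions, not the project's actual schema.
@dataclass(frozen=True)
class MessageModifiers:
    is_sidechain: bool = False  # message belongs to a sub-agent sidechain
    is_meta: bool = False       # meta/system-injected message
    level: str = "info"         # severity for system messages

def css_class_from_message(base: str, mods: MessageModifiers) -> str:
    """Build the CSS class string from a base type plus modifier flags."""
    classes = [base]
    if mods.is_sidechain:
        classes.append("sidechain")
    if mods.is_meta:
        classes.append("meta")
    if mods.level != "info":
        classes.append(mods.level)
    return " ".join(classes)
```

With this shape, the template filter replaces the stored `css_class` string: `css_class_from_message("system", MessageModifiers(level="error"))` yields `"system error"`.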
diff --git a/claude_code_log/cache.py b/claude_code_log/cache.py
index ad443726..3f5d43b7 100644
--- a/claude_code_log/cache.py
+++ b/claude_code_log/cache.py
@@ -172,10 +172,10 @@ def load_cached_entries(self, jsonl_path: Path) -> Optional[list[TranscriptEntry
entries_data.extend(cast(list[dict[str, Any]], timestamp_entries))
# Deserialize back to TranscriptEntry objects
- from .parser import parse_transcript_entry
+ from .factories import create_transcript_entry
entries = [
- parse_transcript_entry(entry_dict) for entry_dict in entries_data
+ create_transcript_entry(entry_dict) for entry_dict in entries_data
]
return entries
except Exception as e:
@@ -257,10 +257,10 @@ def load_cached_entries_filtered(
)
# Deserialize filtered entries
- from .parser import parse_transcript_entry
+ from .factories import create_transcript_entry
entries = [
- parse_transcript_entry(entry_dict)
+ create_transcript_entry(entry_dict)
for entry_dict in filtered_entries_data
]
return entries
diff --git a/claude_code_log/converter.py b/claude_code_log/converter.py
index 12a2fa83..8407f6b3 100644
--- a/claude_code_log/converter.py
+++ b/claude_code_log/converter.py
@@ -21,7 +21,8 @@
get_warmup_session_ids,
)
from .cache import CacheManager, SessionCacheData, get_library_version
-from .parser import parse_timestamp, parse_transcript_entry
+from .parser import parse_timestamp
+from .factories import create_transcript_entry
from .models import (
TranscriptEntry,
AssistantTranscriptEntry,
@@ -183,7 +184,7 @@ def load_transcript(
"queue-operation",
]:
# Parse using Pydantic models
- entry = parse_transcript_entry(entry_dict)
+ entry = create_transcript_entry(entry_dict)
messages.append(entry)
elif (
entry_type
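The renamed factory is, at heart, a dispatch on the raw entry's `type` field. A minimal standalone sketch of that pattern (the stand-in constructors return strings; the real ones build Pydantic models in `transcript_factory`):

```python
from typing import Any, Callable

# Illustrative dispatch behind create_transcript_entry: pick a constructor
# based on the raw entry's "type" field. Constructors here are stand-ins.
def _user(d: dict[str, Any]) -> str:
    return f"user:{d.get('uuid', '?')}"

def _assistant(d: dict[str, Any]) -> str:
    return f"assistant:{d.get('uuid', '?')}"

_ENTRY_FACTORIES: dict[str, Callable[[dict[str, Any]], str]] = {
    "user": _user,
    "assistant": _assistant,
}

def create_transcript_entry(entry_dict: dict[str, Any]) -> str:
    factory = _ENTRY_FACTORIES.get(entry_dict.get("type", ""))
    if factory is None:
        raise ValueError(f"unknown entry type: {entry_dict.get('type')!r}")
    return factory(entry_dict)
```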
diff --git a/claude_code_log/factories/__init__.py b/claude_code_log/factories/__init__.py
new file mode 100644
index 00000000..108a9c91
--- /dev/null
+++ b/claude_code_log/factories/__init__.py
@@ -0,0 +1,115 @@
+"""Factory modules for creating typed objects from raw data."""
+
+from .meta_factory import (
+ # Metadata creation
+ create_meta,
+)
+from .system_factory import (
+ # System message detection
+ is_system_message,
+ # System message creation
+ create_system_message,
+)
+from .user_factory import (
+ # User message type detection
+ is_bash_input,
+ is_bash_output,
+ is_command_message,
+ is_local_command_output,
+ # User message creation
+ create_bash_input_message,
+ create_bash_output_message,
+ create_command_output_message,
+ create_compacted_summary_message,
+ create_ide_notification_content,
+ create_slash_command_message,
+ create_user_memory_message,
+ create_user_message,
+ # Patterns and constants
+ COMPACTED_SUMMARY_PREFIX,
+ IDE_DIAGNOSTICS_PATTERN,
+ IDE_OPENED_FILE_PATTERN,
+ IDE_SELECTION_PATTERN,
+)
+from .assistant_factory import (
+ # Assistant message creation
+ create_assistant_message,
+ create_thinking_message,
+)
+from .tool_factory import (
+ # Tool message creation
+ create_tool_input,
+ create_tool_use_message,
+ create_tool_result_message,
+ # Tool processing result
+ ToolItemResult,
+ # Tool input models mapping
+ TOOL_INPUT_MODELS,
+)
+from .transcript_factory import (
+ # Content type constants
+ ASSISTANT_CONTENT_TYPES,
+ USER_CONTENT_TYPES,
+ # Conditional casts
+ as_assistant_entry,
+ as_user_entry,
+ # Usage normalization
+ normalize_usage_info,
+ # Content item creation
+ create_content_item,
+ create_message_content,
+ # Transcript entry creation
+ create_transcript_entry,
+)
+
+__all__ = [
+ # Metadata creation
+ "create_meta",
+ # Content type constants
+ "USER_CONTENT_TYPES",
+ "ASSISTANT_CONTENT_TYPES",
+ # Conditional casts
+ "as_user_entry",
+ "as_assistant_entry",
+ # Usage normalization
+ "normalize_usage_info",
+ # Content item creation
+ "create_content_item",
+ "create_message_content",
+ # Transcript entry creation
+ "create_transcript_entry",
+ # System message detection
+ "is_system_message",
+ # System message creation
+ "create_system_message",
+ # User message type detection
+ "is_bash_input",
+ "is_bash_output",
+ "is_command_message",
+ "is_local_command_output",
+ # User message creation
+ "create_bash_input_message",
+ "create_bash_output_message",
+ "create_command_output_message",
+ "create_compacted_summary_message",
+ "create_ide_notification_content",
+ "create_slash_command_message",
+ "create_user_memory_message",
+ "create_user_message",
+ # Patterns and constants
+ "COMPACTED_SUMMARY_PREFIX",
+ "IDE_DIAGNOSTICS_PATTERN",
+ "IDE_OPENED_FILE_PATTERN",
+ "IDE_SELECTION_PATTERN",
+ # Assistant message creation
+ "create_assistant_message",
+ "create_thinking_message",
+ # Tool message creation
+ "create_tool_input",
+ "create_tool_use_message",
+ "create_tool_result_message",
+ # Tool processing result
+ "ToolItemResult",
+ # Tool input models mapping
+ "TOOL_INPUT_MODELS",
+]
diff --git a/claude_code_log/factories/assistant_factory.py b/claude_code_log/factories/assistant_factory.py
new file mode 100644
index 00000000..3f8b977d
--- /dev/null
+++ b/claude_code_log/factories/assistant_factory.py
@@ -0,0 +1,114 @@
+"""Factory for assistant transcript entries.
+
+This module converts AssistantTranscriptEntry content into MessageContent
+subclasses:
+- AssistantTextMessage: Claude's text responses
+- ThinkingMessage: Extended thinking blocks
+"""
+
+from typing import Optional
+
+from ..models import (
+ AssistantTextMessage,
+ ContentItem,
+ MessageMeta,
+ TextContent,
+ ThinkingContent,
+ ThinkingMessage,
+ UsageInfo,
+)
+
+
+# =============================================================================
+# Token Usage Formatting
+# =============================================================================
+
+
+def format_token_usage(usage: UsageInfo) -> str:
+ """Format token usage information as a display string.
+
+ Args:
+ usage: UsageInfo object with token counts.
+
+ Returns:
+ Formatted string like "Input: 100 | Output: 50 | Cache Read: 25"
+ """
+ token_parts = [
+ f"Input: {usage.input_tokens}",
+ f"Output: {usage.output_tokens}",
+ ]
+ if usage.cache_creation_input_tokens:
+ token_parts.append(f"Cache Creation: {usage.cache_creation_input_tokens}")
+ if usage.cache_read_input_tokens:
+ token_parts.append(f"Cache Read: {usage.cache_read_input_tokens}")
+ return " | ".join(token_parts)
+
+
+# =============================================================================
+# Message Creation Functions
+# =============================================================================
+
+
+def create_assistant_message(
+ meta: MessageMeta,
+ items: list[ContentItem],
+ usage: Optional[UsageInfo] = None,
+) -> Optional[AssistantTextMessage]:
+ """Create AssistantTextMessage from content items.
+
+ Creates AssistantTextMessage from text/image content items.
+
+ Args:
+ meta: Message metadata.
+ items: List of text/image content items (no tool_use, tool_result, thinking).
+ usage: Optional token usage info to format and attach.
+
+ Returns:
+ AssistantTextMessage if items is non-empty, None otherwise.
+ """
+ # Create AssistantTextMessage directly from items
+ # (empty text already filtered by chunk_message_content)
+ if items:
+ # Extract text content from items for dedup matching and simple renderers
+ text_content = "\n".join(
+ item.text for item in items if isinstance(item, TextContent)
+ )
+ return AssistantTextMessage(
+ meta,
+ items=items, # type: ignore[arg-type]
+ raw_text_content=text_content if text_content else None,
+ token_usage=format_token_usage(usage) if usage else None,
+ )
+ return None
+
+
+def create_thinking_message(
+ meta: MessageMeta,
+ tool_item: ContentItem,
+ usage: Optional[UsageInfo] = None,
+) -> ThinkingMessage:
+ """Create ThinkingMessage from a thinking content item.
+
+ Args:
+ meta: Message metadata.
+ tool_item: ThinkingContent or compatible object with 'thinking' attribute
+ usage: Optional token usage info to format and attach.
+
+ Returns:
+ ThinkingMessage containing the thinking text and optional signature.
+ """
+ # Extract thinking text from the content item
+ if isinstance(tool_item, ThinkingContent):
+ thinking_text = tool_item.thinking.strip()
+ signature = getattr(tool_item, "signature", None)
+ else:
+ thinking_text = getattr(tool_item, "thinking", str(tool_item)).strip()
+ signature = None
+
+ # Create the content model (formatting happens in HtmlRenderer)
+ return ThinkingMessage(
+ meta,
+ thinking=thinking_text,
+ signature=signature,
+ token_usage=format_token_usage(usage) if usage else None,
+ )
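`format_token_usage` can be exercised on its own; this demo restates the function above with a minimal stand-in for the project's `UsageInfo` model:

```python
from dataclasses import dataclass
from typing import Optional

# Minimal stand-in for the project's UsageInfo model, just enough to
# drive format_token_usage as defined in assistant_factory.py.
@dataclass
class UsageInfo:
    input_tokens: int
    output_tokens: int
    cache_creation_input_tokens: Optional[int] = None
    cache_read_input_tokens: Optional[int] = None

def format_token_usage(usage: UsageInfo) -> str:
    token_parts = [
        f"Input: {usage.input_tokens}",
        f"Output: {usage.output_tokens}",
    ]
    # Cache counts are appended only when truthy, so zero/None are omitted.
    if usage.cache_creation_input_tokens:
        token_parts.append(f"Cache Creation: {usage.cache_creation_input_tokens}")
    if usage.cache_read_input_tokens:
        token_parts.append(f"Cache Read: {usage.cache_read_input_tokens}")
    return " | ".join(token_parts)

print(format_token_usage(UsageInfo(100, 50, cache_read_input_tokens=25)))
# prints: Input: 100 | Output: 50 | Cache Read: 25
```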
diff --git a/claude_code_log/factories/meta_factory.py b/claude_code_log/factories/meta_factory.py
new file mode 100644
index 00000000..ee6f0d42
--- /dev/null
+++ b/claude_code_log/factories/meta_factory.py
@@ -0,0 +1,33 @@
+"""Factory for creating MessageMeta from transcript entries.
+
+This module handles extraction of common metadata from transcript entries
+that is shared across all message types.
+"""
+
+from ..models import BaseTranscriptEntry, MessageMeta
+
+
+def create_meta(transcript: BaseTranscriptEntry) -> MessageMeta:
+ """Create MessageMeta from a transcript entry.
+
+ Extracts all shared fields from BaseTranscriptEntry subclasses.
+
+ Args:
+ transcript: Any transcript entry inheriting from BaseTranscriptEntry
+
+ Returns:
+ MessageMeta with identity and context fields
+ """
+ return MessageMeta(
+ # Identity fields
+ session_id=transcript.sessionId,
+ timestamp=transcript.timestamp,
+ uuid=transcript.uuid,
+ parent_uuid=transcript.parentUuid,
+ # Context fields
+ is_sidechain=transcript.isSidechain,
+ is_meta=getattr(transcript, "isMeta", False) or False,
+ agent_id=transcript.agentId,
+ cwd=transcript.cwd,
+ git_branch=transcript.gitBranch,
+ )
diff --git a/claude_code_log/factories/system_factory.py b/claude_code_log/factories/system_factory.py
new file mode 100644
index 00000000..3ec9c785
--- /dev/null
+++ b/claude_code_log/factories/system_factory.py
@@ -0,0 +1,87 @@
+"""Factory for system transcript entries.
+
+This module handles creation of MessageContent from SystemTranscriptEntry:
+- SystemMessage: Regular system messages with level (info, warning, error)
+- HookSummaryMessage: Hook execution summaries
+
+Also provides:
+- is_system_message: Check if text content is a system message to filter
+"""
+
+from typing import Optional, Union
+
+from ..models import (
+ HookInfo,
+ HookSummaryMessage,
+ SystemMessage,
+ SystemTranscriptEntry,
+)
+from .meta_factory import create_meta
+
+
+# =============================================================================
+# System Message Detection
+# =============================================================================
+
+
+def is_system_message(text_content: str) -> bool:
+ """Check if a message is a system message that should be filtered out."""
+ system_message_patterns = [
+ "Caveat: The messages below were generated by the user while running local commands. DO NOT respond to these messages or otherwise consider them in your response unless the user explicitly asks you to.",
+ "[Request interrupted by user for tool use]",
+        "",
+ ]
+
+ return any(text_content.startswith(pattern) for pattern in system_message_patterns)
+
+
+# =============================================================================
+# System Message Creation
+# =============================================================================
+
+
+def create_system_message(
+ transcript: SystemTranscriptEntry,
+) -> Optional[Union[SystemMessage, HookSummaryMessage]]:
+ """Create a MessageContent from a system transcript entry.
+
+ Handles:
+ - Hook summaries (subtype="stop_hook_summary")
+ - Regular system messages with level-specific styling (info, warning, error)
+
+ Args:
+ transcript: The system transcript entry to process
+
+ Returns:
+ SystemMessage or HookSummaryMessage (with meta attached),
+ or None if the message should be skipped (e.g., silent hook successes)
+
+ Note:
+        Slash command messages are user messages,
+ not system messages. They are handled separately.
+ """
+ if transcript.subtype == "stop_hook_summary":
+ # Skip silent hook successes (no output, no errors)
+ if not transcript.hasOutput and not transcript.hookErrors:
+ return None
+ # Create structured hook summary content
+ meta = create_meta(transcript)
+ hook_infos = [
+ HookInfo(command=info.get("command", "unknown"))
+ for info in (transcript.hookInfos or [])
+ ]
+ return HookSummaryMessage(
+ has_output=bool(transcript.hasOutput),
+ hook_errors=transcript.hookErrors or [],
+ hook_infos=hook_infos,
+ meta=meta,
+ )
+
+ if not transcript.content:
+ # Skip system messages without content (shouldn't happen normally)
+ return None
+
+ # Create structured system content
+ meta = create_meta(transcript)
+ level = getattr(transcript, "level", "info")
+ return SystemMessage(level=level, text=transcript.content, meta=meta)
diff --git a/claude_code_log/factories/tool_factory.py b/claude_code_log/factories/tool_factory.py
new file mode 100644
index 00000000..312472db
--- /dev/null
+++ b/claude_code_log/factories/tool_factory.py
@@ -0,0 +1,769 @@
+"""Factory for tool use and tool result content.
+
+This module converts tool-related content into MessageContent subclasses:
+- ToolUseMessage: Tool invocations with typed inputs (BashInput, ReadInput, etc.)
+- ToolResultMessage: Tool results with output and context
+
+Also provides parsing of raw tool inputs into typed models:
+- create_tool_input(): Create typed tool input from raw dict
+- create_tool_use_message(): Process ToolUseContent into ToolItemResult
+- create_tool_result_message(): Process ToolResultContent into ToolItemResult
+"""
+
+from dataclasses import dataclass
+from typing import Any, Callable, Optional, cast
+
+from pydantic import BaseModel
+
+import re
+
+from ..models import (
+ # Tool input models
+ AskUserQuestionInput,
+ AskUserQuestionItem,
+ AskUserQuestionOption,
+ BashInput,
+ EditInput,
+ EditItem,
+ ExitPlanModeInput,
+ GlobInput,
+ GrepInput,
+ MessageContent,
+ MessageMeta,
+ MultiEditInput,
+ ReadInput,
+ TaskInput,
+ TodoWriteInput,
+ TodoWriteItem,
+ ToolInput,
+ ToolResultContent,
+ ToolResultMessage,
+ ToolUseContent,
+ ToolUseMessage,
+ WriteInput,
+ # Tool output models
+ AskUserQuestionAnswer,
+ AskUserQuestionOutput,
+ BashOutput,
+ EditOutput,
+ ExitPlanModeOutput,
+ ReadOutput,
+ TaskOutput,
+ ToolOutput,
+ WriteOutput,
+)
+
+
+# =============================================================================
+# Tool Input Models Mapping
+# =============================================================================
+
+TOOL_INPUT_MODELS: dict[str, type[BaseModel]] = {
+ "Bash": BashInput,
+ "Read": ReadInput,
+ "Write": WriteInput,
+ "Edit": EditInput,
+ "MultiEdit": MultiEditInput,
+ "Glob": GlobInput,
+ "Grep": GrepInput,
+ "Task": TaskInput,
+ "TodoWrite": TodoWriteInput,
+ "AskUserQuestion": AskUserQuestionInput,
+ "ask_user_question": AskUserQuestionInput, # Legacy tool name
+ "ExitPlanMode": ExitPlanModeInput,
+}
+
+
+# =============================================================================
+# Lenient Parsing Helpers
+# =============================================================================
+# These functions create typed models even when strict validation fails.
+# They use defaults for missing fields and skip invalid nested items.
+
+
+def _parse_todowrite_lenient(data: dict[str, Any]) -> TodoWriteInput:
+ """Parse TodoWrite input leniently, handling malformed data."""
+ todos_raw = data.get("todos", [])
+ valid_todos: list[TodoWriteItem] = []
+ for item in todos_raw:
+ if isinstance(item, dict):
+ try:
+ valid_todos.append(TodoWriteItem.model_validate(item))
+ except Exception:
+ pass
+ elif isinstance(item, str):
+ valid_todos.append(TodoWriteItem(content=item))
+ return TodoWriteInput(todos=valid_todos)
+
+
+def _parse_bash_lenient(data: dict[str, Any]) -> BashInput:
+ """Parse Bash input leniently."""
+ return BashInput(
+ command=data.get("command", ""),
+ description=data.get("description"),
+ timeout=data.get("timeout"),
+ run_in_background=data.get("run_in_background"),
+ )
+
+
+def _parse_write_lenient(data: dict[str, Any]) -> WriteInput:
+ """Parse Write input leniently."""
+ return WriteInput(
+ file_path=data.get("file_path", ""),
+ content=data.get("content", ""),
+ )
+
+
+def _parse_edit_lenient(data: dict[str, Any]) -> EditInput:
+ """Parse Edit input leniently."""
+ return EditInput(
+ file_path=data.get("file_path", ""),
+ old_string=data.get("old_string", ""),
+ new_string=data.get("new_string", ""),
+ replace_all=data.get("replace_all"),
+ )
+
+
+def _parse_multiedit_lenient(data: dict[str, Any]) -> MultiEditInput:
+ """Parse Multiedit input leniently."""
+ edits_raw = data.get("edits", [])
+ valid_edits: list[EditItem] = []
+ for edit in edits_raw:
+ if isinstance(edit, dict):
+ try:
+ valid_edits.append(EditItem.model_validate(edit))
+ except Exception:
+ pass
+ return MultiEditInput(file_path=data.get("file_path", ""), edits=valid_edits)
+
+
+def _parse_task_lenient(data: dict[str, Any]) -> TaskInput:
+ """Parse Task input leniently."""
+ return TaskInput(
+ prompt=data.get("prompt", ""),
+ subagent_type=data.get("subagent_type", ""),
+ description=data.get("description", ""),
+ model=data.get("model"),
+ run_in_background=data.get("run_in_background"),
+ resume=data.get("resume"),
+ )
+
+
+def _parse_read_lenient(data: dict[str, Any]) -> ReadInput:
+ """Parse Read input leniently."""
+ return ReadInput(
+ file_path=data.get("file_path", ""),
+ offset=data.get("offset"),
+ limit=data.get("limit"),
+ )
+
+
+def _parse_askuserquestion_lenient(data: dict[str, Any]) -> AskUserQuestionInput:
+ """Parse AskUserQuestion input leniently, handling malformed data."""
+ questions_raw = data.get("questions", [])
+ valid_questions: list[AskUserQuestionItem] = []
+ for q in questions_raw:
+ if isinstance(q, dict):
+ q_dict = cast(dict[str, Any], q)
+ try:
+ # Parse options leniently
+ options_raw = q_dict.get("options", [])
+ valid_options: list[AskUserQuestionOption] = []
+ for opt in options_raw:
+ if isinstance(opt, dict):
+ try:
+ valid_options.append(
+ AskUserQuestionOption.model_validate(opt)
+ )
+ except Exception:
+ pass
+ valid_questions.append(
+ AskUserQuestionItem(
+ question=str(q_dict.get("question", "")),
+ header=q_dict.get("header"),
+ options=valid_options,
+ multiSelect=bool(q_dict.get("multiSelect", False)),
+ )
+ )
+ except Exception:
+ pass
+ return AskUserQuestionInput(
+ questions=valid_questions,
+ question=data.get("question"),
+ )
+
+
+def _parse_exitplanmode_lenient(data: dict[str, Any]) -> ExitPlanModeInput:
+ """Parse ExitPlanMode input leniently."""
+ return ExitPlanModeInput(
+ plan=data.get("plan", ""),
+ launchSwarm=data.get("launchSwarm"),
+ teammateCount=data.get("teammateCount"),
+ )
+
+
+# Mapping of tool names to their lenient parsers
+TOOL_LENIENT_PARSERS: dict[str, Any] = {
+ "Bash": _parse_bash_lenient,
+ "Write": _parse_write_lenient,
+ "Edit": _parse_edit_lenient,
+ "MultiEdit": _parse_multiedit_lenient,
+ "Task": _parse_task_lenient,
+ "TodoWrite": _parse_todowrite_lenient,
+ "Read": _parse_read_lenient,
+ "AskUserQuestion": _parse_askuserquestion_lenient,
+ "ask_user_question": _parse_askuserquestion_lenient, # Legacy tool name
+ "ExitPlanMode": _parse_exitplanmode_lenient,
+}
+
+
+# =============================================================================
+# Tool Input Creation
+# =============================================================================
+
+
+def create_tool_input(
+ tool_name: str, input_data: dict[str, Any]
+) -> Optional[ToolInput]:
+ """Create typed tool input from raw dictionary.
+
+ Uses strict validation first, then lenient parsing if available.
+
+ Args:
+ tool_name: The name of the tool (e.g., "Bash", "Read")
+ input_data: The raw input dictionary from the tool_use content
+
+ Returns:
+ A typed input model if parsing succeeds, None otherwise.
+ When None is returned, the caller should use ToolUseContent itself
+ as the fallback (it's part of the ToolInput union).
+ """
+ model_class = TOOL_INPUT_MODELS.get(tool_name)
+ if model_class is not None:
+ try:
+ return cast(ToolInput, model_class.model_validate(input_data))
+ except Exception:
+ # Try lenient parsing if available
+ lenient_parser = TOOL_LENIENT_PARSERS.get(tool_name)
+ if lenient_parser is not None:
+ return cast(ToolInput, lenient_parser(input_data))
+ return None
+ return None
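The strict-then-lenient fallback in `create_tool_input` can be shown without Pydantic; the parsers below are illustrative stand-ins, with a strict parser that raises on missing keys and a lenient one that substitutes defaults so malformed records still render.

```python
from typing import Any, Optional

def parse_bash_strict(data: dict[str, Any]) -> dict[str, Any]:
    # Strict: reject input missing required fields.
    if "command" not in data:
        raise ValueError("missing required field: command")
    return {"command": data["command"], "timeout": data.get("timeout")}

def parse_bash_lenient(data: dict[str, Any]) -> dict[str, Any]:
    # Lenient: fill defaults so malformed records still produce a model.
    return {"command": data.get("command", ""), "timeout": data.get("timeout")}

def create_tool_input(data: dict[str, Any]) -> Optional[dict[str, Any]]:
    try:
        return parse_bash_strict(data)
    except Exception:
        return parse_bash_lenient(data)
```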
+
+
+# =============================================================================
+# Tool Output Parsing
+# =============================================================================
+# Parse raw tool result content into typed output models (ReadOutput, EditOutput, etc.)
+# Symmetric with Tool Input parsing above.
+
+
+def _parse_cat_n_snippet(
+ lines: list[str], start_idx: int = 0
+) -> Optional[tuple[str, Optional[str], int]]:
+ """Parse cat-n formatted snippet from lines.
+
+ Args:
+ lines: List of lines to parse
+ start_idx: Index to start parsing from (default: 0)
+
+ Returns:
+ Tuple of (code_content, system_reminder, line_offset) or None if not parseable
+ """
+ code_lines: list[str] = []
+ system_reminder: Optional[str] = None
+ in_system_reminder = False
+ line_offset = 1 # Default offset
+
+ for line in lines[start_idx:]:
+ # Check for system-reminder start
+ if "" in line:
+ in_system_reminder = True
+ system_reminder = ""
+ continue
+
+ # Check for system-reminder end
+ if "" in line:
+ in_system_reminder = False
+ continue
+
+ # If in system reminder, accumulate reminder text
+ if in_system_reminder:
+ if system_reminder is not None:
+ system_reminder += line + "\n"
+ continue
+
+        # Parse regular code line (format: "  123→content")
+        match = re.match(r"\s+(\d+)→(.*)$", line)
+ if match:
+ line_num = int(match.group(1))
+ # Capture the first line number as offset
+ if not code_lines:
+ line_offset = line_num
+ code_lines.append(match.group(2))
+ elif line.strip() == "": # Allow empty lines between cat-n lines
+ continue
+ else: # Non-matching non-empty line, stop parsing
+ break
+
+ if not code_lines:
+ return None
+
+ # Join code lines and trim trailing reminder text
+ code_content = "\n".join(code_lines)
+ if system_reminder:
+ system_reminder = system_reminder.strip()
+
+ return (code_content, system_reminder, line_offset)
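A compact illustration of the cat-n parsing above: each line is `<spaces><number>→<content>`, and the first matched line number becomes the snippet's starting offset.

```python
import re

def parse_cat_n(lines: list[str]) -> tuple[int, list[str]]:
    """Extract (start_line, code_lines) from cat-n formatted output."""
    offset, code = 1, []
    for line in lines:
        m = re.match(r"\s+(\d+)→(.*)$", line)
        if not m:
            break  # stop at the first non-matching line
        if not code:
            offset = int(m.group(1))  # first line number is the offset
        code.append(m.group(2))
    return offset, code
```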
+
+
+def _extract_tool_result_text(tool_result: ToolResultContent) -> str:
+ """Extract text content from a ToolResultContent.
+
+ Handles both string content and structured content (list of dicts).
+
+ Args:
+ tool_result: The tool result to extract text from
+
+ Returns:
+ Extracted text content, or empty string if none found
+ """
+ content = tool_result.content
+ if isinstance(content, str):
+ return content
+ # Structured content - extract text from list of content items
+ # Format: [{"type": "text", "text": "..."}, ...]
+ text_parts: list[str] = []
+ for item in content:
+ if item.get("type") == "text":
+ text_parts.append(str(item.get("text", "")))
+ return "\n".join(text_parts)
+
+
+def parse_read_output(
+ tool_result: ToolResultContent, file_path: Optional[str]
+) -> Optional[ReadOutput]:
+ """Parse Read tool result into structured content.
+
+ Args:
+ tool_result: The tool result content
+ file_path: Path to the file that was read (required for ReadOutput)
+
+ Returns:
+ ReadOutput if parsing succeeds, None otherwise
+ """
+ if not file_path:
+ return None
+ if not (content := _extract_tool_result_text(tool_result)):
+ return None
+
+    # Check if content matches the cat-n format pattern (line_number → content)
+ lines = content.split("\n")
+    if not lines or not re.match(r"\s+\d+→", lines[0]):
+ return None
+
+ result = _parse_cat_n_snippet(lines)
+ if result is None:
+ return None
+
+ code_content, system_reminder, line_offset = result
+ num_lines = len(code_content.split("\n"))
+
+ return ReadOutput(
+ file_path=file_path,
+ content=code_content,
+ start_line=line_offset,
+ num_lines=num_lines,
+ total_lines=num_lines, # We don't know total from result
+ is_truncated=False, # Can't determine from result
+ system_reminder=system_reminder,
+ )
+
+
+def parse_edit_output(
+ tool_result: ToolResultContent, file_path: Optional[str]
+) -> Optional[EditOutput]:
+ """Parse Edit tool result into structured content.
+
+ Edit tool results typically have format:
+ "The file ... has been updated. Here's the result of running `cat -n` on a snippet..."
+ followed by cat-n formatted lines.
+
+ Args:
+ tool_result: The tool result content
+ file_path: Path to the file that was edited (required for EditOutput)
+
+ Returns:
+ EditOutput if parsing succeeds, None otherwise
+ """
+ if not file_path:
+ return None
+ if not (content := _extract_tool_result_text(tool_result)):
+ return None
+
+ # Look for the cat-n snippet after the preamble
+ # Pattern: look for first line that matches the cat-n format
+ lines = content.split("\n")
+ code_start_idx = None
+
+ for i, line in enumerate(lines):
+        if re.match(r"\s+\d+→", line):
+ code_start_idx = i
+ break
+
+ if code_start_idx is None:
+ return None
+
+ result = _parse_cat_n_snippet(lines, code_start_idx)
+ if result is None:
+ return None
+
+ code_content, _system_reminder, line_offset = result
+ # Edit tool doesn't use system_reminder
+
+ return EditOutput(
+ file_path=file_path,
+ success=True, # If we got here, edit succeeded
+ diffs=[], # We don't have diff info from result
+ message=code_content,
+ start_line=line_offset,
+ )
+
+
+def parse_write_output(
+ tool_result: ToolResultContent, file_path: Optional[str]
+) -> Optional[WriteOutput]:
+ """Parse Write tool result into structured content.
+
+ Write tool results contain an acknowledgment on the first line.
+ We extract just the first line for display.
+
+ Args:
+ tool_result: The tool result content
+ file_path: Path to the file that was written (required for WriteOutput)
+
+ Returns:
+ WriteOutput if parsing succeeds, None otherwise
+ """
+ if not file_path:
+ return None
+ if not (content := _extract_tool_result_text(tool_result)):
+ return None
+
+ lines = content.split("\n")
+ if not lines[0]:
+ return None
+
+ first_line = lines[0]
+ return WriteOutput(
+ file_path=file_path,
+ success=True, # If we got content, write succeeded
+ message=first_line,
+ )
+
+
+def parse_task_output(
+ tool_result: ToolResultContent, file_path: Optional[str]
+) -> Optional[TaskOutput]:
+ """Parse Task tool result into structured content.
+
+ Task tool results contain the agent's response as markdown.
+
+ Args:
+ tool_result: The tool result content (agent's response)
+ file_path: Unused for Task tool
+
+ Returns:
+ TaskOutput with the agent's response
+ """
+ del file_path # Unused
+ if not (content := _extract_tool_result_text(tool_result)):
+ return None
+ return TaskOutput(result=content)
+
+
+def _looks_like_bash_output(content: str) -> bool:
+ """Check if content looks like it's from a Bash tool based on common patterns."""
+ if not content:
+ return False
+
+ # Check for ANSI escape sequences
+ if "\x1b[" in content:
+ return True
+
+ # Check for common bash/terminal patterns
+ bash_indicators = [
+ "$ ", # Shell prompt
+ "β― ", # Modern shell prompt
+ "> ", # Shell continuation
+ "\n+ ", # Bash -x output
+ "bash: ", # Bash error messages
+ "/bin/bash", # Bash path
+ "command not found", # Common bash error
+ "Permission denied", # Common bash error
+ "No such file or directory", # Common bash error
+ ]
+
+ # Check for file path patterns that suggest command output
+ if re.search(r"/[a-zA-Z0-9_-]+(/[a-zA-Z0-9_.-]+)*", content): # Unix-style paths
+ return True
+
+ # Check for common command output patterns
+ if any(indicator in content for indicator in bash_indicators):
+ return True
+
+ return False
+
+
+def parse_bash_output(
+ tool_result: ToolResultContent, file_path: Optional[str]
+) -> Optional[BashOutput]:
+ """Parse Bash tool result into structured content.
+
+    Flags terminal-formatted output via ANSI and shell-pattern heuristics.
+
+ Args:
+ tool_result: The tool result content
+ file_path: Unused for Bash tool
+
+ Returns:
+ BashOutput with content and ANSI flag
+ """
+ del file_path # Unused
+ if not (content := _extract_tool_result_text(tool_result)):
+ return None
+ has_ansi = _looks_like_bash_output(content)
+ return BashOutput(content=content, has_ansi=has_ansi)
+
+
+def parse_askuserquestion_output(
+ tool_result: ToolResultContent, file_path: Optional[str]
+) -> Optional[AskUserQuestionOutput]:
+ """Parse AskUserQuestion tool result into structured content.
+
+ Parses the result format:
+ 'User has answered your questions: "Q1"="A1", "Q2"="A2". You can now continue...'
+
+ Args:
+ tool_result: The tool result content
+ file_path: Unused for AskUserQuestion tool
+
+ Returns:
+ AskUserQuestionOutput with Q&A pairs
+ """
+ del file_path # Unused
+ if not (content := _extract_tool_result_text(tool_result)):
+ return None
+ # Check if this is a successful answer
+ if not content.startswith("User has answered your question"):
+ return None
+
+ # Extract the Q&A portion between the colon and the final sentence
+ match = re.match(
+ r"User has answered your questions?: (.+)\. You can now continue",
+ content,
+ re.DOTALL,
+ )
+ if not match:
+ return None
+
+ qa_portion = match.group(1)
+
+ # Parse "Question"="Answer" pairs
+ qa_pattern = re.compile(r'"([^"]+)"="([^"]+)"')
+ pairs = qa_pattern.findall(qa_portion)
+
+ if not pairs:
+ return None
+
+ answers = [AskUserQuestionAnswer(question=q, answer=a) for q, a in pairs]
+ return AskUserQuestionOutput(answers=answers, raw_message=content)
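The two-step extraction above can be checked against a made-up result string (the question and answer text is invented for illustration): first isolate the Q&A portion, then pull out the `"Question"="Answer"` pairs.

```python
import re

content = (
    'User has answered your questions: "Pick a color"="blue", '
    '"Pick a size"="large". You can now continue with the task.'
)

# Step 1: capture everything between the colon and the closing sentence.
m = re.match(
    r"User has answered your questions?: (.+)\. You can now continue",
    content,
    re.DOTALL,
)

# Step 2: extract "Question"="Answer" pairs from the captured portion.
pairs = re.findall(r'"([^"]+)"="([^"]+)"', m.group(1)) if m else []
```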
+
+
+def parse_exitplanmode_output(
+ tool_result: ToolResultContent, file_path: Optional[str]
+) -> Optional[ExitPlanModeOutput]:
+ """Parse ExitPlanMode tool result into structured content.
+
+ Truncates redundant plan echo on success.
+ When a plan is approved, the result contains:
+ 1. A confirmation message
+ 2. Path to saved plan file
+ 3. "## Approved Plan:" followed by full plan text (redundant)
+
+ Args:
+ tool_result: The tool result content
+ file_path: Unused for ExitPlanMode tool
+
+ Returns:
+ ExitPlanModeOutput with truncated message
+ """
+ del file_path # Unused
+ if not (content := _extract_tool_result_text(tool_result)):
+ return None
+ approved = "User has approved your plan" in content
+
+ if approved:
+ # Truncate at "## Approved Plan:"
+ marker = "## Approved Plan:"
+ marker_pos = content.find(marker)
+ if marker_pos > 0:
+ message = content[:marker_pos].rstrip()
+ else:
+ message = content
+ else:
+ message = content
+
+ return ExitPlanModeOutput(message=message, approved=approved)
+
+
+# Type alias for tool output parsers
+ToolOutputParser = Callable[[ToolResultContent, Optional[str]], Optional[ToolOutput]]
+
+# Registry of tool output parsers: tool_name -> parser(tool_result, file_path) -> Optional[ToolOutput]
+# Parsers receive the full ToolResultContent and can use _extract_tool_result_text() for text.
+TOOL_OUTPUT_PARSERS: dict[str, ToolOutputParser] = {
+ "Read": parse_read_output,
+ "Edit": parse_edit_output,
+ "Write": parse_write_output,
+ "Bash": parse_bash_output,
+ "Task": parse_task_output,
+ "AskUserQuestion": parse_askuserquestion_output,
+ "ExitPlanMode": parse_exitplanmode_output,
+}
+
+
+def create_tool_output(
+ tool_name: str,
+ tool_result: ToolResultContent,
+ file_path: Optional[str] = None,
+) -> ToolOutput:
+ """Create typed tool output from raw ToolResultContent.
+
+ Parses the raw content into specialized output types when possible,
+ using the TOOL_OUTPUT_PARSERS registry. Each parser receives the full
+ ToolResultContent and can use _extract_tool_result_text() if it needs text.
+
+ Args:
+ tool_name: The name of the tool (e.g., "Bash", "Read")
+ tool_result: The raw tool result content
+ file_path: Optional file path for file-based tools (Read, Edit, Write)
+
+ Returns:
+ A typed output model if parsing succeeds, ToolResultContent as fallback.
+ """
+ # Look up parser in registry and parse if available
+ if (parser := TOOL_OUTPUT_PARSERS.get(tool_name)) and (
+ parsed := parser(tool_result, file_path)
+ ):
+ return parsed
+
+ # Fallback to raw ToolResultContent
+ return tool_result
+
+
+# =============================================================================
+# Tool Item Processing
+# =============================================================================
+
+
+@dataclass
+class ToolItemResult:
+ """Result of processing a single tool/thinking/image item.
+
+ Note: Titles are computed at render time by Renderer.title_content() dispatch.
+ """
+
+ message_type: str
+ content: Optional[MessageContent] = None # Structured content for rendering
+ tool_use_id: Optional[str] = None
+ is_error: bool = False # For tool_result error state
+
+
+def create_tool_use_message(
+ meta: MessageMeta,
+ tool_use: ToolUseContent,
+ tool_use_context: dict[str, ToolUseContent],
+) -> ToolItemResult:
+ """Create ToolItemResult from a tool_use content item.
+
+ Args:
+ meta: Message metadata
+ tool_use: The tool use content item
+ tool_use_context: Dict to populate with tool_use_id -> ToolUseContent mapping
+
+ Returns:
+ ToolItemResult with tool_use content model
+ """
+ # Parse tool input into typed model (BashInput, ReadInput, etc.)
+ parsed = create_tool_input(tool_use.name, tool_use.input)
+
+ # Populate tool_use_context for later use when processing tool results
+ tool_use_context[tool_use.id] = tool_use
+
+ # Create ToolUseMessage wrapper with parsed input for specialized formatting
+ # Use ToolUseContent as fallback when no specialized parser exists
+ tool_use_message = ToolUseMessage(
+ meta,
+ input=parsed if parsed is not None else tool_use,
+ tool_use_id=tool_use.id,
+ tool_name=tool_use.name,
+ )
+
+ return ToolItemResult(
+ message_type="tool_use",
+ content=tool_use_message,
+ tool_use_id=tool_use.id,
+ )
+
+
+def create_tool_result_message(
+ meta: MessageMeta,
+ tool_result: ToolResultContent,
+ tool_use_context: dict[str, ToolUseContent],
+) -> ToolItemResult:
+ """Create ToolItemResult from a tool_result content item.
+
+ Args:
+ meta: Message metadata
+ tool_result: The tool result content item
+ tool_use_context: Dict with tool_use_id -> ToolUseContent mapping
+
+ Returns:
+ ToolItemResult with tool_result content model
+ """
+ # Get file_path and tool_name from tool_use context for specialized rendering
+ result_file_path: Optional[str] = None
+ result_tool_name: Optional[str] = None
+ if tool_result.tool_use_id in tool_use_context:
+ tool_use_from_ctx = tool_use_context[tool_result.tool_use_id]
+ result_tool_name = tool_use_from_ctx.name
+ if (
+ result_tool_name in ("Read", "Edit", "Write")
+ and "file_path" in tool_use_from_ctx.input
+ ):
+ result_file_path = tool_use_from_ctx.input["file_path"]
+
+ # Parse into typed output (ReadOutput, EditOutput, etc.) when possible
+ parsed_output = create_tool_output(
+ result_tool_name or "",
+ tool_result,
+ result_file_path,
+ )
+
+ # Create content model with rendering context
+ content_model = ToolResultMessage(
+ meta,
+ tool_use_id=tool_result.tool_use_id,
+ output=parsed_output,
+ is_error=tool_result.is_error or False,
+ tool_name=result_tool_name,
+ file_path=result_file_path,
+ )
+
+ return ToolItemResult(
+ message_type="tool_result",
+ content=content_model,
+ tool_use_id=tool_result.tool_use_id,
+ is_error=tool_result.is_error or False,
+ )
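The Q&A extraction in `parse_askuserquestion_output` can be exercised on its own; here is a minimal sketch with a hand-made result string (the sample text is illustrative, not taken from a real transcript):

```python
import re

# Illustrative sample in the documented result format (hypothetical Q&A).
sample = (
    'User has answered your questions: '
    '"Color?"="Blue", "Size?"="Large". You can now continue.'
)

match = re.match(
    r"User has answered your questions?: (.+)\. You can now continue",
    sample,
    re.DOTALL,
)
assert match is not None

# "Question"="Answer" pairs, as extracted by parse_askuserquestion_output
pairs = re.findall(r'"([^"]+)"="([^"]+)"', match.group(1))
assert pairs == [("Color?", "Blue"), ("Size?", "Large")]
```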
diff --git a/claude_code_log/factories/transcript_factory.py b/claude_code_log/factories/transcript_factory.py
new file mode 100644
index 00000000..c70821e1
--- /dev/null
+++ b/claude_code_log/factories/transcript_factory.py
@@ -0,0 +1,265 @@
+"""Factory for creating TranscriptEntry and ContentItem instances from raw data.
+
+This module creates typed model instances from JSONL transcript data:
+- TranscriptEntry subclasses (User, Assistant, Summary, System, QueueOperation)
+- ContentItem subclasses (Text, ToolUse, ToolResult, Thinking, Image)
+
+Also provides:
+- Conditional casts for TranscriptEntry discrimination
+- Usage info normalization for Anthropic SDK compatibility
+"""
+
+from typing import Any, Callable, Optional, Sequence, cast
+
+from pydantic import BaseModel
+
+from ..models import (
+ # Content types
+ ContentItem,
+ ImageContent,
+ TextContent,
+ ThinkingContent,
+ ToolResultContent,
+ ToolUseContent,
+ # Transcript entry types
+ AssistantTranscriptEntry,
+ MessageType,
+ QueueOperationTranscriptEntry,
+ SummaryTranscriptEntry,
+ SystemTranscriptEntry,
+ TranscriptEntry,
+ UsageInfo,
+ UserTranscriptEntry,
+)
+
+
+# =============================================================================
+# Content Item Registry
+# =============================================================================
+
+# Maps content type strings to their model classes
+CONTENT_ITEM_CREATORS: dict[str, type[BaseModel]] = {
+ "text": TextContent,
+ "tool_result": ToolResultContent,
+ "image": ImageContent,
+ "tool_use": ToolUseContent,
+ "thinking": ThinkingContent,
+}
+
+# Content types allowed in each context
+USER_CONTENT_TYPES: Sequence[str] = ("text", "tool_result", "image")
+ASSISTANT_CONTENT_TYPES: Sequence[str] = ("text", "tool_use", "thinking")
+
+
+# =============================================================================
+# Conditional Casts
+# =============================================================================
+
+
+def as_user_entry(entry: TranscriptEntry) -> UserTranscriptEntry | None:
+ """Return entry as UserTranscriptEntry if it is one, else None."""
+ if entry.type == MessageType.USER:
+ return cast(UserTranscriptEntry, entry)
+ return None
+
+
+def as_assistant_entry(entry: TranscriptEntry) -> AssistantTranscriptEntry | None:
+ """Return entry as AssistantTranscriptEntry if it is one, else None."""
+ if entry.type == MessageType.ASSISTANT:
+ return cast(AssistantTranscriptEntry, entry)
+ return None
+
+
+# =============================================================================
+# Usage Info Normalization
+# =============================================================================
+
+
+def normalize_usage_info(usage_data: Any) -> Optional[UsageInfo]:
+ """Normalize usage data from various formats to UsageInfo."""
+ if usage_data is None:
+ return None
+
+ # If it's already a UsageInfo instance, return as-is
+ if isinstance(usage_data, UsageInfo):
+ return usage_data
+
+ # If it's a dict, validate and convert
+ if isinstance(usage_data, dict):
+ return UsageInfo.model_validate(usage_data)
+
+ # Handle object-like access (e.g., from SDK types)
+ if hasattr(usage_data, "input_tokens"):
+ server_tool_use = getattr(usage_data, "server_tool_use", None)
+ if server_tool_use is not None and hasattr(server_tool_use, "model_dump"):
+ server_tool_use = server_tool_use.model_dump()
+ return UsageInfo(
+ input_tokens=getattr(usage_data, "input_tokens", None),
+ output_tokens=getattr(usage_data, "output_tokens", None),
+ cache_creation_input_tokens=getattr(
+ usage_data, "cache_creation_input_tokens", None
+ ),
+ cache_read_input_tokens=getattr(
+ usage_data, "cache_read_input_tokens", None
+ ),
+ service_tier=getattr(usage_data, "service_tier", None),
+ server_tool_use=server_tool_use,
+ )
+
+ return None
+
+
+# =============================================================================
+# Content Item Creation
+# =============================================================================
+
+
+def create_content_item(
+ item_data: dict[str, Any],
+ type_filter: Sequence[str] | None = None,
+) -> ContentItem:
+ """Create a ContentItem from raw data using the registry.
+
+ Args:
+ item_data: The raw dictionary data
+ type_filter: Sequence of content type strings to allow, or None to allow all
+ (e.g., USER_CONTENT_TYPES, ASSISTANT_CONTENT_TYPES)
+
+ Returns:
+ ContentItem instance, with fallback to TextContent for unknown types
+ """
+ try:
+ content_type = item_data.get("type", "")
+
+ if type_filter is None or content_type in type_filter:
+ model_class = CONTENT_ITEM_CREATORS.get(content_type)
+ if model_class is not None:
+ return cast(ContentItem, model_class.model_validate(item_data))
+
+ # Fallback to text content for unknown/disallowed types
+ return TextContent(type="text", text=str(item_data))
+ except Exception:
+ return TextContent(type="text", text=str(item_data))
+
+
+def create_message_content(
+ content_data: Any,
+ type_filter: Sequence[str] | None = None,
+) -> list[ContentItem]:
+ """Create a list of ContentItems from message content data.
+
+ Always returns a list for consistent downstream handling. String content
+ is wrapped in a TextContent item.
+
+ Args:
+ content_data: Raw content data (string or list of items)
+ type_filter: Sequence of content type strings to allow, or None to allow all
+ """
+ if isinstance(content_data, str):
+ return [TextContent(type="text", text=content_data)]
+ elif isinstance(content_data, list):
+ content_list = cast(list[Any], content_data)
+ result: list[ContentItem] = []
+ for item in content_list:
+ if isinstance(item, dict):
+ result.append(
+ create_content_item(cast(dict[str, Any], item), type_filter)
+ )
+ else:
+ # Non-dict items (e.g., raw strings) become TextContent
+ result.append(TextContent(type="text", text=str(item)))
+ return result
+ else:
+ return [TextContent(type="text", text=str(content_data))]
+
+
+# =============================================================================
+# Transcript Entry Creation
+# =============================================================================
+
+
+def _create_user_entry(data: dict[str, Any]) -> UserTranscriptEntry:
+ """Create a UserTranscriptEntry from raw data."""
+ data_copy = data.copy()
+ if "message" in data_copy and "content" in data_copy["message"]:
+ data_copy["message"] = data_copy["message"].copy()
+ data_copy["message"]["content"] = create_message_content(
+ data_copy["message"]["content"],
+ USER_CONTENT_TYPES,
+ )
+ # Parse toolUseResult if present and it's a list of content items
+ if "toolUseResult" in data_copy and isinstance(data_copy["toolUseResult"], list):
+ # Check if it's a list of content items (MCP tool results)
+ tool_use_result = cast(list[Any], data_copy["toolUseResult"])
+ if (
+ tool_use_result
+ and isinstance(tool_use_result[0], dict)
+ and "type" in tool_use_result[0]
+ ):
+ data_copy["toolUseResult"] = [
+ create_content_item(cast(dict[str, Any], item))
+ for item in tool_use_result
+ if isinstance(item, dict)
+ ]
+ return UserTranscriptEntry.model_validate(data_copy)
+
+
+def _create_assistant_entry(data: dict[str, Any]) -> AssistantTranscriptEntry:
+ """Create an AssistantTranscriptEntry from raw data."""
+ data_copy = data.copy()
+
+ if "message" in data_copy and "content" in data_copy["message"]:
+ message_copy = data_copy["message"].copy()
+ message_copy["content"] = create_message_content(
+ message_copy["content"],
+ ASSISTANT_CONTENT_TYPES,
+ )
+
+ # Normalize usage data to support both Anthropic and custom formats
+ if "usage" in message_copy:
+ message_copy["usage"] = normalize_usage_info(message_copy["usage"])
+
+ data_copy["message"] = message_copy
+ return AssistantTranscriptEntry.model_validate(data_copy)
+
+
+def _create_queue_operation_entry(
+ data: dict[str, Any],
+) -> QueueOperationTranscriptEntry:
+ """Create a QueueOperationTranscriptEntry from raw data."""
+ data_copy = data.copy()
+ if "content" in data_copy and isinstance(data_copy["content"], list):
+ data_copy["content"] = create_message_content(data_copy["content"])
+ return QueueOperationTranscriptEntry.model_validate(data_copy)
+
+
+# Registry mapping entry types to their creator functions
+ENTRY_CREATORS: dict[str, Callable[[dict[str, Any]], TranscriptEntry]] = {
+ "user": _create_user_entry,
+ "assistant": _create_assistant_entry,
+ "summary": lambda data: SummaryTranscriptEntry.model_validate(data),
+ "system": lambda data: SystemTranscriptEntry.model_validate(data),
+ "queue-operation": _create_queue_operation_entry,
+}
+
+
+def create_transcript_entry(data: dict[str, Any]) -> TranscriptEntry:
+ """Create a TranscriptEntry from a JSON dictionary.
+
+ Uses a registry-based dispatch to create the appropriate TranscriptEntry
+ subclass based on the 'type' field in the data.
+
+ Args:
+ data: Dictionary parsed from JSON
+
+ Returns:
+ The appropriate TranscriptEntry subclass
+
+ Raises:
+ ValueError: If the data doesn't match any known transcript entry type
+ """
+ entry_type = data.get("type")
+ creator = ENTRY_CREATORS.get(entry_type) # type: ignore[arg-type]
+ if creator is None:
+ raise ValueError(f"Unknown transcript entry type: {entry_type}")
+ return creator(data)
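The registry dispatch used by `create_transcript_entry` can be illustrated in isolation; this sketch substitutes stand-in creator functions for the real `model_validate`-based factories:

```python
from typing import Any, Callable

# Stand-in creators; the real ENTRY_CREATORS maps to Pydantic factories.
CREATORS: dict[str, Callable[[dict[str, Any]], str]] = {
    "user": lambda d: f"user:{d['text']}",
    "summary": lambda d: f"summary:{d['text']}",
}

def create(data: dict[str, Any]) -> str:
    entry_type = data.get("type")
    creator = CREATORS.get(entry_type or "")
    if creator is None:
        raise ValueError(f"Unknown transcript entry type: {entry_type}")
    return creator(data)

assert create({"type": "user", "text": "hi"}) == "user:hi"
```

Adding a new entry type is then a one-line registry change, with no edits to the dispatch function itself.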
diff --git a/claude_code_log/factories/user_factory.py b/claude_code_log/factories/user_factory.py
new file mode 100644
index 00000000..22b30f66
--- /dev/null
+++ b/claude_code_log/factories/user_factory.py
@@ -0,0 +1,477 @@
+"""Factory for user transcript entries.
+
+This module handles creation of MessageContent from user transcript entries:
+- SlashCommandMessage: Slash command invocations
+- CommandOutputMessage: Local command output
+- BashInputMessage: Bash command input
+- BashOutputMessage: Bash command output
+- UserTextMessage: Regular user text (with optional IDE notifications)
+- UserSlashCommandMessage: Expanded slash command prompts (isMeta)
+- CompactedSummaryMessage: Compacted conversation summaries
+- UserMemoryMessage: User memory content
+- UserSteeringMessage: User steering prompts (queue-operation 'remove')
+
+Also provides:
+- is_command_message: Check if text is a slash command
+- is_local_command_output: Check if text is local command output
+- is_bash_input: Check if text is bash input
+- is_bash_output: Check if text is bash output
+"""
+
+import json
+import re
+from typing import Any, Optional, Union, cast
+
+from ..models import (
+ BashInputMessage,
+ BashOutputMessage,
+ CommandOutputMessage,
+ CompactedSummaryMessage,
+ ContentItem,
+ IdeDiagnostic,
+ IdeNotificationContent,
+ IdeOpenedFile,
+ IdeSelection,
+ ImageContent,
+ MessageMeta,
+ SlashCommandMessage,
+ TextContent,
+ UserMemoryMessage,
+ UserSlashCommandMessage,
+ UserTextMessage,
+)
+
+
+# =============================================================================
+# Message Type Detection
+# =============================================================================
+
+
+def is_command_message(text_content: str) -> bool:
+ """Check if a message contains command information that should be displayed."""
+ return "" in text_content and "" in text_content
+
+
+def is_local_command_output(text_content: str) -> bool:
+ """Check if a message contains local command output."""
+ return "" in text_content
+
+
+def is_bash_input(text_content: str) -> bool:
+ """Check if a message contains bash input command."""
+ return "" in text_content and "" in text_content
+
+
+def is_bash_output(text_content: str) -> bool:
+ """Check if a message contains bash command output."""
+ return "" in text_content or "" in text_content
+
+
+# =============================================================================
+# Slash Command Creation
+# =============================================================================
+
+
+def create_slash_command_message(
+ meta: MessageMeta,
+ text: str,
+) -> Optional[SlashCommandMessage]:
+ """Create SlashCommandMessage from text containing command tags.
+
+ Args:
+ meta: Message metadata
+ text: Raw text that may contain command-name, command-args, command-contents tags
+
+ Returns:
+ SlashCommandMessage if tags found, None otherwise
+ """
+ command_name_match = re.search(r"<command-name>([^<]+)</command-name>", text)
+ if not command_name_match:
+ return None
+
+ command_name = command_name_match.group(1).strip()
+
+ command_args_match = re.search(r"<command-args>([^<]*)</command-args>", text)
+ command_args = command_args_match.group(1).strip() if command_args_match else ""
+
+ # Parse command contents, handling JSON format
+ command_contents_match = re.search(
+ r"<command-contents>(.+?)</command-contents>", text, re.DOTALL
+ )
+ command_contents = ""
+ if command_contents_match:
+ contents_text = command_contents_match.group(1).strip()
+ # Try to parse as JSON and extract the text field
+ try:
+ contents_json: Any = json.loads(contents_text)
+ if isinstance(contents_json, dict) and "text" in contents_json:
+ text_dict = cast(dict[str, Any], contents_json)
+ text_value = text_dict["text"]
+ command_contents = str(text_value)
+ else:
+ command_contents = contents_text
+ except json.JSONDecodeError:
+ command_contents = contents_text
+
+ return SlashCommandMessage(
+ command_name=command_name,
+ command_args=command_args,
+ command_contents=command_contents,
+ meta=meta,
+ )
+
+
+def create_command_output_message(
+ meta: MessageMeta,
+ text: str,
+) -> Optional[CommandOutputMessage]:
+ """Create CommandOutputMessage from text containing local-command-stdout tags.
+
+ Args:
+ meta: Message metadata
+ text: Raw text that may contain local-command-stdout tags
+
+ Returns:
+ CommandOutputMessage if tags found, None otherwise
+ """
+ stdout_match = re.search(
+ r"(.*?)",
+ text,
+ re.DOTALL,
+ )
+ if not stdout_match:
+ return None
+
+ stdout_content = stdout_match.group(1).strip()
+ # Check if content looks like markdown (starts with markdown headers)
+ is_markdown = bool(re.match(r"^#+\s+", stdout_content, re.MULTILINE))
+
+ return CommandOutputMessage(
+ stdout=stdout_content, is_markdown=is_markdown, meta=meta
+ )
+
+
+# =============================================================================
+# Bash Input/Output Creation
+# =============================================================================
+
+
+def create_bash_input_message(
+ meta: MessageMeta,
+ text: str,
+) -> Optional[BashInputMessage]:
+ """Create BashInputMessage from text containing bash-input tags.
+
+ Args:
+ meta: Message metadata
+ text: Raw text that may contain bash-input tags
+
+ Returns:
+ BashInputMessage if tags found, None otherwise
+ """
+ bash_match = re.search(r"<bash-input>(.*?)</bash-input>", text, re.DOTALL)
+ if not bash_match:
+ return None
+
+ return BashInputMessage(command=bash_match.group(1).strip(), meta=meta)
+
+
+def create_bash_output_message(
+ meta: MessageMeta,
+ text: str,
+) -> Optional[BashOutputMessage]:
+ """Create BashOutputMessage from text containing bash-stdout/bash-stderr tags.
+
+ Args:
+ meta: Message metadata
+ text: Raw text that may contain bash-stdout/bash-stderr tags
+
+ Returns:
+ BashOutputMessage if tags found, None otherwise
+ """
+ stdout_match = re.search(r"<bash-stdout>(.*?)</bash-stdout>", text, re.DOTALL)
+ stderr_match = re.search(r"<bash-stderr>(.*?)</bash-stderr>", text, re.DOTALL)
+
+ if not stdout_match and not stderr_match:
+ return None
+
+ stdout = stdout_match.group(1).strip() if stdout_match else None
+ stderr = stderr_match.group(1).strip() if stderr_match else None
+
+ # Convert empty strings to None for cleaner representation
+ if stdout == "":
+ stdout = None
+ if stderr == "":
+ stderr = None
+
+ return BashOutputMessage(stdout=stdout, stderr=stderr, meta=meta)
+
+
+# =============================================================================
+# IDE Notification Creation
+# =============================================================================
+
+# Shared regex patterns for IDE notification tags
+IDE_OPENED_FILE_PATTERN = re.compile(
+ r"<ide_opened_file>(.*?)</ide_opened_file>", re.DOTALL
+)
+IDE_SELECTION_PATTERN = re.compile(r"<ide_selection>(.*?)</ide_selection>", re.DOTALL)
+IDE_DIAGNOSTICS_PATTERN = re.compile(
+ r"<ide_diagnostics>\s*(.*?)\s*</ide_diagnostics>",
+ re.DOTALL,
+)
+
+
+def create_ide_notification_content(text: str) -> Optional[IdeNotificationContent]:
+ """Create IdeNotificationContent from text containing IDE tags.
+
+ Handles:
+ - <ide_opened_file>: Simple file open notifications
+ - <ide_selection>: Code selection notifications
+ - <ide_diagnostics>: JSON diagnostic arrays
+
+ Args:
+ text: Raw text that may contain IDE notification tags
+
+ Returns:
+ IdeNotificationContent if any tags found, None otherwise
+ """
+ opened_files: list[IdeOpenedFile] = []
+ selections: list[IdeSelection] = []
+ diagnostics: list[IdeDiagnostic] = []
+ remaining_text = text
+
+ # Pattern 1: <ide_opened_file>content</ide_opened_file>
+ for match in IDE_OPENED_FILE_PATTERN.finditer(remaining_text):
+ content = match.group(1).strip()
+ opened_files.append(IdeOpenedFile(content=content))
+
+ remaining_text = IDE_OPENED_FILE_PATTERN.sub("", remaining_text)
+
+ # Pattern 2: <ide_selection>content</ide_selection>
+ for match in IDE_SELECTION_PATTERN.finditer(remaining_text):
+ content = match.group(1).strip()
+ selections.append(IdeSelection(content=content))
+
+ remaining_text = IDE_SELECTION_PATTERN.sub("", remaining_text)
+
+ # Pattern 3: <ide_diagnostics>JSON</ide_diagnostics>
+ for match in IDE_DIAGNOSTICS_PATTERN.finditer(remaining_text):
+ json_content = match.group(1).strip()
+ try:
+ parsed_diagnostics: Any = json.loads(json_content)
+ if isinstance(parsed_diagnostics, list):
+ diagnostics.append(
+ IdeDiagnostic(
+ diagnostics=cast(list[dict[str, Any]], parsed_diagnostics)
+ )
+ )
+ else:
+ # Not a list, store as raw content
+ diagnostics.append(IdeDiagnostic(raw_content=json_content))
+ except (json.JSONDecodeError, ValueError):
+ # JSON parsing failed, store raw content
+ diagnostics.append(IdeDiagnostic(raw_content=json_content))
+
+ remaining_text = IDE_DIAGNOSTICS_PATTERN.sub("", remaining_text)
+
+ # Only return if we found any IDE tags
+ if not opened_files and not selections and not diagnostics:
+ return None
+
+ return IdeNotificationContent(
+ opened_files=opened_files,
+ selections=selections,
+ diagnostics=diagnostics,
+ remaining_text=remaining_text.strip(),
+ )
+
+
+# =============================================================================
+# Compacted Summary and User Memory Creation
+# =============================================================================
+
+# Pattern for compacted session summary detection
+COMPACTED_SUMMARY_PREFIX = "This session is being continued from a previous conversation that ran out of context"
+
+
+def create_compacted_summary_message(
+ meta: MessageMeta,
+ content_list: list[ContentItem],
+) -> Optional[CompactedSummaryMessage]:
+ """Create CompactedSummaryMessage from content list.
+
+ Compacted summaries are generated when a session runs out of context and
+ needs to be continued. They contain a summary of the previous conversation.
+
+ If the first text item starts with the compacted summary prefix, all text
+ items are combined into a single CompactedSummaryMessage.
+
+ Args:
+ meta: Message metadata
+ content_list: List of ContentItem from user message
+
+ Returns:
+ CompactedSummaryMessage if first text is a compacted summary, None otherwise
+ """
+ if not content_list or not isinstance(content_list[0], TextContent):
+ return None
+
+ first_text = content_list[0].text
+ if not first_text.startswith(COMPACTED_SUMMARY_PREFIX):
+ return None
+
+ # Combine all text content for compacted summaries
+ texts = [item.text for item in content_list if isinstance(item, TextContent)]
+ all_text = "\n\n".join(texts)
+ return CompactedSummaryMessage(summary_text=all_text, meta=meta)
+
+
+# Pattern for user memory input tag
+USER_MEMORY_PATTERN = re.compile(
+ r"(.*?)", re.DOTALL
+)
+
+
+def create_user_memory_message(
+ meta: MessageMeta,
+ text: str,
+) -> Optional[UserMemoryMessage]:
+ """Create UserMemoryMessage from text containing user-memory-input tag.
+
+ User memory input contains context that the user has provided from
+ their CLAUDE.md or other memory sources.
+
+ Args:
+ meta: Message metadata
+ text: Raw text that may contain user memory input tag
+
+ Returns:
+ UserMemoryMessage if tag found, None otherwise
+ """
+ match = USER_MEMORY_PATTERN.search(text)
+ if match:
+ memory_content = match.group(1).strip()
+ return UserMemoryMessage(memory_text=memory_content, meta=meta)
+ return None
+
+
+# =============================================================================
+# User Message Content Creation
+# =============================================================================
+
+# Type alias for content models returned by create_user_message
+UserMessageContent = Union[
+ SlashCommandMessage,
+ CommandOutputMessage,
+ BashInputMessage,
+ BashOutputMessage,
+ CompactedSummaryMessage,
+ UserMemoryMessage,
+ UserSlashCommandMessage,
+ UserTextMessage,
+]
+
+
+def create_user_message(
+ meta: MessageMeta,
+ content_list: list[ContentItem],
+ text_content: str,
+ is_slash_command: bool = False,
+) -> Optional[UserMessageContent]:
+ """Create a user message content model from content items.
+
+ This is the main entry point for creating user message content.
+ It handles all user message types by detecting patterns in the text:
+ - Slash commands (<command-name>, <command-message>)
+ - Local command output (<local-command-stdout>)
+ - Bash input (<bash-input>)
+ - Bash output (<bash-stdout>, <bash-stderr>)
+ - Compacted summaries (special prefix)
+ - User memory (<user-memory-input>)
+ - Slash command expanded prompts (isMeta=True)
+ - Regular user text with IDE notifications
+
+ Args:
+ meta: Message metadata
+ content_list: List of ContentItem from user message
+ text_content: Pre-extracted text content for pattern detection
+ is_slash_command: True for slash command expanded prompts (isMeta=True)
+
+ Returns:
+ A content model, or None if content_list is empty.
+ """
+ if not content_list:
+ return None
+
+ # Check for special message patterns first (before generic parsing)
+ if is_command_message(text_content):
+ return create_slash_command_message(meta, text_content)
+
+ if is_local_command_output(text_content):
+ return create_command_output_message(meta, text_content)
+
+ if is_bash_input(text_content):
+ return create_bash_input_message(meta, text_content)
+
+ if is_bash_output(text_content):
+ return create_bash_output_message(meta, text_content)
+
+ # Slash command expanded prompts - combine all text as markdown
+ if is_slash_command:
+ all_text = "\n\n".join(
+ getattr(item, "text", "") for item in content_list if hasattr(item, "text")
+ )
+ return UserSlashCommandMessage(text=all_text, meta=meta) if all_text else None
+
+ # Get first text item for special case detection
+ first_text_item = next(
+ (item for item in content_list if hasattr(item, "text")),
+ None,
+ )
+ first_text = getattr(first_text_item, "text", "") if first_text_item else ""
+
+ # Check for compacted session summary first (handles text combining internally)
+ if compacted := create_compacted_summary_message(meta, content_list):
+ return compacted
+
+ # Check for user memory input
+ if user_memory := create_user_memory_message(meta, first_text):
+ return user_memory
+
+ # Build items list preserving order, extracting IDE notifications from text
+ items: list[TextContent | ImageContent | IdeNotificationContent] = []
+
+ for item in content_list:
+ # Check for text content
+ if hasattr(item, "text"):
+ item_text: str = getattr(item, "text") # type: ignore[assignment]
+
+ if ide_content := create_ide_notification_content(item_text):
+ # Add IDE notification item first
+ items.append(ide_content)
+ remaining_text: str = ide_content.remaining_text
+ else:
+ remaining_text = item_text
+
+ # Add remaining text as TextContent if non-empty
+ if remaining_text.strip():
+ items.append(TextContent(type="text", text=remaining_text))
+ elif isinstance(item, ImageContent):
+ # ImageContent model - use as-is
+ items.append(item)
+ elif hasattr(item, "source") and getattr(item, "type", None) == "image":
+ # Duck-typed image content - convert to our Pydantic model
+ items.append(ImageContent.model_validate(item.model_dump())) # type: ignore[union-attr]
+
+ # Extract text content from items for dedup matching and simple renderers
+ raw_text_content = "\n".join(
+ item.text for item in items if isinstance(item, TextContent)
+ )
+
+ # Return UserTextMessage with items list and cached raw text
+ return UserTextMessage(
+ items=items,
+ raw_text_content=raw_text_content if raw_text_content else None,
+ meta=meta,
+ )
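The tag extraction performed by `create_bash_output_message` can be sketched against a hand-made sample (the tag-wrapped string below is illustrative):

```python
import re

# Illustrative tag-wrapped sample, mirroring create_bash_output_message.
sample = "<bash-stdout>hello\n</bash-stdout><bash-stderr></bash-stderr>"

stdout_match = re.search(r"<bash-stdout>(.*?)</bash-stdout>", sample, re.DOTALL)
stderr_match = re.search(r"<bash-stderr>(.*?)</bash-stderr>", sample, re.DOTALL)

stdout = stdout_match.group(1).strip() if stdout_match else None
stderr = stderr_match.group(1).strip() if stderr_match else None

# Empty strings are normalized to None, as in the factory.
stdout = stdout or None
stderr = stderr or None

assert stdout == "hello"
assert stderr is None
```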
diff --git a/claude_code_log/html/__init__.py b/claude_code_log/html/__init__.py
index ef847ef9..168ba9e2 100644
--- a/claude_code_log/html/__init__.py
+++ b/claude_code_log/html/__init__.py
@@ -8,6 +8,7 @@
escape_html,
get_message_emoji,
get_template_environment,
+ is_session_header,
render_collapsible_code,
render_file_content_collapsible,
render_markdown,
@@ -15,25 +16,32 @@
starts_with_emoji,
)
from .tool_formatters import (
- format_askuserquestion_content,
+ # Tool input formatters (called by HtmlRenderer.format_{InputClass})
+ format_askuserquestion_input,
+ format_bash_input,
+ format_edit_input,
+ format_exitplanmode_input,
+ format_multiedit_input,
+ format_read_input,
+ format_task_input,
+ format_todowrite_input,
+ format_write_input,
+ # Tool output formatters (called by HtmlRenderer.format_{OutputClass})
+ format_askuserquestion_output,
+ format_bash_output,
+ format_edit_output,
+ format_exitplanmode_output,
+ format_read_output,
+ format_task_output,
+ format_write_output,
+ # Fallback formatter
+ format_tool_result_content_raw,
+ # Legacy formatters (still used)
format_askuserquestion_result,
- format_bash_tool_content,
- format_edit_tool_content,
- format_edit_tool_result,
- format_exitplanmode_content,
format_exitplanmode_result,
- format_multiedit_tool_content,
- format_read_tool_content,
- format_read_tool_result,
- format_task_tool_content,
- format_todowrite_content,
- format_tool_result_content,
- format_tool_use_content,
+ # Tool summary and title
format_tool_use_title,
- format_write_tool_content,
get_tool_summary,
- parse_edit_output,
- parse_read_output,
render_params_table,
)
from .system_formatters import (
@@ -43,28 +51,28 @@
format_system_content,
)
from ..models import (
- AssistantTextContent,
- BashInputContent,
- BashOutputContent,
- CommandOutputContent,
- CompactedSummaryContent,
- DedupNoticeContent,
+ AssistantTextMessage,
+ BashInputMessage,
+ BashOutputMessage,
+ CommandOutputMessage,
+ CompactedSummaryMessage,
+ DedupNoticeMessage,
IdeDiagnostic,
IdeNotificationContent,
IdeOpenedFile,
IdeSelection,
- SessionHeaderContent,
- SlashCommandContent,
- ThinkingContentModel,
- UserMemoryContent,
- UserTextContent,
+ SessionHeaderMessage,
+ SlashCommandMessage,
+ ThinkingMessage,
+ UserMemoryMessage,
+ UserTextMessage,
)
-from ..parser import (
- parse_bash_input,
- parse_bash_output,
- parse_command_output,
- parse_ide_notifications,
- parse_slash_command,
+from ..factories import (
+ create_bash_input_message,
+ create_bash_output_message,
+ create_command_output_message,
+ create_ide_notification_content,
+ create_slash_command_message,
)
from .user_formatters import (
format_bash_input_content,
@@ -89,49 +97,55 @@
"escape_html",
"get_message_emoji",
"get_template_environment",
+ "is_session_header",
"render_collapsible_code",
"render_file_content_collapsible",
"render_markdown",
"render_markdown_collapsible",
"starts_with_emoji",
- # tool_formatters (input)
- "format_askuserquestion_content",
+ # tool_formatters (input) - called by HtmlRenderer.format_{InputClass}
+ "format_askuserquestion_input",
+ "format_bash_input",
+ "format_edit_input",
+ "format_exitplanmode_input",
+ "format_multiedit_input",
+ "format_read_input",
+ "format_task_input",
+ "format_todowrite_input",
+ "format_write_input",
+ # tool_formatters (output) - called by HtmlRenderer.format_{OutputClass}
+ "format_askuserquestion_output",
+ "format_bash_output",
+ "format_edit_output",
+ "format_exitplanmode_output",
+ "format_read_output",
+ "format_task_output",
+ "format_write_output",
+ # Fallback formatter
+ "format_tool_result_content_raw",
+ # Legacy formatters (still used)
"format_askuserquestion_result",
- "format_bash_tool_content",
- "format_edit_tool_content",
- "format_exitplanmode_content",
"format_exitplanmode_result",
- "format_multiedit_tool_content",
- "format_read_tool_content",
- "format_task_tool_content",
- "format_todowrite_content",
- "format_tool_use_content",
+ # Tool summary and title
"format_tool_use_title",
- "format_write_tool_content",
"get_tool_summary",
"render_params_table",
- # tool_formatters (output/result)
- "parse_read_output",
- "format_read_tool_result",
- "parse_edit_output",
- "format_edit_tool_result",
- "format_tool_result_content",
# system_formatters
"format_dedup_notice_content",
"format_hook_summary_content",
"format_session_header_content",
"format_system_content",
# system content models
- "DedupNoticeContent",
- "SessionHeaderContent",
+ "DedupNoticeMessage",
+ "SessionHeaderMessage",
# user_formatters (content models)
- "SlashCommandContent",
- "CommandOutputContent",
- "BashInputContent",
- "BashOutputContent",
- "CompactedSummaryContent",
- "UserMemoryContent",
- "UserTextContent",
+ "SlashCommandMessage",
+ "CommandOutputMessage",
+ "BashInputMessage",
+ "BashOutputMessage",
+ "CompactedSummaryMessage",
+ "UserMemoryMessage",
+ "UserTextMessage",
"IdeNotificationContent",
"IdeOpenedFile",
"IdeSelection",
@@ -146,15 +160,15 @@
"format_user_text_content",
"format_user_text_model_content",
"format_ide_notification_content",
- # user_formatters (parsing)
- "parse_slash_command",
- "parse_command_output",
- "parse_bash_input",
- "parse_bash_output",
- "parse_ide_notifications",
+ # user_factory (message creation)
+ "create_slash_command_message",
+ "create_command_output_message",
+ "create_bash_input_message",
+ "create_bash_output_message",
+ "create_ide_notification_content",
# assistant_formatters (content models)
- "AssistantTextContent",
- "ThinkingContentModel",
+ "AssistantTextMessage",
+ "ThinkingMessage",
# assistant_formatters (formatting)
"format_assistant_text_content",
"format_thinking_content",
diff --git a/claude_code_log/html/assistant_formatters.py b/claude_code_log/html/assistant_formatters.py
index cbfe41d0..9bd725f1 100644
--- a/claude_code_log/html/assistant_formatters.py
+++ b/claude_code_log/html/assistant_formatters.py
@@ -2,19 +2,19 @@
This module formats assistant message content types to HTML.
Part of the thematic formatter organization:
-- system_formatters.py: SystemContent, HookSummaryContent
-- user_formatters.py: SlashCommandContent, CommandOutputContent, BashInputContent
-- assistant_formatters.py: AssistantTextContent, ThinkingContentModel, ImageContent
+- system_formatters.py: SystemMessage, HookSummaryMessage
+- user_formatters.py: SlashCommandMessage, CommandOutputMessage, BashInputMessage
+- assistant_formatters.py: AssistantTextMessage, ThinkingMessage, ImageContent
- tool_formatters.py: tool use/result content
Content models are defined in models.py, this module only handles formatting.
"""
from ..models import (
- AssistantTextContent,
+ AssistantTextMessage,
ImageContent,
- ThinkingContentModel,
- UnknownContent,
+ ThinkingMessage,
+ UnknownMessage,
)
from .utils import escape_html, render_markdown_collapsible
@@ -25,7 +25,7 @@
def format_assistant_text_content(
- content: AssistantTextContent,
+ content: AssistantTextMessage,
line_threshold: int = 30,
preview_line_count: int = 10,
) -> str:
@@ -36,7 +36,7 @@ def format_assistant_text_content(
- ImageContent: Rendered as inline <img> tag with base64 data URL
Args:
- content: AssistantTextContent with text/items to render
+ content: AssistantTextMessage with text/items to render
line_threshold: Number of lines before content becomes collapsible
preview_line_count: Number of preview lines to show when collapsed
@@ -60,14 +60,14 @@ def format_assistant_text_content(
def format_thinking_content(
- content: ThinkingContentModel,
+ content: ThinkingMessage,
line_threshold: int = 20,
preview_line_count: int = 5,
) -> str:
"""Format thinking content as HTML.
Args:
- content: ThinkingContentModel with the thinking text
+ content: ThinkingMessage with the thinking text
line_threshold: Number of lines before content becomes collapsible
preview_line_count: Number of preview lines to show when collapsed
@@ -95,11 +95,11 @@ def format_image_content(image: ImageContent) -> str:
return f'<img src="data:{image.source.media_type};base64,{image.source.data}" alt="Image" />'
-def format_unknown_content(content: UnknownContent) -> str:
+def format_unknown_content(content: UnknownMessage) -> str:
"""Format unknown content type as HTML.
Args:
- content: UnknownContent with the type name
+ content: UnknownMessage with the type name
Returns:
HTML paragraph with escaped type name
diff --git a/claude_code_log/html/renderer.py b/claude_code_log/html/renderer.py
index c05bc85d..893768e2 100644
--- a/claude_code_log/html/renderer.py
+++ b/claude_code_log/html/renderer.py
@@ -1,30 +1,47 @@
"""HTML renderer implementation for Claude Code transcripts."""
-from functools import partial
from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Optional, Tuple, cast
from ..cache import get_library_version
from ..models import (
- AssistantTextContent,
- BashInputContent,
- BashOutputContent,
- CommandOutputContent,
- CompactedSummaryContent,
- DedupNoticeContent,
- HookSummaryContent,
- SessionHeaderContent,
- SlashCommandContent,
- SystemContent,
- ThinkingContentModel,
- ToolResultContent,
- ToolResultContentModel,
- ToolUseContent,
+ AssistantTextMessage,
+ BashInputMessage,
+ BashOutputMessage,
+ CommandOutputMessage,
+ CompactedSummaryMessage,
+ DedupNoticeMessage,
+ HookSummaryMessage,
+ SessionHeaderMessage,
+ SlashCommandMessage,
+ SystemMessage,
+ ThinkingMessage,
+ ToolUseMessage,
TranscriptEntry,
- UnknownContent,
- UserMemoryContent,
- UserSlashCommandContent,
- UserTextContent,
+ UnknownMessage,
+ UserMemoryMessage,
+ UserSlashCommandMessage,
+ UserTextMessage,
+ # Tool input types
+ AskUserQuestionInput,
+ BashInput,
+ EditInput,
+ ExitPlanModeInput,
+ MultiEditInput,
+ ReadInput,
+ TaskInput,
+ TodoWriteInput,
+ ToolUseContent,
+ WriteInput,
+ # Tool output types
+ AskUserQuestionOutput,
+ BashOutput,
+ EditOutput,
+ ExitPlanModeOutput,
+ ReadOutput,
+ TaskOutput,
+ ToolResultContent,
+ WriteOutput,
)
from ..renderer import (
Renderer,
@@ -33,7 +50,13 @@
prepare_projects_index,
title_for_projects_index,
)
-from ..renderer_timings import log_timing
+from ..renderer_timings import (
+ DEBUG_TIMING,
+ log_timing,
+ report_timing_statistics,
+ set_timing_var,
+)
+from ..utils import format_timestamp
from .system_formatters import (
format_dedup_notice_content,
format_hook_summary_content,
@@ -55,8 +78,33 @@
format_thinking_content,
format_unknown_content,
)
-from .tool_formatters import format_tool_result_content, format_tool_use_content
-from .utils import css_class_from_message, get_message_emoji, get_template_environment
+from .tool_formatters import (
+ format_askuserquestion_input,
+ format_askuserquestion_output,
+ format_bash_input,
+ format_bash_output,
+ format_edit_input,
+ format_edit_output,
+ format_exitplanmode_input,
+ format_exitplanmode_output,
+ format_multiedit_input,
+ format_read_input,
+ format_read_output,
+ format_task_input,
+ format_task_output,
+ format_todowrite_input,
+ format_tool_result_content_raw,
+ format_write_input,
+ format_write_output,
+ render_params_table,
+)
+from .utils import (
+ css_class_from_message,
+ escape_html,
+ get_message_emoji,
+ get_template_environment,
+ is_session_header,
+)
if TYPE_CHECKING:
from ..cache import CacheManager
@@ -94,77 +142,226 @@ def check_html_version(html_file_path: Path) -> Optional[str]:
class HtmlRenderer(Renderer):
"""HTML renderer for Claude Code transcripts."""
- def _build_dispatcher(self) -> dict[type, Callable[..., str]]:
- """Build content type to HTML formatter mapping.
+ # -------------------------------------------------------------------------
+ # System Content Formatters
+ # -------------------------------------------------------------------------
- Maps MessageContent subclasses to their HTML formatting functions.
- Handlers receive the content directly (not the full TemplateMessage).
- The cast to the correct type happens in format_content().
- """
- return {
- # System content types
- SystemContent: format_system_content,
- HookSummaryContent: format_hook_summary_content,
- SessionHeaderContent: format_session_header_content,
- DedupNoticeContent: format_dedup_notice_content,
- # User content types
- SlashCommandContent: format_slash_command_content,
- CommandOutputContent: format_command_output_content,
- BashInputContent: format_bash_input_content,
- BashOutputContent: format_bash_output_content,
- CompactedSummaryContent: format_compacted_summary_content,
- UserMemoryContent: format_user_memory_content,
- UserSlashCommandContent: format_user_slash_command_content,
- UserTextContent: format_user_text_model_content,
- # Assistant content types
- ThinkingContentModel: partial(format_thinking_content, line_threshold=10),
- AssistantTextContent: format_assistant_text_content,
- UnknownContent: format_unknown_content,
- # Tool content types
- ToolUseContent: format_tool_use_content,
- ToolResultContentModel: self._format_tool_result_content,
- }
-
- def _format_tool_result_content(self, content: ToolResultContentModel) -> str:
- """Format ToolResultContentModel with associated tool context."""
- tool_result = ToolResultContent(
- type="tool_result",
- tool_use_id=content.tool_use_id,
- content=content.content,
- is_error=content.is_error,
- )
- return format_tool_result_content(
- tool_result,
- content.file_path,
- content.tool_name,
+ def format_SystemMessage(self, message: SystemMessage) -> str:
+ return format_system_content(message)
+
+ def format_HookSummaryMessage(self, message: HookSummaryMessage) -> str:
+ return format_hook_summary_content(message)
+
+ def format_SessionHeaderMessage(self, message: SessionHeaderMessage) -> str:
+ return format_session_header_content(message)
+
+ def format_DedupNoticeMessage(self, message: DedupNoticeMessage) -> str:
+ return format_dedup_notice_content(message)
+
+ # -------------------------------------------------------------------------
+ # User Content Formatters
+ # -------------------------------------------------------------------------
+
+ def format_UserTextMessage(self, message: UserTextMessage) -> str:
+ return format_user_text_model_content(message)
+
+ def format_UserSlashCommandMessage(self, message: UserSlashCommandMessage) -> str:
+ return format_user_slash_command_content(message)
+
+ def format_SlashCommandMessage(self, message: SlashCommandMessage) -> str:
+ return format_slash_command_content(message)
+
+ def format_CommandOutputMessage(self, message: CommandOutputMessage) -> str:
+ return format_command_output_content(message)
+
+ def format_BashInputMessage(self, message: BashInputMessage) -> str:
+ return format_bash_input_content(message)
+
+ def format_BashOutputMessage(self, message: BashOutputMessage) -> str:
+ return format_bash_output_content(message)
+
+ def format_CompactedSummaryMessage(self, message: CompactedSummaryMessage) -> str:
+ return format_compacted_summary_content(message)
+
+ def format_UserMemoryMessage(self, message: UserMemoryMessage) -> str:
+ return format_user_memory_content(message)
+
+ # -------------------------------------------------------------------------
+ # Assistant Content Formatters
+ # -------------------------------------------------------------------------
+
+ def format_AssistantTextMessage(self, message: AssistantTextMessage) -> str:
+ return format_assistant_text_content(message)
+
+ def format_ThinkingMessage(self, message: ThinkingMessage) -> str:
+ return format_thinking_content(message, line_threshold=10)
+
+ def format_UnknownMessage(self, message: UnknownMessage) -> str:
+ return format_unknown_content(message)
+
+ # -------------------------------------------------------------------------
+ # Tool Input Formatters
+ # -------------------------------------------------------------------------
+
+ def format_BashInput(self, input: BashInput) -> str:
+ return format_bash_input(input)
+
+ def format_ReadInput(self, input: ReadInput) -> str:
+ return format_read_input(input)
+
+ def format_WriteInput(self, input: WriteInput) -> str:
+ return format_write_input(input)
+
+ def format_EditInput(self, input: EditInput) -> str:
+ return format_edit_input(input)
+
+ def format_MultiEditInput(self, input: MultiEditInput) -> str:
+ return format_multiedit_input(input)
+
+ def format_TaskInput(self, input: TaskInput) -> str:
+ return format_task_input(input)
+
+ def format_TodoWriteInput(self, input: TodoWriteInput) -> str:
+ return format_todowrite_input(input)
+
+ def format_AskUserQuestionInput(self, input: AskUserQuestionInput) -> str:
+ return format_askuserquestion_input(input)
+
+ def format_ExitPlanModeInput(self, input: ExitPlanModeInput) -> str:
+ return format_exitplanmode_input(input)
+
+ def format_ToolUseContent(self, content: ToolUseContent) -> str:
+ return render_params_table(content.input)
+
+ # -------------------------------------------------------------------------
+ # Tool Output Formatters
+ # -------------------------------------------------------------------------
+
+ def format_ReadOutput(self, output: ReadOutput) -> str:
+ return format_read_output(output)
+
+ def format_WriteOutput(self, output: WriteOutput) -> str:
+ return format_write_output(output)
+
+ def format_EditOutput(self, output: EditOutput) -> str:
+ return format_edit_output(output)
+
+ def format_BashOutput(self, output: BashOutput) -> str:
+ return format_bash_output(output)
+
+ def format_TaskOutput(self, output: TaskOutput) -> str:
+ return format_task_output(output)
+
+ def format_AskUserQuestionOutput(self, output: AskUserQuestionOutput) -> str:
+ return format_askuserquestion_output(output)
+
+ def format_ExitPlanModeOutput(self, output: ExitPlanModeOutput) -> str:
+ return format_exitplanmode_output(output)
+
+ def format_ToolResultContent(self, output: ToolResultContent) -> str:
+ return format_tool_result_content_raw(output)
+
+ # -------------------------------------------------------------------------
+ # Tool Input Title Methods (for Renderer.title_ToolUseMessage dispatch)
+ # -------------------------------------------------------------------------
+
+ def _tool_title(
+ self, message: TemplateMessage, icon: str, summary: Optional[str] = None
+ ) -> str:
+ """Format tool title with icon and optional summary."""
+ content = cast(ToolUseMessage, message.content)
+ escaped_name = escape_html(content.tool_name)
+ prefix = f"{icon} " if icon else ""
+ if summary:
+ escaped_summary = escape_html(summary)
+ return f"{prefix}{escaped_name} {escaped_summary}"
+ return f"{prefix}{escaped_name}"
+
+ def title_TodoWriteInput(self, message: TemplateMessage) -> str: # noqa: ARG002
+ return "📋 Todo List"
+
+ def title_TaskInput(self, message: TemplateMessage) -> str:
+ content = cast(ToolUseMessage, message.content)
+ input = cast(TaskInput, content.input)
+ escaped_name = escape_html(content.tool_name)
+ escaped_subagent = (
+ escape_html(input.subagent_type) if input.subagent_type else ""
)
+ if input.description and input.subagent_type:
+ escaped_desc = escape_html(input.description)
+ return f"🔧 {escaped_name} {escaped_desc} ({escaped_subagent})"
+ elif input.description:
+ return self._tool_title(message, "🔧", input.description)
+ elif input.subagent_type:
+ return f"🔧 {escaped_name} ({escaped_subagent})"
+ return f"🔧 {escaped_name}"
+
+ def title_EditInput(self, message: TemplateMessage) -> str:
+ input = cast(EditInput, cast(ToolUseMessage, message.content).input)
+ return self._tool_title(message, "📝", input.file_path)
+
+ def title_WriteInput(self, message: TemplateMessage) -> str:
+ input = cast(WriteInput, cast(ToolUseMessage, message.content).input)
+ return self._tool_title(message, "📄", input.file_path)
+
+ def title_ReadInput(self, message: TemplateMessage) -> str:
+ input = cast(ReadInput, cast(ToolUseMessage, message.content).input)
+ return self._tool_title(message, "📖", input.file_path)
+
+ def title_BashInput(self, message: TemplateMessage) -> str:
+ input = cast(BashInput, cast(ToolUseMessage, message.content).input)
+ return self._tool_title(message, "💻", input.description)
def _flatten_preorder(
self, roots: list[TemplateMessage]
- ) -> list[Tuple[TemplateMessage, str]]:
+ ) -> Tuple[
+ list[Tuple[TemplateMessage, str, str, str]],
+ list[Tuple[str, list[Tuple[float, str]]]],
+ ]:
"""Flatten message tree via pre-order traversal, formatting each message.
- Traverses the tree depth-first (pre-order), formats each message's
- content to HTML, and builds a flat list of (message, html) pairs.
+ Traverses the tree depth-first (pre-order), computes title and formats
+ content to HTML, building a flat list of (message, title, html, timestamp) tuples.
+
+ Also tracks timing statistics for Markdown and Pygments operations when
+ DEBUG_TIMING is enabled.
Args:
roots: Root messages (typically session headers) with children populated
Returns:
- Flat list of (message, html_content) tuples in pre-order
+ Tuple of:
+ - Flat list of (message, title, html_content, formatted_timestamp) tuples
+ - Operation timing data for reporting: [("Markdown", timings), ("Pygments", timings)]
"""
- flat: list[Tuple[TemplateMessage, str]] = []
+ flat: list[Tuple[TemplateMessage, str, str, str]] = []
+
+ # Initialize timing tracking for expensive operations
+ markdown_timings: list[Tuple[float, str]] = []
+ pygments_timings: list[Tuple[float, str]] = []
+ set_timing_var("_markdown_timings", markdown_timings)
+ set_timing_var("_pygments_timings", pygments_timings)
def visit(msg: TemplateMessage) -> None:
+ # Update current message ID for timing tracking
+ set_timing_var("_current_msg_id", msg.message_id)
+ title = self.title_content(msg)
html = self.format_content(msg)
- flat.append((msg, html))
+ formatted_ts = format_timestamp(msg.meta.timestamp if msg.meta else None)
+ flat.append((msg, title, html, formatted_ts))
for child in msg.children:
visit(child)
for root in roots:
visit(root)
- return flat
+ # Return timing data for reporting
+ operation_timings: list[Tuple[str, list[Tuple[float, str]]]] = [
+ ("Markdown", markdown_timings),
+ ("Pygments", pygments_timings),
+ ]
+
+ return flat, operation_timings
def generate(
self,
@@ -185,7 +382,11 @@ def generate(
# Flatten tree via pre-order traversal, formatting content along the way
with log_timing("Content formatting (pre-order)", t_start):
- template_messages = self._flatten_preorder(root_messages)
+ template_messages, operation_timings = self._flatten_preorder(root_messages)
+
+ # Report timing statistics for Markdown/Pygments operations
+ if DEBUG_TIMING:
+ report_timing_statistics([], operation_timings)
# Render template
with log_timing("Template environment setup", t_start):
@@ -204,6 +405,7 @@ def generate(
library_version=get_library_version(),
css_class_from_message=css_class_from_message,
get_message_emoji=get_message_emoji,
+ is_session_header=is_session_header,
)
)
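The per-type `format_SystemMessage`-style methods above replace the old dict-based dispatcher, which implies the base `Renderer` resolves handlers from the content's class name. The base class is not shown in this diff, so the following is only a minimal sketch of that naming convention:

```python
# Sketch of name-based dispatch, assuming the base Renderer looks up
# format_{type(content).__name__} on self; the real base class is not
# part of this diff.
class Renderer:
    def format_content(self, content: object) -> str:
        handler_name = f"format_{type(content).__name__}"
        handler = getattr(self, handler_name, None)
        if handler is None:
            raise TypeError(f"No formatter for {type(content).__name__}")
        return handler(content)


class SystemMessage:
    def __init__(self, text: str) -> None:
        self.text = text


class DemoRenderer(Renderer):
    def format_SystemMessage(self, message: SystemMessage) -> str:
        return f"<p>{message.text}</p>"
```

Under this convention, supporting a new content type only requires adding a matching `format_*` method on the renderer — no central registry to edit.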
diff --git a/claude_code_log/html/system_formatters.py b/claude_code_log/html/system_formatters.py
index 72eff8b8..2d71876d 100644
--- a/claude_code_log/html/system_formatters.py
+++ b/claude_code_log/html/system_formatters.py
@@ -2,28 +2,28 @@
This module formats SystemTranscriptEntry-derived content types to HTML.
Part of the thematic formatter organization:
-- system_formatters.py: SystemContent, HookSummaryContent
-- user_formatters.py: (future) user message variants
-- assistant_formatters.py: (future) assistant message variants
-- tool_renderers.py: tool use/result content
+- system_formatters.py: SystemMessage, HookSummaryMessage
+- user_formatters.py: SlashCommandMessage, CommandOutputMessage, etc.
+- assistant_formatters.py: AssistantTextMessage, ThinkingMessage, ImageContent
+- tool_formatters.py: tool use/result content
"""
import html
from .ansi_colors import convert_ansi_to_html
from ..models import (
- DedupNoticeContent,
- HookSummaryContent,
- SessionHeaderContent,
- SystemContent,
+ DedupNoticeMessage,
+ HookSummaryMessage,
+ SessionHeaderMessage,
+ SystemMessage,
)
-def format_system_content(content: SystemContent) -> str:
+def format_system_content(content: SystemMessage) -> str:
"""Format a system message with level-specific icon.
Args:
- content: SystemContent with level and text
+ content: SystemMessage with level and text
Returns:
HTML with icon and ANSI-converted text
@@ -33,13 +33,13 @@ def format_system_content(content: SystemContent) -> str:
return f"{level_icon} {html_content}"
-def format_hook_summary_content(content: HookSummaryContent) -> str:
+def format_hook_summary_content(content: HookSummaryMessage) -> str:
"""Format a hook summary as collapsible details.
Shows a compact summary with expandable hook commands and error output.
Args:
- content: HookSummaryContent with execution details
+ content: HookSummaryMessage with execution details
Returns:
HTML with collapsible details section
@@ -79,11 +79,11 @@ def format_hook_summary_content(content: HookSummaryContent) -> str:
"""
-def format_session_header_content(content: SessionHeaderContent) -> str:
+def format_session_header_content(content: SessionHeaderMessage) -> str:
"""Format a session header as HTML.
Args:
- content: SessionHeaderContent with title, session_id, and optional summary
+ content: SessionHeaderMessage with title, session_id, and optional summary
Returns:
HTML for the session header display
@@ -92,11 +92,11 @@ def format_session_header_content(content: SessionHeaderContent) -> str:
return escaped_title
-def format_dedup_notice_content(content: DedupNoticeContent) -> str:
+def format_dedup_notice_content(content: DedupNoticeMessage) -> str:
"""Format a deduplication notice as HTML.
Args:
- content: DedupNoticeContent with notice text and optional target link
+ content: DedupNoticeMessage with notice text and optional target link
Returns:
HTML for the dedup notice display with optional anchor link
diff --git a/claude_code_log/html/templates/components/search.html b/claude_code_log/html/templates/components/search.html
index cc36d353..634e2006 100644
--- a/claude_code_log/html/templates/components/search.html
+++ b/claude_code_log/html/templates/components/search.html
@@ -158,7 +158,7 @@
let prev = messageElement.previousElementSibling;
while (prev) {
if (prev.classList.contains('session-header')) {
- return prev.id.replace('session-', '');
+ return prev.dataset.sessionId || null;
}
prev = prev.previousElementSibling;
}
diff --git a/claude_code_log/html/templates/components/session_nav.html b/claude_code_log/html/templates/components/session_nav.html
index 99568db2..143d4a89 100644
--- a/claude_code_log/html/templates/components/session_nav.html
+++ b/claude_code_log/html/templates/components/session_nav.html
@@ -12,7 +12,7 @@ Session Navigation