Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.13", "3.14"]
python-version: ["3.13"]
fail-fast: false

steps:
Expand Down
201 changes: 142 additions & 59 deletions src/hother/streamblocks/core/block_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from hother.streamblocks.core.models import BlockCandidate, ExtractedBlock
from hother.streamblocks.core.registry import MetadataValidationFailureMode
from hother.streamblocks.core.types import (
BaseContent,
BaseMetadata,
BlockContentDeltaEvent,
BlockContentEndEvent,
BlockEndEvent,
Expand All @@ -21,6 +23,8 @@
BlockStartEvent,
BlockState,
Event,
ParseResult,
SectionType,
TextContentEvent,
)
from hother.streamblocks.core.utils import get_syntax_name
Expand Down Expand Up @@ -201,7 +205,11 @@ def _handle_boundary(self, candidate: BlockCandidate, line: str, line_number: in
events.append(self._create_section_delta_event(candidate, line, line_number, is_boundary=True))

# Check if metadata section just ended (transition to content)
if self._emit_section_end_events and old_section == "metadata" and candidate.current_section == "content":
if (
self._emit_section_end_events
and old_section == SectionType.METADATA
and candidate.current_section == SectionType.CONTENT
):
metadata_end_event = self._create_metadata_end_event(candidate, line_number)
events.append(metadata_end_event)

Expand Down Expand Up @@ -337,72 +345,71 @@ def reset(self) -> None:
self._candidates.clear()
self._block_ids.clear()

def _try_extract_block(
self,
candidate: BlockCandidate,
line_number: int,
) -> BlockEndEvent | BlockErrorEvent:
"""Try to parse and validate a complete block.
def _parse_candidate(self, candidate: BlockCandidate) -> tuple[str | None, ParseResult[BaseMetadata, BaseContent]]:
"""Extract block type, look up class, and parse the candidate.

Args:
candidate: Block candidate to extract
line_number: Current line number (end of block)
candidate: Block candidate to parse

Returns:
BlockEndEvent if successful, BlockErrorEvent if validation fails
Tuple of (block_type, parse_result)
"""
# Step 1: Extract block_type from candidate
# Extract block_type from candidate
block_type = candidate.syntax.extract_block_type(candidate)

self._logger.debug(
"extracting_block",
syntax=get_syntax_name(candidate.syntax),
block_type=block_type,
start_line=candidate.start_line,
end_line=line_number,
size_bytes=len(candidate.raw_text),
)

# Step 2: Look up block_class from registry
block_class = None
if block_type:
block_class = self._registry.get_block_class(block_type)
# Look up block_class from registry
block_class = self._registry.get_block_class(block_type) if block_type else None

# Step 3: Parse with the appropriate block_class
# Parse with the appropriate block_class
parse_result = candidate.syntax.parse_block(candidate, block_class)

if not parse_result.success:
error = parse_result.error or "Parse failed"
self._logger.warning(
"block_parse_failed",
block_type=block_type,
error=error,
syntax=get_syntax_name(candidate.syntax),
exc_info=parse_result.exception,
)
return self._create_error_event(
candidate, line_number, error, BlockErrorCode.PARSE_FAILED, parse_result.exception
)
return block_type, parse_result

metadata = parse_result.metadata
content = parse_result.content
def _create_extracted_block(
self,
candidate: BlockCandidate,
parse_result: ParseResult[BaseMetadata, BaseContent],
line_number: int,
) -> ExtractedBlock[BaseMetadata, BaseContent]:
"""Create ExtractedBlock from parse result.

if metadata is None or content is None:
error_code = BlockErrorCode.MISSING_METADATA if metadata is None else BlockErrorCode.MISSING_CONTENT
return self._create_error_event(candidate, line_number, "Missing metadata or content", error_code)
Args:
candidate: Block candidate
parse_result: Successful parse result
line_number: Current line number (end of block)

# Create extracted block with metadata, content, and extraction info
block = ExtractedBlock(
metadata=metadata,
content=content,
Returns:
ExtractedBlock with metadata, content, and extraction info
"""
return ExtractedBlock(
metadata=parse_result.metadata, # type: ignore[arg-type] # Checked by caller
content=parse_result.content, # type: ignore[arg-type] # Checked by caller
syntax_name=get_syntax_name(candidate.syntax),
raw_text=candidate.raw_text,
line_start=candidate.start_line,
line_end=line_number,
hash_id=candidate.compute_hash(),
)

# Additional validation from syntax
def _validate_extracted_block(
self,
candidate: BlockCandidate,
block: ExtractedBlock[BaseMetadata, BaseContent],
block_type: str | None,
line_number: int,
) -> BlockErrorEvent | None:
"""Run syntax and registry validation on extracted block.

Args:
candidate: Block candidate
block: Extracted block to validate
block_type: Block type from candidate
line_number: Current line number

Returns:
BlockErrorEvent if validation fails, None if passes
"""
# Syntax validation
if not candidate.syntax.validate_block(block):
self._logger.warning(
"syntax_validation_failed",
Expand All @@ -424,6 +431,24 @@ def _try_extract_block(
candidate, line_number, "Registry validation failed", BlockErrorCode.VALIDATION_FAILED
)

return None

def _create_success_event(
self,
candidate: BlockCandidate,
block: ExtractedBlock[BaseMetadata, BaseContent],
block_type: str | None,
) -> BlockEndEvent:
"""Create BlockEndEvent for successful extraction.

Args:
candidate: Block candidate
block: Successfully extracted and validated block
block_type: Block type from candidate

Returns:
BlockEndEvent with all block information
"""
self._logger.info(
"block_extracted",
block_type=block_type,
Expand All @@ -437,19 +462,79 @@ def _try_extract_block(

event = BlockEndEvent(
block_id=block_id,
block_type=block_type or metadata.block_type,
block_type=block_type or block.metadata.block_type,
syntax=block.syntax_name,
start_line=block.line_start,
end_line=block.line_end,
metadata=metadata.model_dump(),
content=content.model_dump(),
metadata=block.metadata.model_dump(),
content=block.content.model_dump(),
raw_content=block.raw_text,
hash_id=block.hash_id,
)
# Set private attribute after construction
object.__setattr__(event, "_block", block)
return event

def _try_extract_block(
self,
candidate: BlockCandidate,
line_number: int,
) -> BlockEndEvent | BlockErrorEvent:
"""Try to parse and validate a complete block.

This orchestrates the extraction process by delegating to specialized helpers.

Args:
candidate: Block candidate to extract
line_number: Current line number (end of block)

Returns:
BlockEndEvent if successful, BlockErrorEvent if validation fails
"""
# Step 1: Parse the candidate
block_type, parse_result = self._parse_candidate(candidate)

self._logger.debug(
"extracting_block",
syntax=get_syntax_name(candidate.syntax),
block_type=block_type,
start_line=candidate.start_line,
end_line=line_number,
size_bytes=len(candidate.raw_text),
)

# Handle parse failure
if not parse_result.success:
error = parse_result.error or "Parse failed"
self._logger.warning(
"block_parse_failed",
block_type=block_type,
error=error,
syntax=get_syntax_name(candidate.syntax),
exc_info=parse_result.exception,
)
return self._create_error_event(
candidate, line_number, error, BlockErrorCode.PARSE_FAILED, parse_result.exception
)

# Check for missing metadata or content
if parse_result.metadata is None or parse_result.content is None:
error_code = (
BlockErrorCode.MISSING_METADATA if parse_result.metadata is None else BlockErrorCode.MISSING_CONTENT
)
return self._create_error_event(candidate, line_number, "Missing metadata or content", error_code)

# Step 2: Create extracted block
block = self._create_extracted_block(candidate, parse_result, line_number)

# Step 3: Validate
validation_error = self._validate_extracted_block(candidate, block, block_type, line_number)
if validation_error:
return validation_error

# Step 4: Success!
return self._create_success_event(candidate, block, block_type)

def _create_section_delta_event(
self,
candidate: BlockCandidate,
Expand All @@ -470,11 +555,11 @@ def _create_section_delta_event(
Section-specific delta event
"""
block_id = self.get_block_id(candidate.start_line)
section = candidate.current_section or "content"
section = candidate.current_section or SectionType.CONTENT
syntax_name = get_syntax_name(candidate.syntax)
accumulated_size = len(candidate.raw_text)

if section == "header":
if section == SectionType.HEADER:
return BlockHeaderDeltaEvent(
block_id=block_id,
delta=line,
Expand All @@ -483,7 +568,7 @@ def _create_section_delta_event(
accumulated_size=accumulated_size,
inline_metadata=None,
)
if section == "metadata":
if section == SectionType.METADATA:
return BlockMetadataDeltaEvent(
block_id=block_id,
delta=line,
Expand Down Expand Up @@ -580,9 +665,8 @@ def _create_metadata_end_event(
validation_passed = result.passed
validation_error = result.error

# Store validation state in candidate
candidate.metadata_validation_passed = validation_passed
candidate.metadata_validation_error = validation_error
# Cache validation state in candidate
candidate.cache_metadata_validation(validation_passed, validation_error)

return BlockMetadataEndEvent(
block_id=block_id,
Expand Down Expand Up @@ -638,9 +722,8 @@ def _create_content_end_event(
validation_passed = result.passed
validation_error = result.error

# Store validation state in candidate
candidate.content_validation_passed = validation_passed
candidate.content_validation_error = validation_error
# Cache validation state in candidate
candidate.cache_content_validation(validation_passed, validation_error)

return BlockContentEndEvent(
block_id=block_id,
Expand Down
47 changes: 47 additions & 0 deletions src/hother/streamblocks/core/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Core constants and limits for StreamBlocks.

This module centralizes all numeric constants and default values used throughout
the library, making them easy to discover and modify if needed.
"""

from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ProcessingLimits:
    """Immutable bundle of default safety limits for stream processing.

    These values are the defaults consumed by ProcessorConfig when
    extracting blocks from a stream. Because the dataclass is frozen,
    a single instance can be shared safely across the library.

    Attributes:
        MAX_BLOCK_SIZE: Upper bound on a block's size in bytes (1 MiB);
            blocks beyond this are rejected to guard against memory
            exhaustion.
        MAX_LINE_LENGTH: Upper bound on a single line's length in bytes
            (16 KiB); longer lines are truncated.
        HASH_PREFIX_LENGTH: Character count used when hashing content
            to derive block IDs.
        LINES_BUFFER: How many recent lines to retain for debugging and
            error context.
    """

    # 1 MiB ceiling on an entire block (1 << 20 == 1_048_576)
    MAX_BLOCK_SIZE: int = 1 << 20

    # 16 KiB ceiling on an individual line (1 << 14 == 16_384)
    MAX_LINE_LENGTH: int = 1 << 14

    # Characters of the content hash used when generating block IDs
    HASH_PREFIX_LENGTH: int = 64

    # Recent-line history kept for error reporting
    LINES_BUFFER: int = 5


# Module-level singleton: callers read the default limits from this shared
# instance instead of constructing their own (safe to share — the dataclass
# is frozen, so the values cannot be mutated).
LIMITS = ProcessingLimits()

# Type system constants
# Number of generic parameters a subscripted Block carries — presumably
# checked wherever Block[TMetadata, TContent] subscripts are introspected;
# confirm at the use site.
EXPECTED_BLOCK_TYPE_PARAMS = 2  # For Block[TMetadata, TContent]
Loading