Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,11 @@
"browseFolders": "Browse Folders:",
"createNewFolder": "Create new folder",
"pathPlaceholder": "Type folder path or select from tree below...",
"root": "Root"
"root": "Root",
"pathTemplate": "Path Template (Optional)",
"pathTemplateHelp": "Use placeholders to preserve or customize folder structure when moving",
"pathTemplatePlaceholder": "e.g., {original_path} or {base_model}/{original_path}",
"availablePlaceholders": "Available"
},
"relinkCivitai": {
"title": "Re-link to Civitai",
Expand Down Expand Up @@ -1140,6 +1144,7 @@
"filenameCannotBeEmpty": "File name cannot be empty",
"renameFailed": "Failed to rename file: {message}",
"moveFailed": "Failed to move model(s): {message}",
"pathTooLong": "Path is too long and exceeds filesystem limits: {message}",
"pleaseSelectRoot": "Please select a {type} root directory",
"nameTooLong": "Model name is limited to 100 characters",
"verificationAlreadyDone": "This group has already been verified",
Expand Down
4 changes: 2 additions & 2 deletions py/routes/base_model_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def attach_service(self, service) -> None:
self.service = service
self.model_type = service.model_type
self.model_file_service = ModelFileService(service.scanner, service.model_type)
self.model_move_service = ModelMoveService(service.scanner)
self.model_move_service = ModelMoveService(service.scanner, service.model_type)
self.model_lifecycle_service = ModelLifecycleService(
scanner=service.scanner,
metadata_manager=MetadataManager,
Expand Down Expand Up @@ -249,7 +249,7 @@ def _ensure_file_service(self) -> ModelFileService:
def _ensure_move_service(self) -> ModelMoveService:
if self.model_move_service is None:
service = self._ensure_service()
self.model_move_service = ModelMoveService(service.scanner)
self.model_move_service = ModelMoveService(service.scanner, service.model_type)
return self.model_move_service

def _ensure_lifecycle_service(self) -> ModelLifecycleService:
Expand Down
6 changes: 4 additions & 2 deletions py/routes/handlers/model_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,9 +937,10 @@ async def move_model(self, request: web.Request) -> web.Response:
data = await request.json()
file_path = data.get("file_path")
target_path = data.get("target_path")
path_template = data.get("path_template") # Optional path template
if not file_path or not target_path:
return web.Response(text="File path and target path are required", status=400)
result = await self._move_service.move_model(file_path, target_path)
result = await self._move_service.move_model(file_path, target_path, path_template)
status = 200 if result.get("success") else 500
return web.json_response(result, status=status)
except Exception as exc:
Expand All @@ -951,9 +952,10 @@ async def move_models_bulk(self, request: web.Request) -> web.Response:
data = await request.json()
file_paths = data.get("file_paths", [])
target_path = data.get("target_path")
path_template = data.get("path_template") # Optional path template
if not file_paths or not target_path:
return web.Response(text="File paths and target path are required", status=400)
result = await self._move_service.move_models_bulk(file_paths, target_path)
result = await self._move_service.move_models_bulk(file_paths, target_path, path_template)
return web.json_response(result)
except Exception as exc:
self._logger.error("Error moving models in bulk: %s", exc, exc_info=True)
Expand Down
108 changes: 83 additions & 25 deletions py/services/model_file_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import List, Dict, Optional, Any, Set
from abc import ABC, abstractmethod

from ..utils.utils import calculate_relative_path_for_model, remove_empty_dirs
from ..utils.utils import calculate_relative_path_for_model, remove_empty_dirs, validate_path_length
from ..utils.constants import AUTO_ORGANIZE_BATCH_SIZE
from ..services.settings_manager import get_settings_manager

Expand Down Expand Up @@ -261,18 +261,25 @@ async def _process_single_model(
return

current_dir = os.path.dirname(file_path)

# Skip if already in correct location
if current_dir.replace(os.sep, '/') == target_dir.replace(os.sep, '/'):
result.skipped_count += 1
return

# Check for conflicts
file_name = os.path.basename(file_path)
target_file_path = os.path.join(target_dir, file_name)


# Validate path length before attempting move
is_valid, error_msg = validate_path_length(target_file_path)
if not is_valid:
self._add_result(result, model_name, False, f"Path length error: {error_msg}")
result.failure_count += 1
return

if os.path.exists(target_file_path):
self._add_result(result, model_name, False,
self._add_result(result, model_name, False,
f"Target file already exists: {target_file_path}")
result.failure_count += 1
return
Expand Down Expand Up @@ -324,11 +331,16 @@ async def _calculate_target_directory(
return current_root
else:
# Calculate new relative path based on settings
new_relative_path = calculate_relative_path_for_model(model, self.model_type)

model_roots = self.get_model_roots()
new_relative_path = calculate_relative_path_for_model(
model,
self.model_type,
model_roots=model_roots
)

if not new_relative_path:
return None # Signal to skip

return os.path.join(current_root, new_relative_path).replace(os.sep, '/')

def _add_result(
Expand Down Expand Up @@ -368,40 +380,85 @@ async def _cleanup_empty_directories(self, paths: List[str]) -> Dict[str, int]:

class ModelMoveService:
"""Service for handling individual model moves"""
def __init__(self, scanner):

def __init__(self, scanner, model_type: str = None):
"""Initialize the service

Args:
scanner: Model scanner instance
model_type: Type of model (e.g., 'lora', 'checkpoint') - needed for path templates
"""
self.scanner = scanner
self.model_type = model_type or getattr(scanner, 'model_type', 'lora')

async def move_model(self, file_path: str, target_path: str) -> Dict[str, Any]:
async def move_model(self, file_path: str, target_path: str, path_template: str = None) -> Dict[str, Any]:
"""Move a single model file

Args:
file_path: Source file path
target_path: Target directory path

target_path: Target directory (base path / model root)
path_template: Optional path template with placeholders like {original_path}, {base_model}, etc.

Returns:
Dictionary with move result
"""
try:
# If path_template is provided, calculate the full target path
if path_template:
from ..utils.utils import calculate_relative_path_for_model

# Get model data from cache
cache = await self.scanner.get_cached_data()
model_data = next((m for m in cache.raw_data if m.get('file_path') == file_path), None)

if not model_data:
return {
'success': False,
'error': 'Model not found in cache',
'original_file_path': file_path,
'new_file_path': None
}

# Calculate relative path from template
model_roots = self.scanner.get_model_roots()
relative_path = calculate_relative_path_for_model(
model_data,
self.model_type,
path_template=path_template,
model_roots=model_roots
)

# Combine target_path with calculated relative path
if relative_path:
target_path = os.path.join(target_path, relative_path).replace(os.sep, '/')

source_dir = os.path.dirname(file_path)
if os.path.normpath(source_dir) == os.path.normpath(target_path):
logger.info(f"Source and target directories are the same: {source_dir}")
return {
'success': True,
'success': True,
'message': 'Source and target directories are the same',
'original_file_path': file_path,
'new_file_path': file_path
}

# Validate final path length before attempting move
file_name = os.path.basename(file_path)
final_path = os.path.join(target_path, file_name)
is_valid, error_msg = validate_path_length(final_path)
if not is_valid:
logger.warning(f"Path length validation failed: {error_msg}")
return {
'success': False,
'error': error_msg,
'original_file_path': file_path,
'new_file_path': None
}

new_file_path = await self.scanner.move_model(file_path, target_path)
if new_file_path:
return {
'success': True,
'success': True,
'original_file_path': file_path,
'new_file_path': new_file_path
}
Expand All @@ -421,31 +478,32 @@ async def move_model(self, file_path: str, target_path: str) -> Dict[str, Any]:
'new_file_path': None
}

async def move_models_bulk(self, file_paths: List[str], target_path: str) -> Dict[str, Any]:
async def move_models_bulk(self, file_paths: List[str], target_path: str, path_template: str = None) -> Dict[str, Any]:
"""Move multiple model files

Args:
file_paths: List of source file paths
target_path: Target directory path

target_path: Target directory (base path / model root)
path_template: Optional path template with placeholders like {original_path}, {base_model}, etc.

Returns:
Dictionary with bulk move results
"""
try:
results = []

for file_path in file_paths:
result = await self.move_model(file_path, target_path)
result = await self.move_model(file_path, target_path, path_template)
results.append({
"original_file_path": file_path,
"new_file_path": result.get('new_file_path'),
"success": result['success'],
"message": result.get('message', result.get('error', 'Unknown'))
})

success_count = sum(1 for r in results if r["success"])
failure_count = len(results) - success_count

return {
'success': True,
'message': f'Moved {success_count} of {len(file_paths)} models',
Expand Down
50 changes: 47 additions & 3 deletions py/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,22 @@ def calculate_recipe_fingerprint(loras):

return fingerprint

def calculate_relative_path_for_model(model_data: Dict, model_type: str = 'lora') -> str:
def calculate_relative_path_for_model(model_data: Dict, model_type: str = 'lora', path_template: str = None, model_roots: list = None) -> str:
"""Calculate relative path for existing model using template from settings

Args:
model_data: Model data from scanner cache
model_type: Type of model ('lora', 'checkpoint', 'embedding')
path_template: Optional custom path template (if None, uses settings default)
model_roots: Optional list of model roots (needed for {original_path} placeholder)

Returns:
Relative path string (empty string for flat structure)
"""
# Get path template from settings for specific model type
settings_manager = get_settings_manager()
path_template = settings_manager.get_download_path_template(model_type)
if path_template is None:
path_template = settings_manager.get_download_path_template(model_type)

# If template is empty, return empty path (flat structure)
if not path_template:
Expand Down Expand Up @@ -174,15 +177,36 @@ def calculate_relative_path_for_model(model_data: Dict, model_type: str = 'lora'
if not first_tag:
first_tag = 'no tags' # Default if no tags available

# Calculate {original_path} if placeholder is used
original_path = ''
if '{original_path}' in path_template:
file_path = model_data.get('file_path', '')
if file_path and model_roots:
# Find which root this file belongs to
for root in model_roots:
root = root.replace(os.sep, '/')
normalized_file = file_path.replace(os.sep, '/')
if normalized_file.startswith(root):
# Get directory path (without filename) relative to root
file_dir = os.path.dirname(file_path)
rel_path = os.path.relpath(file_dir, root).replace(os.sep, '/')
# Remove leading './' if present
original_path = rel_path if rel_path != '.' else ''
break

# Format the template with available data
formatted_path = path_template
formatted_path = formatted_path.replace('{original_path}', original_path)
formatted_path = formatted_path.replace('{base_model}', mapped_base_model)
formatted_path = formatted_path.replace('{first_tag}', first_tag)
formatted_path = formatted_path.replace('{author}', author)

if model_type == 'embedding':
formatted_path = formatted_path.replace(' ', '_')

# Clean up any double slashes or trailing slashes
formatted_path = formatted_path.replace('//', '/').strip('/')

return formatted_path

def remove_empty_dirs(path):
Expand Down Expand Up @@ -215,5 +239,25 @@ def remove_empty_dirs(path):
removed_count += 1
except OSError:
pass

return removed_count

def validate_path_length(file_path: str, max_length: int = 260) -> tuple[bool, str]:
"""Validate that a file path doesn't exceed filesystem limits

Args:
file_path: The full path to validate
max_length: Maximum allowed path length (default 260 for Windows NTFS)
Linux typically supports much longer paths (4096+)

Returns:
Tuple of (is_valid: bool, message: str)
"""
# Normalize path for accurate length calculation
normalized_path = os.path.normpath(file_path)
current_length = len(normalized_path)

if current_length > max_length:
return False, f"Path exceeds maximum length of {max_length} characters ({current_length} characters): {normalized_path}"

return True, ""
Loading