From f27cbd7fa7051d00a63f9ca562a547fe551a954c Mon Sep 17 00:00:00 2001 From: HARVIS AI Agent Date: Thu, 2 Apr 2026 00:52:37 +0200 Subject: [PATCH 1/2] feat: support structured background tasks with live REPL status --- README.rst | 74 ++++++-- examples/async_output_example.py | 27 ++- vulcano/app/background.py | 238 ++++++++++++++++++++++++ vulcano/app/background_test.py | 302 +++++++++++++++++++++++++++++++ vulcano/app/classes.py | 246 +++++++++++++++++-------- vulcano/app/classes_test.py | 104 +++++++++++ 6 files changed, 900 insertions(+), 91 deletions(-) create mode 100644 vulcano/app/background.py create mode 100644 vulcano/app/background_test.py diff --git a/README.rst b/README.rst index 31931d1..5c118d3 100644 --- a/README.rst +++ b/README.rst @@ -532,27 +532,79 @@ instead of being substituted from the context. For a more realistic reference app, prefer ``examples/simple_example.py`` over minimal one-function snippets. -Background output example -~~~~~~~~~~~~~~~~~~~~~~~~~ +Background tasks +~~~~~~~~~~~~~~~~ -If you want to start a background task that keeps writing to stdout while the -REPL remains active, take a look at ``examples/async_output_example.py``. -It demonstrates a command that starts a daemon thread and emits periodic output -without corrupting the interactive prompt. +Vulcano supports structured background tasks with clean REPL rendering and +status tracking. Background tasks can run concurrently with the REPL, +and their output is queued to avoid corrupting the interactive prompt. -Example session: +The ``VulcanoApp`` includes a ``background_tasks`` manager that: + +- Tracks N concurrent background tasks with unique IDs +- Queues background output to prevent prompt corruption +- Displays task status in a bottom toolbar (REPL mode) +- Waits for all tasks to complete in CLI mode before exiting + +Example background task command: + +.. 
code:: python + + import threading + import time + + @app.command + def start_background(interval=1, ticks=5): + """Start a background task with structured output.""" + + def worker(task_id): + try: + for i in range(ticks): + time.sleep(interval) + app.background_tasks.enqueue_output(task_id, "tick {}".format(i)) + app.background_tasks.mark_completed(task_id) + except Exception as e: + app.background_tasks.mark_failed(task_id, e) + + thread = threading.Thread(target=worker, args=(None,), daemon=True) + task_id = app.background_tasks.register_task( + "background_{}".format(ticks), thread + ) + # Update thread with actual task_id + thread = threading.Thread(target=worker, args=(task_id,), daemon=True) + app.background_tasks._tasks[task_id].thread = thread + thread.start() + return "Background task started ({})".format(task_id) + +See ``examples/async_output_example.py`` for a complete working example. + +Example REPL session with background tasks: .. code:: text πŸŒ‹ start_background interval=1 ticks=3 - Background task started + Background task started (task_0) πŸŒ‹ hello name=Alice Hello Alice! - [background] tick 0 - [background] tick 1 - [background] tick 2 + [task_0] tick 0 + [task_0] tick 1 + [task_0] tick 2 πŸŒ‹ +The bottom toolbar shows active task counts (e.g., ``[2 tasks running]``) +when background work is in progress. + +**CLI mode behavior** β€” When running commands directly from the shell, +Vulcano waits for all background tasks to complete before the process exits: + +.. 
code:: bash + + $ python your_app.py start_background interval=1 ticks=3 + Background task started (task_0) + [task_0] tick 0 + [task_0] tick 1 + [task_0] tick 2 + Development ----------- diff --git a/examples/async_output_example.py b/examples/async_output_example.py index 4bd8803..301302b 100644 --- a/examples/async_output_example.py +++ b/examples/async_output_example.py @@ -4,7 +4,6 @@ from vulcano.app import VulcanoApp from vulcano.themes import MonokaiTheme - app = VulcanoApp("async_output_demo") @@ -12,18 +11,30 @@ def start_background(interval=1, ticks=5): """Start a background task that prints while the REPL remains active. + Uses the background task manager for structured output and status tracking. + :param int interval: Seconds to wait between messages. :param int ticks: Number of messages to print. """ - def worker(): - for i in range(ticks): - time.sleep(interval) - print("[background] tick {}".format(i)) - - thread = threading.Thread(target=worker, daemon=True) + def worker(task_id): + try: + for i in range(ticks): + time.sleep(interval) + app.background_tasks.enqueue_output(task_id, "tick {}".format(i)) + app.background_tasks.mark_completed(task_id) + except Exception as e: + app.background_tasks.mark_failed(task_id, e) + + thread = threading.Thread(target=worker, args=(None,), daemon=True) + # Register the task before starting the thread + task_id = app.background_tasks.register_task("background_{}".format(ticks), thread) + # Update the thread args with the actual task_id + thread = threading.Thread(target=worker, args=(task_id,), daemon=True) + # Re-register with the correct thread + app.background_tasks._tasks[task_id].thread = thread thread.start() - return "Background task started" + return "Background task started ({})".format(task_id) @app.command diff --git a/vulcano/app/background.py b/vulcano/app/background.py new file mode 100644 index 0000000..a8d24ca --- /dev/null +++ b/vulcano/app/background.py @@ -0,0 +1,238 @@ +"""Background task 
management for structured async work in Vulcano. + +This module provides a registry-based approach to tracking background tasks, +managing their output, and displaying their status in the REPL. +""" + +from __future__ import annotations + +import queue +import threading +import time +from collections import OrderedDict +from collections.abc import Callable +from dataclasses import dataclass +from enum import Enum + + +class TaskStatus(Enum): + """Enumeration of possible task states.""" + + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + + +@dataclass +class BackgroundTask: + """Represents a single background task with state and output.""" + + task_id: str + name: str + thread: threading.Thread + status: TaskStatus + started_at: float + completed_at: float | None = None + error: Exception | None = None + + +class BackgroundTaskManager: + """Central registry for managing background tasks and their output. + + This manager tracks running tasks, queues their output to avoid + corrupting the REPL prompt, and provides status information for + display in toolbars or other UI elements. + """ + + def __init__(self) -> None: + """Initialize the background task manager.""" + self._tasks: OrderedDict[str, BackgroundTask] = OrderedDict() + self._output_queue: queue.Queue[tuple[str, str]] = queue.Queue() + self._lock = threading.Lock() + self._next_task_id = 0 + self._ui_invalidate_callback: Callable[[], None] | None = None + + def register_task(self, name: str, thread: threading.Thread) -> str: + """Register a new background task. + + Args: + name (str): Human-readable task name. + thread (threading.Thread): The thread executing the task. + + Returns: + str: Unique task ID. 
+ """ + with self._lock: + task_id = "task_{}".format(self._next_task_id) + self._next_task_id += 1 + task = BackgroundTask( + task_id=task_id, + name=name, + thread=thread, + status=TaskStatus.RUNNING, + started_at=time.time(), + ) + self._tasks[task_id] = task + return task_id + + def set_ui_invalidate_callback(self, callback: Callable[[], None]) -> None: + """Set a callback to trigger UI redraw when output is enqueued. + + Args: + callback: Function to call to invalidate/redraw the UI. + """ + self._ui_invalidate_callback = callback + + def mark_completed(self, task_id: str) -> None: + """Mark a task as completed. + + Args: + task_id (str): Task identifier. + """ + with self._lock: + if task_id in self._tasks: + self._tasks[task_id].status = TaskStatus.COMPLETED + self._tasks[task_id].completed_at = time.time() + # Trigger UI refresh to update toolbar + if self._ui_invalidate_callback: + self._ui_invalidate_callback() + + def mark_failed(self, task_id: str, error: Exception) -> None: + """Mark a task as failed. + + Args: + task_id (str): Task identifier. + error (Exception): Exception that caused the failure. + """ + with self._lock: + if task_id in self._tasks: + self._tasks[task_id].status = TaskStatus.FAILED + self._tasks[task_id].completed_at = time.time() + self._tasks[task_id].error = error + # Trigger UI refresh to update toolbar + if self._ui_invalidate_callback: + self._ui_invalidate_callback() + + def enqueue_output(self, task_id: str, message: str) -> None: + """Enqueue output from a background task. + + Args: + task_id (str): Task identifier. + message (str): Message to display. + """ + self._output_queue.put((task_id, message)) + # Trigger UI refresh to display output immediately + if self._ui_invalidate_callback: + self._ui_invalidate_callback() + + def get_queued_output(self) -> list[tuple[str, str]]: + """Retrieve all queued output messages. + + Returns: + list[tuple[str, str]]: List of (task_id, message) tuples. 
+ """ + messages = [] + while not self._output_queue.empty(): + try: + messages.append(self._output_queue.get_nowait()) + except queue.Empty: + break + return messages + + def get_active_tasks(self) -> list[BackgroundTask]: + """Return a list of currently running tasks. + + Returns: + list[BackgroundTask]: Active tasks. + """ + with self._lock: + return [ + task + for task in self._tasks.values() + if task.status == TaskStatus.RUNNING + ] + + def get_all_tasks(self) -> list[BackgroundTask]: + """Return all registered tasks. + + Returns: + list[BackgroundTask]: All tasks regardless of status. + """ + with self._lock: + return list(self._tasks.values()) + + def wait_for_all_tasks(self, timeout: float | None = None) -> None: + """Wait for all registered tasks to complete. + + This is primarily for CLI mode to ensure the process doesn't exit + before background work finishes. + + Args: + timeout (float | None): Maximum time to wait in seconds. + """ + with self._lock: + threads = [task.thread for task in self._tasks.values()] + + for thread in threads: + if thread.is_alive(): + thread.join(timeout=timeout) + + def has_active_tasks(self) -> bool: + """Check if any tasks are currently running. + + Returns: + bool: True if at least one task is running. + """ + with self._lock: + return any( + task.status == TaskStatus.RUNNING for task in self._tasks.values() + ) + + def clear_completed_tasks(self) -> None: + """Remove completed and failed tasks from the registry.""" + with self._lock: + self._tasks = OrderedDict( + (tid, task) + for tid, task in self._tasks.items() + if task.status == TaskStatus.RUNNING + ) + + def get_status_summary(self, include_names: bool = False) -> str: + """Return a formatted status summary for display. + + Args: + include_names (bool): Include task names in the summary. + + Returns: + str: Status summary (e.g., "2 tasks running: task1, task2"). 
+ """ + with self._lock: + active_tasks = [ + task + for task in self._tasks.values() + if task.status == TaskStatus.RUNNING + ] + active_count = len(active_tasks) + if active_count == 0: + return "" + + # Build base count message + if active_count == 1: + base_msg = "1 task running" + else: + base_msg = "{} tasks running".format(active_count) + + # Optionally append task names + if include_names and active_tasks: + task_names = [task.name for task in active_tasks] + # Truncate long names and limit to 3 tasks + display_names = [ + name[:15] + "..." if len(name) > 15 else name + for name in task_names[:3] + ] + names_str = ", ".join(display_names) + if active_count > 3: + names_str += ", ..." + return "{}: {}".format(base_msg, names_str) + + return base_msg diff --git a/vulcano/app/background_test.py b/vulcano/app/background_test.py new file mode 100644 index 0000000..90364a5 --- /dev/null +++ b/vulcano/app/background_test.py @@ -0,0 +1,302 @@ +"""Tests for background task management.""" + +import queue +import threading +import time +import unittest +import unittest.mock + +from vulcano.app.background import ( + BackgroundTaskManager, + TaskStatus, +) + + +class BackgroundTaskManagerTest(unittest.TestCase): + """Test the BackgroundTaskManager class.""" + + def setUp(self): + """Set up a fresh task manager for each test.""" + self.manager = BackgroundTaskManager() + + def test_register_task_assigns_unique_id(self): + """Task registration assigns unique sequential IDs.""" + + def dummy_worker(): + pass + + thread1 = threading.Thread(target=dummy_worker) + thread2 = threading.Thread(target=dummy_worker) + + task_id1 = self.manager.register_task("task1", thread1) + task_id2 = self.manager.register_task("task2", thread2) + + self.assertEqual(task_id1, "task_0") + self.assertEqual(task_id2, "task_1") + + def test_register_task_creates_running_task(self): + """Newly registered tasks start with RUNNING status.""" + + def dummy_worker(): + pass + + thread = 
threading.Thread(target=dummy_worker) + task_id = self.manager.register_task("test_task", thread) + + tasks = self.manager.get_all_tasks() + self.assertEqual(len(tasks), 1) + self.assertEqual(tasks[0].status, TaskStatus.RUNNING) + self.assertEqual(tasks[0].task_id, task_id) + + def test_mark_completed_updates_status(self): + """Marking a task completed updates its status and timestamp.""" + + def dummy_worker(): + pass + + thread = threading.Thread(target=dummy_worker) + task_id = self.manager.register_task("test_task", thread) + + self.manager.mark_completed(task_id) + + tasks = self.manager.get_all_tasks() + self.assertEqual(tasks[0].status, TaskStatus.COMPLETED) + self.assertIsNotNone(tasks[0].completed_at) + + def test_mark_failed_updates_status_and_error(self): + """Marking a task failed captures the exception.""" + + def dummy_worker(): + pass + + thread = threading.Thread(target=dummy_worker) + task_id = self.manager.register_task("test_task", thread) + + error = ValueError("test error") + self.manager.mark_failed(task_id, error) + + tasks = self.manager.get_all_tasks() + self.assertEqual(tasks[0].status, TaskStatus.FAILED) + self.assertEqual(tasks[0].error, error) + self.assertIsNotNone(tasks[0].completed_at) + + def test_enqueue_and_get_output(self): + """Output can be enqueued and retrieved.""" + task_id = "task_0" + self.manager.enqueue_output(task_id, "message 1") + self.manager.enqueue_output(task_id, "message 2") + + output = self.manager.get_queued_output() + self.assertEqual(len(output), 2) + self.assertEqual(output[0], (task_id, "message 1")) + self.assertEqual(output[1], (task_id, "message 2")) + + # Queue should be empty after retrieval + output = self.manager.get_queued_output() + self.assertEqual(len(output), 0) + + def test_get_active_tasks_filters_running(self): + """get_active_tasks returns only tasks with RUNNING status.""" + + def dummy_worker(): + pass + + thread1 = threading.Thread(target=dummy_worker) + thread2 = 
threading.Thread(target=dummy_worker) + thread3 = threading.Thread(target=dummy_worker) + + task_id1 = self.manager.register_task("task1", thread1) + task_id2 = self.manager.register_task("task2", thread2) + task_id3 = self.manager.register_task("task3", thread3) + + self.manager.mark_completed(task_id1) + self.manager.mark_failed(task_id2, ValueError("error")) + + active = self.manager.get_active_tasks() + self.assertEqual(len(active), 1) + self.assertEqual(active[0].task_id, task_id3) + + def test_has_active_tasks(self): + """has_active_tasks returns True when tasks are running.""" + self.assertFalse(self.manager.has_active_tasks()) + + def dummy_worker(): + pass + + thread = threading.Thread(target=dummy_worker) + task_id = self.manager.register_task("task", thread) + + self.assertTrue(self.manager.has_active_tasks()) + + self.manager.mark_completed(task_id) + self.assertFalse(self.manager.has_active_tasks()) + + def test_clear_completed_tasks(self): + """clear_completed_tasks removes non-running tasks.""" + + def dummy_worker(): + pass + + thread1 = threading.Thread(target=dummy_worker) + thread2 = threading.Thread(target=dummy_worker) + thread3 = threading.Thread(target=dummy_worker) + + task_id1 = self.manager.register_task("task1", thread1) + task_id2 = self.manager.register_task("task2", thread2) + task_id3 = self.manager.register_task("task3", thread3) + + self.manager.mark_completed(task_id1) + self.manager.mark_failed(task_id2, ValueError("error")) + + self.manager.clear_completed_tasks() + + all_tasks = self.manager.get_all_tasks() + self.assertEqual(len(all_tasks), 1) + self.assertEqual(all_tasks[0].task_id, task_id3) + + def test_get_status_summary(self): + """get_status_summary returns appropriate text.""" + # No tasks + self.assertEqual(self.manager.get_status_summary(), "") + + def dummy_worker(): + pass + + # One task + thread1 = threading.Thread(target=dummy_worker) + self.manager.register_task("task1", thread1) + 
self.assertEqual(self.manager.get_status_summary(), "1 task running") + + # Multiple tasks + thread2 = threading.Thread(target=dummy_worker) + self.manager.register_task("task2", thread2) + self.assertEqual(self.manager.get_status_summary(), "2 tasks running") + + def test_get_status_summary_with_names(self): + """get_status_summary includes task names when requested.""" + + def dummy_worker(): + pass + + # One task with name + thread1 = threading.Thread(target=dummy_worker) + self.manager.register_task("download_data", thread1) + summary = self.manager.get_status_summary(include_names=True) + self.assertEqual(summary, "1 task running: download_data") + + # Multiple tasks with names + thread2 = threading.Thread(target=dummy_worker) + self.manager.register_task("process_file", thread2) + summary = self.manager.get_status_summary(include_names=True) + self.assertIn("2 tasks running:", summary) + self.assertIn("download_data", summary) + self.assertIn("process_file", summary) + + def test_get_status_summary_truncates_long_names(self): + """get_status_summary truncates long task names.""" + + def dummy_worker(): + pass + + thread = threading.Thread(target=dummy_worker) + long_name = "a" * 30 + self.manager.register_task(long_name, thread) + summary = self.manager.get_status_summary(include_names=True) + + # Name should be truncated to 15 chars + "..." + self.assertIn("aaaaaaaaaaaaaaa...", summary) + + def test_get_status_summary_limits_displayed_tasks(self): + """get_status_summary limits the number of displayed task names.""" + + def dummy_worker(): + pass + + # Register 5 tasks + for i in range(5): + thread = threading.Thread(target=dummy_worker) + self.manager.register_task("task{}".format(i), thread) + + summary = self.manager.get_status_summary(include_names=True) + self.assertIn("5 tasks running:", summary) + # Should show first 3 tasks plus "..." 
+ self.assertIn("task0", summary) + self.assertIn("task1", summary) + self.assertIn("task2", summary) + self.assertIn("...", summary) + + def test_wait_for_all_tasks_waits_for_completion(self): + """wait_for_all_tasks blocks until threads complete.""" + results = [] + + def worker(): + time.sleep(0.05) + results.append("done") + + thread = threading.Thread(target=worker) + self.manager.register_task("task", thread) + thread.start() + + # Results should be empty before waiting + self.assertEqual(len(results), 0) + + self.manager.wait_for_all_tasks() + + # Results should be populated after waiting + self.assertEqual(len(results), 1) + self.assertEqual(results[0], "done") + + def test_mark_completed_triggers_ui_invalidate_callback(self): + """Completing a task invalidates the UI when callback is configured.""" + + callback = unittest.mock.MagicMock() + self.manager.set_ui_invalidate_callback(callback) + thread = threading.Thread(target=lambda: None) + task_id = self.manager.register_task("task", thread) + + self.manager.mark_completed(task_id) + + callback.assert_called_once() + + def test_mark_failed_triggers_ui_invalidate_callback(self): + """Failing a task invalidates the UI when callback is configured.""" + + callback = unittest.mock.MagicMock() + self.manager.set_ui_invalidate_callback(callback) + thread = threading.Thread(target=lambda: None) + task_id = self.manager.register_task("task", thread) + + self.manager.mark_failed(task_id, RuntimeError("boom")) + + callback.assert_called_once() + + def test_enqueue_output_triggers_ui_invalidate_callback(self): + """Enqueuing output invalidates the UI when callback is configured.""" + + callback = unittest.mock.MagicMock() + self.manager.set_ui_invalidate_callback(callback) + + self.manager.enqueue_output("task_0", "hello") + + callback.assert_called_once() + + def test_get_queued_output_handles_empty_race(self): + """Queue empty races are handled gracefully.""" + + class RaceQueue(object): + def empty(self): + return 
False + + def get_nowait(self): + raise queue.Empty + + original_queue = self.manager._output_queue + self.manager._output_queue = RaceQueue() + try: + self.assertEqual(self.manager.get_queued_output(), []) + finally: + self.manager._output_queue = original_queue + + +if __name__ == "__main__": + unittest.main() diff --git a/vulcano/app/classes.py b/vulcano/app/classes.py index 12efa8a..7406e9a 100644 --- a/vulcano/app/classes.py +++ b/vulcano/app/classes.py @@ -6,14 +6,18 @@ # System imports import sys +import threading +import time from collections.abc import Callable from difflib import SequenceMatcher from pathlib import Path from typing import Any # Third-party imports -from prompt_toolkit import PromptSession +from prompt_toolkit import PromptSession, print_formatted_text +from prompt_toolkit.application import get_app from prompt_toolkit.completion import FuzzyCompleter +from prompt_toolkit.formatted_text import HTML from prompt_toolkit.history import FileHistory from prompt_toolkit.lexers import PygmentsLexer from prompt_toolkit.patch_stdout import patch_stdout @@ -26,6 +30,7 @@ # Local imports from vulcano.exceptions import CommandNotFound, CommandParseError +from .background import BackgroundTaskManager from .lexer import MonokaiTheme, create_lexer __all__ = ["VulcanoApp"] @@ -89,6 +94,8 @@ def __init__( # Flat registry of all CommandGroup objects keyed by their full # dot-path (e.g. {"text": grp, "text.formal": formal_grp}). self._groups: dict[str, Any] = {} + # Background task manager for structured async work. + self.background_tasks: BackgroundTaskManager = BackgroundTaskManager() @property def request_is_for_args(self) -> bool: @@ -211,45 +218,91 @@ def _quote_if_spaced(value: Any) -> str: return '"{}"'.format(str(value).replace('"', '\\"')) return value + def _display_background_output(self, use_prompt_toolkit: bool = False) -> None: + """Display any queued background task output. 
+ + Args: + use_prompt_toolkit: Use prompt_toolkit's print function for clean + output above the prompt line. + """ + messages = self.background_tasks.get_queued_output() + for task_id, message in messages: + output = "[{}] {}".format(task_id, message) + if use_prompt_toolkit: + try: + # Use prompt_toolkit printing to render above the prompt + print_formatted_text(output) + except Exception: + # Fallback to regular print if not in prompt context + print(output) + else: + print(output) + def _exec_from_args(self) -> None: """Execute one or more commands provided in CLI argument mode.""" - # Re-quote argv tokens that contain spaces so that multi-word shell - # arguments (e.g. "Hello world") survive the joinβ†’split round-trip. - quoted_args = [self._quote_if_spaced(a) for a in sys.argv[1:]] - commands = split_list_by_arg(lst=quoted_args, separator="and") - flat_cmds = self._flat_commands - all_names = self.manager.command_names + list(flat_cmds.keys()) - for command in commands: - command_list = command.split() - command_name = command_list[0] - arguments = " ".join(command_list[1:]) - if self.enable_context_formatting: + # Start background output renderer for live streaming during CLI execution + output_active = threading.Event() + output_active.set() + + def cli_output_renderer(): + """Continuously display background task output during CLI execution.""" + while output_active.is_set(): + # Poll for queued output every 50ms for responsive streaming + time.sleep(0.05) + if not self.background_tasks._output_queue.empty(): + self._display_background_output(use_prompt_toolkit=False) + + output_thread = threading.Thread(target=cli_output_renderer, daemon=True) + output_thread.start() + + try: + # Re-quote argv tokens that contain spaces so that multi-word shell + # arguments (e.g. "Hello world") survive the joinβ†’split round-trip. 
+ quoted_args = [self._quote_if_spaced(a) for a in sys.argv[1:]] + commands = split_list_by_arg(lst=quoted_args, separator="and") + flat_cmds = self._flat_commands + all_names = self.manager.command_names + list(flat_cmds.keys()) + for command in commands: + command_list = command.split() + command_name = command_list[0] + arguments = " ".join(command_list[1:]) + if self.enable_context_formatting: + try: + # Quote context values that contain spaces so that a substituted + # multi-word result is still treated as a single argument. + safe_context = { + k: self._quote_if_spaced(v) for k, v in self.context.items() + } + arguments = arguments.format(**safe_context) + except KeyError: + pass try: - # Quote context values that contain spaces so that a substituted - # multi-word result is still treated as a single argument. - safe_context = { - k: self._quote_if_spaced(v) for k, v in self.context.items() - } - arguments = arguments.format(**safe_context) - except KeyError: - pass - try: - args, kwargs = inline_parser(arguments) - except CommandParseError as error: - print( - "🚨 Error parsing arguments for '{}': {}".format( - command_name, error + args, kwargs = inline_parser(arguments) + except CommandParseError as error: + print( + "🚨 Error parsing arguments for '{}': {}".format( + command_name, error + ) ) - ) - raise - try: - self._execute_command(command_name, *args, **kwargs) - except CommandNotFound: - print("πŸ€” Command '{}' not found".format(command_name)) - if self.suggestions: - possible_command = self.suggestions(command_name, all_names) - if possible_command: - print('πŸ’‘ Did you mean: "{}"?'.format(possible_command)) + raise + try: + self._execute_command(command_name, *args, **kwargs) + except CommandNotFound: + print("πŸ€” Command '{}' not found".format(command_name)) + if self.suggestions: + possible_command = self.suggestions(command_name, all_names) + if possible_command: + print('πŸ’‘ Did you mean: "{}"?'.format(possible_command)) + + # Wait for all 
background tasks to complete before exiting + if self.background_tasks.has_active_tasks(): + self.background_tasks.wait_for_all_tasks() + finally: + # Stop the output renderer thread + output_active.clear() + output_thread.join(timeout=1.0) + # Display any final queued output after stopping the renderer + self._display_background_output(use_prompt_toolkit=False) def _exec_from_repl( self, theme: Any = MonokaiTheme, history_file: str | Path | None = None @@ -267,48 +320,97 @@ def _exec_from_repl( CommandCompleter(self.manager, ignore_case=True, flat_commands=flat_cmds) ) lexer = create_lexer(commands=all_names) + + def bottom_toolbar(): + """Display background task status in the bottom toolbar.""" + status = self.background_tasks.get_status_summary(include_names=True) + if status: + return HTML("[{}]".format(status)) + return "" + + def invalidate_ui(): + """Trigger UI redraw from background threads.""" + app = get_app() + if app.is_running: + # Schedule invalidate on event loop to be thread-safe. 
+ app.invalidate() + + # Register the UI invalidation callback with the background task manager + self.background_tasks.set_ui_invalidate_callback(invalidate_ui) + + # Background thread to continuously display queued output + refresh_active = threading.Event() + refresh_active.set() + + def background_output_renderer(): + """Continuously display background task output during REPL session.""" + while refresh_active.is_set(): + # Check for queued output every 100ms + time.sleep(0.1) + if not self.background_tasks._output_queue.empty(): + self._display_background_output(use_prompt_toolkit=True) + # Trigger UI refresh to update toolbar status + invalidate_ui() + + refresh_thread = threading.Thread( + target=background_output_renderer, daemon=True + ) + refresh_thread.start() + session = PromptSession( completer=manager_completer, lexer=PygmentsLexer(lexer), style=theme.pygments_style(), + bottom_toolbar=bottom_toolbar, **session_extra_options, ) - while self.do_repl: - try: - with patch_stdout(): - user_input = "{}".format(session.prompt(self.prompt)) - except KeyboardInterrupt: - continue # Control-C pressed. Try again. - except EOFError: - break # Control-D Pressed. 
Finish - - if not user_input.strip(): - continue - command = "" - try: - commands = split_list_by_arg(lst=[user_input], separator="and") - for command_str in commands: - command_str = command_str.strip() - if not command_str: - continue - command_parts = command_str.split() - command = command_parts[0] - arguments = " ".join(command_parts[1:]) - if self.enable_context_formatting: - try: - arguments = arguments.format(**self.context) - except KeyError: - pass - args, kwargs = inline_parser(arguments) - self._execute_command(command, *args, **kwargs) - except CommandNotFound: - print("πŸ€” Command '{}' not found".format(command)) - if self.suggestions: - possible_command = self.suggestions(command, all_names) - if possible_command: - print('πŸ’‘ Did you mean: "{}"?'.format(possible_command)) - except Exception as error: - print("πŸ’₯ Error executing '{}': {}".format(command, error)) + try: + while self.do_repl: + # Display any queued background output before prompting + self._display_background_output() + + try: + with patch_stdout(): + user_input = "{}".format(session.prompt(self.prompt)) + except KeyboardInterrupt: + continue # Control-C pressed. Try again. + except EOFError: + break # Control-D Pressed. 
Finish + + if not user_input.strip(): + continue + command = "" + try: + commands = split_list_by_arg(lst=[user_input], separator="and") + for command_str in commands: + command_str = command_str.strip() + if not command_str: + continue + command_parts = command_str.split() + command = command_parts[0] + arguments = " ".join(command_parts[1:]) + if self.enable_context_formatting: + try: + arguments = arguments.format(**self.context) + except KeyError: + pass + args, kwargs = inline_parser(arguments) + self._execute_command(command, *args, **kwargs) + except CommandNotFound: + print("πŸ€” Command '{}' not found".format(command)) + if self.suggestions: + possible_command = self.suggestions(command, all_names) + if possible_command: + print('πŸ’‘ Did you mean: "{}"?'.format(possible_command)) + except Exception as error: + print("πŸ’₯ Error executing '{}': {}".format(command, error)) + finally: + # Stop the background refresh thread + refresh_active.clear() + refresh_thread.join(timeout=1.0) + + # Display any remaining background output before exiting + self._display_background_output() def _execute_command(self, command_name: str, *args: Any, **kwargs: Any) -> Any: """Execute a command and persist result in shared context. diff --git a/vulcano/app/classes_test.py b/vulcano/app/classes_test.py index dfa4ef5..8c8e277 100644 --- a/vulcano/app/classes_test.py +++ b/vulcano/app/classes_test.py @@ -585,3 +585,107 @@ def start_bg(): app.run() # Background task should have executed without breaking the REPL flow. 
+ + @patch("vulcano.app.classes.sys") + def test_background_task_manager_initialized(self, sys_mock): + """VulcanoApp instances have a background task manager.""" + sys_mock.argv = ["ensure_no_repl", "test"] + app = VulcanoApp() + self.assertIsNotNone(app.background_tasks) + self.assertFalse(app.background_tasks.has_active_tasks()) + + @patch("vulcano.app.classes.sys") + def test_cli_mode_waits_for_background_tasks(self, sys_mock): + """CLI mode waits for all background tasks to complete.""" + import threading + import time + + sys_mock.argv = ["ensure_no_repl", "start_task"] + app = VulcanoApp() + execution_order = [] + + def background_worker(task_id): + time.sleep(0.05) + execution_order.append("background_done") + app.background_tasks.mark_completed(task_id) + + @app.command() + def start_task(): + thread = threading.Thread( + target=background_worker, args=(None,), daemon=True + ) + task_id = app.background_tasks.register_task("test_task", thread) + thread = threading.Thread( + target=background_worker, args=(task_id,), daemon=True + ) + app.background_tasks._tasks[task_id].thread = thread + thread.start() + execution_order.append("command_done") + return "Task started" + + app.run(print_result=False) + + # Command should finish first, then background task + self.assertEqual(execution_order[0], "command_done") + self.assertEqual(execution_order[1], "background_done") + + @patch(print_builtin) + @patch("vulcano.app.classes.PromptSession") + @patch("vulcano.app.classes.sys") + def test_repl_displays_queued_background_output( + self, sys_mock, prompt_session_mock, print_mock + ): + """REPL displays queued background output before each prompt.""" + session_instance = prompt_session_mock.return_value + session_instance.prompt.side_effect = ("queue_output", EOFError) + sys_mock.argv = ["ensure_repl"] + + app = VulcanoApp() + + @app.command() + def queue_output(): + app.background_tasks.enqueue_output("task_0", "test message") + return "Output queued" + + app.run() + 
+ # Check that the background output was printed + print_calls = [str(call) for call in print_mock.call_args_list] + found_message = any( + "task_0" in call and "test message" in call for call in print_calls + ) + self.assertTrue(found_message) + + @patch(print_builtin) + @patch("vulcano.app.classes.print_formatted_text") + @patch("vulcano.app.classes.sys") + def test_display_background_output_falls_back_to_print( + self, sys_mock, prompt_print_mock, print_mock + ): + """Prompt-toolkit print failures fall back to plain print.""" + sys_mock.argv = ["ensure_no_repl", "noop"] + app = VulcanoApp() + app.background_tasks.enqueue_output("task_0", "hello") + prompt_print_mock.side_effect = RuntimeError("not in prompt") + + app._display_background_output(use_prompt_toolkit=True) + + print_mock.assert_called_with("[task_0] hello") + + @patch("vulcano.app.classes.threading.Thread") + @patch("vulcano.app.classes.sys") + def test_cli_mode_skips_wait_when_no_background_tasks(self, sys_mock, thread_mock): + """CLI mode does not wait when there are no active background tasks.""" + sys_mock.argv = ["ensure_no_repl", "noop"] + app = VulcanoApp() + app.background_tasks.wait_for_all_tasks = MagicMock() + + @app.command() + def noop(): + return "ok" + + app.run(print_result=False) + + app.background_tasks.wait_for_all_tasks.assert_not_called() + thread_mock.return_value.start.assert_called_once() + thread_mock.return_value.join.assert_called_once_with(timeout=1.0) From a9519a0e8c2108715689298c697409913c0ed14b Mon Sep 17 00:00:00 2001 From: HARVIS AI Agent Date: Thu, 2 Apr 2026 01:04:20 +0200 Subject: [PATCH 2/2] test: cover live REPL and CLI background rendering --- vulcano/app/classes_test.py | 168 ++++++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) diff --git a/vulcano/app/classes_test.py b/vulcano/app/classes_test.py index 8c8e277..bef92bc 100644 --- a/vulcano/app/classes_test.py +++ b/vulcano/app/classes_test.py @@ -1,5 +1,6 @@ # -* coding: utf-8 *- # System 
imports +import threading from unittest import TestCase # Third-party imports @@ -689,3 +690,170 @@ def noop(): app.background_tasks.wait_for_all_tasks.assert_not_called() thread_mock.return_value.start.assert_called_once() thread_mock.return_value.join.assert_called_once_with(timeout=1.0) + + @patch(print_builtin) + @patch("vulcano.app.classes.threading.Thread") + @patch("vulcano.app.classes.sys") + def test_cli_mode_shows_suggestion_for_unknown_command( + self, sys_mock, thread_mock, print_mock + ): + """CLI mode shows suggestions for unknown commands.""" + sys_mock.argv = ["ensure_no_repl", "helo"] + app = VulcanoApp() + app.suggestions = MagicMock(return_value="hello") + + @app.command() + def hello(): + return "hi" + + app.run(print_result=False) + + print_calls = [str(call) for call in print_mock.call_args_list] + self.assertTrue(any("Command 'helo' not found" in call for call in print_calls)) + self.assertTrue(any('Did you mean: "hello"?' in call for call in print_calls)) + thread_mock.return_value.join.assert_called_once_with(timeout=1.0) + + @patch(print_builtin) + @patch("vulcano.app.classes.patch_stdout") + @patch("vulcano.app.classes.PromptSession") + @patch("vulcano.app.classes.get_app") + @patch("vulcano.app.classes.threading.Thread") + @patch("vulcano.app.classes.time.sleep") + @patch("vulcano.app.classes.sys") + def test_repl_invalidation_for_active_tasks( + self, + sys_mock, + sleep_mock, + thread_mock, + get_app_mock, + prompt_session_mock, + patch_stdout_mock, + print_mock, + ): + """REPL invalidates the UI when background output arrives.""" + session_instance = prompt_session_mock.return_value + session_instance.prompt.side_effect = ("queue_output", EOFError) + sys_mock.argv = ["ensure_repl"] + fake_app = MagicMock() + fake_app.is_running = True + get_app_mock.return_value = fake_app + + class ImmediateThread(object): + def __init__(self, target=None, daemon=None): + self._target = target + + def start(self): + if self._target: + self._target() 
+
+            def join(self, timeout=None):
+                return None
+
+        thread_mock.side_effect = lambda target=None, daemon=None: ImmediateThread(
+            target=target, daemon=daemon
+        )
+
+        app = VulcanoApp()
+        app._display_background_output = MagicMock()
+
+        def stop_refresh(_seconds):
+            app.do_repl = False
+            # Keep the queue reporting pending output so the renderer keeps
+            # invalidating the UI (empty() -> False means "not empty").
+            app.background_tasks._output_queue.empty = MagicMock(return_value=False)
+
+        sleep_mock.side_effect = stop_refresh
+
+        @app.command()
+        def queue_output():
+            app.background_tasks.enqueue_output("task_0", "hello")
+            worker = threading.Thread(target=lambda: None, daemon=True)
+            app.background_tasks.register_task("demo", worker)
+            return "queued"
+
+        app.run()
+
+        fake_app.invalidate.assert_called()
+        app._display_background_output.assert_any_call(use_prompt_toolkit=True)
+
+    @patch("vulcano.app.classes.PromptSession")
+    @patch("vulcano.app.classes.sys")
+    def test_repl_bottom_toolbar_reflects_background_status(
+        self, sys_mock, prompt_session_mock
+    ):
+        """The REPL bottom toolbar exposes the background task summary."""
+        session_instance = prompt_session_mock.return_value
+        session_instance.prompt.side_effect = EOFError
+        sys_mock.argv = ["ensure_repl"]
+
+        app = VulcanoApp()
+        worker = threading.Thread(target=lambda: None, daemon=True)
+        app.background_tasks.register_task("demo", worker)
+
+        app.run()
+
+        toolbar = prompt_session_mock.call_args.kwargs["bottom_toolbar"]
+        rendered = toolbar()
+        self.assertIn("task running", str(rendered))
+        self.assertIn("demo", str(rendered))
+
+    @patch("vulcano.app.classes.threading.Thread")
+    @patch("vulcano.app.classes.sys")
+    def test_cli_mode_waits_for_active_background_tasks(self, sys_mock, thread_mock):
+        """CLI mode waits for active tasks when present."""
+        sys_mock.argv = ["ensure_no_repl", "noop"]
+        app = VulcanoApp()
+        app.background_tasks.wait_for_all_tasks = MagicMock()
+        app.background_tasks.has_active_tasks = MagicMock(return_value=True)
+
+        @app.command()
+        def noop():
+ return "ok" + + app.run(print_result=False) + + app.background_tasks.wait_for_all_tasks.assert_called_once() + thread_mock.return_value.join.assert_called_once_with(timeout=1.0) + + @patch("vulcano.app.classes.threading.Thread") + @patch("vulcano.app.classes.time.sleep") + @patch("vulcano.app.classes.sys") + def test_cli_output_renderer_streams_queued_output( + self, sys_mock, sleep_mock, thread_mock + ): + """CLI output renderer flushes queued output while running.""" + sys_mock.argv = ["ensure_no_repl", "noop"] + app = VulcanoApp() + app._display_background_output = MagicMock() + app.background_tasks._output_queue.empty = MagicMock(side_effect=[False, True]) + app.background_tasks.has_active_tasks = MagicMock(return_value=False) + + class ImmediateThread(object): + def __init__(self, target=None, daemon=None): + self._target = target + + def start(self): + if self._target: + try: + self._target() + except StopIteration: + pass + + def join(self, timeout=None): + return None + + thread_mock.side_effect = lambda target=None, daemon=None: ImmediateThread( + target=target, daemon=daemon + ) + + def stop_loop(_seconds): + raise StopIteration() + + sleep_mock.side_effect = stop_loop + + @app.command() + def noop(): + return "ok" + + app.run(print_result=False) + + app._display_background_output.assert_any_call(use_prompt_toolkit=False)