Skip to content

Commit f27cbd7

Browse files
feat: support structured background tasks with live REPL status
1 parent 91ac7b6 commit f27cbd7

File tree

6 files changed

+900
-91
lines changed

6 files changed

+900
-91
lines changed

README.rst

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -532,27 +532,79 @@ instead of being substituted from the context.
532532
For a more realistic reference app, prefer ``examples/simple_example.py`` over
533533
minimal one-function snippets.
534534

535-
Background output example
536-
~~~~~~~~~~~~~~~~~~~~~~~~~
535+
Background tasks
536+
~~~~~~~~~~~~~~~~
537537

538-
If you want to start a background task that keeps writing to stdout while the
539-
REPL remains active, take a look at ``examples/async_output_example.py``.
540-
It demonstrates a command that starts a daemon thread and emits periodic output
541-
without corrupting the interactive prompt.
538+
Vulcano supports structured background tasks with clean REPL rendering and
539+
status tracking. Background tasks can run concurrently with the REPL,
540+
and their output is queued to avoid corrupting the interactive prompt.
542541

543-
Example session:
542+
The ``VulcanoApp`` includes a ``background_tasks`` manager that:
543+
544+
- Tracks N concurrent background tasks with unique IDs
545+
- Queues background output to prevent prompt corruption
546+
- Displays task status in a bottom toolbar (REPL mode)
547+
- Waits for all tasks to complete in CLI mode before exiting
548+
549+
Example background task command:
550+
551+
.. code:: python
552+
553+
import threading
554+
import time
555+
556+
@app.command
557+
def start_background(interval=1, ticks=5):
558+
"""Start a background task with structured output."""
559+
560+
def worker(task_id):
561+
try:
562+
for i in range(ticks):
563+
time.sleep(interval)
564+
app.background_tasks.enqueue_output(task_id, "tick {}".format(i))
565+
app.background_tasks.mark_completed(task_id)
566+
except Exception as e:
567+
app.background_tasks.mark_failed(task_id, e)
568+
569+
thread = threading.Thread(target=worker, args=(None,), daemon=True)
570+
task_id = app.background_tasks.register_task(
571+
"background_{}".format(ticks), thread
572+
)
573+
# Update thread with actual task_id
574+
thread = threading.Thread(target=worker, args=(task_id,), daemon=True)
575+
app.background_tasks._tasks[task_id].thread = thread
576+
thread.start()
577+
return "Background task started ({})".format(task_id)
578+
579+
See ``examples/async_output_example.py`` for a complete working example.
580+
581+
Example REPL session with background tasks:
544582

545583
.. code:: text
546584
547585
🌋 start_background interval=1 ticks=3
548-
Background task started
586+
Background task started (task_0)
549587
🌋 hello name=Alice
550588
Hello Alice!
551-
[background] tick 0
552-
[background] tick 1
553-
[background] tick 2
589+
[task_0] tick 0
590+
[task_0] tick 1
591+
[task_0] tick 2
554592
🌋
555593
594+
The bottom toolbar shows active task counts (e.g., ``[2 tasks running]``)
595+
when background work is in progress.
596+
597+
**CLI mode behavior** — When running commands directly from the shell,
598+
Vulcano waits for all background tasks to complete before the process exits:
599+
600+
.. code:: bash
601+
602+
$ python your_app.py start_background interval=1 ticks=3
603+
Background task started (task_0)
604+
[task_0] tick 0
605+
[task_0] tick 1
606+
[task_0] tick 2
607+
556608
Development
557609
-----------
558610

examples/async_output_example.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,37 @@
44
from vulcano.app import VulcanoApp
55
from vulcano.themes import MonokaiTheme
66

7-
87
app = VulcanoApp("async_output_demo")
98

109

1110
@app.command
1211
def start_background(interval=1, ticks=5):
1312
"""Start a background task that prints while the REPL remains active.
1413
14+
Uses the background task manager for structured output and status tracking.
15+
1516
:param int interval: Seconds to wait between messages.
1617
:param int ticks: Number of messages to print.
1718
"""
1819

19-
def worker():
20-
for i in range(ticks):
21-
time.sleep(interval)
22-
print("[background] tick {}".format(i))
23-
24-
thread = threading.Thread(target=worker, daemon=True)
20+
def worker(task_id):
21+
try:
22+
for i in range(ticks):
23+
time.sleep(interval)
24+
app.background_tasks.enqueue_output(task_id, "tick {}".format(i))
25+
app.background_tasks.mark_completed(task_id)
26+
except Exception as e:
27+
app.background_tasks.mark_failed(task_id, e)
28+
29+
thread = threading.Thread(target=worker, args=(None,), daemon=True)
30+
# Register the task before starting the thread
31+
task_id = app.background_tasks.register_task("background_{}".format(ticks), thread)
32+
# Update the thread args with the actual task_id
33+
thread = threading.Thread(target=worker, args=(task_id,), daemon=True)
34+
# Re-register with the correct thread
35+
app.background_tasks._tasks[task_id].thread = thread
2536
thread.start()
26-
return "Background task started"
37+
return "Background task started ({})".format(task_id)
2738

2839

2940
@app.command

vulcano/app/background.py

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
"""Background task management for structured async work in Vulcano.
2+
3+
This module provides a registry-based approach to tracking background tasks,
4+
managing their output, and displaying their status in the REPL.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import queue
10+
import threading
11+
import time
12+
from collections import OrderedDict
13+
from collections.abc import Callable
14+
from dataclasses import dataclass
15+
from enum import Enum
16+
17+
18+
class TaskStatus(Enum):
19+
"""Enumeration of possible task states."""
20+
21+
RUNNING = "running"
22+
COMPLETED = "completed"
23+
FAILED = "failed"
24+
25+
26+
@dataclass
27+
class BackgroundTask:
28+
"""Represents a single background task with state and output."""
29+
30+
task_id: str
31+
name: str
32+
thread: threading.Thread
33+
status: TaskStatus
34+
started_at: float
35+
completed_at: float | None = None
36+
error: Exception | None = None
37+
38+
39+
class BackgroundTaskManager:
40+
"""Central registry for managing background tasks and their output.
41+
42+
This manager tracks running tasks, queues their output to avoid
43+
corrupting the REPL prompt, and provides status information for
44+
display in toolbars or other UI elements.
45+
"""
46+
47+
def __init__(self) -> None:
48+
"""Initialize the background task manager."""
49+
self._tasks: OrderedDict[str, BackgroundTask] = OrderedDict()
50+
self._output_queue: queue.Queue[tuple[str, str]] = queue.Queue()
51+
self._lock = threading.Lock()
52+
self._next_task_id = 0
53+
self._ui_invalidate_callback: Callable[[], None] | None = None
54+
55+
def register_task(self, name: str, thread: threading.Thread) -> str:
56+
"""Register a new background task.
57+
58+
Args:
59+
name (str): Human-readable task name.
60+
thread (threading.Thread): The thread executing the task.
61+
62+
Returns:
63+
str: Unique task ID.
64+
"""
65+
with self._lock:
66+
task_id = "task_{}".format(self._next_task_id)
67+
self._next_task_id += 1
68+
task = BackgroundTask(
69+
task_id=task_id,
70+
name=name,
71+
thread=thread,
72+
status=TaskStatus.RUNNING,
73+
started_at=time.time(),
74+
)
75+
self._tasks[task_id] = task
76+
return task_id
77+
78+
def set_ui_invalidate_callback(self, callback: Callable[[], None]) -> None:
79+
"""Set a callback to trigger UI redraw when output is enqueued.
80+
81+
Args:
82+
callback: Function to call to invalidate/redraw the UI.
83+
"""
84+
self._ui_invalidate_callback = callback
85+
86+
def mark_completed(self, task_id: str) -> None:
87+
"""Mark a task as completed.
88+
89+
Args:
90+
task_id (str): Task identifier.
91+
"""
92+
with self._lock:
93+
if task_id in self._tasks:
94+
self._tasks[task_id].status = TaskStatus.COMPLETED
95+
self._tasks[task_id].completed_at = time.time()
96+
# Trigger UI refresh to update toolbar
97+
if self._ui_invalidate_callback:
98+
self._ui_invalidate_callback()
99+
100+
def mark_failed(self, task_id: str, error: Exception) -> None:
101+
"""Mark a task as failed.
102+
103+
Args:
104+
task_id (str): Task identifier.
105+
error (Exception): Exception that caused the failure.
106+
"""
107+
with self._lock:
108+
if task_id in self._tasks:
109+
self._tasks[task_id].status = TaskStatus.FAILED
110+
self._tasks[task_id].completed_at = time.time()
111+
self._tasks[task_id].error = error
112+
# Trigger UI refresh to update toolbar
113+
if self._ui_invalidate_callback:
114+
self._ui_invalidate_callback()
115+
116+
def enqueue_output(self, task_id: str, message: str) -> None:
117+
"""Enqueue output from a background task.
118+
119+
Args:
120+
task_id (str): Task identifier.
121+
message (str): Message to display.
122+
"""
123+
self._output_queue.put((task_id, message))
124+
# Trigger UI refresh to display output immediately
125+
if self._ui_invalidate_callback:
126+
self._ui_invalidate_callback()
127+
128+
def get_queued_output(self) -> list[tuple[str, str]]:
129+
"""Retrieve all queued output messages.
130+
131+
Returns:
132+
list[tuple[str, str]]: List of (task_id, message) tuples.
133+
"""
134+
messages = []
135+
while not self._output_queue.empty():
136+
try:
137+
messages.append(self._output_queue.get_nowait())
138+
except queue.Empty:
139+
break
140+
return messages
141+
142+
def get_active_tasks(self) -> list[BackgroundTask]:
143+
"""Return a list of currently running tasks.
144+
145+
Returns:
146+
list[BackgroundTask]: Active tasks.
147+
"""
148+
with self._lock:
149+
return [
150+
task
151+
for task in self._tasks.values()
152+
if task.status == TaskStatus.RUNNING
153+
]
154+
155+
def get_all_tasks(self) -> list[BackgroundTask]:
156+
"""Return all registered tasks.
157+
158+
Returns:
159+
list[BackgroundTask]: All tasks regardless of status.
160+
"""
161+
with self._lock:
162+
return list(self._tasks.values())
163+
164+
def wait_for_all_tasks(self, timeout: float | None = None) -> None:
165+
"""Wait for all registered tasks to complete.
166+
167+
This is primarily for CLI mode to ensure the process doesn't exit
168+
before background work finishes.
169+
170+
Args:
171+
timeout (float | None): Maximum time to wait in seconds.
172+
"""
173+
with self._lock:
174+
threads = [task.thread for task in self._tasks.values()]
175+
176+
for thread in threads:
177+
if thread.is_alive():
178+
thread.join(timeout=timeout)
179+
180+
def has_active_tasks(self) -> bool:
181+
"""Check if any tasks are currently running.
182+
183+
Returns:
184+
bool: True if at least one task is running.
185+
"""
186+
with self._lock:
187+
return any(
188+
task.status == TaskStatus.RUNNING for task in self._tasks.values()
189+
)
190+
191+
def clear_completed_tasks(self) -> None:
192+
"""Remove completed and failed tasks from the registry."""
193+
with self._lock:
194+
self._tasks = OrderedDict(
195+
(tid, task)
196+
for tid, task in self._tasks.items()
197+
if task.status == TaskStatus.RUNNING
198+
)
199+
200+
def get_status_summary(self, include_names: bool = False) -> str:
201+
"""Return a formatted status summary for display.
202+
203+
Args:
204+
include_names (bool): Include task names in the summary.
205+
206+
Returns:
207+
str: Status summary (e.g., "2 tasks running: task1, task2").
208+
"""
209+
with self._lock:
210+
active_tasks = [
211+
task
212+
for task in self._tasks.values()
213+
if task.status == TaskStatus.RUNNING
214+
]
215+
active_count = len(active_tasks)
216+
if active_count == 0:
217+
return ""
218+
219+
# Build base count message
220+
if active_count == 1:
221+
base_msg = "1 task running"
222+
else:
223+
base_msg = "{} tasks running".format(active_count)
224+
225+
# Optionally append task names
226+
if include_names and active_tasks:
227+
task_names = [task.name for task in active_tasks]
228+
# Truncate long names and limit to 3 tasks
229+
display_names = [
230+
name[:15] + "..." if len(name) > 15 else name
231+
for name in task_names[:3]
232+
]
233+
names_str = ", ".join(display_names)
234+
if active_count > 3:
235+
names_str += ", ..."
236+
return "{}: {}".format(base_msg, names_str)
237+
238+
return base_msg

0 commit comments

Comments
 (0)