Skip to content

Commit 045a5a0

Browse files
committed
fix: improve thread safety and execution logging with minimal changes
- Replaces generator-based task queue with thread-safe queue.Queue in threader.py to prevent thread starvation - Moves '[THREAD] Added to Queue' logging from task generation to execution time - Integrates OutputHelper into Worker for consistent styled logs - Preserves original behavior of stderr visibility and test harness - Updates interlace.py to pass output_helper, removes duplicate logging - Retains original structure, repeat logic, and test interface to minimize diff
1 parent 79b8949 commit 045a5a0

File tree

2 files changed

+27
-28
lines changed

2 files changed

+27
-28
lines changed

Interlace/interlace.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def task_queue_generator_func(arguments, output, repeat):
1515
for i in range(repeat):
1616
tasks_iterator = tasks_generator_func()
1717
for task in tasks_iterator:
18-
output.terminal(Level.THREAD, task.name(), "Added to Queue")
18+
# output.terminal(Level.THREAD, task.name(), "Added to Queue")
1919
yield task
2020

2121

@@ -38,6 +38,7 @@ def main():
3838
output,
3939
arguments.sober,
4040
silent=arguments.silent,
41+
output_helper=output
4142
)
4243
pool.run()
4344

Interlace/lib/threader.py

+25-27
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import subprocess
22
import os
3+
import queue
4+
import platform
35
from concurrent.futures import ThreadPoolExecutor
46
from multiprocessing import Event
57
from tqdm import tqdm
68

7-
import platform
9+
from Interlace.lib.core.output import OutputHelper, Level
810

911
if platform.system().lower() == 'linux':
1012
shell = os.getenv("SHELL") if os.getenv("SHELL") else "/bin/sh"
@@ -60,8 +62,7 @@ def _run_task(self, t=False):
6062
stdout=subprocess.DEVNULL,
6163
encoding="utf-8",
6264
executable=shell)
63-
out, _ = s.communicate()
64-
65+
s.communicate()
6566
return
6667
else:
6768
s = subprocess.Popen(self.task, shell=True,
@@ -78,57 +79,54 @@ def _run_task(self, t=False):
7879

7980

8081
class Worker(object):
81-
def __init__(self, task_queue, timeout, output, tq):
82+
def __init__(self, task_queue, timeout, output, tq, output_helper):
8283
self.queue = task_queue
8384
self.timeout = timeout
8485
self.output = output
8586
self.tqdm = tq
87+
self.output_helper = output_helper
8688

8789
def __call__(self):
88-
queue = self.queue
8990
while True:
9091
try:
91-
task = next(queue)
92+
task = self.queue.get(timeout=1)
93+
except queue.Empty:
94+
return
95+
96+
self.output_helper.terminal(Level.THREAD, task.name(), "Added to Queue")
97+
98+
try:
9299
if isinstance(self.tqdm, tqdm):
93100
self.tqdm.update(1)
94-
# run task
95101
task.run(self.tqdm)
96102
else:
97103
task.run()
98-
except StopIteration:
99-
break
104+
except Exception as e:
105+
self.output_helper.terminal(Level.ERROR, task.name(), f"Task failed: {e}")
100106

101107

102108
class Pool(object):
103-
def __init__(self, max_workers, task_queue, timeout, output, progress_bar, silent=False):
104-
105-
# convert stdin input to integer
109+
def __init__(self, max_workers, task_queue, timeout, output, progress_bar, silent=False, output_helper=None):
106110
max_workers = int(max_workers)
107-
108-
# check if there are enough workers
109-
if max_workers <= 0:
110-
raise ValueError("Workers must be >= 1")
111-
112111
tasks_count = next(task_queue)
113-
114-
# check if the queue is empty
115112
if not tasks_count:
116113
raise ValueError("The queue is empty")
117114

118-
self.queue = task_queue
115+
self.queue = queue.Queue()
116+
for task in task_queue:
117+
self.queue.put(task)
118+
119119
self.timeout = timeout
120120
self.output = output
121121
self.max_workers = min(tasks_count, max_workers)
122122

123-
if not progress_bar and not silent:
124-
self.tqdm = tqdm(total=tasks_count)
125-
else:
126-
self.tqdm = True
123+
self.output_helper = output_helper or OutputHelper()
124+
self.tqdm = tqdm(total=tasks_count) if not progress_bar and not silent else True
127125

128126
def run(self):
129-
workers = [Worker(self.queue, self.timeout, self.output, self.tqdm) for w in range(self.max_workers)]
127+
workers = [Worker(self.queue, self.timeout, self.output, self.tqdm, self.output_helper)
128+
for _ in range(self.max_workers)]
130129

131-
# run
132130
with ThreadPoolExecutor(self.max_workers) as executors:
133131
for worker in workers:
134132
executors.submit(worker)
@@ -147,5 +145,5 @@ def run(self):
147145
"sleep 9",
148146
"sleep 1",
149147
"echo 'Char!'"]
150-
p = Pool(4, tasks, 0, 0, True)
148+
p = Pool(4, iter([len(tasks)] + [Task(t) for t in tasks]), 0, 0, True, output_helper=OutputHelper())
151149
p.run()

0 commit comments

Comments
 (0)