diff --git a/Interlace/interlace.py b/Interlace/interlace.py index b3506ba..9733567 100644 --- a/Interlace/interlace.py +++ b/Interlace/interlace.py @@ -15,7 +15,7 @@ def task_queue_generator_func(arguments, output, repeat): for i in range(repeat): tasks_iterator = tasks_generator_func() for task in tasks_iterator: - output.terminal(Level.THREAD, task.name(), "Added to Queue") + # output.terminal(Level.THREAD, task.name(), "Added to Queue") yield task @@ -38,6 +38,7 @@ def main(): output, arguments.sober, silent=arguments.silent, + output_helper=output ) pool.run() diff --git a/Interlace/lib/threader.py b/Interlace/lib/threader.py index 22f28dd..ae5d270 100644 --- a/Interlace/lib/threader.py +++ b/Interlace/lib/threader.py @@ -1,10 +1,12 @@ import subprocess import os +import queue +import platform from concurrent.futures import ThreadPoolExecutor from multiprocessing import Event from tqdm import tqdm -import platform +from Interlace.lib.core.output import OutputHelper, Level if platform.system().lower() == 'linux': shell = os.getenv("SHELL") if os.getenv("SHELL") else "/bin/sh" @@ -60,8 +62,7 @@ def _run_task(self, t=False): stdout=subprocess.DEVNULL, encoding="utf-8", executable=shell) - out, _ = s.communicate() - + s.communicate() return else: s = subprocess.Popen(self.task, shell=True, @@ -78,57 +79,54 @@ def _run_task(self, t=False): class Worker(object): - def __init__(self, task_queue, timeout, output, tq): + def __init__(self, task_queue, timeout, output, tq, output_helper): self.queue = task_queue self.timeout = timeout self.output = output self.tqdm = tq + self.output_helper = output_helper def __call__(self): - queue = self.queue while True: try: - task = next(queue) + task = self.queue.get(timeout=1) + except queue.Empty: + return + + self.output_helper.terminal(Level.THREAD, task.name(), "Added to Queue") + + try: if isinstance(self.tqdm, tqdm): self.tqdm.update(1) - # run task task.run(self.tqdm) else: task.run() - except StopIteration: - break + except Exception as e: + self.output_helper.terminal(Level.ERROR, task.name(), f"Task failed: {e}") class Pool(object): - def __init__(self, max_workers, task_queue, timeout, output, progress_bar, silent=False): - - # convert stdin input to integer + def __init__(self, max_workers, task_queue, timeout, output, progress_bar, silent=False, output_helper=None): max_workers = int(max_workers) - - # check if there are enough workers - if max_workers <= 0: - raise ValueError("Workers must be >= 1") - tasks_count = next(task_queue) - - # check if the queue is empty if not tasks_count: raise ValueError("The queue is empty") - self.queue = task_queue + self.queue = queue.Queue() + for task in task_queue: + self.queue.put(task) + self.timeout = timeout self.output = output self.max_workers = min(tasks_count, max_workers) - if not progress_bar and not silent: - self.tqdm = tqdm(total=tasks_count) - else: - self.tqdm = True + self.output_helper = output_helper or OutputHelper() + self.tqdm = tqdm(total=tasks_count) if not progress_bar and not silent else True def run(self): - workers = [Worker(self.queue, self.timeout, self.output, self.tqdm) for w in range(self.max_workers)] + workers = [Worker(self.queue, self.timeout, self.output, self.tqdm, self.output_helper) + for _ in range(self.max_workers)] - # run with ThreadPoolExecutor(self.max_workers) as executors: for worker in workers: executors.submit(worker) @@ -147,5 +145,5 @@ def run(self): "sleep 9", "sleep 1", "echo 'Char!'"] - p = Pool(4, tasks, 0, 0, True) + p = Pool(4, iter([len(tasks)] + [Task(t) for t in tasks]), 0, 0, True, output_helper=OutputHelper()) p.run()