Skip to content

fix: improve thread safety and execution logging with minimal changes #184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Interlace/interlace.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def task_queue_generator_func(arguments, output, repeat):
for i in range(repeat):
tasks_iterator = tasks_generator_func()
for task in tasks_iterator:
output.terminal(Level.THREAD, task.name(), "Added to Queue")
# output.terminal(Level.THREAD, task.name(), "Added to Queue")
yield task


Expand All @@ -38,6 +38,7 @@ def main():
output,
arguments.sober,
silent=arguments.silent,
output_helper=output
)
pool.run()

Expand Down
52 changes: 25 additions & 27 deletions Interlace/lib/threader.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import subprocess
import os
import queue
import platform
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Event
from tqdm import tqdm

import platform
from Interlace.lib.core.output import OutputHelper, Level

if platform.system().lower() == 'linux':
shell = os.getenv("SHELL") if os.getenv("SHELL") else "/bin/sh"
Expand Down Expand Up @@ -60,8 +62,7 @@ def _run_task(self, t=False):
stdout=subprocess.DEVNULL,
encoding="utf-8",
executable=shell)
out, _ = s.communicate()

s.communicate()
return
else:
s = subprocess.Popen(self.task, shell=True,
Expand All @@ -78,57 +79,54 @@ def _run_task(self, t=False):


class Worker(object):
def __init__(self, task_queue, timeout, output, tq):
def __init__(self, task_queue, timeout, output, tq, output_helper):
self.queue = task_queue
self.timeout = timeout
self.output = output
self.tqdm = tq
self.output_helper = output_helper

def __call__(self):
queue = self.queue
while True:
try:
task = next(queue)
task = self.queue.get(timeout=1)
except queue.Empty:
return

self.output_helper.terminal(Level.THREAD, task.name(), "Added to Queue")

try:
if isinstance(self.tqdm, tqdm):
self.tqdm.update(1)
# run task
task.run(self.tqdm)
else:
task.run()
except StopIteration:
break
except Exception as e:
self.output_helper.terminal(Level.ERROR, task.name(), f"Task failed: {e}")


class Pool(object):
def __init__(self, max_workers, task_queue, timeout, output, progress_bar, silent=False):

# convert stdin input to integer
def __init__(self, max_workers, task_queue, timeout, output, progress_bar, silent=False, output_helper=None):
max_workers = int(max_workers)

# check if there are enough workers
if max_workers <= 0:
raise ValueError("Workers must be >= 1")

tasks_count = next(task_queue)

# check if the queue is empty
if not tasks_count:
raise ValueError("The queue is empty")

self.queue = task_queue
self.queue = queue.Queue()
for task in task_queue:
self.queue.put(task)

self.timeout = timeout
self.output = output
self.max_workers = min(tasks_count, max_workers)

if not progress_bar and not silent:
self.tqdm = tqdm(total=tasks_count)
else:
self.tqdm = True
self.output_helper = output_helper or OutputHelper()
self.tqdm = tqdm(total=tasks_count) if not progress_bar and not silent else True

def run(self):
workers = [Worker(self.queue, self.timeout, self.output, self.tqdm) for w in range(self.max_workers)]
workers = [Worker(self.queue, self.timeout, self.output, self.tqdm, self.output_helper)
for _ in range(self.max_workers)]

# run
with ThreadPoolExecutor(self.max_workers) as executors:
for worker in workers:
executors.submit(worker)
Expand All @@ -147,5 +145,5 @@ def run(self):
"sleep 9",
"sleep 1",
"echo 'Char!'"]
p = Pool(4, tasks, 0, 0, True)
p = Pool(4, iter([len(tasks)] + [Task(t) for t in tasks]), 0, 0, True, output_helper=OutputHelper())
p.run()
Loading