Skip to content

Commit

Permalink
Merge pull request #29 from thiippal/no_exit
Browse files Browse the repository at this point in the history
Allow pipelines to continue
  • Loading branch information
thiippal authored Apr 25, 2023
2 parents 0a42c61 + 7b3997e commit b09242a
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 24 deletions.
1 change: 0 additions & 1 deletion examples/config/outline_text_verify.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ interface:
text: Text
image: Image
project:
id: 138067
setup:
public_name: Outline text, letters or numbers in diagrams
public_description: Look at diagrams from science textbooks and state if they
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "abulafia"
version = "0.1.10"
version = "0.1.11"
description = "A tool for fair and reproducible crowdsourcing using Toloka"
readme = "README.md"
requires-python = ">=3.8"
Expand Down
30 changes: 8 additions & 22 deletions src/abulafia/task_specs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@ class TaskSequence:
This class allows defining a sequence of crowdsourcing tasks on Toloka.
"""
def __init__(self, sequence, client):
def __init__(self, sequence, client, **kwargs):
"""
This function initialises the TaskSequence class.
Parameters:
sequence: A list of objects that inherit from the CrowdsourcingTask class.
client: A TolokaClient object with valid credentials.
kwargs: Keywords and arguments for additional settings.
"""
# Set up attributes
self.complete = False # Tracks if all tasks have been completed
self.sequence = sequence # A list of CrowdsourcingTask objects
self.client = client # A Toloka Client object
self.pipeline = None # Placeholder for a Toloka Pipeline object
self.no_exit = True if 'no_exit' in kwargs and kwargs['no_exit'] else False # Do not exit after finishing

msg.info(f'Creating a task sequence')

Expand Down Expand Up @@ -120,7 +122,7 @@ async def run_sequence(metrics):

for action in (a for a in self.sequence if hasattr(a, 'aggregator')):

if action.complete == True:
if action.complete:

status.append(True)

Expand All @@ -139,29 +141,13 @@ async def run_sequence(metrics):
# Check the outputs
if self.complete:

exit()
if not self.no_exit:

# Check if tasks are supposed to output the results
for task in self.sequence:
exit()

if hasattr(task, 'pool'):
if self.no_exit:

# Get the output DataFrame for each task; assign under 'output_data'
task.output_data = self.client.get_assignments_df(pool_id=task.pool.id)

# Check if the output should be written to disk
try:

if task.conf['actions'] is not None and 'output' in task.conf['actions']:

# Write the DataFrame to disk
task.output_data.to_csv(f'{task.name}_{task.pool.id}.csv')

msg.good(f'Wrote data for task {task.name} ({task.pool.id}) to disk.')

except KeyError:

pass
pass

def create_pipeline(self):

Expand Down

0 comments on commit b09242a

Please sign in to comment.