Skip to content

Commit

Permalink
Merge branch 'main' into check_io
Browse files Browse the repository at this point in the history
  • Loading branch information
thiippal authored Jul 10, 2023
2 parents e4bd7a9 + 4f81eca commit 74498e1
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 29 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ pytest==7.4.0
PyYAML==6.0
shapely==2.0.1
toloka_kit==1.4.1
wasabi==1.1.1
wasabi==1.1.1
7 changes: 6 additions & 1 deletion src/abulafia/observers/observers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from toloka.client.analytics_request import UniqueWorkersCountPoolAnalytics, ActiveWorkersByFilterCountPoolAnalytics, \
SubmittedAssignmentsCountPoolAnalytics
from toloka.client.operations import Operation
from toloka.client.exceptions import DoesNotExistApiError
from toloka.client.exceptions import DoesNotExistApiError, IncorrectActionsApiError


# Set up Printer
Expand Down Expand Up @@ -75,6 +75,11 @@ async def should_resume(self) -> bool:
# If a training is found, but it cannot be closed, raise warning
msg.warn(f"Attempted to close a training for pool {self.pool.id} that does not exist")

# TODO Possible hotfix for crashing pipelines
except IncorrectActionsApiError:

pass

if response['request']['name'] == 'unique_workers_count' and not self.limit_reached:

# Check if number of submissions has changed: only print update
Expand Down
6 changes: 5 additions & 1 deletion src/abulafia/task_specs/core_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ def __init__(self, configuration, client, task_spec, **kwargs):
# See if users should be banned from the pool and check that blocklist is configured correctly
try:

self.blocklist = list(pd.read_csv(self.pool_conf['blocklist'], sep="\t")["user_id"]) \
self.blocklist = list(pd.read_csv(self.pool_conf['blocklist'], sep='\t')['user_id'].dropna()) \
if 'blocklist' in self.pool_conf.keys() else []

if len(self.blocklist) > 0:

msg.info(f'Successfully loaded a blocklist with {len(self.blocklist)} user IDs.')

except KeyError:

msg.fail(f"Could not find the column 'user_id' in the blocklist.", exits=1)
Expand Down
44 changes: 18 additions & 26 deletions src/abulafia/task_specs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@ class TaskSequence:
This class allows defining a sequence of crowdsourcing tasks on Toloka.
"""
def __init__(self, sequence, client):
def __init__(self, sequence, client, **kwargs):
"""
This function initialises the TaskSequence class.
Parameters:
sequence: A list of objects that inherit from the CrowdsourcingTask class.
client: A TolokaClient object with valid credentials.
kwargs: Keywords and arguments for additional settings.
"""
# Set up attributes
self.complete = False # Tracks if all tasks have been completed
self.sequence = sequence # A list of CrowdsourcingTask objects
self.client = client # A Toloka Client object
self.pipeline = None # Placeholder for a Toloka Pipeline object
self.no_exit = True if 'no_exit' in kwargs and kwargs['no_exit'] else False # Do not exit after finishing

msg.info(f'Creating a task sequence')

Expand Down Expand Up @@ -101,9 +103,12 @@ async def run_sequence(metrics):

else:

self.client.close_pool(pool_id=task.pool.id)
# Check pool status again
if not self.client.get_pool(pool_id=task.pool.id).is_closed():

msg.info(f'Closed pool with ID {task.pool.id}')
self.client.close_pool(pool_id=task.pool.id)

msg.info(f'Closed pool with ID {task.pool.id}')

# Check if there is a training pool that should be closed
if hasattr(task, 'training') and task.training is not None:
Expand All @@ -114,13 +119,16 @@ async def run_sequence(metrics):

else:

self.client.close_pool(pool_id=task.training.id)
# Check status again
if not self.client.get_pool(pool_id=task.training.id).is_closed():

self.client.close_pool(pool_id=task.training.id)

msg.info(f'Closed pool with ID {task.training.id}')
msg.info(f'Closed pool with ID {task.training.id}')

for action in (a for a in self.sequence if hasattr(a, 'aggregator')):

if action.complete == True:
if action.complete:

status.append(True)

Expand All @@ -139,29 +147,13 @@ async def run_sequence(metrics):
# Check the outputs
if self.complete:

exit()

# Check if tasks are supposed to output the results
for task in self.sequence:

if hasattr(task, 'pool'):

# Get the output DataFrame for each task; assign under 'output_data'
task.output_data = self.client.get_assignments_df(pool_id=task.pool.id)
if not self.no_exit:

# Check if the output should be written to disk
try:

if task.conf['actions'] is not None and 'output' in task.conf['actions']:

# Write the DataFrame to disk
task.output_data.to_csv(f'{task.name}_{task.pool.id}.csv')
exit()

msg.good(f'Wrote data for task {task.name} ({task.pool.id}) to disk.')

except KeyError:
if self.no_exit:

pass
pass

def create_pipeline(self):

Expand Down

0 comments on commit 74498e1

Please sign in to comment.