From ee62395f019f9ff6d8c219da205ea87217871d2f Mon Sep 17 00:00:00 2001 From: thiippal Date: Mon, 24 Apr 2023 22:10:16 +0300 Subject: [PATCH 1/7] Add kwargs to Pipeline; implement no_exit --- examples/config/outline_text_verify.yaml | 1 - src/abulafia/task_specs/pipeline.py | 30 +++++++----------------- 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/examples/config/outline_text_verify.yaml b/examples/config/outline_text_verify.yaml index 1f56c1f..45cf6ba 100644 --- a/examples/config/outline_text_verify.yaml +++ b/examples/config/outline_text_verify.yaml @@ -15,7 +15,6 @@ interface: text: Text image: Image project: - id: 138067 setup: public_name: Outline text, letters or numbers in diagrams public_description: Look at diagrams from science textbooks and state if they diff --git a/src/abulafia/task_specs/pipeline.py b/src/abulafia/task_specs/pipeline.py index ce10df4..e4ad7b6 100644 --- a/src/abulafia/task_specs/pipeline.py +++ b/src/abulafia/task_specs/pipeline.py @@ -20,19 +20,21 @@ class TaskSequence: This class allows defining a sequence of crowdsourcing tasks on Toloka. """ - def __init__(self, sequence, client): + def __init__(self, sequence, client, **kwargs): """ This function initialises the TaskSequence class. Parameters: sequence: A list of objects that inherit from the CrowdsourcingTask class. client: A TolokaClient object with valid credentials. + kwargs: Keywords and arguments for additional settings. """ # Set up attributes self.complete = False # Tracks if all tasks have been completed self.sequence = sequence # A list of CrowdsourcingTask objects self.client = client # A Toloka Client object self.pipeline = None # Placeholder for a Toloka Pipeline object + self.no_exit = True if 'no_exit' in kwargs and kwargs['no_exit'] else False # Do not exit after finishing msg.info(f'Creating a task sequence') @@ -120,7 +122,7 @@ async def run_sequence(metrics): for action in (a for a in self.sequence if hasattr(a, 'aggregator')): - if action.complete == True: + if action.complete: status.append(True) @@ -139,29 +141,13 @@ async def run_sequence(metrics): # Check the outputs if self.complete: - exit() + if not self.no_exit: - # Check if tasks are supposed to output the results - for task in self.sequence: + exit() - if hasattr(task, 'pool'): + if self.no_exit: - # Get the output DataFrame for each task; assign under 'output_data' - task.output_data = self.client.get_assignments_df(pool_id=task.pool.id) - - # Check if the output should be written to disk - try: - - if task.conf['actions'] is not None and 'output' in task.conf['actions']: - - # Write the DataFrame to disk - task.output_data.to_csv(f'{task.name}_{task.pool.id}.csv') - - msg.good(f'Wrote data for task {task.name} ({task.pool.id}) to disk.') - - except KeyError: - - pass + pass def create_pipeline(self): From 7b3997e7beb645a8f7e9038e5f2355b9b3fcadd2 Mon Sep 17 00:00:00 2001 From: thiippal Date: Tue, 25 Apr 2023 14:25:02 +0300 Subject: [PATCH 2/7] update pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d4b214c..b979e46 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "abulafia" -version = "0.1.10" +version = "0.1.11" description = "A tool for fair and reproducible crowdsourcing using Toloka" readme = "README.md" requires-python = ">=3.8" From bab2686f3c7db701d91f7383998d416a26d781b2 Mon Sep 17 00:00:00 2001 From: thiippal Date: Thu, 27 Apr 2023 14:25:01 +0300 Subject: [PATCH 3/7] temporary fix to AddOutlines --- src/abulafia/task_specs/task_specs.py | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/src/abulafia/task_specs/task_specs.py b/src/abulafia/task_specs/task_specs.py index 1bfb0a4..d38f929 100644 --- a/src/abulafia/task_specs/task_specs.py +++ b/src/abulafia/task_specs/task_specs.py @@ -275,7 +275,7 @@ def specify_task(configuration): A Toloka TaskSpec object. """ # Define expected input and output types for the task - expected_i, expected_o = {'url', 'json'}, {'json', 'bool'} + expected_i, expected_o = {'url', 'json'}, {'json'} # Configure Toloka data specifications and check the expected input against configuration data_in, data_out, input_data, output_data = check_io(configuration=configuration, @@ -317,32 +317,17 @@ def specify_task(configuration): # Set up labels for the outlines labels=labels, - disabled=False - ) - - # Create a checkbox for special cases - try: - checkbox = tb.CheckboxFieldV1( - data=tb.OutputData(output_data['bool'], default=False), - label=configuration['interface']['checkbox']) + disabled=False, - except KeyError: - msg.warn(f"Please add the key 'checkbox' under the top-level key 'interface' to " - f"define a text that is displayed above the checkbox. Define the text as a " - f"string e.g. checkbox: There is nothing to outline.", exits=1) + validation=tb.RequiredConditionV1(hint="Please select at least one area!") + ) # Define the text prompt below the segmentation UI prompt = tb.TextViewV1(content=configuration['interface']['prompt']) # Combine the task interface elements into a view interface = toloka.project.TemplateBuilderViewSpec( - view=tb.ListViewV1([img_ui, prompt, checkbox], - validation=tb.AnyConditionV1(conditions=[tb.SchemaConditionV1(data=tb.OutputData(output_data['json']), - schema={'type': 'array', 'minItems': 2}), - tb.EqualsConditionV1(data=tb.OutputData(output_data['bool']), to=True)], - hint="Outline at least one target or check the box if necessary."), - ) - ) + view=tb.ListViewV1([img_ui, prompt])) # Create a task specification with interface and input/output data task_spec = toloka.project.task_spec.TaskSpec( From 3f76189e7fd3d6c7d522adb10cb7c9aef9c2cba6 Mon Sep 17 00:00:00 2001 From: thiippal Date: Wed, 10 May 2023 13:48:16 +0300 Subject: [PATCH 4/7] Hotfix for failing pipelines --- src/abulafia/actions/actions.py | 48 +++++++++++++++++++++-------- src/abulafia/observers/observers.py | 7 ++++- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/abulafia/actions/actions.py b/src/abulafia/actions/actions.py index 23405f5..4effe36 100644 --- a/src/abulafia/actions/actions.py +++ b/src/abulafia/actions/actions.py @@ -81,24 +81,36 @@ def __call__(self, events: List[AssignmentEvent]) -> None: # Accept the task suite if all assignments in the suite have been verified as correct if all(results) is True: - self.client.accept_assignment(assignment_id=assignment_id, - public_comment=self.conf['messages']['accepted']) + try: + + self.client.accept_assignment(assignment_id=assignment_id, + public_comment=self.conf['messages']['accepted']) + + msg.good(f'Accepted assignment {assignment_id}') + + except IncorrectActionsApiError: - msg.good(f'Accepted assignment {assignment_id}') + msg.fail(f'Failed to accept assignment {assignment_id}!') # Reject the task suite if all assignments in the suite have not been verified as correct if all(results) is not True: - self.client.reject_assignment(assignment_id=assignment_id, - public_comment=self.conf['messages']['rejected']) + try: + + self.client.reject_assignment(assignment_id=assignment_id, + public_comment=self.conf['messages']['rejected']) + + msg.warn(f'Rejected assignment {assignment_id}') - msg.warn(f'Rejected assignment {assignment_id}') + except IncorrectActionsApiError: + + msg.fail(f'Failed to reject assignment {assignment_id}!') # Catch the error that might be raised by manually accepting/rejecting tasks in # the web interface except IncorrectActionsApiError: - msg.warn(f'Could not {"accept" if all(results) == True else "reject"} assignment {assignment_id}') + msg.fail(f'Could not {"accept" if all(results) == True else "reject"} assignment {assignment_id}!') # Append the task suite to the list of processed suites processed.append(assignment_id) @@ -282,16 +294,26 @@ def __call__(self, events: Union[List[AssignmentEvent], List[dict]]) -> None: if solution in self.reject: # TODO Implement dynamic public comment handling - self.client.reject_assignment(assignment_id=event.assignment.tasks[i].input_values['assignment_id'], - public_comment="Assignment was verified as incorrect by another user.") - msg.warn(f'Rejected assignment {event.assignment.tasks[i].input_values["assignment_id"]}') + try: + self.client.reject_assignment(assignment_id=event.assignment.tasks[i].input_values['assignment_id'], + public_comment="Assignment was verified as incorrect by another user.") + msg.warn(f'Rejected assignment {event.assignment.tasks[i].input_values["assignment_id"]}') + + except IncorrectActionsApiError: + + msg.fail(f'Failed to reject {event.assignment.tasks[i].input_values["assignment_id"]}!') # If performer verified the task as correct, accept original assignment and don't forward task if solution in self.accept: - self.client.accept_assignment(assignment_id=event.assignment.tasks[i].input_values['assignment_id'], - public_comment="Assignment was verified as correct by another user.") - msg.good(f'Accepted assignment {event.assignment.tasks[i].input_values["assignment_id"]}') + try: + self.client.accept_assignment(assignment_id=event.assignment.tasks[i].input_values['assignment_id'], + public_comment="Assignment was verified as correct by another user.") + msg.good(f'Accepted assignment {event.assignment.tasks[i].input_values["assignment_id"]}') + + except IncorrectActionsApiError: + + msg.fail(f'Failed to accept {event.assignment.tasks[i].input_values["assignment_id"]}!') # If no forward pool was configured, submit task without forwarding/accepting/rejecting if solution in self.dont_forward: diff --git a/src/abulafia/observers/observers.py b/src/abulafia/observers/observers.py index cd0ff05..b73a921 100644 --- a/src/abulafia/observers/observers.py +++ b/src/abulafia/observers/observers.py @@ -6,7 +6,7 @@ from toloka.client.analytics_request import UniqueWorkersCountPoolAnalytics, ActiveWorkersByFilterCountPoolAnalytics, \ SubmittedAssignmentsCountPoolAnalytics from toloka.client.operations import Operation -from toloka.client.exceptions import DoesNotExistApiError +from toloka.client.exceptions import DoesNotExistApiError, IncorrectActionsApiError # Set up Printer @@ -75,6 +75,11 @@ async def should_resume(self) -> bool: # If a training is found, but it cannot be closed, raise warning msg.warn(f"Attempted to close a training for pool {self.pool.id} that does not exist") + # TODO Possible hotfix for crashing pipelines + except IncorrectActionsApiError: + + pass + if response['request']['name'] == 'unique_workers_count' and not self.limit_reached: # Check if number of submissions has changed: only print update From a2a9b3debf0b0553e0642f95e723cf7776b3b99d Mon Sep 17 00:00:00 2001 From: thiippal Date: Thu, 11 May 2023 21:12:51 +0300 Subject: [PATCH 5/7] Another hotfix for failing pipelines --- src/abulafia/task_specs/pipeline.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/abulafia/task_specs/pipeline.py b/src/abulafia/task_specs/pipeline.py index e4ad7b6..4d27184 100644 --- a/src/abulafia/task_specs/pipeline.py +++ b/src/abulafia/task_specs/pipeline.py @@ -103,9 +103,12 @@ async def run_sequence(metrics): else: - self.client.close_pool(pool_id=task.pool.id) + # Check pool status again + if not self.client.get_pool(pool_id=task.pool.id).is_closed(): - msg.info(f'Closed pool with ID {task.pool.id}') + self.client.close_pool(pool_id=task.pool.id) + + msg.info(f'Closed pool with ID {task.pool.id}') # Check if there is a training pool that should be closed if hasattr(task, 'training') and task.training is not None: @@ -116,9 +119,12 @@ async def run_sequence(metrics): else: - self.client.close_pool(pool_id=task.training.id) + # Check status again + if not self.client.get_pool(pool_id=task.training.id).is_closed(): + + self.client.close_pool(pool_id=task.training.id) - msg.info(f'Closed pool with ID {task.training.id}') + msg.info(f'Closed pool with ID {task.training.id}') for action in (a for a in self.sequence if hasattr(a, 'aggregator')): From 5b8fcdde40224a84efd092e197353e15b69e8f6d Mon Sep 17 00:00:00 2001 From: thiippal Date: Mon, 15 May 2023 20:41:07 +0300 Subject: [PATCH 6/7] Improve blocklist handling -> drop NaN values --- src/abulafia/task_specs/core_task.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/abulafia/task_specs/core_task.py b/src/abulafia/task_specs/core_task.py index 4b3b973..2e07ab2 100644 --- a/src/abulafia/task_specs/core_task.py +++ b/src/abulafia/task_specs/core_task.py @@ -65,9 +65,13 @@ def __init__(self, configuration, client, task_spec, **kwargs): # See if users should be banned from the pool and check that blocklist is configured correctly try: - self.blocklist = list(pd.read_csv(self.pool_conf['blocklist'], sep="\t")["user_id"]) \ + self.blocklist = list(pd.read_csv(self.pool_conf['blocklist'], sep='\t')['user_id'].dropna()) \ if 'blocklist' in self.pool_conf.keys() else [] + if len(self.blocklist) > 0: + + msg.info(f'Successfully loaded a blocklist with {len(self.blocklist)} user IDs.') + except KeyError: msg.warn(f"Could not find the column 'user_id' in the blocklist.", exits=1) From 8f9516ea33698bb2fdb41ec02fb50ae8585cfb76 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 10 Jul 2023 19:05:48 +0000 Subject: [PATCH 7/7] Bump torch from 1.11.0 to 1.13.1 Bumps [torch](https://github.com/pytorch/pytorch) from 1.11.0 to 1.13.1. - [Release notes](https://github.com/pytorch/pytorch/releases) - [Changelog](https://github.com/pytorch/pytorch/blob/main/RELEASE.md) - [Commits](https://github.com/pytorch/pytorch/compare/v1.11.0...v1.13.1) --- updated-dependencies: - dependency-name: torch dependency-type: direct:production ... Signed-off-by: dependabot[bot] --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 7c119a6..42026e2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ --find-links https://download.pytorch.org/whl/torch_stable.html -torch==1.11.0 +torch==1.13.1 crowd_kit==1.0.0 pandas==1.4.1 pytest==7.1.0