diff --git a/.github/workflows/ci_cd.yml b/.github/workflows/ci_cd.yml index e3272ad4..d7126e39 100644 --- a/.github/workflows/ci_cd.yml +++ b/.github/workflows/ci_cd.yml @@ -84,7 +84,7 @@ jobs: source venv/bin/activate # use version of ifcopenshell with desired schema parsing # TODO: revert to pyPI when schema parsing is published in the future - wget -O /tmp/ifcopenshell_python.zip "https://s3.amazonaws.com/ifcopenshell-builds/ifcopenshell-python-311-v0.8.3-260bc80-linux64.zip" + wget -O /tmp/ifcopenshell_python.zip "https://s3.amazonaws.com/ifcopenshell-builds/ifcopenshell-python-311-v0.8.4-6924012-linux64.zip" mkdir -p venv/lib/python3.11/site-packages unzip -d venv/lib/python3.11/site-packages /tmp/ifcopenshell_python.zip diff --git a/backend/Makefile b/backend/Makefile index ff33cec0..a8a3ad90 100644 --- a/backend/Makefile +++ b/backend/Makefile @@ -14,7 +14,7 @@ venv: install: venv $(PIP) install --upgrade pip find . -name 'requirements.txt' -exec $(PIP) install -r {} \; - wget -O /tmp/ifcopenshell_python.zip "https://s3.amazonaws.com/ifcopenshell-builds/ifcopenshell-python-311-v0.8.3-260bc80-linux64.zip" + wget -O /tmp/ifcopenshell_python.zip "https://s3.amazonaws.com/ifcopenshell-builds/ifcopenshell-python-311-v0.8.4-6924012-linux64.zip" mkdir -p $(VIRTUAL_ENV)/lib/python3.11/site-packages unzip -f -d $(VIRTUAL_ENV)/lib/python3.11/site-packages /tmp/ifcopenshell_python.zip rm /tmp/ifcopenshell_python.zip @@ -22,7 +22,7 @@ install: venv install-macos: venv find . 
-name 'requirements.txt' -exec $(PIP) install -r {} \; $(PIP) install -r requirements.txt - wget -O /tmp/ifcopenshell_python.zip "https://s3.amazonaws.com/ifcopenshell-builds/ifcopenshell-python-311-v0.8.3-260bc80-macos64.zip" + wget -O /tmp/ifcopenshell_python.zip "https://s3.amazonaws.com/ifcopenshell-builds/ifcopenshell-python-311-v0.8.4-6924012-macos64.zip" mkdir -p $(VIRTUAL_ENV)/lib/python3.11/site-packages unzip /tmp/ifcopenshell_python.zip -d $(VIRTUAL_ENV)/lib/python3.11/site-packages rm /tmp/ifcopenshell_python.zip @@ -30,7 +30,7 @@ install-macos: venv install-macos-m1: venv find . -name 'requirements.txt' -exec $(PIP) install -r {} \; $(PIP) install -r requirements.txt - wget -O /tmp/ifcopenshell_python.zip "https://s3.amazonaws.com/ifcopenshell-builds/ifcopenshell-python-311-v0.8.3-260bc80-macosm164.zip" + wget -O /tmp/ifcopenshell_python.zip "https://s3.amazonaws.com/ifcopenshell-builds/ifcopenshell-python-311-v0.8.4-6924012-macosm164.zip" mkdir -p $(VIRTUAL_ENV)/lib/python3.11/site-packages unzip /tmp/ifcopenshell_python.zip -d $(VIRTUAL_ENV)/lib/python3.11/site-packages rm /tmp/ifcopenshell_python.zip @@ -66,7 +66,7 @@ stop-worker: -$(PYTHON) -m celery -A core control shutdown \ --destination=worker@$(shell hostname) || true -test: test-models test-bsdd-task test-header-validation-task test-syntax-task test-syntax-header-validation-task test-schema-task +test: test-models test-header-validation-task test-syntax-task test-syntax-header-validation-task test-schema-task test-models: MEDIA_ROOT=./apps/ifc_validation/fixtures $(PYTHON) manage.py test apps/ifc_validation_models --settings apps.ifc_validation_models.test_settings --debug-mode --verbosity 3 diff --git a/backend/apps/ifc_validation/tasks.py b/backend/apps/ifc_validation/tasks.py deleted file mode 100644 index 3af912f5..00000000 --- a/backend/apps/ifc_validation/tasks.py +++ /dev/null @@ -1,1213 +0,0 @@ -import os -import sys -import datetime -import subprocess -import functools -import json 
-import ifcopenshell - -from celery import shared_task, chain, chord, group -from celery.utils.log import get_task_logger -from django.db import transaction -from django.db.utils import IntegrityError - - -from core.utils import log_execution -from core.settings import DJANGO_DB_BULK_CREATE_BATCH_SIZE - -from apps.ifc_validation_models.settings import TASK_TIMEOUT_LIMIT, MEDIA_ROOT -from apps.ifc_validation_models.decorators import requires_django_user_context -from apps.ifc_validation_models.models import * - -from .email_tasks import * - -logger = get_task_logger(__name__) - -PROGRESS_INCREMENTS = { - 'instance_completion_subtask': 5, - 'syntax_validation_subtask': 5, - 'header_syntax_validation_subtask': 5, - 'header_validation_subtask': 10, - 'prerequisites_subtask': 10, - 'schema_validation_subtask': 10, - 'digital_signatures_subtask': 5, - 'bsdd_validation_subtask': 0, - 'normative_rules_ia_validation_subtask': 20, - 'normative_rules_ip_validation_subtask': 20, - 'industry_practices_subtask': 10 -} - -assert sum(PROGRESS_INCREMENTS.values()) == 100 - - -def update_progress(func): - @functools.wraps(func) - def wrapper(self, *args, **kwargs): - return_value = func(self, *args, **kwargs) - try: - request_id = args[1] - # @nb not the most efficient because we fetch the ValidationRequest anew, but - # assuming django will cache this efficiently enough for us to keep the code clean - request = ValidationRequest.objects.get(pk=request_id) - increment = PROGRESS_INCREMENTS.get(func.__name__, 0) - request.progress = min(request.progress + increment, 100) - request.save() - except Exception as e: - print(f"Error updating progress for {func.__name__}: {e}") - return return_value - return wrapper - - -@functools.lru_cache(maxsize=1024) -def get_absolute_file_path(file_name): - - """ - Resolves the absolute file path of an uploaded file and checks if it exists. - It tries resolving Django MEDIA_ROOT and current working directory, and caches the result. 
- - Mandatory Args: - file_name: relative file name of the uploaded file. - - Returns: - Absolute file path of the uploaded file. - """ - - ifc_fn = os.path.join(MEDIA_ROOT, file_name) - - if not os.path.exists(ifc_fn): - ifc_fn2 = os.path.join(os.getcwd(), ifc_fn) - if not os.path.exists(ifc_fn2): - raise FileNotFoundError(f"File path for file_name={file_name} was not found (tried loading '{ifc_fn}' and '{ifc_fn2}').") - - ifc_fn = os.path.abspath(ifc_fn) - - logger.debug(f"get_absolute_file_path(): file_name={file_name} returned '{ifc_fn}'") - return ifc_fn - - -@shared_task(bind=True) -@log_execution -def error_handler(self, *args, **kwargs): - - on_workflow_failed.delay(*args, **kwargs) - - -@shared_task(bind=True) -@log_execution -def chord_error_handler(self, request, exc, traceback, *args, **kwargs): - - on_workflow_failed.apply_async([request, exc, traceback]) - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -def on_workflow_started(self, *args, **kwargs): - - # update status - id = self.request.args[0] - reason = f"args={args} kwargs={kwargs}" - request = ValidationRequest.objects.get(pk=id) - request.mark_as_initiated(reason) - - # queue sending emails - nbr_of_tasks = request.tasks.count() - if nbr_of_tasks == 0: - # send_acknowledgement_user_email_task.delay(id=id, file_name=request.file_name) # disabled - send_acknowledgement_admin_email_task.delay(id=id, file_name=request.file_name) - else: - # send_revalidating_user_email_task.delay(id=id, file_name=request.file_name) # disabled - send_revalidating_admin_email_task.delay(id=id, file_name=request.file_name) - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -def on_workflow_completed(self, *args, **kwargs): - - # update status - id = args[1] - reason = "Processing completed" - request = ValidationRequest.objects.get(pk=id) - request.mark_as_completed(reason) - - # queue sending email - send_completion_email_task.delay(id=id, 
file_name=request.file_name) - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -def on_workflow_failed(self, *args, **kwargs): - - logger.debug(f'Function {self.__name__} called with args={args} kwargs={kwargs}') - - # update status - id = args[1] - reason = f"Processing failed: args={args} kwargs={kwargs}" - request = ValidationRequest.objects.get(pk=id) - request.mark_as_failed(reason) - - # queue sending email - send_failure_email_task.delay(id=id, file_name=request.file_name) - send_failure_admin_email_task.delay(id=id, file_name=request.file_name) - - -@log_execution -@requires_django_user_context -@transaction.atomic -# @requires_django_exclusive_table_lock(Model, 'EXCLUSIVE') -# --> table lock, slower - DO NOT USE -def get_or_create_ifc_model(request_id): - - id = request_id - request = ValidationRequest.objects.get(pk=id) - if request.model is None: - - # acquire row lock (... uses "FOR UPDATE" hint) - request = ValidationRequest.objects.select_for_update().get(pk=id) - - model, _ = Model.objects.get_or_create( - file_name=request.file_name, - file=request.file, - size=request.file.size, - uploaded_by=request.created_by - ) - request.model = model - request.save() - - return model - - else: - return request.model - - -@shared_task(bind=True) -@log_execution -def ifc_file_validation_task(self, id, file_name, *args, **kwargs): - - if id is None or file_name is None: - raise ValueError("Arguments 'id' and/or 'file_name' are required.") - - error_task = error_handler.s(id, file_name) - chord_error_task = chord_error_handler.s(id, file_name) - - workflow_started = on_workflow_started.s(id, file_name) - workflow_completed = on_workflow_completed.s(id, file_name) - - serial_tasks = chain( - header_syntax_validation_subtask.s(id, file_name), - header_validation_subtask.s(id, file_name), - syntax_validation_subtask.s(id, file_name), - prerequisites_subtask.s(id, file_name), - ) - - parallel_tasks = group([ - digital_signatures_subtask.s(id, 
file_name), - schema_validation_subtask.s(id, file_name), - #bsdd_validation_subtask.s(id, file_name), # disabled - normative_rules_ia_validation_subtask.s(id, file_name), - normative_rules_ip_validation_subtask.s(id, file_name), - industry_practices_subtask.s(id, file_name) - ]) - - final_tasks = chain( - instance_completion_subtask.s(id, file_name) - ) - - workflow = ( - workflow_started | - serial_tasks | - chord( - chord(parallel_tasks, final_tasks).on_error(chord_error_task), - workflow_completed - ).on_error(chord_error_task)) - workflow.set(link_error=[error_task]) - workflow.apply_async() - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -@update_progress -def instance_completion_subtask(self, prev_result, id, file_name, *args, **kwargs): - # fetch request info - request = ValidationRequest.objects.get(pk=id) - file_path = get_absolute_file_path(request.file.name) - - # add task - task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.INSTANCE_COMPLETION) - - prev_result_succeeded = prev_result is not None and prev_result[0]['is_valid'] is True - if prev_result_succeeded: - - task.mark_as_initiated() - - try: - ifc_file = ifcopenshell.open(file_path) - except: - reason = f'Failed to open {file_path}. Likely previous tasks also failed.' 
- task.mark_as_completed(reason) - return {'is_valid': False, 'reason': reason} - - if ifc_file: - - # fetch and update ModelInstance records without ifc_type - with transaction.atomic(): - model_id = request.model.id - model_instances = ModelInstance.objects.filter(model_id=model_id, ifc_type__in=[None, '']) - instance_count = model_instances.count() - logger.info(f'Retrieved {instance_count:,} ModelInstance record(s)') - - for inst in model_instances.iterator(): - inst.ifc_type = ifc_file[inst.stepfile_id].is_a() - inst.save() - - # update Task info and return - reason = f'Updated {instance_count:,} ModelInstance record(s)' - task.mark_as_completed(reason) - return {'is_valid': True, 'reason': reason} - - else: - reason = f'Skipped as prev_result = {prev_result}.' - task.mark_as_skipped(reason) - return {'is_valid': None, 'reason': reason} - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -@update_progress -def syntax_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): - # fetch request info - request = ValidationRequest.objects.get(pk=id) - file_path = get_absolute_file_path(request.file.name) - - # determine program/script to run - check_program = [sys.executable, "-m", "ifcopenshell.simple_spf", '--json', file_path] - logger.debug(f'Command for {self.__qualname__}: {" ".join(check_program)}') - - # add task - task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.SYNTAX) - task.mark_as_initiated() - - prev_result_succeeded = prev_result is not None and prev_result['is_valid'] is True - if prev_result_succeeded: - # check syntax - try: - - # note: use run instead of Popen b/c PIPE output can be very big... - task.set_process_details(None, check_program) # run() has no pid... 
- proc = subprocess.run( - check_program, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT - ) - - # parse output - output = proc.stdout - error_output = proc.stderr - success = (len(list(filter(None, output.split("\n")))) == 0) and len(proc.stderr) == 0 - - with transaction.atomic(): - - # create or retrieve Model info - model = get_or_create_ifc_model(id) - - # update Model info - if success: - model.status_syntax = Model.Status.VALID - task.outcomes.create( - severity=ValidationOutcome.OutcomeSeverity.PASSED, - outcome_code=ValidationOutcome.ValidationOutcomeCode.PASSED, - observed=output if output != '' else None - ) - - elif len(error_output) != 0: - model.status_syntax = Model.Status.INVALID - task.outcomes.create( - severity=ValidationOutcome.OutcomeSeverity.ERROR, - outcome_code=ValidationOutcome.ValidationOutcomeCode.SYNTAX_ERROR, - observed=list(filter(None, proc.stderr.split("\n")))[-1] # last line of traceback - ) - - else: - messages = json.loads(output) - model.status_syntax = Model.Status.INVALID - task.outcomes.create( - severity=ValidationOutcome.OutcomeSeverity.ERROR, - outcome_code=ValidationOutcome.ValidationOutcomeCode.SYNTAX_ERROR, - observed=messages['message'] if 'message' in messages else None - ) - - model.save(update_fields=['status_syntax']) - - # store and return - if success: - reason = "No IFC syntax error(s)." 
- task.mark_as_completed(reason) - return {'is_valid': True, 'reason': task.status_reason} - else: - reason = f"Found IFC syntax errors:\n\nConsole: \n{output}\n\nError: {error_output}" - task.mark_as_completed(reason) - return {'is_valid': False, 'reason': reason} - - except subprocess.TimeoutExpired as err: - task.mark_as_failed(err) - raise - - except Exception as err: - task.mark_as_failed(err) - raise - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -@update_progress -def header_syntax_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): - # fetch request info - request = ValidationRequest.objects.get(pk=id) - file_path = get_absolute_file_path(request.file.name) - - # determine program/script to run - check_program = [sys.executable, "-m", "ifcopenshell.simple_spf", '--json', '--only-header', file_path] - logger.debug(f'Command for {self.__qualname__}: {" ".join(check_program)}') - - # add task - task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.HEADER_SYNTAX) - task.mark_as_initiated() - - # check header syntax - try: - - # note: use run instead of Popen b/c PIPE output can be very big... - task.set_process_details(None, check_program) # run() has no pid... 
- proc = subprocess.run( - check_program, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT - ) - - # parse output - output = proc.stdout - error_output = proc.stderr - success = (len(list(filter(None, output.split("\n")))) == 0) and len(proc.stderr) == 0 - - with transaction.atomic(): - - # create or retrieve Model info - model = get_or_create_ifc_model(id) - - # update Model info - if success: - model.status_header_syntax = Model.Status.VALID - task.outcomes.create( - severity=ValidationOutcome.OutcomeSeverity.PASSED, - outcome_code=ValidationOutcome.ValidationOutcomeCode.PASSED, - observed=output if output != '' else None - ) - - elif len(error_output) != 0: - model.status_header_syntax = Model.Status.INVALID - task.outcomes.create( - severity=ValidationOutcome.OutcomeSeverity.ERROR, - outcome_code=ValidationOutcome.ValidationOutcomeCode.SYNTAX_ERROR, - observed=list(filter(None, proc.stderr.split("\n")))[-1] # last line of traceback - ) - - else: - messages = json.loads(output) - model.status_header_syntax = Model.Status.INVALID - task.outcomes.create( - severity=ValidationOutcome.OutcomeSeverity.ERROR, - outcome_code=ValidationOutcome.ValidationOutcomeCode.SYNTAX_ERROR, - observed=messages['message'] if 'message' in messages else None - ) - - model.save(update_fields=['status_header_syntax']) - - # store and return - if success: - reason = "No IFC syntax error(s)." 
- task.mark_as_completed(reason) - return {'is_valid': True, 'reason': task.status_reason} - else: - reason = f"Found IFC syntax errors in header:\n\nConsole: \n{output}\n\nError: {error_output}" - task.mark_as_completed(reason) - return {'is_valid': False, 'reason': reason} - - except subprocess.TimeoutExpired as err: - task.mark_as_failed(err) - raise - - except Exception as err: - task.mark_as_failed(err) - raise - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -@update_progress -def header_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): - """" - Parses and validates the file header - """ - - # fetch request info - request = ValidationRequest.objects.get(pk=id) - file_path = get_absolute_file_path(request.file.name) - - # add task - task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.HEADER) - - prev_result_succeeded = prev_result is not None and prev_result['is_valid'] is True - if prev_result_succeeded: - task.mark_as_initiated() - # check for header policy - check_script = os.path.join(os.path.dirname(__file__), "checks", "header_policy", "validate_header.py") - - try: - logger.debug(f'before header validation task, path {file_path}, script {check_script} ') - proc = subprocess.run( - [sys.executable, check_script, file_path], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - timeout=TASK_TIMEOUT_LIMIT # Add timeout to prevent infinite hangs - ) - - - if (proc.returncode is not None and proc.returncode != 0) or (len(proc.stderr) > 0): - error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}" - task.mark_as_failed(error_message) - raise RuntimeError(error_message) - - header_validation = {} - stdout_lines = proc.stdout.splitlines() - for line in stdout_lines: - try: - header_validation = json.loads(line) - except json.JSONDecodeError: - continue - - logger.debug(f'header validation output : 
{header_validation}') - - with transaction.atomic(): - # create or retrieve Model info - model = get_or_create_ifc_model(id) - - # update Model info - agg_status = task.determine_aggregate_status() - model.status_prereq = agg_status - - # size - model.size = os.path.getsize(file_path) - logger.debug(f'Detected size = {model.size} bytes') - - # schema - model.schema = header_validation.get('schema_identifier') - - logger.debug(f'The schema identifier = {header_validation.get("schema")}') - # time_stamp - if ifc_file_time_stamp := header_validation.get('time_stamp', False): - try: - logger.debug(f'Timestamp within file = {ifc_file_time_stamp}') - date = datetime.datetime.strptime(ifc_file_time_stamp, "%Y-%m-%dT%H:%M:%S") - date_with_tz = datetime.datetime( - date.year, - date.month, - date.day, - date.hour, - date.minute, - date.second, - tzinfo=datetime.timezone.utc) - model.date = date_with_tz - except ValueError: - try: - model.date = datetime.datetime.fromisoformat(ifc_file_time_stamp) - except ValueError: - pass - - # mvd - model.mvd = header_validation.get('mvd') - - app = header_validation.get('application_name') - - version = header_validation.get('version') - name = None if any(value in (None, "Not defined") for value in (app, version)) else app + ' ' + version - company_name = header_validation.get('company_name') - logger.debug(f'Detected Authoring Tool in file = {name}') - - validation_errors = header_validation.get('validation_errors', []) - invalid_marker_fields = ['originating_system', 'version', 'company_name', 'application_name'] - if any(field in validation_errors for field in invalid_marker_fields): - model.status_header = Model.Status.INVALID - else: - # parsing was successful and model can be considered for scorecards - model.status_header = Model.Status.VALID - authoring_tool = AuthoringTool.find_by_full_name(full_name=name) - if (isinstance(authoring_tool, AuthoringTool)): - - if authoring_tool.company is None: - company, _ = 
Company.objects.get_or_create(name=company_name) - authoring_tool.company = company - authoring_tool.save() - logger.debug(f'Updated existing Authoring Tool with company: {company.name}') - - model.produced_by = authoring_tool - logger.debug(f'Retrieved existing Authoring Tool from DB = {model.produced_by.full_name}') - - elif authoring_tool is None: - company, _ = Company.objects.get_or_create(name=company_name) - authoring_tool, _ = AuthoringTool.objects.get_or_create( - company=company, - name=app, - version=version - ) - model.produced_by = authoring_tool - logger.debug(f'Authoring app not found, ApplicationFullName = {app}, Version = {version} - created new instance') - else: - model.produced_by = None - logger.warning(f'Retrieved multiple Authoring Tool from DB: {authoring_tool} - could not assign any') - - # update header validation - model.header_validation = header_validation - model.save(update_fields=['status_header', 'header_validation']) - model.save() - - - # update Task info and return - is_valid = agg_status != Model.Status.INVALID - reason = f'agg_status = {Model.Status(agg_status).label}\nraw_output = {header_validation}' - task.mark_as_completed(reason) - return {'is_valid': is_valid, 'reason': reason} - - except subprocess.TimeoutExpired as err: - task.mark_as_failed(err) - raise - except IntegrityError as err: - task.mark_as_failed(err) - raise - except Exception as err: - task.mark_as_failed(err) - raise - else: - reason = f'Skipped as prev_result = {prev_result}.' 
- task.mark_as_skipped(reason) - return {'is_valid': None, 'reason': reason} - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -@update_progress -def prerequisites_subtask(self, prev_result, id, file_name, *args, **kwargs): - - # fetch request info - request = ValidationRequest.objects.get(pk=id) - file_path = get_absolute_file_path(request.file.name) - - # add task - task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.PREREQUISITES) - - prev_result_succeeded = prev_result is not None and prev_result['is_valid'] is True - if prev_result_succeeded: - - task.mark_as_initiated() - - # determine program/script to run - check_script = os.path.join(os.path.dirname(__file__), "checks", "check_gherkin.py") - check_program = [sys.executable, check_script, '--file-name', file_path, '--task-id', str(task.id), '--rule-type', 'CRITICAL', "--purepythonparser"] - logger.debug(f'Command for {self.__qualname__}: {" ".join(check_program)}') - logger.debug(f"gherkin log path : {os.path.join(os.getenv('Django_LOG_FOLDER', 'logs'), 'gherkin_rules.log')}") - logger.debug(f"Log folder exists and writable: {os.access(os.getenv('Django_LOG_FOLDER', 'logs'), os.W_OK)}") - - # check Gherkin IP - try: - # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( - check_program, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT, - env=os.environ.copy() - ) - task.set_process_details(None, check_program) # run() has no pid... 
- - except subprocess.TimeoutExpired as err: - task.mark_as_failed(err) - raise - - except Exception as err: - task.mark_as_failed(err) - raise - - # @nb previously we also checked for: - # or (len(proc.stderr) > 0): - # - # but I now get warnings: - # - # - features/environment.py:86: ContextMaskWarning: user code is masking context attribute 'gherkin_outcomes'; - # see the tutorial for what this means - if proc.returncode is not None and proc.returncode != 0: - error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}" - task.mark_as_failed(error_message) - raise RuntimeError(error_message) - - raw_output = proc.stdout - - with transaction.atomic(): - - # create or retrieve Model info - model = get_or_create_ifc_model(id) - - # update Model info - agg_status = task.determine_aggregate_status() - model.status_prereq = agg_status - model.save(update_fields=['status_prereq']) - - # update Task info and return - is_valid = agg_status != Model.Status.INVALID - reason = f'agg_status = {Model.Status(agg_status).label}\nraw_output = {raw_output}' - task.mark_as_completed(reason) - return {'is_valid': is_valid, 'reason': reason} - - else: - reason = f'Skipped as prev_result = {prev_result}.' - task.mark_as_skipped(reason) - return {'is_valid': None, 'reason': reason} - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -@update_progress -def schema_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): - - # fetch request info - request = ValidationRequest.objects.get(pk=id) - file_path = get_absolute_file_path(request.file.name) - - # add task - task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.SCHEMA) - - # TODO: revisit schema validation task, perhaps change order of flow? 
- prev_result_succeeded = prev_result is not None and (prev_result['is_valid'] is True or 'Unsupported schema' in prev_result['reason']) - if prev_result_succeeded: - - task.mark_as_initiated() - - # determine program/script to run - check_program = [sys.executable, '-m', 'ifcopenshell.validate', '--json', '--rules', '--fields', file_path] - logger.debug(f'Command for {self.__qualname__}: {" ".join(check_program)}') - - # check schema - try: - # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( - check_program, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT - ) - task.set_process_details(None, check_program) # run() has no pid... - except subprocess.TimeoutExpired as err: - task.mark_as_failed(err) - raise - except ifcopenshell.Error as err: - # logical validation error OR C++ errors - task.mark_as_failed(err) - pass - except Exception as err: - task.mark_as_failed(err) - raise - - # schema check returns either multiple JSON lines, or a single line message, or nothing. - def is_schema_error(line): - try: - json.loads(line) # ignoring non-JSON messages - except ValueError: - return False - return True - - output = list(filter(is_schema_error, proc.stdout.split("\n"))) - # success = (len(output) == 0) - # tfk: if we mark this task as failed we don't do the instance population either. - # marking as failed should probably be reserved for blocking errors (prerequisites) - # and internal errors and differentiate between valid and task_success. 
- success = proc.returncode >= 0 - valid = (len(output) == 0) - - with transaction.atomic(): - - # create or retrieve Model info - model = get_or_create_ifc_model(id) - - # update Model and Validation Outcomes - if valid: - model.status_schema = Model.Status.VALID - task.outcomes.create( - severity=ValidationOutcome.OutcomeSeverity.PASSED, - outcome_code=ValidationOutcome.ValidationOutcomeCode.PASSED, - observed=None - ) - else: - outcomes_to_save = list() - outcomes_instances_to_save = list() - - for line in output: - message = json.loads(line) - model.status_schema = Model.Status.INVALID - outcome = ValidationOutcome( - severity=ValidationOutcome.OutcomeSeverity.ERROR, - outcome_code=ValidationOutcome.ValidationOutcomeCode.SCHEMA_ERROR, - observed=message['message'], - feature=json.dumps({ - 'type': message['type'] if 'type' in message else None, - 'attribute': message['attribute'] if 'attribute' in message else None - }) - ) - outcome.validation_task = task - outcomes_to_save.append(outcome) - - if 'instance' in message and message['instance'] is not None and 'id' in message['instance'] and 'type' in message['instance']: - instance = ModelInstance( - stepfile_id=message['instance']['id'], - ifc_type=message['instance']['type'], - model=model - ) - outcome.instance_id = instance.stepfile_id # store for later reference (not persisted) - outcomes_instances_to_save.append(instance) - - ModelInstance.objects.bulk_create(outcomes_instances_to_save, batch_size=DJANGO_DB_BULK_CREATE_BATCH_SIZE, ignore_conflicts=True) # ignore existing - model_instances = dict(ModelInstance.objects.filter(model_id=model.id).values_list('stepfile_id', 'id')) # retrieve all - - for outcome in outcomes_to_save: - if outcome.instance_id: - instance_id = model_instances[outcome.instance_id] - if instance_id: - outcome.instance_id = instance_id - - ValidationOutcome.objects.bulk_create(outcomes_to_save, batch_size=DJANGO_DB_BULK_CREATE_BATCH_SIZE) - - 
model.save(update_fields=['status_schema']) - - # return - if success: - reason = 'No IFC schema errors.' - task.mark_as_completed(reason) - return {'is_valid': True, 'reason': reason} - else: - reason = f"'ifcopenshell.validate' returned exit code {proc.returncode} and {len(output):,} errors : {output}" - task.mark_as_completed(reason) - return {'is_valid': False, 'reason': reason} - - else: - reason = f'Skipped as prev_result = {prev_result}.' - task.mark_as_skipped(reason) - return {'is_valid': None, 'reason': reason} - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -@update_progress -def digital_signatures_subtask(self, prev_result, id, file_name, *args, **kwargs): - - # fetch request info - request = ValidationRequest.objects.get(pk=id) - file_path = get_absolute_file_path(request.file.name) - - # add task - task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.DIGITAL_SIGNATURES) - - prev_result_succeeded = prev_result is not None and prev_result['is_valid'] is True - if prev_result_succeeded: - - task.mark_as_initiated() - - # determine program/script to run - check_script = os.path.join(os.path.dirname(__file__), "checks", "signatures", "check_signatures.py") - check_program = [sys.executable, check_script, file_path] - logger.debug(f'Command for {self.__qualname__}: {" ".join(check_program)}') - - # check signatures - try: - # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( - check_program, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT - ) - task.set_process_details(None, check_program) # run() has no pid... 
- except subprocess.TimeoutExpired as err: - task.mark_as_failed(err) - raise - except Exception as err: - task.mark_as_failed(err) - raise - - output = list(map(json.loads, filter(None, map(lambda s: s.strip(), proc.stdout.split("\n"))))) - success = proc.returncode >= 0 - valid = all(m['signature'] != "invalid" for m in output) - - with transaction.atomic(): - - # create or retrieve Model info - model = get_or_create_ifc_model(id) - model.status_signatures = Model.Status.NOT_APPLICABLE if not output else Model.Status.VALID if valid else Model.Status.INVALID - - def create_outcome(di): - return ValidationOutcome( - severity=ValidationOutcome.OutcomeSeverity.ERROR if di.get("signature") == "invalid" else ValidationOutcome.OutcomeSeverity.PASSED, - outcome_code=ValidationOutcome.ValidationOutcomeCode.VALUE_ERROR if di.get("signature") == "invalid" else ValidationOutcome.ValidationOutcomeCode.PASSED, - observed=di, - feature=json.dumps({'digital_signature': 1}), - validation_task = task - ) - - ValidationOutcome.objects.bulk_create(list(map(create_outcome, output)), batch_size=DJANGO_DB_BULK_CREATE_BATCH_SIZE) - - model.save(update_fields=['status_signatures']) - - if success: - reason = 'Digital signature check completed' - task.mark_as_completed(reason) - return {'is_valid': True, 'reason': reason} - else: - reason = f"Script returned exit code {proc.returncode} and {proc.stderr}" - task.mark_as_completed(reason) - return {'is_valid': False, 'reason': reason} - - else: - reason = f'Skipped as prev_result = {prev_result}.' 
- task.mark_as_skipped(reason) - return {'is_valid': None, 'reason': reason} - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -@update_progress -def bsdd_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): - - # fetch request info - request = ValidationRequest.objects.get(pk=id) - file_path = get_absolute_file_path(request.file.name) - - # add task - task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.BSDD) - - prev_result_succeeded = prev_result is not None and prev_result['is_valid'] is True - if prev_result_succeeded: - - task.mark_as_initiated() - - # determine program/script to run - check_script = os.path.join(os.path.dirname(__file__), "checks", "check_bsdd.py") - check_program = [sys.executable, check_script, '--file-name', file_path, '--task-id', str(id)] - logger.debug(f'Command for {self.__qualname__}: {" ".join(check_program)}') - - # check bSDD - try: - # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( - check_program, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT, - env=os.environ.copy() - ) - task.set_process_details(None, check_program) # run() has no pid... 
- - except subprocess.TimeoutExpired as err: - task.mark_as_failed(err) - raise - - except Exception as err: - task.mark_as_failed(err) - raise - - if proc.returncode is not None and proc.returncode != 0: - error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}" - task.mark_as_failed(error_message) - raise RuntimeError(error_message) - - raw_output = proc.stdout - - logger.info(f'Output for {self.__name__}: {raw_output}') - - with transaction.atomic(): - - # create or retrieve Model info - model = get_or_create_ifc_model(id) - - # update Validation Outcomes - json_output = json.loads(raw_output) - for message in json_output['messages']: - - outcome = task.outcomes.create( - severity=[c[0] for c in ValidationOutcome.OutcomeSeverity.choices if c[1] == (message['severity'])][0], - outcome_code=[c[0] for c in ValidationOutcome.ValidationOutcomeCode.choices if c[1] == (message['outcome'])][0], - observed=message['message'], - feature=json.dumps({ - 'rule': message['rule'] if 'rule' in message else None, - 'category': message['category'] if 'category' in message else None, - 'dictionary': message['dictionary'] if 'dictionary' in message else None, - 'class': message['class'] if 'class' in message else None, - 'instance_id': message['instance_id'] if 'instance_id' in message else None - }) - ) - - if 'instance_id' in message and message['instance_id'] is not None: - instance, _ = model.instances.get_or_create( - stepfile_id = message['instance_id'], - model=model - ) - outcome.instance = instance - outcome.save() - - # update Model info - agg_status = task.determine_aggregate_status() - model.status_bsdd = agg_status - model.save(update_fields=['status_bsdd']) - - # update Task info and return - is_valid = agg_status != Model.Status.INVALID - reason = f"agg_status = {Model.Status(agg_status).label}\nmessages = {json_output['messages']}" - task.mark_as_completed(reason) - return {'is_valid': is_valid, 'reason': 
reason} - - else: - reason = f'Skipped as prev_result = {prev_result}.' - task.mark_as_skipped(reason) - return {'is_valid': None, 'reason': reason} - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -@update_progress -def normative_rules_ia_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): - - # fetch request info - request = ValidationRequest.objects.get(pk=id) - file_path = get_absolute_file_path(request.file.name) - - # add task - task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.NORMATIVE_IA) - - prev_result_succeeded = prev_result is not None and prev_result['is_valid'] is True - if prev_result_succeeded: - - task.mark_as_initiated() - - # determine program/script to run - check_script = os.path.join(os.path.dirname(__file__), "checks", "check_gherkin.py") - check_program = [sys.executable, check_script, '--file-name', file_path, '--task-id', str(task.id), '--rule-type', 'IMPLEMENTER_AGREEMENT'] - logger.debug(f'Command for {self.__qualname__}: {" ".join(check_program)}') - - # check Gherkin IA - try: - # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( - check_program, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT, - env=os.environ.copy() - ) - task.set_process_details(None, check_program) # run() has no pid... 
- - except subprocess.TimeoutExpired as err: - task.mark_as_failed(err) - raise - - except Exception as err: - task.mark_as_failed(err) - raise - - if proc.returncode is not None and proc.returncode != 0: - error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}" - task.mark_as_failed(error_message) - raise RuntimeError(error_message) - - raw_output = proc.stdout - - with transaction.atomic(): - - # create or retrieve Model info - model = get_or_create_ifc_model(id) - - # update Model info - agg_status = task.determine_aggregate_status() - logger.debug(f'Aggregate status for {self.__qualname__}: {agg_status}') - model.status_ia = agg_status - model.save(update_fields=['status_ia']) - - # update Task info and return - is_valid = agg_status != Model.Status.INVALID - reason = f'agg_status = {Model.Status(agg_status).label}\nraw_output = {raw_output}' - task.mark_as_completed(reason) - return {'is_valid': is_valid, 'reason': reason} - - else: - reason = f'Skipped as prev_result = {prev_result}.' 
- task.mark_as_skipped(reason) - return {'is_valid': None, 'reason': reason} - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -@update_progress -def normative_rules_ip_validation_subtask(self, prev_result, id, file_name, *args, **kwargs): - - # fetch request info - request = ValidationRequest.objects.get(pk=id) - file_path = get_absolute_file_path(request.file.name) - - # add task - task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.NORMATIVE_IP) - - prev_result_succeeded = prev_result is not None and prev_result['is_valid'] is True - if prev_result_succeeded: - - task.mark_as_initiated() - - # determine program/script to run - check_script = os.path.join(os.path.dirname(__file__), "checks", "check_gherkin.py") - check_program = [sys.executable, check_script, '--file-name', file_path, '--task-id', str(task.id), '--rule-type', 'INFORMAL_PROPOSITION'] - logger.debug(f'Command for {self.__qualname__}: {" ".join(check_program)}') - - # check Gherkin IP - try: - # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( - check_program, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT - ) - task.set_process_details(None, check_program) # run() has no pid... 
- - except subprocess.TimeoutExpired as err: - task.mark_as_failed(err) - raise - - except Exception as err: - task.mark_as_failed(err) - raise - - if proc.returncode is not None and proc.returncode != 0: - error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}" - task.mark_as_failed(error_message) - raise RuntimeError(error_message) - - raw_output = proc.stdout - - with transaction.atomic(): - - # create or retrieve Model info - model = get_or_create_ifc_model(id) - - # update Model info - agg_status = task.determine_aggregate_status() - model.status_ip = agg_status - model.save(update_fields=['status_ip']) - - # update Task info and return - is_valid = agg_status != Model.Status.INVALID - reason = f'agg_status = {Model.Status(agg_status).label}\nraw_output = {raw_output}' - task.mark_as_completed(reason) - return {'is_valid': is_valid, 'reason': reason} - - else: - reason = f'Skipped as prev_result = {prev_result}.' - task.mark_as_skipped(reason) - return {'is_valid': None, 'reason': reason} - - -@shared_task(bind=True) -@log_execution -@requires_django_user_context -@update_progress -def industry_practices_subtask(self, prev_result, id, file_name, *args, **kwargs): - - # fetch request info - request = ValidationRequest.objects.get(pk=id) - file_path = get_absolute_file_path(request.file.name) - - # add task - task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.INDUSTRY_PRACTICES) - - prev_result_succeeded = prev_result is not None and prev_result['is_valid'] is True - if prev_result_succeeded: - - task.mark_as_initiated() - - # determine program/script to run - check_script = os.path.join(os.path.dirname(__file__), "checks", "check_gherkin.py") - check_program = [sys.executable, check_script, '--file-name', file_path, '--task-id', str(task.id), '--rule-type', 'INDUSTRY_PRACTICE'] - logger.debug(f'Command for {self.__qualname__}: {" ".join(check_program)}') - - # check 
Gherkin IP - try: - # note: use run instead of Popen b/c PIPE output can be very big... - proc = subprocess.run( - check_program, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - timeout=TASK_TIMEOUT_LIMIT - ) - task.set_process_details(None, check_program) # run() has no pid... - - except subprocess.TimeoutExpired as err: - task.mark_as_failed(err) - raise - - except Exception as err: - task.mark_as_failed(err) - raise - - if proc.returncode is not None and proc.returncode != 0: - error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}" - task.mark_as_failed(error_message) - raise RuntimeError(error_message) - - raw_output = proc.stdout - - with transaction.atomic(): - - # create or retrieve Model info - model = get_or_create_ifc_model(id) - - # update Model info - agg_status = task.determine_aggregate_status() - model.status_industry_practices = agg_status - model.save(update_fields=['status_industry_practices']) - - # update Task info and return - is_valid = agg_status != Model.Status.INVALID - reason = f'agg_status = {Model.Status(agg_status).label}\nraw_output = {raw_output}' - task.mark_as_completed(reason) - return {'is_valid': is_valid, 'reason': reason} - - else: - reason = f'Skipped as prev_result = {prev_result}.' 
- task.mark_as_skipped(reason) - return {'is_valid': None, 'reason': reason} diff --git a/backend/apps/ifc_validation/tasks/__init__.py b/backend/apps/ifc_validation/tasks/__init__.py new file mode 100644 index 00000000..603604bb --- /dev/null +++ b/backend/apps/ifc_validation/tasks/__init__.py @@ -0,0 +1,31 @@ +from .context import TaskContext +from .utils import with_model, get_absolute_file_path, get_or_create_ifc_model +from .logger import logger + +from .task_runner import ( + ifc_file_validation_task, + header_syntax_validation_subtask, + header_validation_subtask, + syntax_validation_subtask, + prerequisites_subtask, + schema_validation_subtask, + normative_rules_ia_validation_subtask, + normative_rules_ip_validation_subtask, + bsdd_validation_subtask, + industry_practices_subtask, + instance_completion_subtask +) + +__all__ = [ + "ifc_file_validation_task", + "header_syntax_validation_subtask", + "header_validation_subtask", + "syntax_validation_subtask", + "prerequisites_subtask", + "schema_validation_subtask", + "bsdd_validation_subtask", + "normative_rules_ia_validation_subtask", + "normative_rules_ip_validation_subtask", + "industry_practices_subtask", + "instance_completion_subtask", +] \ No newline at end of file diff --git a/backend/apps/ifc_validation/tasks/check_programs.py b/backend/apps/ifc_validation/tasks/check_programs.py new file mode 100644 index 00000000..57cdf154 --- /dev/null +++ b/backend/apps/ifc_validation/tasks/check_programs.py @@ -0,0 +1,197 @@ +import os +import sys +import json +import subprocess +from typing import List + +from apps.ifc_validation_models.settings import TASK_TIMEOUT_LIMIT +from apps.ifc_validation_models.models import ValidationTask + +from .logger import logger +from .context import TaskContext + +checks_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "checks")) + +def check_syntax(context:TaskContext): + proc = run_subprocess(context.task, [sys.executable, "-m", "ifcopenshell.simple_spf", 
"--json", context.file_path ]) + output = proc.stdout + error_output = proc.stderr + success = (len(list(filter(None, output.split("\n")))) == 0) and len(error_output) == 0 + context.result = { + 'output': proc.stdout, + 'error_output': proc.stderr, + 'success': success + } + return context + +def check_header_syntax(context:TaskContext): + proc = run_subprocess(context.task, [sys.executable, "-m", "ifcopenshell.simple_spf", "--json", "--only-header", context.file_path]) + output = proc.stdout + error_output = proc.stderr + success = (len(list(filter(None, output.split("\n")))) == 0) and len(error_output) == 0 + context.result = { + 'output': proc.stdout, + 'error_output': proc.stderr, + 'success': success + } + return context + +def is_schema_error(line): + try: + json.loads(line) + except ValueError: + return False + return True + +def check_schema(context:TaskContext): + proc = run_subprocess( + task = context.task, + command = [sys.executable, "-m", "ifcopenshell.validate", "--json", "--rules", "--fields", context.file_path ] + ) + output = list(filter(is_schema_error, proc.stdout.split("\n"))) + success = proc.returncode >= 0 + valid = len(output) == 0 + + context.result = { + 'output': output, + 'success': success, + 'valid': valid + } + return context + + +def check_header(context:TaskContext): + proc = run_subprocess( + task=context.task, + command=[sys.executable, os.path.join(checks_dir, "header_policy", "validate_header.py"), context.file_path] + ) + header_validation = {} + for line in proc.stdout.splitlines(): + try: + header_validation = json.loads(line) + except json.JSONDecodeError: + continue + context.result = header_validation + return context + + +def check_digital_signatures(context:TaskContext): + proc = run_subprocess( + task=context.task, + command=[sys.executable, os.path.join(checks_dir, "signatures", "check_signatures.py"), context.file_path] + ) + output = list(map(json.loads, filter(None, map(lambda s: s.strip(), 
proc.stdout.split("\n"))))) + success = proc.returncode >= 0 + valid = all(m['signature'] != "invalid" for m in output) + + context.result = { + 'output': output, + 'success': success, + 'valid': valid + } + return context + + +def check_bsdd(context:TaskContext): + proc = run_subprocess( + task=context.task, + command=[sys.executable, os.path.join(checks_dir, "check_bsdd.py"), "-file-name", context.file_path, "--task-id", str(context.task.id) ] + ) + raw_output = check_proc_success_or_fail(proc, context.task) + logger.info(f'Output for {context.config.type}: {raw_output}') + context.result = raw_output + return context + +def check_prerequisites(context:TaskContext): + proc = run_subprocess( + task=context.task, + command = [ + sys.executable, + os.path.join(checks_dir, "check_gherkin.py"), + "--file-name", context.file_path, + "--task-id", str(context.task.id), + "--rule-type", "CRITICAL", + "--purepythonparser" + ] + ) + raw_output = check_proc_success_or_fail(proc, context.task) + context.result = raw_output + return context + +def check_normative_ia(context:TaskContext): + proc = run_subprocess( + task=context.task, + command = [ + sys.executable, + os.path.join(checks_dir, "check_gherkin.py"), + "--file-name", context.file_path, + "--task-id", str(context.task.id), + "--rule-type", "IMPLEMENTER_AGREEMENT" + ] + ) + raw_output = check_proc_success_or_fail(proc, context.task) + context.result = raw_output + return context + +def check_normative_ip(context:TaskContext): + proc = run_subprocess( + task=context.task, + command = [ + sys.executable, + os.path.join(checks_dir, "check_gherkin.py"), + "--file-name", context.file_path, + "--task-id", str(context.task.id), + "--rule-type", "INFORMAL_PROPOSITION" + ] + ) + raw_output = check_proc_success_or_fail(proc, context.task) + context.result = raw_output + return context + +def check_industry_practices(context:TaskContext): + proc = run_subprocess( + task=context.task, + command = [ + sys.executable, + 
os.path.join(checks_dir, "check_gherkin.py"), + "--file-name", context.file_path, + "--task-id", str(context.task.id), + "--rule-type", "INDUSTRY_PRACTICE" + ] + ) + raw_output = check_proc_success_or_fail(proc, context.task) + context.result = raw_output + return context + +def check_instance_completion(context:TaskContext): + return context + +def check_proc_success_or_fail(proc, task): + if proc.returncode is not None and proc.returncode != 0: + error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}" + task.mark_as_failed(error_message) + raise RuntimeError(error_message) + return proc.stdout + +def run_subprocess( + task: ValidationTask, + command: List[str], +) -> subprocess.CompletedProcess[str]: + logger.debug(f'Command for {task.type}: {" ".join(command)}') + task.set_process_details(None, command) + try: + proc = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=TASK_TIMEOUT_LIMIT, + env= os.environ.copy() + ) + logger.info(f'test run task task name {task.type}, task value : {task}') + return proc + + except Exception as err: + logger.exception(f"{type(err).__name__} in task {task.id} : {task.type}") + task.mark_as_failed(err) + raise type(err)(f"Unknown error during validation task {task.id}: {task.type}") from err \ No newline at end of file diff --git a/backend/apps/ifc_validation/tasks/configs.py b/backend/apps/ifc_validation/tasks/configs.py new file mode 100644 index 00000000..029b9927 --- /dev/null +++ b/backend/apps/ifc_validation/tasks/configs.py @@ -0,0 +1,108 @@ +from dataclasses import dataclass +from typing import List, Optional, Callable +from apps.ifc_validation_models.models import ValidationTask, Model +from . import check_programs # execution layer +from . 
import processing # processing layer + +@dataclass +class TaskConfig: + type: str + increment: int + status_field: Optional[str] + check_program: Callable[[str, int], list] + blocks: Optional[List[str]] + execution_stage: str = "parallel" + process_results: Callable | None = None + + @property + def celery_task_name(self) -> str: + return f"apps.ifc_validation.tasks.{self.type.name.lower()}_subtask" + +# create blueprint +def make_task(*, type, increment, field=None, stage="parallel"): + def _load_function(module, prefix, type): + func_name = f"{prefix}_{type.name.lower()}" + try: + return getattr(module, func_name) + except AttributeError: + raise ImportError( + f"Missing `{prefix}` function for task type '{type.name}'. " + f"Expected `{func_name}()` in `{module.__name__}.py`." + ) from None + + check_program = _load_function(check_programs, "check", type) + process_results = _load_function(processing, "process", type) + + return TaskConfig( + type=type, + increment=increment, + status_field=Model._meta.get_field(field) if field else None, + check_program=check_program, + blocks=[], + execution_stage=stage, + process_results = process_results + ) + +# define task info +header_syntax = make_task(type=ValidationTask.Type.HEADER_SYNTAX, increment=5, field='status_header_syntax', stage="serial") +header = make_task(type=ValidationTask.Type.HEADER, increment=10, field='status_header', stage="serial") +syntax = make_task(type=ValidationTask.Type.SYNTAX, increment=5, field='status_syntax', stage="serial") +prerequisites = make_task(type=ValidationTask.Type.PREREQUISITES, increment=10, field='status_prereq', stage="serial") +schema = make_task(type=ValidationTask.Type.SCHEMA, increment=10, field='status_schema') +digital_signatures = make_task(type=ValidationTask.Type.DIGITAL_SIGNATURES, increment=5, field='status_signatures') +bsdd = make_task(type=ValidationTask.Type.BSDD, increment=0, field='status_bsdd') +normative_ia = make_task(type=ValidationTask.Type.NORMATIVE_IA, 
increment=20, field='status_ia') +normative_ip = make_task(type=ValidationTask.Type.NORMATIVE_IP, increment=20, field='status_ip') +industry_practices = make_task(type=ValidationTask.Type.INDUSTRY_PRACTICES, increment=10, field='status_industry_practices') +instance_completion = make_task(type=ValidationTask.Type.INSTANCE_COMPLETION, increment=5, field=None, stage="final") + +# block tasks on error +post_tasks = [digital_signatures, schema, normative_ia, normative_ip, industry_practices, instance_completion] +header_syntax.blocks = [header, syntax, prerequisites] + post_tasks +syntax.blocks = post_tasks.copy() +prerequisites.blocks = post_tasks.copy() + +# register +ALL_TASKS = [ + header_syntax, header, syntax, prerequisites, + schema, digital_signatures, bsdd, + normative_ia, normative_ip, industry_practices, instance_completion, +] +class TaskRegistry: + def __init__(self, config_map: dict[str, TaskConfig]): + self._configs = config_map + self._by_task_type = {cfg.type: name for name, cfg in config_map.items()} + self._by_task_type_name = {cfg.type.name: name for name, cfg in config_map.items()} + + def get_config_by_celery_name(self, name: str) -> TaskConfig: + return self._configs[self.get_task_type_from_celery_name(name)] + + def get_celery_name_by_task_type(self, task_type: ValidationTask.Type) -> str: + return self._by_task_type.get(task_type) + + def get_celery_name_by_task_type_name(self, task_type_name: str) -> str: + return self._by_task_type_name.get(task_type_name) + + def get_blocked_tasks(self, task_type: ValidationTask.Type) -> List[TaskConfig]: + return self._configs[task_type].blocks or [] + + def get_tasks_by_stage(self, stage: str) -> List[str]: + return [cfg for cfg in self._configs.values() if cfg.execution_stage == stage] + + def __getitem__(self, task_type: ValidationTask.Type) -> TaskConfig: + return self._configs[task_type] + + def get_blockers_of(self, task_type: ValidationTask.Type) -> List[ValidationTask.Type]: + return [ + 
blocker_type + for blocker_type, cfg in self._configs.items() + if any(block.type == task_type for block in cfg.blocks or []) + ] + + def all(self) -> dict[str, TaskConfig]: + return self._configs + + def total_increment(self) -> int: + return sum(cfg.increment for cfg in self._configs.values()) + +task_registry = TaskRegistry({task.type: task for task in ALL_TASKS}) \ No newline at end of file diff --git a/backend/apps/ifc_validation/tasks/context.py b/backend/apps/ifc_validation/tasks/context.py new file mode 100644 index 00000000..ae918bb5 --- /dev/null +++ b/backend/apps/ifc_validation/tasks/context.py @@ -0,0 +1,11 @@ +from typing import Any, Optional +from dataclasses import dataclass +from apps.ifc_validation_models.models import ValidationRequest, ValidationTask + +@dataclass # moving context from exeuction to processing layer +class TaskContext: + config: Any # Static info -- hould be TaskConfig — delayed import due to modular imports + request: ValidationRequest # the current request + task: ValidationTask #the current task + file_path: str # for IFC files + result: Optional[Any] = None # result from execution layer diff --git a/backend/apps/ifc_validation/email_tasks.py b/backend/apps/ifc_validation/tasks/email_tasks.py similarity index 99% rename from backend/apps/ifc_validation/email_tasks.py rename to backend/apps/ifc_validation/tasks/email_tasks.py index e53551ec..49084122 100644 --- a/backend/apps/ifc_validation/email_tasks.py +++ b/backend/apps/ifc_validation/tasks/email_tasks.py @@ -1,5 +1,4 @@ from celery import shared_task -from celery.utils.log import get_task_logger from django.template.loader import render_to_string from core.utils import log_execution @@ -9,9 +8,6 @@ from apps.ifc_validation_models.models import ValidationRequest -logger = get_task_logger(__name__) - - def status_combine(*args): statuses = "-pvnwi" return statuses[max(map(statuses.index, args))] diff --git a/backend/apps/ifc_validation/tasks/logger.py 
b/backend/apps/ifc_validation/tasks/logger.py new file mode 100644 index 00000000..cf11ea61 --- /dev/null +++ b/backend/apps/ifc_validation/tasks/logger.py @@ -0,0 +1,2 @@ +from celery.utils.log import get_task_logger +logger = get_task_logger("ifc_validation") \ No newline at end of file diff --git a/backend/apps/ifc_validation/tasks/processing/__init__.py b/backend/apps/ifc_validation/tasks/processing/__init__.py new file mode 100644 index 00000000..40fea8ea --- /dev/null +++ b/backend/apps/ifc_validation/tasks/processing/__init__.py @@ -0,0 +1,13 @@ +from .instance_completion import process_instance_completion +from .gherkin import ( + process_gherkin_outcomes, + process_normative_ia, + process_normative_ip, + process_prerequisites, + process_industry_practices +) +from .syntax import process_syntax, process_header_syntax +from .schema import process_schema +from .header import process_header +from .digital_signatures import process_digital_signatures +from .bsdd import process_bsdd \ No newline at end of file diff --git a/backend/apps/ifc_validation/tasks/processing/bsdd.py b/backend/apps/ifc_validation/tasks/processing/bsdd.py new file mode 100644 index 00000000..b5990ef3 --- /dev/null +++ b/backend/apps/ifc_validation/tasks/processing/bsdd.py @@ -0,0 +1,42 @@ + +import json + +from apps.ifc_validation_models.models import Model, ValidationOutcome +from .. 
import TaskContext, logger, with_model + + +def process_bsdd(context:TaskContext): + + with with_model(context.request.id) as model: + + # update Validation Outcomes + json_output = json.loads(context.result) + for message in json_output['messages']: + + outcome = context.task.outcomes.create( + severity=[c[0] for c in ValidationOutcome.OutcomeSeverity.choices if c[1] == (message['severity'])][0], + outcome_code=[c[0] for c in ValidationOutcome.ValidationOutcomeCode.choices if c[1] == (message['outcome'])][0], + observed=message['message'], + feature=json.dumps({ + 'rule': message['rule'] if 'rule' in message else None, + 'category': message['category'] if 'category' in message else None, + 'dictionary': message['dictionary'] if 'dictionary' in message else None, + 'class': message['class'] if 'class' in message else None, + 'instance_id': message['instance_id'] if 'instance_id' in message else None + }) + ) + + if 'instance_id' in message and message['instance_id'] is not None: + instance, _ = model.instances.get_or_create( + stepfile_id = message['instance_id'], + model=model + ) + outcome.instance = instance + outcome.save() + + # update Model info + agg_status = context.task.determine_aggregate_status() + model.status_bsdd = agg_status + model.save(update_fields=['status_bsdd']) + + return f"agg_status = {Model.Status(agg_status).label}\nmessages = {json_output['messages']}" \ No newline at end of file diff --git a/backend/apps/ifc_validation/tasks/processing/digital_signatures.py b/backend/apps/ifc_validation/tasks/processing/digital_signatures.py new file mode 100644 index 00000000..231ee109 --- /dev/null +++ b/backend/apps/ifc_validation/tasks/processing/digital_signatures.py @@ -0,0 +1,28 @@ + +import json + +from core.settings import DJANGO_DB_BULK_CREATE_BATCH_SIZE + +from apps.ifc_validation_models.models import Model, ValidationOutcome +from .. 
import TaskContext, logger, with_model + + +def process_digital_signatures(context:TaskContext): + output, success, valid = (context.result.get(k) for k in ("output", "success", "valid")) + + with with_model(context.request.id) as model: + model.status_signatures = Model.Status.NOT_APPLICABLE if not output else Model.Status.VALID if valid else Model.Status.INVALID + + def create_outcome(di): + return ValidationOutcome( + severity=ValidationOutcome.OutcomeSeverity.ERROR if di.get("signature") == "invalid" else ValidationOutcome.OutcomeSeverity.PASSED, + outcome_code=ValidationOutcome.ValidationOutcomeCode.VALUE_ERROR if di.get("signature") == "invalid" else ValidationOutcome.ValidationOutcomeCode.PASSED, + observed=di, + feature=json.dumps({'digital_signature': 1}), + validation_task = context.task + ) + + ValidationOutcome.objects.bulk_create(list(map(create_outcome, output)), batch_size=DJANGO_DB_BULK_CREATE_BATCH_SIZE) + + model.save(update_fields=['status_signatures']) + return 'Digital signature check completed' if success else f"Script returned exit code {context.result.returncode} and {context.result.stderr}" diff --git a/backend/apps/ifc_validation/tasks/processing/gherkin.py b/backend/apps/ifc_validation/tasks/processing/gherkin.py new file mode 100644 index 00000000..a38d5a93 --- /dev/null +++ b/backend/apps/ifc_validation/tasks/processing/gherkin.py @@ -0,0 +1,25 @@ +from apps.ifc_validation_models.models import Model + +from .. 
import TaskContext, logger, with_model + +def process_gherkin_outcomes(context:TaskContext): + with with_model(context.request.id) as model: + # @gh todo, actually write gherkin results to DB here, currently in gherkin environment.py + status_field = context.config.status_field.name + agg_status = context.task.determine_aggregate_status() + setattr(model, status_field, agg_status) + model.save(update_fields=[status_field]) + + return f'agg_status = {Model.Status(agg_status).label}\nraw_output = {context.result}' + +def process_normative_ia(context:TaskContext): + return process_gherkin_outcomes(context) + +def process_normative_ip(context:TaskContext): + return process_gherkin_outcomes(context) + +def process_prerequisites(context:TaskContext): + return process_gherkin_outcomes(context) + +def process_industry_practices(context:TaskContext): + return process_gherkin_outcomes(context) \ No newline at end of file diff --git a/backend/apps/ifc_validation/tasks/processing/header.py b/backend/apps/ifc_validation/tasks/processing/header.py new file mode 100644 index 00000000..a1d47a0a --- /dev/null +++ b/backend/apps/ifc_validation/tasks/processing/header.py @@ -0,0 +1,86 @@ +import json +import datetime +import os + +from apps.ifc_validation_models.models import Model, AuthoringTool, Company +from .. 
def process_header(context: TaskContext):
    """
    Persist parsed IFC header information (schema, timestamp, MVD, authoring
    tool and company) on the model and set its header validation status.

    ``context.result`` is the header-validation dict produced by the
    execution layer.
    """
    header_validation = context.result

    with with_model(context.request.id) as model:
        # NOTE(review): this writes the *prerequisites* status from within the
        # header task — presumably the aggregate of outcomes recorded so far;
        # confirm this is intentional.
        agg_status = context.task.determine_aggregate_status()
        model.status_prereq = agg_status

        model.size = os.path.getsize(context.file_path)
        logger.debug(f'Detected size = {model.size} bytes')

        model.schema = header_validation.get('schema_identifier')
        # BUG FIX: previously logged header_validation.get("schema") — a key that
        # does not exist in the result dict — instead of the identifier stored.
        logger.debug(f'The schema identifier = {model.schema}')

        # time_stamp: prefer the fixed "%Y-%m-%dT%H:%M:%S" format (interpreted
        # as UTC), fall back to any ISO-8601 string, otherwise leave date unset.
        if ifc_file_time_stamp := header_validation.get('time_stamp', False):
            try:
                logger.debug(f'Timestamp within file = {ifc_file_time_stamp}')
                date = datetime.datetime.strptime(ifc_file_time_stamp, "%Y-%m-%dT%H:%M:%S")
                model.date = date.replace(tzinfo=datetime.timezone.utc)
            except ValueError:
                try:
                    model.date = datetime.datetime.fromisoformat(ifc_file_time_stamp)
                except ValueError:
                    pass  # unparseable timestamp: leave model.date unchanged

        # mvd
        model.mvd = header_validation.get('mvd')

        app = header_validation.get('application_name')
        version = header_validation.get('version')
        # Only derive a full tool name when both parts carry real information.
        name = None if any(value in (None, "Not defined") for value in (app, version)) else app + ' ' + version
        company_name = header_validation.get('company_name')
        logger.debug(f'Detected Authoring Tool in file = {name}')

        validation_errors = header_validation.get('validation_errors', [])
        invalid_marker_fields = ['originating_system', 'version', 'company_name', 'application_name']
        if any(field in validation_errors for field in invalid_marker_fields):
            model.status_header = Model.Status.INVALID
        else:
            # parsing was successful and model can be considered for scorecards
            model.status_header = Model.Status.VALID
            authoring_tool = AuthoringTool.find_by_full_name(full_name=name)
            if isinstance(authoring_tool, AuthoringTool):
                # Known tool; back-fill its company if missing.
                if authoring_tool.company is None:
                    company, _ = Company.objects.get_or_create(name=company_name)
                    authoring_tool.company = company
                    authoring_tool.save()
                    logger.debug(f'Updated existing Authoring Tool with company: {company.name}')

                model.produced_by = authoring_tool
                logger.debug(f'Retrieved existing Authoring Tool from DB = {model.produced_by.full_name}')
            elif authoring_tool is None:
                # Unknown tool; register company and tool.
                company, _ = Company.objects.get_or_create(name=company_name)
                authoring_tool, _ = AuthoringTool.objects.get_or_create(
                    company=company,
                    name=app,
                    version=version
                )
                model.produced_by = authoring_tool
                logger.debug(f'Authoring app not found, ApplicationFullName = {app}, Version = {version} - created new instance')
            else:
                # find_by_full_name returned multiple matches; don't guess.
                model.produced_by = None
                logger.warning(f'Retrieved multiple Authoring Tool from DB: {authoring_tool} - could not assign any')

        # update header validation
        model.header_validation = header_validation
        # BUG FIX: a partial save(update_fields=['status_header', 'header_validation'])
        # immediately followed by a full save() wrote the row twice; a single full
        # save persists all fields modified above.
        model.save()

    return f'agg_status = {Model.Status(agg_status).label}\nraw_output = {header_validation}'
def process_instance_completion(context: TaskContext):
    """
    Back-fill the IFC entity type of ModelInstance records created earlier
    without one.

    This task has no execution layer: it re-opens the IFC file and resolves
    each instance's entity type by its STEP file id, linking instance ids to
    previously stored outcomes.
    """
    ifc_file = ifcopenshell.open(context.file_path)

    with transaction.atomic():
        model_id = context.request.model.id
        model_instances = ModelInstance.objects.filter(model_id=model_id, ifc_type__in=[None, ''])
        instance_count = model_instances.count()
        logger.info(f'Retrieved {instance_count:,} ModelInstance record(s)')

        for inst in model_instances.iterator():
            inst.ifc_type = ifc_file[inst.stepfile_id].is_a()
            # PERF FIX: only one column changed — avoid rewriting the whole row
            # for every instance.
            inst.save(update_fields=['ifc_type'])

    return f'Updated {instance_count:,} ModelInstance record(s)'
def process_schema(context: TaskContext):
    """
    Persist schema-validation results: the model's schema status, one outcome
    per reported error (or a single PASSED outcome), and the related
    ModelInstance records.
    """
    output, success, valid = (context.result.get(k) for k in ("output", "success", "valid"))

    with with_model(context.request.id) as model:

        if valid:
            model.status_schema = Model.Status.VALID
            context.task.outcomes.create(
                severity=ValidationOutcome.OutcomeSeverity.PASSED,
                outcome_code=ValidationOutcome.ValidationOutcomeCode.PASSED,
                observed=None
            )
        else:
            model.status_schema = Model.Status.INVALID
            outcomes_to_save = []
            outcomes_instances_to_save = []

            # Each output line is one JSON-encoded schema error message.
            for line in output:
                message = json.loads(line)
                outcome = ValidationOutcome(
                    severity=ValidationOutcome.OutcomeSeverity.ERROR,
                    outcome_code=ValidationOutcome.ValidationOutcomeCode.SCHEMA_ERROR,
                    observed=message['message'],
                    feature=json.dumps({
                        'type': message.get('type'),
                        'attribute': message.get('attribute')
                    }),
                    validation_task=context.task
                )
                outcomes_to_save.append(outcome)
                instance_info = message.get('instance')
                if instance_info is not None and 'id' in instance_info and 'type' in instance_info:
                    instance = ModelInstance(
                        stepfile_id=instance_info['id'],
                        ifc_type=instance_info['type'],
                        model=model
                    )
                    outcome.instance_id = instance.stepfile_id  # store for later reference (not persisted)
                    outcomes_instances_to_save.append(instance)

            ModelInstance.objects.bulk_create(outcomes_instances_to_save, batch_size=DJANGO_DB_BULK_CREATE_BATCH_SIZE, ignore_conflicts=True)  # ignore existing
            model_instances = dict(ModelInstance.objects.filter(model_id=model.id).values_list('stepfile_id', 'id'))  # retrieve all

            # Map stored STEP ids to DB primary keys before saving outcomes.
            for outcome in outcomes_to_save:
                if outcome.instance_id:
                    # BUG FIX: direct indexing raised KeyError when a step id was
                    # missing from the map; .get() keeps the skip-when-unmapped
                    # behaviour the original guard clearly intended.
                    db_id = model_instances.get(outcome.instance_id)
                    if db_id:
                        outcome.instance_id = db_id

            ValidationOutcome.objects.bulk_create(outcomes_to_save, batch_size=DJANGO_DB_BULK_CREATE_BATCH_SIZE)

        model.save(update_fields=['status_schema'])

    # BUG FIX: context has no 'proc' attribute in this layer (results arrive as
    # the dict read above); report the return code from the result instead.
    return "No IFC schema errors." if success else f"'ifcopenshell.validate' returned exit code {context.result.get('returncode')} and {len(output):,} errors."


def process_syntax_outcomes(context: TaskContext):
    """
    Persist syntax-check results: the configured status field, plus a PASSED
    outcome or one SYNTAX_ERROR outcome per reported message.
    """
    # todo - unify output for all task executions
    output, error_output, success = (context.result.get(k) for k in ("output", "error_output", "success"))

    # process
    with with_model(context.request.id) as model:
        status_field = context.config.status_field.name
        task = context.task
        if success:
            setattr(model, status_field, Model.Status.VALID)
            task.outcomes.create(
                severity=ValidationOutcome.OutcomeSeverity.PASSED,
                outcome_code=ValidationOutcome.ValidationOutcomeCode.PASSED,
                observed=output if output else None
            )
        elif error_output:
            # Tool wrote to stderr: record the last non-empty stderr line.
            setattr(model, status_field, Model.Status.INVALID)
            stderr_lines = list(filter(None, error_output.split("\n")))
            # ROBUSTNESS FIX: [-1] crashed with IndexError when stderr contained
            # only blank lines; fall back to the raw stderr text in that case.
            task.outcomes.create(
                severity=ValidationOutcome.OutcomeSeverity.ERROR,
                outcome_code=ValidationOutcome.ValidationOutcomeCode.SYNTAX_ERROR,
                observed=stderr_lines[-1] if stderr_lines else error_output
            )
        else:
            # Structured JSON error list on stdout.
            for msg in json.loads(output):
                setattr(model, status_field, Model.Status.INVALID)
                task.outcomes.create(
                    severity=ValidationOutcome.OutcomeSeverity.ERROR,
                    outcome_code=ValidationOutcome.ValidationOutcomeCode.SYNTAX_ERROR,
                    observed=msg.get("message")
                )

        model.save(update_fields=[status_field])

    # return reason for logging
    return "No IFC syntax error(s)." if success else f"Found IFC syntax errors:\n\nConsole: \n{output}\n\nError: {error_output}"
def process_syntax(context: TaskContext):
    """Post-process the syntax validation task (shared syntax handling)."""
    return process_syntax_outcomes(context)


def process_header_syntax(context: TaskContext):
    """Post-process the header-syntax validation task (shared syntax handling)."""
    return process_syntax_outcomes(context)


import functools

from celery import shared_task, chain, chord, group

from core.utils import log_execution

from apps.ifc_validation_models.decorators import requires_django_user_context
from apps.ifc_validation_models.models import *
from .configs import task_registry
from .context import TaskContext
from .utils import get_absolute_file_path
from .logger import logger
from .email_tasks import *


# Task increments must cover exactly the full progress range.
assert task_registry.total_increment() == 100


def check_proc_success_or_fail(proc, task):
    """Return *proc*'s stdout, or mark *task* as failed and raise on a non-zero exit."""
    if proc.returncode:  # both None (not set) and 0 count as success
        error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}"
        task.mark_as_failed(error_message)
        raise RuntimeError(error_message)
    return proc.stdout


@shared_task(bind=True)
@log_execution
def error_handler(self, *args, **kwargs):
    """Workflow link_error callback: fan out to the workflow-failed task."""
    on_workflow_failed.delay(*args, **kwargs)


@shared_task(bind=True)
@log_execution
def chord_error_handler(self, request, exc, traceback, *args, **kwargs):
    """Chord on_error callback: chord errbacks receive (request, exc, traceback)."""
    on_workflow_failed.apply_async([request, exc, traceback])


@shared_task(bind=True)
@log_execution
@requires_django_user_context
def on_workflow_started(self, *args, **kwargs):
    """Mark the request as initiated and queue the matching admin email."""
    id = kwargs.get('id')
    reason = f"args={args} kwargs={kwargs}"
    request = ValidationRequest.objects.get(pk=id)
    request.mark_as_initiated(reason)

    # queue sending emails; a request with existing tasks is a re-validation
    nbr_of_tasks = request.tasks.count()
    if nbr_of_tasks == 0:
        # send_acknowledgement_user_email_task.delay(id=id, file_name=request.file_name) # disabled
        send_acknowledgement_admin_email_task.delay(id=id, file_name=request.file_name)
    else:
        # send_revalidating_user_email_task.delay(id=id, file_name=request.file_name) # disabled
        send_revalidating_admin_email_task.delay(id=id, file_name=request.file_name)
@shared_task(bind=True)
@log_execution
@requires_django_user_context
def on_workflow_completed(self, result, **kwargs):
    """Mark the request as completed and queue the completion email."""
    request_id = kwargs.get('id')
    if not isinstance(request_id, int):
        raise ValueError(f"Invalid id: {request_id!r}")
    reason = "Processing completed"
    request = ValidationRequest.objects.get(pk=request_id)
    request.mark_as_completed(reason)

    # queue sending email
    send_completion_email_task.delay(id=request_id, file_name=request.file_name)


@shared_task(bind=True)
@log_execution
@requires_django_user_context
def on_workflow_failed(self, *args, **kwargs):
    """Mark the request as failed and queue user + admin failure emails."""
    logger.debug(f'Function {self.__name__} called with args={args} kwargs={kwargs}')

    # NOTE(review): the id is taken positionally — both invocation paths
    # (error_handler and chord_error_handler) must keep it at args[1]; confirm.
    request_id = args[1]
    reason = f"Processing failed: args={args} kwargs={kwargs}"
    request = ValidationRequest.objects.get(pk=request_id)
    request.mark_as_failed(reason)

    # queue sending email
    send_failure_email_task.delay(id=request_id, file_name=request.file_name)
    send_failure_admin_email_task.delay(id=request_id, file_name=request.file_name)


def task_factory(task_type):
    """
    Build the Celery task for *task_type*: it runs the configured execution
    layer, then the processing layer, or marks the task skipped when a
    blocking task already failed.
    """
    config = task_registry[task_type]

    @shared_task(bind=True, name=config.celery_task_name)
    @log_execution
    @requires_django_user_context
    def validation_subtask_runner(self, *args, **kwargs):
        request_id = kwargs.get('id')

        request = ValidationRequest.objects.get(pk=request_id)
        file_path = get_absolute_file_path(request.file.name)

        # Always create the task record, even if it will be skipped due to blocking conditions,
        # so it is logged and its status can be marked as 'skipped'
        task = ValidationTask.objects.create(request=request, type=task_type)

        if model := request.model:
            invalid_blockers = [
                blocker for blocker in task_registry.get_blockers_of(task_type)
                if getattr(model, task_registry[blocker].status_field.name) == Model.Status.INVALID
            ]
        else:  # for testing, we're not instantiating a model
            invalid_blockers = []

        # update progress, capped at 100%
        request.progress = min(request.progress + config.increment, 100)
        request.save()

        if invalid_blockers:
            # Handle skipped tasks
            reason = f"Skipped due to fail in blocking tasks: {', '.join(invalid_blockers)}"
            logger.debug(reason)
            task.mark_as_skipped(reason)
            return

        task.mark_as_initiated()

        # Execution Layer
        try:
            context = config.check_program(TaskContext(
                config=config,
                task=task,
                request=request,
                file_path=file_path,
            ))
        except Exception as err:
            task.mark_as_failed(str(err))
            logger.exception(f"Execution failed in task {task_type}: {task}")
            return

        # Processing Layer / write to DB
        try:
            reason = config.process_results(context)
            task.mark_as_completed(reason)
            logger.debug(f"Task {task_type} completed, reason: {reason}")
        except Exception as err:
            task.mark_as_failed(str(err))
            logger.exception(f"Processing failed in task {task_type}: {err}")
            return

    validation_subtask_runner.__doc__ = f"Validation task for {task_type} generated by the task_factory func."
    return validation_subtask_runner


@shared_task(bind=True)
@log_execution
def ifc_file_validation_task(self, id, file_name, *args, **kwargs):
    """Entry point: build and launch the full validation workflow for a request."""
    if id is None or file_name is None:
        raise ValueError("Arguments 'id' and/or 'file_name' are required.")

    error_task = error_handler.s(id, file_name)
    chord_error_task = chord_error_handler.s(id, file_name)

    workflow_started = on_workflow_started.s(id=id, file_name=file_name)
    workflow_completed = on_workflow_completed.s(id=id, file_name=file_name)

    # header/syntax/prerequisite checks must run in this order
    serial_tasks = chain(
        header_syntax_validation_subtask.s(id=id, file_name=file_name),
        header_validation_subtask.s(id=id, file_name=file_name),
        syntax_validation_subtask.s(id=id, file_name=file_name),
        prerequisites_subtask.s(id=id, file_name=file_name),
    )

    # the remaining checks are independent of each other
    parallel_tasks = group([
        digital_signatures_subtask.s(id=id, file_name=file_name),
        schema_validation_subtask.s(id=id, file_name=file_name),
        #bsdd_validation_subtask.s(id=id, file_name=file_name), # disabled
        normative_rules_ia_validation_subtask.s(id=id, file_name=file_name),
        normative_rules_ip_validation_subtask.s(id=id, file_name=file_name),
        industry_practices_subtask.s(id=id, file_name=file_name)
    ])

    final_tasks = chain(
        instance_completion_subtask.s(id=id, file_name=file_name)
    )

    workflow = (
        workflow_started |
        serial_tasks |
        chord(
            chord(parallel_tasks, final_tasks).on_error(chord_error_task),
            workflow_completed
        ).on_error(chord_error_task))
    workflow.set(link_error=[error_task])
    workflow.apply_async()


instance_completion_subtask = task_factory(ValidationTask.Type.INSTANCE_COMPLETION)

normative_rules_ia_validation_subtask = task_factory(ValidationTask.Type.NORMATIVE_IA)

normative_rules_ip_validation_subtask = task_factory(ValidationTask.Type.NORMATIVE_IP)

prerequisites_subtask = task_factory(ValidationTask.Type.PREREQUISITES)

syntax_validation_subtask = task_factory(ValidationTask.Type.SYNTAX)
header_syntax_validation_subtask = task_factory(ValidationTask.Type.HEADER_SYNTAX)

schema_validation_subtask = task_factory(ValidationTask.Type.SCHEMA)

header_validation_subtask = task_factory(ValidationTask.Type.HEADER)

digital_signatures_subtask = task_factory(ValidationTask.Type.DIGITAL_SIGNATURES)

bsdd_validation_subtask = task_factory(ValidationTask.Type.BSDD)

industry_practices_subtask = task_factory(ValidationTask.Type.INDUSTRY_PRACTICES)


import contextlib
import functools
import os

from django.db import transaction

from core.utils import log_execution
from celery.utils.log import get_task_logger

from apps.ifc_validation_models.settings import MEDIA_ROOT
from apps.ifc_validation_models.decorators import requires_django_user_context
from apps.ifc_validation_models.models import ValidationRequest, Model

logger = get_task_logger(__name__)


@log_execution
@requires_django_user_context
@transaction.atomic
# @requires_django_exclusive_table_lock(Model, 'EXCLUSIVE')
# --> table lock, slower - DO NOT USE
def get_or_create_ifc_model(request_id):
    """
    Return the Model linked to a ValidationRequest, creating and linking it
    (under a row lock) when it does not exist yet.
    """
    # BUG FIX: fetch the row with "FOR UPDATE" straight away; the previous
    # unlocked read of request.model followed by a second, locked re-fetch was
    # both a redundant query and a check-then-act race between workers.
    request = ValidationRequest.objects.select_for_update().get(pk=request_id)

    if request.model is None:
        model, _ = Model.objects.get_or_create(
            file_name=request.file_name,
            file=request.file,
            size=request.file.size,
            uploaded_by=request.created_by
        )
        request.model = model
        request.save()

    return request.model


@contextlib.contextmanager
def with_model(request_id):
    """Context manager yielding the request's Model inside one transaction."""
    with transaction.atomic():
        yield get_or_create_ifc_model(request_id)


@functools.lru_cache(maxsize=1024)
def get_absolute_file_path(file_name):

    """
    Resolves the absolute file path of an uploaded file and checks if it exists.
    It tries resolving Django MEDIA_ROOT and current working directory, and caches the result.

    Mandatory Args:
        file_name: relative file name of the uploaded file.

    Returns:
        Absolute file path of the uploaded file.

    Raises:
        FileNotFoundError: when neither candidate location contains the file.
    """

    ifc_fn = os.path.join(MEDIA_ROOT, file_name)

    if not os.path.exists(ifc_fn):
        ifc_fn2 = os.path.join(os.getcwd(), ifc_fn)
        if not os.path.exists(ifc_fn2):
            raise FileNotFoundError(f"File path for file_name={file_name} was not found (tried loading '{ifc_fn}' and '{ifc_fn2}').")

    ifc_fn = os.path.abspath(ifc_fn)

    logger.debug(f"get_absolute_file_path(): file_name={file_name} returned '{ifc_fn}'")
    return ifc_fn
b/docker/backend/Dockerfile @@ -37,7 +37,7 @@ RUN --mount=type=cache,target=/root/.cache \ find /app/backend -name 'requirements.txt' -exec pip install --no-cache-dir -r {} \; && \ # use version of ifcopenshell with desired schema parsing # TODO: revert to pyPI when schema parsing is published in the future - wget -O /tmp/ifcopenshell_python.zip "https://s3.amazonaws.com/ifcopenshell-builds/ifcopenshell-python-311-v0.8.3-260bc80-linux64.zip" && \ + wget -O /tmp/ifcopenshell_python.zip "https://s3.amazonaws.com/ifcopenshell-builds/ifcopenshell-python-311-v0.8.4-6924012-linux64.zip" && \ mkdir -p /opt/venv/lib/python3.11/site-packages && \ unzip -d /opt/venv/lib/python3.11/site-packages /tmp/ifcopenshell_python.zip && \ # some cleanup