Skip to content

Commit

Permalink
Merge pull request #19526 from mvdbeek/fix_error_message_when_subwork…
Browse files Browse the repository at this point in the history
…flow_input_connection_invalid

[24.2] Fix error message when subworkflow input connection missing
  • Loading branch information
mvdbeek authored Feb 5, 2025
2 parents bcfc88a + 2fc6191 commit 0696805
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 16 deletions.
20 changes: 10 additions & 10 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7911,12 +7911,12 @@ class Workflow(Base, Dictifiable, RepresentById):
has_cycles: Mapped[Optional[bool]]
has_errors: Mapped[Optional[bool]]
reports_config: Mapped[Optional[bytes]] = mapped_column(JSONType)
creator_metadata: Mapped[Optional[bytes]] = mapped_column(JSONType)
creator_metadata: Mapped[Optional[List[Dict[str, Any]]]] = mapped_column(JSONType)
license: Mapped[Optional[str]] = mapped_column(TEXT)
source_metadata: Mapped[Optional[Dict[str, str]]] = mapped_column(JSONType)
uuid: Mapped[Optional[Union[UUID, str]]] = mapped_column(UUIDType)

steps = relationship(
steps: Mapped[List["WorkflowStep"]] = relationship(
"WorkflowStep",
back_populates="workflow",
primaryjoin=(lambda: Workflow.id == WorkflowStep.workflow_id),
Expand Down Expand Up @@ -7967,7 +7967,7 @@ def to_dict(self, view="collection", value_mapper=None):
return rval

@property
def steps_by_id(self):
def steps_by_id(self) -> Dict[int, "WorkflowStep"]:
steps = {}
for step in self.steps:
step_id = step.id
Expand Down Expand Up @@ -8142,7 +8142,7 @@ class WorkflowStep(Base, RepresentById, UsesCreateAndUpdateTime):
back_populates="workflow_step",
)
post_job_actions = relationship("PostJobAction", back_populates="workflow_step", cascade_backrefs=False)
inputs = relationship("WorkflowStepInput", back_populates="workflow_step")
inputs: Mapped[List["WorkflowStepInput"]] = relationship("WorkflowStepInput", back_populates="workflow_step")
workflow_outputs: Mapped[List["WorkflowOutput"]] = relationship(
back_populates="workflow_step", cascade_backrefs=False
)
Expand Down Expand Up @@ -8486,16 +8486,16 @@ class WorkflowStepConnection(Base, RepresentById):
output_name: Mapped[Optional[str]] = mapped_column(TEXT)
input_subworkflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"), index=True)

input_step_input = relationship(
input_step_input: Mapped["WorkflowStepInput"] = relationship(
"WorkflowStepInput",
back_populates="connections",
cascade="all",
primaryjoin=(lambda: WorkflowStepConnection.input_step_input_id == WorkflowStepInput.id),
)
input_subworkflow_step = relationship(
input_subworkflow_step: Mapped[Optional["WorkflowStep"]] = relationship(
"WorkflowStep", primaryjoin=(lambda: WorkflowStepConnection.input_subworkflow_step_id == WorkflowStep.id)
)
output_step = relationship(
output_step: Mapped["WorkflowStep"] = relationship(
"WorkflowStep",
back_populates="output_connections",
cascade="all",
Expand All @@ -8519,7 +8519,7 @@ def input_name(self):

@property
def input_step(self) -> Optional[WorkflowStep]:
return self.input_step_input and self.input_step_input.workflow_step
return self.input_step_input.workflow_step

@property
def input_step_id(self):
Expand Down Expand Up @@ -8736,7 +8736,7 @@ class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializabl
order_by=lambda: WorkflowInvocationStep.order_index,
cascade_backrefs=False,
)
workflow = relationship("Workflow")
workflow: Mapped[Workflow] = relationship("Workflow")
output_dataset_collections = relationship(
"WorkflowInvocationOutputDatasetCollectionAssociation",
back_populates="workflow_invocation",
Expand Down Expand Up @@ -9659,7 +9659,7 @@ class WorkflowRequestStepState(Base, Dictifiable, Serializable):
ForeignKey("workflow_invocation.id", onupdate="CASCADE", ondelete="CASCADE"), index=True
)
workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"))
value: Mapped[Optional[bytes]] = mapped_column(MutableJSONType)
value: Mapped[Optional[Dict[str, Any]]] = mapped_column(MutableJSONType)
workflow_step: Mapped[Optional["WorkflowStep"]] = relationship()
workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship(back_populates="step_states")

Expand Down
18 changes: 15 additions & 3 deletions lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def invoke(self) -> Dict[int, Any]:
step_delayed = delayed_steps = True
self.progress.mark_step_outputs_delayed(step, why=de.why)
except Exception as e:
log_function = log.exception
log_function = log.error
if isinstance(e, modules.FailWorkflowEvaluation) and e.why.reason in FAILURE_REASONS_EXPECTED:
log_function = log.info
log_function(
Expand Down Expand Up @@ -704,7 +704,8 @@ def subworkflow_progress(
for input_subworkflow_step in subworkflow.input_steps:
connection_found = False
subworkflow_step_id = input_subworkflow_step.id
for input_connection in step.input_connections:
input_connections = step.input_connections
for input_connection in input_connections:
if input_connection.input_subworkflow_step_id == subworkflow_step_id:
is_data = input_connection.output_step.type != "parameter_input"
replacement = self.replacement_for_connection(
Expand All @@ -715,7 +716,18 @@ def subworkflow_progress(
connection_found = True
break

if not connection_found and not input_subworkflow_step.input_optional:
if not input_subworkflow_step.input_optional and not connection_found:

if not input_connections:
# TODO: Prevent this on import / runtime !
raise modules.FailWorkflowEvaluation(
InvocationUnexpectedFailure(
reason=FailureReason.unexpected_failure,
workflow_step_id=step.id,
details="Subworkflow has disconnected required input.",
)
)

raise modules.FailWorkflowEvaluation(
InvocationFailureOutputNotFound(
reason=FailureReason.output_not_found,
Expand Down
2 changes: 2 additions & 0 deletions lib/galaxy/workflow/run_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ def build_workflow_run_configs(
step = steps_by_id[key]
if step.type == "parameter_input":
module_injector.inject(step)
assert step.module
input_param = step.module.get_runtime_inputs(step.module)["input"]
try:
input_param.validate(input_dict, trans=trans)
Expand Down Expand Up @@ -546,6 +547,7 @@ def add_parameter(name: str, value: str, type: WorkflowRequestInputParameter.typ
for step in workflow.steps:
steps_by_id[step.id] = step
assert step.module
assert step.state
serializable_runtime_state = step.module.encode_runtime_state(step, step.state)

step_state = WorkflowRequestStepState()
Expand Down
31 changes: 29 additions & 2 deletions lib/galaxy_test/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ def _setup_workflow_run(
def _ds_entry(self, history_content):
return self.dataset_populator.ds_entry(history_content)

def _invocation_details(self, workflow_id, invocation_id, **kwds):
invocation_details_response = self._get(f"workflows/{workflow_id}/usage/{invocation_id}", data=kwds)
def _invocation_details(self, workflow_id: Optional[str], invocation_id: str, **kwds):
invocation_details_response = self._get(f"invocations/{invocation_id}", data=kwds)
self._assert_status_code_is(invocation_details_response, 200)
invocation_details = invocation_details_response.json()
return invocation_details
Expand Down Expand Up @@ -3483,6 +3483,33 @@ def filter_jobs_by_tool(tool_id):
)
assert output_filtered["element_count"] == 2, output_filtered

def test_subworkflow_missing_input_connection_error(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs: []
steps:
subworkflow_step:
run:
class: GalaxyWorkflow
inputs:
my_input:
type: data
steps: []
""",
history_id=history_id,
assert_ok=False,
)
workflow_details = self._invocation_details(summary.workflow_id, summary.invocation_id)
assert workflow_details["messages"] == [
{
"details": "Subworkflow has disconnected required input.",
"reason": "unexpected_failure",
"workflow_step_id": 0,
}
]

def test_workflow_request(self):
workflow = self.workflow_populator.load_workflow(name="test_for_queue")
workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow)
Expand Down
2 changes: 1 addition & 1 deletion test/unit/workflows/test_workflow_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def _set_previous_progress(self, outputs):

workflow_invocation_step_state = model.WorkflowRequestStepState()
workflow_invocation_step_state.workflow_step_id = step_id
workflow_invocation_step_state.value = cast(bytes, True)
workflow_invocation_step_state.value = {"my_param": True}
self.invocation.step_states.append(workflow_invocation_step_state)

def _step(self, index):
Expand Down

0 comments on commit 0696805

Please sign in to comment.