Skip to content

Commit

Permalink
feature: replace dag by carp dag framework
Browse files Browse the repository at this point in the history
  • Loading branch information
wangqi committed Aug 25, 2024
1 parent 60e4b15 commit 665388c
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void run() {
public void onResponse(ActionResult result) {
try {
ActionContext context = result.getContext();
log.debug("workflow task {} run success!, globalInputs: {}, inputs: {}, outputs: {}",
log.info("workflow task {} run success!, globalInputs: {}, inputs: {}, outputs: {}",
configStepDTO.getStepName(), JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs()), JacksonUtil.toJsonString(context.getOutputs()));
// 记录输出
DagStepDTO dagStepSuccessParam = new DagStepDTO();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public TaskChangeRunner(Long workflowInstanceId) {
@Override
public void run() {
DagInstanceDTO dagInstanceDTO = dagInstanceComplexService.selectSimpleOne(workflowInstanceId);
if (dagInstanceDTO.getStatus().equals(WorkflowInstanceState.FAILURE.getValue())) {
if (WorkflowInstanceState.FAILURE.getValue().equals(dagInstanceDTO.getStatus())) {
return;
}

Expand All @@ -79,7 +79,7 @@ public void run() {
boolean isAnyFailure = false;
String anyFailureMessage = null;
for (DagStepDTO dagStepDTO : stepDag.nodes()) {
if (dagStepDTO.getStatus().equals(WorkflowTaskInstanceStage.FAILURE.getValue())) {
if (WorkflowTaskInstanceStage.FAILURE.getValue().equals(dagStepDTO.getStatus())) {
isAnyFailure = true;
// anyFailureMessage = dagStepDTO.getMessage();
break;
Expand All @@ -93,7 +93,7 @@ public void run() {
// 检测所有任务,如果都执行成功,则成功。在检测过程中,尝试启动所有后置节点
int successTaskCount = 0;
for (DagStepDTO dagStepDTO : stepDag.nodes()) {
if (dagStepDTO.getStatus().equals(WorkflowTaskInstanceStage.SUCCESS.getValue())) {
if (WorkflowTaskInstanceStage.SUCCESS.getValue().equals(dagStepDTO.getStatus())) {
successTaskCount++;
// 如果节点成功,尝试启动后继节点
Set<DagStepDTO> successors = stepDag.successors(dagStepDTO);
Expand All @@ -108,18 +108,18 @@ public void run() {

private void tryDeploySuccessor(Graph<DagStepDTO> stepDag, DagStepDTO dagStepDTO) {
// 已经执行过
if (dagStepDTO.getStatus().equals(WorkflowTaskInstanceStage.PENDING.getValue()) == false) {
if (dagStepDTO.getStatus() != null && WorkflowTaskInstanceStage.PENDING.getValue().equals(dagStepDTO.getStatus()) == false) {
return;
}
// 判断前驱节点是否全部成功
Set<DagStepDTO> predecessors = stepDag.predecessors(dagStepDTO);
for (DagStepDTO predecessor : predecessors) {
// 前驱节点未执行
if (predecessor.getStatus().equals(WorkflowTaskInstanceStage.PENDING.getValue())) {
if (WorkflowTaskInstanceStage.PENDING.getValue().equals(predecessor.getStatus())) {
return;
}
// 前驱节点未成功
if (predecessor.getStatus().equals(WorkflowTaskInstanceStage.SUCCESS.getValue()) == false) {
if (WorkflowTaskInstanceStage.SUCCESS.getValue().equals(predecessor.getStatus()) == false) {
return;
}
}
Expand Down

0 comments on commit 665388c

Please sign in to comment.