Skip to content

Commit b837571

Browse files
committed
- fix format problems
- add test examples - copy_artifact supports for sort Signed-off-by: zjgemi <[email protected]>
1 parent 4dd1560 commit b837571

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+2220
-1414
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# CHANGELOG
22

3+
## 1.1.12
4+
5+
- fix format problems
6+
- add test examples
7+
- copy_artifact supports for sort
8+
39
## 1.1.11
410

511
### Added

VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.11
1+
1.1.12

examples/test_async.py

+15-14
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,29 @@
1-
from dflow import (
2-
ShellOPTemplate,
3-
InputArtifact,
4-
OutputArtifact,
5-
Workflow,
6-
Step
7-
)
81
import time
92

3+
from dflow import (InputArtifact, OutputArtifact, ShellOPTemplate, Step,
4+
Workflow)
5+
106
if __name__ == "__main__":
117
hello = ShellOPTemplate(name='Hello',
12-
image="alpine:latest",
13-
script="sleep 60 && echo Hello > /tmp/bar.txt")
8+
image="alpine:latest",
9+
script="sleep 60 && echo Hello > /tmp/bar.txt")
1410
hello.outputs.artifacts = {"bar": OutputArtifact(path="/tmp/bar.txt")}
1511

1612
echo = ShellOPTemplate(name='Echo',
17-
image="alpine:latest",
18-
script="while [ 1 ]; do echo 'waiting...'; if [ -f /tmp/foo.txt ]; then cat /tmp/foo.txt; break; fi; sleep 1; done")
13+
image="alpine:latest",
14+
script="while [ 1 ]; do echo 'waiting...';"
15+
" if [ -f /tmp/foo.txt ]; then cat /tmp/foo.txt;"
16+
" break; fi; sleep 1; done")
1917
echo.inputs.artifacts = {"foo": InputArtifact(path="/tmp/foo.txt")}
2018

2119
wf = Workflow(name="async")
2220
step0 = Step(name="hello", template=hello)
23-
# This step will give output artifact "bar" which contains "Hello" after 60 seconds
24-
step1 = Step(name="echo", template=echo, artifacts={"foo": step0.outputs.artifacts["bar"].pvc()})
25-
# This step will wait the last step to finish and then print its output artifact
21+
# This step will give output artifact "bar" which contains "Hello" after
22+
# 60 seconds
23+
step1 = Step(name="echo", template=echo, artifacts={
24+
"foo": step0.outputs.artifacts["bar"].pvc()})
25+
# This step will wait the last step to finish and then print its output
26+
# artifact
2627
wf.add([step0, step1])
2728
wf.submit()
2829

examples/test_big_parameter.py

+24-22
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,38 @@
11
import time
2-
from dflow import (
3-
Workflow,
4-
Step,
5-
Steps,
6-
InputParameter,
7-
OutputParameter
8-
)
9-
from dflow.python import (
10-
PythonOPTemplate,
11-
OP,
12-
OPIO,
13-
OPIOSign,
14-
BigParameter
15-
)
2+
3+
from dflow import InputParameter, OutputParameter, Step, Steps, Workflow
4+
from dflow.python import (OP, OPIO, BigParameter, OPIOSign, PythonOPTemplate,
5+
upload_packages)
6+
7+
if "__file__" in locals():
8+
upload_packages.append(__file__)
9+
1610

1711
class Hello:
1812
def __init__(self, msg):
1913
self.msg = msg
2014

15+
2116
class Duplicate(OP):
2217
def __init__(self):
2318
pass
2419

2520
@classmethod
2621
def get_input_sign(cls):
2722
return OPIOSign({
28-
'foo' : BigParameter(Hello)
23+
'foo': BigParameter(Hello)
2924
})
3025

3126
@classmethod
3227
def get_output_sign(cls):
3328
return OPIOSign({
34-
'foo' : BigParameter(Hello)
29+
'foo': BigParameter(Hello)
3530
})
3631

3732
@OP.exec_sign_check
3833
def execute(
3934
self,
40-
op_in : OPIO,
35+
op_in: OPIO,
4136
) -> OPIO:
4237
foo = op_in["foo"]
4338
print(foo.msg)
@@ -47,32 +42,35 @@ def execute(
4742
})
4843
return op_out
4944

50-
if __name__ == "__main__":
45+
46+
def test_big_parameter():
5147
wf = Workflow(name="big-param")
5248

5349
steps = Steps(name="hello-steps")
5450
steps.inputs.parameters["foo"] = InputParameter()
5551
steps.outputs.parameters["foo"] = OutputParameter()
5652

5753
step1 = Step(
58-
name="step1",
54+
name="step1",
5955
template=PythonOPTemplate(Duplicate, image="python:3.8"),
6056
parameters={"foo": steps.inputs.parameters["foo"]},
6157
key="step1"
6258
)
6359
steps.add(step1)
6460

6561
step2 = Step(
66-
name="step2",
62+
name="step2",
6763
template=PythonOPTemplate(Duplicate, image="python:3.8"),
6864
parameters={"foo": step1.outputs.parameters["foo"]},
6965
key="step2"
7066
)
7167
steps.add(step2)
7268

73-
steps.outputs.parameters["foo"].value_from_parameter = step2.outputs.parameters["foo"]
69+
steps.outputs.parameters["foo"].value_from_parameter = \
70+
step2.outputs.parameters["foo"]
7471

75-
big_step = Step(name="big-step", template=steps, parameters={"foo": Hello("hello")})
72+
big_step = Step(name="big-step", template=steps,
73+
parameters={"foo": Hello("hello")})
7674
wf.add(big_step)
7775
wf.submit()
7876

@@ -88,3 +86,7 @@ def execute(
8886
wf = Workflow(name="big-param-resubmit")
8987
wf.add(big_step)
9088
wf.submit(reuse_step=[step])
89+
90+
91+
if __name__ == "__main__":
92+
test_big_parameter()

examples/test_conditional_outputs.py

+33-27
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,14 @@
1-
from dflow import (
2-
Workflow,
3-
Step,
4-
Steps,
5-
if_expression,
6-
Outputs,
7-
OutputArtifact,
8-
OutputParameter
9-
)
10-
from dflow.python import (
11-
PythonOPTemplate,
12-
OP,
13-
OPIO,
14-
OPIOSign,
15-
Artifact
16-
)
171
import random
2+
import time
3+
4+
from dflow import (OutputArtifact, OutputParameter, Outputs, Step, Steps,
5+
Workflow, if_expression)
6+
from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate,
7+
upload_packages)
8+
9+
if "__file__" in locals():
10+
upload_packages.append(__file__)
11+
1812

1913
class Random(OP):
2014
@classmethod
@@ -34,7 +28,7 @@ def get_output_sign(cls):
3428
@OP.exec_sign_check
3529
def execute(
3630
self,
37-
op_in : OPIO,
31+
op_in: OPIO,
3832
) -> OPIO:
3933
open("foo.txt", "w").write("head")
4034
open("bar.txt", "w").write("tail")
@@ -50,24 +44,36 @@ def execute(
5044
"bar": "bar.txt"
5145
})
5246

53-
if __name__ == "__main__":
54-
steps = Steps("conditional-steps", outputs=Outputs(parameters={"msg": OutputParameter()},
55-
artifacts={"res": OutputArtifact()}))
5647

57-
random = Step(
58-
name="random",
48+
def test_conditional_outputs():
49+
steps = Steps("conditional-steps", outputs=Outputs(
50+
parameters={"msg": OutputParameter()},
51+
artifacts={"res": OutputArtifact()}))
52+
53+
random_step = Step(
54+
name="random",
5955
template=PythonOPTemplate(Random, image="python:3.8")
6056
)
61-
steps.add(random)
57+
steps.add(random_step)
6258

6359
steps.outputs.parameters["msg"].value_from_expression = if_expression(
64-
_if=random.outputs.parameters["is_head"] == True,
65-
_then=random.outputs.parameters["msg1"], _else=random.outputs.parameters["msg2"])
60+
_if=random_step.outputs.parameters["is_head"],
61+
_then=random_step.outputs.parameters["msg1"],
62+
_else=random_step.outputs.parameters["msg2"])
6663

6764
steps.outputs.artifacts["res"].from_expression = if_expression(
68-
_if=random.outputs.parameters["is_head"] == True,
69-
_then=random.outputs.artifacts["foo"], _else=random.outputs.artifacts["bar"])
65+
_if=random_step.outputs.parameters["is_head"],
66+
_then=random_step.outputs.artifacts["foo"],
67+
_else=random_step.outputs.artifacts["bar"])
7068

7169
wf = Workflow(name="conditional", steps=steps)
7270

7371
wf.submit()
72+
while wf.query_status() in ["Pending", "Running"]:
73+
time.sleep(1)
74+
75+
assert(wf.query_status() == "Succeeded")
76+
77+
78+
if __name__ == "__main__":
79+
test_conditional_outputs()

examples/test_dag.py

+18-21
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,33 @@
1-
from dflow import (
2-
ShellOPTemplate,
3-
InputParameter,
4-
OutputParameter,
5-
InputArtifact,
6-
OutputArtifact,
7-
Workflow,
8-
Task,
9-
DAG,
10-
)
1+
from dflow import (DAG, InputArtifact, InputParameter, OutputArtifact,
2+
OutputParameter, ShellOPTemplate, Task, Workflow)
113

124
if __name__ == "__main__":
13-
hello = ShellOPTemplate(name='Hello',
14-
image="alpine:latest",
15-
script="echo Hello > /tmp/bar.txt && echo 1 > /tmp/result.txt")
16-
hello.outputs.parameters = {"msg": OutputParameter(value_from_path="/tmp/result.txt")}
5+
hello = ShellOPTemplate(
6+
name='Hello',
7+
image="alpine:latest",
8+
script="echo Hello > /tmp/bar.txt && echo 1 > /tmp/result.txt")
9+
hello.outputs.parameters = {"msg": OutputParameter(
10+
value_from_path="/tmp/result.txt")}
1711
hello.outputs.artifacts = {"bar": OutputArtifact(path="/tmp/bar.txt")}
1812

19-
duplicate = ShellOPTemplate(name='Duplicate',
20-
image="alpine:latest",
21-
script="cat /tmp/foo.txt /tmp/foo.txt > /tmp/bar.txt && echo $(({{inputs.parameters.msg}}*2)) > /tmp/result.txt")
13+
duplicate = ShellOPTemplate(
14+
name='Duplicate',
15+
image="alpine:latest",
16+
script="cat /tmp/foo.txt /tmp/foo.txt > /tmp/bar.txt && "
17+
"echo $(({{inputs.parameters.msg}}*2)) > /tmp/result.txt")
2218
duplicate.inputs.parameters = {"msg": InputParameter()}
23-
duplicate.outputs.parameters = {"msg": OutputParameter(value_from_path="/tmp/result.txt")}
19+
duplicate.outputs.parameters = {"msg": OutputParameter(
20+
value_from_path="/tmp/result.txt")}
2421
duplicate.inputs.artifacts = {"foo": InputArtifact(path="/tmp/foo.txt")}
2522
duplicate.outputs.artifacts = {"bar": OutputArtifact(path="/tmp/bar.txt")}
2623

2724
dag = DAG()
2825
hello0 = Task(name="hello0", template=hello)
2926
dag.add(hello0)
3027
hello1 = Task(name="hello1",
31-
template=duplicate,
32-
parameters={"msg": hello0.outputs.parameters["msg"]},
33-
artifacts={"foo": hello0.outputs.artifacts["bar"]})
28+
template=duplicate,
29+
parameters={"msg": hello0.outputs.parameters["msg"]},
30+
artifacts={"foo": hello0.outputs.artifacts["bar"]})
3431
dag.add(hello1)
3532

3633
wf = Workflow(name="dag", dag=dag)

0 commit comments

Comments
 (0)