Skip to content

Commit 90b01a9

Browse files
Kun-Lung WuGitHub Enterprise
authored andcommitted
Merge pull request #39 from codeflare/test
Merge unit tests from test branch to develop branch
2 parents 616516b + 2613953 commit 90b01a9

File tree

4 files changed

+464
-0
lines changed

4 files changed

+464
-0
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Codeflare pipeline tests
2+
3+
Contents:
4+
5+
* [Unit test execution](#unit-test-execution)
6+
* [Unit test coverage](#unit-test-coverage)
7+
* [Use pytest as test framework](#use-pytest-as-test-framework)
8+
9+
## Unit test execution
10+
Run `pytest` in the test folder to execute all of the test code in it:
11+
```
12+
pytest
13+
```
14+
15+
## Unit test coverage
16+
* and (fan-in) node in a pipeline graph, and variants
17+
* or (fan-out) node in a pipeline graph, and variants
18+
* multibranch with mixtures of and/or nodes in a pipeline graph
19+
20+
## Use pytest as test framework
21+
pytest is a testing framework for Python, with a simple and easy syntax targeting unit tests and simple functional tests. It automatically discovers tests in the test folder and can run them in parallel through the pytest-xdist plugin. pytest serves the current goal of testing Codeflare pipelines well.
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import pytest
2+
import ray
3+
import pandas as pd
4+
import numpy as np
5+
from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler
6+
import codeflare.pipelines.Datamodel as dm
7+
import codeflare.pipelines.Runtime as rt
8+
from codeflare.pipelines.Datamodel import Xy
9+
from codeflare.pipelines.Datamodel import XYRef
10+
from codeflare.pipelines.Runtime import ExecutionType
11+
12+
class FeatureUnion(dm.AndTransform):
    """And-transform that joins the X parts of its inputs column-wise."""

    def __init__(self):
        pass

    def transform(self, xy_list):
        """Merge the inputs in ``xy_list`` into a single ``Xy``.

        The X of every input is concatenated along axis 1; the y of the
        last input in ``xy_list`` is passed through as-is.
        """
        # Read x and y from each input in order, one input at a time.
        parts = [(item.get_x(), item.get_y()) for item in xy_list]

        merged_x = np.concatenate([x_part for x_part, _ in parts], axis=1)
        last_y = parts[-1][1] if parts else None

        return Xy(merged_x, last_y)
26+
27+
def test_two_tier_and():
    """Fit a two-tier fan-in pipeline and check the output row count.

    Four scaler nodes (a-d) feed two AndNodes (e and f), which in turn feed
    a final AndNode (g). The test asserts that the X and y produced at
    node g have the same number of rows as the input data.
    """
    n_rows = 10000

    ray.shutdown()
    ray.init()
    try:
        ## prepare the data
        X = np.random.randint(0, 100, size=(n_rows, 4))
        y = np.random.randint(0, 2, size=(n_rows, 1))

        ## initialize codeflare pipeline by first creating the nodes
        pipeline = dm.Pipeline()
        node_a = dm.EstimatorNode('a', MinMaxScaler())
        node_b = dm.EstimatorNode('b', StandardScaler())
        node_c = dm.EstimatorNode('c', MaxAbsScaler())
        node_d = dm.EstimatorNode('d', RobustScaler())

        node_e = dm.AndNode('e', FeatureUnion())
        node_f = dm.AndNode('f', FeatureUnion())
        node_g = dm.AndNode('g', FeatureUnion())

        ## codeflare nodes are then connected by edges
        edges = (
            (node_a, node_e),
            (node_b, node_e),
            (node_c, node_f),
            (node_d, node_f),
            (node_e, node_g),
            (node_f, node_g),
        )
        for source, target in edges:
            pipeline.add_edge(source, target)

        ## every first-tier node receives the same input data
        pipeline_input = dm.PipelineInput()
        xy = dm.Xy(X, y)
        for node in (node_a, node_b, node_c, node_d):
            pipeline_input.add_xy_arg(node, xy)

        ## execute the codeflare pipeline
        pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.FIT, pipeline_input)

        ## retrieve node g
        node_g_output = pipeline_output.get_xyrefs(node_g)
        Xout = ray.get(node_g_output[0].get_Xref())
        yout = ray.get(node_g_output[0].get_yref())

        assert Xout.shape[0] == n_rows
        assert yout.shape[0] == n_rows
    finally:
        ## always release the Ray runtime, even when an assertion fails
        ray.shutdown()
74+
75+
def test_four_input_and():
    """Fit a pipeline whose single AndNode has four inputs.

    Four scaler nodes (a-d) all feed one AndNode (e). The test asserts that
    the X and y produced at node e have the same number of rows as the
    input data.
    """
    n_rows = 10000

    ray.shutdown()
    ray.init()
    try:
        ## prepare the data
        X = np.random.randint(0, 100, size=(n_rows, 4))
        y = np.random.randint(0, 2, size=(n_rows, 1))

        ## initialize codeflare pipeline by first creating the nodes
        pipeline = dm.Pipeline()
        node_a = dm.EstimatorNode('a', MinMaxScaler())
        node_b = dm.EstimatorNode('b', StandardScaler())
        node_c = dm.EstimatorNode('c', MaxAbsScaler())
        node_d = dm.EstimatorNode('d', RobustScaler())

        node_e = dm.AndNode('e', FeatureUnion())

        input_nodes = (node_a, node_b, node_c, node_d)

        ## codeflare nodes are then connected by edges
        for node in input_nodes:
            pipeline.add_edge(node, node_e)

        ## every input node receives the same data
        pipeline_input = dm.PipelineInput()
        xy = dm.Xy(X, y)
        for node in input_nodes:
            pipeline_input.add_xy_arg(node, xy)

        ## execute the codeflare pipeline
        pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.FIT, pipeline_input)

        ## retrieve node e
        node_e_output = pipeline_output.get_xyrefs(node_e)
        Xout = ray.get(node_e_output[0].get_Xref())
        yout = ray.get(node_e_output[0].get_yref())

        assert Xout.shape[0] == n_rows
        assert yout.shape[0] == n_rows
    finally:
        ## always release the Ray runtime, even when an assertion fails
        ray.shutdown()
118+
119+
def test_two_input_and():
    """Fit a pipeline whose single AndNode has two inputs.

    Two scaler nodes (a and b) feed one AndNode (c). The test asserts that
    the X and y produced at node c have the same number of rows as the
    input data.
    """
    n_rows = 10000

    ray.shutdown()
    ray.init()
    try:
        ## prepare the data
        X = np.random.randint(0, 100, size=(n_rows, 4))
        y = np.random.randint(0, 2, size=(n_rows, 1))

        ## initialize codeflare pipeline by first creating the nodes
        pipeline = dm.Pipeline()
        node_a = dm.EstimatorNode('a', MinMaxScaler())
        node_b = dm.EstimatorNode('b', StandardScaler())
        node_c = dm.AndNode('c', FeatureUnion())

        input_nodes = (node_a, node_b)

        ## codeflare nodes are then connected by edges
        for node in input_nodes:
            pipeline.add_edge(node, node_c)

        ## every input node receives the same data
        pipeline_input = dm.PipelineInput()
        xy = dm.Xy(X, y)
        for node in input_nodes:
            pipeline_input.add_xy_arg(node, xy)

        ## execute the codeflare pipeline
        pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.FIT, pipeline_input)

        ## retrieve node c
        node_c_output = pipeline_output.get_xyrefs(node_c)
        Xout = ray.get(node_c_output[0].get_Xref())
        yout = ray.get(node_c_output[0].get_yref())

        assert Xout.shape[0] == n_rows
        assert yout.shape[0] == n_rows
    finally:
        ## always release the Ray runtime, even when an assertion fails
        ray.shutdown()
155+
156+
157+
if __name__ == "__main__":
    ## allow running this file directly: run pytest on it in verbose mode
    ## and use pytest's return value as the process exit status
    import sys  # not imported at module level, so bring it in here

    sys.exit(pytest.main(["-v", __file__]))
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import pytest
2+
import ray
3+
import pandas as pd
4+
import numpy as np
5+
from sklearn.compose import ColumnTransformer
6+
from sklearn.model_selection import train_test_split
7+
from sklearn.pipeline import Pipeline
8+
from sklearn.preprocessing import StandardScaler, MinMaxScaler
9+
from sklearn.tree import DecisionTreeClassifier
10+
from sklearn.linear_model import LogisticRegression
11+
import codeflare.pipelines.Datamodel as dm
12+
import codeflare.pipelines.Runtime as rt
13+
from codeflare.pipelines.Datamodel import Xy
14+
from codeflare.pipelines.Datamodel import XYRef
15+
from codeflare.pipelines.Runtime import ExecutionType
16+
17+
class FeatureUnion(dm.AndTransform):
    """And-transform that joins the X parts of its inputs column-wise."""

    def __init__(self):
        pass

    def transform(self, xy_list):
        """Merge the inputs in ``xy_list`` into a single ``Xy``.

        The X of every input is concatenated along axis 1. The y of the
        last input is flattened with ``.values.ravel()`` before being
        returned (presumably it is a pandas object; confirm against the
        callers).
        """
        # Read x and y from each input in order, one input at a time.
        parts = [(item.get_x(), item.get_y()) for item in xy_list]

        merged_x = np.concatenate([x_part for x_part, _ in parts], axis=1)
        last_y = parts[-1][1] if parts else None

        return Xy(merged_x, last_y.values.ravel())
31+
32+
def test_multibranch_1():
    """Fit a chain that ends in a fan-out to two classifiers.

    The graph is preprocess -> MinMaxScaler -> AndNode, and the AndNode
    feeds both a LogisticRegression node and a DecisionTreeClassifier node.
    The test asserts that the X and y seen at the decision-tree node have
    as many rows as the training split.
    """
    n_rows = 10000
    n_train = 8000  ## 80% of n_rows, given test_size=0.2 below

    ray.shutdown()
    ray.init()
    try:
        ## prepare the data
        X = pd.DataFrame(np.random.randint(0, 100, size=(n_rows, 4)), columns=list('ABCD'))
        y = pd.DataFrame(np.random.randint(0, 2, size=(n_rows, 1)), columns=['Label'])

        ## select by the generic numeric kind rather than 'int64', so the
        ## columns are still picked up where NumPy's default integer is 32-bit
        numeric_features = X.select_dtypes(include=['number']).columns
        numeric_transformer = Pipeline(steps=[
            ('scaler', StandardScaler())])

        ## set up preprocessor as StandardScaler
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
            ])

        ## only the training split is used by this test
        X_train, _, y_train, _ = train_test_split(X, y, test_size=0.2)

        ## initialize codeflare pipeline by first creating the nodes
        pipeline = dm.Pipeline()

        node_a = dm.EstimatorNode('preprocess', preprocessor)
        node_b = dm.EstimatorNode('s_b', MinMaxScaler())
        node_c = dm.AndNode('s_c', FeatureUnion())
        node_d = dm.EstimatorNode('c_d', LogisticRegression())
        node_e = dm.EstimatorNode('c_e', DecisionTreeClassifier(max_depth=3))

        ## codeflare nodes are then connected by edges
        edges = (
            (node_a, node_b),
            (node_b, node_c),
            (node_c, node_d),
            (node_c, node_e),
        )
        for source, target in edges:
            pipeline.add_edge(source, target)

        pipeline_input = dm.PipelineInput()
        xy = dm.Xy(X_train, y_train)
        pipeline_input.add_xy_arg(node_a, xy)

        ## execute the codeflare pipeline
        pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.FIT, pipeline_input)

        ## retrieve node e
        node_e_output = pipeline_output.get_xyrefs(node_e)
        Xout = ray.get(node_e_output[0].get_Xref())
        yout = ray.get(node_e_output[0].get_yref())

        assert Xout.shape[0] == n_train
        assert yout.shape[0] == n_train
    finally:
        ## always release the Ray runtime, even when an assertion fails
        ray.shutdown()
84+
85+
def test_multibranch_2():
    """Fit a graph that fans out from one preprocessing node into branches.

    The preprocess node feeds two classifiers directly (a decision tree and
    a logistic regression) and also two scalers, whose outputs are joined
    by an AndNode and passed to a second decision tree. The test asserts
    that the X and y seen at the first decision tree (node b) and at the
    last one (node g) have as many rows as the training split.
    """
    n_rows = 10000
    n_train = 8000  ## 80% of n_rows, given test_size=0.2 below

    ray.shutdown()
    ray.init()
    try:
        ## prepare the data
        X = pd.DataFrame(np.random.randint(0, 100, size=(n_rows, 4)), columns=list('ABCD'))
        y = pd.DataFrame(np.random.randint(0, 2, size=(n_rows, 1)), columns=['Label'])

        ## select by the generic numeric kind rather than 'int64', so the
        ## columns are still picked up where NumPy's default integer is 32-bit
        numeric_features = X.select_dtypes(include=['number']).columns
        numeric_transformer = Pipeline(steps=[
            ('scaler', StandardScaler())])

        ## set up preprocessor as StandardScaler
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
            ])

        ## only the training split is used by this test
        X_train, _, y_train, _ = train_test_split(X, y, test_size=0.2)

        ## initialize codeflare pipeline by first creating the nodes
        pipeline = dm.Pipeline()

        node_a = dm.EstimatorNode('preprocess', preprocessor)
        node_b = dm.EstimatorNode('c_a', DecisionTreeClassifier(max_depth=3))
        node_c = dm.EstimatorNode('c_b', LogisticRegression())

        node_d = dm.EstimatorNode('s_d', MinMaxScaler())
        node_e = dm.EstimatorNode('s_e', StandardScaler())
        node_f = dm.AndNode('s_f', FeatureUnion())
        node_g = dm.EstimatorNode('c_g', DecisionTreeClassifier(max_depth=5))

        ## codeflare nodes are then connected by edges
        edges = (
            (node_a, node_b),
            (node_a, node_c),
            (node_a, node_d),
            (node_a, node_e),
            (node_d, node_f),
            (node_e, node_f),
            (node_f, node_g),
        )
        for source, target in edges:
            pipeline.add_edge(source, target)

        pipeline_input = dm.PipelineInput()
        xy = dm.Xy(X_train, y_train)
        pipeline_input.add_xy_arg(node_a, xy)

        ## execute the codeflare pipeline
        pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.FIT, pipeline_input)

        ## retrieve node b, then node g, and check each one the same way
        for node in (node_b, node_g):
            node_output = pipeline_output.get_xyrefs(node)
            Xout = ray.get(node_output[0].get_Xref())
            yout = ray.get(node_output[0].get_yref())

            assert Xout.shape[0] == n_train
            assert yout.shape[0] == n_train
    finally:
        ## always release the Ray runtime, even when an assertion fails
        ray.shutdown()
152+
153+
154+
if __name__ == "__main__":
    ## allow running this file directly: run pytest on it in verbose mode
    ## and use pytest's return value as the process exit status
    import sys  # not imported at module level, so bring it in here

    sys.exit(pytest.main(["-v", __file__]))

0 commit comments

Comments
 (0)