Skip to content

Commit c401eda

Browse files
committed
added tests for more complex pipelines
1 parent d7b98f8 commit c401eda

File tree

4 files changed

+253
-28
lines changed

4 files changed

+253
-28
lines changed
Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Architecture decision record
1+
# Architecture decision record
22

33
Select a test framework for Codeflare pipeline.
44

@@ -8,10 +8,9 @@ Contents:
88
* [Unit test coverage](#unit-test-coverage)
99

1010
## Use pytest as test framework
11-
PyTest is a testing framework in Python, with simple and easy syntax targeting unit tests and simple functional tests. PyTest can run tests in parallel and automatically detects tests in the test folder. PyTest serves the current goal of testing Codeflare pipelines well.
11+
PyTest is a testing framework in Python, with simple and easy syntax targeting unit tests and simple functional tests. PyTest can run tests in parallel and automatically detects tests in the test folder. PyTest serves the current goal of testing Codeflare pipelines well.
1212

1313
## Unit test coverage
14-
* And node in the pipeline graph
15-
* Or node in the pipeline graph
16-
17-
14+
* and (fan-in) node in a pipeline graph, and variants
15+
* or (fan-out) node in a pipeline graph, and variants
16+
* multibranch with mixtures of and/or nodes in a pipeline graph

codeflare/pipelines/tests/test_and.py

Lines changed: 102 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import ray
33
import pandas as pd
44
import numpy as np
5-
from sklearn.preprocessing import StandardScaler, MinMaxScaler
5+
from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler
66
import codeflare.pipelines.Datamodel as dm
77
import codeflare.pipelines.Runtime as rt
88
from codeflare.pipelines.Datamodel import Xy
@@ -15,15 +15,108 @@ def __init__(self):
1515

1616
def transform(self, xy_list):
1717
X_list = []
18-
y_list = []
18+
y_vec = None
1919

2020
for xy in xy_list:
2121
X_list.append(xy.get_x())
22+
y_vec = xy.get_y()
2223
X_concat = np.concatenate(X_list, axis=0)
2324

24-
return Xy(X_concat, None)
25+
return Xy(X_concat, y_vec)
2526

26-
def test_and():
27+
def test_two_tier_and():
28+
29+
ray.shutdown()
30+
ray.init()
31+
32+
## prepare the data
33+
X = np.random.randint(0,100,size=(10000, 4))
34+
y = np.random.randint(0,2,size=(10000, 1))
35+
36+
## initialize codeflare pipeline by first creating the nodes
37+
pipeline = dm.Pipeline()
38+
node_a = dm.EstimatorNode('a', MinMaxScaler())
39+
node_b = dm.EstimatorNode('b', StandardScaler())
40+
node_c = dm.EstimatorNode('c', MaxAbsScaler())
41+
node_d = dm.EstimatorNode('d', RobustScaler())
42+
43+
node_e = dm.AndNode('e', FeatureUnion())
44+
node_f = dm.AndNode('f', FeatureUnion())
45+
node_g = dm.AndNode('g', FeatureUnion())
46+
47+
## codeflare nodes are then connected by edges
48+
pipeline.add_edge(node_a, node_e)
49+
pipeline.add_edge(node_b, node_e)
50+
pipeline.add_edge(node_c, node_f)
51+
pipeline.add_edge(node_d, node_f)
52+
pipeline.add_edge(node_e, node_g)
53+
pipeline.add_edge(node_f, node_g)
54+
55+
pipeline_input = dm.PipelineInput()
56+
xy = dm.Xy(X,y)
57+
pipeline_input.add_xy_arg(node_a, xy)
58+
pipeline_input.add_xy_arg(node_b, xy)
59+
pipeline_input.add_xy_arg(node_c, xy)
60+
pipeline_input.add_xy_arg(node_d, xy)
61+
62+
## execute the codeflare pipeline
63+
pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.FIT, pipeline_input)
64+
65+
## retrieve node e
66+
node_g_output = pipeline_output.get_xyrefs(node_g)
67+
Xout = ray.get(node_g_output[0].get_Xref())
68+
yout = ray.get(node_g_output[0].get_yref())
69+
70+
assert Xout.shape == (40000, 4)
71+
assert yout.shape == (10000, 1)
72+
73+
ray.shutdown()
74+
75+
def test_four_input_and():
76+
77+
ray.shutdown()
78+
ray.init()
79+
80+
## prepare the data
81+
X = np.random.randint(0,100,size=(10000, 4))
82+
y = np.random.randint(0,2,size=(10000, 1))
83+
84+
## initialize codeflare pipeline by first creating the nodes
85+
pipeline = dm.Pipeline()
86+
node_a = dm.EstimatorNode('a', MinMaxScaler())
87+
node_b = dm.EstimatorNode('b', StandardScaler())
88+
node_c = dm.EstimatorNode('c', MaxAbsScaler())
89+
node_d = dm.EstimatorNode('d', RobustScaler())
90+
91+
node_e = dm.AndNode('e', FeatureUnion())
92+
93+
## codeflare nodes are then connected by edges
94+
pipeline.add_edge(node_a, node_e)
95+
pipeline.add_edge(node_b, node_e)
96+
pipeline.add_edge(node_c, node_e)
97+
pipeline.add_edge(node_d, node_e)
98+
99+
pipeline_input = dm.PipelineInput()
100+
xy = dm.Xy(X,y)
101+
pipeline_input.add_xy_arg(node_a, xy)
102+
pipeline_input.add_xy_arg(node_b, xy)
103+
pipeline_input.add_xy_arg(node_c, xy)
104+
pipeline_input.add_xy_arg(node_d, xy)
105+
106+
## execute the codeflare pipeline
107+
pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.FIT, pipeline_input)
108+
109+
## retrieve node e
110+
node_e_output = pipeline_output.get_xyrefs(node_e)
111+
Xout = ray.get(node_e_output[0].get_Xref())
112+
yout = ray.get(node_e_output[0].get_yref())
113+
114+
assert Xout.shape == (40000, 4)
115+
assert yout.shape == (10000, 1)
116+
117+
ray.shutdown()
118+
119+
def test_two_input_and():
27120

28121
ray.shutdown()
29122
ray.init()
@@ -52,7 +145,11 @@ def test_and():
52145

53146
## retrieve node c
54147
node_c_output = pipeline_output.get_xyrefs(node_c)
55-
assert node_c_output
148+
Xout = ray.get(node_c_output[0].get_Xref())
149+
yout = ray.get(node_c_output[0].get_yref())
150+
151+
assert Xout.shape == (20000, 4)
152+
assert yout.shape == (10000, 1)
56153

57154
ray.shutdown()
58155

codeflare/pipelines/tests/test_multibranch.py

Lines changed: 79 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from sklearn.pipeline import Pipeline
88
from sklearn.preprocessing import StandardScaler, MinMaxScaler
99
from sklearn.tree import DecisionTreeClassifier
10+
from sklearn.linear_model import LogisticRegression
1011
import codeflare.pipelines.Datamodel as dm
1112
import codeflare.pipelines.Runtime as rt
1213
from codeflare.pipelines.Datamodel import Xy
@@ -19,15 +20,16 @@ def __init__(self):
1920

2021
def transform(self, xy_list):
2122
X_list = []
22-
y_list = []
23+
y_vec = None
2324

2425
for xy in xy_list:
2526
X_list.append(xy.get_x())
26-
X_concat = np.concatenate(X_list, axis=0)
27+
y_vec = xy.get_y()
28+
X_concat = np.concatenate(X_list, axis=1)
2729

28-
return Xy(X_concat, None)
30+
return Xy(X_concat, y_vec.values.ravel())
2931

30-
def test_multibranch():
32+
def test_multibranch_1():
3133

3234
ray.shutdown()
3335
ray.init()
@@ -48,19 +50,70 @@ def test_multibranch():
4850

4951
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
5052

51-
## create two decision tree classifiers with different depth limit
52-
c_a = DecisionTreeClassifier(max_depth=3)
53-
c_b = DecisionTreeClassifier(max_depth=5)
53+
## initialize codeflare pipeline by first creating the nodes
54+
pipeline = dm.Pipeline()
55+
56+
node_a = dm.EstimatorNode('preprocess', preprocessor)
57+
node_b = dm.EstimatorNode('s_b', MinMaxScaler())
58+
node_c = dm.AndNode('s_c', FeatureUnion())
59+
node_d = dm.EstimatorNode('c_d', LogisticRegression())
60+
node_e = dm.EstimatorNode('c_e', DecisionTreeClassifier(max_depth=3))
61+
62+
## codeflare nodes are then connected by edges
63+
pipeline.add_edge(node_a, node_b)
64+
pipeline.add_edge(node_b, node_c)
65+
pipeline.add_edge(node_c, node_d)
66+
pipeline.add_edge(node_c, node_e)
67+
68+
pipeline_input = dm.PipelineInput()
69+
xy = dm.Xy(X_train, y_train)
70+
pipeline_input.add_xy_arg(node_a, xy)
71+
72+
## execute the codeflare pipeline
73+
pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.FIT, pipeline_input)
74+
75+
## retrieve node e
76+
node_e_output = pipeline_output.get_xyrefs(node_e)
77+
Xout = ray.get(node_e_output[0].get_Xref())
78+
yout = ray.get(node_e_output[0].get_yref())
79+
80+
assert Xout.shape[0] == 8000
81+
assert yout.shape[0] == 8000
82+
83+
ray.shutdown()
84+
85+
def test_multibranch_2():
86+
87+
ray.shutdown()
88+
ray.init()
89+
90+
## prepare the data
91+
X = pd.DataFrame(np.random.randint(0,100,size=(10000, 4)), columns=list('ABCD'))
92+
y = pd.DataFrame(np.random.randint(0,2,size=(10000, 1)), columns=['Label'])
93+
94+
numeric_features = X.select_dtypes(include=['int64']).columns
95+
numeric_transformer = Pipeline(steps=[
96+
('scaler', StandardScaler())])
97+
98+
## set up preprocessor as StandardScaler
99+
preprocessor = ColumnTransformer(
100+
transformers=[
101+
('num', numeric_transformer, numeric_features),
102+
])
103+
104+
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
54105

55106
## initialize codeflare pipeline by first creating the nodes
56107
pipeline = dm.Pipeline()
108+
57109
node_a = dm.EstimatorNode('preprocess', preprocessor)
58-
node_b = dm.EstimatorNode('c_a', c_a)
59-
node_c = dm.EstimatorNode('c_b', c_b)
110+
node_b = dm.EstimatorNode('c_a', DecisionTreeClassifier(max_depth=3))
111+
node_c = dm.EstimatorNode('c_b', LogisticRegression())
60112

61-
node_d = dm.EstimatorNode('d', MinMaxScaler())
62-
node_e = dm.EstimatorNode('e', StandardScaler())
63-
node_f = dm.AndNode('f', FeatureUnion())
113+
node_d = dm.EstimatorNode('s_d', MinMaxScaler())
114+
node_e = dm.EstimatorNode('s_e', StandardScaler())
115+
node_f = dm.AndNode('s_f', FeatureUnion())
116+
node_g = dm.EstimatorNode('c_g', DecisionTreeClassifier(max_depth=5))
64117

65118
## codeflare nodes are then connected by edges
66119
pipeline.add_edge(node_a, node_b)
@@ -70,18 +123,30 @@ def test_multibranch():
70123
pipeline.add_edge(node_a, node_e)
71124
pipeline.add_edge(node_d, node_f)
72125
pipeline.add_edge(node_e, node_f)
126+
pipeline.add_edge(node_f, node_g)
73127

74128
pipeline_input = dm.PipelineInput()
75129
xy = dm.Xy(X_train, y_train)
76130
pipeline_input.add_xy_arg(node_a, xy)
77131

78132
## execute the codeflare pipeline
79133
pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.FIT, pipeline_input)
80-
assert pipeline_output
81134

82135
## retrieve node b
83136
node_b_output = pipeline_output.get_xyrefs(node_b)
84-
assert node_b_output
137+
Xout = ray.get(node_b_output[0].get_Xref())
138+
yout = ray.get(node_b_output[0].get_yref())
139+
140+
assert Xout.shape[0] == 8000
141+
assert yout.shape[0] == 8000
142+
143+
## retrieve node g
144+
node_g_output = pipeline_output.get_xyrefs(node_g)
145+
Xout = ray.get(node_g_output[0].get_Xref())
146+
yout = ray.get(node_g_output[0].get_yref())
147+
148+
assert Xout.shape[0] == 8000
149+
assert yout.shape[0] == 8000
85150

86151
ray.shutdown()
87152

codeflare/pipelines/tests/test_or.py

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
from sklearn.pipeline import Pipeline
88
from sklearn.preprocessing import StandardScaler
99
from sklearn.tree import DecisionTreeClassifier
10+
from sklearn.linear_model import LogisticRegression
11+
from sklearn.ensemble import RandomForestClassifier
1012
import codeflare.pipelines.Datamodel as dm
1113
import codeflare.pipelines.Runtime as rt
1214
from codeflare.pipelines.Datamodel import Xy
1315
from codeflare.pipelines.Datamodel import XYRef
1416
from codeflare.pipelines.Runtime import ExecutionType
1517

16-
def test_or():
18+
def test_four_input_or():
1719

1820
ray.shutdown()
1921
ray.init()
@@ -33,7 +35,65 @@ def test_or():
3335
])
3436

3537
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
36-
38+
39+
## create two decision tree classifiers with different depth limit
40+
c_a = DecisionTreeClassifier(max_depth=3)
41+
c_b = DecisionTreeClassifier(max_depth=5)
42+
c_c = LogisticRegression()
43+
c_d = RandomForestClassifier(max_depth=5)
44+
45+
## initialize codeflare pipeline by first creating the nodes
46+
pipeline = dm.Pipeline()
47+
node_a = dm.EstimatorNode('preprocess', preprocessor)
48+
node_b = dm.EstimatorNode('c_a', c_a)
49+
node_c = dm.EstimatorNode('c_b', c_b)
50+
node_d = dm.EstimatorNode('c_c', c_c)
51+
node_e = dm.EstimatorNode('c_d', c_d)
52+
53+
## codeflare nodes are then connected by edges
54+
pipeline.add_edge(node_a, node_b)
55+
pipeline.add_edge(node_a, node_c)
56+
pipeline.add_edge(node_a, node_d)
57+
pipeline.add_edge(node_a, node_e)
58+
59+
pipeline_input = dm.PipelineInput()
60+
xy = dm.Xy(X_train, y_train)
61+
pipeline_input.add_xy_arg(node_a, xy)
62+
63+
pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.FIT, pipeline_input)
64+
65+
node_e_output = pipeline_output.get_xyrefs(node_e)
66+
67+
Xout = ray.get(node_e_output[0].get_Xref())
68+
yout = ray.get(node_e_output[0].get_yref())
69+
70+
assert Xout.shape[0] == 8000
71+
assert yout.shape[0] == 8000
72+
73+
ray.shutdown()
74+
75+
76+
def test_two_input_or():
77+
78+
ray.shutdown()
79+
ray.init()
80+
81+
## prepare the data
82+
X = pd.DataFrame(np.random.randint(0,100,size=(10000, 4)), columns=list('ABCD'))
83+
y = pd.DataFrame(np.random.randint(0,2,size=(10000, 1)), columns=['Label'])
84+
85+
numeric_features = X.select_dtypes(include=['int64']).columns
86+
numeric_transformer = Pipeline(steps=[
87+
('scaler', StandardScaler())])
88+
89+
## set up preprocessor as StandardScaler
90+
preprocessor = ColumnTransformer(
91+
transformers=[
92+
('num', numeric_transformer, numeric_features),
93+
])
94+
95+
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
96+
3797
## create two decision tree classifiers with different depth limit
3898
c_a = DecisionTreeClassifier(max_depth=3)
3999
c_b = DecisionTreeClassifier(max_depth=5)
@@ -57,7 +117,11 @@ def test_or():
57117
node_b_output = pipeline_output.get_xyrefs(node_b)
58118
node_c_output = pipeline_output.get_xyrefs(node_c)
59119

60-
assert node_b_output
120+
Xout = ray.get(node_b_output[0].get_Xref())
121+
yout = ray.get(node_b_output[0].get_yref())
122+
123+
assert Xout.shape[0] == 8000
124+
assert yout.shape[0] == 8000
61125

62126
ray.shutdown()
63127

0 commit comments

Comments
 (0)