Skip to content

Commit 6506ada

Browse files
Adding a test for save/load of a selected pipeline; this fixed a bug in the PREDICT pipeline, which was incorrect earlier. The issue was that a X object was
being treated as XRef, leading to pointer messup... that is fixed now!
1 parent 35a3358 commit 6506ada

File tree

3 files changed

+95
-9
lines changed

3 files changed

+95
-9
lines changed

codeflare/pipelines/Datamodel.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,18 @@ def get_terminal_nodes(self):
383383
terminal_nodes.append(node)
384384
return terminal_nodes
385385

386+
def get_nodes(self):
387+
nodes = {}
388+
for node in self.__pre_graph__.keys():
389+
nodes[node.get_node_name()] = node
390+
return nodes
391+
392+
def get_pre_nodes(self, node):
393+
return self.__pre_graph__[node]
394+
395+
def get_post_nodes(self, node):
396+
return self.__post_graph__[node]
397+
386398
def save(self, filehandle):
387399
nodes = {}
388400
edges = []

codeflare/pipelines/Runtime.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from enum import Enum
99

1010
from queue import SimpleQueue
11-
import pickle5 as pickle
1211

1312

1413
class ExecutionType(Enum):
@@ -59,11 +58,11 @@ def execute_or_node_remote(node: dm.EstimatorNode, mode: ExecutionType, xy_ref:
5958
elif mode == ExecutionType.PREDICT:
6059
# Test mode does not clone as it is a simple predict or transform
6160
if base.is_classifier(estimator) or base.is_regressor(estimator):
62-
res_Xref = estimator.predict(X)
61+
res_Xref = ray.put(estimator.predict(X))
6362
result = dm.XYRef(res_Xref, xy_ref.get_yref())
6463
return result
6564
else:
66-
res_Xref = estimator.transform(X)
65+
res_Xref = ray.put(estimator.transform(X))
6766
result = dm.XYRef(res_Xref, xy_ref.get_yref())
6867
return result
6968

codeflare/pipelines/tests/test_save_load.py

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
import pytest
2-
31
import codeflare.pipelines.Datamodel as dm
42
import codeflare.pipelines.Runtime as rt
53

64
import numpy as np
7-
from sklearn.preprocessing import FunctionTransformer
85
from sklearn.preprocessing import MinMaxScaler
96
import os
7+
import pandas as pd
8+
from sklearn.pipeline import Pipeline
9+
from sklearn.impute import SimpleImputer
10+
from sklearn.preprocessing import StandardScaler, OneHotEncoder
11+
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
12+
13+
import ray
1014

1115

1216
class FeatureUnion(dm.AndTransform):
@@ -47,8 +51,7 @@ def test_save_load():
4751
r_fh = open(fname, 'rb')
4852
saved_pipeline = dm.Pipeline.load(r_fh)
4953
pre_edges = saved_pipeline.get_pre_edges(node_c)
50-
assert(len(pre_edges) == 2)
51-
54+
assert (len(pre_edges) == 2)
5255
os.remove(fname)
5356

5457

@@ -58,4 +61,76 @@ def test_runtime_save_load():
5861
captured accurately
5962
:return:
6063
"""
61-
64+
train = pd.read_csv('../../../resources/data/train_ctrUa4K.csv')
65+
train = train.drop('Loan_ID', axis=1)
66+
67+
X = train.drop('Loan_Status', axis=1)
68+
y = train['Loan_Status']
69+
from sklearn.model_selection import train_test_split
70+
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
71+
imputer = SimpleImputer(strategy='median')
72+
scaler = StandardScaler()
73+
74+
numeric_transformer = Pipeline(steps=[
75+
('imputer', imputer),
76+
('scaler', scaler)])
77+
78+
cat_imputer = SimpleImputer(strategy='constant', fill_value='missing')
79+
cat_onehot = OneHotEncoder(handle_unknown='ignore')
80+
81+
categorical_transformer = Pipeline(steps=[
82+
('imputer', cat_imputer),
83+
('onehot', cat_onehot)])
84+
numeric_features = train.select_dtypes(include=['int64', 'float64']).columns
85+
categorical_features = train.select_dtypes(include=['object']).drop(['Loan_Status'], axis=1).columns
86+
from sklearn.compose import ColumnTransformer
87+
preprocessor = ColumnTransformer(
88+
transformers=[
89+
('num', numeric_transformer, numeric_features),
90+
('cat', categorical_transformer, categorical_features)])
91+
92+
classifiers = [
93+
RandomForestClassifier(),
94+
GradientBoostingClassifier()
95+
]
96+
pipeline = dm.Pipeline()
97+
node_pre = dm.EstimatorNode('preprocess', preprocessor)
98+
node_rf = dm.EstimatorNode('random_forest', classifiers[0])
99+
node_gb = dm.EstimatorNode('gradient_boost', classifiers[1])
100+
101+
pipeline.add_edge(node_pre, node_rf)
102+
pipeline.add_edge(node_pre, node_gb)
103+
104+
import ray
105+
ray.shutdown()
106+
ray.init()
107+
pipeline_input = dm.PipelineInput()
108+
xy = dm.Xy(X_train, y_train)
109+
pipeline_input.add_xy_arg(node_pre, xy)
110+
111+
pipeline_output = rt.execute_pipeline(pipeline, rt.ExecutionType.FIT, pipeline_input)
112+
node_rf_xyrefs = pipeline_output.get_xyrefs(node_rf)
113+
114+
# save this pipeline for random forest and load and then predict on test data
115+
fname = 'random_forest.cfp'
116+
w_fh = open(fname, 'wb')
117+
rt.save(pipeline_output, node_rf_xyrefs[0], w_fh)
118+
w_fh.close()
119+
120+
# load it
121+
r_fh = open(fname, 'rb')
122+
saved_pipeline = dm.Pipeline.load(r_fh)
123+
nodes = saved_pipeline.get_nodes()
124+
# this should not exist in the saved pipeline
125+
assert(node_gb.get_node_name() not in nodes.keys())
126+
127+
# should be preditable as well
128+
predict_pipeline_input = dm.PipelineInput()
129+
predict_pipeline_input.add_xy_arg(node_pre, dm.Xy(X_test, y_test))
130+
try:
131+
predict_pipeline_output = rt.execute_pipeline(saved_pipeline, rt.ExecutionType.PREDICT, predict_pipeline_input)
132+
predict_pipeline_output.get_xyrefs(node_rf)
133+
except Exception:
134+
assert False
135+
136+
os.remove(fname)

0 commit comments

Comments
 (0)