forked from tianxie1995/Driver-Identification
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: RDclassify_CV.py
296 lines (260 loc) · 10.9 KB
/
RDclassify_CV.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
import os
import json
import numpy as np
import matplotlib.pyplot as plt
import initiate.config as config
import random
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report
def load_json_file(folder, file_name):
    """Read the JSON file *file_name* inside *folder* and return the decoded object."""
    path = os.path.join(folder, file_name)
    with open(path, "r") as handle:
        return json.load(handle)
def driving_date_figure(driving_time_dict):
    """
    Plot, for every driver, which days of the month they drove.

    Each row of the image is one driver, each column a day of the month
    (1-31); a marked cell means that driver has data for that day.  Used
    to determine the train/test data ratio.

    :param driving_time_dict: {driver_id: {date_string: ...}} where the
        date string ends in the day of month, e.g. "2017-05-23"
        (assumed format -- TODO confirm against the JSON producer)
    :return: None (figure saved to the Learn2classify test folder)
    """
    driver_num = len(driving_time_dict)
    # 31 columns so day-31 dates do not index out of bounds
    # (the original 30-column matrix raised IndexError on the 31st)
    driving_date_matrix = np.zeros((driver_num, 31))
    for driver_idx, driver_id in enumerate(driving_time_dict):
        for time_string in driving_time_dict[driver_id]:
            # last dash-separated token is the day of month, 1-based
            date_idx = int(time_string.split("-")[-1]) - 1
            driving_date_matrix[driver_idx, date_idx] = 1
    plt.imshow(driving_date_matrix, cmap="gray_r")
    plt.colorbar()
    plt.title("Driving date display")
    plt.xlabel("Day in a month")
    plt.ylabel("Driver sequence")
    save_figure_path = os.path.join(
        config.figures_folder, config.Learn2classify_test_folder)
    # makedirs is race-free and also creates missing parent directories
    os.makedirs(save_figure_path, exist_ok=True)
    plt.savefig(os.path.join(save_figure_path, "driving date test" + ".png"))
    plt.close()
def data_process_for_classify(link_level_dict, driving_time_dict):
    """
    Construct each driver's daily driving data as a feature vector.

    The vector for a (driver, date) pair is the travelled distance on every
    link ever seen in the data set (0.0 when the driver did not use that
    link that day), followed by that day's travel-time samples.

    :param link_level_dict: {driver_id: {date: {link_id: {'distance': float}}}}
    :param driving_time_dict: {driver_id: {date: [travel_time, ...]}}
    :return: {driver_id: {date: [distance_per_link..., travel_times...]}}
    """
    # Collect every link id in first-seen order.  A dict (insertion-ordered)
    # gives O(1) membership tests instead of the original O(n) list scan
    # inside a triple loop.
    total_link_index = {}
    for local_driving_dict in link_level_dict.values():
        for local_driver_link_dict in local_driving_dict.values():
            for link_string in local_driver_link_dict:
                if link_string not in total_link_index:
                    total_link_index[link_string] = len(total_link_index)
    total_link_set = list(total_link_index)
    # Per (driver, date): distance on each link of the full link set
    RD_dict = {}
    for driver_id, local_driving_dict in link_level_dict.items():
        RD_dict.setdefault(driver_id, {})
        for date, local_driver_link_dict in local_driving_dict.items():
            row = RD_dict[driver_id].setdefault(date, [])
            for link_set_string in total_link_set:
                if link_set_string in local_driver_link_dict:
                    row.append(local_driver_link_dict[link_set_string]['distance'])
                else:
                    row.append(0.0)
    # Append the travel-time samples to each date's distance vector
    for driver_id, local_driving_dict in driving_time_dict.items():
        if driver_id not in RD_dict:
            print(
                "Error! driver id not in RD_dict. driving_time_dict not match with link_level_dict!")
            exit()
        svd_driver_dict = RD_dict[driver_id]
        for date, local_driving_time_list in local_driving_dict.items():
            # some driver may have driving time but don't have link info
            # ************* this problem needs further discuss *******************
            if date not in svd_driver_dict:
                # no link data for this date: all-zero distance vector
                svd_driver_dict[date] = [0.0] * len(total_link_set)
            svd_driver_dict[date].extend(local_driving_time_list)
    return RD_dict
def RandomForest(RD_dict, experiment_num=100, plot_correction_flag=True):
    """
    Train a random-forest driver classifier with grid-searched
    hyper-parameters and print CV scores plus a held-out test report.

    :param RD_dict: {driver_id: {date: feature_vector}} from
        data_process_for_classify; each (driver, date) pair is one sample
    :param experiment_num: currently unused in this function
    :param plot_correction_flag: currently unused in this function
    :return: None (all results are printed to stdout)
    """
    # Construct driver_id--label dict and driver_num
    driver_num = len(RD_dict.keys())  # NOTE(review): unused local
    driver_id2label_dict = {}
    count = 0
    for driver_id in RD_dict.keys():
        driver_id2label_dict[driver_id] = count
        count += 1
    # Construct SVD matrix: one column per (driver, date) sample
    data_list = []
    label_list = []
    for driver_id in RD_dict.keys():
        local_driving_dict = RD_dict[driver_id]
        for date in local_driving_dict.keys():
            sample_driving_data = local_driving_dict[date]
            data_list.append(sample_driving_data)
            label_list.append(driver_id2label_dict[driver_id])
    svd_matrix = (np.array(data_list)).transpose()  # (feature_dim, sample_num)
    # Covariance of the features: svd_matrix.T is (samples, features) and
    # rowvar=False treats columns as variables.
    Sigma = np.cov(svd_matrix.T, rowvar=False)
    D, V = np.linalg.eig(Sigma)
    k = 200  # leading eigenvectors kept for the (currently unused) projection
    projecting_v = V[:, np.argsort(D)[::-1][:k]]
    print('projecting_v.shape:', projecting_v.shape)
    # NOTE(review): the eigen-projection is computed but never applied --
    # the next line is deliberately left commented out.
    #svd_matrix = svd_matrix.T.dot(projecting_v).T
    svd_matrix_train, svd_matrix_test, label_train, label_test = \
        divide_data_2_train_test(RD_dict, svd_matrix, label_list)
    # Set the parameters by cross-validation
    tuned_parameters = {'criterion': ['gini'],
                        'n_estimators': np.arange(5, 51, 5),
                        'max_depth': np.arange(16, 40, 5),
                        'min_samples_split': np.arange(2, 11, 2),
                        'random_state': [0]
                        }
    clf = GridSearchCV(RandomForestClassifier(), tuned_parameters, cv=10)
    # fit() expects samples as rows, hence the transpose
    clf.fit(svd_matrix_train.transpose(), label_train)
    print("Best parameters set found on development set:")
    print()
    print(clf.best_params_)
    print()
    print("Grid scores on development set:")
    print()
    means = clf.cv_results_['mean_test_score']
    stds = clf.cv_results_['std_test_score']
    for mean, std, params in zip(means, stds, clf.cv_results_['params']):
        print("%0.3f (+/-%0.03f) for %r"
              % (mean, std * 2, params))
    print()
    print("Detailed classification report:")
    print()
    print("The model is trained on the full development set.")
    print("The scores are computed on the full evaluation set.")
    print()
    y_true, y_pred = label_test, clf.predict(svd_matrix_test.transpose())
    print(classification_report(y_true, y_pred))
    print()
def test(correction_table):
    """Dump *correction_table* as CSV into the working directory."""
    # NOTE: keeps the original (misspelled) output file name "tabel"
    out_name = "correction tabel RD.csv"
    np.savetxt(out_name, correction_table, delimiter=",", fmt='%10.5f')
def plot_accuracy_figure(pcorr_list):
    """
    Plot the per-experiment accuracies together with their mean and save
    the figure.

    :param pcorr_list: one accuracy value per experiment
    :return: None (figure saved to the Learn2classify test folder)
    """
    n = len(pcorr_list)
    mean_value = sum(pcorr_list) / n
    print("mean accuracy", mean_value)
    # Generalized from the original hard-coded 100-experiment x axes, which
    # crashed for any other list length (x/pcorr_list length mismatch).
    x = list(range(n))
    x1 = list(range(-5, n + 5))  # mean line drawn a bit wider than the data
    mean_accuracy = [mean_value] * len(x1)
    plt.plot(x, pcorr_list, color='k')
    f2 = plt.plot(x1, mean_accuracy, 'b--')
    # legend labels must be a sequence; a bare string is iterated per-character
    plt.legend(f2, ["mean accuracy"])
    plt.title("Accuracy in 100 times experiment, # of test data: 11")
    # plt.grid(linewidth=0.3)
    plt.xlabel("experiment idx")
    plt.ylabel("accuracy")
    # plt.show()
    save_figure_path = os.path.join(
        config.figures_folder, config.Learn2classify_test_folder)
    if not os.path.exists(save_figure_path):
        os.mkdir(save_figure_path)
    plt.savefig(os.path.join(save_figure_path,
                             "Random Forest 11 test data " + ".png"))
    plt.close()
def plot_correction_table(correction_table):
    """Render the driver-by-experiment correctness matrix and save it."""
    plt.figure(figsize=[14, 4])
    plt.imshow(correction_table, cmap='gray', aspect='auto')
    plt.colorbar(ticks=range(2), label="classify correct or not")
    plt.title("1:correct 0:incorrect")
    plt.xlabel("Experiment idx")
    plt.ylabel("Driver idx")
    out_dir = os.path.join(config.figures_folder,
                           config.Learn2classify_test_folder)
    if not os.path.exists(out_dir):
        os.mkdir(out_dir)
    out_file = os.path.join(out_dir,
                            "Random Forest correction table" + ".png")
    plt.savefig(out_file)
    plt.close()
def divide_data_2_train_test(RD_dict, svd_matrix, label_list):
    """
    Hold out one random sample (column) per driver as the test set.

    :param RD_dict: used only for the number of drivers
    :param svd_matrix: (feature_dim, sample_num) data matrix, one sample
        per column
    :param label_list: per-column integer driver label; columns of the
        same driver are assumed contiguous and in ascending label order
    :return: (svd_matrix_train, svd_matrix_test, label_train, label_test)
    """
    driver_num = len(RD_dict.keys())
    test_idx = []  # column index of the held-out sample for each driver
    for i in range(driver_num):
        data_idx = [j for j, x in enumerate(label_list) if x == i]
        start_idx = data_idx[0]
        end_idx = data_idx[-1]
        idx_list = np.arange(start_idx, end_idx + 1, 1)
        # randomly choose one data as test sample
        test_idx.append(random.choice(idx_list))
    # Build the test matrix with one fancy-index slice instead of the
    # original repeated np.hstack (which was O(n^2) in the driver count).
    svd_matrix_test = svd_matrix[:, test_idx]
    svd_matrix_train = np.delete(svd_matrix, test_idx, 1)
    label_test = [label_list[j] for j in test_idx]
    held_out = set(int(j) for j in test_idx)
    label_train = [x for j, x in enumerate(label_list) if j not in held_out]
    return svd_matrix_train, svd_matrix_test, label_train, label_test
def predict_RandomForest(classifier, test_matrix):
    """Predict labels for *test_matrix* with *classifier*, echoing them to stdout."""
    labels = classifier.predict(test_matrix)
    print(labels)
    return labels
def pcorrect(predicted_label, test_label):
    """
    Fraction of positions where predicted and true labels agree.

    :param predicted_label: sequence of predicted labels
    :param test_label: sequence of true labels, same length
    :return: accuracy in [0.0, 1.0]; 0.0 for empty input (the original
        raised ZeroDivisionError there)
    """
    if not predicted_label:
        return 0.0
    matches = sum(1 for p, r in zip(predicted_label, test_label) if p == r)
    return matches / len(predicted_label)
def RDclassify(folder):
    """
    Run the driver-identification pipeline on the data in *folder*.

    Loads the driving-time and link-level JSON data, keeps only drivers
    with more than 10 driving days, builds daily feature vectors and runs
    the random-forest classification experiment.
    """
    # load the driving time data
    all_driving_time_dict = load_json_file(folder, config.driving_time_info)
    # load the link-level data
    all_link_level_dict = load_json_file(folder, config.link_level)
    # keep only drivers with enough driving days for a train/test split
    driving_time_dict = {}
    link_level_dict = {}
    for driver_id, day_dict in all_driving_time_dict.items():
        if len(day_dict) > 10:
            driving_time_dict[driver_id] = day_dict
            link_level_dict[driver_id] = all_link_level_dict[driver_id]
    RD_dict = data_process_for_classify(link_level_dict, driving_time_dict)
    RandomForest(RD_dict, experiment_num=100)
# Script entry point: run the full pipeline on the "ann_arbor" data folder.
if __name__ == '__main__':
    RDclassify("ann_arbor")