-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathanalysis_functions.py
346 lines (281 loc) · 10.8 KB
/
analysis_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
# from numpy.linalg import LinAlgError
import numpy as np
import pandas as pd
import statsmodels.api as sm
import scipy as sp
import os
import errno
def sample_corr(x1, x2, alpha=0.05, verbose=True, return_result=False):
w, normal_1 = sp.stats.shapiro(x1)
w, normal_2 = sp.stats.shapiro(x2)
if (normal_1 < alpha) or (normal_2 < alpha):
r, p = sp.stats.spearmanr(x1, x2)
if verbose:
print('Normality assumption violated: True')
print('spearman r = {}, p = {}'.format(r, p))
else:
r, p = sp.stats.pearsonr(x1, x2)
if verbose:
print('pearson r = {}, p = {}'.format(r, p))
if return_result:
return r, p
def ind_sample_comp(x1, x2, alpha=0.05):
# count N
N = x1.shape[0] + x2.shape[0]
df = N-2
# test for equal variance
unequal_variance = False
w, p = sp.stats.levene(x1, x2)
if p < alpha:
unequal_variance = True
print('Equal variance assumption violated: {}'.format(unequal_variance))
# test for normal variance
non_normal_variance = False
w1, p1 = sp.stats.shapiro(x1)
w2, p2 = sp.stats.shapiro(x2)
if p1 < alpha:
non_normal_variance = True
if p2 < alpha:
non_normal_variance = True
print('Normality assumption violated: {}'.format(non_normal_variance))
# compute test statistic
if (not unequal_variance) & (not non_normal_variance):
# statistic
t, p = sp.stats.ttest_ind(x1, x2)
# effect size
d = (np.mean(x1) - np.mean(x2)) / \
np.sqrt((np.var(x1) + np.var(x2)) / 2.)
# 95% CI
CI = [(np.mean(x1) - np.mean(x2)) - sp.stats.t.ppf(1-(alpha/2.), df) * np.sqrt((np.var(x1)/x1.shape[0]) + (np.var(x2)/x2.shape[0])),
(np.mean(x1) - np.mean(x2)) + sp.stats.t.ppf(1-(alpha/2.), df) * np.sqrt((np.var(x1)/x1.shape[0]) + (np.var(x2)/x2.shape[0]))]
test_indicator = 't'
elif (unequal_variance) * (not non_normal_variance):
# statistic
t, p = sp.stats.ttest_ind(x1, x2, equal_var=False)
# effect size
d = (np.mean(x1) - np.mean(x2)) / \
np.sqrt((np.var(x1) + np.var(x2)) / 2.)
# 95% CI
CI = [(np.mean(x1) - np.mean(x2)) - sp.stats.t.ppf(1-(alpha/2.), df) * np.sqrt((np.var(x1)/x1.shape[0]) + (np.var(x2)/x2.shape[0])),
(np.mean(x1) - np.mean(x2)) + sp.stats.t.ppf(1-(alpha/2.), df) * np.sqrt((np.var(x1)/x1.shape[0]) + (np.var(x2)/x2.shape[0]))]
test_indicator = 't'
else:
# statistic
t, p = sp.stats.mannwhitneyu(x1, x2)
# effect size
d = 1 - (2*t / (x1.shape[0] * x2.shape[0]))
# 95% CI
CI = [None, None]
test_indicator = 'U'
if test_indicator != 'U':
print('X1 - X2: {}({}) = {}, p = {}, d = {}, CI = [{}, {}]'.format(
test_indicator, df, t, p, d, CI[0], CI[1]))
else:
print('X1 - X2: {} = {}, p = {}, d = {}'.format(
test_indicator, np.int(t), p, d))
def compute_gaze_influence_score(data, n_items=None):
"""
Compute gaze influence score for each
subject in the data;
Gaze influence score is defined
as the average difference between
the corrected choice probability
of all positive and negative relative gaze values
(see manuscript).
Input
---
data (dataframe):
aggregate response data
Returns
---
array of single-subject gaze influence scores
"""
import statsmodels.api as sm
data = data.copy()
choice = np.zeros((data.shape[0], n_items))
choice[np.arange(data.shape[0]), data['choice'].values.astype('int32')] = 1
# compute value measures
gaze = data[['gaze_{}'.format(i) for i in range(n_items)]].values
rel_gaze = np.zeros_like(gaze)
values = data[['item_value_{}'.format(i) for i in range(n_items)]].values
rel_values = np.zeros_like(values)
value_range = np.zeros_like(values)
for t in range(values.shape[0]):
for i in range(n_items):
index = np.where(np.arange(n_items) != i)
rel_gaze[t, i] = gaze[t, i] - np.mean(gaze[t, index])
rel_values[t, i] = values[t, i] - np.mean(values[t, index])
value_range[t, i] = np.max(
values[t, index]) - np.min(values[t, index])
# create new dataframe (long format, one row per item)
data_long = pd.DataFrame(dict(subject=np.repeat(data['subject'].values, n_items),
is_choice=choice.ravel(),
value=values.ravel(),
rel_value=rel_values.ravel(),
value_range_others=value_range.ravel(),
rel_gaze=rel_gaze.ravel(),
gaze_pos=np.array(
rel_gaze.ravel() > 0, dtype=np.bool),
gaze=gaze.ravel()))
# estimate value-based choice prob.
# for each individual and subtract
# from empirical choice
data_out_list = []
for s, subject in enumerate(data['subject'].unique()):
subject_data = data_long[data_long['subject'] == subject].copy()
# Only add range of other items if more than 2 items (otherwise it's just another constant, resulting in a singular matrix)
if n_items > 2:
X = subject_data[['rel_value', 'value_range_others']]
else:
X = subject_data[['rel_value']]
X = sm.add_constant(X)
y = subject_data['is_choice']
logit = sm.Logit(y, X)
result = logit.fit(disp=0, maxiter=100) # method='lbfgs',
predicted_pchoose = result.predict(X)
subject_data['corrected_choice'] = subject_data['is_choice'] - \
predicted_pchoose
data_out_list.append(subject_data)
data_out = pd.concat(data_out_list)
# compute corrected psychometric, given gaze
tmp = data_out.groupby(['subject', 'gaze_pos']
).corrected_choice.mean().unstack()
gaze_influence_score = (tmp[True] - tmp[False]).values
return gaze_influence_score
def compute_mean_rt(df):
"""
Computes subject wise mean RT
"""
return df.groupby('subject').rt.mean().values
def compute_p_choose_best(df):
"""
Computes subject wise P(choose best)
"""
if 'best_chosen' not in df.columns:
df = add_best_chosen(df)
return df.groupby('subject').best_chosen.mean().values
def add_best_chosen(df):
"""
Adds 'best_chosen' variable to DataFrame,
independent of number of items (works with nan columns)
"""
df = df.copy()
values = df[[c for c in df.columns if c.startswith('item_value_')]].values
choices = df['choice'].values.astype(np.int)
best_chosen = (values[np.arange(choices.size), choices]
== np.nanmax(values, axis=1)).astype(int)
df['best_chosen'] = best_chosen
return df
def run_linear_model(x, y, verbose=True):
X = sm.add_constant(x)
lm = sm.OLS(y, X).fit()
if verbose:
print(lm.summary())
print('Slope = {:.2f}'.format(lm.params[-1]))
print('t({:d}) = {:.2f}'.format(int(lm.df_resid), lm.tvalues[-1]))
print('P = {:.10f}'.format(lm.pvalues[-1]))
return lm
def q1(series):
q1 = series.quantile(0.25)
return q1
def q3(series):
q3 = series.quantile(0.75)
return q3
def iqr(series):
Q1 = series.quantile(0.25)
Q3 = series.quantile(0.75)
IQR = Q3 - Q1
return IQR
def std(series):
sd = series.std(ddof=0)
return sd
def se(series):
n = len(series)
se = series.std() / np.sqrt(n)
return se
def sci_notation(num, exact=False, decimal_digits=1, precision=None, exponent=None):
"""
exact keyword toggles between
- exact reporting with coefficient
- reporting next higher power of 10 for P < 10^XX reporting
https://stackoverflow.com/a/18313780
Returns a string representation of the scientific
notation of the given number formatted for use with
LaTeX or Mathtext, with specified number of significant
decimal digits and precision (number of decimal digits
to show). The exponent to be used can also be specified
explicitly.
"""
from math import floor, log10
if not exponent:
exponent = int(floor(log10(abs(num))))
coeff = round(num / float(10**exponent), decimal_digits)
if not precision:
precision = decimal_digits
if exact:
return r"${0:.{2}f}\times10^{{{1:d}}}$".format(coeff, exponent, precision)
else:
return r"$10^{{{0}}}$".format(exponent+1)
def write_summary(lm, filename):
"""
Write statsmodels lm summary as to file as csv.
"""
predictors = lm.model.exog_names
tvals = lm.tvalues
pvals = lm.pvalues
betas = lm.params
se = lm.bse
ci = lm.conf_int()
r2 = lm.rsquared
r2a = lm.rsquared_adj
aic = lm.aic
bic = lm.bic
ll = lm.llf
F = lm.fvalue
df = lm.df_resid
n = lm.nobs
table = pd.DataFrame(dict(predictors=predictors,
tvals=tvals,
pvals=pvals,
betas=betas,
se=se,
ci0=ci[0],
ci1=ci[1],
df=df,
n=n,
r2=r2))
table.to_csv(filename)
def make_sure_path_exists(path):
"""
Used to check or create existing folder structure for results.
https://stackoverflow.com/a/5032238
"""
try:
os.makedirs(path)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
def aggregate_subject_level(data, n_items):
"""
Aggregates a single dataset to subject level
"""
data = data.copy()
# add best chosen variable
data = add_best_chosen(data)
# Summarize variables
subject_summary = data.groupby('subject').agg({'rt': ['mean', std, 'min', 'max', se, q1, q3, iqr],
'best_chosen': 'mean'})
# Influence of gaze on P(choose left)
subject_summary['gaze_influence'] = compute_gaze_influence_score(data, n_items=n_items)
subject_summary['dataset'] = data.groupby('subject')['dataset'].head(1).values
return subject_summary
def aggregate_group_level(subject_summary):
"""
Aggregates a subject summary to group level
"""
group_summary = subject_summary.agg({('rt', 'mean'): ['mean', std, 'min', 'max', se, iqr],
('best_chosen', 'mean'): ['mean', std, 'min', 'max', se, iqr],
'gaze_influence': ['mean', std, 'min', 'max', se, iqr]})
group_summary = group_summary[[('rt', 'mean'), ('best_chosen', 'mean'), ('gaze_influence')]].copy()
group_summary.columns = ['Mean RT', 'P(choose best)', 'Gaze Influence']
return group_summary.T