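"""Run poisoning-attack experiments on disparate-impact scenarios.

Trains a clean logistic-regression baseline for each scenario, applies a
white-box poisoning attack (with black-box and standard variants also
available below), and records accuracy alongside fairness metrics
(disparate impact, average odds difference, and group error rates).
Results are pickled to 'dimp_scenarios.pkl' and the first scenario's
decision boundary is plotted.
"""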
from copy import deepcopy
from random import seed

import joblib
import numpy as np
from secml.ml.peval.metrics import CMetricAccuracy

from attacks import custom_log_reg_poisoning_attack
from attacks import log_reg_poisoning_attack
from commons import SEED
from metrics import average_odds_difference
from metrics import disparate_impact
from metrics import error_rates
from plot import plot_2d_decision_boundary
from scenario import construct_dimp_scenarios
from train import train_logistic
from train import train_SVM


def train_base_model(train_set, test_set, test_sens_attributes):
    """Train the base model.

    Trains a base model that will serve as the clean baseline against which
    the poisoning strategies are evaluated.

    Args:
        train_set (CDataset): the training set.
        test_set (CDataset): the testing set.
        test_sens_attributes (CArray): the sensitive attributes of the test set.

    Returns:
        dict: dictionary with the trained base model and its performance metrics.
    """
    # train model
    base_model, base_acc = train_logistic(train_set=train_set, test_set=test_set)
    # make predictions
    y_pred = base_model.predict(test_set.X)
    # calculate performance and fairness metrics
    fnr, fpr = error_rates(y_true=test_set.Y.get_data(), y_pred=y_pred.get_data(),
                           sensitive_attributes=test_sens_attributes)
    dimp = disparate_impact(y=y_pred.get_data(), sensitive_attributes=test_sens_attributes)
    odds_diff = average_odds_difference(y_true=test_set.Y.get_data(), y_pred=y_pred.get_data(),
                                        sensitive_attributes=test_sens_attributes)
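    # For reference: disparate impact is commonly defined as the ratio of
    # positive-prediction rates between the unprivileged and privileged groups,
    # P(y_pred = 1 | s = 0) / P(y_pred = 1 | s = 1), and average odds difference
    # as the mean of the FPR and TPR gaps between the two groups; see metrics.py
    # for the exact definitions used here.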
    # structure output
    output = {
        'base_clf': base_model,
        'base_clf_acc': base_acc,
        'base_clf_dimp': dimp,
        'base_clf_odds': odds_diff,
        'base_clf_FNR': fnr,
        'base_clf_FPR': fpr
    }
    return output
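
# Illustrative usage (hypothetical variable names; `tr`, `ts` and `sens` would
# come from a scenario produced by construct_dimp_scenarios(), as in __main__):
#   base = train_base_model(train_set=tr, test_set=ts, test_sens_attributes=sens)
#   print(base['base_clf_acc'], base['base_clf_dimp'])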


def whitebox_attack(base_clf, train_set, test_set, val_set,
                    test_sensitive_attribute, val_sensitive_attribute):
    """Perform a white-box poisoning attack on logistic regression.

    Args:
        base_clf (CClassifier): the base model.
        train_set (CDataset): the training set.
        test_set (CDataset): the testing set.
        val_set (CDataset): the validation set.
        test_sensitive_attribute (CArray): the sensitive attributes for the test set.
        val_sensitive_attribute (CArray): the sensitive attributes for the validation set.

    Returns:
        dict: dictionary with the white-box poisoned model and its performance metrics.
    """
    white_pois_clf = deepcopy(base_clf)
    # perform attack
    white_pois_pts, white_pois_tr = custom_log_reg_poisoning_attack(
        white_pois_clf, train_set, val_set, test_set, test_sensitive_attribute, val_sensitive_attribute)
    # retrain with the poisoned points
    white_pois_clf = white_pois_clf.fit(white_pois_tr.X, white_pois_tr.Y)
    white_pois_ypred = white_pois_clf.predict(test_set.X)
    # calculate performance metrics
    metric = CMetricAccuracy()
    acc = metric.performance_score(y_true=test_set.Y, y_pred=white_pois_ypred)
    dimp = disparate_impact(y=white_pois_ypred.get_data(), sensitive_attributes=test_sensitive_attribute)
    odds_diff = average_odds_difference(y_true=test_set.Y.get_data(), y_pred=white_pois_ypred.get_data(),
                                        sensitive_attributes=test_sensitive_attribute)
    fnr, fpr = error_rates(y_true=test_set.Y.get_data(), y_pred=white_pois_ypred.get_data(),
                           sensitive_attributes=test_sensitive_attribute)
    # structure output
    output = {
        'white_pois_clf': white_pois_clf,
        'white_pois_pts': white_pois_pts,
        'white_pois_dimp': dimp,
        'white_pois_odds': odds_diff,
        'white_pois_ypred': white_pois_ypred,
        'white_pois_acc': acc,
        'white_pois_FNR': fnr,
        'white_pois_FPR': fpr
    }
    return output


def blackbox_attack(base_clf, train_set, test_set, val_set,
                    test_sensitive_attribute, val_sensitive_attribute):
    """Perform a black-box poisoning attack on logistic regression.

    Args:
        base_clf (CClassifier): the base model, used as the attacker's surrogate.
        train_set (CDataset): the training set.
        test_set (CDataset): the testing set.
        val_set (CDataset): the validation set.
        test_sensitive_attribute (CArray): the sensitive attributes for the test set.
        val_sensitive_attribute (CArray): the sensitive attributes for the validation set.

    Returns:
        dict: dictionary with the black-box poisoned model and its performance metrics.
    """
    real_model, real_acc = train_SVM(train_set, test_set)
    surrogate_clf = deepcopy(base_clf)
    # craft poisoning points against the surrogate
    black_pois_points, black_pois_tr = custom_log_reg_poisoning_attack(surrogate_clf, train_set,
                                                                       val_set, test_set, test_sensitive_attribute,
                                                                       val_sensitive_attribute)
    # retrain the target model with the poisoned points
    black_pois_clf = deepcopy(real_model)
    black_pois_clf = black_pois_clf.fit(black_pois_tr.X, black_pois_tr.Y)
    black_pois_ypred = black_pois_clf.predict(test_set.X)
    # calculate performance metrics
    metric = CMetricAccuracy()
    acc = metric.performance_score(y_true=test_set.Y, y_pred=black_pois_ypred)
    dimp = disparate_impact(y=black_pois_ypred.get_data(), sensitive_attributes=test_sensitive_attribute)
    odds_diff = average_odds_difference(y_true=test_set.Y.get_data(), y_pred=black_pois_ypred.get_data(),
                                        sensitive_attributes=test_sensitive_attribute)
    fnr, fpr = error_rates(y_true=test_set.Y.get_data(), y_pred=black_pois_ypred.get_data(),
                           sensitive_attributes=test_sensitive_attribute)
    # structure output
    output = {
        'black_poisoned_clf': black_pois_clf,
        'black_poisoned_pts': black_pois_points,
        'black_pois_dimp': dimp,
        'black_odds': odds_diff,
        'black_pois_ypred': black_pois_ypred,
        'black_pois_acc': acc,
        'black_pois_FNR': fnr,
        'black_pois_FPR': fpr
    }
    return output


def classic_poisoning_attack(base_clf, train_set, test_set, val_set,
                             test_sensitive_attribute, val_sensitive_attribute):
    """Perform a standard poisoning attack on logistic regression.

    Args:
        base_clf (CClassifier): the base model.
        train_set (CDataset): the training set.
        test_set (CDataset): the testing set.
        val_set (CDataset): the validation set.
        test_sensitive_attribute (CArray): the sensitive attributes for the test set.
        val_sensitive_attribute (CArray): the sensitive attributes for the validation set.

    Returns:
        dict: dictionary with the poisoned model and its performance metrics.
    """
    normal_pois_clf = deepcopy(base_clf)
    # perform the standard attack on logistic regression
    normal_pois_points, normal_pois_tr = log_reg_poisoning_attack(normal_pois_clf, train_set, val_set,
                                                                  test_set, test_sensitive_attribute,
                                                                  val_sensitive_attribute)
    # retrain with the poisoned points (X/Y split for consistency with the other attacks)
    normal_pois_clf = normal_pois_clf.fit(normal_pois_tr.X, normal_pois_tr.Y)
    normal_pois_ypred = normal_pois_clf.predict(test_set.X)
    # calculate performance metrics
    metric = CMetricAccuracy()
    acc = metric.performance_score(y_true=test_set.Y, y_pred=normal_pois_ypred)
    dimp = disparate_impact(y=normal_pois_ypred.get_data(), sensitive_attributes=test_sensitive_attribute)
    odds_diff = average_odds_difference(y_true=test_set.Y.get_data(), y_pred=normal_pois_ypred.get_data(),
                                        sensitive_attributes=test_sensitive_attribute)
    fnr, fpr = error_rates(y_true=test_set.Y.get_data(), y_pred=normal_pois_ypred.get_data(),
                           sensitive_attributes=test_sensitive_attribute)
    # structure output
    output = {
        'normal_poisoned_classifier': normal_pois_clf,
        'normal_poisoned_points': normal_pois_points,
        'normal_pois_d_imp': dimp,
        'normal_odds': odds_diff,
        'normal_pois_y_pred': normal_pois_ypred,
        'normal_pois_acc': acc,
        'normal_pois_FNR': fnr,
        'normal_pois_FPR': fpr
    }
    return output


if __name__ == '__main__':
    # set seeds so that experiments are repeatable
    seed(SEED)
    np.random.seed(SEED)
    # construct disparate impact (dimp) scenarios
    dimp_scenarios, scenario_dimps = construct_dimp_scenarios()
    for i in range(len(dimp_scenarios)):
        # only the first scenario is evaluated in this run
        if i == 1:
            break
        scenario = dimp_scenarios[i]
        print(f"\n\n ==== {scenario['name']} ====")
        print(f"\t- {scenario['description']}")
        # train base model
        base_output = train_base_model(train_set=scenario["training"], test_set=scenario["test"],
                                       test_sens_attributes=scenario["test_sensitive_att"])
        scenario = {**scenario, **base_output}
        print('--> Whitebox attack...')
        whitebox_output = whitebox_attack(base_clf=base_output['base_clf'], train_set=scenario["training"],
                                          test_set=scenario["test"], val_set=scenario["validation"],
                                          test_sensitive_attribute=scenario["test_sensitive_att"],
                                          val_sensitive_attribute=scenario["validation_sensitive_att"])
        scenario = {**scenario, **whitebox_output}
        dimp_scenarios[i] = scenario
        print(scenario.keys())
    # save scenarios
    joblib.dump(dimp_scenarios, 'dimp_scenarios.pkl')
    # plot figure for the first scenario
    plot_2d_decision_boundary(dimp_scenarios[0])
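    # The pickled scenarios can later be reloaded for further analysis, e.g.:
    #   scenarios = joblib.load('dimp_scenarios.pkl')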