-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdrift_detection.py
More file actions
133 lines (108 loc) · 4.64 KB
/
Copy pathdrift_detection.py
File metadata and controls
133 lines (108 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Feb 8 21:27:34 2022
@author: lingxiaoli
"""
import numpy as np
from scipy.stats import chi2_contingency, ks_2samp, entropy
from scipy.special import softmax
class ChiSquareDrift:
    """Chi-squared drift detector on a binary "low-margin" indicator.

    Every sample's per-class scores are passed through a softmax and reduced
    to one 0/1 feature: 1 when the gap between the two largest probabilities
    is below a fixed margin, 0 otherwise. The frequency of that indicator in
    the reference data is compared with its frequency in the tested data using
    ``scipy.stats.chi2_contingency``.

    Args:
        x_ref: Reference scores, indexed as (n_samples, n_classes) with at
            least two classes. Either an object exposing ``detach()`` (it is
            converted with ``detach().cpu().numpy()``) or anything accepted by
            ``np.asarray``.
        threshold: Drift is flagged when the p-value is below this value.
        return_p_val: Include ``'p_val'`` and ``'threshold'`` in the result.
        return_distance: Include ``'distance'`` (the test statistic) in the
            result.
    """

    def __init__(self, x_ref, threshold: float = .05, return_p_val=True, return_distance=True):
        self.x_ref = x_ref
        self.threshold = threshold
        self.return_p_val = return_p_val
        self.return_distance = return_distance

    def update_ref(self, x_ref):
        """Replace the reference data used by subsequent tests."""
        self.x_ref = x_ref

    # Historical (misspelled) name kept so existing callers keep working.
    updata_ref = update_ref

    def update_threshold(self, threshold):
        """Replace the p-value threshold."""
        self.threshold = threshold

    def update_return_p_val(self, return_p_val):
        """Toggle whether results carry the p-value and threshold."""
        self.return_p_val = return_p_val

    def update_return_distance(self, return_distance):
        """Toggle whether results carry the test statistic."""
        self.return_distance = return_distance

    @staticmethod
    def _to_numpy(x):
        """Return *x* as a NumPy array, detaching tensor-like inputs first."""
        if hasattr(x, 'detach'):
            x = x.detach()
            if hasattr(x, 'cpu'):
                x = x.cpu()
            return x.numpy()
        return np.asarray(x)

    def process_data(self, x):
        """Map per-class scores to a (n_samples, 1) array of 0/1 indicators.

        The indicator is 1 when the difference between the largest and the
        second-largest softmax probability is smaller than 0.1.
        """
        margin_width = 0.1
        probs = softmax(self._to_numpy(x), axis=-1)
        # Negating lets np.partition place the two largest values first.
        top_2_probs = -np.partition(-probs, kth=1, axis=-1)[:, :2]
        diff = top_2_probs[:, 0] - top_2_probs[:, 1]
        x_logist = (diff < margin_width).astype(int)
        return x_logist[:, None]

    def feature_score_Chi(self, x):
        """Run the chi-squared test of the tested data against the reference.

        Args:
            x: Scores to test, in the same layout as the reference data.

        Returns:
            ``(p_val, dist)``: two float32 arrays of length 1 holding the
            p-value and the chi-squared statistic. When the contingency table
            is degenerate (fewer than two categories observed overall, or a
            sample with no rows) the test is undefined; ``p_val`` is then 1
            and ``dist`` is 0.
        """
        x_ref = self.process_data(self.x_ref)
        x = self.process_data(x)
        x_ref_categories = {0: [0, 1]}
        n_features = 1

        # Count over the union of categories seen in reference and test data.
        x_categories = {f: list(np.unique(x[:, f])) for f in range(n_features)}
        all_categories = {
            f: sorted(set().union(x_ref_categories[f], x_categories[f]))
            for f in range(n_features)
        }
        # The reference counts must come from the reference data; counting the
        # tested data twice would compare the sample with itself.
        x_ref_count = self.get_counts(x_ref, all_categories)
        x_count = self.get_counts(x, all_categories)

        p_val = np.zeros(n_features, dtype=np.float32)
        dist = np.zeros_like(p_val)
        for f in range(n_features):
            contingency_table = np.vstack((x_ref_count[f], x_count[f]))
            # chi2_contingency raises on zero expected frequencies, so drop
            # categories that occur in neither sample.
            observed = contingency_table[:, contingency_table.sum(axis=0) > 0]
            if observed.shape[1] < 2 or (observed.sum(axis=1) == 0).any():
                p_val[f] = 1.0
                dist[f] = 0.0
                continue
            dist[f], p_val[f], _, _ = chi2_contingency(observed)
        return p_val, dist

    def get_counts(self, x, categories):
        """Count, per feature column, how often each listed category occurs.

        Args:
            x: 2-D array whose columns are features.
            categories: Mapping ``feature index -> list of category values``.

        Returns:
            Mapping ``feature index -> list of counts`` in category order.
        """
        return {f: [(x[:, f] == v).sum() for v in vals] for f, vals in categories.items()}

    def get_result(self, x):
        """Test *x* for drift and return the outcome as a dict.

        Returns:
            Dict with ``'is_drift'`` (1 if any p-value is below the threshold,
            else 0), plus ``'p_val'`` and ``'threshold'`` when
            ``return_p_val`` is set and ``'distance'`` when
            ``return_distance`` is set.
        """
        p_vals, dist = self.feature_score_Chi(x)
        threshold = self.threshold
        drift_pred = int((p_vals < threshold).any())  # type: ignore[assignment]
        cd = {}
        cd['is_drift'] = drift_pred
        if self.return_p_val:
            cd['p_val'] = p_vals
            cd['threshold'] = threshold
        if self.return_distance:
            cd['distance'] = dist
        return cd
class KSDrift:
    """Kolmogorov-Smirnov drift detector on predictive entropy.

    Every sample's per-class scores are passed through a softmax and reduced
    to the entropy of the resulting distribution. The entropies of the
    reference data and of the tested data are compared with a two-sided
    ``scipy.stats.ks_2samp`` test.

    Args:
        x_ref: Reference scores with classes on the last axis. Either an
            object exposing ``detach()`` (it is converted with
            ``detach().cpu().numpy()``) or anything accepted by ``np.asarray``.
        threshold: Drift is flagged when the p-value is below this value.
        return_p_val: Include ``'p_val'`` and ``'threshold'`` in the result.
        return_distance: Include ``'distance'`` (the K-S statistic) in the
            result.
    """

    def __init__(self, x_ref, threshold: float = .05, return_p_val=True, return_distance=True):
        self.x_ref = x_ref
        self.threshold = threshold
        self.return_p_val = return_p_val
        self.return_distance = return_distance

    def update_ref(self, x_ref):
        """Replace the reference data used by subsequent tests."""
        self.x_ref = x_ref

    # Historical (misspelled) name kept so existing callers keep working.
    updata_ref = update_ref

    def update_threshold(self, threshold):
        """Replace the p-value threshold."""
        self.threshold = threshold

    def update_return_p_val(self, return_p_val):
        """Toggle whether results carry the p-value and threshold."""
        self.return_p_val = return_p_val

    def update_return_distance(self, return_distance):
        """Toggle whether results carry the test statistic."""
        self.return_distance = return_distance

    @staticmethod
    def _to_numpy(x):
        """Return *x* as a NumPy array, detaching tensor-like inputs first."""
        if hasattr(x, 'detach'):
            x = x.detach()
            if hasattr(x, 'cpu'):
                x = x.cpu()
            return x.numpy()
        return np.asarray(x)

    def feature_score_KS(self, x):
        """Run the K-S test of the tested data against the reference.

        Args:
            x: Scores to test, in the same layout as the reference data.

        Returns:
            ``(p_val, dist)``: two float32 arrays of length 1 holding the
            p-value and the K-S statistic.
        """
        x_ref = entropy(softmax(self._to_numpy(self.x_ref), axis=-1), axis=-1)
        x = entropy(softmax(self._to_numpy(x), axis=-1), axis=-1)
        n_features = 1
        # Entropy is a single feature; give it an explicit column axis.
        x = x.reshape(x.shape[0], -1)
        x_ref = x_ref.reshape(x_ref.shape[0], -1)
        p_val = np.zeros(n_features, dtype=np.float32)
        dist = np.zeros_like(p_val)
        for f in range(n_features):
            dist[f], p_val[f] = ks_2samp(x_ref[:, f], x[:, f], alternative='two-sided', mode='asymp')
        return p_val, dist

    def get_result(self, x):
        """Test *x* for drift and return the outcome as a dict.

        Returns:
            Dict with ``'is_drift'`` (1 if any p-value is below the threshold,
            else 0), plus ``'p_val'`` and ``'threshold'`` when
            ``return_p_val`` is set and ``'distance'`` when
            ``return_distance`` is set.
        """
        p_vals, dist = self.feature_score_KS(x)
        threshold = self.threshold
        drift_pred = int((p_vals < threshold).any())  # type: ignore[assignment]
        cd = {}
        cd['is_drift'] = drift_pred
        if self.return_p_val:
            cd['p_val'] = p_vals
            cd['threshold'] = threshold
        if self.return_distance:
            cd['distance'] = dist
        return cd
def drift_detection(x_ref, threshold: float = .05,
                    return_p_val=True, return_distance=True, method='KSDrift'):
    """Build the drift detector selected by *method*.

    Args:
        x_ref: Reference data handed to the detector.
        threshold: p-value threshold handed to the detector.
        return_p_val: Whether detector results include the p-value.
        return_distance: Whether detector results include the test statistic.
        method: ``'KSDrift'`` or ``'ChiSquareDrift'``.

    Returns:
        A ``KSDrift`` or ``ChiSquareDrift`` instance.

    Raises:
        ValueError: If *method* is not one of the supported names.
    """
    if method == 'KSDrift':
        return KSDrift(x_ref, threshold, return_p_val, return_distance)
    if method == "ChiSquareDrift":
        return ChiSquareDrift(x_ref, threshold, return_p_val, return_distance)
    # Fail loudly instead of handing back None for a mistyped method name.
    raise ValueError(
        f"Unknown drift detection method {method!r}; "
        "expected 'KSDrift' or 'ChiSquareDrift'."
    )