1+ import numpy as np
2+ from scipy .stats import chi2_contingency
3+ from scipy .special import softmax
4+
def process_data(x, margin_width=0.1):
    """Flag samples whose two most probable classes are nearly tied.

    The scores in ``x`` are turned into probabilities with a softmax over the
    last axis. A sample is flagged with 1 when the gap between its largest
    and second-largest probability is below ``margin_width``, and 0 otherwise.

    Args:
        x: Per-sample class scores, indexed as (sample, class) with at least
            two classes. Either an object exposing ``detach()`` (e.g. a torch
            tensor) or anything ``np.asarray`` accepts.
        margin_width: Probability gap below which a sample counts as lying
            inside the margin. Defaults to 0.1.

    Returns:
        Integer array of shape (n_samples, 1) holding the 0/1 flags.
    """
    if hasattr(x, "detach"):
        # Detach from the autograd graph and copy to host memory first;
        # calling .numpy() directly fails for tensors living on an accelerator.
        scores = x.detach().cpu().numpy()
    else:
        scores = np.asarray(x)

    probs = softmax(scores, axis=-1)
    # Partitioning the negated probabilities around kth=1 puts the two
    # largest probabilities in the first two slots, largest first.
    top_2_probs = -np.partition(-probs, kth=1, axis=-1)[:, :2]
    gap = top_2_probs[:, 0] - top_2_probs[:, 1]
    in_margin = (gap < margin_width).astype(int)
    return in_margin[:, None]
12+
def feature_score(x_ref, x):
    """Chi-squared test between the margin flags of two samples.

    Both inputs are reduced by ``process_data`` to a single binary feature.
    The counts of each flag value in the reference and test data form a
    2 x k contingency table, which is passed to ``chi2_contingency``.

    Args:
        x_ref: Reference data, in whatever form ``process_data`` accepts.
        x: Test data, in whatever form ``process_data`` accepts.

    Returns:
        Tuple ``(p_val, dist)`` of float32 arrays with one entry per feature
        (currently a single feature): the p-value and the chi-squared
        statistic.

    Raises:
        ValueError: Propagated from ``chi2_contingency`` when the table still
            yields a zero expected frequency, e.g. one sample has no rows.
    """
    x_ref = process_data(x_ref)
    x = process_data(x)
    # The processed feature is binary, so the reference categories are fixed.
    x_ref_categories = {0: [0, 1]}
    n_features = 1
    x_ref = x_ref.reshape(x_ref.shape[0], -1)
    x = x.reshape(x.shape[0], -1)

    # apply counts on union of categories per variable in both the reference and test data
    x_categories = {f: list(np.unique(x[:, f])) for f in range(n_features)}
    all_categories = {
        f: list(set().union(x_ref_categories[f], x_categories[f]))  # type: ignore
        for f in range(n_features)
    }
    x_ref_count = get_counts(x_ref, all_categories)
    x_count = get_counts(x, all_categories)

    p_val = np.zeros(n_features, dtype=np.float32)
    dist = np.zeros_like(p_val)
    for f in range(n_features):  # apply Chi-Squared test
        contingency_table = np.vstack((x_ref_count[f], x_count[f]))
        # A category seen in neither sample gives a zero expected frequency,
        # which chi2_contingency rejects with ValueError; drop such columns.
        observed = contingency_table[:, contingency_table.sum(axis=0) > 0]
        if observed.shape[1] < 2:
            # Every row of both samples has the same value, so there is
            # nothing to compare: report zero distance and p-value 1.
            dist[f], p_val[f] = 0.0, 1.0
            continue
        dist[f], p_val[f], _, _ = chi2_contingency(observed)
    return p_val, dist
33+
def get_counts(x, categories):
    """Count occurrences of each category value in each feature column.

    Args:
        x: 2-D array indexed as (row, feature).
        categories: Mapping from feature column index to the list of values
            to count in that column.

    Returns:
        Dict mapping each feature index to a list of counts, in the same
        order as that feature's values in ``categories``.
    """
    counts = {}
    for feature, values in categories.items():
        column = x[:, feature]
        counts[feature] = [(column == value).sum() for value in values]
    return counts
36+
def ChiSquareDrift(x_ref, x, threshold: float = .05, return_p_val=True, return_distance=True):
    """Run the chi-squared drift check and package the outcome as a dict.

    Args:
        x_ref: Reference data forwarded to ``feature_score``.
        x: Test data forwarded to ``feature_score``.
        threshold: Drift is flagged when any p-value is below this value.
        return_p_val: If true, include ``'p_val'`` and ``'threshold'``.
        return_distance: If true, include ``'distance'``.

    Returns:
        Dict with ``'is_drift'`` (0 or 1) plus the optional entries above.
    """
    p_vals, dist = feature_score(x_ref, x)
    is_drift = int((p_vals < threshold).any())  # type: ignore[assignment]

    result = {'is_drift': is_drift}
    if return_p_val:
        result.update({'p_val': p_vals, 'threshold': threshold})
    if return_distance:
        result['distance'] = dist
    return result
0 commit comments