风控ML[11] | 3种连续变量分箱方法的代码分享

共 10059字,需浏览 21分钟

 ·

2022-02-10 03:05

大家好呀!在上一篇文章《风控建模中的自动分箱的方法有哪些》中我们介绍了3种业界常用的自动最优分箱方法。
1)基于CART算法的连续变量最优分箱
2)基于卡方检验的连续变量最优分箱
3)基于最优KS的连续变量最优分箱
今天这篇文章就来分享一下这3种方法的Python实现

00 Index

01 测试数据与评估方法准备
02 基于CART算法的最优分箱代码实现
03 基于卡方检验的最优分箱代码实现
04 基于最优KS的最优分箱代码实现
05 测试效果与小节

01 测试数据与评估方法准备

为了模拟实际在风险建模中我们常遇见的数据集,我这边简单造了一些数据,主要有3列:
其中,target就是我们的Y列,另外两个分别是X列,也就是我们的特征。
我们需要做的就是把数据导入即可,数据集可以在公众号(SamShare)后台回复 cut 获取。

# 导入相关库
import pandas as pd
import numpy as np
import random
import math
from scipy.stats import chi2
import scipy

# 测试数据构造,其中target为Y,1代表坏人,0代表好人。  
df = pd.read_csv('./autocut_testdata.csv')
print(len(df))
print(df.target.value_counts()/len(df))
print(df.head())

另外,我们需要一个评估分箱效果的方法,上篇我们讲到可以用IV值来衡量效果,所以我们需要也构造一个IV值计算的方法。

def iv_count(data, var, target):
    ''' 计算iv值
    Args:
        data: DataFrame,拟操作的数据集
        var: String,拟计算IV值的变量名称
        target: String,Y列名称
    Returns:
        IV值, float
    '''

    value_list = set(list(np.unique(data[var])))
    iv = 0
    data_bad = pd.Series(data[data[target]==1][var].values, index=data[data[target]==1].index)
    data_good = pd.Series(data[data[target]==0][var].values, index=data[data[target]==0].index)
    len_bad = len(data_bad)
    len_good = len(data_good)
    for value in value_list:
        # 判断是否某类是否为0,避免出现无穷小值和无穷大值
        if sum(data_bad == value) == 0:
            bad_rate = 1 / len_bad
        else:
            bad_rate = sum(data_bad == value) / len_bad
        if sum(data_good == value) == 0:
            good_rate = 1 / len_good
        else:
            good_rate = sum(data_good == value) / len_good
        iv += (good_rate - bad_rate) * math.log(good_rate / bad_rate,2)
        # print(value,iv)
    return iv

02 基于CART算法的最优分箱代码实现

基于CART算法的连续变量最优分箱,实现步骤如下:
1,给定连续变量 V,对V中的值进行排序;
2,依次计算相邻元素间中位数作为二值划分点的基尼指数;
3,选择最优(划分后基尼指数下降最大)的划分点作为本次迭代的划分点;
4,递归迭代步骤2-3,直到满足停止条件。(一般是以划分后的样本量作为停止条件,比如叶子节点的样本量>=总样本量的10%)

def get_var_median(data, var):
    """ 得到指定连续变量的所有元素的中位数列表
    Args:
        data: DataFrame,拟操作的数据集
        var: String,拟分箱的连续型变量名称
    Returns:
        关于连续变量的所有元素的中位列表,List
    """

    var_value_list = list(np.unique(data[var]))
    var_median_list = []
    for i in range(len(var_value_list)-1):
        var_median = (var_value_list[i] + var_value_list[i+1]) / 2
        var_median_list.append(var_median)
    return var_median_list


def calculate_gini(y):
    """ 计算基尼指数
    Args:
        y: Array,待计算数据的target,即0和1的数组
    Returns:
        基尼指数,float
    """

    # 将数组转化为列表
    y = y.tolist()
    probs = [y.count(i)/len(y) for i in np.unique(y)]
    gini = sum([p*(1-p) for p in probs])
    return gini


def get_cart_split_point(data, var, target, min_sample):
    """ 获得最优的二值划分点(即基尼指数下降最大的点)
    Args:
        data: DataFrame,拟操作的数据集
        var: String,拟分箱的连续型变量名称
        target: String,Y列名称
        min_sample: int,分箱的最小数据样本,也就是数据量至少达到多少才需要去分箱,一般作用在开头或者结尾处的分箱点
    
    Returns:
        BestSplit_Point: 返回本次迭代的最优划分点,float
        BestSplit_Position: 返回最优划分点的位置,最左边为0,最右边为1,float
    """

    
    # 初始化
    Gini = calculate_gini(data[target].values)
    Best_Gini = 0.0
    BestSplit_Point = -99999
    BestSplit_Position = 0.0
    median_list = get_var_median(data, var) # 获取当前数据集指定元素的所有中位数列表
    
    for i in range(len(median_list)):
        left = data[data[var] < median_list[i]]
        right = data[data[var] > median_list[i]]
        
        # 如果切分后的数据量少于指定阈值,跳出本次分箱计算
        if len(left) < min_sample or len(right) < min_sample:
            continue
        
        Left_Gini = calculate_gini(left[target].values)
        Right_Gini = calculate_gini(right[target].values)
        Left_Ratio = len(left) / len(data)
        Right_Ratio = len(right) / len(data)
        
        Temp_Gini = Gini - (Left_Gini * Left_Ratio + Right_Gini * Right_Ratio)
        if Temp_Gini > Best_Gini:
            Best_Gini = Temp_Gini
            BestSplit_Point = median_list[i]
            # 获取切分点的位置,最左边为0,最右边为1
            if len(median_list) > 1:
                BestSplit_Position = i / (len(median_list) - 1)
            else:
                BestSplit_Position = i / len(len(median_list))
        else:
            continue
    Gini = Gini - Best_Gini
    # print("最优切分点:", BestSplit_Point)
    return BestSplit_Point, BestSplit_Position


def get_cart_bincut(data, var, target, leaf_stop_percent=0.05):
    """ 计算最优分箱切分点
    Args:
        data: DataFrame,拟操作的数据集
        var: String,拟分箱的连续型变量名称
        target: String,Y列名称
        leaf_stop_percent: 叶子节点占比,作为停止条件,默认5%
    
    Returns:
        best_bincut: 最优的切分点列表,List
    """

    min_sample = len(data) * leaf_stop_percent
    best_bincut = []
    
    def cutting_data(data, var, target, min_sample, best_bincut):
        split_point, position = get_cart_split_point(data, var, target, min_sample)
        
        if split_point != -99999:
            best_bincut.append(split_point)
        
        # 根据最优切分点切分数据集,并对切分后的数据集递归计算切分点,直到满足停止条件
        # print("本次分箱的值域范围为{0} ~ {1}".format(data[var].min(), data[var].max()))
        left = data[data[var] < split_point]
        right = data[data[var] > split_point]
        
        # 当切分后的数据集仍大于最小数据样本要求,则继续切分
        if len(left) >= min_sample and position not in [0.01.0]:
            cutting_data(left, var, target, min_sample, best_bincut)
        else:
            pass
        if len(right) >= min_sample and position not in [0.01.0]:
            cutting_data(right, var, target, min_sample, best_bincut)
        else:
            pass
        return best_bincut
    best_bincut = cutting_data(data, var, target, min_sample, best_bincut)
    
    # 把切分点补上头尾
    best_bincut.append(data[var].min())
    best_bincut.append(data[var].max())
    best_bincut_set = set(best_bincut)
    best_bincut = list(best_bincut_set)
    
    best_bincut.remove(data[var].min())
    best_bincut.append(data[var].min()-1)
    # 排序切分点
    best_bincut.sort()
    
    return best_bincut

03 基于卡方检验的最优分箱代码实现

基于卡方检验的连续变量最优分箱,实现步骤如下:
1,给定连续变量 V,对V中的值进行排序,然后每个元素值单独一组,完成初始化阶段;
2,对相邻的组,两两计算卡方值;
3,合并卡方值最小的两组;
4,递归迭代步骤2-3,直到满足停止条件。(一般是卡方值都高于设定的阈值,或者达到最大分组数等等)

def calculate_chi(freq_array):
    """ 计算卡方值
    Args:
        freq_array: Array,待计算卡方值的二维数组,频数统计结果
    Returns:
        卡方值,float
    """

    # 检查是否为二维数组
    assert(freq_array.ndim==2)
    
    # 计算每列的频数之和
    col_nums = freq_array.sum(axis=0)
    # 计算每行的频数之和
    row_nums = freq_array.sum(axis=1)
    # 计算总频数
    nums = freq_array.sum()
    # 计算期望频数
    E_nums = np.ones(freq_array.shape) * col_nums / nums
    E_nums = (E_nums.T * row_nums).T
    # 计算卡方值
    tmp_v = (freq_array - E_nums)**2 / E_nums
    # 如果期望频数为0,则计算结果记为0
    tmp_v[E_nums==0] = 0
    chi_v = tmp_v.sum()
    return chi_v


def get_chimerge_bincut(data, var, target, max_group=None, chi_threshold=None):
    """ 计算卡方分箱的最优分箱点
    Args:
        data: DataFrame,待计算卡方分箱最优切分点列表的数据集
        var: 待计算的连续型变量名称
        target: 待计算的目标列Y的名称
        max_group: 最大的分箱数量(因为卡方分箱实际上是合并箱体的过程,需要限制下最大可以保留的分箱数量)
        chi_threshold: 卡方阈值,如果没有指定max_group,我们默认选择类别数量-1,置信度95%来设置阈值
        如果不知道卡方阈值怎么取,可以生成卡方表来看看,代码如下:  
        import pandas as pd
        import numpy as np
        from scipy.stats import chi2
        p = [0.995, 0.99, 0.975, 0.95, 0.9, 0.5, 0.1, 0.05, 0.025, 0.01, 0.005]
        pd.DataFrame(np.array([chi2.isf(p, df=i) for i in range(1,10)]), columns=p, index=list(range(1,10)))
    Returns:
        最优切分点列表,List
    """

    freq_df = pd.crosstab(index=data[var], columns=data[target])
    # 转化为二维数组
    freq_array = freq_df.values
    
    # 初始化箱体,每个元素单独一组
    best_bincut = freq_df.index.values
    
    # 初始化阈值 chi_threshold,如果没有指定 chi_threshold,则默认选择target数量-1,置信度95%来设置阈值
    if max_group is None:
        if chi_threshold is None:
            chi_threshold = chi2.isf(0.05, df = freq_array.shape[-1])
    
    # 开始迭代
    while True:
        min_chi = None
        min_idx = None
        for i in range(len(freq_array) - 1):
            # 两两计算相邻两组的卡方值,得到最小卡方值的两组
            v = calculate_chi(freq_array[i: i+2])
            if min_chi is None or min_chi > v:
                min_chi = v
                min_idx = i
        
        # 是否继续迭代条件判断
        # 条件1:当前箱体数仍大于 最大分箱数量阈值
        # 条件2:当前最小卡方值仍小于制定卡方阈值
        if (max_group is not None and max_group < len(freq_array)) or (chi_threshold is not None and min_chi < chi_threshold):
            tmp = freq_array[min_idx] + freq_array[min_idx+1]
            freq_array[min_idx] = tmp
            freq_array = np.delete(freq_array, min_idx+10)
            best_bincut = np.delete(best_bincut, min_idx+10)
        else:
            break
    
    # 把切分点补上头尾
    best_bincut = best_bincut.tolist()
    best_bincut.append(data[var].min())
    best_bincut.append(data[var].max())
    best_bincut_set = set(best_bincut)
    best_bincut = list(best_bincut_set)
    
    best_bincut.remove(data[var].min())
    best_bincut.append(data[var].min()-1)
    # 排序切分点
    best_bincut.sort()
    
    return best_bincut

04 基于最优KS的最优分箱代码实现

基于最优KS的连续变量最优分箱,实现步骤如下:
1,给定连续变量 V,对V中的值进行排序;
2,每一个元素值就是一个计算点,对应上图中的bin0~9;
3,计算出KS最大的那个元素,作为最优划分点,将变量划分成两部分D1和D2;
4,递归迭代步骤3,计算由步骤3中产生的数据集D1 D2的划分点,直到满足停止条件。(一般是分箱数量达到某个阈值,或者是KS值小于某个阈值)

def get_maxks_split_point(data, var, target, min_sample=0.05):
    """ 计算KS值
    Args:
        data: DataFrame,待计算卡方分箱最优切分点列表的数据集
        var: 待计算的连续型变量名称
        target: 待计算的目标列Y的名称
        min_sample: int,分箱的最小数据样本,也就是数据量至少达到多少才需要去分箱,一般作用在开头或者结尾处的分箱点
    Returns:
        ks_v: KS值,float
        BestSplit_Point: 返回本次迭代的最优划分点,float
        BestSplit_Position: 返回最优划分点的位置,最左边为0,最右边为1,float
    """

    if len(data) < min_sample:
        ks_v, BestSplit_Point, BestSplit_Position = 0-99990.0
    else:
        freq_df = pd.crosstab(index=data[var], columns=data[target])
        freq_array = freq_df.values
        if freq_array.shape[1] == 1# 如果某一组只有一个枚举值,如0或1,则数组形状会有问题,跳出本次计算
            # tt = np.zeros(freq_array.shape).T
            # freq_array = np.insert(freq_array, 0, values=tt, axis=1)
            ks_v, BestSplit_Point, BestSplit_Position = 0-999990.0
        else:
            bincut = freq_df.index.values
            tmp = freq_array.cumsum(axis=0)/(np.ones(freq_array.shape) * freq_array.sum(axis=0).T)
            tmp_abs = abs(tmp.T[0] - tmp.T[1])
            ks_v = tmp_abs.max()
            BestSplit_Point = bincut[tmp_abs.tolist().index(ks_v)]
            BestSplit_Position = tmp_abs.tolist().index(ks_v)/max(len(bincut) - 11)
        
    return ks_v, BestSplit_Point, BestSplit_Position


def get_bestks_bincut(data, var, target, leaf_stop_percent=0.05):
    """ 计算最优分箱切分点
    Args:
        data: DataFrame,拟操作的数据集
        var: String,拟分箱的连续型变量名称
        target: String,Y列名称
        leaf_stop_percent: 叶子节点占比,作为停止条件,默认5%
    
    Returns:
        best_bincut: 最优的切分点列表,List
    """

    min_sample = len(data) * leaf_stop_percent
    best_bincut = []
    
    def cutting_data(data, var, target, min_sample, best_bincut):
        ks, split_point, position = get_maxks_split_point(data, var, target, min_sample)
        
        if split_point != -99999:
            best_bincut.append(split_point)
        
        # 根据最优切分点切分数据集,并对切分后的数据集递归计算切分点,直到满足停止条件
        # print("本次分箱的值域范围为{0} ~ {1}".format(data[var].min(), data[var].max()))
        left = data[data[var] < split_point]
        right = data[data[var] > split_point]
        
        # 当切分后的数据集仍大于最小数据样本要求,则继续切分
        if len(left) >= min_sample and position not in [0.01.0]:
            cutting_data(left, var, target, min_sample, best_bincut)
        else:
            pass
        if len(right) >= min_sample and position not in [0.01.0]:
            cutting_data(right, var, target, min_sample, best_bincut)
        else:
            pass
        return best_bincut
    best_bincut = cutting_data(data, var, target, min_sample, best_bincut)
    
    # 把切分点补上头尾
    best_bincut.append(data[var].min())
    best_bincut.append(data[var].max())
    best_bincut_set = set(best_bincut)
    best_bincut = list(best_bincut_set)
    
    best_bincut.remove(data[var].min())
    best_bincut.append(data[var].min()-1)
    # 排序切分点
    best_bincut.sort()
    
    return best_bincut

05 测试效果与小节

好了,我们也把上面的3种连续变量分箱的方法用Python实现了一下,马上来测试下效果吧。

df['age_bins1'] = pd.cut(df['age'], bins=get_cart_bincut(df, 'age''target'))
df['age_bins2'] = pd.cut(df['age'], bins=get_chimerge_bincut(df, 'age''target'))
df['age_bins3'] = pd.cut(df['age'], bins=get_bestks_bincut(df, 'age''target'))
print("变量 age 的分箱结果如下:")
print("age_cart_bins:", get_cart_bincut(df, 'age''target'))
print("age_chimerge_bins:", get_chimerge_bincut(df, 'age''target'))
print("age_bestks_bins:", get_bestks_bincut(df, 'age''target'))
print("IV值如下:")
print("age:", iv_count(df, 'age''target'))
print("age_cart_bins:", iv_count(df, 'age_bins1''target'))
print("age_chimerge_bins:", iv_count(df, 'age_bins2''target'))
print("age_bestks_bins:", iv_count(df, 'age_bins3''target'))


df['income_bins1'] = pd.cut(df['income'], bins=get_cart_bincut(df, 'income''target'))
df['income_bins2'] = pd.cut(df['income'], bins=get_chimerge_bincut(df, 'income''target'))
df['income_bins3'] = pd.cut(df['income'], bins=get_bestks_bincut(df, 'income''target'))
print("变量 income 的分箱结果如下:")
print("income_cart_bins:", get_cart_bincut(df, 'income''target'))
print("income_chimerge_bins:", get_chimerge_bincut(df, 'income''target'))
print("income_bestks_bins:", get_bestks_bincut(df, 'income''target'))
print("IV值如下:")
print("income:", iv_count(df, 'income''target'))
print("income_cart_bins:", iv_count(df, 'income_bins1''target'))
print("income_chimerge_bins:", iv_count(df, 'income_bins2''target'))
print("income_bestks_bins:", iv_count(df, 'income_bins3''target'))

我们从中可以看到,3种不同的分箱方法效果还是有些不同的,但有一个共通点就是IV值都比分箱前要小,毕竟为了效率牺牲一些“IV”也是合理的。而在实际建模中,我一般都是直接用3种方法,选择最优分箱效果的那个。
以上是相对比较简单的实现,也欢迎大家试用下,有什么问题可以随机反馈~另外如果大家喜欢这篇文章的话,欢迎点赞转发哦,谢谢!

Reference

https://blog.csdn.net/xgxyxs/article/details/90413036
https://zhuanlan.zhihu.com/p/44943177
https://blog.csdn.net/hxcaifly/article/details/84593770
https://blog.csdn.net/haoxun12/article/details/105301414/
https://www.bilibili.com/read/cv12971807

浏览 65
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报