分类: Python/Ruby
2022-07-07 17:19:16
# 定义决策树类
class DecisionTree(object):
def __init__(self, classes, features,
max_depth=10, min_samples_split=10,
impurity_t='entropy'):
'''
传入一些可能用到的模型参数,也可能不会用到
classes表示模型分类总共有几类
features是每个特征的名字,也方便查询总的共特征数
max_depth表示构建决策树时的最大深度
min_samples_split表示构建决策树分裂节点时,如果到达该节点的样本数小于该值则不再分裂
impurity_t表示计算混杂度(不纯度)的计算方式,例如entropy或gini
'''
self.classes = classes
self.features = features
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.impurity_t = impurity_t
self.root = None # 定义根节点,未训练时为空
def impurity(self, data):
'''
计算某个特征下的信息增益
:param data: ,numpy一维数组
:return: 混杂度
'''
cnt = Counter(data) # 计数每个值出现的次数
probability_lst = [1.0 * cnt[i] / len(data) for i in cnt]
if self.impurity_t == 'entropy': # 如果是信息熵
return -np.sum([p * np.log2(p) for p in probability_lst if p > 0]), cnt # 返回熵 和可能用到的数据次数(方便以后使用)
return 1 - np.sum([p * p for p in probability_lst]), cnt # 否则返回gini系数
def gain(self, feature, label):
'''
计算某个特征下的信息增益
:param feature: 特征的值,numpy一维数组
:param label: 对应的标签,numpy一维数组
:return: 信息增益
'''
c_impurity, _ = self.impurity(label) # 不考虑特征时标签的混杂度
# 记录特征的每种取值所对应的数组下标
f_index = {}
for idx, v in enumerate(feature):
if v not in f_index:
f_index[v] = []
f_index[v].append(idx)
# 计算根据该特征分裂后的不纯度,根据特征的每种值的数目加权和
f_impurity = 0
for v in f_index:
# 根据该特征取值对应的数组下标 取出对应的标签列表 比如分支1有多少个正负例 分支2有...
f_l = label[f_index[v]]
f_impurity += self.impurity(f_l)[0] * len(f_l) / len(label) # 循环结束得到各分支混杂度的期望
gain = c_impurity - f_impurity # 得到gain
# 有些特征取值很多,天然不纯度高、信息增益高,模型会偏向于取值很多的特征比如日期,但很可能过拟合
# 计算信息增益率可以缓解该问题
splitInformation = self.impurity(feature)[0] # 计算该特征在标签无关时的不纯度
gainRatio = gain / splitInformation if splitInformation > 0 else gain # 除数不为0时为信息增益率
return gainRatio, f_index # 返回信息增益率,以及每个特征取值的数组下标(方便以后使用)
def expand_node(self, feature, label, depth, skip_features=set()):
'''
递归函数分裂节点
:param feature:二维numpy(n*m)数组,每行表示一个样本,n行,有m个特征
:param label:一维numpy(n)数组,表示每个样本的分类标签
:param depth:当前节点的深度
:param skip_features:表示当前路径已经用到的特征
在当前ID3算法离散特征的实现下,一条路径上已经用过的特征不会再用(其他实现有可能会选重复特征)
'''
l_cnt = Counter(label) # 计数每个类别的样本出现次数
if len(l_cnt) <= 1: # 如果只有一种类别了,无需分裂,已经是叶节点
return label[0] # 只需记录类别
if len(label) < self.min_samples_split or depth > self.max_depth: # 如果达到了最小分裂的样本数或者最大深度的阈值
return l_cnt.most_common(1)[0][0] # 则只记录当前样本中最多的类别
f_idx, max_gain, f_v_index = -1, -1, None # 准备挑选分裂特征
for idx in range(len(self.features)): # 遍历所有特征
if idx in skip_features: # 如果当前路径已经用到,不用再算
continue
f_gain, fv = self.gain(feature[:, idx], label) # 计算特征的信息增益,fv是特征每个取值的样本下标
# if f_gain <= 0: # 如果信息增益不为正,跳过该特征
# continue
if f_idx < 0 or f_gain > max_gain: # 如果个更好的分裂特征
f_idx, max_gain, f_v_index = idx, f_gain, fv # 则记录该特征
# if f_idx < 0: # 跟单网gendan5.com如果没有找到合适的特征,即所有特征都没有信息增益
# return l_cnt.most_common(1)[0][0] # 则只记录当前样本中最多的类别
decision = {} # 用字典记录每个特征取值所对应的子节点,key是特征取值,value是子节点
skip_features = set([f_idx] + [f for f in skip_features]) # 子节点要跳过的特征包括当前选择的特征
for v in f_v_index: # 遍历特征的每种取值
decision[v] = self.expand_node(feature[f_v_index[v], :], label[f_v_index[v]], # 取出该特征取值所对应的样本
depth=depth + 1, skip_features=skip_features) # 深度+1,递归调用节点分裂
# 返回一个元组,有三个元素
# 第一个是选择的特征下标,第二个特征取值和对应的子节点(字典),第三个是到达当前节点的样本中最多的类别
return (f_idx, decision, l_cnt.most_common(1)[0][0])
def traverse_node(self, node, feature):
'''
预测样本时从根节点开始遍历节点,根据特征路由。
:param node: 当前到达的节点,例如self.root
:param feature: 长度为m的numpy一维数组
'''
assert len(self.features) == len(feature) # 要求输入样本特征数和模型定义时特征数目一致
if type(node) is not tuple: # 如果到达了一个节点是叶节点(不再分裂),则返回该节点类别
return node
fv = feature[node[0]] # 否则取出该节点对应的特征值,node[0]记录了特征的下标
if fv in node[1]: # 根据特征值找到子节点,注意需要判断训练节点分裂时到达该节点的样本是否有该特征值(分支)
return self.traverse_node(node[1][fv], feature) # 如果有,则进入到子节点继续遍历
return node[-1] # 如果没有,返回训练时到达当前节点的样本中最多的类别
def fit(self, feature, label):
'''
训练模型
:param feature:feature为二维numpy(n*m)数组,每行表示一个样本,有m个特征
:param label:label为一维numpy(n)数组,表示每个样本的分类标签
'''
assert len(self.features) == len(feature[0]) # 输入数据的特征数目应该和模型定义时的特征数目相同
self.root = self.expand_node(feature, label, depth=1) # 从根节点开始分裂,模型记录根节点
def predict(self, feature):
'''
预测
:param feature:输入feature可以是一个一维numpy数组也可以是一个二维numpy数组
如果是一维numpy(m)数组则是一个样本,包含m个特征,返回一个类别值
如果是二维numpy(n*m)数组则表示n个样本,每个样本包含m个特征,返回一个numpy一维数组
'''
assert len(feature.shape) == 1 or len(feature.shape) == 2 # 只能是1维或2维
if len(feature.shape) == 1: # 如果是一个样本
return self.traverse_node(self.root, feature) # 从根节点开始路由
return np.array([self.traverse_node(self.root, f) for f in feature]) # 如果是很多个样本
def get_params(self, deep): # 要调用sklearn的cross_validate需要实现该函数返回所有参数
return {'classes': self.classes, 'features': self.features,
'max_depth': self.max_depth, 'min_samples_split': self.min_samples_split,
'impurity_t': self.impurity_t}
def set_params(self, **parameters): # 要调用sklearn的GridSearchCV需要实现该函数给类设定所有参数
for parameter, value in parameters.items():
setattr(self, parameter, value)
return self
# 定义决策树模型,传入算法参数
DT = DecisionTree(classes=[0, 1], features=feature_names, max_depth=5, min_samples_split=10, impurity_t='gini')
DT.fit(x_train, y_train) # 在训练集上训练
p_test = DT.predict(x_test) # 在测试集上预测,获得预测值
print(p_test) # 输出预测值
test_acc = accuracy_score(p_test, y_test) # 将测试预测值与测试集标签对比获得准确率
print('accuracy: {:.4f}'.format(test_acc)) # 输出准确率