To implement the decision tree, we need to write the following functions:
- calculate_max_info_gain_index: compute the information gain of each feature and pick the best one
- split_dataset_by_feature: given the best feature, split the dataset on its values
- majority_vote: when all features have been used but the class is still ambiguous, fall back to a majority vote
- create_decision_tree: build the decision tree on top of the helper functions above
- decision_tree_dump / decision_tree_load: building a decision tree is expensive, so persist it and load it back next time instead of rebuilding it
- classify_decision_tree: classify samples with the decision tree
- main: exercise all of the functions above
1. Information gain functions
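For reference, the code below computes the three standard ID3 quantities: the entropy of a dataset D whose classes occur with probabilities p_k, H(D) = -Σ_k p_k · log2(p_k); the conditional entropy given feature A, H(D|A) = Σ_v (|D_v| / |D|) · H(D_v), where D_v is the subset of D with A = v; and the information gain g(D, A) = H(D) - H(D|A), i.e. how much knowing A reduces the uncertainty about the class.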
import copy
import math
import pickle

# Compute information entropy
def calculate_entropy(data_set):
    """
    Compute the entropy of the class label column (the last column)
    :param data_set: dataset, one sample per row, class label last
    :return: entropy value
    """
    # 1. Extract the class label column
    feature_list = [row[-1] for row in data_set]
    # 2. Count how many times each label appears
    feature_value_count = {}
    for feature_value in feature_list:
        if feature_value not in feature_value_count:
            feature_value_count[feature_value] = 0
        feature_value_count[feature_value] += 1
    # 3. Compute the entropy
    my_entropy_value = 0.0
    unique_feature_list = set(feature_list)
    for feature_value in unique_feature_list:
        # Probability of this label
        probability = feature_value_count[feature_value] / len(feature_list)
        # Accumulate -p * log2(p)
        my_entropy_value -= probability * math.log(probability, 2)
    return my_entropy_value
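As a quick sanity check, here the `sample` list is just the fish dataset reused from the test section at the end (2 "yes" and 3 "no" samples):

sample = [[1, 1, "yes"], [1, 1, "yes"], [1, 0, "no"], [0, 1, "no"], [0, 1, "no"]]
print(calculate_entropy(sample))  # -(2/5)*log2(2/5) - (3/5)*log2(3/5) ≈ 0.971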
# Compute conditional entropy
def calculate_conditional_entropy(data_set, condition_index):
    """
    :param data_set: dataset
    :param condition_index: index of the conditioning feature
    :return: conditional entropy
    """
    # 1. Group the dataset by the value of the conditioning feature
    dataset_group_by_cond = {}
    feature_values = []
    for row in data_set:
        if row[condition_index] not in dataset_group_by_cond:
            dataset_group_by_cond[row[condition_index]] = []
            feature_values.append(row[condition_index])
        dataset_group_by_cond[row[condition_index]].append(row)
    # 2. Accumulate the weighted entropy of each group, i.e. H(target|condition)
    unique_cond_features = set(feature_values)
    condition_entropy_value = 0.0
    for feature_value in unique_cond_features:
        # Fraction of samples with y = feature_value, i.e. p(y)
        probability = len(dataset_group_by_cond[feature_value]) / len(data_set)
        # Entropy of the group with y = feature_value, i.e. H(X|Y = y), weighted by p(y)
        entropy_value = calculate_entropy(dataset_group_by_cond[feature_value]) * probability
        # Sum the weighted entropies over all condition values
        condition_entropy_value += entropy_value
    return condition_entropy_value
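As a concrete check against the sample dataset: conditioning on feature 0 (游泳, swim) splits it into a value-1 group [yes, yes, no] with entropy ≈ 0.918 and a value-0 group [no, no] with entropy 0, so H(D|游泳) = 3/5 × 0.918 + 2/5 × 0 ≈ 0.551.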
# Compute information gain
def calculate_information_gain(data_set, condition_index):
    """
    :param data_set: dataset
    :param condition_index: index of the conditioning feature
    :return: information gain
    """
    # 1. Entropy of the class label column
    target_entropy_value = calculate_entropy(data_set)
    # 2. Conditional entropy of the class labels given the feature
    condition_entropy_value = calculate_conditional_entropy(data_set, condition_index)
    # 3. Information gain is the difference between the two
    info_gain_value = target_entropy_value - condition_entropy_value
    return info_gain_value
# Find the feature with the highest information gain
def calculate_max_info_gain_index(data_set):
    """
    :param data_set: dataset
    :return: index of the feature with the highest information gain
    """
    # 1. Number of features (the last column is the class label)
    feature_count = len(data_set[0]) - 1
    # 2. Compute the information gain of every feature and keep the best
    max_info_gain_value = 0.0
    selected_index = 0
    for index in range(feature_count):
        temp_info_gain_value = calculate_information_gain(data_set, index)
        if temp_info_gain_value > max_info_gain_value:
            max_info_gain_value = temp_info_gain_value
            selected_index = index
    return selected_index
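Continuing the sanity check with the same `sample` list: feature 0 (游泳) should win, since its gain is markedly higher:

print(calculate_information_gain(sample, 0))  # 0.971 - 0.551 ≈ 0.420
print(calculate_information_gain(sample, 1))  # 0.971 - 0.800 ≈ 0.171
print(calculate_max_info_gain_index(sample))  # 0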
2. Splitting the dataset
# Split the dataset on a feature value
def split_dataset_by_feature(data_set, feature_index, feature_value):
    """
    :param data_set: dataset
    :param feature_index: index of the feature to split on
    :param feature_value: value of the feature to keep
    :return: sub-dataset of matching rows, with the feature column removed
    """
    new_dataset = []
    for row in data_set:
        if row[feature_index] == feature_value:
            # Keep the row but drop the feature column itself
            temp_row = row[:feature_index]
            temp_row.extend(row[feature_index + 1:])
            new_dataset.append(temp_row)
    return new_dataset
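For example, selecting the rows of `sample` where feature 0 (游泳) equals 1 also drops that column:

print(split_dataset_by_feature(sample, 0, 1))  # [[1, 'yes'], [1, 'yes'], [0, 'no']]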
3. Majority vote
# Majority vote
def majority_vote(class_list):
    # Count the occurrences of each class label
    class_count = {}
    for vote in class_list:
        if vote not in class_count:
            class_count[vote] = 0
        class_count[vote] += 1
    # Sort by count, descending, and return the most frequent label
    sorted_class_count = sorted(class_count.items(), key=lambda x: x[1], reverse=True)
    return sorted_class_count[0][0]
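A one-line check:

print(majority_vote(["no", "no", "yes", "no", "yes"]))  # no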
4. Building the decision tree
# Build the decision tree
def create_decision_tree(data_set, feature_labels):
    # 0. Deep-copy the dataset and the label list so recursion never mutates the caller's data
    data_set = copy.deepcopy(data_set)
    feature_labels = copy.deepcopy(feature_labels)
    # 1. Collect all class labels
    class_list = [row[-1] for row in data_set]
    # 2. Recursion exit conditions
    # 2.1 Every sample belongs to one class (count of the first class == number of samples)
    if len(class_list) == class_list.count(class_list[0]):
        return class_list[0]
    # 2.2 All features are used up but the classes still differ: take the most frequent class
    if len(data_set[0]) == 1:
        return majority_vote(class_list)
    # 3. Pick the feature with the highest information gain
    best_feature_index = calculate_max_info_gain_index(data_set)
    best_feature_label = feature_labels[best_feature_index]
    # 4. Build the tree, keyed by the feature label so that classification can map the key
    #    back to a column index even after columns have been removed from sub-datasets
    current_feature_values = [row[best_feature_index] for row in data_set]
    decision_tree = {best_feature_label: {}}
    # The split removes the chosen column, so drop its label for the subtrees too
    sub_labels = feature_labels[:best_feature_index] + feature_labels[best_feature_index + 1:]
    for feature_value in set(current_feature_values):
        # All rows where the best feature == feature_value, with that column removed
        sub_dataset = split_dataset_by_feature(data_set, best_feature_index, feature_value)
        # Recursively build the subtree for this branch
        decision_tree[best_feature_label][feature_value] = create_decision_tree(sub_dataset, sub_labels)
    return decision_tree
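Built from the sample data, the tree is a nested dict keyed by feature label, with leaves holding class labels (branch order may vary, since set iteration order is not guaranteed):

tree = create_decision_tree(sample, ["游泳", "脚蹼", "鱼"])
print(tree)  # {'游泳': {0: 'no', 1: {'脚蹼': {0: 'no', 1: 'yes'}}}}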
5. Saving and loading
# Persist the decision tree
def decision_tree_dump(filename, decision_tree):
    with open(filename, 'wb') as file:
        pickle.dump(decision_tree, file)

# Load the decision tree
def decision_tree_load(filename):
    with open(filename, 'rb') as file:
        return pickle.load(file)
6. Classifying with the decision tree
# Classify a sample with the decision tree
def classify_decision_tree(decision_tree, feature_labels, test_sample):
    """
    :param decision_tree: decision tree object
    :param feature_labels: feature labels, in original column order
    :param test_sample: test sample (feature values only)
    :return: predicted class label
    """
    # 1. The root key is the label of the feature split at this node;
    #    map it back to the sample's column index
    best_feature_label = list(decision_tree.keys())[0]
    best_feature_index = feature_labels.index(best_feature_label)
    # 2. Get its subtree
    sub_decision_tree = decision_tree[best_feature_label]
    # 3. Walk the branches of the subtree
    class_label = None
    for key in sub_decision_tree.keys():
        # Find the branch matching the test sample: either a subtree or a leaf
        if test_sample[best_feature_index] == key:
            if isinstance(sub_decision_tree[key], dict):
                class_label = classify_decision_tree(sub_decision_tree[key], feature_labels, test_sample)
            else:
                class_label = sub_decision_tree[key]
    return class_label
7. Test code
if __name__ == '__main__':
    data_labels = ["游泳", "脚蹼", "鱼"]  # features: swim, flippers; target: fish
    data_set = [
        [1, 1, "yes"],
        [1, 1, "yes"],
        [1, 0, "no"],
        [0, 1, "no"],
        [0, 1, "no"],
    ]
    # 1. Build the decision tree
    decision_tree = create_decision_tree(data_set, data_labels)
    # 2. Persist the decision tree
    filename = "./decision_tree.data"
    decision_tree_dump(filename, decision_tree)
    # 3. Restore the decision tree
    dt = decision_tree_load(filename)
    # 4. Classify with the decision tree
    classify_label = classify_decision_tree(dt, data_labels, [1, 1])
    print("classify_label:", classify_label)
