To implement the decision tree, we need to write the following functions:
- calculate_max_info_gain_index: compute the information gain of each feature and select the best one
- split_dataset_by_feature: given the best feature, split the dataset on its values
- majority_vote: when the features are exhausted but the class is still ambiguous, decide by majority vote
- create_decision_tree: build the decision tree on top of the helper functions above
- decision_tree_dump / decision_tree_load: building the tree is expensive, so persist it and reload it next time instead of rebuilding
- classify_decision_tree: classify a sample with the decision tree
- main: exercise all of the functions above
1. Information gain calculation
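For reference, these are the standard ID3 quantities the code below computes: the entropy of the class label over dataset D, the conditional entropy given a feature A (where D_v is the subset of rows with A = v), and their difference, the information gain:

```latex
H(D) = -\sum_{k} p_k \log_2 p_k
\qquad
H(D \mid A) = \sum_{v} \frac{|D_v|}{|D|}\, H(D_v)
\qquad
g(D, A) = H(D) - H(D \mid A)
```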
```python
import math


# Compute the entropy of the class label column
def calculate_entropy(data_set):
    """
    :param data_set: dataset whose last column is the class label
    :return: entropy of the class label
    """
    # 1. Extract the class label of every row
    feature_list = [row[-1] for row in data_set]
    # 2. Count the occurrences of each label
    feature_value_count = {}
    for feature_value in feature_list:
        if feature_value not in feature_value_count:
            feature_value_count[feature_value] = 0
        feature_value_count[feature_value] += 1
    # 3. Accumulate -p * log2(p) over the distinct labels
    my_entropy_value = 0.0
    for feature_value in set(feature_list):
        probability = feature_value_count[feature_value] / len(feature_list)
        my_entropy_value -= probability * math.log(probability, 2)
    return my_entropy_value


# Compute the conditional entropy
def calculate_conditional_entropy(data_set, condition_index):
    """
    :param data_set: dataset
    :param condition_index: index of the conditioning feature
    :return: conditional entropy of the class label given that feature
    """
    # 1. Group the dataset by the value of the conditioning feature
    dataset_group_by_cond = {}
    for row in data_set:
        if row[condition_index] not in dataset_group_by_cond:
            dataset_group_by_cond[row[condition_index]] = []
        dataset_group_by_cond[row[condition_index]].append(row)
    # 2. Weight each group's entropy by the group's proportion,
    #    i.e. H(D|A) = sum over v of p(v) * H(D_v)
    condition_entropy_value = 0.0
    for feature_value, group in dataset_group_by_cond.items():
        probability = len(group) / len(data_set)
        condition_entropy_value += probability * calculate_entropy(group)
    return condition_entropy_value


# Compute the information gain
def calculate_information_gain(data_set, condition_index):
    """
    :param data_set: dataset
    :param condition_index: index of the candidate feature
    :return: information gain of splitting on that feature
    """
    # 1. Entropy of the class label
    target_entropy_value = calculate_entropy(data_set)
    # 2. Conditional entropy given the candidate feature
    condition_entropy_value = calculate_conditional_entropy(data_set, condition_index)
    # 3. Information gain is the difference of the two
    return target_entropy_value - condition_entropy_value


# Find the feature with the highest information gain
def calculate_max_info_gain_index(data_set):
    """
    :param data_set: dataset
    :return: index of the feature with the highest information gain
    """
    # 1. Number of features (the last column is the class label)
    feature_count = len(data_set[0]) - 1
    # 2. Compute the information gain of every feature and keep the best
    info_gain_value = 0.0
    selected_index = 0
    for index in range(feature_count):
        temp_info_gain_value = calculate_information_gain(data_set, index)
        if temp_info_gain_value > info_gain_value:
            info_gain_value = temp_info_gain_value
            selected_index = index
    return selected_index
```
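A quick sanity check (a hypothetical snippet; `sample` is simply the dataset from the test code below, and the expected values follow directly from the entropy formula):

```python
sample = [
    [1, 1, "yes"],
    [1, 1, "yes"],
    [1, 0, "no"],
    [0, 1, "no"],
    [0, 1, "no"],
]
print(calculate_entropy(sample))              # ~0.9710 (2 "yes", 3 "no")
print(calculate_information_gain(sample, 0))  # ~0.4200
print(calculate_information_gain(sample, 1))  # ~0.1710
print(calculate_max_info_gain_index(sample))  # 0, feature 0 has the larger gain
```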
2. Splitting the dataset
```python
# Split the dataset on a feature value
def split_dataset_by_feature(data_set, feature_index, feature_value):
    """
    :param data_set: dataset
    :param feature_index: index of the feature to split on
    :param feature_value: value of that feature to keep
    :return: sub-dataset of matching rows, with the feature column removed
    """
    new_dataset = []
    for row in data_set:
        if row[feature_index] == feature_value:
            # Keep the row but drop the column we split on
            temp_dataset = row[:feature_index]
            temp_dataset.extend(row[feature_index + 1:])
            new_dataset.append(temp_dataset)
    return new_dataset
```
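For example, splitting the `sample` dataset from the earlier snippet on feature 0 with value 1 keeps the three matching rows and drops that column:

```python
sub = split_dataset_by_feature(sample, 0, 1)
print(sub)  # [[1, 'yes'], [1, 'yes'], [0, 'no']]
```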
3. Majority vote
```python
# Majority vote: return the most frequent class label
def majority_vote(class_list):
    class_count = {}
    for vote in class_list:
        if vote not in class_count:
            class_count[vote] = 0
        class_count[vote] += 1
    # Sort labels by count in descending order and take the most frequent
    sorted_class_count = sorted(class_count.items(), key=lambda x: x[1], reverse=True)
    return sorted_class_count[0][0]
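As a side note, the same counting is available in the standard library; an equivalent drop-in using `collections.Counter` would look like:

```python
from collections import Counter


def majority_vote(class_list):
    # most_common(1) returns [(label, count)] for the most frequent label
    return Counter(class_list).most_common(1)[0][0]
```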
4. Training the decision tree
```python
import copy


# Build the decision tree recursively
def create_decision_tree(data_set, feature_labels):
    # 0. Deep-copy the dataset and labels so recursion does not mutate the caller's data
    data_set = copy.deepcopy(data_set)
    feature_labels = copy.deepcopy(feature_labels)
    # 1. Collect all class labels
    class_list = [row[-1] for row in data_set]
    # 2. Recursion exit conditions
    # 2.1 All samples belong to one class (its count equals the total count)
    if len(class_list) == class_list.count(class_list[0]):
        return class_list[0]
    # 2.2 All features consumed but classes still mixed: take the majority class
    if len(data_set[0]) == 1:
        return majority_vote(class_list)
    # 3. Pick the feature with the highest information gain
    best_feature_index = calculate_max_info_gain_index(data_set)
    # Key subtrees by the feature *label*, not the index: after a split removes
    # a column, indices in the sub-dataset no longer match the original sample
    best_feature_label = feature_labels[best_feature_index]
    # 4. Build one subtree per value of the chosen feature
    current_feature_values = [row[best_feature_index] for row in data_set]
    sub_labels = feature_labels[:best_feature_index] + feature_labels[best_feature_index + 1:]
    decision_tree = {best_feature_label: {}}
    for feature_value in set(current_feature_values):
        # Sub-dataset where the chosen feature equals feature_value
        sub_dataset = split_dataset_by_feature(data_set, best_feature_index, feature_value)
        # Recursively build the subtree
        decision_tree[best_feature_label][feature_value] = create_decision_tree(sub_dataset, sub_labels)
    return decision_tree
```
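On the `sample` dataset from the earlier snippets, assuming the feature labels "swim" and "flippers" used in the test code below, the nested-dict tree comes out as (key order may vary):

```python
tree = create_decision_tree(sample, ["swim", "flippers"])
print(tree)
# {'swim': {0: 'no', 1: {'flippers': {0: 'no', 1: 'yes'}}}}
```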
5. Saving and loading
```python
import pickle


# Save the decision tree to disk
def decision_tree_dump(filename, decision_tree):
    with open(filename, 'wb') as file:
        pickle.dump(decision_tree, file)


# Load the decision tree from disk
def decision_tree_load(filename):
    with open(filename, 'rb') as file:
        return pickle.load(file)
```
6. Classifying with the decision tree
```python
# Classify a sample with the decision tree
def classify_decision_tree(decision_tree, feature_labels, test_dataset):
    """
    :param decision_tree: decision tree object
    :param feature_labels: feature labels, in the column order of the training data
    :param test_dataset: test sample (one feature vector)
    :return: class label
    """
    # 1. The root's key is the label of the feature tested at this node
    best_feature_label = list(decision_tree.keys())[0]
    # 2. Get its subtree
    sub_decision_tree = decision_tree[best_feature_label]
    # 3. Map the label back to the sample's column index
    best_feature_index = feature_labels.index(best_feature_label)
    # 4. Follow the branch matching the sample's value:
    #    a dict is a subtree to recurse into, anything else is a leaf label
    class_label = None
    for key in sub_decision_tree.keys():
        if test_dataset[best_feature_index] == key:
            if isinstance(sub_decision_tree[key], dict):
                class_label = classify_decision_tree(sub_decision_tree[key], feature_labels, test_dataset)
            else:
                class_label = sub_decision_tree[key]
    return class_label
```
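Tracing a sample through the `tree` built above: for [1, 0] the root tests "swim" (column 0, value 1), descends into the "flippers" subtree (column 1, value 0), and lands on the leaf 'no':

```python
print(classify_decision_tree(tree, ["swim", "flippers"], [1, 0]))  # no
print(classify_decision_tree(tree, ["swim", "flippers"], [1, 1]))  # yes
```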
7. Test code
```python
if __name__ == '__main__':
    data_labels = ["swim", "flippers", "fish"]
    data_set = [
        [1, 1, "yes"],
        [1, 1, "yes"],
        [1, 0, "no"],
        [0, 1, "no"],
        [0, 1, "no"],
    ]
    # 1. Build the decision tree (the last label is the class column,
    #    so pass only the feature labels)
    decision_tree = create_decision_tree(data_set, data_labels[:-1])
    # 2. Save the decision tree
    filename = "./decision_tree.data"
    decision_tree_dump(filename, decision_tree)
    # 3. Load the decision tree back
    dt = decision_tree_load(filename)
    # 4. Classify with the loaded decision tree
    classify_label = classify_decision_tree(dt, data_labels[:-1], [1, 1])
    print("classify_label:", classify_label)
```
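Running the script should print `classify_label: yes`: in the training data, a sample that can swim and has flippers is labeled a fish.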