Implementing a Decision Tree by Hand

To implement a decision tree, we need to write the following functions:

  1. calculate_max_info_gain_index: compute each feature's information gain and select the best feature
  2. split_dataset_by_feature: split the dataset on the chosen best feature
  3. majority_vote: when the features are exhausted but the class is still ambiguous, fall back to a majority vote
  4. create_decision_tree: build the decision tree on top of the helper functions above
  5. decision_tree_dump, decision_tree_load: building the tree is expensive, so save it to disk and reload it next time instead of rebuilding
  6. classfier_decition_tree: classify samples with the decision tree
  7. main: test all of the functions above

1. Information gain calculation
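
Before reading the code, it helps to pin down the formulas it implements. With $D$ the dataset, $A$ a feature, $p_k$ the fraction of samples in class $k$, and $D_v$ the subset of $D$ where $A$ takes value $v$, the standard ID3 quantities are:

$$H(D) = -\sum_{k} p_k \log_2 p_k$$

$$H(D \mid A) = \sum_{v} \frac{|D_v|}{|D|} \, H(D_v)$$

$$g(D, A) = H(D) - H(D \mid A)$$

The functions below compute these quantities in turn, then pick the feature that maximizes $g(D, A)$.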

import math


# Compute information entropy
def calculate_entropy(data_set):
    """
    Compute the entropy of the class labels (the last column of each row)
    :param data_set: dataset
    :return: entropy value
    """

    # 1. Extract the class label column
    feature_list = [row[-1] for row in data_set]

    # 2. Count the occurrences of each label value
    feature_value_count = {}
    for feature_value in feature_list:
        if feature_value not in feature_value_count:
            feature_value_count[feature_value] = 0
        feature_value_count[feature_value] += 1

    # 3. Compute the entropy
    my_entropy_value = 0.0
    unique_feature_list = set(feature_list)
    for feature_value in unique_feature_list:
        # Probability of each label value
        probability = feature_value_count[feature_value] / len(feature_list)
        # Accumulate -p * log2(p)
        my_entropy_value -= probability * math.log(probability, 2)

    return my_entropy_value


# Compute conditional entropy
def calculate_conditional_entropy(data_set, condition_index):
    """
    :param data_set: dataset
    :param condition_index: index of the conditioning feature
    :return: conditional entropy
    """

    # 1. Group the dataset by the conditioning feature's values
    dataset_group_by_cond = {}
    feature_values = []
    for row in data_set:
        if row[condition_index] not in dataset_group_by_cond:
            dataset_group_by_cond[row[condition_index]] = []
        feature_values.append(row[condition_index])
        dataset_group_by_cond[row[condition_index]].append(row)

    # 2. Accumulate the weighted entropy of each group, i.e. H(D|A)
    unique_cond_features = set(feature_values)
    condition_entropy_value = 0.0
    for feature_value in unique_cond_features:
        # Fraction of samples where the feature equals feature_value, i.e. p(y)
        probability = len(dataset_group_by_cond[feature_value]) / len(data_set)
        # Weighted entropy of the group, i.e. p(y) * H(D|A = y)
        entropy_value = calculate_entropy(dataset_group_by_cond[feature_value]) * probability
        # Sum over all feature values
        condition_entropy_value += entropy_value

    return condition_entropy_value


# Compute information gain
def calculate_information_gain(data_set, condition_index):
    """
    :param data_set: dataset
    :param condition_index: index of the conditioning feature
    :return: information gain
    """

    # 1. Entropy of the class labels
    target_entropy_value = calculate_entropy(data_set)
    # 2. Conditional entropy given the conditioning feature
    condition_entropy_value = calculate_conditional_entropy(data_set, condition_index)
    # 3. Information gain = entropy - conditional entropy
    info_gain_value = target_entropy_value - condition_entropy_value

    return info_gain_value


# Find the feature with the highest information gain
def calculate_max_info_gain_index(data_set):
    """
    :param data_set: dataset
    :return: index of the feature with the highest information gain
    """

    # 1. Number of features (the last column is the class label)
    feature_count = len(data_set[0]) - 1
    # 2. Compute each feature's information gain and keep the best one
    info_gain_value = 0.0
    selected_index = 0
    for index in range(feature_count):
        temp_info_gain_value = calculate_information_gain(data_set, index)
        if temp_info_gain_value > info_gain_value:
            info_gain_value = temp_info_gain_value
            selected_index = index

    return selected_index
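
As a quick sanity check, here is what these functions return on the small sample dataset used in the test section below (the numbers assume that exact data):

data_set = [
    [1, 1, "yes"],
    [1, 1, "yes"],
    [1, 0, "no"],
    [0, 1, "no"],
    [0, 1, "no"],
]
print(calculate_entropy(data_set))              # ~0.971
print(calculate_information_gain(data_set, 0))  # ~0.420
print(calculate_information_gain(data_set, 1))  # ~0.171
print(calculate_max_info_gain_index(data_set))  # 0

Feature 0 has the higher gain, so the tree will split on it first.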

2. Dataset splitting

# Split the dataset on a feature value
def split_dataset_by_feature(data_set, feature_index, feature_value):
    """
    :param data_set: dataset
    :param feature_index: feature index
    :param feature_value: feature value
    :return: sub-dataset after splitting
    """
    new_dataset = []
    for row in data_set:
        if row[feature_index] == feature_value:
            # Keep the matching row but drop the column that was split on
            temp_dataset = row[:feature_index]
            temp_dataset.extend(row[feature_index + 1:])
            new_dataset.append(temp_dataset)

    return new_dataset
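
For example, with the same sample dataset, splitting on feature 0 with value 1 keeps the three matching rows and drops the used column:

sub_dataset = split_dataset_by_feature(data_set, 0, 1)
print(sub_dataset)  # [[1, 'yes'], [1, 'yes'], [0, 'no']]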

3. Majority vote

# Majority vote
def majority_vote(class_list):

    # Count the occurrences of each class label
    class_count = {}
    for vote in class_list:
        if vote not in class_count:
            class_count[vote] = 0
        class_count[vote] += 1

    # Sort by count in descending order and return the most frequent label
    sorted_class_count = sorted(class_count.items(), key=lambda x: x[1], reverse=True)
    return sorted_class_count[0][0]
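
The same vote can be written more compactly with the standard library's collections.Counter; the hand-rolled version above is kept for transparency, but this variant behaves the same:

from collections import Counter

def majority_vote(class_list):
    # most_common(1) returns [(label, count)] for the most frequent label
    return Counter(class_list).most_common(1)[0][0]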

4. Building the decision tree

One detail matters here: split_dataset_by_feature removes a column at every split, so the tree is keyed by feature label rather than column index. Index keys would drift out of sync with the original samples at classification time.

import copy


# Build the decision tree
def create_decision_tree(data_set, feature_labels):

    # 0. Deep-copy the dataset and copy the labels so the caller's data stays intact
    data_set = copy.deepcopy(data_set)
    feature_labels = feature_labels[:]

    # 1. Collect all class labels
    class_list = [row[-1] for row in data_set]

    # 2. Recursion exit conditions
    # 2.1 All samples share one class (count of the first class == total count)
    if len(class_list) == class_list.count(class_list[0]):
        return class_list[0]
    # 2.2 All features are used up but classes still differ: majority vote
    if len(data_set[0]) == 1:
        return majority_vote(class_list)

    # 3. Pick the feature with the highest information gain
    best_feature_index = calculate_max_info_gain_index(data_set)
    best_feature_label = feature_labels[best_feature_index]

    # 4. Build the tree, keyed by the feature's label (see the note above)
    current_feature_values = [row[best_feature_index] for row in data_set]
    decision_tree = {best_feature_label: {}}

    for feature_value in set(current_feature_values):
        # Take the subset of samples where the feature equals feature_value
        sub_dataset = split_dataset_by_feature(data_set, best_feature_index, feature_value)
        # The split removed the feature's column, so drop its label too
        sub_labels = feature_labels[:best_feature_index] + feature_labels[best_feature_index + 1:]
        # Recursively build the subtree
        decision_tree[best_feature_label][feature_value] = create_decision_tree(sub_dataset, sub_labels)

    return decision_tree
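
For the sample dataset, the tree this function produces looks like this (branch order may vary, since it follows set iteration order):

{'swim': {0: 'no', 1: {'flippers': {0: 'no', 1: 'yes'}}}}

Samples that cannot swim are classified 'no' immediately; among those that can, the flippers feature decides.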

5. Saving and loading

import pickle


# Save the decision tree
def decision_tree_dump(filename, decision_tree):

    with open(filename, 'wb') as file:
        pickle.dump(decision_tree, file)


# Load the decision tree
def decision_tree_load(filename):

    with open(filename, 'rb') as file:
        decision_tree = pickle.load(file)

    return decision_tree
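
pickle is a natural fit here because the tree is just nested dicts, which it round-trips without any extra code; the trade-off is that the saved file is Python-specific rather than a portable format such as JSON.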

6. Classifying with the decision tree

# Classify a sample with the decision tree
def classfier_decition_tree(decision_tree, feature_labels, test_dataset):
    """
    :param decision_tree: decision tree object
    :param feature_labels: feature labels
    :param test_dataset: test sample
    :return: class label
    """

    # 1. Feature label at the root of this (sub)tree
    best_feature_label = list(decision_tree.keys())[0]
    # 2. Get its subtrees
    sub_decision_tree = decision_tree[best_feature_label]
    # 3. Map the label back to the test sample's column index
    best_feature_index = feature_labels.index(best_feature_label)

    # 4. Follow the branch that matches the test sample's value
    class_label = None
    for key in sub_decision_tree.keys():
        # The matching branch is either a subtree or a leaf label
        if test_dataset[best_feature_index] == key:
            if isinstance(sub_decision_tree[key], dict):
                class_label = classfier_decition_tree(sub_decision_tree[key], feature_labels, test_dataset)
            else:
                class_label = sub_decision_tree[key]

    return class_label
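
As a small traversal check (assuming dt is the tree loaded in the test section below), a fish that can swim but has no flippers should come out as 'no':

class_label = classfier_decition_tree(dt, ["swim", "flippers", "fish"], [1, 0])
print(class_label)  # no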

7. Test code

if __name__ == '__main__':
    data_labels = ["swim", "flippers", "fish"]
    data_set = [
        [1, 1, "yes"],
        [1, 1, "yes"],
        [1, 0, "no"],
        [0, 1, "no"],
        [0, 1, "no"],
    ]
    # 1. Build the decision tree
    dicision_tree = create_decision_tree(data_set, data_labels)
    # 2. Save the decision tree
    filename = "./decision_tree.data"
    decision_tree_dump(filename, dicision_tree)
    # 3. Load the decision tree
    dt = decision_tree_load(filename)
    # 4. Classify a sample with the decision tree
    classfier_label = classfier_decition_tree(dt, data_labels, [1, 1])
    print("classfier_label:", classfier_label)