joblib
是一个Python库,用于在Python中高效地保存和加载对象,特别是那些包含大型数据数组的对象。它在机器学习领域中非常有用,因为经常需要保存训练好的模型或中间数据。
除了提供对象序列化的功能外,还有一个重要的特性就是并行计算。这个特性使得它在处理大规模数据或者需要重复运行的任务时能够更加高效地利用多核处理器。
pip install joblib -i https://pypi.tuna.tsinghua.edu.cn/simple/
1. 对象序列化
对象序列化指的是将一个对象转换为一个字节序列或其他格式,以便在需要时可以将其存储到磁盘或者通过网络传输,并在稍后的时间或不同的地点将其还原为原始对象的过程。
joblib
库提供了 dump
和 load
函数用于将 Python 对象保存(序列化)到磁盘并从磁盘加载(反序列化)并恢复成 Python 对象。
import joblib import random class Person: def __init__(self, name, age): self.name = name self.age = age def show(self): print('Name:' + self.name, 'Age:' + str(self.age)) def test01(): my_lst = [random.randint(1, 100) for _ in range(10000)] my_set = {10, 20, 30} person = Person('张三', 18) # 使用 joblib.dump 函数实现对象存储到文件中,实现持久化存储 # value: 存储的对象 # filename: 存储文件路径 joblib.dump([my_set, my_lst, person], filename='demo.joblib', compress=('gzip', 5)) def test02(): # 使用 joblib.load 函数反序列化,将文件数据恢复为 Python 对象 # mmap memory mapping 内存映射 data = joblib.load('demo.joblib') print(data) print(data[0]) print(data[1]) data[2].show() if __name__ == '__main__': test01() test02()
程序输出结果:
[{10, 20, 30}, [82, 85, 59, 87, 92 ...], <__main__.Person object at 0x000001DE93811AC0>] {10, 20, 30} [82, 85, 59, 87, 92 ...] Name:张三 Age:18
2. 并行计算
并行计算是旨在通过将大型任务分解为多个独立且可并行处理的子任务,并将这些子任务分配给多个计算资源以同时执行,以提高整体处理效率。
并行计算需要使用 Joblib 工具中 delayed 装饰器函数,以及 Parallel 类。其使用过程如下:
- 定义任务函数,并将函数转换为延迟计算函数(任务函数)
- 初始化 Parallel 并发对象,并将任务列表传递到并发对象中执行
- 收集每个任务结果,并进行合并处理
import joblib # 问题: 计算 10000 个数字的和 data = [i for i in range(10000)] def test(): # 1. 定义任务函数,将任务函数转换为延迟计算的函数 def task1(s, e): my_data = data[s: e] total = 0 for v in my_data: total += v return total @joblib.delayed def task2(s, e): my_data = data[s: e] total = 0 for v in my_data: total += v return total tasks = [] for i in range(10): s = i * 1000 e = (i + 1) * 1000 # 将普通的任务函数封装成延迟计算的任务函数 # task = joblib.delayed(task1)(s, e) task = task2(s, e) # 将所有的任务函数存储到列表 tasks.append(task) # 2. 初始化 Parallel 对象,内部封装并行计算逻辑 parallel = joblib.Parallel(n_jobs=5, return_as='generator', verbose=100) # 可调用对象,内部实现 __call__ 方法,对象可以当做函数一样调用 # 返回的结果,就是每个任务的返回的结果。列表类型 results = parallel(tasks) # 3. 收集计算结果,合并计算结果 print(sum(results), sum(data), results) if __name__ == '__main__': test()
程序执行结果:
[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers. [Parallel(n_jobs=5)]: Done 1 tasks | elapsed: 0.0s [Parallel(n_jobs=5)]: Batch computation too fast (0.14040446281433105s.) Setting batch_size=2. [Parallel(n_jobs=5)]: Done 2 out of 10 | elapsed: 0.1s remaining: 0.5s [Parallel(n_jobs=5)]: Done 3 out of 10 | elapsed: 0.1s remaining: 0.3s [Parallel(n_jobs=5)]: Done 4 out of 10 | elapsed: 0.1s remaining: 0.1s [Parallel(n_jobs=5)]: Done 5 out of 10 | elapsed: 0.1s remaining: 0.1s [Parallel(n_jobs=5)]: Done 6 out of 10 | elapsed: 0.1s remaining: 0.0s [Parallel(n_jobs=5)]: Done 7 out of 10 | elapsed: 0.1s remaining: 0.0s [Parallel(n_jobs=5)]: Done 8 out of 10 | elapsed: 0.1s remaining: 0.0s [Parallel(n_jobs=5)]: Done 10 out of 10 | elapsed: 0.1s finished 49995000 49995000 <generator object Parallel._get_outputs at 0x00000248E95D3200>