While working on text preprocessing, I wrote a few utility functions, mainly the following:
- Timing program execution
- Parallel task computation
- To be continued
1. Timing program execution
```python
import time
from functools import wraps

# 1. timing decorator
def timer(func):
    @wraps(func)  # preserve the wrapped function's name and docstring
    def inner(*args, **kwargs):
        print('function [%s] starts running' % func.__name__)
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print('function [%s] executed for %.2f seconds' % (func.__name__, end - start))
        print('-' * 51)
        return result
    return inner
```
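A quick usage sketch; the `slow_task` function and its one-second sleep are purely illustrative:

```python
import time

@timer
def slow_task():
    time.sleep(1.0)  # stand-in for real preprocessing work
    return 'done'

slow_task()
# function [slow_task] starts running
# function [slow_task] executed for 1.00 seconds  (timing approximate)
# ---------------------------------------------------
```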
2. Parallel task computation
Splitting the dataset into blocks can also be done with numpy's `array_split`, e.g. `np.array_split(data, partitions)`, as sketched below.
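A minimal sketch of this alternative; when the split is uneven, `array_split` simply makes the leading blocks one element longer, so the block contents differ from the hand-written `partition` method below:

```python
import numpy as np

data = list(range(11))
blocks = np.array_split(data, 3)  # uneven splits are allowed
print(blocks)
# [array([0, 1, 2, 3]), array([4, 5, 6, 7]), array([8, 9, 10])]
```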
```python
from multiprocessing import cpu_count, Pool

class Parallel:
    def __init__(self, cores=None, handle_fun=None, merge_fun=None):
        # cap the worker count at the number of available CPU cores
        self.cores = cpu_count() if cores is None or cores > cpu_count() else cores
        self.handle_fun = handle_fun
        self.merge_fun = merge_fun

    def partition(self, data):
        if len(data) < self.cores:
            raise ValueError("data size %s should not be smaller than the number of cores %s"
                             % (len(data), self.cores))
        step, rest = divmod(len(data), self.cores)
        blocks = [data[start * step: start * step + step] for start in range(self.cores)]
        # if the data cannot be divided evenly, spread the remainder
        # over the leading blocks, one element each
        if rest > 0:
            for idx, ele in zip(range(rest), data[-rest:]):
                blocks[idx].append(ele)
        return blocks

    def __call__(self, data):
        # split the data into blocks
        blocks = self.partition(data)
        print(blocks)
        # process the blocks in parallel
        pool = Pool(processes=self.cores)
        results = pool.map(self.handle_fun, blocks)
        pool.close()
        pool.join()
        # merge the partial results
        return self.merge_fun(results)

def merge_fun(results):
    return sum(results)

def handle_fun(data):
    return sum(data)

# usage example
def test():
    data = [v for v in range(11)]
    print(data, sum(data))
    result = Parallel(3, handle_fun, merge_fun)(data)
    print(result)

if __name__ == '__main__':
    test()
```
Program output:
```
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 55
[[0, 1, 2, 9], [3, 4, 5, 10], [6, 7, 8]]
55
```
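For comparison, a minimal sketch of the same idea with the hand-written partition replaced by `np.array_split`; the `parallel_sum` helper is just for illustration, and note that the handler now receives numpy arrays rather than lists:

```python
import numpy as np
from multiprocessing import Pool, cpu_count

def handle_fun(block):
    # each worker receives a numpy array here, not a Python list
    return int(block.sum())

def parallel_sum(data, cores=None):
    cores = cores or cpu_count()
    blocks = np.array_split(np.asarray(data), cores)
    with Pool(processes=cores) as pool:
        results = pool.map(handle_fun, blocks)
    return sum(results)

if __name__ == '__main__':
    print(parallel_sum(range(11), cores=3))  # 55
```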