While doing text-preprocessing work, I wrote a few utility functions, mainly the following:
- Program execution timing
- Parallel task execution
- To be continued
1. Program execution timing
# 1. Time a function's execution
import time
from functools import wraps

def timer(func):
    @wraps(func)  # preserve the wrapped function's name and docstring
    def inner(*args, **kwargs):
        print('function [%s] starts running' % func.__name__)
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print('function [%s] executed for %.2f seconds' % (func.__name__, end - start))
        print('-' * 51)
        return result
    return inner
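A minimal usage sketch: decorating any function with @timer prints its name and wall-clock running time. The tokenize function below is a hypothetical toy workload, not part of the utilities.

@timer
def tokenize(texts):
    # toy preprocessing workload: whitespace-split every text
    return [t.split() for t in texts]

tokens = tokenize(['hello world'] * 100000)
# prints:
# function [tokenize] starts running
# function [tokenize] executed for 0.01 seconds   (the number varies by machine)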
2. Parallel task execution
Splitting the dataset into blocks can also be done with numpy's array_split, e.g. np.array_split(data, partitions); a short sketch follows below.
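A hedged sketch of that alternative (assuming numpy is installed). Note one caveat: array_split returns numpy arrays rather than lists.

import numpy as np

data = list(range(11))
blocks = np.array_split(data, 3)
# blocks of sizes 4, 4, 3; each block is a numpy array, not a list
print([b.tolist() for b in blocks])
# [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10]]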
from multiprocessing import cpu_count
from multiprocessing import Pool

class Parallel:
    def __init__(self, cores=None, handle_fun=None, merge_fun=None):
        # cap the worker count at the number of available CPU cores
        self.cores = cpu_count() if cores is None or cores > cpu_count() else cores
        self.handle_fun = handle_fun
        self.merge_fun = merge_fun

    def partition(self, data):
        if len(data) < self.cores:
            raise ValueError("data size %s should be at least the number of cores %s" % (len(data), self.cores))
        step, rest = divmod(len(data), self.cores)
        blocks = [data[start * step: start * step + step] for start in range(self.cores)]
        # if the data does not divide evenly into blocks, spread the
        # remaining elements over the first few blocks
        if rest > 0:
            for idx, ele in zip(range(rest), data[-rest:]):
                blocks[idx].append(ele)
        return blocks

    def __call__(self, data):
        # partition the data into blocks
        blocks = self.partition(data)
        print(blocks)
        # process the blocks in parallel
        pool = Pool(processes=self.cores)
        results = pool.map(self.handle_fun, blocks)
        pool.close()
        pool.join()
        # merge the partial results
        return self.merge_fun(results)

# handle_fun and merge_fun are defined at module level so they can be
# pickled and sent to the worker processes
def merge_fun(results):
    return sum(results)

def handle_fun(data):
    return sum(data)

# usage example
def test():
    data = [v for v in range(11)]
    print(data, sum(data))
    result = Parallel(3, handle_fun, merge_fun)(data)
    print(result)

if __name__ == '__main__':
    test()
Program output:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 55
[[0, 1, 2, 9], [3, 4, 5, 10], [6, 7, 8]]
55
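The two utilities also compose. A small hedged sketch (assuming the timer decorator and the Parallel class above are in scope): time the whole parallel run by decorating a top-level driver function rather than handle_fun itself, because the closure returned by timer cannot be pickled for the worker processes.

@timer
def run_parallel(data):
    # decorate the driver, not handle_fun: only the main process
    # needs to call the timed closure
    return Parallel(3, handle_fun, merge_fun)(data)

if __name__ == '__main__':
    print(run_parallel(list(range(11))))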
