multiprocessing uses child processes instead of threads, which effectively sidesteps the GIL (Global Interpreter Lock).

The multiprocessing module makes it possible to take full advantage of the multiple cores on a machine.

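The multiprocessing.pool.Pool object offers a convenient way to parallelize the execution of a function across multiple input values by distributing the input data across processes (data parallelism).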
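To make the data-parallel idea concrete: Pool.map splits its input into chunks, and each chunk is sent to a worker as a single task. The sketch below mirrors the default-chunksize heuristic found in CPython's Pool implementation (input length divided by four times the number of workers, rounded up). This is an implementation detail that may change between Python versions, and default_chunksize and split_into_chunks are hypothetical helper names, not part of the multiprocessing API.

```python
def default_chunksize(n_items, n_workers):
    """Hypothetical helper approximating CPython's default chunksize for Pool.map."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1  # round up so every item lands in some chunk
    return chunksize


def split_into_chunks(items, chunksize):
    """Hypothetical helper: group items into consecutive chunks (one task per chunk)."""
    return [items[i:i + chunksize] for i in range(0, len(items), chunksize)]


print(default_chunksize(100, 4))           # 7
print(split_into_chunks([1, 2, 3, 4], 2))  # [[1, 2], [3, 4]]
```

With the chunksize=len(xs)//2 used in several examples below, four inputs and two workers give two tasks of two items each. imap and imap_unordered default to chunksize=1 instead.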

multiprocessing.pool.Pool provides the following methods:

[1] - apply(func[, args[, kwds]]) (equivalent to apply_async( ... ).get())

[2] - apply_async(func[, args[, kwds[, callback[, error_callback]]]])

[3] - map(func, iterable[, chunksize]) (equivalent to map_async( ... ).get())

[4] - map_async(func, iterable[, chunksize[, callback[, error_callback]]])

[5] - imap(func, iterable[, chunksize])

[6] - imap_unordered(func, iterable[, chunksize])

[7] - starmap(func, iterable[, chunksize]) (equivalent to starmap_async( ... ).get())

[8] - starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
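A quick way to see how the eight methods relate is to call each one on the same inputs. To keep the sketch runnable anywhere (no pickling, no __main__ guard), it uses multiprocessing.pool.ThreadPool, which the standard library documents as interface-compatible with Pool; for CPU-bound work you would use Pool instead. The functions cube and power are illustrative.

```python
from multiprocessing.pool import ThreadPool  # same interface as Pool, but worker threads


def cube(x):
    return x ** 3


def power(x, y):
    return x ** y


with ThreadPool(2) as pool:
    xs = [1, 2, 3, 4]
    pairs = [(2, 1), (2, 2), (2, 3)]

    # Each blocking method and its *_async(...).get() counterpart
    apply_result = pool.apply(cube, (3,))
    apply_async_result = pool.apply_async(cube, (3,)).get()

    map_result = pool.map(cube, xs)
    map_async_result = pool.map_async(cube, xs).get()

    starmap_result = pool.starmap(power, pairs)
    starmap_async_result = pool.starmap_async(power, pairs).get()

    # Lazy iterators: consume them while the pool is still open
    imap_result = list(pool.imap(cube, xs))
    imap_unordered_result = sorted(pool.imap_unordered(cube, xs))

print(apply_result, map_result, starmap_result)  # 27 [1, 8, 27, 64] [2, 4, 8]
```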

1. apply and apply_async

1.1. apply

apply(func[,args[,kwds]])

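apply is blocking: each call waits for its task to finish before returning, so calling it in a loop runs the tasks one after another and gives no speedup.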
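The blocking behavior can be measured directly. This sketch assumes a hypothetical slow_square task that sleeps for DELAY seconds; it uses multiprocessing.pool.ThreadPool (same interface as Pool) so it runs without a __main__ guard, and because time.sleep releases the GIL, threads show the same timing pattern processes would.

```python
import time
from multiprocessing.pool import ThreadPool

DELAY = 0.1  # seconds each task sleeps (illustrative value)


def slow_square(x):
    time.sleep(DELAY)
    return x * x


xs = [1, 2, 3, 4]

with ThreadPool(2) as pool:
    # apply: every call waits for its task, so the tasks run one by one
    start = time.perf_counter()
    sequential = [pool.apply(slow_square, (x,)) for x in xs]
    apply_elapsed = time.perf_counter() - start

    # apply_async: submit all tasks first, then collect the results
    start = time.perf_counter()
    handles = [pool.apply_async(slow_square, (x,)) for x in xs]
    concurrent = [h.get() for h in handles]
    async_elapsed = time.perf_counter() - start

print(f"apply: {apply_elapsed:.2f}s, apply_async: {async_elapsed:.2f}s")
```

With two workers, the apply loop takes at least len(xs) * DELAY seconds, while the apply_async version is expected to take roughly half that.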

Example 1:

from multiprocessing import Pool
import time

def square(x):
    time.sleep(2)  # simulate a slow task
    print('[INFO]...processing: ', x)
    return x * x


if __name__ == '__main__':
    p = Pool(2)
    xs = [1, 2, 3, 4]
    start = time.time()
    for x in xs:
        ret = p.apply(square, (x,))  # blocks until this task has finished
        print(ret)
    p.close()  # no more tasks will be submitted
    p.join()   # wait for the worker processes to exit
    print("[INFO]timecost: ", time.time() - start)

Example 2:

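# Reuses the imports and square() from Example 1
if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        ret = [p.apply(square, args=(x,)) for x in xs]  # still one task at a time
    print(ret)
    print("[INFO]timecost: ", time.time() - start)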

Example 3:

# With tqdm (reuses the imports and square() from Example 1)
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        ret = list(tqdm((p.apply(square, args=(x,)) for x in xs), total=len(xs)))
    print("[INFO]timecost: ", time.time() - start)

1.2. apply_async

apply_async(func[,args[,kwds[,callback[,error_callback]]]])

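Each call submits one task, but runs it asynchronously: the call returns immediately without waiting for the task to finish, so the next task can be submitted right away.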

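Unlike apply, apply_async is asynchronous and returns an AsyncResult object. Call its .get() method to wait for the result, or skip that if you don't need the result. Because several tasks can run at the same time, this does give a speedup. Note that an exception raised in a worker only surfaces through .get() or error_callback, and that leaving a "with Pool(...)" block calls terminate(), so collect the results (or call close() and join()) before the block ends.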
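A minimal sketch of how errors come back from apply_async, assuming a hypothetical invert function that fails for 0. It uses multiprocessing.pool.ThreadPool (interface-compatible with Pool) so it runs without a __main__ guard; with Pool the behavior of get() and error_callback is the same.

```python
from multiprocessing.pool import ThreadPool


def invert(x):
    return 1 / x  # raises ZeroDivisionError when x == 0


errors = []  # error_callback appends the exception object here
caught = None

with ThreadPool(2) as pool:
    ok = pool.apply_async(invert, (4,))
    bad = pool.apply_async(invert, (0,), error_callback=errors.append)

    ok_value = ok.get()
    try:
        bad.get()  # the exception raised in the worker is re-raised by get()
    except ZeroDivisionError as exc:
        caught = exc

print(ok_value, repr(caught), errors)
```

If you neither call get() nor pass an error_callback, such an exception is silently dropped.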

Example 1:

from multiprocessing import Pool
import time

def square(x):
    time.sleep(2)  # simulate a slow task
    print('[INFO]...processing: ', x)
    return x * x


if __name__ == '__main__':
    p = Pool(2)
    xs = [1, 2, 3, 4]
    start = time.time()
    rets = []
    for x in xs:
        ret = p.apply_async(square, (x,))  # returns an AsyncResult immediately
        rets.append(ret)
    for ret in rets:
        print(ret.get())  # get() blocks until this task's result is ready
    p.close()
    p.join()
    print("[INFO]timecost: ", time.time() - start)

Example 2:

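# Reuses the imports and square() from Example 1
if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        rets = [p.apply_async(square, args=(x,)) for x in xs]
        rets = [r.get() for r in rets]  # collect results before the pool is terminated
    print("[INFO]timecost: ", time.time() - start)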

Example 3:

# With tqdm (reuses the imports and square() from Example 1)
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        rets = [p.apply_async(square, args=(x,)) for x in xs]
        rets = [r.get() for r in tqdm(rets)]  # bar advances in submission order
    print("[INFO]timecost: ", time.time() - start)

Example 4:

# Callback style, with tqdm (reuses the imports and square() from Example 1)
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with tqdm(total=len(xs)) as pbar:
        with Pool(processes=2) as p:
            def callback(result):
                # Runs in the main process each time a task completes successfully
                pbar.update()
            results = [
                p.apply_async(
                    square,
                    args=(x, ),
                    callback=callback) for x in xs]
            results = [r.get() for r in results]
    print("[INFO]timecost: ", time.time() - start)

2. map and map_async

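Note: map and map_async pass exactly one argument to func on each call. For functions that take several arguments, starmap is usually the better choice.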

2.1. map

map(func,iterable[,chunksize])

Blocks until every task in the input has finished, then returns the results as a list in input order.
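Because map passes a single argument per item, a common way to fix additional arguments is functools.partial. This sketch assumes a hypothetical scale(x, factor) function and uses multiprocessing.pool.ThreadPool (interface-compatible with Pool) so it runs without a __main__ guard; partial objects wrapping a module-level function can also be pickled for a process Pool.

```python
from functools import partial
from multiprocessing.pool import ThreadPool


def scale(x, factor):
    return x * factor


xs = [1, 2, 3, 4]

with ThreadPool(2) as pool:
    # map calls func with one argument per item and returns results in input order
    doubled = pool.map(partial(scale, factor=2), xs)

print(doubled)  # [2, 4, 6, 8]
```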

Example 1:

from multiprocessing import Pool
import time

def square(x):  # map: func receives exactly one argument
    time.sleep(2)  # simulate a slow task
    print('[INFO]...processing: ', x)
    return x * x


if __name__ == '__main__':
    p = Pool(2)
    xs = [1, 2, 3, 4]
    start = time.time()
    ret = p.map(square, xs)  # blocks until all results are ready
    print(ret)
    p.close()
    p.join()
    print("[INFO]timecost: ", time.time() - start)

Example 2:

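# With tqdm (reuses the imports and square() from Example 1)
# map() has already finished by the time tqdm iterates over its result list,
# so the bar jumps straight to 100%; use imap for live progress.
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        rets = list(tqdm(p.map(square, xs, chunksize=len(xs)//2)))
    print(rets)
    print("[INFO]timecost: ", time.time() - start)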

2.2. map_async

map_async(func,iterable[,chunksize[,callback[,error_callback]]])

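map_async materializes its input as a list (if the iterable has no len()) before splitting it into chunks and dispatching them to the worker processes.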
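A small sketch of map_async with a callback, passing a generator as input. It uses multiprocessing.pool.ThreadPool (interface-compatible with Pool) so it runs without a __main__ guard. The callback is called once, with the complete result list, after every task has finished.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


received = []  # the callback appends the full result list here

with ThreadPool(2) as pool:
    # A generator has no len(), so map_async turns it into a list before chunking
    async_result = pool.map_async(square, (x for x in range(5)), callback=received.append)
    results = async_result.get()

print(results, received)  # [0, 1, 4, 9, 16] [[0, 1, 4, 9, 16]]
```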

Example:

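# Reuses the imports and square() from Example 1 in 2.1
if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        rets = p.map_async(square, xs)  # returns a MapResult immediately
        print(rets.get())  # get() blocks until all results are ready
    print("[INFO]timecost: ", time.time() - start)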

3. imap and imap_unordered

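Like map_async, imap and imap_unordered are asynchronous. The differences are:

[1] - map_async turns the input into a list when dispatching tasks, whereas imap and imap_unordered consume it as an iterable. map_async is slightly more efficient, while imap and imap_unordered can use considerably less memory.

[2] - imap and imap_unordered return an iterator that yields each result as soon as it is available, whereas map_async waits until every task has finished and then returns a list.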
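The difference in return types can be seen directly. This sketch uses multiprocessing.pool.ThreadPool (interface-compatible with Pool) so it runs without a __main__ guard.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


with ThreadPool(2) as pool:
    # map_async(...).get(): one complete list, available only after every task is done
    all_at_once = pool.map_async(square, range(5)).get()

    it = pool.imap(square, range(5))  # an iterator, returned right away
    first = next(it)                  # blocks only until the first result is ready
    rest = list(it)                   # the remaining results, in input order

print(type(all_at_once).__name__, first, rest)  # list 0 [1, 4, 9, 16]
```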

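The difference between imap and imap_unordered:

imap, like map_async, returns results in input order, so it waits for each task's result in turn; imap_unordered does not.

The iterator returned by imap_unordered yields results from whichever tasks finish first.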

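The main reasons to use imap/imap_unordered instead of map_async:

[1] - The iterable is so large that converting it to a list would exhaust or consume too much memory.

[2] - You want to start processing results before all of them are ready.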
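One consequence of the second point is that you can stop early. This sketch scans squares until one exceeds a threshold, then breaks out; leaving the with block calls terminate(), which discards the work that is no longer needed. It uses multiprocessing.pool.ThreadPool (interface-compatible with Pool) so it runs without a __main__ guard, and the threshold of 50 is illustrative.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


first_big = None

with ThreadPool(2) as pool:
    # Results arrive in input order, so the first match is the smallest such square
    for value in pool.imap(square, range(1, 1000)):
        if value > 50:
            first_big = value
            break  # stop consuming; the with block then terminates the pool

print(first_big)  # 64
```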

3.1. imap

imap(func,iterable[,chunksize])

Example 1:

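from multiprocessing import Pool
import time

def square(x):  # imap: func receives exactly one argument
    time.sleep(2)  # simulate a slow task
    print('[INFO]...processing: ', x)
    return x * x


if __name__ == '__main__':
    p = Pool(2)
    xs = [1, 2, 3, 4]
    start = time.time()
    rets = p.imap(square, xs)  # returns an iterator immediately, does not block
    for ret in rets:  # each step blocks until the next result (in input order) is ready
        print(ret)
    p.close()
    p.join()
    print("[INFO]timecost: ", time.time() - start)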

Example 2:

# Reuses the imports and square() from Example 1
if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        results = list(p.imap(square, xs, chunksize=len(xs) // 2))
        print(results)
    print("[INFO]timecost: ", time.time() - start)

Example 3:

# With tqdm (reuses the imports and square() from Example 1)
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        results = list(tqdm(p.imap(square, xs, chunksize=len(xs) // 2), total=len(xs)))
        print(results)
    print("[INFO]timecost: ", time.time() - start)

3.2. imap_unordered

imap_unordered(func,iterable[,chunksize])

Compared with imap, imap_unordered returns results in completion order: whichever task finishes first is returned first. imap returns results in input order.
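The ordering difference shows up when tasks take different amounts of time. This sketch assumes a hypothetical wait_and_return task that sleeps for its argument; it uses multiprocessing.pool.ThreadPool (interface-compatible with Pool) so it runs without a __main__ guard.

```python
import time
from multiprocessing.pool import ThreadPool


def wait_and_return(delay):
    time.sleep(delay)
    return delay


delays = [0.3, 0.0, 0.1]

with ThreadPool(3) as pool:
    ordered = list(pool.imap(wait_and_return, delays))              # input order
    unordered = list(pool.imap_unordered(wait_and_return, delays))  # completion order

print(ordered)    # [0.3, 0.0, 0.1]
print(unordered)  # order depends on which task finishes first
```

With three workers running in parallel, the 0.0 task will normally be yielded first by imap_unordered, but that order is not guaranteed.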

Example 1:

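from multiprocessing import Pool
import time

def square(x):  # imap_unordered: func receives exactly one argument
    time.sleep(2)  # simulate a slow task
    print('[INFO]...processing: ', x)
    return x * x


if __name__ == '__main__':
    p = Pool(2)
    xs = [1, 2, 3, 4]
    start = time.time()
    rets = p.imap_unordered(square, xs)  # returns an iterator immediately, does not block
    for ret in rets:  # each step blocks until any remaining task finishes
        print(ret)
    p.close()
    p.join()
    print("[INFO]timecost: ", time.time() - start)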

Example 2:

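# With tqdm (reuses the imports and square() from Example 1)
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        results = list(tqdm(p.imap_unordered(square, xs, chunksize=len(xs) // 2), total=len(xs)))
        print(results)  # completion order, not necessarily input order
    print("[INFO]timecost: ", time.time() - start)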

4. starmap and starmap_async

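The difference from map and map_async is that starmap and starmap_async work with functions that take multiple arguments: each element of the iterable is itself an iterable of arguments that gets unpacked, so [(1, 2), (3, 4)] results in [func(1, 2), func(3, 4)].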
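A side-by-side sketch of starmap and the equivalent map call, which needs a one-argument wrapper that unpacks each tuple itself. power and power_from_pair are illustrative names; the wrapper is defined at module level (not as a lambda) so the same code would also pickle for a process Pool. It uses multiprocessing.pool.ThreadPool (interface-compatible with Pool) so it runs without a __main__ guard.

```python
from multiprocessing.pool import ThreadPool


def power(base, exponent):
    return base ** exponent


def power_from_pair(pair):
    base, exponent = pair  # the unpacking that starmap does for you
    return power(base, exponent)


bases = [1, 2, 3, 4]
exponents = [2, 2, 3, 3]

with ThreadPool(2) as pool:
    via_starmap = pool.starmap(power, zip(bases, exponents))  # (2, 2) -> power(2, 2)
    via_map = pool.map(power_from_pair, zip(bases, exponents))

print(via_starmap)  # [1, 4, 27, 64]
```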

4.1. starmap

Example:

from multiprocessing import Pool
import time

def power(x, y):  # starmap: each input tuple is unpacked into (x, y)
    time.sleep(2)  # simulate a slow task
    print('[INFO]...processing: ', x)
    return x**y

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        rets = p.starmap(power, zip(xs, xs), chunksize=len(xs)//2)  # blocks until done
    print(rets)
    print("[INFO]timecost: ", time.time() - start)

4.2. starmap_async

Example:

from multiprocessing import Pool
import time

def power(x, y):  # starmap_async: each input tuple is unpacked into (x, y)
    time.sleep(2)  # simulate a slow task
    print('[INFO]...processing: ', x)
    return x**y

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        # starmap_async returns immediately; get() blocks until all results are ready
        rets = p.starmap_async(power, zip(xs, xs), chunksize=len(xs)//2).get()
    print(rets)
    print("[INFO]timecost: ", time.time() - start)


Last modification: April 18th, 2021 at 10:25 pm