Python多进程编程之Pool

by LauCyun Aug 17,2016 10:51:12 22,862 views

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

构造方法

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes:工作进程数量。Pool默认大小是CPU的核数os.cpu_count(),我们也可以通过在Pool中传入processes参数即可自定义需要的核数量。
  • initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
  • context:用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

方法

  • apply(func[, args[, kwds]])是阻塞的。
  • apply_async(func[, args[, kwds[, callback]]])它是非阻塞。(理解与apply区别,看栗子1和栗子2结果区别)
  • close()关闭pool,使其不在接受新的任务。
  • terminate()关闭pool,结束工作进程,不在处理未完成的任务。
  • join()主进程阻塞,等待子进程的退出, join方法要在closeterminate之后使用。
  • map(funciterable[, chunksize])
  • map_async(funciterable[, chunksize[, callback[, error_callback]]])
  • imap(funciterable[, chunksize])
  • imap_unordered(funciterable[, chunksize])
  • starmap(funciterable[, chunksize])
  • starmap_async(funciterable[, chunksize[, callback[, error_back]]])

栗子1:使用进程池(非阻塞)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = ("hello %d" % (i))
        pool.apply_async(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

一次执行结果为:

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
msg: hello 3
end
Sub-process(es) done.

执行说明:创建一个进程池pool,并设定进程的数量为3,range(4)会相继产生四个对象[0, 1, 2, 3],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。

栗子2:使用进程池(阻塞)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        pool.apply(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

一次执行结果为:

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

执行说明:for循环内执行的步骤顺序,往进程池中添加一个子进程,执行子进程,等待执行完毕再添加一个子进程…..等4个子进程都执行完了,再执行"Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"和"Sub-process(es) done."。

栗子3:使用进程池(非阻塞),获取子进程的返回值

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    result = []
    for i in range(4):
        msg = "hello %d" % i
        result.append(pool.apply_async(func, (msg,)))
    pool.close()  #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    pool.join()
    for res in result:
        print("return: ", res.get())
    print("Sub-process(es) done.")

一次执行结果为:

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
msg: hello 3
end
return:  donehello 0
return:  donehello 1
return:  donehello 2
return:  donehello 3
Sub-process(es) done.

栗子4:使用多个进程池

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import os, time, random


def fun1():
    print("Run task 1, pid is %s" % (os.getpid()))  # os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10)  # random.random()随机生成0-1之间的小数
    end = time.time()
    print('Task 1, runs %0.2f seconds.' % (end - start))


def fun2():
    print("Run task 2, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print('Task 2 runs %0.2f seconds.' % (end - start))


def fun3():
    print("Run task 3, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task 3 runs %0.2f seconds.' % (end - start))


def fun4():
    print("Run task 4, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task 4 runs %0.2f seconds.' % (end - start))


if __name__ == '__main__':
    function_list = [fun1, fun2, fun3, fun4]
    print("parent process %s" % (os.getpid()))

    pool = multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)  # Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()  # 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print('All subprocesses done.')

一次执行结果为:

parent process 12496
Waiting for all subprocesses done...
Run task 1, pid is 6836
Run task 2, pid is 6596
Run task 3, pid is 14304
Run task 4, pid is 10900
Task 3 runs 0.81 seconds.
Task 1, runs 1.83 seconds.
Task 2 runs 8.97 seconds.
Task 4 runs 15.82 seconds.
All subprocesses done.

好了,到此为止Pool的使用讲完了。

Tags