Python多进程编程之基础

by LauCyun Aug 13,2016 10:18:48 16,657 views

Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

为什么要使用python多进程?

因为python使用全局解释器锁(GIL),他会将进程中的线程序列化,也就是多核cpu实际上并不能达到并行提高速度的目的,而使用多进程则是不受限的,所以实际应用中都是推荐多进程的。

如果每个子进程执行需要消耗的时间非常短(执行+1操作等),这不必使用多进程,因为进程的启动关闭也会耗费资源。

当然使用多进程往往是用来处理CPU密集型(科学计算)的需求,如果是IO密集型(文件读取,爬虫等)则可以使用多线程去处理。

1 fork

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os

print('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid==0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

运行结果如下:

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

由于Windows没有fork调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的,推荐大家用Mac学Python!

有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

2 multiprocessing

2.1 multiprocessing常用组件及功能

创建管理进程模块:

同步子进程模块:

  • Condition
  • Event:用来实现进程间同步通信
  • Lock:当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突,具体请参考:Python多进程编程之进程锁Lock
  • RLock
  • Semaphore:用来控制对共享资源的访问数量,例如池的最大连接数。
2.2  多进程资源共享

多进程中不推荐使用资源共享,如果非要使用,具体请参考:Python多进程编程之多进程资源共享

2.3 获取子进程返回值

多进程中往往会碰到获取子进程返回值的问题,具体介绍请参考:Python多进程编程之Pool的栗子3

2.4 Semaphore

Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + " acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + " release\n")
    s.release()


if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i * 2))
        p.start()

输出结果为:

Process-4 acquire
Process-1 acquire
Process-1 release

Process-3 acquire
Process-3 release

Process-2 acquire
Process-4 release

Process-5 acquire
Process-2 release

Process-5 release

上面的实例中使用semaphore限制了最多有2个进程同时执行。

2.6 Event

Event用来实现进程间同步通信。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))


def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))


if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name="block",
                                 target=wait_for_event,
                                 args=(e,))

    w2 = multiprocessing.Process(name="non-block",
                                 target=wait_for_event_timeout,
                                 args=(e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

输出结果为:

wait_for_event_timeout:starting
wait_for_event: starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。

3 subprocess

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import subprocess

print('$ nslookup www.liuker.org')
r = subprocess.call(['nslookup', 'www.liuker.org'])
print('Exit code:', r)

运行结果:

$ nslookup www.liuker.org
Server:		127.0.1.1
Address:	127.0.1.1#53

Non-authoritative answer:
Name:	www.liuker.org
Address: 52.192.214.10

Exit code: 0

如果子进程还需要输入,则可以通过communicate()方法输入:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\liuker.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

上面的代码相当于在命令行执行命令nslookup,然后手动输入:

set q=mx
python.org
exit

运行结果如下:

$ nslookup
Server:		127.0.1.1
Address:	127.0.1.1#53

Non-authoritative answer:
*** Can't find liuker.org: No answer

Authoritative answers can be found from:
liuker.org
	origin = dns9.hichina.com
	mail addr = hostmaster.hichina.com
	serial = 2016101611
	refresh = 3600
	retry = 1200
	expire = 3600
	minimum = 360


Exit code: 0

小结

  • 在Unix/Linux下,可以使用fork()调用实现多进程。
  • 要实现跨平台的多进程,可以使用multiprocessingsubprocess模块。
  • 进程间通信是通过QueuePipes等实现的。

Tags