Python多进程编程之进程间通信

by LauCyun Aug 18,2016 23:24:28 30,053 views

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。

1 multiprocessing.Pipe()

multiprocessing.Pipe()即管道模式,调用Pipe()返回管道的两端的Connection。

Python官方文档的描述:
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发;duplexFalse,conn1只负责接受消息,conn2只负责发送消息。

send()recv()方法分别是发送和接受消息的方法。一个进程从Pipe某一端输入对象,然后被Pipe另一端的进程接收,单向管道只允许管道一端的进程输入另一端的进程接收,不可以反向通信;而双向管道则允许从两端输入和从两端接收。

例如:在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError

栗子1:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import multiprocessing
import time

m = 5


def proc1(pipe):
    for i in range(m):
        print("proc1 send %s" % (i))
        pipe.send(i)
        time.sleep(1)


def proc2(pipe):
    flag = 0
    while True:
        if flag >= m - 3:
            pipe.close()  # 关闭
        flag += 1
        print("proc2 rev %s" % (pipe.recv()))
        time.sleep(1)


if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    print("start")
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("end")

输出结果为:

start
proc1 send 0
proc2 rev 0
proc1 send 1
proc2 rev 1
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
proc1 send 2
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/repository/python/learn-python3/multiprocessing/pipe1.py", line 23, in proc2
    print("proc2 rev %s" % (pipe.recv()))
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 248, in recv
    self._check_closed()
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
proc1 send 3
proc1 send 4
end

上面的代码中主要用到了pipesend()recv()close()方法。当pipe的输出端被关闭,且无法接收到输入的值,那么就会抛出EOFError

2 multiprocessing.Queue

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。Queue据官方文档也是基于pipe的实现。

Queue的使用主要是一边put(),一边get(),但是Queue可以是多个Process进行put()操作,也可以是多个Process进行get()操作。

  • put方法用以插入数据到队列中,put方法还有两个可选参数:blockedtimeout。如果blockedTrue(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blockedFalse,但该Queue已满,会立即抛出Queue.Full异常。
  • get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blockedtimeout。如果blockedTrue(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blockedFalse,有两种情况存在,如果Queue有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出Queue.Empty异常。

举个栗子2,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue
import os, time, random


# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())


# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)


if __name__ == '__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

一次执行结果为:

Process to write: 41823
Put A to queue...
Process to read: 41824
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

3 总结

Pipe的读写效率要高于Queue

  • Pipe()只能是两个客户端
  • Queue()可以有多个生产者和消费者

那么我们如何的选择他们?

  • 如果你的环境是多生产者和消费者,那么你只能是选择queue队列
  • 如果两个进程间处理的逻辑简单,但是就是要求绝对的速度,那么pipe是个好选择 

具体的可以参考:Python multiprocessing - Pipe vs Queue

Python多进程之间的通信,除了上述方法消息队列Queue,管道Pipe外,还有SocketRPC等。

Tags