about 5 results (0.02 seconds)

Python多进程编程之进程锁Lock

by LauCyun Aug 19,2016 17:33:03 20,852 views

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。

1 不加进程锁

让我们看看没有加进程锁时会产生什么样的结果。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def job(val, num):
    for _ in range(5):
        time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
        val.value += num  # val.value获取共享变量值
        print(val.value)


def multicore():
    val = multiprocessing.Value('i', 0)  # 定义共享变量
    p1 = multiprocessing.Process(target=job, args=(val, 1))
    p2 = multiprocessing.Process(target=job, args=(val, 3))  # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    multicore()

在上面的代码中,我们定义了一个共享变量val,两个进程都可以对它进行操作。 在job()中我们想让val每隔0.1秒输出一次累加num的结果,但是在两个进程p1p2 中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。

运行一下:

3
4
7
8
11
12
15
16
19
20

我们可以看到,进程1和进程2在相互抢着使用共享内存val

2 进程锁

为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁Lock来解决。

首先需要定义一个进程锁

lock = multiprocessing.Lock()  # 定义一个进程锁

然后将进程锁的信息传入各个进程中

p1 = multiprocessing.Process(target=job, args=(val, 1, lock))  # 需要将Lock传入
p2 = multiprocessing.Process(target=job, args=(val, 3, lock))  # 设定不同的number看如何抢夺内存

job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占

def job(val, num, lock):
    lock.acquire()  # 锁住
    for _ in range(5):
        time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
        val.value += num  # val.value获取共享变量值
        print(val.value)
    lock.release()  # 释放

完整代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def job(val, num, lock):
    lock.acquire()  # 锁住
    for _ in range(5):
        time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
        val.value += num  # val.value获取共享变量值
        print(val.value)
    lock.release()  # 释放


def multicore():
    val = multiprocessing.Value('i', 0)  # 定义共享变量
    lock = multiprocessing.Lock()  # 定义一个进程锁
    p1 = multiprocessing.Process(target=job, args=(val, 1, lock))  # 需要将Lock传入
    p2 = multiprocessing.Process(target=job, args=(val, 3, lock))  # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    multicore()

运行一下,让我们看看是否还会出现抢占资源的情况:

1
2
3
4
5
8
11
14
17
20

显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

...

Tags Read More..


Python多进程编程之进程间通信

by LauCyun Aug 18,2016 23:24:28 30,261 views

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。

1 multiprocessing.Pipe()

multiprocessing.Pipe()即管道模式,调用Pipe()返回管道的两端的Connection。

Python官方文档的描述:
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发;duplexFalse,conn1只负责接受消息,conn2只负责发送消息。

send()recv()方法分别是发送和接受消息的方法。一个进程从Pipe某一端输入对象,然后被Pipe另一端的进程接收,单向管道只允许管道一端的进程输入另一端的进程接收,不可以反向通信;而双向管道则允许从两端输入和从两端接收。

例如:在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError

栗子1:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import multiprocessing
import time

m = 5


def proc1(pipe):
    for i in range(m):
        print("proc1 send %s" % (i))
        pipe.send(i)
        time.sleep(1)


def proc2(pipe):
    flag = 0
    while True:
        if flag >= m - 3:
            pipe.close()  # 关闭
        flag += 1
        print("proc2 rev %s" % (pipe.recv()))
        time.sleep(1)


if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    print("start")
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("end")

输出结果为:

start
proc1 send 0
proc2 rev 0
proc1 send 1
proc2 rev 1
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
proc1 send 2
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/repository/python/learn-python3/multiprocessing/pipe1.py", line 23, in proc2
    print("proc2 rev %s" % (pipe.recv()))
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 248, in recv
    self._check_closed()
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
proc1 send 3
proc1 send 4
end

上面的代码中主要用到了pipesend()recv()close()方法。当pipe的输出端被关闭,且无法接收到输入的值,那么就会抛出EOFError

2 multiprocessing.Queue

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。Queue据官方文档也是基于pipe的实现。

Queue的使用主要是一边put(),一边get(),但是Queue可以是多个Process进行put()操作,也可以是多个Process进行get()操作。

  • put方法用以插入数据到队列中,put方法还有两个可选参数:blockedtimeout。如果blockedTrue(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blockedFalse,但该Queue已满,会立即抛出Queue.Full异常。
  • get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blockedtimeout。如果blockedTrue(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blockedFalse,有两种情况存在,如果Queue有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出Queue.Empty异常。

举个栗子2,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue
import os, time, random


# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())


# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)


if __name__ == '__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

一次执行结果为:

Process to write: 41823
Put A to queue...
Process to read: 41824
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

3 总结

Pipe的读写效率要高于Queue

  • Pipe()只能是两个客户端
  • Queue()可以有多个生产者和消费者

那么我们如何的选择他们?

  • 如果你的环境是多生产者和消费者,那么你只能是选择queue队列
  • 如果两个进程间处理的逻辑简单,但是就是要求绝对的速度,那么pipe是个好选择 

具体的可以参考:Python multiprocessing - Pipe vs Queue

Python多进程之间的通信,除了上述方法消息队列Queue,管道Pipe外,还有SocketRPC等。

...

Tags Read More..


Python多进程编程之Pool

by LauCyun Aug 17,2016 10:51:12 23,888 views

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

构造方法

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes:工作进程数量。Pool默认大小是CPU的核数os.cpu_count(),我们也可以通过在Pool中传入processes参数即可自定义需要的核数量。
  • initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
  • context:用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

方法

  • apply(func[, args[, kwds]])是阻塞的。
  • apply_async(func[, args[, kwds[, callback]]])它是非阻塞。(理解与apply区别,看栗子1和栗子2结果区别)
  • close()关闭pool,使其不在接受新的任务。
  • terminate()关闭pool,结束工作进程,不在处理未完成的任务。
  • join()主进程阻塞,等待子进程的退出, join方法要在closeterminate之后使用。
  • map(funciterable[, chunksize])
  • map_async(funciterable[, chunksize[, callback[, error_callback]]])
  • imap(funciterable[, chunksize])
  • imap_unordered(funciterable[, chunksize])
  • starmap(funciterable[, chunksize])
  • starmap_async(funciterable[, chunksize[, callback[, error_back]]])

栗子1:使用进程池(非阻塞)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = ("hello %d" % (i))
        pool.apply_async(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

一次执行结果为:

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
msg: hello 3
end
Sub-process(es) done.

执行说明:创建一个进程池pool,并设定进程的数量为3,range(4)会相继产生四个对象[0, 1, 2, 3],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。

栗子2:使用进程池(阻塞)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        pool.apply(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

一次执行结果为:

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

执行说明:for循环内执行的步骤顺序,往进程池中添加一个子进程,执行子进程,等待执行完毕再添加一个子进程…..等4个子进程都执行完了,再执行"Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"和"Sub-process(es) done."。

栗子3:使用进程池(非阻塞),获取子进程的返回值

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    result = []
    for i in range(4):
        msg = "hello %d" % i
        result.append(pool.apply_async(func, (msg,)))
    pool.close()  #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    pool.join()
    for res in result:
        print("return: ", res.get())
    print("Sub-process(es) done.")

一次执行结果为:

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
msg: hello 3
end
return:  donehello 0
return:  donehello 1
return:  donehello 2
return:  donehello 3
Sub-process(es) done.

栗子4:使用多个进程池

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import os, time, random


def fun1():
    print("Run task 1, pid is %s" % (os.getpid()))  # os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10)  # random.random()随机生成0-1之间的小数
    end = time.time()
    print('Task 1, runs %0.2f seconds.' % (end - start))


def fun2():
    print("Run task 2, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print('Task 2 runs %0.2f seconds.' % (end - start))


def fun3():
    print("Run task 3, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task 3 runs %0.2f seconds.' % (end - start))


def fun4():
    print("Run task 4, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task 4 runs %0.2f seconds.' % (end - start))


if __name__ == '__main__':
    function_list = [fun1, fun2, fun3, fun4]
    print("parent process %s" % (os.getpid()))

    pool = multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)  # Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()  # 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print('All subprocesses done.')

一次执行结果为:

parent process 12496
Waiting for all subprocesses done...
Run task 1, pid is 6836
Run task 2, pid is 6596
Run task 3, pid is 14304
Run task 4, pid is 10900
Task 3 runs 0.81 seconds.
Task 1, runs 1.83 seconds.
Task 2 runs 8.97 seconds.
Task 4 runs 15.82 seconds.
All subprocesses done.

好了,到此为止Pool的使用讲完了。

...

Tags Read More..


Python多进程编程之Process

by LauCyun Aug 14,2016 12:13:34 11,668 views

利用multiprocessing.Process对象可以创建一个进程,该Process对象与Thread对象的用法相同,也有start()run()join()等方法。Process类适合简单的进程创建,如需资源共享可以结合multiprocessing.Queue使用;如果想要控制进程数量,则建议使用进程池Pool类。

构造方法:

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  • target表示调用对象
  • args表示调用对象的位置参数元组
  • kwargs表示调用对象的字典
  • name为别名
  • group实质上不使用

方法:

  • is_alive():返回进程是否在运行
  • join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)
  • start():进程准备就绪,等待CPU调度
  • run()strat()调用run方法,如果实例进程时未制定传入target,这start()执行默认run()方法
  • terminate():不管任务是否完成,立即停止工作进程

属性:

  • authkey:进程的认证密钥
  • daemon:和线程的setDeamon功能一样(将父进程设置为守护进程,当父进程结束时,子进程也结束),必须在start()之前设置
  • exitcode:进程在运行时为None、如果为–N,表示被信号N结束
  • name:进程名字
  • pid:进程号

使用Process创建子进程

举个栗子1:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())

输出如下:

p.pid: 6376
p.name: Process-1
p.is_alive: True
The time is Thu Aug 14 09:07:16 2016
The time is Thu Aug 14 09:07:19 2016
The time is Thu Aug 14 09:07:22 2016
The time is Thu Aug 14 09:07:25 2016
The time is Thu Aug 14 09:07:28 2016

栗子2:创建函数并将其作为多个进程

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker_1(interval):
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")


def worker_2(interval):
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")


def worker_3(interval):
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")


if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker_1, args=(2,))
    p2 = multiprocessing.Process(target=worker_2, args=(3,))
    p3 = multiprocessing.Process(target=worker_3, args=(4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

输出结果如下:

The number of CPU is:8
child: p.name=Process-1, p.id=7872
child: p.name=Process-2, p.id=10872
child: p.name=Process-3, p.id=132
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

使用Process类继承创建子进程

现在将栗子1修改一下,将进程定义为类,如栗子3:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1


if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()  # 进程p调用start()时,自动调用run()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())

输出结果为:

p.pid: 1004
p.name: ClockProcess-1
p.is_alive: True
the time is Thu Aug 14 09:31:40 2016
the time is Thu Aug 14 09:31:43 2016
the time is Thu Aug 14 09:31:46 2016
the time is Thu Aug 14 09:31:49 2016
the time is Thu Aug 14 09:31:52 2016

如何使用daemon

最后,来讲一讲daemon,它是进程的守护神,当它退出时,它会尝试终止其所有的守护进程子进程。但是必须在调用start()之前设置。下面举栗子来说明:

栗子4.1:不加daemon属性

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print("end!")

输出结果为:

end!
work start:Thu Aug 14 09:38:36 2016
work end:Thu Aug 14 09:38:39 2016

栗子4.2:使用daemon属性

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    print("end!")

输出结果为:

end!

子进程没起作用,不可思议吧!那是因为它结束时,它会尝试终止其所有的守护进程子进程。

栗子4.3:设置daemon执行完结束的方法

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    p.join()
    print("end!")

输出结果为:

work start:Thu Aug 14 09:45:24 2016
work end:Thu Aug 14 09:45:27 2016
end!

...

Tags Read More..


Python多进程编程之基础

by LauCyun Aug 13,2016 10:18:48 17,648 views

Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

为什么要使用python多进程?

因为python使用全局解释器锁(GIL),他会将进程中的线程序列化,也就是多核cpu实际上并不能达到并行提高速度的目的,而使用多进程则是不受限的,所以实际应用中都是推荐多进程的。

如果每个子进程执行需要消耗的时间非常短(执行+1操作等),这不必使用多进程,因为进程的启动关闭也会耗费资源。

当然使用多进程往往是用来处理CPU密集型(科学计算)的需求,如果是IO密集型(文件读取,爬虫等)则可以使用多线程去处理。

1 fork

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os

print('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid==0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

运行结果如下:

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

由于Windows没有fork调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的,推荐大家用Mac学Python!

有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

2 multiprocessing

2.1 multiprocessing常用组件及功能

创建管理进程模块:

同步子进程模块:

  • Condition
  • Event:用来实现进程间同步通信
  • Lock:当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突,具体请参考:Python多进程编程之进程锁Lock
  • RLock
  • Semaphore:用来控制对共享资源的访问数量,例如池的最大连接数。
2.2  多进程资源共享

多进程中不推荐使用资源共享,如果非要使用,具体请参考:Python多进程编程之多进程资源共享

2.3 获取子进程返回值

多进程中往往会碰到获取子进程返回值的问题,具体介绍请参考:Python多进程编程之Pool的栗子3

2.4 Semaphore

Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + " acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + " release\n")
    s.release()


if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i * 2))
        p.start()

输出结果为:

Process-4 acquire
Process-1 acquire
Process-1 release

Process-3 acquire
Process-3 release

Process-2 acquire
Process-4 release

Process-5 acquire
Process-2 release

Process-5 release

上面的实例中使用semaphore限制了最多有2个进程同时执行。

2.6 Event

Event用来实现进程间同步通信。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))


def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))


if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name="block",
                                 target=wait_for_event,
                                 args=(e,))

    w2 = multiprocessing.Process(name="non-block",
                                 target=wait_for_event_timeout,
                                 args=(e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

输出结果为:

wait_for_event_timeout:starting
wait_for_event: starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。

3 subprocess

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import subprocess

print('$ nslookup www.liuker.org')
r = subprocess.call(['nslookup', 'www.liuker.org'])
print('Exit code:', r)

运行结果:

$ nslookup www.liuker.org
Server:		127.0.1.1
Address:	127.0.1.1#53

Non-authoritative answer:
Name:	www.liuker.org
Address: 52.192.214.10

Exit code: 0

如果子进程还需要输入,则可以通过communicate()方法输入:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\liuker.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

上面的代码相当于在命令行执行命令nslookup,然后手动输入:

set q=mx
python.org
exit

运行结果如下:

$ nslookup
Server:		127.0.1.1
Address:	127.0.1.1#53

Non-authoritative answer:
*** Can't find liuker.org: No answer

Authoritative answers can be found from:
liuker.org
	origin = dns9.hichina.com
	mail addr = hostmaster.hichina.com
	serial = 2016101611
	refresh = 3600
	retry = 1200
	expire = 3600
	minimum = 360


Exit code: 0

小结

  • 在Unix/Linux下,可以使用fork()调用实现多进程。
  • 要实现跨平台的多进程,可以使用multiprocessingsubprocess模块。
  • 进程间通信是通过QueuePipes等实现的。

...

Tags Read More..