The result of tag: (6 results)

Python多进程编程之进程锁Lock

by LauCyun Aug 19,2016 17:33:03 19,333 views

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。

1 不加进程锁

让我们看看没有加进程锁时会产生什么样的结果。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def job(val, num):
    for _ in range(5):
        time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
        val.value += num  # val.value获取共享变量值
        print(val.value)


def multicore():
    val = multiprocessing.Value('i', 0)  # 定义共享变量
    p1 = multiprocessing.Process(target=job, args=(val, 1))
    p2 = multiprocessing.Process(target=job, args=(val, 3))  # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    multicore()

在上面的代码中,我们定义了一个共享变量val,两个进程都可以对它进行操作。 在job()中我们想让val每隔0.1秒输出一次累加num的结果,但是在两个进程p1p2 中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。

运行一下:

3
4
7
8
11
12
15
16
19
20

我们可以看到,进程1和进程2在相互抢着使用共享内存val

2 进程锁

为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁Lock来解决。

首先需要定义一个进程锁

lock = multiprocessing.Lock()  # 定义一个进程锁

然后将进程锁的信息传入各个进程中

p1 = multiprocessing.Process(target=job, args=(val, 1, lock))  # 需要将Lock传入
p2 = multiprocessing.Process(target=job, args=(val, 3, lock))  # 设定不同的number看如何抢夺内存

job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占

def job(val, num, lock):
    lock.acquire()  # 锁住
    for _ in range(5):
        time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
        val.value += num  # val.value获取共享变量值
        print(val.value)
    lock.release()  # 释放

完整代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def job(val, num, lock):
    lock.acquire()  # 锁住
    for _ in range(5):
        time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
        val.value += num  # val.value获取共享变量值
        print(val.value)
    lock.release()  # 释放


def multicore():
    val = multiprocessing.Value('i', 0)  # 定义共享变量
    lock = multiprocessing.Lock()  # 定义一个进程锁
    p1 = multiprocessing.Process(target=job, args=(val, 1, lock))  # 需要将Lock传入
    p2 = multiprocessing.Process(target=job, args=(val, 3, lock))  # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    multicore()

运行一下,让我们看看是否还会出现抢占资源的情况:

1
2
3
4
5
8
11
14
17
20

显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

...

Tags Read More


Python多进程编程之共享资源

by LauCyun Aug 19,2016 00:20:30 29,904 views

在使用多进程的过程中,我们应该尽量避免多进程共享资源。多进程共享资源必然会带来进程间相互竞争。而这种竞争又会造成race condition,我们的结果有可能被竞争的不确定性所影响。

如果非得使用,multiprocessing类中共享资源可以使用3种方式,分别是Queue(具体请参考:Python多进程编程之进程间通信中multiprocessing.Queue部分)、ValueArrayManager。这三个都是multiprocessing自带的组件,使用起来也非常方便。注意:普通的全局变量是不能被子进程所共享的,只有通过multiprocessing组件构造的数据结构可以被共享。

1 共享内存

在python的multiprocessing中,有ValueArray类,可以和共享内存交互,来实现在进程之间共享数据。

举个栗子1:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing as mp
import time


def job(val, arr, p):
    for _ in range(len(arr)):
        val.value += p
        arr[i] += p
        print('proces %s, val=%s, arr=%s' % (p, val.value, arr[:]))
        time.sleep(0.5)  # 暂停0.1秒,让输出效果更明显


if __name__ == '__main__':
    val = mp.Value('d', 3.14)  # Value
    arr = mp.Array('i', range(3))  # Array

    print('val=%s, arr=%s' % (val.value, arr[:]))
    print("start...")

    p1 = mp.Process(target=job, args=(val, arr, 1))
    p2 = mp.Process(target=job, args=(val, arr, 2))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print('val=%s, arr=%s' % (val.value, arr[:]))
    print("done...")

在上面的代码中,我们定义了ValueArray两个对象的共享变量valarr,对象val被设置成为双精度数d, 并初始化为3.14。而arr则类似于C中的数组,有固定的类型i。两个进程都可以对它进行操作。 在job()中我们想让valarr每隔0.5秒输出一次结果,但是在两个进程p1p2 中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。

运行一下的结果为:

val=3.14, arr=[0, 1, 2]
start...
proces 2, val=5.140000000000001, arr=[0, 3, 2]
proces 1, val=6.140000000000001, arr=[0, 4, 2]
proces 2, val=8.14, arr=[0, 6, 2]
proces 1, val=9.14, arr=[0, 7, 2]
proces 2, val=11.14, arr=[0, 9, 2]
proces 1, val=12.14, arr=[0, 10, 2]
val=12.14, arr=[0, 10, 2]
done...

我们可以看到了两个对象的值改变,说明资源确实在两个进程之间共享。

但是,这种方法无法与Pool一起使用。还是以上面的栗子为栗:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing as mp
import time


def job(val, arr, p):
    for _ in range(len(arr)):
        val.value += p
        arr[i] += p
        print('proces %s, val=%s, arr=%s' % (p, val.value, arr[:]))
        time.sleep(0.5)  # 暂停0.1秒,让输出效果更明显


if __name__ == '__main__':
    val = mp.Value('d', 3.14)  # Value
    arr = mp.Array('i', range(3))  # Array

    print('val=%s, arr=%s' % (val.value, arr[:]))
    print("start...")

    pool = mp.Pool(processes=3)
    for i in range(2):
        pool.apply_async(job, (val, arr))

    print('val=%s, arr=%s' % (val.value, arr[:]))
    print("done...")

运行结果为:

val=3.14, arr=[0, 1, 2]
start...
val=3.14, arr=[0, 1, 2]
done...

从运行结果上看,两个进程并不起作用。

2 Manager

Manager对象类似于服务器与客户之间的通信 (server-client),与我们在Internet上的活动很类似。我们用一个进程作为服务器,建立Manager来真正存放资源。其它的进程可以通过参数传递或者根据地址来访问Manager,建立连接后,操作服务器上的资源。在防火墙允许的情况下,我们完全可以将Manager运用于多计算机,从而模仿了一个真实的网络情境。

举个栗子2:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing as mp
import time

l = mp.Manager().list()  # 定义可被子进程共享的全局变量l


def job(i, msg):
    print("msg:", msg)
    l.append(i)
    time.sleep(1)


if __name__ == "__main__":
    pool = mp.Pool(processes=5)
    for i in range(10):
        msg = ("hello %d" % i)
        pool.apply_async(job, args=(i, msg))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print(l)
    print("Sub-process(es) done.")

Manager利用list()方法提供了表的共享方式。实际上你可以利用dict()来共享词典,Lock()来共享threading.Lock(注意,我们共享的是threading.Lock,而不是进程的mutiprocessing.Lock。后者本身已经实现了进程共享)等。 这样Manager就允许我们共享更多样的对象。

运行结果为:

msg: hello 1
msg: hello 0
msg: hello 2
msg: hello 3
msg: hello 4
msg: hello 5
msg: hello 6
msg: hello 7
msg: hello 8
msg: hello 9
[1, 0, 2, 3, 4, 5, 6, 7, 8, 9]
Sub-process(es) done.

附录:各参数代表的数据类型

Type code C Type Python Type Minimum size in bytes Notes
'b' signed char int 1  
'B' unsigned char int 1  
'u' Py_UNICODE Unicode character 2 (1)
'h' signed short int 2  
'H' unsigned short int 2  
'i' signed int int 2  
'I' unsigned int int 2  
'l' signed long int 4  
'L' unsigned long int 4  
'q' signed long long int 8 (2)
'Q' unsigned long long int 8 (2)
'f' float float 4  
'd' double float 8  

Notes:

  1. The 'u' type code corresponds to Python’s obsolete unicode character (Py_UNICODE which is wchar_t). Depending on the platform, it can be 16 bits or 32 bits.

    'u' will be removed together with the rest of the Py_UNICODE API.

    Deprecated since version 3.3, will be removed in version 4.0.

  2. The 'q' and 'Q' type codes are available only if the platform C compiler used to build Python supports C long long, or, on Windows, __int64.

    New in version 3.3.

(来源:array — Efficient arrays of numeric values

...

Tags Read More


Python多进程编程之进程间通信

by LauCyun Aug 18,2016 23:24:28 25,947 views

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。

1 multiprocessing.Pipe()

multiprocessing.Pipe()即管道模式,调用Pipe()返回管道的两端的Connection。

Python官方文档的描述:
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发;duplexFalse,conn1只负责接受消息,conn2只负责发送消息。

send()recv()方法分别是发送和接受消息的方法。一个进程从Pipe某一端输入对象,然后被Pipe另一端的进程接收,单向管道只允许管道一端的进程输入另一端的进程接收,不可以反向通信;而双向管道则允许从两端输入和从两端接收。

例如:在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError

栗子1:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import multiprocessing
import time

m = 5


def proc1(pipe):
    for i in range(m):
        print("proc1 send %s" % (i))
        pipe.send(i)
        time.sleep(1)


def proc2(pipe):
    flag = 0
    while True:
        if flag >= m - 3:
            pipe.close()  # 关闭
        flag += 1
        print("proc2 rev %s" % (pipe.recv()))
        time.sleep(1)


if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    print("start")
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("end")

输出结果为:

start
proc1 send 0
proc2 rev 0
proc1 send 1
proc2 rev 1
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
proc1 send 2
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/repository/python/learn-python3/multiprocessing/pipe1.py", line 23, in proc2
    print("proc2 rev %s" % (pipe.recv()))
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 248, in recv
    self._check_closed()
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
proc1 send 3
proc1 send 4
end

上面的代码中主要用到了pipesend()recv()close()方法。当pipe的输出端被关闭,且无法接收到输入的值,那么就会抛出EOFError

2 multiprocessing.Queue

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。Queue据官方文档也是基于pipe的实现。

Queue的使用主要是一边put(),一边get(),但是Queue可以是多个Process进行put()操作,也可以是多个Process进行get()操作。

  • put方法用以插入数据到队列中,put方法还有两个可选参数:blockedtimeout。如果blockedTrue(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blockedFalse,但该Queue已满,会立即抛出Queue.Full异常。
  • get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blockedtimeout。如果blockedTrue(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blockedFalse,有两种情况存在,如果Queue有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出Queue.Empty异常。

举个栗子2,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue
import os, time, random


# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())


# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)


if __name__ == '__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

一次执行结果为:

Process to write: 41823
Put A to queue...
Process to read: 41824
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

3 总结

Pipe的读写效率要高于Queue

  • Pipe()只能是两个客户端
  • Queue()可以有多个生产者和消费者

那么我们如何的选择他们?

  • 如果你的环境是多生产者和消费者,那么你只能是选择queue队列
  • 如果两个进程间处理的逻辑简单,但是就是要求绝对的速度,那么pipe是个好选择 

具体的可以参考:Python multiprocessing - Pipe vs Queue

Python多进程之间的通信,除了上述方法消息队列Queue,管道Pipe外,还有SocketRPC等。

...

Tags Read More


Python多进程编程之Pool

by LauCyun Aug 17,2016 10:51:12 21,550 views

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

构造方法

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes:工作进程数量。Pool默认大小是CPU的核数os.cpu_count(),我们也可以通过在Pool中传入processes参数即可自定义需要的核数量。
  • initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
  • context:用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

方法

  • apply(func[, args[, kwds]])是阻塞的。
  • apply_async(func[, args[, kwds[, callback]]])它是非阻塞。(理解与apply区别,看栗子1和栗子2结果区别)
  • close()关闭pool,使其不在接受新的任务。
  • terminate()关闭pool,结束工作进程,不在处理未完成的任务。
  • join()主进程阻塞,等待子进程的退出, join方法要在closeterminate之后使用。
  • map(funciterable[, chunksize])
  • map_async(funciterable[, chunksize[, callback[, error_callback]]])
  • imap(funciterable[, chunksize])
  • imap_unordered(funciterable[, chunksize])
  • starmap(funciterable[, chunksize])
  • starmap_async(funciterable[, chunksize[, callback[, error_back]]])

栗子1:使用进程池(非阻塞)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = ("hello %d" % (i))
        pool.apply_async(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

一次执行结果为:

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
msg: hello 3
end
Sub-process(es) done.

执行说明:创建一个进程池pool,并设定进程的数量为3,range(4)会相继产生四个对象[0, 1, 2, 3],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。

栗子2:使用进程池(阻塞)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        pool.apply(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

一次执行结果为:

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

执行说明:for循环内执行的步骤顺序,往进程池中添加一个子进程,执行子进程,等待执行完毕再添加一个子进程…..等4个子进程都执行完了,再执行"Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"和"Sub-process(es) done."。

栗子3:使用进程池(非阻塞),获取子进程的返回值

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    result = []
    for i in range(4):
        msg = "hello %d" % i
        result.append(pool.apply_async(func, (msg,)))
    pool.close()  #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    pool.join()
    for res in result:
        print("return: ", res.get())
    print("Sub-process(es) done.")

一次执行结果为:

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
msg: hello 3
end
return:  donehello 0
return:  donehello 1
return:  donehello 2
return:  donehello 3
Sub-process(es) done.

栗子4:使用多个进程池

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import os, time, random


def fun1():
    print("Run task 1, pid is %s" % (os.getpid()))  # os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10)  # random.random()随机生成0-1之间的小数
    end = time.time()
    print('Task 1, runs %0.2f seconds.' % (end - start))


def fun2():
    print("Run task 2, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print('Task 2 runs %0.2f seconds.' % (end - start))


def fun3():
    print("Run task 3, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task 3 runs %0.2f seconds.' % (end - start))


def fun4():
    print("Run task 4, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task 4 runs %0.2f seconds.' % (end - start))


if __name__ == '__main__':
    function_list = [fun1, fun2, fun3, fun4]
    print("parent process %s" % (os.getpid()))

    pool = multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)  # Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()  # 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print('All subprocesses done.')

一次执行结果为:

parent process 12496
Waiting for all subprocesses done...
Run task 1, pid is 6836
Run task 2, pid is 6596
Run task 3, pid is 14304
Run task 4, pid is 10900
Task 3 runs 0.81 seconds.
Task 1, runs 1.83 seconds.
Task 2 runs 8.97 seconds.
Task 4 runs 15.82 seconds.
All subprocesses done.

好了,到此为止Pool的使用讲完了。

...

Tags Read More


Python多进程编程之Process

by LauCyun Aug 14,2016 12:13:34 10,358 views

利用multiprocessing.Process对象可以创建一个进程,该Process对象与Thread对象的用法相同,也有start()run()join()等方法。Process类适合简单的进程创建,如需资源共享可以结合multiprocessing.Queue使用;如果想要控制进程数量,则建议使用进程池Pool类。

构造方法:

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  • target表示调用对象
  • args表示调用对象的位置参数元组
  • kwargs表示调用对象的字典
  • name为别名
  • group实质上不使用

方法:

  • is_alive():返回进程是否在运行
  • join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)
  • start():进程准备就绪,等待CPU调度
  • run()strat()调用run方法,如果实例进程时未制定传入target,这start()执行默认run()方法
  • terminate():不管任务是否完成,立即停止工作进程

属性:

  • authkey:进程的认证密钥
  • daemon:和线程的setDeamon功能一样(将父进程设置为守护进程,当父进程结束时,子进程也结束),必须在start()之前设置
  • exitcode:进程在运行时为None、如果为–N,表示被信号N结束
  • name:进程名字
  • pid:进程号

使用Process创建子进程

举个栗子1:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())

输出如下:

p.pid: 6376
p.name: Process-1
p.is_alive: True
The time is Thu Aug 14 09:07:16 2016
The time is Thu Aug 14 09:07:19 2016
The time is Thu Aug 14 09:07:22 2016
The time is Thu Aug 14 09:07:25 2016
The time is Thu Aug 14 09:07:28 2016

栗子2:创建函数并将其作为多个进程

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker_1(interval):
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")


def worker_2(interval):
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")


def worker_3(interval):
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")


if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker_1, args=(2,))
    p2 = multiprocessing.Process(target=worker_2, args=(3,))
    p3 = multiprocessing.Process(target=worker_3, args=(4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

输出结果如下:

The number of CPU is:8
child: p.name=Process-1, p.id=7872
child: p.name=Process-2, p.id=10872
child: p.name=Process-3, p.id=132
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

使用Process类继承创建子进程

现在将栗子1修改一下,将进程定义为类,如栗子3:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1


if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()  # 进程p调用start()时,自动调用run()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())

输出结果为:

p.pid: 1004
p.name: ClockProcess-1
p.is_alive: True
the time is Thu Aug 14 09:31:40 2016
the time is Thu Aug 14 09:31:43 2016
the time is Thu Aug 14 09:31:46 2016
the time is Thu Aug 14 09:31:49 2016
the time is Thu Aug 14 09:31:52 2016

如何使用daemon

最后,来讲一讲daemon,它是进程的守护神,当它退出时,它会尝试终止其所有的守护进程子进程。但是必须在调用start()之前设置。下面举栗子来说明:

栗子4.1:不加daemon属性

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print("end!")

输出结果为:

end!
work start:Thu Aug 14 09:38:36 2016
work end:Thu Aug 14 09:38:39 2016

栗子4.2:使用daemon属性

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    print("end!")

输出结果为:

end!

子进程没起作用,不可思议吧!那是因为它结束时,它会尝试终止其所有的守护进程子进程。

栗子4.3:设置daemon执行完结束的方法

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    p.join()
    print("end!")

输出结果为:

work start:Thu Aug 14 09:45:24 2016
work end:Thu Aug 14 09:45:27 2016
end!

...

Tags Read More