The result of tag: (1 results)

Python多进程编程之共享资源

by LauCyun Aug 19,2016 00:20:30 29,904 views

在使用多进程的过程中,我们应该尽量避免多进程共享资源。多进程共享资源必然会带来进程间相互竞争。而这种竞争又会造成race condition,我们的结果有可能被竞争的不确定性所影响。

如果非得使用,multiprocessing类中共享资源可以使用3种方式,分别是Queue(具体请参考:Python多进程编程之进程间通信中multiprocessing.Queue部分)、ValueArrayManager。这三个都是multiprocessing自带的组件,使用起来也非常方便。注意:普通的全局变量是不能被子进程所共享的,只有通过multiprocessing组件构造的数据结构可以被共享。

1 共享内存

在python的multiprocessing中,有ValueArray类,可以和共享内存交互,来实现在进程之间共享数据。

举个栗子1:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing as mp
import time


def job(val, arr, p):
    for _ in range(len(arr)):
        val.value += p
        arr[i] += p
        print('proces %s, val=%s, arr=%s' % (p, val.value, arr[:]))
        time.sleep(0.5)  # 暂停0.1秒,让输出效果更明显


if __name__ == '__main__':
    val = mp.Value('d', 3.14)  # Value
    arr = mp.Array('i', range(3))  # Array

    print('val=%s, arr=%s' % (val.value, arr[:]))
    print("start...")

    p1 = mp.Process(target=job, args=(val, arr, 1))
    p2 = mp.Process(target=job, args=(val, arr, 2))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print('val=%s, arr=%s' % (val.value, arr[:]))
    print("done...")

在上面的代码中,我们定义了ValueArray两个对象的共享变量valarr,对象val被设置成为双精度数d, 并初始化为3.14。而arr则类似于C中的数组,有固定的类型i。两个进程都可以对它进行操作。 在job()中我们想让valarr每隔0.5秒输出一次结果,但是在两个进程p1p2 中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。

运行一下的结果为:

val=3.14, arr=[0, 1, 2]
start...
proces 2, val=5.140000000000001, arr=[0, 3, 2]
proces 1, val=6.140000000000001, arr=[0, 4, 2]
proces 2, val=8.14, arr=[0, 6, 2]
proces 1, val=9.14, arr=[0, 7, 2]
proces 2, val=11.14, arr=[0, 9, 2]
proces 1, val=12.14, arr=[0, 10, 2]
val=12.14, arr=[0, 10, 2]
done...

我们可以看到了两个对象的值改变,说明资源确实在两个进程之间共享。

但是,这种方法无法与Pool一起使用。还是以上面的栗子为栗:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing as mp
import time


def job(val, arr, p):
    for _ in range(len(arr)):
        val.value += p
        arr[i] += p
        print('proces %s, val=%s, arr=%s' % (p, val.value, arr[:]))
        time.sleep(0.5)  # 暂停0.1秒,让输出效果更明显


if __name__ == '__main__':
    val = mp.Value('d', 3.14)  # Value
    arr = mp.Array('i', range(3))  # Array

    print('val=%s, arr=%s' % (val.value, arr[:]))
    print("start...")

    pool = mp.Pool(processes=3)
    for i in range(2):
        pool.apply_async(job, (val, arr))

    print('val=%s, arr=%s' % (val.value, arr[:]))
    print("done...")

运行结果为:

val=3.14, arr=[0, 1, 2]
start...
val=3.14, arr=[0, 1, 2]
done...

从运行结果上看,两个进程并不起作用。

2 Manager

Manager对象类似于服务器与客户之间的通信 (server-client),与我们在Internet上的活动很类似。我们用一个进程作为服务器,建立Manager来真正存放资源。其它的进程可以通过参数传递或者根据地址来访问Manager,建立连接后,操作服务器上的资源。在防火墙允许的情况下,我们完全可以将Manager运用于多计算机,从而模仿了一个真实的网络情境。

举个栗子2:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing as mp
import time

l = mp.Manager().list()  # 定义可被子进程共享的全局变量l


def job(i, msg):
    print("msg:", msg)
    l.append(i)
    time.sleep(1)


if __name__ == "__main__":
    pool = mp.Pool(processes=5)
    for i in range(10):
        msg = ("hello %d" % i)
        pool.apply_async(job, args=(i, msg))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print(l)
    print("Sub-process(es) done.")

Manager利用list()方法提供了表的共享方式。实际上你可以利用dict()来共享词典,Lock()来共享threading.Lock(注意,我们共享的是threading.Lock,而不是进程的mutiprocessing.Lock。后者本身已经实现了进程共享)等。 这样Manager就允许我们共享更多样的对象。

运行结果为:

msg: hello 1
msg: hello 0
msg: hello 2
msg: hello 3
msg: hello 4
msg: hello 5
msg: hello 6
msg: hello 7
msg: hello 8
msg: hello 9
[1, 0, 2, 3, 4, 5, 6, 7, 8, 9]
Sub-process(es) done.

附录:各参数代表的数据类型

Type code C Type Python Type Minimum size in bytes Notes
'b' signed char int 1  
'B' unsigned char int 1  
'u' Py_UNICODE Unicode character 2 (1)
'h' signed short int 2  
'H' unsigned short int 2  
'i' signed int int 2  
'I' unsigned int int 2  
'l' signed long int 4  
'L' unsigned long int 4  
'q' signed long long int 8 (2)
'Q' unsigned long long int 8 (2)
'f' float float 4  
'd' double float 8  

Notes:

  1. The 'u' type code corresponds to Python’s obsolete unicode character (Py_UNICODE which is wchar_t). Depending on the platform, it can be 16 bits or 32 bits.

    'u' will be removed together with the rest of the Py_UNICODE API.

    Deprecated since version 3.3, will be removed in version 4.0.

  2. The 'q' and 'Q' type codes are available only if the platform C compiler used to build Python supports C long long, or, on Windows, __int64.

    New in version 3.3.

(来源:array — Efficient arrays of numeric values

...

Tags Read More