about 2 results (0.02 seconds)

话说Python内存管理机制篇二之内存分配策略(一)

by LauCyun Jul 26,2017 13:35:42 29,330 views

在Python中,许多时候申请的内存都是小块的内存,这些小块内存在申请后,很快又会被释放,由于这些内存的申请并不是为了创建对象,所以并没有对象一级的内存池机制。这就意味着Python在运行期间会大量地执行mallocfree操作,导致操作系统频繁地在用户态和核心态之间进行切换,这将严重影响Python的执行效率。为了提高Python的执行效率,Python引入了一个内存池机制,用于管理对小块内存的申请和释放。这叫 Pymalloc机制

在Python中,整个小块内存池可以视为一个层次结构,在这个层次结构中,一共分为4层,从下至上分别是:blockpoolarena内存池,如图1。需要说明的是,blockpoolarena都是Python代码中可以找到的实体,而最顶层的内存池只是一个概念上的东西,表示Python对于整个小块内存分配和释放行为的内存管理机制。


图1 Python的小块内存的内存池全景

Python又分为大内存和小内存。大小以512字节为界限,对于大内存使用Malloc进行分配,而对于小内存则使用内存池进行分配。

1 block

在 Python 中,block是最小的内存单元,不同种类的block都有不同的内存大小,这个内存大小的值被称为size class。为了在当前主流的平台都能获得最佳性能,所有的block的长度都是8字节对齐的。

[Objects/obmalloc.c]
/*
 * Alignment of addresses returned to the user. 8-bytes alignment works
 * on most current architectures (with 32-bit or 64-bit address busses).
 * The alignment value is also used for grouping small requests in size
 * classes spaced ALIGNMENT bytes apart.
 *
 * You shouldn't change this unless you know what you are doing.
 */
#define ALIGNMENT               8               /* must be 2^N */
#define ALIGNMENT_SHIFT         3

同时,Python为block的大小设定了一个上限,当申请的内存大小小于这个上限时,Python可以使用不同种类的block来满足对内存的需求;当申请的内存大小超过这个上限时,Python就会对内存的请求转交给第一层的内存管理机制,即PyMem函数族来处理。这个上限值在Python3.5中被设定为512。

[Objects/obmalloc.c]
/*
 * Max size threshold below which malloc requests are considered to be
 * small enough in order to use preallocated memory pools. You can tune
 * this value according to your application behaviour and memory needs.
 *
 * Note: a size threshold of 512 guarantees that newly created dictionaries
 * will be allocated from preallocated memory pools on 64-bit.
 *
 * The following invariants must hold:
 *      1) ALIGNMENT <= SMALL_REQUEST_THRESHOLD <= 512
 *      2) SMALL_REQUEST_THRESHOLD is evenly divisible by ALIGNMENT
 *
 * Although not required, for better performance and space efficiency,
 * it is recommended that SMALL_REQUEST_THRESHOLD is set to a power of 2.
 */
#define SMALL_REQUEST_THRESHOLD 512
#define NB_SMALL_SIZE_CLASSES   (SMALL_REQUEST_THRESHOLD / ALIGNMENT)

根据SMALL_REQUEST_THRESHOLDALIGNMENT的限定,由此可知不同种类block的的size class分别为:8,16,24,32,...,512。而每个size class对应一个size class index,这个size class index从0开始,所以对于小于512字节的小内存的分配,得出如下结论:

表1 不同长度的block

Request in bytes Size of allocated block Size class idx
1-8 8 0
9-16 16 1
17-24 24 2
25-32 32 3
33-40 40 4
41-48 48 5
49-56 56 6
57-64 64 7
65-72 72 8
497-504 504 62
505-512 512 63

注:0, SMALL_REQUEST_THRESHOLD + 1 and up: routed to the underlying allocator.

举个栗子[1]

如果要申请一块29字节大小的内存,实际上PyObject_Malloc从内存池中划一个32字节的block(从size class index为3的pool里面划出),如图2所示:


图2

size classsize class index之间的转换:

[Objects/obmalloc.c]
// 从 size class index 转换到 size class
#define INDEX2SIZE(I) (((uint)(I) + 1) << ALIGNMENT_SHIFT)
/*
 * 即:
 * (0+1) * 8 = 8
 * (1+1) * 8 = 16
 */

// 从 size class 转换到 size class index
size = (uint)(nbytes - 1) >> ALIGNMENT_SHIFT;
/* 
 * 即:
 * (8 - 1) / 8 = 0
 * (16 - 8) / 8 = 1
 */

在Python中,block只是一个概念,在Python源码中没有与之对应的实体存在。对象在Python源码中有对应的PyObject;列表在Python源码中对应PyListObject、PyType_List,所以block就很奇怪了,它仅仅是概念上的东西,我们知道它是具有一定大小的内存,但它不与Python源码里的某个东西对应。然而,Python却提供了一个管理block的东西,这就是下面要剖析的pool

 

2 pool

一组 block的集合称为一个pool,换句话说,一个pool管理着一堆有固定大小的内存块。事实上,pool管理着一大块内存,它有一定的策略,将这块大的内存划分为多个小的内存块。

2.1 pool大小

[Objects/obmalloc.c]
/*
 * The system's VMM page size can be obtained on most unices with a
 * getpagesize() call or deduced from various header files. To make
 * things simpler, we assume that it is 4K, which is OK for most systems.
 * It is probably better if this is the native page size, but it doesn't
 * have to be.  In theory, if SYSTEM_PAGE_SIZE is larger than the native page
 * size, then `POOL_ADDR(p)->arenaindex' could rarely cause a segmentation
 * violation fault.  4K is apparently OK for all the platforms that python
 * currently targets.
 */
#define SYSTEM_PAGE_SIZE        (4 * 1024)
#define SYSTEM_PAGE_SIZE_MASK   (SYSTEM_PAGE_SIZE - 1)

/*
 * Size of the pools used for small blocks. Should be a power of 2,
 * between 1K and SYSTEM_PAGE_SIZE, that is: 1k, 2k, 4k.
 */
#define POOL_SIZE               SYSTEM_PAGE_SIZE        /* must be 2^N */
#define POOL_SIZE_MASK          SYSTEM_PAGE_SIZE_MASK

在Python中,一个pool的大小通常为一个系统内存页,由于当前大多数Python支持的系统的内存页都是4KB,所以Python内部也将一个pool的大小定义为4KB

2.2 pool结构

pool是由pool_headerblock集合(N多大小一样的block)组成。其pool_header的结构如下:

[Objects/obmalloc.c]

/* Pool for small blocks. */
struct pool_header {
    union { block *_padding;
            uint count; } ref;          /* number of allocated blocks    */
    block *freeblock;                   /* pool's free list head         */
    struct pool_header *nextpool;       /* next pool of this size class  */
    struct pool_header *prevpool;       /* previous pool       ""        */
    uint arenaindex;                    /* index into arenas of base adr */
    uint szidx;                         /* block size class index        */
    uint nextoffset;                    /* bytes to virgin block         */
    uint maxnextoffset;                 /* largest valid nextoffset      */
};

其中:

  • ref表示已分配block的数量
  • freeblock表示指向下一个可用的block的起始地址
  • nextoffset表示下一个可用的block的偏移位置
  • maxnextoffset表示pool的最大偏移位置

一个pool管理的所有block,它们的大小都是一样的。如图3


图3 pool的结构图

每一个pool都和一个size class index联系在一起,(block size=8, szidx=0)、 (block size=16, szidx=1)、...、(block size=512, szidx=63),用于内存分配时匹配到拥有对应大小blockpool,这就是pool_headerszidx的意义。

2.3 pool初始化

假设有一块4KB的内存,Python是如何初始化的呢?如下:

[Objects/obmalloc.c]

/* malloc.  Note that nbytes==0 tries to return a non-NULL pointer, distinct
 * from all other currently live pointers.  This may not be possible.
 */

/*
 * The basic blocks are ordered by decreasing execution frequency,
 * which minimizes the number of jumps in the most common cases,
 * improves branching prediction and instruction scheduling (small
 * block allocations typically result in a couple of instructions).
 * Unless the optimizer reorders everything, being too smart...
 */

static void *
_PyObject_Alloc(int use_calloc, void *ctx, size_t nelem, size_t elsize)
{
    size_t nbytes;
    block *bp;
    poolp pool;
    poolp next;
    uint size;

    ...   // pool指向了一块4KB的内存

        init_pool:
            /* Frontlink to used pools. */
            // 连接到 used_pools 双向链表, 作为表头
            // 注意, 这里 usedpools[0] 保存着 block size = 8 的所有used_pools的表头
            next = usedpools[size + size]; /* == prev */
            pool->nextpool = next;
            pool->prevpool = next;
            next->nextpool = pool;
            next->prevpool = pool;
            pool->ref.count = 1;

            if (pool->szidx == size) {
                /* Luckily, this pool last contained blocks
                 * of the same size class, so its header
                 * and free list are already initialized.
                 */
                bp = pool->freeblock;
                assert(bp != NULL);
                pool->freeblock = *(block **)bp;
                UNLOCK();
                if (use_calloc)
                    memset(bp, 0, nbytes);
                return (void *)bp;
            }
            /*
             * Initialize the pool header, set up the free list to
             * contain just the second block, and return the first
             * block.
             */
            // 开始初始化pool_header
            // 这里 size = (uint)(nbytes - 1) >> ALIGNMENT_SHIFT;  其实是Size class idx, 即szidx
            pool->szidx = size; 

            // 将size class index转换为size,如2转换为24字节
            size = INDEX2SIZE(size); 

            // 注意 #define POOL_OVERHEAD ROUNDUP(sizeof(struct pool_header))
            // bp => 初始化为pool + pool_header size,  跳过pool_header的内存
            bp = (block *)pool + POOL_OVERHEAD;

            // 计算偏移量, 这里的偏移量是绝对值
            // #define POOL_SIZE SYSTEM_PAGE_SIZE  /* must be 2^N */
            // POOL_SIZE = 4kb, POOL_OVERHEAD = pool_header size
            // 下一个偏移位置: pool_header size + 2 * size
            pool->nextoffset = POOL_OVERHEAD + (size << 1);

            pool->maxnextoffset = POOL_SIZE - size;

            // freeblock指向 bp + size = pool_header size + size
            pool->freeblock = bp + size;

            // *freeblock赋值NULL
            *(block **)(pool->freeblock) = NULL;
            UNLOCK();
            if (use_calloc)
                memset(bp, 0, nbytes);
            return (void *)bp;
        }
    ...
}

最后返回的bp就是指向从pool中取出的第一块block的指针。而bp返回的实际是一个地址,这个地址之后有将近 4KB 的内存实际上都是可用的,但是可以肯定申请内存的函数只会使用[bp, bp+zise]这个区间的内存,这是由size class index决定的。

图4所示的一块经过初始化后的4KB内存:


图4 pool初始化后的4KB内存

接下来,看看Python在申请pool时,pool_header中的各个域是怎么变动的?

总体分配的代码如下:

[Objects/obmalloc.c]

        /*
         * Most frequent paths first
         */
        size = (uint)(nbytes - 1) >> ALIGNMENT_SHIFT;
        pool = usedpools[size + size];
        if (pool != pool->nextpool) {
            /*
             * There is a used pool for this size class.
             * Pick up the head block of its free list.
             */
            ++pool->ref.count;
            bp = pool->freeblock; //指针指向下一个可用block起始位置
            assert(bp != NULL);
            
            // 代码[1]
            // 调整 pool->freeblock (假设A节点)指向链表下一个, 即bp首字节指向的下一个节点(假设B节点) 
            // 如果此时!= NULL, 表示 A节点可用, 直接返回
            if ((pool->freeblock = *(block **)bp) != NULL) {
                UNLOCK();
                if (use_calloc)
                    memset(bp, 0, nbytes);
                return (void *)bp;
            }

            // 代码[2]
            /*
             * Reached the end of the free list, try to extend it.
             */
            // 有足够的空间, 分配一个, pool->freeblock 指向后移
            if (pool->nextoffset <= pool->maxnextoffset) {
                /* There is room for another block. */
                pool->freeblock = (block*)pool +
                                  pool->nextoffset;
                pool->nextoffset += INDEX2SIZE(size);

                // 注意, *freeblock指向NULL
                // 设置 *freeblock 的动作正是建立离散自由 block 链表的关键所在
                *(block **)(pool->freeblock) = NULL; 
                UNLOCK();
                if (use_calloc)
                    memset(bp, 0, nbytes);
                return (void *)bp;
            }

            // 代码[3]
            /* Pool is full, unlink from used pools. */
            // pool满了, 需要从下一个pool获取
            next = pool->nextpool;
            pool = pool->prevpool;
            next->prevpool = pool;
            pool->nextpool = next;
            UNLOCK();
            if (use_calloc)
                memset(bp, 0, nbytes);
            return (void *)bp;
        }

举个栗子[2]

假设我们从现在开始连续申请5块29字节内存,由于29字节对应的size class index为3,所以实际上会申请5块32字节的内存。

freeblock指向的是下一个可用的block的起始地址,这一点在图4中也可以看得出。当再次申请32字节的block时,只需返回freeblock指向的地址就可以了,这时freeblock需要向前进,指向下一个可用的block

pool_header中,nextoffsetmaxnextoffset是两个用于对pool中的block集合进行迭代的变量:从初始化pool的结果及图4中可以看到,它所指示的偏移位置正好指向了freeblock之后的下一个可用的block的地址。从这里分配block的动作也可以看到,在分配了block之后,freeblocknextoffset都会向前移动一个block的距离,如此反复,就可对所有的block进行一次遍历。而maxnextoffset指名了该pool中最后一个可用的blockpool开始位置的便移,它界定了pool的边界,当nextoffset > maxnextoffset 时,也就意味着已经遍历完了pool中所有的block了。

2.4 pool分配策略 - 情况1

内存块尚未分配完,且此时不存在回收的block;全新pool进来的时候,分配第一块block。其条件为:

(pool->freeblock = *(block **)bp) == NULL

其分配实现逻辑如下:

[Objects/obmalloc.c]

            bp = pool->freeblock; //指针指向下一个可用block起始位置

            // 代码[2]
            /*
             * Reached the end of the free list, try to extend it.
             */
            // 有足够的空间, 分配一个, pool->freeblock 指向后移
            if (pool->nextoffset <= pool->maxnextoffset) {
                /* There is room for another block. */
                pool->freeblock = (block*)pool +
                                  pool->nextoffset;
                pool->nextoffset += INDEX2SIZE(size);

                // 注意, *freeblock指向NULL
                // 设置 *freeblock 的动作正是建立离散自由 block 链表的关键所在
                *(block **)(pool->freeblock) = NULL; 
                UNLOCK();
                if (use_calloc)
                    memset(bp, 0, nbytes);
                return (void *)bp;
            }

所以栗子[2]的结果如图5所示:


图5 分配5块32字节后的4KB内存

这种情况下,申请内存只需要:申请、前进,申请、前进,...,申请、前进,这个过程很容易。

接下来讲第二种情况。

2.5 pool分配策略 - 情况2:回收了某几个block

上面栗子中,我们已经进行了5次连续的32字节的内存分配,但是现在程序释放了其中的第1块和第4块block,那么下一次再分配32字节的内存时,是用第1块还是第6块内存呢?我们肯定期望它使用第1块内存。

一旦Python运转起来,内存的释放动作将会导致pool中出现大量的离散的自由block,Python必须建立一种机制,将这些离散的自由block组织起来,再次使用。这个机制就是所谓的自由block链表。这个链表的关键就着落在pool_header中的那个freeblock身上。

代码[2]中有*(block **)(pool->freeblock) = NULL这个动作,这个动作正是建立离散自由block链表的关键所在。如果某个block用完后,就会释放,其释放代码如下:

[Objects/obmalloc.c]

/* free */

ATTRIBUTE_NO_ADDRESS_SAFETY_ANALYSIS
static void
_PyObject_Free(void *ctx, void *p)
{
    poolp pool;
    block *lastfree;
    poolp next, prev;
    uint size;
    ...

    pool = POOL_ADDR(p);
    // 判断p指向的block是否属于pool
    if (Py_ADDRESS_IN_RANGE(p, pool)) {
        /* We allocated this address. */
        LOCK();
        /* Link p to the start of the pool's freeblock list.  Since
         * the pool had at least the p block outstanding, the pool
         * wasn't empty (so it's already in a usedpools[] list, or
         * was full and is in no list -- it's not in the freeblocks
         * list in any case).
         */
        assert(pool->ref.count > 0);            /* else it was empty */
        // p被释放,p的第一个字节值被设置为当前的 freeblock 的值
        *(block **)p = lastfree = pool->freeblock; // [1]
        // freeblock的值被更新,指向其首地址,则一个 block 被插入到了离散自由的 block 链表中
        pool->freeblock = (block *)p;              // [2]

        // 相当于往list中头插入了一个节点

        ...
    }
    ...
}

假设这是Python释放的是block p,在经过上面代码中的[1]逻辑后,A中第一个字节的值被设置为了当前freeblock的值,而在经过[2]逻辑后,freeblock的值被更新了,指向了block p的首地址。就这样,一个block被插入到了离散自由block链表中。回到上面的栗子,当第1块和第4块block都被释放后,我们就看到了图6中一个离散自由block链表:


图6 释放了block之后产生的离散自由block链表

freeblock开始,可以很容易以freeblock = *freeblock的方式遍历这条链表,而当发现*freeblockNULL是,则表明到达了该链表的尾部。继续回到上面的栗子中,再分配32字节的内存时,其逻辑代码[1]如下:

[Objects/obmalloc.c]

        /*
         * Most frequent paths first
         */
        size = (uint)(nbytes - 1) >> ALIGNMENT_SHIFT;
        pool = usedpools[size + size];
        if (pool != pool->nextpool) {
            /*
             * There is a used pool for this size class.
             * Pick up the head block of its free list.
             */
            ++pool->ref.count;
            bp = pool->freeblock; //指针指向下一个可用block起始位置
            assert(bp != NULL);
            
            // 代码[1]
            // 调整 pool->freeblock (假设A节点)指向链表下一个, 即bp首字节指向的下一个节点(假设B节点) 
            // 如果此时!= NULL, 表示 A节点可用, 直接返回
            if ((pool->freeblock = *(block **)bp) != NULL) {
                UNLOCK();
                if (use_calloc)
                    memset(bp, 0, nbytes);
                return (void *)bp;
            }
            ...
        }

其结果如图7所示:


图7 使用离散自由block链表分配一个block后的4KB内存

2.6 pool分配策略 - 情况3:pool用完了

如果pool中内存空间都用完了,则执行如下逻辑:

[Objects/obmalloc.c]

        /*
         * Most frequent paths first
         */
        size = (uint)(nbytes - 1) >> ALIGNMENT_SHIFT;
        pool = usedpools[size + size];
        if (pool != pool->nextpool) {
            /*
             * There is a used pool for this size class.
             * Pick up the head block of its free list.
             */
            ++pool->ref.count;
            bp = pool->freeblock; //指针指向下一个可用block起始位置
            assert(bp != NULL);
            
            ...

            // 代码[3]
            /* Pool is full, unlink from used pools. */
            // pool满了, 需要从下一个pool获取
            next = pool->nextpool;
            pool = pool->prevpool;
            next->prevpool = pool;
            pool->nextpool = next;
            UNLOCK();
            if (use_calloc)
                memset(bp, 0, nbytes);
            return (void *)bp;
        }

获取下一个pool(链表上每个poolblock size都是一致的)

好了,pool到此为止,下面进入arena

(全文完)

...

Tags Read More..


Python多进程编程之Pool

by LauCyun Aug 17,2016 10:51:12 23,821 views

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

构造方法

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes:工作进程数量。Pool默认大小是CPU的核数os.cpu_count(),我们也可以通过在Pool中传入processes参数即可自定义需要的核数量。
  • initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
  • context:用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

方法

  • apply(func[, args[, kwds]])是阻塞的。
  • apply_async(func[, args[, kwds[, callback]]])它是非阻塞。(理解与apply区别,看栗子1和栗子2结果区别)
  • close()关闭pool,使其不在接受新的任务。
  • terminate()关闭pool,结束工作进程,不在处理未完成的任务。
  • join()主进程阻塞,等待子进程的退出, join方法要在closeterminate之后使用。
  • map(funciterable[, chunksize])
  • map_async(funciterable[, chunksize[, callback[, error_callback]]])
  • imap(funciterable[, chunksize])
  • imap_unordered(funciterable[, chunksize])
  • starmap(funciterable[, chunksize])
  • starmap_async(funciterable[, chunksize[, callback[, error_back]]])

栗子1:使用进程池(非阻塞)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = ("hello %d" % (i))
        pool.apply_async(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

一次执行结果为:

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
msg: hello 3
end
Sub-process(es) done.

执行说明:创建一个进程池pool,并设定进程的数量为3,range(4)会相继产生四个对象[0, 1, 2, 3],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。

栗子2:使用进程池(阻塞)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        pool.apply(func, (msg,))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

一次执行结果为:

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

执行说明:for循环内执行的步骤顺序,往进程池中添加一个子进程,执行子进程,等待执行完毕再添加一个子进程…..等4个子进程都执行完了,再执行"Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"和"Sub-process(es) done."。

栗子3:使用进程池(非阻塞),获取子进程的返回值

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    result = []
    for i in range(4):
        msg = "hello %d" % i
        result.append(pool.apply_async(func, (msg,)))
    pool.close()  #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    pool.join()
    for res in result:
        print("return: ", res.get())
    print("Sub-process(es) done.")

一次执行结果为:

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
msg: hello 3
end
return:  donehello 0
return:  donehello 1
return:  donehello 2
return:  donehello 3
Sub-process(es) done.

栗子4:使用多个进程池

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import os, time, random


def fun1():
    print("Run task 1, pid is %s" % (os.getpid()))  # os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10)  # random.random()随机生成0-1之间的小数
    end = time.time()
    print('Task 1, runs %0.2f seconds.' % (end - start))


def fun2():
    print("Run task 2, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print('Task 2 runs %0.2f seconds.' % (end - start))


def fun3():
    print("Run task 3, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task 3 runs %0.2f seconds.' % (end - start))


def fun4():
    print("Run task 4, pid is %s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task 4 runs %0.2f seconds.' % (end - start))


if __name__ == '__main__':
    function_list = [fun1, fun2, fun3, fun4]
    print("parent process %s" % (os.getpid()))

    pool = multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)  # Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()  # 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print('All subprocesses done.')

一次执行结果为:

parent process 12496
Waiting for all subprocesses done...
Run task 1, pid is 6836
Run task 2, pid is 6596
Run task 3, pid is 14304
Run task 4, pid is 10900
Task 3 runs 0.81 seconds.
Task 1, runs 1.83 seconds.
Task 2 runs 8.97 seconds.
Task 4 runs 15.82 seconds.
All subprocesses done.

好了,到此为止Pool的使用讲完了。

...

Tags Read More..