进程是资源的集合,是最小的资源单位。是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。多进程适合执行计算密集型任务(如:视频编码解码、数据处理、科学计算等)、可以分解为多个并行子任务并能合并子任务执行结果的任务,以及在内存使用方面没有任何限制且不强依赖于I/O操作的任务。本文主要介绍Python 中 multiprocessing 多进程间通信传递值方法,以及相关的示例代码。

1、Python 多进程

参考文档:Python 异步编程 多进程

2、使用multiprocessing.Manager的Namespace()实现

可以使用multiprocessing.Manager为所有进程提供单例DataFrame实例。有几种不同的方法可以达到相同的效果,但可能最简单的方法是将DataFrame放入multiprocessing.Manager实例的Namespace中。

from multiprocessing import Manager,Process
import pandas as pd
import time
df = pd.DataFrame([[10,6,7,8],
[1,9,12,14],
[5,8,10,6]],
columns = ['a','b','c','d'])
mgr = Manager()
ns = mgr.Namespace()
ns.df = df
def worker(ns):
print(ns.df)
time.sleep(1)
print("end")
#另一个进程
p = Process(target=worker, args=(ns,))
p.start()
p.join()

3、使用BaseManager和SyncManager实现

使用Python的BaseManagerSyncManager类使用客户端/服务器设置。首先设置一个服务器,为数据提供代理类。代码如下,

1)DataServer.py

#!/usr/bin/python
from    multiprocessing.managers import SyncManager
import  numpy
import pandas as pd

# Global for storing the data to be served
gData = {}

#不同进程共享的代理类
#不要把大数据放在这里,因为那会迫使它被管道传输到
# other进程在那里实例化时,而是只返回一部分当请求时,
#全局数据。
class DataProxy(object):
    def __init__(self):
        pass

    def getData(self, key, default=None):
        global gData
        return gData.get(key, None)

if __name__ == '__main__':
    port  = 5000

    gData[1] = pd.DataFrame([[10,6,7,8],
[1,9,12,14],
[5,8,10,6]],
columns = ['a','b','c','d'])

    # Start the server on address(host,port)
    print('Serving data. Press <ctrl>-c to stop.')
    class myManager(SyncManager): pass
    myManager.register('DataProxy', DataProxy)
    mgr = myManager(address=('', port), authkey='DataProxy01'.encode())
    server = mgr.get_server()
    server.serve_forever()

2)DataClient.py

from   multiprocessing.managers import BaseManager
import psutil   #用于获取进程信息

# 获取共享代理类。该类中的所有方法都在这里可用
class DataClient(object):
    def __init__(self, port):
        #assert DataClient._checkForProcess('DataServer.py'), 'Must have DataServer running'
        class myManager(BaseManager): pass
        myManager.register('DataProxy')
        self.mgr = myManager(address=('localhost', port), authkey='DataProxy01'.encode())
        self.mgr.connect()
        self.proxy = self.mgr.DataProxy()

    # 验证服务器正在运行 (非必须的)
    @staticmethod
    def _checkForProcess(name):
        for proc in psutil.process_iter():
            print(proc.name())
            if proc.name() == name:
                
                return True
        return False

3)使用示例

先运行DataServer.py,然后运行保存的下面代码,如下,

#!/usr/bin/python
import time
import multiprocessing as mp
import numpy
from   DataClient import *    

# “代理”对每个子进程都是全局的,
# 不是在所有进程之间共享的
gProxy = None
gMode  = None
gDummy = None
def init(port, mode):
    global gProxy, gMode, gDummy
    gProxy  = DataClient(port).proxy
    gMode  = mode
    gDummy = numpy.random.rand(1000) 
    print('Init proxy ', id(gProxy), 'in ', mp.current_process())

def worker(key):
    global gProxy, gMode, gDummy
    if 0 == gMode:   # 从代理获取
        array = gProxy.getData(key)
        print(array)
    elif 1 == gMode: # 测试区别
        array = gDummy
    else: assert 0, 'unknown mode: %s' % gMode 

if __name__ == '__main__':
    port   = 5000
    maxkey = 1000
    numpts = 100
                   
    for mode in [1, 0]:
        for nprocs in [16, 1]:
            if 0==mode: print('使用 client/server %d processes' % nprocs)
            if 1==mode: print('使用 local data %d processes' % nprocs)
            pool = mp.Pool(nprocs, initializer=init, initargs=(port,mode))
            start = time.time()
            ret_data = pool.map(worker,[1],chunksize=1)
            print('took %4.3f seconds' % (time.time()-start))
            pool.close()

推荐文档