1、Python 多进程
参考文档:Python 异步编程 多进程
2、使用multiprocessing.Manager的Namespace()实现
可以使用 `multiprocessing.Manager` 为所有进程提供一个共享的 `DataFrame` 实例。有几种不同的方法可以达到相同的效果,但最简单的方法可能是将 `DataFrame` 放入 `multiprocessing.Manager` 实例的 `Namespace` 中。注意:每次读取 `ns.df` 得到的都是经序列化传输过来的副本,在子进程中对它做原地修改不会同步回去,必须重新赋值(`ns.df = ...`)才会生效。
from multiprocessing import Manager, Process
import pandas as pd
import time


def worker(ns, delay=1):
    """Print the DataFrame shared through *ns*, wait, then print "end".

    Args:
        ns: a ``Manager().Namespace()`` proxy (or any object) exposing a
            ``df`` attribute. Reading ``ns.df`` through a manager proxy yields
            a copy, so in-place edits made here are not seen by other
            processes; reassign ``ns.df`` to publish changes.
        delay: seconds to sleep between the two prints (default 1, matching
            the original example).
    """
    print(ns.df)
    time.sleep(delay)
    print("end")


if __name__ == "__main__":
    # The main guard is required: with the "spawn" start method (default on
    # Windows and macOS) the child process re-imports this module, and
    # starting a Manager or Process at import time would then fail.
    df = pd.DataFrame([[10, 6, 7, 8], [1, 9, 12, 14], [5, 8, 10, 6]],
                      columns=['a', 'b', 'c', 'd'])
    # The with-block shuts down the manager's server process on exit.
    with Manager() as mgr:
        ns = mgr.Namespace()
        ns.df = df
        # Run the worker in another process.
        p = Process(target=worker, args=(ns,))
        p.start()
        p.join()
3、使用BaseManager和SyncManager实现
使用 Python 的 `BaseManager` 和 `SyncManager` 类搭建客户端/服务器结构:首先启动一个服务器,通过代理类对外提供数据。代码如下:
1)DataServer.py
#!/usr/bin/python
from multiprocessing.managers import SyncManager
import numpy
import pandas as pd

# Global store for the data to be served, keyed by the lookup key clients pass.
gData = {}


class DataProxy(object):
    """Class registered with the manager and exposed to client processes.

    Instances live in the manager's server process; clients hold a proxy, and
    each method call sends its return value back to the client by pickling.
    So return only the piece of the global data that was requested, never
    the whole store.
    """

    def getData(self, key, default=None):
        """Return ``gData[key]``, or *default* when *key* is not present."""
        # Honour the caller-supplied default instead of always returning None.
        return gData.get(key, default)


if __name__ == '__main__':
    port = 5000
    gData[1] = pd.DataFrame([[10, 6, 7, 8], [1, 9, 12, 14], [5, 8, 10, 6]],
                            columns=['a', 'b', 'c', 'd'])

    # Start the server on address (host, port).
    print('Serving data. Press <ctrl>-c to stop.')

    class myManager(SyncManager):
        pass

    myManager.register('DataProxy', DataProxy)
    # NOTE(review): host '' listens on all interfaces, and manager traffic is
    # pickle-based; bind to 'localhost' unless remote clients are intended,
    # and avoid a hard-coded authkey outside of examples.
    mgr = myManager(address=('', port), authkey='DataProxy01'.encode())
    server = mgr.get_server()
    server.serve_forever()
2)DataClient.py
import os
from multiprocessing.managers import BaseManager

import psutil  # used to inspect running processes


class DataClient(object):
    """Connect to the DataServer manager and expose its shared ``DataProxy``.

    Every method defined on the server-side ``DataProxy`` class is callable
    through ``self.proxy``.
    """

    def __init__(self, port, host='localhost'):
        """Connect to the manager listening on (*host*, *port*).

        Args:
            port: TCP port the DataServer listens on.
            host: server host name (default ``'localhost'``).
        """
        # Optional sanity check; uncomment to fail fast when no server runs.
        # assert DataClient._checkForProcess('DataServer.py'), 'Must have DataServer running'

        class myManager(BaseManager):
            pass

        myManager.register('DataProxy')
        self.mgr = myManager(address=(host, port), authkey='DataProxy01'.encode())
        self.mgr.connect()
        self.proxy = self.mgr.DataProxy()

    @staticmethod
    def _checkForProcess(name):
        """Return True if a running process matches *name* (optional check).

        A script launched as ``python DataServer.py`` has the process name
        ``python``, so besides the executable name this also matches any
        command-line argument whose basename equals *name*.
        """
        # Requesting attrs up front makes psutil skip vanished processes and
        # fill inaccessible fields with None instead of raising.
        for proc in psutil.process_iter(['name', 'cmdline']):
            info = proc.info
            if info['name'] == name:
                return True
            args = info['cmdline'] or []
            if any(os.path.basename(arg) == name for arg in args):
                return True
        return False
3)使用示例
先运行 DataServer.py 启动服务器,然后运行下面的代码(需将 DataClient.py 放在同一目录下以便导入):
#!/usr/bin/python
import time
import multiprocessing as mp
import numpy
from DataClient import DataClient

# These globals are per worker process: each pool worker fills them in
# init(); the proxy is not shared between processes.
gProxy = None
gMode = None
gDummy = None


def init(port, mode):
    """Pool initializer: connect to the server and record this worker's mode.

    Note that the connection is made in both modes, so the DataServer must be
    running even for the local-data run.

    Args:
        port: port of the running DataServer.
        mode: 0 = fetch data through the proxy, 1 = use local dummy data.
    """
    global gProxy, gMode, gDummy
    gProxy = DataClient(port).proxy
    gMode = mode
    gDummy = numpy.random.rand(1000)
    print('Init proxy ', id(gProxy), 'in ', mp.current_process())


def worker(key):
    """Fetch *key* via the proxy (mode 0) or touch local data (mode 1).

    Raises:
        ValueError: if the worker was initialised with an unknown mode.
    """
    if gMode == 0:
        # Fetch from the server through the proxy.
        array = gProxy.getData(key)
        print(array)
    elif gMode == 1:
        # Baseline for comparison: local data, no inter-process traffic.
        array = gDummy
    else:
        # Explicit raise: an assert would be stripped under ``python -O``.
        raise ValueError('unknown mode: %s' % gMode)


if __name__ == '__main__':
    port = 5000
    for mode in [1, 0]:
        for nprocs in [16, 1]:
            if mode == 0:
                print('使用 client/server %d processes' % nprocs)
            if mode == 1:
                print('使用 local data %d processes' % nprocs)
            # The with-block guarantees the worker processes are cleaned up.
            with mp.Pool(nprocs, initializer=init, initargs=(port, mode)) as pool:
                start = time.perf_counter()
                pool.map(worker, [1], chunksize=1)
                print('took %4.3f seconds' % (time.perf_counter() - start))