1、Python 多进程
参考文档:Python 异步编程 多进程
2、使用multiprocessing.Manager的Namespace()实现
可以使用 `multiprocessing.Manager` 为所有进程提供同一个(单例)DataFrame 实例。实现同样效果的方法有好几种,其中最简单的可能是把 DataFrame 放入 `multiprocessing.Manager` 实例的 `Namespace` 中。
from multiprocessing import Manager,Process
import pandas as pd
import time
# Sample frame that is handed to the child process through the manager.
df = pd.DataFrame(
    [[10, 6, 7, 8],
     [1, 9, 12, 14],
     [5, 8, 10, 6]],
    columns=['a', 'b', 'c', 'd'],
)


def worker(ns):
    """Print the DataFrame stored on the shared namespace, then pause.

    Args:
        ns: Object exposing the shared frame as the attribute ``df``
            (in this script, a ``Manager().Namespace()`` proxy).
    """
    print(ns.df)
    time.sleep(1)
    print("end")


# The guard is required: under the "spawn" start method the child process
# re-imports this module, and without the guard it would try to start a new
# manager and a new child process during that import.
if __name__ == '__main__':
    # The context manager shuts the manager process down when we are done.
    with Manager() as mgr:
        ns = mgr.Namespace()
        ns.df = df

        # Run the worker in a separate process.
        p = Process(target=worker, args=(ns,))
        p.start()
        p.join()
3、使用BaseManager和SyncManager实现
使用 Python 的 `BaseManager` 和 `SyncManager` 类搭建客户端/服务器结构。首先启动一个服务器,通过代理类对外提供数据。代码如下:
1)DataServer.py
#!/usr/bin/python
from multiprocessing.managers import SyncManager
import numpy
import pandas as pd
# Module-level store for the data being served, looked up by key.
gData = {}


# Proxy class shared between the different processes.
# Do not put large data on the instance itself, because that would force it
# to be piped to the other process when it is instantiated there; instead,
# return only the requested portion of the global data on demand.
class DataProxy(object):
    """Stateless accessor over the module-level ``gData`` store."""

    def __init__(self):
        pass

    def getData(self, key, default=None):
        """Return the entry stored under ``key`` in ``gData``.

        Args:
            key: Lookup key into ``gData``.
            default: Value returned when ``key`` is not present.

        Returns:
            ``gData[key]`` if present, otherwise ``default``.
        """
        # Honour the caller's default instead of always falling back to None.
        return gData.get(key, default)
if __name__ == '__main__':
    port = 5000

    # Publish the sample frame under key 1 so clients can request it.
    gData[1] = pd.DataFrame(
        [[10, 6, 7, 8],
         [1, 9, 12, 14],
         [5, 8, 10, 6]],
        columns=['a', 'b', 'c', 'd'],
    )

    # Start the server on address (host, port).
    print('Serving data. Press <ctrl>-c to stop.')

    class myManager(SyncManager):
        """Manager subclass that carries the DataProxy registration."""

    myManager.register('DataProxy', DataProxy)

    # An empty host string is passed through unchanged as the bind address.
    mgr = myManager(address=('', port), authkey=b'DataProxy01')
    mgr.get_server().serve_forever()
2)DataClient.py
from multiprocessing.managers import BaseManager
import psutil #用于获取进程信息
# Obtains the shared proxy class. Every method of that class is available
# here through ``self.proxy``.
class DataClient(object):
    """Connects to the data server and exposes its ``DataProxy`` object.

    Attributes are set in ``__init__``:
        mgr: The connected manager instance.
        proxy: Proxy for a ``DataProxy`` object created through the manager.
    """

    def __init__(self, port):
        """Connect to the manager on ``localhost:port`` and fetch the proxy.

        Args:
            port: TCP port of the data server.
        """
        # Optional pre-check that the server is up:
        #   DataClient._checkForProcess('DataServer.py')
        class myManager(BaseManager):
            pass

        # Register by name only; the implementation is supplied by the server.
        myManager.register('DataProxy')
        self.mgr = myManager(address=('localhost', port), authkey='DataProxy01'.encode())
        self.mgr.connect()
        self.proxy = self.mgr.DataProxy()

    # Verifies that the server is running (optional).
    @staticmethod
    def _checkForProcess(name):
        """Return True if a running process matches ``name``.

        A process matches when its process name equals ``name`` or when the
        base name of any of its command-line arguments equals ``name``. The
        second test covers a script launched as ``python DataServer.py``,
        where the process name is the interpreter rather than the script.

        Args:
            name: Process or script file name to look for.

        Returns:
            True if a matching process is found, otherwise False.
        """
        import os.path

        for proc in psutil.process_iter():
            try:
                if proc.name() == name:
                    return True
                if any(os.path.basename(arg) == name for arg in proc.cmdline()):
                    return True
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process exited or is not inspectable; skip it.
                continue
        return False
3)使用示例
先运行 DataServer.py,然后将下面的代码保存为脚本并运行:
#!/usr/bin/python
import time
import multiprocessing as mp
import numpy
from DataClient import *
# The "proxy" is global within each child process;
# it is not shared across all processes.
gProxy = gMode = gDummy = None
def init(port, mode):
    """Set up the per-process globals; passed to ``mp.Pool`` as initializer.

    Args:
        port: Port handed to ``DataClient`` to obtain the proxy.
        mode: Value stored in ``gMode`` and later read by ``worker``.
    """
    global gProxy, gMode, gDummy
    client = DataClient(port)
    gProxy, gMode = client.proxy, mode
    # Process-local array of 1000 random values.
    gDummy = numpy.random.rand(1000)
    print('Init proxy ', id(gProxy), 'in ', mp.current_process())
def worker(key):
    """Run one unit of work according to the per-process ``gMode``.

    Mode 0 fetches the entry for ``key`` through ``gProxy`` and prints it.
    Mode 1 is the comparison baseline and does not touch the proxy.

    Args:
        key: Key passed to ``gProxy.getData`` in mode 0.

    Raises:
        ValueError: If ``gMode`` is neither 0 nor 1.
    """
    if gMode == 0:
        # Fetch from the proxy.
        print(gProxy.getData(key))
        return
    if gMode == 1:
        # Baseline for comparison: no proxy access.
        return
    # An explicit raise still fires under ``python -O``, unlike ``assert``.
    raise ValueError('unknown mode: %s' % gMode)
if __name__ == '__main__':
    port = 5000

    # Time the local-data mode (1) and the client/server mode (0),
    # each with 16 worker processes and with 1.
    for mode in [1, 0]:
        for nprocs in [16, 1]:
            if mode == 0:
                print('使用 client/server %d processes' % nprocs)
            if mode == 1:
                print('使用 local data %d processes' % nprocs)

            pool = mp.Pool(nprocs, initializer=init, initargs=(port, mode))
            try:
                start = time.time()
                ret_data = pool.map(worker, [1], chunksize=1)
                print('took %4.3f seconds' % (time.time() - start))
            finally:
                # Close and join even if map() raises, so worker processes
                # from one round do not linger into the next.
                pool.close()
                pool.join()