1、多线程和多进程
参考文档:
2、ProcessPoolExecutor 和 Process
ProcessPoolExecutor 是 concurrent.futures 模块的一部分,提供高级接口,用于创建和管理进程池,并自动处理任务分发和结果收集。
Process 是 Python 的 multiprocessing 模块的一部分,提供更底层的接口,允许手动创建和管理单个进程,以及控制进程之间的通信和同步。
两者都基于多进程,都能在多个 CPU 核心上并行执行任务:ProcessPoolExecutor 更适合把大量相互独立的 CPU 密集型任务分发到进程池;Process 更适合需要手动控制进程生命周期或需要更复杂进程间通信的情况。(纯 I/O 密集型任务通常使用线程即可,例如 ThreadPoolExecutor。)
3、多进程中使用多线程
1)Process 和 ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Process
from queue import Queue
from time import sleep
class Handler:
    """Queue-backed consumer that prints queued items until told to stop.

    Items are enqueued with put() and consumed by run(), which blocks on the
    queue and returns once it dequeues the string 'exit'.
    """

    def __init__(self):
        # The queue is the essential piece of state of this object.
        # NOTE(review): queue.Queue holds thread locks, which is presumably
        # what makes instances of this class unpicklable - confirm.
        self.queue = Queue()

    def put(self, item):
        """Append *item* to this handler's queue."""
        self.queue.put(item)

    def run(self):
        """Print each queued item, pausing one second after each.

        Blocks while the queue is empty and returns as soon as an item equal
        to 'exit' is dequeued; that item is not printed.
        """
        # iter(callable, sentinel) keeps calling get() until it yields 'exit'.
        for item in iter(self.queue.get, 'exit'):
            # do other things on the item ...
            print(item)
            sleep(1)
class Runner:
    """Owns two Handler instances and drives both of them in threads."""

    def __init__(self, name):
        # The name is only used as a prefix for the queued messages.
        self.name = name
        self.a = Handler()
        self.b = Handler()

    def start(self):
        """Queue three greetings per handler, then run both handlers.

        Blocks until both handlers have consumed their 'exit' marker; any
        exception raised inside a handler thread is re-raised here.
        """
        for _ in range(3):
            self.a.put(f'{self.name}: hello a')
            self.b.put(f'{self.name}: hello b')

        handlers = (self.a, self.b)

        # Request a graceful shutdown: 'exit' is queued behind the greetings,
        # so each handler drains its messages before stopping.
        for handler in handlers:
            handler.put('exit')

        with ThreadPoolExecutor() as pool:
            pending = [pool.submit(handler.run) for handler in handlers]
            for fut in pending:
                fut.result()
# Requires everything involved to be picklable.
def run_in_process_pool():
    """Start three Runners through a ProcessPoolExecutor and wait for each.

    Each submitted callable is the bound method ``Runner.start``.
    NOTE(review): submitting a bound method presumably pickles the Runner
    together with its Handler queues, which is where this variant is
    expected to fail - confirm.
    """
    runners = [Runner(label) for label in ('A', 'B', 'C')]
    with ProcessPoolExecutor() as pool:
        pending = [pool.submit(runner.start) for runner in runners]
        for fut in pending:
            # result() blocks until the task is done and re-raises its error.
            fut.result()
# Does not pickle anything.
# NOTE(review): this presumably holds only for the "fork" start method; with
# "spawn" the Process target is pickled as well - confirm on the target OS.
def run_in_processes():
    """Run three Runners, each in its own multiprocessing.Process.

    All processes are started first and joined afterwards, so the three
    runners execute concurrently.
    """
    runners = [Runner(label) for label in ('A', 'B', 'C')]
    workers = [Process(target=runner.start) for runner in runners]

    for proc in workers:
        proc.start()

    for proc in workers:
        proc.join()
if __name__ == '__main__':
    # Kept for reference, this variant fails with:
    # run_in_process_pool()  # `TypeError: cannot pickle '_thread.lock' object`
    run_in_processes()  # runs fine
2)ThreadPoolExecutor 和 ProcessPoolExecutor
import concurrent.futures
# A simple function used to demonstrate task execution.
def worker(task_id):
    """Print a start message and a completion message for *task_id*.

    Args:
        task_id: Identifier interpolated into both printed messages.
    """
    print(f"Task {task_id} started")
    # The task's actual logic would run here.
    print(f"Task {task_id} completed")
def main():
    """Run worker tasks on a thread pool and a process pool side by side.

    Tasks 1 and 2 go to a two-thread pool, tasks 3 and 4 to a two-process
    pool. The function returns once all four tasks have finished.

    Raises:
        Exception: Whatever a task raised, re-raised by ``Future.result()``.
    """
    # Create a ThreadPoolExecutor with a fixed number of threads.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as thread_executor:
        # Create a ProcessPoolExecutor with a fixed number of processes.
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as process_executor:
            # Submit several tasks to the thread pool.
            thread_task_ids = [1, 2]
            thread_futures = [thread_executor.submit(worker, task_id) for task_id in thread_task_ids]
            # Submit several tasks to the process pool.
            process_task_ids = [3, 4]
            process_futures = [process_executor.submit(worker, task_id) for task_id in process_task_ids]
            # Wait for every task to finish.
            all_futures = thread_futures + process_futures
            concurrent.futures.wait(all_futures)
            # wait() only blocks; it never re-raises. Asking each future for
            # its result surfaces a failed task instead of silently dropping it.
            for future in all_futures:
                future.result()
# Entry-point guard: only run the demo when executed as a script.
# NOTE(review): with the "spawn" start method child processes presumably
# re-import this module, so the guard also prevents re-running main() there.
if __name__ == "__main__":
    main()