1、多线程和多进程
参考文档:
2、ProcessPoolExecutor 和 Process
ProcessPoolExecutor
是 concurrent.futures
模块中的一部分,提供了高级接口,用于创建和管理进程池,自动处理任务分发和结果收集。Process
是 Python 的 multiprocessing
模块中的一部分,它提供了更低级的接口,允许手动创建和管理单个进程,以及控制进程之间的通信和同步。ProcessPoolExecutor
更适合于 CPU 密集型任务,因为它能够在多个 CPU 核心上并行执行任务。Process
更适合于 I/O 密集型任务或需要更复杂进程间通信的情况。
3、多进程中使用多线程
1)Process 和 ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from multiprocessing import Process from queue import Queue from time import sleep class Handler: def __init__(self): self.queue = Queue() # this object is essential def put(self, item): self.queue.put(item) def run(self): while True: item = self.queue.get() if item == 'exit': break # do other things on the item ... print(item) sleep(1) class Runner: def __init__(self, name): self.name = name self.a = Handler() self.b = Handler() def start(self): for _ in range(3): self.a.put(f'{self.name}: hello a') self.b.put(f'{self.name}: hello b') # 请求正常关闭 self.a.put('exit') self.b.put('exit') with ThreadPoolExecutor() as exe: futures = [exe.submit(r.run) for r in [self.a, self.b]] for f in futures: f.result() # 要求一切都可以被 Pickle 序列化 def run_in_process_pool(): rA = Runner('A') rB = Runner('B') rC = Runner('C') with ProcessPoolExecutor() as exe: futures = [exe.submit(r.start) for r in [rA, rB, rC]] for future in futures: future.result() # 不会对任何内容进行 Pickle 序列化 def run_in_processes(): rA = Runner('A') rB = Runner('B') rC = Runner('C') procs = [Process(target=r.start) for r in [rA, rB, rC]] for p in procs: p.start() for p in procs: p.join() if __name__ == '__main__': # run_in_process_pool() # `TypeError: cannot pickle '_thread.lock' object` run_in_processes() # 正常运行
2)ThreadPoolExecutor 和 ProcessPoolExecutor
import concurrent.futures # 定义一个简单的函数,用于演示任务执行 def worker(task_id): print(f"Task {task_id} started") # 在这里执行任务的逻辑 print(f"Task {task_id} completed") def main(): # 创建一个 ThreadPoolExecutor,指定线程数量 with concurrent.futures.ThreadPoolExecutor(max_workers=2) as thread_executor: # 创建一个 ProcessPoolExecutor,指定进程数量 with concurrent.futures.ProcessPoolExecutor(max_workers=2) as process_executor: # 提交多个任务给线程池执行 thread_task_ids = [1, 2] thread_futures = [thread_executor.submit(worker, task_id) for task_id in thread_task_ids] # 提交多个任务给进程池执行 process_task_ids = [3, 4] process_futures = [process_executor.submit(worker, task_id) for task_id in process_task_ids] # 等待所有任务完成 concurrent.futures.wait(thread_futures + process_futures) if __name__ == "__main__": main()