1、同步和异步
通常来说,程序都是顺序执行,同一时刻只会发生一件事。如果一个函数方法依赖于另一个函数方法的结果,它只能等待那个函数方法结束才能继续执行,从用户的角度来说,整个程序才算运行完毕。同步是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行异步是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。多线程和多进程都是通过异步的方式处理事物。
2、Python 多进程
进程是资源的集合,是最小的资源单位。是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。。多进程适合执行计算密集型任务(如:视频编码解码、数据处理、科学计算等)、可以分解为多个并行子任务并能合并子任务执行结果的任务,以及在内存使用方面没有任何限制且不强依赖于I/O操作的任务。
Python的多线程只能运行在单核上,各个线程以并发的方法异步运行。而多进程可以利用CPU的多核,进程数取决于计算机CPU的处理器个数,由于运行在不同的核上,各个进程的运行是并行的。当进程数量大于CPU的内核数量时,等待运行的进程会等到其他进程运行完让出内核为止。如果CPU单核,就无法运行多进程并行。可以使用multiprocessing
库查看CPU核数。
代码如下,
from multiprocessing import cpu_count print(cpu_count())
3、Python 多进程的使用
Python中的多线程其实并不是真正的多线程,如要充分地使用多核CPU的资源,在Python中大部分情况需要使用多进程。Python提供了好用的多进程包multiprocessing
,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing
支持子进程、通信和共享数据、执行不同形式的同步,提供了Process
、Queue
、Pipe
、Lock
等组件。
1)创建函数执行单个进程
import multiprocessing import time def worker(interval): n = 5 while n > 0: print("The time is {0}".format(time.ctime())) time.sleep(interval) n -= 1 if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.start() print("p.pid:", p.pid) print("p.name:", p.name) print("p.is_alive:", p.is_alive())
2)创建函数执行多个进程
import multiprocessing import time def worker_1(interval): print("worker_1") time.sleep(interval) print("end worker_1") def worker_2(interval): print("worker_2") time.sleep(interval) print("end worker_2") def worker_3(interval): print("worker_3") time.sleep(interval) print("end worker_3") if __name__ == "__main__": p1 = multiprocessing.Process(target = worker_1, args = (2,)) p2 = multiprocessing.Process(target = worker_2, args = (3,)) p3 = multiprocessing.Process(target = worker_3, args = (4,)) p1.start() p2.start() p3.start() print("The number of CPU is:" + str(multiprocessing.cpu_count())) for p in multiprocessing.active_children(): print("child p.name:" + p.name + "\tp.id" + str(p.pid))
3)通过自定义进程类
import multiprocessing import time class ClockProcess(multiprocessing.Process): def __init__(self, interval): multiprocessing.Process.__init__(self) self.interval = interval def run(self): n = 5 while n > 0: print("the time is {0}".format(time.ctime())) time.sleep(self.interval) n -= 1 if __name__ == '__main__': p = ClockProcess(3) p.start()
注意:调用进程的start()
方法时,会自动调用run()
方法。
4、Python 进程锁
多个进程需要访问共享资源,也需要通过锁机制来解决数据一致性等问题。
import multiprocessing import sys def worker_with(lock, f): with lock: fs = open(f, 'a+') n = 10 while n > 1: fs.write("Lock 通过 with\n") n -= 1 fs.close() def worker_no_with(lock, f): lock.acquire() try: fs = open(f, 'a+') n = 10 while n > 1: fs.write("Lock 直接 操作\n") n -= 1 fs.close() finally: lock.release() if __name__ == "__main__": lock = multiprocessing.Lock() f = "file.txt" w = multiprocessing.Process(target = worker_with, args=(lock, f)) nw = multiprocessing.Process(target = worker_no_with, args=(lock, f)) w.start() nw.start()
5、Semaphore信号量
Semaphore是用来控制对共享资源的访问量,可以控制同一时刻进程的并发数量。
import multiprocessing import time def worker(s, i): s.acquire() print(multiprocessing.current_process().name + "acquire"); time.sleep(i) print(multiprocessing.current_process().name + "release\n"); s.release() if __name__ == "__main__": s = multiprocessing.Semaphore(2) for i in range(5): p = multiprocessing.Process(target = worker, args=(s, i*2)) p.start()
6、Event事件
Event事件用于主线程控制其他线程的执行,以实现进程间同步通信。
方法 | 描述 |
Event().wait() | 插入在进程中插入一个标记(flag) 默认为 false, 然后flag为false时 程序会停止运行 进入阻塞状态 |
Event().set() | 使flag为Ture 然后程序会停止运行 进入运行状态 |
Event().clear() | 使flag为false 然后程序会停止运行 进入阻塞状态 |
Event().is_set() | 判断flag 是否为True 是的话 返回True 不是 返回false |
代码如下,
import multiprocessing import time def wait_for_event(e): print("wait_for_event: starting") e.wait() print("wairt_for_event: e.is_set()->" + str(e.is_set())) def wait_for_event_timeout(e, t): print("wait_for_event_timeout:starting") e.wait(t) print("wait_for_event_timeout:e.is_set->" + str(e.is_set())) if __name__ == "__main__": e = multiprocessing.Event() w1 = multiprocessing.Process(name = "block", target = wait_for_event, args = (e,)) w2 = multiprocessing.Process(name = "non-block", target = wait_for_event_timeout, args = (e, 2)) w1.start() w2.start() time.sleep(3) e.set() print("main: event is set")
7、Queue安全队列
Queue
是多进程安全的队列,可以使用Queue
实现多进程之间的数据传递。put
方法用以插入数据到队列中,put()方法还有两个可选参数:blocked
和timeout
。如果blocked
为True
(默认值),并且timeout
为正值,该方法会阻塞timeout
指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full
异常。如果blocked
为False
,但该Queue
已满,会立即抛出Queue.Full
异常。
get()
方法可以从队列读取并且删除一个元素。同样,get()
方法有两个可选参数:blocked
和timeout
。如果blocked
为True
(默认值),并且timeout
为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty
异常。如果blocked
为False
,有两种情况存在,如果Queue
有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty
异常。
代码如下,
import multiprocessing def writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): try: print(q.get(block = False)) except: pass if __name__ == "__main__": q = multiprocessing.Queue() writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start() reader.join() writer.join()
8、Pipe管道
Pipe
方法返回(p1, p2)
代表一个管道的两个端。Pipe
方法有duplex
参数,如果duplex
参数为True(默认值),则管道是全双工模式,即p1
和p2
均可收发。duplex
为False
,p1
只能接受消息,p2只能发送消息。
send()
和recv()
方法分别为发送和接受消息的方法。如在全双工模式下,可以调用p1.send()
发送消息,p1.recv()
接收消息。如没消息可接收,recv()
方法会一直阻塞。如果管道已经被关闭,则recv()
方法会抛出EOFError
。
import multiprocessing import os def consumer(pipe): conn1, conn2 = pipe print('consumer conn1 id',id(conn1)) print('consumer conn2 id',id(conn2)) print("process consumer id", os.getpid()) conn2.close() # input pipe close 1 while True: try: item = conn1.recv() except EOFError: print('EOFError') break print(item) print('consumer done') def producer(listArr, conn2): for item in listArr: conn2.send(item) if __name__ == '__main__': (conn1, conn2) = multiprocessing.Pipe() cons_p = multiprocessing.Process(target=consumer, args=((conn1,conn2),)) cons_p.start() conn1.close() # output pipe clsoe 1 arr = [1,2,3,4,5] producer(arr, conn2) print(' main conn1 id',id(conn1)) print(' main conn2 id',id(conn2)) conn2.close() # input pipe close 2
Pool
类可提供指定数量的进程供用户调用,当新的请求提交到Pool
中时,如果池还没有满,就会创建一个新的进程来执行请求。 如果池满,请求就会先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。对于性能优化和数量限制很有用。
1)使用非阻塞进程池
方法 | 描述 |
apply_async(func[, args[, kwds[, callback]]]) | 此方法是是非阻塞。 |
apply(func[, args[, kwds]]) | 此方法是阻塞的。 |
close() | 关闭pool,使其不在接受新的任务。 |
terminate() | 结束工作进程,不在处理未完成的任务。 |
join() | 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。 |
代码如下,
#coding: utf-8 import multiprocessing import time def func(msg): print("msg:", msg) time.sleep(3) print("end") if __name__ == "__main__": pool = multiprocessing.Pool(processes = 3) for i in range(4): msg = "hello %d" %(i) pool.apply_async(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 print("开始执行") pool.close() pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 print("子进程执行完成")
2)使用阻塞进程池
#coding: utf-8 import multiprocessing import time def func(msg): print("msg:", msg) time.sleep(3) print("end") if __name__ == "__main__": pool = multiprocessing.Pool(processes = 3) for i in range(4): msg = "hello %d" %(i) pool.apply(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 print("开始执行") pool.close() pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 print("子进程执行完成")