Python 异步编程 协程(async/await)

Python 中,异步编程可以通过协程(async/await)来实现。协程可以编写非阻塞的代码,使得可以在 I/O 操作(如网络请求、文件读写等)时并发处理其他任务,而不是等待操作完成。本文主要介绍Python 中协程(async/await)的使用,以及相关的示例代码。

1、基本概念

Python 异步编程协程(async/await)是以进程、线程、协程、函数/方法作为执行任务程序的基本单位。在 Python 中,3.5 版本引入了关于协程的语法糖 async 和 await,在 3.7 版本可以通过 asyncio.run() 运行一个协程,建议学习协程时使用 3.7+版本。 async def 用于定义一个异步函数(协程),返回一个协程对象。await 用于等待一个协程完成,它只能在另一个异步函数内部使用。

2、Python 协程(async/await)

协程,英文叫做 Coroutine,又称微线程,纤程,协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来时,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。

协程的本质是个单进程,协程相对于多进程来说,无需线程上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也很简单。

可以使用协程来实现异步操作,如在网络爬虫场景下,我们发出一个请求之后,需要等待一定的时间才能得到响应,但其实在这个等待过程中,程序可以干许多其他的事情,等到响应得到之后才切换回来继续处理,这样可以充分利用 CPU 和其他资源,这也是异步协程的优势。一个线程内的多个协程是串行执行的,不能利用多核,协程不适合计算密集型的场景。协程适合I/O 阻塞型。

注意:对协程的执行顺序有要求,但执行代码非原子操作,且被 await 语句隔开。可以使用锁解决。锁的使用可以参考下面多线程文档中的介绍。

相关文档Python 异步编程 多线程

3、Python 协程(async/await)的使用

从 Python 3.4 开始,Python 中加入了协程的概念,但此版本的协程依然是以生成器对象为基础,在 Python 3.5 则增加了 async/await,使得协程的实现更加方便。

术语

描述

event_loop

事件循环,相当于一个无限循环,

可以把一些函数注册到这个事件循环上,

当满足条件发生时,

就会调用对应的处理方法。

coroutine

中文翻译叫协程,

在 Python 中常指代为协程对象类型,

可以将协程对象注册到时间循环中,

它会被事件循环调用。

可以使用 async 关键字来定义一个方法,

这个方法在调用时不会立即被执行,

而是返回一个协程对象。

task

任务,它是对协程对象的进一步封装,

包含了任务的各个状态。

future

代表将来执行或没有执行的任务的结果,

实际上和 task 没有本质区别。 

另外还需要了解 async/await 关键字,

它是从 Python 3.5 才出现的,专用于定义协程。

其中,async 定义一个协程,

await 用来挂起阻塞方法的执行。

1)定义协程

import asyncio
 
async def execute(x):
    print('x = ', x)
 
coroutine = execute(1)
print('协程:', coroutine)
print('After calling execute')
 
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')

2)使用loop.create_task()创建task任务

import asyncio
 
async def execute(x):
    print('Number:', x)
    return x
 
coroutine = execute(1)
print('协程:', coroutine)
print('After calling execute')
 
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

3)使用asyncio.ensure_future()创建task任务

import asyncio
 
async def execute(x):
    print('Number:', x)
    return x
 
coroutine = execute(1)
print('协程:', coroutine)
print('After calling execute')
 
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

4)绑定回调函数方法

可以为某个 task 绑定一个回调函数方法。

import asyncio
import requests
 
async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status
 
def callback(task):
    print('Status:', task.result())
 
coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)
 
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)

5)调用 result() 获取结果

如不用回调方法,直接在 task 运行完毕之后也可以直接调用 result() 方法获取结果。

import asyncio
import requests
 
async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status
 
coroutine = request()
task = asyncio.ensure_future(coroutine)
print('Task:', task)
 
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task Result:', task.result())

4、多任务协程

如需执行多次请求,则可以创建多个task的列表,执行asynciowait() 方法即可执行。

import asyncio
import requests
 
async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status
 
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)
 
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
 
for task in tasks:
    print('Task Result:', task.result())

1)并发运行多个异步任务

import asyncio

async def task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)  # 模拟异步操作
    print(f"Task {name} completed")

async def main():
    # 创建多个异步任务
    tasks = [
        task("A", 2),
        task("B", 1),
        task("C", 3)
    ]
    # 等待所有任务完成
    await asyncio.gather(*tasks)

asyncio.run(main())

2)异步 I/O 操作

import aiohttp
import asyncio

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = [
        'http://example.com',
        'http://python.org',
        'http://github.com'
    ]
    # 创建多个异步任务
    tasks = [fetch_url(url) for url in urls]
    # 等待所有任务完成
    responses = await asyncio.gather(*tasks)
    for url, response in zip(urls, responses):
        print(f"URL: {url}, Length: {len(response)}")

asyncio.run(main())

注意:

asyncio.sleep: 用于模拟异步操作。它允许事件循环在等待期间执行其他任务。

asyncio.gather: 用于并发运行多个异步任务,并等待它们全部完成。

aiohttp: 是一个第三方库,用于进行异步 HTTP 请求。在示例中,它用来异步获取网页内容。

推荐阅读
cjavapy编程之路首页