Python 异步编程

在现代 Python 开发中,随着应用对高并发、高性能的需求不断增加,传统同步编程方式在处理大量 I/O 操作时逐渐显得力不从心。异步编程通过极大提升程序的并发能力和资源利用率,成为了Web开发、微服务、实时通讯、数据抓取等领域的核心技术。

1. 问题场景

在开始学习 Python 的异步编程之前,先来看看传统同步编程在处理I/O 密集型任务时,存在哪些问题。I/O 密集型任务的特点是:程序在执行过程中,需要频繁等待输入/输出操作完成。例如:

  • 等待网络请求返回数据
  • 等待磁盘读写完成
  • 等待数据库查询结果

在这些等待期间,程序会阻塞CPU 处于空闲,无法继续做其他事情,从而大大降低了整体效率。下面通过一段代码,来模拟这个问题场景:

import time

def task1():
    time.sleep(5)  # 模拟一个耗时 5 秒的 I/O 操作
    return 10

def task2():
    time.sleep(3)  # 模拟一个耗时 3 秒的 I/O 操作
    return 20

def main():
    result = task1()
    print('任务1执行结果:', result)
    result = task2()
    print('任务2执行结果:', result)

if __name__ == '__main__':
    start = time.time()
    main()
    print('总耗时:', time.time() - start)
任务1执行结果: 10
任务2执行结果: 20
8.023201704025269

可以看到,task1() 执行时,程序进入 sleep(5),必须等待 5 秒才能继续。task2() 只有在 task1() 完成后才能开始,又等待 3 秒。整体耗时大约 8 秒

在同步编程中,一旦遇到 I/O 阻塞,整个程序会停下来等待。这导致:

  • CPU 资源被浪费
  • 整体执行效率低下
  • 单位时间内完成的任务量少

理想情况应该是:

当 task1 处于等待时,CPU 可以切换去执行其他任务(比如 task2),而不是傻等着。这样,程序可以同时推进多个任务,从而显著提升效率。这就是异步编程要解决的核心问题。

2. 解决方法

我们希望当程序遇到 I/O 等待时,能够挂起当前任务,并切换执行其他任务。并且在合适的时机,恢复挂起的任务,继续完成后续操作。

虽然这听起来很简单,但要正确地管理任务的挂起和恢复时机,需要一套专门的调度机制。幸运的是,Python 内置的 asyncio 模块,正好为我们提供了这样的能力 —— 事件循环(Event Loop)

它的工作流程大致是这样的:

  • 首先,创建一个事件循环
  • 然后,将需要执行的任务注册到事件循环中
  • 最后,启动事件循环,开始调度和执行各个任务

需要注意的是,为了让事件循环正确地识别和调度任务,我们需要遵循以下两个规则:

  • 任务函数定义时,需要使用 async def 定义,表示这是一个异步函数,是需要交给事件循环管理的
  • 在任务函数内部,当遇到需要等待其他异步操作的地方,需要使用 await。这就告诉事件循环:此处可以挂起当前任务,等异步操作完成后再恢复执行。这样可以避免程序在等待期间白白占用资源,从而提升整体效率。

通过这套机制,我们可以让程序在遇到 I/O 等待时不会空耗资源,从而大幅提升整体的执行效率。

import asyncio
import time


async def task1():
    print('task1 开始执行')
    # await 后面的对象必须是 async def 定义的对象,而 sleep 则不是,所以会报错
    # await time.sleep(5)
    # 需要切换为 async def 版本的 sleep 函数
    await asyncio.sleep(5)
    print('task1 结束执行')
    return 10

async def task2():
    print('task2 开始执行')
    # await time.sleep(5)
    await asyncio.sleep(3)
    print('task2 结束执行')
    return 20


# 任务执行入口点
async def main():
    print('main 开始执行')
    # 获得事件循环
    event_loop = asyncio.get_running_loop()
    # 手动注册任务
    t1 = event_loop.create_task(task1())
    t2 = event_loop.create_task(task2())

    # 等待事件循环调度执行、获得 t1 任务的结果
    result = await t1
    print('任务1执行结果:', result)

    # 等待事件循环调度执行、获得 t2 任务的结果
    result = await t2
    print('任务2执行结果:', result)

    print('main 结束执行')


if __name__ == '__main__':
    start = time.time()
    # 创建事件循环
    event_loop = asyncio.get_event_loop()
    # 启动事件循环
    event_loop.run_until_complete(main())
    print(time.time() - start)
main 开始执行
task1 开始执行
task2 开始执行
task2 结束执行
task1 结束执行
任务1执行结果: 10
任务2执行结果: 20
main 结束执行
5.013811826705933

通过使用异步方式改造代码,我们发现两个任务的执行时间由原来的 8 秒缩减到 5 秒,提高了任务的执行效率。另外,在事件循环初始化这部分代码稍显复杂,我们可以使用更简洁的代码,如下所示:

# 任务执行入口点
async def main():
    # 1. 获得事件循环
    # 2. 注册任务函数
    # 3. 等待所有任务执行结束
    results = await asyncio.gather(task1(), task2())
    print(results)


if __name__ == '__main__':
    start = time.time()
    # 创建事件循环,并启动任务执行
    asyncio.run(main())
    # 大概需要 5 秒
    print(time.time() - start)

3. 细节探讨

上个例子中,我们提到 async 和 await 关键字,下面我们再详细了解下这两个关键字相关内容。

3.1 async

在 Python 中,async 关键字主要用于定义异步函数/协程函数,这种函数可以执行非阻塞操作(通过 await 实现),并允许在等待某些任务完成时,程序继续执行其他任务。使用 async 定义异步函数/协程函数如下代码所示:

import asyncio

async def task():
    print('task')

def demo():
    coro = task()
    # <class 'coroutine'>
    print(type(coro))

if __name__ == '__main__':
    demo()

关于这段代码,有以下几个重要点需要理解:

  • 协程函数的返回值不是直接的结果,而是一个 协程对象
  • 协程对象 是是一个未开始执行的任务。要执行这个任务,必须通过事件循环来调度。

import asyncio

async def task():
    print('task')

def demo():
    coro = task()
    # 创建事件循环,并将 coro 协程对象注册到事件循环中,由事件循环调度运行
    asyncio.run(coro)

if __name__ == '__main__':
    demo()

Python 中的异步编程之所以需要使用协程而不是普通函数,主要是因为协程能够支持暂停和恢复执行,而普通函数则不能。

在异步编程中,程序常常需要处理 I/O 操作(例如读取文件、等待网络响应等),这些操作可能需要较长的时间。如果在这个过程中使用普通函数,程序将会阻塞,直到 I/O 操作完成才能继续执行其他任务。而协程通过 await 关键字,可以在等待某个操作完成时,暂停当前任务的执行,让事件循环去执行其他任务。这种方式能够高效地利用系统资源,避免不必要的阻塞。

3.2 await

在 Python 中, await 关键字主要用在 async def 定义的协程函数中,用于暂停协程的执行,直到异步操作完成,然后继续执行后续代码。

import asyncio
import time

async def sub_task():
    print('sub task')
    return 100

async def task():
    # 暂停当前的 task 执行
    # 等待 sub_task 执行完毕
    result = await sub_task()
    # 再执行后续代码
    print('result:', result)

def demo():
    coro = task()
    asyncio.run(coro)

if __name__ == '__main__':
    demo()

await 关键字后面可以跟 coroutinefuture 对象,如果 await 的是其他类型的对象可能会出现:

TypeError: xxx can't be used in 'await' expression

当 await 后面是协程对象时,事件循环暂停当前协程执行,然后继续执行 await 后面的协程对象中的代码, 此时是不会从 task1 切换到 task2。

import asyncio

async def task02():
    print('task02')
    return 200

async def sub_task():
    print('sub_task')
    return 100

async def task01():
    print('task01')
    result = await sub_task()
    return result

async def start():
    # 在事件循环中注册两个任务 task01 和 task02
    # 并等待两个任务的执行结果
    result = await asyncio.gather(task01(), task02())
    print(result)

def demo():
    asyncio.run(start())


if __name__ == '__main__':
    demo()
task01
sub_task
task02
[100, 200]

但是,当 await 后面的对象换成了 future 对象,则事件循环会挂起当前任务,并转到其他的任务去执行。我们也可以理解为,await future 时,就是事件循环切换任务的一个时机。请看下面的示例代码:

import asyncio

async def task02():
    print('task02')
    return 200

async def sub_task():
    print('sub_task')
    return 100

async def task01():
    print('task01')
    # 注意下面的代码:sleep 内部会执行 await future,此时任务会切换到任务2去执行
    await asyncio.sleep(3)
    result = await sub_task()
    return result

async def start():
    result = await asyncio.gather(task01(), task02())
    print(result)

def demo():
    asyncio.run(start())


if __name__ == '__main__':
    demo()
task01
task02
sub_task
[100, 200]

从执行结果来看,await future 时,事件循环确实从 task01 这个任务线切换到了 task02 这个任务线上。

3.3 future

Future 对象是异步编程中的一个重要概念,它的具体作用如下图所示:

假设:我们需要 Python 事件循环来管理两个任务。执行过程如下:

  1. 当事件循环执行到 task1 -> sub_task -> 网络接收数据 时,需要等待,为了不阻塞事件循环,会创建一个新的线程,并把该任务扔到该线程中去执行。
  2. 创建一个 future 对象,让事件循环和新创建的线程对象持有。当新线程接收到数据后,通过 future 对象将数据传递到 task1 任务对象中。当事件循环检测到 future 对象有返回结果,继续调度 task1 任务执行。
  3. 由于 task1 任务已经被挂起,事件循环就可以切换到任务2去执行,避免在任务1上等待。

从这里我们可以看到 future 对象是负责异步执行等待任务的对象和事件循环之间的桥梁。我们再回头看下 future 这个单词,它表示未来的意思。也就是说,它的作用就是帮助我们的程序,获得未来某个时间的操作结果,这才是真正需要切换任务的时机点。

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def thread_task(futrue):
    time.sleep(5)
    futrue.set_result(100)


async def sub_task():
    print('sub task 开始')

    # 创建 future 对象
    event_loop = asyncio.get_running_loop()
    future = event_loop.create_future()
    # 创建线程池对象
    executor = ThreadPoolExecutor()
    # 在其他线程执行任务
    event_loop.run_in_executor(executor, thread_task, future)
    # 挂起当前任务,事件循环调度其他任务执行
    result = await future

    print('sub task 结束')
    return result


async def task1():
    print('task1 开始')
    result = await sub_task()
    print('task1 结束')
    return result


async def task2():
    print('task2 开始')
    await asyncio.sleep(1)
    print('task2 结束')
    return 200


async def main():
    result = await asyncio.gather(task1(), task2())
    print(result)


if __name__ == '__main__':
    asyncio.run(main())

程序执行结果:

task1 开始
sub task 开始
task2 开始
task2 结束
sub task 结束
task1 结束
[100, 200]

4. 最后小结

异步编程是一种在单线程中实现并发执行的技术,它通过“事件循环”来调度任务,遇到会等待的操作时(比如网络请求或文件读写),就把控制权交给其他任务,自己等结果再回来继续处理。
这样就避免了“等着发呆”的情况,大大提升了运行效率。与多线程不同,异步不靠开启多个线程,而是通过任务切换来充分利用时间资源。
通俗来说:异步编程就是让单线程也能干多件事,不等白等,高效非阻塞。

当程序主要瓶颈是文件读写、网络请求、数据库操作等 IO 操作时,异步编程可以在等待 IO 响应期间处理其他任务,最大化资源利用率。相比开多线程,异步在性能和资源占用上更轻量。简言之,Python 异步编程在IO密集型任务上最优,避免线程阻塞等待。

写代码时需要具备“异步思维”——凡是会等待的操作,都应该考虑是否使用 await、是否用异步库实现。每写一个任务,都应自问:这个地方是不是可以不阻塞?简言之,关注是否等待,主动识别可异步处理的环节。

未经允许不得转载:一亩三分地 » Python 异步编程
评论 (0)

9 + 7 =