当前位置:首页 > Python > 正文内容

Python asyncio 异步编程实战指南

admin2周前 (03-22)Python23

Python 的 asyncio 库自 Python 3.4 引入以来,已经成为异步编程的标准工具。相比于传统的多线程或多进程模型,asyncio 提供了更轻量级的并发方案,特别适合 I/O 密集型任务。本文将通过多个实战示例,深入探讨 asyncio 的核心概念和高级用法。

一、asyncio 核心概念

asyncio 的核心是事件循环(Event Loop)和协程(Coroutine)。事件循环负责调度和执行异步任务,而协程则是使用 async/await 语法定义的异步函数。理解这些概念是掌握 asyncio 的基础。

协程不同于普通函数,调用协程不会立即执行,而是返回一个协程对象。只有将协程交给事件循环运行时,它才会真正执行。这就是为什么我们需要使用 asyncio.run() 或 loop.run_until_complete() 来启动异步程序。

二、基础异步编程示例

让我们从一个简单的异步 HTTP 请求示例开始:


import asyncio
import aiohttp

async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/1'
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"成功获取 {len(results)} 个响应")

asyncio.run(main())

这个例子展示了 asyncio 的基本用法:定义异步函数、创建任务、并发执行。注意我们使用了 aiohttp 库,它是专门为 asyncio 设计的 HTTP 客户端。

三、高级任务控制

asyncio 提供了多种任务控制机制,让我们能够更精细地管理异步操作。

1. 超时控制

在异步编程中,超时控制非常重要。asyncio 提供了 asyncio.wait_for() 来实现这一点:


import asyncio

async def slow_operation():
    await asyncio.sleep(5)
    return "操作完成"

async def main_with_timeout():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时!")

asyncio.run(main_with_timeout())

2. 任务取消

有时我们需要主动取消正在运行的任务:


import asyncio

async def background_task():
    try:
        for i in range(10):
            await asyncio.sleep(1)
            print(f"进度: {i+1}/10")
    except asyncio.CancelledError:
        print("任务被取消")
        raise

async def main_with_cancellation():
    task = asyncio.create_task(background_task())
    await asyncio.sleep(3)
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("主程序捕获取消异常")

asyncio.run(main_with_cancellation())

四、并发模式对比

asyncio 提供了多种并发执行模式,每种都有其适用场景。

1. asyncio.gather()

gather 是最常用的并发方式,它会并发执行所有任务并等待全部完成:


import asyncio

async def task_with_id(task_id, delay):
    await asyncio.sleep(delay)
    return f"任务 {task_id} 完成"

async def demonstrate_gather():
    tasks = [task_with_id(i, 1) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(results)  # 所有结果按顺序返回

asyncio.run(demonstrate_gather())

2. asyncio.wait()

wait 提供了更灵活的控制,可以指定返回条件:


import asyncio

async def demonstrate_wait():
    tasks = [asyncio.create_task(task_with_id(i, i)) for i in range(1, 5)]
    
    # 等待第一个任务完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    
    print(f"已完成 {len(done)} 个任务")
    for task in done:
        print(task.result())
    
    # 取消剩余任务
    for task in pending:
        task.cancel()

asyncio.run(demonstrate_wait())

3. asyncio.as_completed()

as_completed 按完成顺序迭代结果:


import asyncio

async def demonstrate_as_completed():
    tasks = [task_with_id(i, i) for i in range(3, 0, -1)]
    
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)  # 按完成顺序输出

asyncio.run(demonstrate_as_completed())

五、实际应用:批量文件处理

让我们通过一个实际场景来展示 asyncio 的威力。假设我们需要处理多个大文件:


import asyncio
import aiofiles
import os

async def process_file(file_path):
    async with aiofiles.open(file_path, 'r') as f:
        content = await f.read()
    # 模拟文件处理
    await asyncio.sleep(0.5)
    return len(content)

async def batch_process_files(directory):
    files = [os.path.join(directory, f) 
             for f in os.listdir(directory) 
             if f.endswith('.txt')]
    
    tasks = [process_file(file) for file in files]
    sizes = await asyncio.gather(*tasks)
    
    total = sum(sizes)
    print(f"处理了 {len(files)} 个文件,总大小: {total} 字节")

六、性能优化技巧

1. 使用信号量控制并发数

在处理大量任务时,我们可能需要限制并发数量以避免资源耗尽:


import asyncio

semaphore = asyncio.Semaphore(3)  # 限制最多3个并发任务

async def controlled_task(task_id):
    async with semaphore:
        await asyncio.sleep(1)
        print(f"任务 {task_id} 完成")

async def limited_concurrency():
    tasks = [controlled_task(i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(limited_concurrency())

2. 避免阻塞事件循环

在协程中执行 CPU 密集型任务会阻塞事件循环。解决方法是将这些任务放到线程池中:


import asyncio

async def blocking_task():
    def cpu_bound():
        total = 0
        for i in range(10**7):
            total += i
        return total
    
    # 将阻塞任务放到线程池
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, cpu_bound)
    return result

async def main_with_executor():
    result = await blocking_task()
    print(f"计算结果: {result}")

asyncio.run(main_with_executor())

七、错误处理最佳实践

异步编程中的错误处理需要特别注意。gather 可以配置返回异常而不是抛出:


import asyncio

async def failing_task(task_id):
    if task_id == 2:
        raise ValueError("任务2失败")
    await asyncio.sleep(0.5)
    return f"任务 {task_id} 成功"

async def error_handling_example():
    tasks = [failing_task(i) for i in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败: {result}")
        else:
            print(result)

asyncio.run(error_handling_example())

八、总结

asyncio 为 Python 提供了强大的异步编程能力。通过掌握事件循环、协程、任务控制等核心概念,以及正确使用各种并发模式,我们可以构建出高效的异步应用程序。在实际应用中,合理使用信号量控制并发、避免阻塞操作、做好错误处理,这些都是编写高质量异步代码的关键。

记住,异步编程不是银弹,它最适合 I/O 密集型任务。对于 CPU 密集型任务,考虑使用多进程或其他并发模型。选择正确的工具,才能发挥最大的性能优势。

相关文章

[Python 教程] Pandas 数据分析实战

Pandas 数据分析实战 Pandas 是 Python 数据分析的核心库,提供 DataFrame 和 Series 数据结构。本文介绍 Pandas 的实用技巧。 一、创建 DataFrame...

Python 装饰器完全指南:从入门到实战

装饰器本质上是一个接受函数作为参数并返回新函数的高阶函数。理解这一点是掌握装饰器的关键。想象一下,你有一个函数,你想在它执行前后添加一些额外的逻辑,比如日志记录、性能测试、权限验证等。装饰器就是为这种...

Python 装饰器从入门到实战:5 个实用场景详解

装饰器(Decorator)是 Python 中一种优雅的设计模式,它允许我们在不修改原函数代码的前提下,动态地给函数添加功能。想象一下,你有一个已经写好的函数,现在想给它添加日志记录、性能监控或权限...

Python 中利用 functools.lru_cache 实现高效缓存:从入门到进阶

Python 中利用 functools.lru_cache 实现高效缓存:从入门到进阶 在日常 Python 开发中,我们经常会遇到重复计算相同输入的问题,比如递归计算斐波那契数列、多次调用相同参...

Python装饰器实战指南:从入门到精通

Python 装饰器是许多开发者既熟悉又陌生的功能。熟悉是因为我们在框架中经常看到 @符号,陌生是因为很多人只是知其然不知其所以然。本文将从零开始,通过实际案例深入讲解装饰器的工作原理和应用场景。...

Python 装饰器完全指南:从入门到精通

什么是装饰器?装饰器本质上是一个 Python 函数,它可以让其他函数在不需要做任何代码变动的前提下增加额外功能。装饰器的返回值也是一个函数对象。在 Python 中,装饰器使用 @decorator...

发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。