Python asyncio 异步编程实战指南
Python 的 asyncio 库自 Python 3.4 引入以来,已经成为异步编程的标准工具。相比于传统的多线程或多进程模型,asyncio 提供了更轻量级的并发方案,特别适合 I/O 密集型任务。本文将通过多个实战示例,深入探讨 asyncio 的核心概念和高级用法。
一、asyncio 核心概念
asyncio 的核心是事件循环(Event Loop)和协程(Coroutine)。事件循环负责调度和执行异步任务,而协程则是使用 async/await 语法定义的异步函数。理解这些概念是掌握 asyncio 的基础。
协程不同于普通函数,调用协程不会立即执行,而是返回一个协程对象。只有将协程交给事件循环运行时,它才会真正执行。这就是为什么我们需要使用 asyncio.run() 或 loop.run_until_complete() 来启动异步程序。
二、基础异步编程示例
让我们从一个简单的异步 HTTP 请求示例开始:
import asyncio
import aiohttp
async def fetch_data(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1'
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_data(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"成功获取 {len(results)} 个响应")
asyncio.run(main())
这个例子展示了 asyncio 的基本用法:定义异步函数、创建任务、并发执行。注意我们使用了 aiohttp 库,它是专门为 asyncio 设计的 HTTP 客户端。
三、高级任务控制
asyncio 提供了多种任务控制机制,让我们能够更精细地管理异步操作。
1. 超时控制
在异步编程中,超时控制非常重要。asyncio 提供了 asyncio.wait_for() 来实现这一点:
import asyncio
async def slow_operation():
await asyncio.sleep(5)
return "操作完成"
async def main_with_timeout():
try:
result = await asyncio.wait_for(slow_operation(), timeout=2)
print(result)
except asyncio.TimeoutError:
print("操作超时!")
asyncio.run(main_with_timeout())
2. 任务取消
有时我们需要主动取消正在运行的任务:
import asyncio
async def background_task():
try:
for i in range(10):
await asyncio.sleep(1)
print(f"进度: {i+1}/10")
except asyncio.CancelledError:
print("任务被取消")
raise
async def main_with_cancellation():
task = asyncio.create_task(background_task())
await asyncio.sleep(3)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("主程序捕获取消异常")
asyncio.run(main_with_cancellation())
四、并发模式对比
asyncio 提供了多种并发执行模式,每种都有其适用场景。
1. asyncio.gather()
gather 是最常用的并发方式,它会并发执行所有任务并等待全部完成:
import asyncio
async def task_with_id(task_id, delay):
await asyncio.sleep(delay)
return f"任务 {task_id} 完成"
async def demonstrate_gather():
tasks = [task_with_id(i, 1) for i in range(5)]
results = await asyncio.gather(*tasks)
print(results) # 所有结果按顺序返回
asyncio.run(demonstrate_gather())
2. asyncio.wait()
wait 提供了更灵活的控制,可以指定返回条件:
import asyncio
async def demonstrate_wait():
tasks = [asyncio.create_task(task_with_id(i, i)) for i in range(1, 5)]
# 等待第一个任务完成
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"已完成 {len(done)} 个任务")
for task in done:
print(task.result())
# 取消剩余任务
for task in pending:
task.cancel()
asyncio.run(demonstrate_wait())
3. asyncio.as_completed()
as_completed 按完成顺序迭代结果:
import asyncio
async def demonstrate_as_completed():
tasks = [task_with_id(i, i) for i in range(3, 0, -1)]
for coro in asyncio.as_completed(tasks):
result = await coro
print(result) # 按完成顺序输出
asyncio.run(demonstrate_as_completed())
五、实际应用:批量文件处理
让我们通过一个实际场景来展示 asyncio 的威力。假设我们需要处理多个大文件:
import asyncio
import aiofiles
import os
async def process_file(file_path):
async with aiofiles.open(file_path, 'r') as f:
content = await f.read()
# 模拟文件处理
await asyncio.sleep(0.5)
return len(content)
async def batch_process_files(directory):
files = [os.path.join(directory, f)
for f in os.listdir(directory)
if f.endswith('.txt')]
tasks = [process_file(file) for file in files]
sizes = await asyncio.gather(*tasks)
total = sum(sizes)
print(f"处理了 {len(files)} 个文件,总大小: {total} 字节")
六、性能优化技巧
1. 使用信号量控制并发数
在处理大量任务时,我们可能需要限制并发数量以避免资源耗尽:
import asyncio
semaphore = asyncio.Semaphore(3) # 限制最多3个并发任务
async def controlled_task(task_id):
async with semaphore:
await asyncio.sleep(1)
print(f"任务 {task_id} 完成")
async def limited_concurrency():
tasks = [controlled_task(i) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(limited_concurrency())
2. 避免阻塞事件循环
在协程中执行 CPU 密集型任务会阻塞事件循环。解决方法是将这些任务放到线程池中:
import asyncio
async def blocking_task():
def cpu_bound():
total = 0
for i in range(10**7):
total += i
return total
# 将阻塞任务放到线程池
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, cpu_bound)
return result
async def main_with_executor():
result = await blocking_task()
print(f"计算结果: {result}")
asyncio.run(main_with_executor())
七、错误处理最佳实践
异步编程中的错误处理需要特别注意。gather 可以配置返回异常而不是抛出:
import asyncio
async def failing_task(task_id):
if task_id == 2:
raise ValueError("任务2失败")
await asyncio.sleep(0.5)
return f"任务 {task_id} 成功"
async def error_handling_example():
tasks = [failing_task(i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 失败: {result}")
else:
print(result)
asyncio.run(error_handling_example())
八、总结
asyncio 为 Python 提供了强大的异步编程能力。通过掌握事件循环、协程、任务控制等核心概念,以及正确使用各种并发模式,我们可以构建出高效的异步应用程序。在实际应用中,合理使用信号量控制并发、避免阻塞操作、做好错误处理,这些都是编写高质量异步代码的关键。
记住,异步编程不是银弹,它最适合 I/O 密集型任务。对于 CPU 密集型任务,考虑使用多进程或其他并发模型。选择正确的工具,才能发挥最大的性能优势。
