Python 协程与 async/await 异步编程实战指南
Python 3.5 引入了 async/await 语法,让异步编程变得简洁和直观。协程(Coroutine)是一种用户态的轻量级线程,可以在执行过程中挂起和恢复,特别适合处理 I/O 密集型任务。与传统的多线程相比,协程的切换开销极小,不需要操作系统调度,可以在单线程中实现高并发。
异步编程的核心思想是:当一个任务需要等待 I/O 操作(如网络请求、文件读写、数据库查询)时,不要阻塞整个程序,而是让出 CPU 控制权,让其他任务可以执行。当 I/O 操作完成后,再恢复执行原来的任务。这种非阻塞的执行方式让单个线程能够同时处理成百上千个并发任务。
理解协程的关键在于掌握几个核心概念:async 定义异步函数,await 等待异步操作完成,事件循环负责调度和执行协程。async def 声明的函数会返回一个协程对象,而不是直接执行函数体。只有当这个协程被事件循环调度时,函数体才会真正开始执行。
让我们从一个最简单的异步函数开始。定义异步函数只需要在 def 前面加上 async 关键字。但是,调用异步函数并不会立即执行它,而是返回一个协程对象。要执行这个协程,需要使用 asyncio.run() 函数,它会创建并运行一个事件循环。
在实际应用中,我们经常需要同时执行多个异步任务。asyncio.gather() 函数可以并发运行多个协程,并收集它们的结果。这比顺序执行要快得多,特别是当任务之间没有依赖关系时。gather 会等待所有协程完成,然后返回一个结果列表,顺序与传入的协程顺序一致。
除了 gather,asyncio 还提供了其他任务管理工具。asyncio.wait() 可以更精细地控制任务执行,支持返回完成的和未完成的任务集合。asyncio.as_completed() 则按任务完成的顺序生成结果,先完成的先返回,适合需要尽快处理结果的场景。这些工具让我们能够灵活地管理并发任务。
异步编程最强大的应用之一是网络请求。传统的同步爬虫在请求网络时需要等待响应,效率很低。使用异步 HTTP 客户端(如 aiohttp),我们可以同时发起多个请求,大幅提升爬取速度。每个请求都是独立的异步任务,等待响应时不会阻塞其他请求的发送和接收。
异步数据库操作同样能带来显著的性能提升。当需要执行多个数据库查询时,异步驱动可以并发执行这些查询,而不是顺序等待每个查询完成。这对于数据分析和报表生成等场景特别有用。虽然异步数据库驱动相对较少,但主流数据库(MySQL、PostgreSQL、MongoDB)都有相应的异步库支持。
编写异步代码时需要注意一些最佳实践。避免在异步函数中使用阻塞操作(如 time.sleep,应该用 asyncio.sleep)。不要在 async 函数中忘记 await,否则协程不会真正地等待异步操作完成。使用 try/except/finally 来正确处理异常和资源清理,确保即使发生异常也不会导致资源泄漏。对于复杂的异步逻辑,考虑使用异步上下文管理器来简化资源代码。
异步编程也有一些常见陷阱需要警惕。混用同步和异步代码会导致死锁或性能下降。过度使用 async/await 反而会增加复杂度,只有 I/O 密集型任务才真正受益于异步编程。调试异步代码相对困难,因为执行流程不是线性的,需要借助日志和异步调试工具。理解这些陷阱可以帮助我们写出更稳定、更高效的异步代码。
性能对比是验证异步编程效果的重要方式。通过对比同步和异步版本的相同功能实现,我们能够清楚地看到并发任务数量增加时,异步版本的执行时间增长缓慢,而同步版本线性增长。这种性能优势在处理大量并发请求时尤为明显,也是异步编程在现代高并发应用中流行的原因。
下面通过具体的代码示例来演示协程的使用。首先定义一个简单的异步函数来模拟耗时操作:
import asyncio
import time
async def simulate_io_task(name, duration):
"""模拟 I/O 密集型任务的异步函数"""
print(f"[{time.strftime('%H:%M:%S')}] {name} 开始执行")
await asyncio.sleep(duration) # 模拟 I/O 等待
print(f"[{time.strftime('%H:%M:%S')}] {name} 完成执行")
return f"{name} 的结果"
async def main():
"""主协程函数"""
start = time.time()
# 顺序执行三个任务
result1 = await simulate_io_task("任务1", 2)
result2 = await simulate_io_task("任务2", 1)
result3 = await simulate_io_task("任务3", 1.5)
print(f"结果: {result1}, {result2}, {result3}")
print(f"总耗时: {time.time() - start:.2f} 秒")
asyncio.run(main())
上面的代码顺序执行三个任务,总耗时约为 4.5 秒。使用 asyncio.gather() 可以并发执行这些任务,大幅提升效率:
import asyncio
import time
async def simulate_io_task(name, duration):
"""模拟 I/O 密集型任务的异步函数"""
print(f"[{time.strftime('%H:%M:%S')}] {name} 开始执行")
await asyncio.sleep(duration)
print(f"[{time.strftime('%H:%M:%S')}] {name} 完成执行")
return f"{name} 的结果"
async def main_concurrent():
"""使用 gather 并发执行任务"""
start = time.time()
# 并发执行三个任务
results = await asyncio.gather(
simulate_io_task("任务1", 2),
simulate_io_task("任务2", 1),
simulate_io_task("任务3", 1.5)
)
print(f"结果: {results}")
print(f"总耗时: {time.time() - start:.2f} 秒")
asyncio.run(main_concurrent())
运行并发版本会发现总耗时只有约 2 秒,因为三个任务是同时进行的,总耗时取决于最慢的任务。这就是异步编程的核心优势。
实际应用中,我们经常需要处理超时和异常。asyncio.wait_for() 可以设置超时时间,asyncio.TimeoutError 会在超时时抛出。下面是一个带超时控制的异步请求示例:
import asyncio
import random
async def fetch_data(url):
"""模拟异步网络请求"""
delay = random.uniform(0.5, 3.0)
await asyncio.sleep(delay)
if delay > 2.0:
raise asyncio.TimeoutError(f"请求 {url} 超时")
return f"{url} 的数据"
async def fetch_with_timeout(url, timeout=1.5):
"""带超时控制的异步请求"""
try:
result = await asyncio.wait_for(fetch_data(url), timeout)
return {"url": url, "status": "success", "data": result}
except asyncio.TimeoutError:
return {"url": url, "status": "timeout", "error": "请求超时"}
except Exception as e:
return {"url": url, "status": "error", "error": str(e)}
async def main_timeout():
"""测试带超时的并发请求"""
urls = [
"https://api.example.com/users",
"https://api.example.com/posts",
"https://api.example.com/comments"
]
# 并发请求所有 URL,每个请求最多等待 1.5 秒
results = await asyncio.gather(
*[fetch_with_timeout(url) for url in urls],
return_exceptions=True
)
for result in results:
if result["status"] == "success":
print(f"✓ {result['url']}: {result['data']}")
else:
print(f"✗ {result['url']}: {result['error']}")
asyncio.run(main_timeout())
asyncio.as_completed() 提供了另一种任务管理方式,它会按照任务完成的顺序产生结果,而不是按照提交的顺序。这在需要尽快处理完成的任务时非常有用:
import asyncio
import time
async def process_task(name, duration):
"""模拟处理任务"""
await asyncio.sleep(duration)
return {"name": name, "duration": duration, "completed_at": time.time()}
async def main_as_completed():
"""使用 as_completed 按完成顺序处理结果"""
tasks = [
process_task("快速任务", 1),
process_task("中速任务", 2),
process_task("慢速任务", 3),
process_task("极快任务", 0.5)
]
start = time.time()
completed_count = 0
# as_completed 按实际完成顺序产生结果
for coro in asyncio.as_completed(tasks):
result = await coro
completed_count += 1
elapsed = time.time() - start
print(f"完成 #{completed_count}: {result['name']} "
f"(耗时: {result['duration']}s, 实际用时: {elapsed:.2f}s)")
asyncio.run(main_as_completed())
异步爬虫是异步编程的经典应用场景。下面的示例展示了如何使用异步方式并发爬取多个网页:
import asyncio
import time
from typing import AsyncGenerator, List
class AsyncWebScraper:
"""异步网页爬虫"""
def __init__(self, max_concurrent: int = 5):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session = None
async def fetch_page(self, url: str) -> dict:
"""异步获取单个页面"""
async with self.semaphore:
# 模拟网络请求延迟
delay = 0.5 + len(url) % 3 * 0.3
await asyncio.sleep(delay)
# 模拟页面内容
return {
"url": url,
"status": 200,
"content": f"页面 {url} 的内容",
"size": 1024 + len(url) * 100
}
async def crawl_urls(self, urls: List[str]) -> AsyncGenerator[dict, None]:
"""并发爬取多个 URL"""
tasks = [self.fetch_page(url) for url in urls]
for task in asyncio.as_completed(tasks):
yield await task
async def crawl_all(self, urls: List[str]) -> List[dict]:
"""爬取所有 URL 并返回结果列表"""
results = []
async for page in self.crawl_urls(urls):
results.append(page)
print(f"✓ 已爬取: {page['url']} ({page['size']} bytes)")
return results
async def main_scraper():
"""测试异步爬虫"""
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
"https://example.com/page4",
"https://example.com/page5",
"https://example.com/page6",
"https://example.com/page7",
"https://example.com/page8"
]
scraper = AsyncWebScraper(max_concurrent=3)
start = time.time()
results = await scraper.crawl_all(urls)
total_size = sum(page["size"] for page in results)
elapsed = time.time() - start
print(f"\n爬取完成!")
print(f"总页面数: {len(results)}")
print(f"总数据量: {total_size} bytes")
print(f"总耗时: {elapsed:.2f} 秒")
print(f"平均速度: {total_size / elapsed:.0f} bytes/s")
asyncio.run(main_scraper())
异步数据库操作可以显著提升数据密集型应用的性能。下面的示例模拟了异步数据库查询的场景:
import asyncio
import time
from dataclasses import dataclass
from typing import Optional, List
@dataclass
class User:
"""用户数据模型"""
id: int
name: str
email: str
age: int
class AsyncDatabase:
"""模拟异步数据库"""
def __init__(self):
self._users = {
1: User(1, "张三", "zhangsan@example.com", 25),
2: User(2, "李四", "lisi@example.com", 30),
3: User(3, "王五", "wangwu@example.com", 28),
4: User(4, "赵六", "zhaoliu@example.com", 35),
5: User(5, "孙七", "sunqi@example.com", 22)
}
async def get_user_by_id(self, user_id: int) -> Optional[User]:
"""异步获取用户信息"""
# 模拟数据库查询延迟
await asyncio.sleep(0.3)
return self._users.get(user_id)
async def get_user_by_name(self, name: str) -> Optional[User]:
"""异步根据姓名查询用户"""
await asyncio.sleep(0.25)
for user in self._users.values():
if user.name == name:
return user
return None
async def get_all_users(self) -> List[User]:
"""异步获取所有用户"""
await asyncio.sleep(0.5)
return list(self._users.values())
async def update_user_age(self, user_id: int, new_age: int) -> bool:
"""异步更新用户年龄"""
await asyncio.sleep(0.2)
if user_id in self._users:
self._users[user_id].age = new_age
return True
return False
async def demo_database_queries():
"""演示异步数据库查询"""
db = AsyncDatabase()
print("=== 并发查询多个用户 ===")
start = time.time()
# 并发查询多个用户
results = await asyncio.gather(
db.get_user_by_id(1),
db.get_user_by_id(3),
db.get_user_by_id(5)
)
for user in results:
if user:
print(f"ID: {user.id}, 姓名: {user.name}, 年龄: {user.age}")
print(f"查询耗时: {time.time() - start:.2f} 秒")
print("\n=== 批量更新用户年龄 ===")
update_tasks = [
db.update_user_age(1, 26),
db.update_user_age(3, 29),
db.update_user_age(5, 23)
]
await asyncio.gather(*update_tasks)
print("批量更新完成!")
# 验证更新结果
updated_users = await asyncio.gather(
db.get_user_by_id(1),
db.get_user_by_id(3),
db.get_user_by_id(5)
)
print("更新后的用户信息:")
for user in updated_users:
if user:
print(f" {user.name}: {user.age} 岁")
asyncio.run(demo_database_queries())
最后,我们来实现一个同步和异步的性能对比,直观展示异步编程在 I/O 密集型任务中的优势:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def sync_io_task(name: str, duration: float) -> str:
"""同步 I/O 任务"""
print(f"[同步] {name} 开始执行")
time.sleep(duration)
print(f"[同步] {name} 完成执行")
return f"{name} 的结果"
async def async_io_task(name: str, duration: float) -> str:
"""异步 I/O 任务"""
print(f"[异步] {name} 开始执行")
await asyncio.sleep(duration)
print(f"[异步] {name} 完成执行")
return f"{name} 的结果"
def run_sync_tasks():
"""顺序执行同步任务"""
print("\n=== 同步版本 ===")
start = time.time()
results = [
sync_io_task(f"任务{i}", 1 + i * 0.1)
for i in range(1, 6)
]
elapsed = time.time() - start
print(f"同步版本总耗时: {elapsed:.2f} 秒")
return elapsed
async def run_async_tasks():
"""并发执行异步任务"""
print("\n=== 异步版本 ===")
start = time.time()
results = await asyncio.gather(
async_io_task(f"任务{i}", 1 + i * 0.1)
for i in range(1, 6)
)
elapsed = time.time() - start
print(f"异步版本总耗时: {elapsed:.2f} 秒")
return elapsed
def run_threaded_tasks():
"""线程池执行任务"""
print("\n=== 线程池版本 ===")
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(sync_io_task, f"任务{i}", 1 + i * 0.1)
for i in range(1, 6)
]
results = [f.result() for f in futures]
elapsed = time.time() - start
print(f"线程池版本总耗时: {elapsed:.2f} 秒")
return elapsed
async def main_performance_compare():
"""性能对比主函数"""
print("性能对比:同步 vs 异步 vs 线程池")
print("=" * 40)
# 运行同步版本
sync_time = run_sync_tasks()
# 运行异步版本
async_time = await run_async_tasks()
# 运行线程池版本
threaded_time = run_threaded_tasks()
# 显示对比结果
print("\n=== 性能对比结果 ===")
print(f"同步版本: {sync_time:.2f} 秒 (基准)")
print(f"异步版本: {async_time:.2f} 秒 "
f"(提升 {sync_time/async_time:.1f}x)")
print(f"线程池版本: {threaded_time:.2f} 秒 "
f"(提升 {sync_time/threaded_time:.1f}x)")
print("\n结论:")
print("- 异步版本在 I/O 密集型任务中性能最优")
print("- 线程池也能提升性能,但资源开销更大")
print("- 对于大量并发 I/O 操作,协程是最佳选择")
asyncio.run(main_performance_compare())
通过上面的代码示例,我们可以看到 async/await 异步编程的强大之处。协程让我们能够用同步风格的代码写出高性能的异步应用,特别适合处理网络请求、数据库操作、文件读写等 I/O 密集型任务。掌握异步编程将大大提升 Python 在高并发场景下的应用能力。
总结一下,学习异步编程的关键点包括:理解协程的挂起和恢复机制,正确使用 async/await 语法,熟练运用 asyncio.gather 等并发工具,注意避免阻塞操作和正确处理异常。随着对异步编程的深入理解,你将能够构建出高效、可扩展的 Python 应用程序。
