Python异步编程实战:从概念到高阶应用
在 Python 开发中,异步编程已成为处理 I/O 密集型任务的重要方式。无论是网络请求、文件操作还是数据库访问,异步编程都能显著提升程序的并发性能。本文将带你从零开始,深入理解 Python 的 async/(await 机制,并通过实战代码掌握其核心用法。
一、理解异步编程的核心概念
传统的同步编程中,当程序执行 I/O 操作(如网络请求)时,整个线程会被阻塞,直到操作完成。而异步编程通过事件循环机制,允许程序在等待 I/O 操作时继续执行其他任务。
在 Python 中,async/await 语法是实现异步编程的关键:
- async def:定义协程函数,调用后返回协程对象而非直接执行
- await:暂停协程执行,等待异步操作完成
import asyncio
import time
async def simulate_io_task(task_name, delay):
print(f"开始执行 {task_name}")
await asyncio.sleep(delay) # 模拟 I/O 操作
print(f"完成 {task_name}")
return f"{task_name} 结果"
async def main():
start = time.time()
# 串行执行(同步风格)
await simulate_io_task("任务A", 2)
await simulate_io_task("任务B", 2)
print(f"串行执行耗时: {time.time() - start:.2f}秒")
start = time.time()
# 并发执行
task1 = simulate_io_task("任务A", 2)
task2 = simulate_io_task("任务B", 2)
await asyncio.gather(task1, task2)
print(f"并发执行耗时: {time.time() - start:.2f}秒")
# 运行异步程序
asyncio.run(main())
运行结果显示:串行执行耗时约 4 秒,而并发执行仅需约 2 秒。这正是异步编程的优势所在。
二、asyncio 的核心组件详解
1. 事件循环(Event Loop)
事件循环是异步编程的调度中心,负责管理和执行所有协程。asyncio.run() 函数会自动创建和关闭事件循环。
import asyncio
async def task1():
print("任务1执行中...")
await asyncio.sleep(1)
print("任务1完成")
async def task2():
print("任务2执行中...")
await asyncio.sleep(1)
print("任务2完成")
async def main():
# 创建任务对象(自动加入事件循环)
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
# 等待所有任务完成
await asyncio.gather(t1, t2)
# 获取任务结果
# 注意:这里的任务没有返回值,实际应用中可以获取返回值
asyncio.run(main())
2. 协程与任务的区别
- 协程:由 async def 定义的函数,调用后返回协程对象
- 任务:协程的包装器,可以立即调度执行
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "完成"
async def main():
# 协程对象(未执行)
coro = my_coroutine()
print(f"协程类型: {type(coro)}")
# 创建任务(立即调度)
task = asyncio.create_task(coro)
print(f"任务类型: {type(task)}")
# 等待任务完成并获取结果
result = await task
print(f"任务结果: {result}")
asyncio.run(main())
三、实战案例:异步 HTTP 请求
异步编程最常见的场景是处理大量网络请求。下面实现一个异步的 HTTP 客户端:
import asyncio
import aiohttp # 需要安装: pip install aiohttp
import time
async def fetch_url(session, url):
try:
async with session.get(url) as response:
if response.status == 200:
text = await response.text()
return {"url": url, "status": response.status, "length": len(text)}
else:
return {"url": url, "status": response.status, "error": "HTTP 错误"}
except Exception as e:
return {"url": url, "error": str(e)}
async def fetch_all_urls(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
successful = [r for r in results if isinstance(r, dict) and "error" not in r and r.get("status") == 200]
failed = [r for r in results if isinstance(r, dict) and ("error" in r or r.get("status") != 200)]
return {
"total": len(urls),
"successful": len(successful),
"failed": len(failed),
"results": results
}
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/404",
"https://httpbin.org/delay/1",
]
start = time.time()
result = await fetch_all_urls(urls)
elapsed = time.time() - start
print(f"\n请求完成!耗时: {elapsed:.2f}秒")
print(f"总计: {result["total"]} | 成功: {result["successful"]} | 失败: {result["failed"]}")
for r in result["results"]:
if "error" in r:
print(f❌ 