Python Async Programming in Practice: Building a High-Performance Web Crawler from Scratch
I. Why Do We Need Async Programming?
When building a web crawler, synchronous code runs into a serious performance bottleneck. With the traditional requests library, the program must wait for the server's response before it can send the next request. If we need to crawl 100 pages and each request takes 1 second on average, the total comes to 100 seconds.
The core idea of async programming is to use that waiting time to do other work. After sending an HTTP request, instead of blocking until the response arrives, we can go handle other requests and come back when the response is ready. Many requests are in flight at once, which dramatically improves throughput.
import time
import asyncio
import aiohttp

# Synchronous version: blocks the whole program while it waits
def fetch_sync(url):
    time.sleep(1)  # simulate blocking I/O
    return f"data: {url}"

# Asynchronous version: yields control to other tasks while waiting
async def fetch_async(session, url):
    async with session.get(url) as response:
        return await response.text()
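To see the difference concretely, here is a minimal, self-contained sketch (an illustration, not part of the crawler) that simulates ten 1-second requests with asyncio.sleep standing in for network I/O, and times the sequential path against asyncio.gather:

import asyncio
import time

async def fake_request(url):
    await asyncio.sleep(1)  # stand-in for a 1-second network round trip
    return url

async def compare():
    urls = [f"page-{i}" for i in range(10)]

    # Sequential: each await finishes before the next starts (~10 s)
    start = time.perf_counter()
    for url in urls:
        await fake_request(url)
    print(f"sequential: {time.perf_counter() - start:.1f}s")

    # Concurrent: all ten sleeps overlap (~1 s)
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(url) for url in urls))
    print(f"concurrent: {time.perf_counter() - start:.1f}s")

asyncio.run(compare())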
II. async/await Basics
Python 3.5 introduced the async/await syntax, which makes asynchronous code read almost as naturally as synchronous code.
Key concepts:
- async def: defines a coroutine function; calling it does not run it immediately but returns a coroutine object
- await: suspends the current coroutine until the awaited async operation completes
- asyncio.run(): the entry point for running a coroutine
- asyncio.gather(): runs multiple coroutines concurrently
import asyncio

async def say_hello(name):
    print(f"Greeting started: {name}")
    await asyncio.sleep(1)  # simulate a slow operation
    print(f"Greeting finished: {name}")
    return f"Hello, {name}"

async def main():
    # Sequential: the second call starts only after the first finishes
    result1 = await say_hello("Alice")
    result2 = await say_hello("Bob")

    # Concurrent: all three greetings run at the same time
    results = await asyncio.gather(
        say_hello("Alice"),
        say_hello("Bob"),
        say_hello("Charlie")
    )
    print(results)

# Run the coroutine
asyncio.run(main())
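One point from the list above is worth seeing directly: calling a coroutine function only creates a coroutine object; nothing runs until it is awaited or wrapped in a task. A minimal sketch (the names work and main are illustrative):

import asyncio

async def work():
    await asyncio.sleep(0.1)
    return 42

async def main():
    coro = work()       # nothing has run yet; coro is just a coroutine object
    print(type(coro))   # <class 'coroutine'>
    print(await coro)   # now it actually executes and returns 42

    # asyncio.create_task() schedules a coroutine to run concurrently
    task = asyncio.create_task(work())
    print(await task)   # 42

asyncio.run(main())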
III. Building an Async HTTP Client
aiohttp is the de facto standard async HTTP client for Python: a third-party library built on top of asyncio, with excellent performance.
import aiohttp
import asyncio

class AsyncCrawler:
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.session = None

    async def __aenter__(self):
        # Create a session backed by a connection pool
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        self.session = aiohttp.ClientSession(connector=connector)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Close the session
        await self.session.close()

    async def fetch(self, url, headers=None, timeout=30):
        """Fetch the content of a single URL."""
        try:
            async with self.session.get(
                url,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=timeout)
            ) as response:
                if response.status == 200:
                    text = await response.text()  # read the body once
                    return {
                        "url": url,
                        "status": response.status,
                        "content": text,
                        "size": len(text)
                    }
                else:
                    return {
                        "url": url,
                        "status": response.status,
                        "error": f"HTTP {response.status}"
                    }
        except Exception as e:
            return {
                "url": url,
                "status": 0,
                "error": str(e)
            }

    async def fetch_all(self, urls, headers=None):
        """Fetch multiple URLs concurrently."""
        tasks = [self.fetch(url, headers) for url in urls]
        return await asyncio.gather(*tasks)

# Usage example
async def demo():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
    ]
    async with AsyncCrawler(max_concurrent=5) as crawler:
        results = await crawler.fetch_all(urls)
        for result in results:
            print(f"{result['url']}: {result['status']}")

asyncio.run(demo())
IV. In Practice: A High-Performance News Crawler
Now let's build a complete news crawler to show what async programming can do.
import aiohttp
import asyncio
import json
from datetime import datetime
from typing import List, Dict

class NewsCrawler:
    def __init__(self, max_concurrent=20, request_delay=0.1):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        self.results = []

    async def __aenter__(self):
        timeout = aiohttp.ClientTimeout(total=30)
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=connector
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def fetch_with_semaphore(self, url: str, headers: dict = None):
        """Use a semaphore to cap concurrency."""
        async with self.semaphore:
            await asyncio.sleep(self.request_delay)
            return await self.fetch(url, headers)

    async def fetch(self, url: str, headers: dict = None) -> Dict:
        """Fetch the content of a page."""
        try:
            async with self.session.get(url, headers=headers) as response:
                if response.status == 200:
                    text = await response.text()
                    return {
                        "url": url,
                        "status": response.status,
                        "content": text,
                        "size": len(text),
                        "timestamp": datetime.now().isoformat()
                    }
                return {
                    "url": url,
                    "status": response.status,
                    "error": f"HTTP {response.status}"
                }
        except Exception as e:
            return {
                "url": url,
                "status": 0,
                "error": str(e)
            }

    def parse_news(self, result: Dict) -> Dict:
        """Parse news content (example)."""
        if result.get("status") != 200:
            return result
        content = result["content"]
        # Adapt the parsing to the actual site structure;
        # this example just builds a title and a snippet
        news_item = {
            "url": result["url"],
            "title": f"News title - {len(content)} chars",
            "summary": content[:200] if len(content) > 200 else content,
            "timestamp": result["timestamp"]
        }
        return news_item

    async def crawl(self, urls: List[str], headers: dict = None) -> List[Dict]:
        """Crawl multiple pages."""
        tasks = [self.fetch_with_semaphore(url, headers) for url in urls]
        raw_results = await asyncio.gather(*tasks)
        # Parse the results
        self.results = [self.parse_news(result) for result in raw_results]
        return self.results

    def save_results(self, filename: str = "news_results.json"):
        """Save the results to a file."""
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)
        print(f"Results saved to {filename}")

# Performance comparison test
async def performance_test():
    test_urls = [
        "https://httpbin.org/delay/1" for _ in range(10)
    ]
    # Async crawl
    start_time = datetime.now()
    async with NewsCrawler(max_concurrent=10) as crawler:
        async_results = await crawler.crawl(test_urls)
    async_time = (datetime.now() - start_time).total_seconds()
    print(f"Async crawl of {len(test_urls)} pages took: {async_time:.2f} s")
    print(f"Average per request: {async_time/len(test_urls):.2f} s")
    # Async is typically 5-10x faster than the synchronous approach:
    # synchronous needs 10 seconds (10 requests x 1 second each),
    # async only about 1-2 seconds (concurrent processing)

if __name__ == "__main__":
    asyncio.run(performance_test())
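To ground the comparison in the comments above, here is a rough synchronous baseline using the third-party requests library (assumed installed). httpbin.org adds a 1-second delay per request, so expect around 10 seconds in total, though real timings vary with network conditions:

import time
import requests

def performance_test_sync():
    test_urls = ["https://httpbin.org/delay/1" for _ in range(10)]
    start = time.perf_counter()
    for url in test_urls:
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
    elapsed = time.perf_counter() - start
    # Each request blocks for ~1 s, so the loop takes roughly 10 s
    print(f"Sync crawl of {len(test_urls)} pages took: {elapsed:.2f} s")

if __name__ == "__main__":
    performance_test_sync()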
V. Performance Optimization Tips
To get the best performance out of an async crawler, keep the following points in mind:
1. Set concurrency sensibly
Higher concurrency is not always better. Pushing it too high can lead to:
- the server throttling you (HTTP 429 Too Many Requests)
- exhausted local resources
- being flagged by anti-bot systems
Start with 10-20 and adjust for the target site; a semaphore makes the cap explicit, as in the sketch below.
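A minimal sketch of capping concurrency with asyncio.Semaphore; the 1-second sleep stands in for a real request:

import asyncio

async def limited_fetch(semaphore, url):
    async with semaphore:        # at most 10 coroutines enter this block at once
        await asyncio.sleep(1)   # stand-in for the actual HTTP request
        return url

async def main():
    semaphore = asyncio.Semaphore(10)
    urls = [f"page-{i}" for i in range(50)]
    # 50 tasks are created, but never more than 10 are in flight at a time
    results = await asyncio.gather(*(limited_fetch(semaphore, u) for u in urls))
    print(f"fetched {len(results)} pages")

asyncio.run(main())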
2. Use a connection pool
aiohttp's ClientSession has a built-in connection pool; reusing TCP connections cuts down on handshake overhead:
connector = aiohttp.TCPConnector(
    limit=100,             # cap on total connections
    limit_per_host=10,     # cap on connections per host
    keepalive_timeout=30   # keep idle connections alive for 30 s
)
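The connector then has to be handed to the session. One way to wire it up (a short usage sketch; the async with block ensures the session is closed):

import aiohttp
import asyncio

async def main():
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get("https://httpbin.org/get") as resp:
            print(resp.status)

asyncio.run(main())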
3. Add a delay between requests
await asyncio.sleep(0.1)  # wait 0.1 s between requests
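A fixed delay makes the crawler's traffic pattern easy to recognize. A common refinement, shown here as a hypothetical helper rather than anything specific to aiohttp, is to add random jitter:

import asyncio
import random

async def polite_sleep(base_delay: float = 0.1, jitter: float = 0.1):
    # Sleep for base_delay plus up to `jitter` extra seconds
    await asyncio.sleep(base_delay + random.uniform(0, jitter))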
4. Error handling and a retry mechanism
async def fetch_with_retry(self, url, max_retries=3):
    """Retry failed requests with exponential backoff."""
    for attempt in range(max_retries):
        result = await self.fetch(url)
        if result["status"] == 200:
            return result
        await asyncio.sleep(2 ** attempt)  # exponential backoff: 1 s, 2 s, 4 s
    return result
VI. A Complete Project Example
Below is a complete crawler project that you can use as-is:
import aiohttp
import asyncio
import json
from datetime import datetime
from pathlib import Path

class AsyncWebCrawler:
    """Asynchronous web crawler."""

    def __init__(
        self,
        max_concurrent: int = 20,
        timeout: int = 30,
        request_delay: float = 0.1
    ):
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.request_delay = request_delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        self.stats = {
            "total": 0,
            "success": 0,
            "failed": 0,
            "start_time": None,
            "end_time": None
        }

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def fetch_one(self, url: str, headers: dict = None) -> dict:
        """Fetch a single URL."""
        async with self.semaphore:
            await asyncio.sleep(self.request_delay)
            try:
                async with self.session.get(url, headers=headers) as resp:
                    content = await resp.text()
                    self.stats["success"] += 1
                    return {
                        "url": url,
                        "status": resp.status,
                        "content": content,
                        "size": len(content)
                    }
            except Exception as e:
                self.stats["failed"] += 1
                return {
                    "url": url,
                    "status": 0,
                    "error": str(e)
                }

    async def crawl(self, urls: list, headers: dict = None) -> list:
        """Crawl multiple URLs."""
        self.stats["start_time"] = datetime.now()
        self.stats["total"] = len(urls)
        tasks = [self.fetch_one(url, headers) for url in urls]
        results = await asyncio.gather(*tasks)
        self.stats["end_time"] = datetime.now()
        return results

    def get_stats(self) -> dict:
        """Return crawl statistics."""
        if self.stats["start_time"] and self.stats["end_time"]:
            duration = (self.stats["end_time"] - self.stats["start_time"]).total_seconds()
            self.stats["duration"] = f"{duration:.2f}s"
            self.stats["avg_time"] = f"{duration/self.stats['total']:.2f}s"
        return self.stats

    def save_results(self, results: list, filename: str = "results.json"):
        """Save the results."""
        Path(filename).write_text(
            json.dumps(results, ensure_ascii=False, indent=2),
            encoding="utf-8"
        )
        print(f"✅ Results saved: {filename}")

# Usage example
async def main():
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/uuid",
        "https://httpbin.org/headers",
    ]
    async with AsyncWebCrawler(max_concurrent=10) as crawler:
        results = await crawler.crawl(urls)
        crawler.save_results(results)
        stats = crawler.get_stats()
        print("\n📊 Stats:")
        print(f"Total requests: {stats['total']}")
        print(f"Succeeded: {stats['success']}")
        print(f"Failed: {stats['failed']}")
        print(f"Total time: {stats.get('duration', 'N/A')}")
        print(f"Average time: {stats.get('avg_time', 'N/A')}")

if __name__ == "__main__":
    asyncio.run(main())
VII. Summary
Python's async programming, through the async/await syntax and the asyncio library, lets us write high-performance concurrent code. In a web-crawling scenario, it can deliver:
- 10x or greater speedups on I/O-bound workloads: multiple requests are processed concurrently
- Better resource utilization: other work gets done while waiting on I/O
- Clean code: it reads like synchronous code and is easy to understand and maintain
Key takeaways:
- Use aiohttp instead of requests
- Set concurrency and request delays sensibly
- Use a semaphore to control concurrency
- Add error handling and a retry mechanism
- Record statistics to monitor performance
Async programming is an essential skill for advanced Python development, and mastering it will give your programs a real performance boost. Go try it!