当前位置:首页 > Python > 正文内容

Python 异步编程实战:从零构建高性能 Web 爬虫

admin6小时前Python4

一、为什么需要异步编程?

在构建 Web 爬虫时,同步代码会面临一个严重的性能瓶颈。当我们用传统的 requests 库发送 HTTP 请求时,程序必须等待服务器响应后才能继续执行下一个请求。如果我们要爬取 100 个页面,而每个请求平均耗时 1 秒,那么总耗时就是 100 秒。

异步编程的核心思想是利用等待时间做其他事情。当我们发送 HTTP 请求后,不需要一直等待响应,而是可以去处理其他请求。当响应到达时,再回来继续处理。这样可以同时处理多个请求,大幅提升效率。

import time
import asyncio
import aiohttp

# 同步方式
async def fetch_sync(session, url):
    # 模拟同步请求
    await asyncio.sleep(1)  # 模拟 I/O 等待
    return f"数据: {url}"

# 异步方式  
async def fetch_async(session, url):
    async with session.get(url) as response:
        return await response.text()

二、async/await 基础语法

Python 3.5 引入了 async/await 语法,让异步代码看起来像同步代码一样直观。

关键概念:

  • async def:定义协程函数,调用它不会立即执行,而是返回一个协程对象
  • await:暂停当前协程,等待异步操作完成
  • asyncio.run():运行协程的入口点
  • asyncio.gather():并发运行多个协程
import asyncio

async def say_hello(name):
    print(f"开始问候: {name}")
    await asyncio.sleep(1)  # 模拟耗时操作
    print(f"完成问候: {name}")
    return f"Hello, {name}"

async def main():
    # 串行执行
    result1 = await say_hello("Alice")
    result2 = await say_hello("Bob")
    
    # 并发执行
    results = await asyncio.gather(
        say_hello("Alice"),
        say_hello("Bob"),
        say_hello("Charlie")
    )
    print(results)

# 运行协程
asyncio.run(main())

三、构建异步 HTTP 客户端

aiohttp 是 Python 异步 HTTP 客户端的标准库,它基于 asyncio 构建,性能出色。

import aiohttp
import asyncio

class AsyncCrawler:
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.session = None
        
    async def __aenter__(self):
        # 创建会话,支持连接池
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        self.session = aiohttp.ClientSession(connector=connector)
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 关闭会话
        await self.session.close()
        
    async def fetch(self, url, headers=None, timeout=30):
        """获取单个 URL 的内容"""
        try:
            async with self.session.get(
                url, 
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=timeout)
            ) as response:
                if response.status == 200:
                    return {
                        url: url,
                        status: response.status,
                        content: await response.text(),
                        size: len(await response.text())
                    }
                else:
                    return {
                        url: url,
                        status: response.status,
                        error: f"HTTP {response.status}"
                    }
        except Exception as e:
            return {
                url: url,
                status: 0,
                error: str(e)
            }
    
    async def fetch_all(self, urls, headers=None):
        """并发获取多个 URL"""
        tasks = [self.fetch(url, headers) for url in urls]
        return await asyncio.gather(*tasks)

# 使用示例
async def demo():
    urls = [
        https://httpbin.org/delay/1,
        https://httpbin.org/delay/2,
        https://httpbin.org/delay/1,
    ]
    
    async with AsyncCrawler(max_concurrent=5) as crawler:
        results = await crawler.fetch_all(urls)
        for result in results:
            print(f"{result[url]}: {result[status]}")

asyncio.run(demo())

四、实战:高性能新闻爬虫

现在我们来构建一个完整的新闻爬虫,展示异步编程的威力。

import aiohttp
import asyncio
import json
from datetime import datetime
from typing import List, Dict

class NewsCrawler:
    def __init__(self, max_concurrent=20, request_delay=0.1):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        self.results = []
        
    async def __aenter__(self):
        timeout = aiohttp.ClientTimeout(total=30)
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=connector
        )
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
        
    async def fetch_with_semaphore(self, url: str, headers: dict = None):
        """使用信号量控制并发"""
        async with self.semaphore:
            await asyncio.sleep(self.request_delay)
            return await self.fetch(url, headers)
    
    async def fetch(self, url: str, headers: dict = None) -> Dict:
        """获取页面内容"""
        try:
            async with self.session.get(url, headers=headers) as response:
                if response.status == 200:
                    text = await response.text()
                    return {
                        url: url,
                        status: response.status,
                        content: text,
                        size: len(text),
                        timestamp: datetime.now().isoformat()
                    }
                return {
                    url: url,
                    status: response.status,
                    error: f"HTTP {response.status}"
                }
        except Exception as e:
            return {
                url: url,
                status: 0,
                error: str(e)
            }
    
    def parse_news(self, result: Dict) -> Dict:
        """解析新闻内容(示例)"""
        if result.get(status) != 200:
            return result
            
        content = result[content]
        # 这里可以根据实际网站结构进行解析
        # 示例:提取标题和部分内容
        news_item = {
            url: result[url],
            title: f"新闻标题 - {len(content)} 字符",
            summary: content[:200] if len(content) > 200 else content,
            timestamp: result[timestamp]
        }
        return news_item
    
    async def crawl(self, urls: List[str], headers: dict = None) -> List[Dict]:
        """爬取多个页面"""
        tasks = [self.fetch_with_semaphore(url, headers) for url in urls]
        raw_results = await asyncio.gather(*tasks)
        
        # 解析结果
        self.results = [self.parse_news(result) for result in raw_results]
        return self.results
    
    def save_results(self, filename: str = news_results.json):
        """保存结果到文件"""
        with open(filename, w, encoding=utf-8) as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)
        print(f"结果已保存到 {filename}")

# 性能对比测试
async def performance_test():
    test_urls = [
        https://httpbin.org/delay/1 for _ in range(10)
    ]
    
    # 异步爬取
    start_time = datetime.now()
    async with NewsCrawler(max_concurrent=10) as crawler:
        async_results = await crawler.crawl(test_urls)
    async_time = (datetime.now() - start_time).total_seconds()
    
    print(f"异步爬取 {len(test_urls)} 个页面耗时: {async_time:.2f} 秒")
    print(f"平均每个请求: {async_time/len(test_urls):.2f} 秒")
    
    # 异步结果通常比同步快 5-10 倍
    # 同步方式需要 10 秒(10 个请求 × 1 秒)
    # 异步方式只需要约 1-2 秒(并发处理)

if __name__ == __main__:
    asyncio.run(performance_test())

五、性能优化技巧

要让异步爬虫达到最佳性能,需要注意以下几点:

1. 合理设置并发数

并发数不是越大越好。过高的并发数会导致:

  • 服务器拒绝连接(429 错误)
  • 本地资源耗尽
  • 被反爬虫系统识别

建议从 10-20 开始,根据目标网站调整。

2. 使用连接池

aiohttp 的 ClientSession 内置连接池,复用 TCP 连接可以减少握手开销:

connector = aiohttp.TCPConnector(
    limit=100,        # 总连接数限制
    limit_per_host=10,  # 每个主机的连接数
    keepalive_timeout=30
)

3. 添加请求延迟

await asyncio.sleep(0.1)  # 每次请求间隔 0.1 秒

4. 错误处理和重试机制

async def fetch_with_retry(self, url, max_retries=3):
    for attempt in range(max_retries):
        result = await self.fetch(url)
        if result[status] == 200:
            return result
        await asyncio.sleep(2 ** attempt)  # 指数退避
    return result

六、完整项目示例

下面是一个可直接使用的完整爬虫项目:

import aiohttp
import asyncio
import json
from datetime import datetime
from pathlib import Path

class AsyncWebCrawler:
    """异步 Web 爬虫"""
    
    def __init__(
        self,
        max_concurrent: int = 20,
        timeout: int = 30,
        request_delay: float = 0.1
    ):
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.request_delay = request_delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        self.stats = {
            total: 0,
            success: 0,
            failed: 0,
            start_time: None,
            end_time: None
        }
    
    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
    
    async def fetch_one(self, url: str, headers: dict = None) -> dict:
        """获取单个 URL"""
        async with self.semaphore:
            await asyncio.sleep(self.request_delay)
            try:
                async with self.session.get(url, headers=headers) as resp:
                    content = await resp.text()
                    self.stats[success]  = 1
                    return {
                        url: url,
                        status: resp.status,
                        content: content,
                        size: len(content)
                    }
            except Exception as e:
                self.stats[failed]  = 1
                return {
                    url: url,
                    status: 0,
                    error: str(e)
                }
    
    async def crawl(self, urls: list, headers: dict = None) -> list:
        """爬取多个 URL"""
        self.stats[start_time] = datetime.now()
        self.stats[total] = len(urls)
        
        tasks = [self.fetch_one(url, headers) for url in urls]
        results = await asyncio.gather(*tasks)
        
        self.stats[end_time] = datetime.now()
        return results
    
    def get_stats(self) -> dict:
        """获取统计信息"""
        if self.stats[start_time] and self.stats[end_time]:
            duration = (self.stats[end_time] - self.stats[start_time]).total_seconds()
            self.stats[duration] = f"{duration:.2f}s"
            self.stats[avg_time] = f"{duration/self.stats[total]:.2f}s"
        return self.stats
    
    def save_results(self, results: list, filename: str = results.json):
        """保存结果"""
        Path(filename).write_text(
            json.dumps(results, ensure_ascii=False, indent=2),
            encoding=utf-8
        )
        print(f"✅ 结果已保存: {filename}")

# 使用示例
async def main():
    urls = [
        https://httpbin.org/get,
        https://httpbin.org/uuid,
        https://httpbin.org/headers,
    ]
    
    async with AsyncWebCrawler(max_concurrent=10) as crawler:
        results = await crawler.crawl(urls)
        crawler.save_results(results)
        
        stats = crawler.get_stats()
        print(f"\n📊 统计信息:")
        print(f"总请求数: {stats[total]}")
        print(f"成功: {stats[success]}")
        print(f"失败: {stats[failed]}")
        print(f"总耗时: {stats.get(duration, N/A)}")
        print(f"平均耗时: {stats.get(avg_time, N/A)}")

if __name__ == __main__:
    asyncio.run(main())

七、总结

Python 异步编程通过 async/await 语法和 asyncio 库,让我们能够编写高性能的并发代码。在 Web 爬虫场景中,异步编程可以实现:

  • 10 倍以上的性能提升:并发处理多个请求
  • 更好的资源利用:在等待 I/O 时做其他工作
  • 简洁的代码:看起来像同步代码,易于理解和维护

关键要点:

  1. 使用 aiohttp 替代 requests
  2. 合理设置并发数和请求延迟
  3. 使用信号量控制并发
  4. 添加错误处理和重试机制
  5. 保存统计信息,监控性能

异步编程是 Python 高级开发的必备技能,掌握它将让你的程序性能飞跃。开始尝试吧!

相关资源:

相关文章

[Python 教程] OpenCV 实战:图像与视频文件处理

OpenCV 实战:图像与视频文件处理本文详细介绍如何使用 OpenCV 处理图像和视频文件,包括读取、显示、保存等操作。一、图像文件操作1.1 读取图像import cv2 #&nb...

[Python 教程] OpenCV 绘图教程:图形与文本标注

OpenCV 绘图教程:图形与文本标注本文介绍如何在 OpenCV 中绘制各种图形和添加文本,用于图像标注和可视化。一、绘制基本图形1.1 创建画布import cv2 import&nb...

[Python 教程] NumPy 数组操作详解

NumPy 数组操作详解 NumPy 是 Python 科学计算的基础库,提供高性能的多维数组对象。本文详细介绍 NumPy 数组的核心操作。 一、创建数组 import numpy as np...

[Python 教程] Matplotlib 数据可视化教程

Matplotlib 数据可视化教程 Matplotlib 是 Python 最常用的绘图库。本文介绍常用图表的绘制方法。 一、基础设置 import matplotlib.pyplot as pl...

[Python 教程] Python 网络请求与爬虫基础

Python 网络请求与爬虫基础 requests 是 Python 最常用的 HTTP 库。本文介绍网络请求和爬虫的基础知识。 一、基础请求 import requests # GET 请求 r...

Python 装饰器实用技巧:从入门到精通

装饰器是 Python 最强大的特性之一,但也是很多开发者感到困惑的概念。简单来说,装饰器是一个函数,它接受另一个函数作为输入,并返回一个新的函数。使用装饰器,你可以在不修改原函数代码的情况下,为其添...

发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。