Administrator
Published on 2025-03-14 / 5 Visits
0
0

使用 asyncio 实现并发爬虫控制

以下是使用 asyncioaiohttp 实现并发爬虫并控制最大并发数的示例代码:

import asyncio
import aiohttp
from typing import List

async def fetch_url(url: str, session: aiohttp.ClientSession, semaphore: asyncio.Semaphore) -> str:
    """
    异步获取单个URL的内容,使用信号量控制并发
    """
    async with semaphore:  # 限制并发数
        try:
            async with session.get(url, timeout=10) as response:
                content = await response.text()
                return f"{url[:30]}...: {len(content)} bytes"
        except Exception as e:
            return f"Error fetching {url}: {str(e)}"

async def crawl(urls: List[str], max_concurrent: int) -> List[str]:
    """
    并发爬虫主函数
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(url, session, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

if __name__ == "__main__":
    # 示例配置
    urls = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.google.com",
        "https://www.github.com",
        "https://www.stackoverflow.com",
    ] * 3  # 重复URL用于演示
    
    max_concurrent = 3  # 最大并发数
    
    # 运行爬虫
    results = asyncio.run(crawl(urls, max_concurrent))
    
    # 打印结果
    for idx, result in enumerate(results, 1):
        print(f"{idx}. {result}")

代码说明:

  1. 异步HTTP客户端:

    • 使用 aiohttp.ClientSession 创建HTTP客户端会话,相比为每个请求创建新会话更高效
    • 设置10秒超时防止长时间等待
  2. 并发控制:

    • 使用 asyncio.Semaphore 控制最大并发数
    • async with semaphore 上下文管理器确保同时运行的请求不超过设定值
  3. 任务管理:

    • 为每个URL创建异步任务
    • 使用 asyncio.gather 并行执行所有任务并收集结果
  4. 错误处理:

    • 捕获通用异常并返回错误信息
    • 实际应用中可根据需求添加重试逻辑或特定异常处理

扩展建议:

  1. 性能优化:

    # 在创建ClientSession时添加连接池限制
    connector = aiohttp.TCPConnector(limit=0)  # 0表示不限制(默认100)
    async with aiohttp.ClientSession(connector=connector) as session:
    
  2. 添加重试机制:

    from async_retrying import retry  # 需要安装 async_retrying
    
    @retry(attempts=3)
    async def fetch_url(url, session, semaphore):
        # ...
    
  3. 结果处理:

    # 在crawl函数中添加进度跟踪
    from tqdm.asyncio import tqdm_asyncio
    
    results = await tqdm_asyncio.gather(*tasks, desc="Crawling")
    
  4. 速率限制:

    # 添加请求间隔限制
    from aiolimiter import AsyncLimiter
    
    limiter = AsyncLimiter(100, 1)  # 每秒100个请求
    async with limiter:
        async with session.get(url) as response:
            # ...
    

该实现能够有效控制并发请求数量,同时保持高效的异步IO性能,适合中等规模的网页抓取任务。实际应用中应根据目标网站的robots.txt和服务条款合理设置并发参数。


Comment