Python 异步 IO:aiohttp 与 asyncio 结合
现代 Web 应用对并发处理能力的要求越来越高。传统的同步编程模型在处理大量 I/O 操作时,往往会让程序陷入"等待"的困境——CPU 明明在空转,却只能眼巴巴地等着网络请求返回结果。异步 IO 正是为了解决这一痛点而生,它允许程序在等待 I/O 完成期间去处理其他任务,从而大幅提升资源利用率和吞吐量。
Python 的 asyncio 模块提供了异步编程的基础框架,而 aiohttp 则是基于此框架构建的异步 HTTP 客户端/服务器库。二者结合,能够轻松实现高并发的网络通信。本文将手把手带你掌握这一强大组合的用法。
一、异步 IO 的核心概念
在深入代码之前,有三个关键概念必须理解清楚。它们是异步编程的地基,地基不稳,后续学习会非常吃力。
事件循环是异步程序的"总调度员"。它负责监控所有异步任务的执行状态,当某个任务因 I/O 而阻塞时,事件循环会立即切换到其他就绪的任务,确保 CPU 始终有活干。你可以把事件循环想象成一个永不停歇的循环:while True: 监控任务状态 -> 执行已就绪的任务。
协程是异步编程的基本执行单元。它比普通函数更加"智能"——可以在执行过程中主动暂停(yield),让出控制权给其他协程,待条件满足时再恢复执行。使用 async def 定义的函数就是协程,调用它会返回一个协程对象,不会立即执行。
任务是协程的"包装盒"。把协程包装成任务后,事件循环才能调度它执行。任务负责追踪协程的状态,并在适当时机启动、暂停或恢复协程。
import asyncio
# 定义一个协程
async def fetch_data():
print("开始获取数据...")
await asyncio.sleep(2) # 模拟耗时 I/O 操作
print("数据获取完成")
return {"data": 123}
# 主函数
async def main():
print("创建任务...")
# 将协程包装成任务
task = asyncio.create_task(fetch_data())
print("任务已创建,等待完成...")
# 等待任务完成并获取返回值
result = await task
print(f"收到结果: {result}")
# 启动事件循环
asyncio.run(main())
运行这段代码,你会发现程序在 await asyncio.sleep(2) 处"卡住"了 2 秒后继续执行。这就是异步的魅力——虽然代码看起来是顺序执行的,但实际上在此期间,事件循环可以调度其他协程运行。如果你同时创建多个这样的任务,它们会被并发执行,总耗时不是单个耗时的累加,而是取决于最慢的那个任务。
二、aiohttp 快速上手
aiohttp 分为两大部分:异步客户端(ClientSession)和异步服务器(web)。我们先从客户端学起,因为它最常用也最简单。
2.1 异步 HTTP 客户端
传统的 requests 库是同步的,发出请求后会阻塞直到收到响应。aiohttp 的客户端则完全不同,它使用 async/await 语法,让你在等待响应期间去处理其他事情。
import aiohttp
import asyncio
async def fetch_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
# 等待响应体下载
text = await response.text()
print(f"从 {url} 获取了 {len(text)} 字符")
return text
async def main():
urls = [
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/headers"
]
# 并发请求所有 URL
tasks = [fetch_url(url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"成功获取 {len(results)} 个响应")
asyncio.run(main())
这段代码同时向三个 URL 发起请求。使用 asyncio.gather() 将多个协程打包执行,它们会真正地并发运行。如果三个请求各需 1 秒完成,传统同步方式需要 3 秒,而异步方式理论上只需要 1 秒左右。
关键点解析:
async with 语句用于管理上下文资源。对于 ClientSession,它确保连接被正确复用和关闭。对于 session.get(),它确保响应体被完全读取后释放连接。这些细节在编写生产级代码时非常重要——连接泄漏会导致资源耗尽。
2.2 异步 Web 服务器
aiohttp 不仅能发请求,还能收请求。创建一个异步 HTTP 服务器只需要几行代码:
from aiohttp import web
import asyncio
async def handle_request(request):
# 从 URL 路径中提取参数
name = request.match_info.get('name', 'World')
return web.json_response({
"message": f"Hello, {name}!",
"method": request.method
})
async def health_check(request):
return web.Response(text="OK")
# 创建应用并注册路由
app = web.Application()
app.router.add_get('/hello/{name}', handle_request)
app.router.add_get('/health', health_check)
# 启动服务器
web.run_app(app, host='0.0.0.0', port=8080)
运行这段代码后,访问 http://localhost:8080/hello/YourName 会返回 JSON 格式的问候语,访问 /health 则返回健康检查响应。相比 Flask 或 Django,aiohttp 的服务器更加轻量,启动更快,非常适合微服务架构。
三、实战:并发爬虫
现在我们来做一个完整的实战项目:一个并发网页爬虫。这个爬虫会从多个新闻网站抓取标题,并发处理所有请求。
import aiohttp
import asyncio
from typing import List, Dict
import re
class NewsCrawler:
def __init__(self, urls: List[str]):
self.urls = urls
self.session = None
async def __aenter__(self):
# 创建全局复用的连接会话
connector = aiohttp.TCPConnector(limit=10, limit_per_host=3)
self.session = aiohttp.ClientSession(connector=connector)
return self
async def __aexit__(self, *args):
# 确保会话正确关闭
await self.session.close()
async def fetch_title(self, url: str) -> Dict[str, str]:
"""获取单个页面的标题"""
try:
async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status != 200:
return {"url": url, "title": f"HTTP {resp.status}"}
html = await resp.text()
# 使用正则提取 <title> 标签内容
match = re.search(r'<title>([^<]+)</title>', html, re.IGNORECASE)
title = match.group(1).strip() if match else "无标题"
return {"url": url, "title": title}
except asyncio.TimeoutError:
return {"url": url, "title": "超时"}
except Exception as e:
return {"url": url, "title": f"错误: {type(e).__name__}"}
async def crawl_all(self) -> List[Dict[str, str]]:
"""并发抓取所有 URL"""
tasks = [self.fetch_title(url) for url in self.urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
target_urls = [
"https://news.ycombinator.com/",
"https://www.reddit.com/",
"https://www.producthunt.com/",
"https://news.sina.com.cn/",
"https://www.bbc.com/news"
]
print("开始抓取新闻网站标题...")
async with NewsCrawler(target_urls) as crawler:
results = await crawler.crawl_all()
print("\n抓取结果:")
for r in results:
print(f" - [{r['url'][:40]}...] -> {r['title']}")
if __name__ == "__main__":
asyncio.run(main())
这个爬虫有几个值得学习的细节:
TCPConnector 配置了连接池大小。limit=10 限制总并发连接数不超过 10,limit_per_host=3 限制对同一主机的并发不超过 3。这些限制是为了避免对目标服务器造成过大压力,也是基本的网络礼仪。
return_exceptions=True 让 gather 在某个任务出错时不会整体崩溃,而是将异常作为返回值捕获。这样即使某个网站挂掉了,其他网站的抓取仍能正常完成。
四、高级用法与最佳实践
掌握了基础用法后,以下高级技巧能帮助你写出更健壮的异步代码。
4.1 信号量控制并发
有时你需要严格控制并发数量,比如避免触发目标网站的防爬机制。asyncio.Semaphore 是完美的解决方案:
import asyncio
import aiohttp
async def limited_fetch(session, url, semaphore):
async with semaphore:
# 无论 url 有多少,这里同时最多执行 5 个
async with session.get(url) as resp:
return await resp.text()
async def main():
semaphore = asyncio.Semaphore(5) # 最多 5 个并发
async with aiohttp.ClientSession() as session:
tasks = [limited_fetch(session, url, semaphore) for url in urls]
await asyncio.gather(*tasks)
4.2 批量任务与进度追踪
当需要处理成百上千个任务时,实时显示进度能让你对程序状态有清晰的了解:
import asyncio
from tqdm import tqdm # 需要安装: pip install tqdm
async def process_item(item):
await asyncio.sleep(0.1) # 模拟处理
return item * 2
async def main():
items = list(range(100))
# 创建任务列表
tasks = [process_item(item) for item in items]
# 使用 tqdm 显示进度条
for future in asyncio.as_completed(tasks):
result = await future
# 这里可以更新进度条
4.3 错误处理策略
异步代码的错误处理比同步代码更复杂,因为错误可能发生在任何一个 await 点。以下是推荐的错误处理模式:
async def robust_fetch(url):
"""带重试机制的请求函数"""
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
if resp.status == 429: # Too Many Requests
# 触发速率限制,等待后重试
await asyncio.sleep(retry_delay * (attempt + 1))
continue
resp.raise_for_status()
return await resp.json()
except aiohttp.ClientError as e:
if attempt == max_retries - 1:
# 最后一次尝试仍然失败,记录错误并返回默认值
return {"error": str(e)}
await asyncio.sleep(retry_delay)
五、性能调优要点
异步代码虽然高效,但错误的用法反而可能不如同步代码。以下是经过验证的调优建议:
连接复用是第一要务。每次创建 ClientSession 都有开销,应该在程序生命周期内复用同一个会话。上文的 NewsCrawler 类展示了正确的做法——使用上下文管理器管理会话生命周期。
超时设置必须合理。没有设置超时的网络请求可能永久挂起,拖慢整个程序。务必为所有 I/O 操作设置合理的超时时间。
避免在协程中执行 CPU 密集型任务。异步擅长的是 I/O 等待,而非计算。如果需要进行大量计算,应该使用 asyncio.to_thread() 将任务转移到线程池,或者使用 concurrent.futures.ProcessPoolExecutor 利用多进程。
六、常见问题排查
代码跑通了但性能不达标?以下是几个常见的原因。
使用了阻塞式 API。 time.sleep() 会阻塞整个线程,导致事件循环无法调度其他任务。必须使用 await asyncio.sleep() 代替。第三方库如 requests、pymysql 都是同步的,会阻塞事件循环,应该用它们的异步替代品(如 aiohttp、aiomysql)。
协程没有被真正并发执行。 如果你在 await A() 之后才创建 B() 的任务,它们就会串行执行。正确的做法是先把所有任务创建出来,再统一 await。
# 错误:串行执行
await fetch(url1)
await fetch(url2)
# 正确:并发执行
t1 = asyncio.create_task(fetch(url1))
t2 = asyncio.create_task(fetch(url2))
await t1
await t2
# 或者更简洁
await asyncio.gather(fetch(url1), fetch(url2))
七、完整项目架构示例
最后展示一个更完整的项目结构,它整合了本文介绍的所有技术点:
import aiohttp
import asyncio
from dataclasses import dataclass
from typing import Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class FetchResult:
url: str
status: int
content: Optional[str] = None
error: Optional[str] = None
class AsyncFetcher:
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=15)
)
return self
async def __aexit__(self, *args):
await self.session.close()
async def fetch(self, url: str) -> FetchResult:
async with self.semaphore:
try:
async with self.session.get(url) as resp:
return FetchResult(
url=url,
status=resp.status,
content=await resp.text()[:1000] # 截取部分内容
)
except Exception as e:
logger.error(f"获取 {url} 失败: {e}")
return FetchResult(url=url, status=0, error=str(e))
async def fetch_all(self, urls: list) -> list:
tasks = [self.fetch(url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://httpbin.org/get",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/500",
"https://httpbin.org/html",
]
async with AsyncFetcher(max_concurrent=3) as fetcher:
results = await fetcher.fetch_all(urls)
for r in results:
status_mark = "✓" if r.status == 200 else "✗"
print(f"{status_mark} [{r.status:3}] {r.url}")
if __name__ == "__main__":
asyncio.run(main())
这个架构清晰地分离了关注点:FetchResult 定义了统一的数据结构,AsyncFetcher 封装了所有异步逻辑,main 函数负责组装和执行。实际项目中,你可以在此基础上添加重试、缓存、日志等功能。

暂无评论,快来抢沙发吧!