Python 协程Asyncio处理高并发Websocket连接
WebSocket 是一种全双工通信协议,常用于实时聊天、股票报价和在线游戏等场景。当连接数从几十个增长到几万个时,传统的多线程模型会因为频繁的上下文切换导致系统资源耗尽。Python 的 asyncio 库利用协程实现单线程并发,能够以极低的内存开销处理成千上万个长连接。
理解核心原理
传统多线程模型像是一个拥有很多员工的餐厅,每个员工(线程)只服务一桌客人,客人不吃菜时员工也只能干等着。这导致人员成本(内存)极高,且人员调度(CPU切换)很耗时。
Asyncio 协程模型则像是一个拥有超能力的服务员(事件循环)。他只为正在点菜或上菜的客人服务,一旦客人开始自己吃饭(等待 I/O),服务员立刻转身去服务下一桌,直到上一桌客人呼叫(事件触发)再回去。这极大地节省了人力,提高了效率。
为了更直观地理解 asyncio 的事件循环处理机制,请看下面的流程图:
准备开发环境
确保你的系统中已安装 Python 3.7 或更高版本。
打开终端或命令行工具,输入以下命令 安装 websockets 库,这是 Python 中处理 WebSocket 最成熟的异步库之一:
pip install websockets
编写基础 WebSocket 服务端
创建一个名为 server.py 的文件。这个文件将运行在服务器上,负责监听端口、建立连接并收发消息。
编写以下代码,构建一个最简单的“回声”服务器(收到什么消息,就原样返回什么):
import asyncio
import websockets
# 定义异步处理函数,每个连接都会产生一个新的协程来运行此函数
async def handler(websocket):
print(f"客户端 {websocket.remote_address} 已连接")
try:
# 循环接收消息
async for message in websocket:
print(f"收到消息: {message}")
# 将消息原样发送回客户端
await websocket.send(f"Echo: {message}")
except websockets.exceptions.ConnectionClosed:
print(f"客户端 {websocket.remote_address} 断开连接")
async def main():
# 设置服务器监听地址和端口
# "0.0.0.0" 表示监听所有网络接口,8765 是端口号
async with websockets.serve(handler, "0.0.0.0", 8765):
print("WebSocket 服务已启动,监听端口 8765")
# 永久运行,直到程序被手动终止
await asyncio.Future()
if __name__ == "__main__":
# 运行主程序
asyncio.run(main())
保存文件。在终端中执行 python server.py。如果看到输出 WebSocket 服务已启动,监听端口 8765,说明服务端已准备就绪。
实现高并发广播功能
仅仅“回声”是不够的,在实际应用中,通常需要将一条消息发送给所有连接的客户端(例如聊天室)。我们需要维护一个“连接池”,在内存中记录所有活跃的连接对象。
新建一个名为 broadcast_server.py 的文件,输入以下代码:
import asyncio
import websockets
# 创建一个集合,用于存储所有活跃的连接对象
connected_users = set()
async def broadcast(message, sender_ws):
"""将消息发送给除发送者外的所有用户"""
if connected_users: # 确保集合不为空
# 列表推导式创建任务列表,实现并发发送而非串行
tasks = [
user_ws.send(message)
for user_ws in connected_users
if user_ws != sender_ws
]
if tasks:
# 并发执行所有发送任务,等待全部完成
await asyncio.gather(*tasks)
async def handler(websocket):
# 1. 注册:当新连接进来时,将其加入集合
connected_users.add(websocket)
user_addr = websocket.remote_address
print(f"用户 {user_addr} 加入,当前在线人数: {len(connected_users)}")
try:
# 发送欢迎消息给当前用户
await websocket.send(f"System: 欢迎!当前在线人数: {len(connected_users)}")
# 2. 监听消息
async for message in websocket:
print(f"{user_addr} 说: {message}")
# 3. 广播:将消息转发给其他人
await broadcast(f"User {user_addr}: {message}", websocket)
except websockets.exceptions.ConnectionClosed:
pass
finally:
# 4. 注销:无论正常断开还是异常断开,都要从集合中移除
connected_users.remove(websocket)
print(f"用户 {user_addr} 离开,当前在线人数: {len(connected_users)}")
async def main():
async with websockets.serve(handler, "0.0.0.0", 8765):
print("广播服务已启动,监听端口 8765")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
注意代码中的 finally 块,这非常重要。它确保了即使客户端异常崩溃,服务端也能清理连接池,防止内存泄漏。
编写多连接测试脚本
为了验证服务端是否能处理高并发,我们需要编写一个客户端脚本,模拟成百上千个连接同时发送消息。
创建 stress_test_client.py,编写如下代码:
import asyncio
import websockets
# 模拟的并发连接数
CONCURRENT_CONNECTIONS = 100
# 每个连接发送的消息数
MESSAGES_PER_CONNECTION = 5
async def client_task(client_id):
uri = "ws://localhost:8765"
try:
# 尝试连接服务器
async with websockets.connect(uri) as websocket:
print(f"客户端 {client_id} 连接成功")
# 接收欢迎消息
await websocket.recv()
for i in range(MESSAGES_PER_CONNECTION):
msg = f"Client-{client_id}-Msg-{i}"
# 发送消息
await websocket.send(msg)
# 接收广播消息(可能会收到很多其他人的消息)
response = await websocket.recv()
# 这里我们只打印自己发的消息的回显,避免控制台刷屏
if f"Client-{client_id}" in response:
pass
print(f"客户端 {client_id} 完成测试")
except Exception as e:
print(f"客户端 {client_id} 出错: {e}")
async def main():
print(f"开始启动 {CONCURRENT_CONNECTIONS} 个并发客户端...")
# 创建任务列表
tasks = [client_task(i) for i in range(CONCURRENT_CONNECTIONS)]
# 并发启动所有客户端
await asyncio.gather(*tasks)
print("所有客户端测试完成")
if __name__ == "__main__":
asyncio.run(main())
先运行 python broadcast_server.py 启动服务端。
打开一个新的终端窗口,运行 python stress_test_client.py。
验证结果与性能分析
观察服务端的终端输出。如果一切正常,你应该会看到大量的“用户加入”日志瞬间刷屏,随后开始接收消息。
我们可以通过计算吞吐量来评估性能。假设我们用 $T$ 秒处理了 $N$ 个连接,每个连接发送了 $M$ 条消息。系统的总消息吞吐量 $Q$ 可以通过以下公式计算:
$$ Q = \frac{N \times M}{T} $$
如果你的测试参数是 100 个连接,每个发送 5 条消息,耗时 2 秒完成,那么吞吐量就是:
$$ Q = \frac{100 \times 5}{2} = 250 \text{ 条/秒} $$
在 asyncio 模型下,增加 $N$ 到 1000 甚至 5000,只要 CPU 足够强且带宽允许,$T$ 的增长通常是线性的,而不会像多线程模型那样因为线程切换开销导致性能指数级下降。
检查服务端代码中的 broadcast 函数。注意 asyncio.gather(*tasks) 这一行。如果没有使用 gather,而是使用 for 循环配合 await,那么服务端在向第一个用户发完消息前,不会向第二个用户发送。这将导致广播变成串行操作,极大地拖慢整体速度。
修改 CONCURRENT_CONNECTIONS 的值为 1000 或更高,再次运行测试脚本。观察服务端内存占用是否稳定,以及是否出现大量连接超时错误。只要资源未耗尽,Asyncio 就能轻松处理这种负载。

暂无评论,快来抢沙发吧!