文章目录

Python 协程Asyncio处理高并发Websocket连接

发布于 2026-04-13 23:20:41 · 浏览 27 次 · 评论 0 条

Python 协程Asyncio处理高并发Websocket连接

WebSocket 是一种全双工通信协议,常用于实时聊天、股票报价和在线游戏等场景。当连接数从几十个增长到几万个时,传统的多线程模型会因为频繁的上下文切换导致系统资源耗尽。Python 的 asyncio 库利用协程实现单线程并发,能够以极低的内存开销处理成千上万个长连接。


理解核心原理

传统多线程模型像是一个拥有很多员工的餐厅,每个员工(线程)只服务一桌客人,客人不吃菜时员工也只能干等着。这导致人员成本(内存)极高,且人员调度(CPU切换)很耗时。

Asyncio 协程模型则像是一个拥有超能力的服务员(事件循环)。他只为正在点菜或上菜的客人服务,一旦客人开始自己吃饭(等待 I/O),服务员立刻转身去服务下一桌,直到上一桌客人呼叫(事件触发)再回去。这极大地节省了人力,提高了效率。

为了更直观地理解 asyncio 的事件循环处理机制,请看下面的流程图:

graph LR A[启动事件循环] --> B{等待新连接} B -->|收到连接请求| C["创建协程任务: handler"] C --> D["执行 I/O 操作: 接收消息"] D -->|数据到达| E["处理数据: 业务逻辑"] E --> D D -->|等待中| F["挂起当前协程"] F --> B B -->|无新请求且无活跃I/O| G[结束循环]

准备开发环境

确保你的系统中已安装 Python 3.7 或更高版本。

打开终端或命令行工具,输入以下命令 安装 websockets 库,这是 Python 中处理 WebSocket 最成熟的异步库之一:

pip install websockets

编写基础 WebSocket 服务端

创建一个名为 server.py 的文件。这个文件将运行在服务器上,负责监听端口、建立连接并收发消息。

编写以下代码,构建一个最简单的“回声”服务器(收到什么消息,就原样返回什么):

import asyncio
import websockets

# 定义异步处理函数,每个连接都会产生一个新的协程来运行此函数
async def handler(websocket):
    print(f"客户端 {websocket.remote_address} 已连接")
    try:
        # 循环接收消息
        async for message in websocket:
            print(f"收到消息: {message}")
            # 将消息原样发送回客户端
            await websocket.send(f"Echo: {message}")
    except websockets.exceptions.ConnectionClosed:
        print(f"客户端 {websocket.remote_address} 断开连接")

async def main():
    # 设置服务器监听地址和端口
    # "0.0.0.0" 表示监听所有网络接口,8765 是端口号
    async with websockets.serve(handler, "0.0.0.0", 8765):
        print("WebSocket 服务已启动,监听端口 8765")
        # 永久运行,直到程序被手动终止
        await asyncio.Future()  

if __name__ == "__main__":
    # 运行主程序
    asyncio.run(main())

保存文件。在终端中执行 python server.py。如果看到输出 WebSocket 服务已启动,监听端口 8765,说明服务端已准备就绪。


实现高并发广播功能

仅仅“回声”是不够的,在实际应用中,通常需要将一条消息发送给所有连接的客户端(例如聊天室)。我们需要维护一个“连接池”,在内存中记录所有活跃的连接对象。

新建一个名为 broadcast_server.py 的文件,输入以下代码:

import asyncio
import websockets

# 创建一个集合,用于存储所有活跃的连接对象
connected_users = set()

async def broadcast(message, sender_ws):
    """将消息发送给除发送者外的所有用户"""
    if connected_users:  # 确保集合不为空
        # 列表推导式创建任务列表,实现并发发送而非串行
        tasks = [
            user_ws.send(message) 
            for user_ws in connected_users 
            if user_ws != sender_ws
        ]
        if tasks:
            # 并发执行所有发送任务,等待全部完成
            await asyncio.gather(*tasks)

async def handler(websocket):
    # 1. 注册:当新连接进来时,将其加入集合
    connected_users.add(websocket)
    user_addr = websocket.remote_address
    print(f"用户 {user_addr} 加入,当前在线人数: {len(connected_users)}")

    try:
        # 发送欢迎消息给当前用户
        await websocket.send(f"System: 欢迎!当前在线人数: {len(connected_users)}")

        # 2. 监听消息
        async for message in websocket:
            print(f"{user_addr} 说: {message}")
            # 3. 广播:将消息转发给其他人
            await broadcast(f"User {user_addr}: {message}", websocket)

    except websockets.exceptions.ConnectionClosed:
        pass
    finally:
        # 4. 注销:无论正常断开还是异常断开,都要从集合中移除
        connected_users.remove(websocket)
        print(f"用户 {user_addr} 离开,当前在线人数: {len(connected_users)}")

async def main():
    async with websockets.serve(handler, "0.0.0.0", 8765):
        print("广播服务已启动,监听端口 8765")
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

注意代码中的 finally 块,这非常重要。它确保了即使客户端异常崩溃,服务端也能清理连接池,防止内存泄漏。


编写多连接测试脚本

为了验证服务端是否能处理高并发,我们需要编写一个客户端脚本,模拟成百上千个连接同时发送消息。

创建 stress_test_client.py编写如下代码:

import asyncio
import websockets

# 模拟的并发连接数
CONCURRENT_CONNECTIONS = 100 
# 每个连接发送的消息数
MESSAGES_PER_CONNECTION = 5

async def client_task(client_id):
    uri = "ws://localhost:8765"
    try:
        # 尝试连接服务器
        async with websockets.connect(uri) as websocket:
            print(f"客户端 {client_id} 连接成功")

            # 接收欢迎消息
            await websocket.recv()

            for i in range(MESSAGES_PER_CONNECTION):
                msg = f"Client-{client_id}-Msg-{i}"
                # 发送消息
                await websocket.send(msg)
                # 接收广播消息(可能会收到很多其他人的消息)
                response = await websocket.recv()
                # 这里我们只打印自己发的消息的回显,避免控制台刷屏
                if f"Client-{client_id}" in response:
                    pass 

            print(f"客户端 {client_id} 完成测试")

    except Exception as e:
        print(f"客户端 {client_id} 出错: {e}")

async def main():
    print(f"开始启动 {CONCURRENT_CONNECTIONS} 个并发客户端...")
    # 创建任务列表
    tasks = [client_task(i) for i in range(CONCURRENT_CONNECTIONS)]
    # 并发启动所有客户端
    await asyncio.gather(*tasks)
    print("所有客户端测试完成")

if __name__ == "__main__":
    asyncio.run(main())

先运行 python broadcast_server.py 启动服务端。

打开一个新的终端窗口,运行 python stress_test_client.py


验证结果与性能分析

观察服务端的终端输出。如果一切正常,你应该会看到大量的“用户加入”日志瞬间刷屏,随后开始接收消息。

我们可以通过计算吞吐量来评估性能。假设我们用 $T$ 秒处理了 $N$ 个连接,每个连接发送了 $M$ 条消息。系统的总消息吞吐量 $Q$ 可以通过以下公式计算:

$$ Q = \frac{N \times M}{T} $$

如果你的测试参数是 100 个连接,每个发送 5 条消息,耗时 2 秒完成,那么吞吐量就是:

$$ Q = \frac{100 \times 5}{2} = 250 \text{ 条/秒} $$

asyncio 模型下,增加 $N$ 到 1000 甚至 5000,只要 CPU 足够强且带宽允许,$T$ 的增长通常是线性的,而不会像多线程模型那样因为线程切换开销导致性能指数级下降。

检查服务端代码中的 broadcast 函数。注意 asyncio.gather(*tasks) 这一行。如果没有使用 gather,而是使用 for 循环配合 await,那么服务端在向第一个用户发完消息前,不会向第二个用户发送。这将导致广播变成串行操作,极大地拖慢整体速度。

修改 CONCURRENT_CONNECTIONS 的值为 1000 或更高,再次运行测试脚本。观察服务端内存占用是否稳定,以及是否出现大量连接超时错误。只要资源未耗尽,Asyncio 就能轻松处理这种负载。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文