设计一个支持百万级并发的WebSocket服务需要综合考虑异步编程、服务器架构优化、协议选择和资源管理。以下是基于Python的实现方案,分为核心步骤和代码示例:
1. 异步框架选择
使用高性能异步框架,推荐组合:
uvloop
(替代默认的asyncio
事件循环,性能提升40%-50%)websockets
(轻量级WebSocket库) 或FastAPI
+WebSocket
(需搭配异步ASGI服务器如Uvicorn
)
# 安装依赖
pip install uvloop websockets uvicorn fastapi
2. 服务端代码示例(使用websockets
库)
import asyncio
import uvloop
import websockets
from websockets import WebSocketServerProtocol
async def handle_connection(websocket: WebSocketServerProtocol, path: str):
try:
async for message in websocket:
# 处理客户端消息(示例:原样返回)
await websocket.send(f"Server received: {message}")
except websockets.ConnectionClosed:
print("Connection closed")
async def main():
async with websockets.serve(handle_connection, "0.0.0.0", 8765):
await asyncio.Future() # 永久运行
if __name__ == "__main__":
uvloop.install() # 替换默认事件循环
asyncio.run(main())
3. 关键优化措施
3.1 系统级调优
# 调整Linux内核参数(需root权限)
sysctl -w fs.file-max=1000000 # 最大文件描述符数
sysctl -w net.core.somaxconn=65535 # 最大连接队列
sysctl -w net.ipv4.tcp_max_syn_backlog=65535
sysctl -w net.ipv4.ip_local_port_range="1024 65535"
3.2 事件循环优化
# 使用uvloop替代默认事件循环(性能接近Go的goroutine)
uvloop.install()
3.3 连接管理
- 心跳机制:定期发送
ping/pong
帧 - 连接超时:清理僵尸连接
async def handle_connection(websocket: WebSocketServerProtocol, path: str):
try:
websocket.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
while True:
await asyncio.wait_for(websocket.ping(), timeout=30)
await asyncio.sleep(10) # 每10秒发送一次心跳
except (asyncio.TimeoutError, websockets.ConnectionClosed):
await websocket.close()
4. 分布式架构
4.1 负载均衡
- 方案:使用
Nginx
或HAProxy
做横向扩展 - Nginx配置示例:
http {
upstream websocket_cluster {
least_conn;
server ws1.example.com:8765;
server ws2.example.com:8765;
keepalive 1024; # 保持长连接
}
server {
listen 80;
location / {
proxy_pass http://websocket_cluster;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
}
4.2 状态共享
- Redis Pub/Sub:跨节点广播消息
import aioredis
async def handle_connection(websocket: WebSocketServerProtocol, path: str):
redis = await aioredis.create_redis("redis://localhost")
channel = await redis.subscribe("broadcast")
async for msg in channel[0].iter():
await websocket.send(msg.decode("utf-8"))
5. 性能测试工具
# 使用wrk或websocket-bench进行压测
websocket-bench -a 1000000 -c 1000 -w 8 -k ws://localhost:8765
6. 监控与调试
- Prometheus + Grafana:监控连接数、内存、CPU
- 日志异步化:使用
structlog
或loguru
避免I/O阻塞
import logging
from loguru import logger
logger.add("websocket.log", enqueue=True) # 异步写入日志
7. 扩展性设计
- 垂直扩展:单机优化(CPU绑定、内存分配)
- 水平扩展:通过Kubernetes动态扩容Pod
总结
- 核心要点:异步I/O + 事件循环优化 + 分布式架构
- 瓶颈突破:单机理论支持约5万~10万连接,百万级需多节点集群
- 注意事项:避免阻塞操作(如同步数据库调用),所有I/O必须异步化
实际部署时需结合具体业务场景调整参数(如心跳间隔、线程池大小)。