WebSocket 在前后端的完整使用流程_websocket前后端交互
一、前端 JavaScript 使用 WebSocket
1. 基本使用流程
// 1. 创建 WebSocket 连接 (ws:// 或 wss://)const socket = new WebSocket(\'ws://localhost:8000/ws\');// 2. 连接打开事件socket.addEventListener(\'open\', (event) => { console.log(\'WebSocket 连接已建立\'); // 发送文本消息 socket.send(\'Hello Server!\'); // 发送 JSON 数据 const data = { type: \'login\', user: \'John\' }; socket.send(JSON.stringify(data));});// 3. 接收消息事件socket.addEventListener(\'message\', (event) => { console.log(\'收到服务器消息:\', event.data); try { // 解析 JSON 数据 const jsonData = JSON.parse(event.data); console.log(\'解析后的数据:\', jsonData); } catch { // 处理文本消息 console.log(\'文本消息:\', event.data); }});// 4. 错误处理socket.addEventListener(\'error\', (error) => { console.error(\'WebSocket 错误:\', error);});// 5. 连接关闭事件socket.addEventListener(\'close\', (event) => { console.log(`连接关闭,代码: ${event.code}, 原因: ${event.reason}`);});// 6. 手动发送消息function sendMessage() { const message = document.getElementById(\'messageInput\').value; socket.send(message);}// 7. 关闭连接function closeConnection() { socket.close(1000, \'用户主动关闭\');}
2. 关键特性实现
二进制数据传输:
// 发送 ArrayBufferconst buffer = new ArrayBuffer(8);const view = new Uint8Array(buffer);view[0] = 42;socket.send(buffer);// 接收二进制数据socket.addEventListener(\'message\', ({ data }) => { if (data instanceof ArrayBuffer) { const view = new Uint8Array(data); console.log(\'收到二进制数据:\', view[0]); }});
心跳检测:
// 心跳检测setInterval(() => { if (socket.readyState === WebSocket.OPEN) { socket.send(\'ping\'); }}, 30000);// 处理服务器心跳响应socket.addEventListener(\'message\', ({ data }) => { if (data === \'pong\') { console.log(\'收到心跳响应\'); }});
自动重连机制:
function connect() { const socket = new WebSocket(\'ws://localhost:8000/ws\'); socket.addEventListener(\'close\', () => { console.log(\'连接断开,5秒后重连...\'); setTimeout(connect, 5000); }); return socket;}const socket = connect();
二、后端 Python 使用 WebSocket
推荐使用 websockets
库(异步)或 Flask-SocketIO
(同步)
1. 使用 websockets 库(异步)
# 安装: pip install websocketsimport asyncioimport websocketsimport jsonasync def handle_connection(websocket, path): print(\"客户端连接成功\") try: async for message in websocket: # 处理文本消息 if isinstance(message, str): print(f\"收到文本消息: {message}\") if message == \"ping\": await websocket.send(\"pong\") else: # 处理JSON数据 try: data = json.loads(message) print(f\"收到JSON数据: {data}\") # 响应消息 response = {\"status\": \"success\", \"message\": \"Data received\"} await websocket.send(json.dumps(response)) except json.JSONDecodeError: await websocket.send(f\"ECHO: {message}\") # 处理二进制消息 elif isinstance(message, bytes): print(f\"收到二进制数据: {message[:10]}...\") await websocket.send(message[::-1]) # 反转后返回 except websockets.ConnectionClosed: print(\"客户端断开连接\")# 启动服务器start_server = websockets.serve( handle_connection, \"localhost\", 8000, ping_interval=20, # 每20秒发送Ping ping_timeout=5 # 5秒未响应视为断开)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()
2. 使用 Flask-SocketIO(同步)
# 安装: pip install flask-socketiofrom flask import Flaskfrom flask_socketio import SocketIO, emitapp = Flask(__name__)socketio = SocketIO(app, cors_allowed_origins=\"*\")# WebSocket 连接事件@socketio.on(\'connect\')def handle_connect(): print(\'客户端连接成功\', request.sid) emit(\'server_response\', {\'data\': \'Connected\'})# 处理文本消息@socketio.on(\'text_message\')def handle_text(message): print(f\'收到文本: {message}\') emit(\'server_response\', {\'data\': f\'Received: {message}\'})# 处理JSON数据@socketio.on(\'json_data\')def handle_json(json_data): print(f\'收到JSON: {json_data}\') # 广播给所有客户端 emit(\'broadcast\', json_data, broadcast=True)# 处理二进制数据@socketio.on(\'binary_data\')def handle_binary(bin_data): print(f\'收到{len(bin_data)}字节二进制数据\') emit(\'binary_response\', bin_data)# 断开连接事件@socketio.on(\'disconnect\')def handle_disconnect(): print(\'客户端断开\', request.sid)if __name__ == \'__main__\': socketio.run(app, host=\'0.0.0.0\', port=8000)
三、前后端交互完整示例
前端 (JavaScript)
<!DOCTYPE html><html><body> <input type=\"text\" id=\"messageInput\" placeholder=\"输入消息\"> <button onclick=\"sendMessage()\">发送</button> <button onclick=\"sendBinary()\">发送二进制数据</button> <button onclick=\"closeConn()\">关闭连接</button> <div id=\"output\"></div> <script> const output = document.getElementById(\'output\'); const socket = new WebSocket(\'ws://localhost:8000/ws\'); // 连接事件 socket.onopen = () => { appendMessage(\'连接已建立\'); socket.send(JSON.stringify({ type: \'greeting\', msg: \'Hello Server!\' })); }; // 接收消息 socket.onmessage = ({ data }) => { if (typeof data === \'string\') { appendMessage(`文本: ${data}`); } else { appendMessage(`二进制数据长度: ${data.byteLength} 字节`); } }; // 错误处理 socket.onerror = (error) => { appendMessage(`错误: ${error.message}`); }; // 关闭事件 socket.onclose = (event) => { appendMessage(`连接关闭: ${event.code} - ${event.reason}`); }; function sendMessage() { const msg = document.getElementById(\'messageInput\').value; socket.send(msg); } function sendBinary() { const buffer = new Uint8Array([65, 66, 67, 68]); // ABCD socket.send(buffer); } function closeConn() { socket.close(1000, \'正常关闭\'); } function appendMessage(text) { output.innerHTML += `${text}`; } </script></body></html>
后端 (Python - websockets)
import asyncioimport websocketsasync def server(websocket, path): print(\"Client connected\") try: # 发送欢迎消息 await websocket.send(\"欢迎使用WebSocket服务\") async for message in websocket: # 文本消息处理 if isinstance(message, str): print(f\"收到文本: {message}\") await websocket.send(f\"服务器回复: {message}\") # 二进制消息处理 elif isinstance(message, bytes): print(f\"收到二进制数据: {message.hex()}\") await websocket.send(message[::-1]) # 反转数据返回 except websockets.ConnectionClosed: print(\"Client disconnected\")start_server = websockets.serve(server, \"localhost\", 8000)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()
四、生产环境最佳实践
-
安全加固
- 始终使用
wss://
(TLS加密) - 验证来源:
if origin not in allowed_origins: await websocket.close()
- 添加认证令牌
// 前端const socket = new WebSocket(\'wss://api.example.com/ws?token=xxxx\');// 后端token = websocket.request_headers.get(\'Sec-WebSocket-Protocol\')
- 始终使用
-
消息协议设计
{ \"type\": \"chat/message\", \"timestamp\": 1620000000, \"payload\": { \"user\": \"Alice\", \"text\": \"Hello!\" }}
-
性能优化
- 启用消息压缩:
new WebSocket(url, protocols, { compression: true })
- 后端批量处理消息
- 前端节流发送频率
- 启用消息压缩:
-
错误恢复
- 指数退避重连
let reconnectAttempts = 0;function reconnect() { const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000); setTimeout(connect, delay); reconnectAttempts++;}
-
监控指标
- 连接数
- 消息吞吐量
- 平均延迟
- 错误率
五、常用库推荐
ws
websockets
Flask-SocketIO
Java-WebSocket
gorilla/websocket
# Python 性能测试 (每秒消息数)pip install websockets httptools uvloop# 启动时使用 uvloop 提升性能asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
以上示例展示了 WebSocket 在前后端的完整使用流程,包含了连接管理、消息处理、错误恢复等核心功能,可直接用于实际项目开发。