> 技术文档 > Django实时通信实战:WebSocket与ASGI全解析(下)

Django实时通信实战:WebSocket与ASGI全解析(下)


文章目录

    • 一、实战:构建实时聊天室
      • 环境准备
      • 实现 WebSocket 消费者
      • 配置 WebSocket 路由
      • 创建聊天界面模板
      • 实现效果
    • 二、生产环境部署
      • Nginx 配置
      • 使用 Gunicorn+Uvicorn 部署
      • 优化 Worker 数量

一、实战:构建实时聊天室

环境准备

下面将使用 Django Channels 构建一个多用户实时聊天室。Django Channels的介绍、安装与配置,参考上篇。

实现 WebSocket 消费者

创建mysite\\myapp_infra\\websocket\\consumers.py文件,这是处理 WebSocket 连接的核心。主要实现了如下方法:

  • connect:处理新连接,验证用户token,将用户通道写入缓存,并加入默认组
  • disconnect:处理连接断开,删除用户的缓存通道记录,将用户从所在的房间组中移除
  • receive:处理接收到的消息。首先进行心跳检测(ping/pong),然后验证用户身份,解析两层 JSON 结构的消息内容。根据消息类型 demo-message-send 处理文本消息:若指定接收用户则单发,否则群发广播。
  • broadcast_message:处理群发消息,从事件中提取payload数据并发送给所有连接的客户端
  • single_message:处理单发消息,从事件中提取payload数据并发送给指定的客户端
\"\"\"WebScoket 消费者\"\"\"import jsonimport loggingfrom django.core.cache import cachefrom django.conf import settingsfrom channels.generic.websocket import AsyncWebsocketConsumerfrom rest_framework_simplejwt.tokens import RefreshTokenfrom rest_framework_simplejwt.exceptions import TokenErrorlogger = logging.getLogger(__name__)class InfraConsumer(AsyncWebsocketConsumer): \"\"\"基础设施-WebSocket 异步消费者\"\"\" async def connect(self): \"\"\"当客户端发起 WebSocket 连接时调用\"\"\" query_params = self.scope[\"query_string\"].decode() # 从查询参数获取token token_param = [ param.split(\"=\") for param in query_params.split(\"&\") if param.startswith(\"token=\") ] if not token_param or len(token_param[0]) != 2: logger.error(\"缺少token参数\") await self.close(code=4001) # 自定义错误码 return token = token_param[0][1] try: # 验证Refresh Token有效性 refresh = RefreshToken(token) user_id = refresh[\"user_id\"] self.scope[\"user\"] = {\"id\": user_id, \"token_type\": \"refresh\"} # 登录成功后,将用户通道写入缓存 cache.set(f\"user_{user_id}_channel\", self.channel_name) # 加入默认组 self.room_group_name = settings.DEFAULT_GROUP_NAME await self.channel_layer.group_add(self.room_group_name, self.channel_name) # 接受客户端的WebSocket连接请求 await self.accept() except TokenError as e: logger.error(\"无效token\") await self.close(code=4003) except Exception as e: logger.error(str(e)) await self.close(code=4000) async def disconnect(self, close_code): \"\"\"当客户端断开 WebSocket 连接时调用\"\"\" # 获取当前用户 user = self.scope.get(\"user\") if user: user_id = user.get(\"id\") # 移除用户通道 cache.delete(f\"user_{user_id}_channel\") # 将用户从组中移除 room_group_name = getattr(self, \"room_group_name\", None) if room_group_name: await self.channel_layer.group_discard(  room_group_name, self.channel_name ) async def receive(self, text_data=None, bytes_data=None): \"\"\"当接收到客户端发送的消息时调用\"\"\" # 心跳检测 if text_data == \"ping\": await self.send(text_data=\"pong\") return user = self.scope.get(\"user\") if not user: logger.warning(f\"匿名用户访问拒绝:{text_data}\") return logger.info(f\"收到用户 {user.get(\'id\')} 发送消息:{text_data}\") # 因为消息采用了两次JSON封装,这里进行两次JSON解析 try: outer_payload = json.loads(text_data) # 外层JSON解释 message_type = outer_payload.get(\"type\") raw_content = outer_payload.get(\"content\", \"{}\") inner_content = json.loads(raw_content) # 内层JSON解释 except json.JSONDecodeError: logger.error(\"内容解析失败\") return # 处理业务消息 if message_type == \"demo-message-send\": message_text = inner_content.get(\"text\", \"\").strip() if not message_text: logger.warning(\"消息内容不能为空\") return # 构建响应字典 content = { \"fromUserId\": user.get(\"id\"), \"text\": message_text, \"single\": False, # 默认群发 } # 判断接收对象 target_user_id = inner_content.get(\"toUserId\") if target_user_id not in [None, \"\", 0]: # 单发 await self._send_single_message(target_user_id, content) else: # 群发广播 await self._send_broadcast_message(content) def _build_response(self, content): \"\"\"构建标准化的响应消息,进行两次JSON封装\"\"\" # 第一次封装:添加消息类型 inner_message = { \"type\": \"demo-message-receive\", \"content\": json.dumps(content), } # 第二次封装:整体序列化 return json.dumps(inner_message) async def _send_single_message(self, target_user_id, content): \"\"\"向指定用户发送单条消息\"\"\" # 获取用户通道 target_channel = cache.get(f\"user_{target_user_id}_channel\") if not target_channel: return False # 构建并发送消息 message = self._build_response(content) await self.channel_layer.send( target_channel, { \"type\": \"single.message\", \"payload\": message, }, ) return True async def _send_broadcast_message(self, content): \"\"\"向房间内所有用户广播消息\"\"\" message = self._build_response(content) await self.channel_layer.group_send( self.room_group_name, { \"type\": \"broadcast.message\", \"payload\": message, }, ) async def broadcast_message(self, event): \"\"\"处理群发消息\"\"\" payload = event[\"payload\"] await self.send(text_data=payload) async def single_message(self, event): \"\"\"处理单发消息\"\"\" payload = event[\"payload\"] await self.send(text_data=payload)

点击查看完整代码

配置 WebSocket 路由

创建mysite\\myapp_infra\\websocket\\routing.py文件,定义 WebSocket 的 URL 路由

\"\"\"WebSocket 路由配置\"\"\"from django.urls import re_path, pathfrom .consumers import InfraConsumerwebsocket_urlpatterns = [ # 调用as_asgi()类方法来获取一个 ASGI 应用程序 re_path(r\"^infra/ws/?$\", InfraConsumer.as_asgi()),]

然后在项目的mysite\\mysite\\asgi.py配置中添加 ASGI 路由

import osfrom django.core.asgi import get_asgi_applicationfrom channels.routing import ProtocolTypeRouter, URLRouterfrom channels.auth import AuthMiddlewareStack# 设置环境变量并初始化Django应用os.environ.setdefault(\"DJANGO_SETTINGS_MODULE\", \"myproject.settings\")django_application = get_asgi_application()def get_websocket_application(): \"\"\"延迟导入WebSocket路由(避免循环导入)\"\"\" from myapp_infra.websocket.routing import websocket_urlpatterns return AuthMiddlewareStack(URLRouter(websocket_urlpatterns))# 协议路由:区分HTTP和WebSocket请求application = ProtocolTypeRouter( { \"http\": django_application, \"websocket\": get_websocket_application(), })

点击查看完整代码

创建聊天界面模板

以Vue3界面为例,代码实现了一个 WebSocket 客户端界面,功能包括:

  • 连接控制:显示连接状态、开启/关闭 WebSocket 连接。
  • 消息发送:输入消息内容并选择接收人(单发或群发),点击发送按钮通过 WebSocket 发送消息。
  • 消息接收与展示:监听 WebSocket 返回的数据,解析不同类型的消息(如单发、群发、系统通知)并在右侧列表中倒序展示。
  • 用户列表获取:页面加载时获取用户列表用于选择消息接收人。

Django实时通信实战:WebSocket与ASGI全解析(下)

点击查看完整代码

实现效果

使用 ASGI 运行项目

# 开发环境启动uvicorn mysite.asgi:application --reload

使用不同的浏览器,分别登录不同的用户,实现实时互发消息。

Django实时通信实战:WebSocket与ASGI全解析(下)

二、生产环境部署

Nginx 配置

使用 Nginx 作为反向代理,添加以下配置来处理 WebSocket 连接。这段配置告诉 Nginx 如何正确处理 WebSocket 升级请求。

location /ws/ { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection \"upgrade\";}

使用 Gunicorn+Uvicorn 部署

Gunicorn(全称 Green Unicorn)是一个用于 UNIX 的 高性能 Python WSGI HTTP 服务器。Gunicorn只能用于 Linux 系统,Windows上无法使用。

安装

pip install gunicorn# 查看版本gunicorn -vgunicorn -h

Gunicorn 作为进程管理器,配合 Uvicorn 工作进程处理 ASGI 应用

  • -w 工作进程的数量。默认启动 1 个 Worker 进程
  • -k 要运行的工作进程类型。
  • -b 指定要绑定的服务器套接字。
# 基本启动命令gunicorn -b 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker mysite.asgi:application

优化 Worker 数量

Gunicorn Worker 数量的设置对性能影响很大,推荐的公式是:(CPU核心数 × 2) + 1。我们可以编写一个启动脚本来自动计算最佳 Worker 数量

#!/bin/bash# run_gunicorn.sh# 计算最佳 Worker 数CORES=$(nproc)WORKERS=$((CORES * 2 + 1))# 限制最大Worker数(避免内存不足)MAX_WORKERS=8if [ $WORKERS -gt $MAX_WORKERS ]; then WORKERS=$MAX_WORKERSfi# 启动命令gunicorn -b 0.0.0.0:8000 \\ --workers $WORKERS \\ --max-requests 1000 \\ # 预防内存泄漏 --timeout 120 \\  # 超时控制 --keep-alive 5 \\  # Keep-Alive -k uvicorn.workers.UvicornWorker \\ mysite.asgi:application

添加执行权限并运行

chmod +x run_gunicorn.sh./run_gunicorn.sh

您正在阅读的是《Django从入门到实战》专栏!关注不迷路~