山东大学项目实训(2) AI响应服务器的搭建_大学ai搭建
山东大学项目实训(2) AI响应服务器的搭建
往期文章
山东大学项目实训(1) DeepSeek API调用
文章目录
- 山东大学项目实训(2) AI响应服务器的搭建
-
- 往期文章
- 1. AI服务器的作用
- 2. 我们的项目架构
- 3. AI服务器编写
-
- AI服务器问答交互
- 多轮对话处理
- 代码编写
1. AI服务器的作用
AI服务器就是下与接受deepseek模型交互, 获取AI回答后返回到应用中的中间服务器. 其功能如下
其实从上图看出AI服务器是一个很\"鸡肋\"的功能, 貌似后端同学可以直接调用deepseek模型, 但是这样的情形不利于我们分工开发和我进行模型的研究, 故我暂时实现了一个AI服务器, 其在后期可能融入后端当中, 详见下部分架构的介绍.
2. 我们的项目架构
我提出了1, 2两种架构, 加上我们组员提出的一种架构共三种
这三种架构各有优劣, 我列举如下
- 方案1
- 几个优点: 开发各自独立, 很适合我们这样的小组作业, 且可扩展性高, 代码耦合度低. 适合跨语言作业, 像我们组后端使用java且AI服务使用python的情况下也可简单地使用http等通用协议调用.
- 缺点: 后期问题追踪不易, 且显而易见的效率低, 但实则不然, 其效率在以下几种情形有优势:
- 后端和服务器在一台服务器上运行, 此时二者间通信耗时很小, 当然速度还是不如方案2, 但初步考虑我们的用户需求应该够用.
- 后端性能积压, 比如要处理搜题等复杂任务, 这时将AI服务器分离到别的服务器上可以减缓后端性能压力.
- AI模型本地部署, 当我们将AI部署到本地时, AI服务器可直接承担AI模型的运作. 相当于架构方案图中的本地部署部分.
- 方案2
- 优点:效率较高.
- 缺点:代码耦合度高, 后续扩展困难. 在本地部署时需要后端自行处理AI的输出.
- 方案3
- 优点:集成了方案1的架构优势和方案2的高效率
- 缺点:前端工作量大, 且需要在数据统一处理和AI服务器不存数据间做出选择.
由于我们现在的需求较简单, 只在多用户对话的上下文处理间有些许疑问, 故三种架构均可, 我们暂定方案1. 因此下文中我都假设是后端调用AI服务器.
大模型的所有调用参数基本都可以在对话内容中给出, 其强大的语言处理能力使得用户个性化信息的传递非常容易. 例如:用户在数据结构领域答题正确率是70%, 我们只需要把这个信息告诉deepseek即可, 无需其它处理. 因此这使得AI服务器与后端的交互很简单. 换句话说, 这印证了AI服务器很\"鸡肋\" .
3. AI服务器编写
AI服务器问答交互
我们的大语言模型需求先定为智能问答, 智能答题和智能出题. 由于大语言模型的API参数较少, 因此我们只需设置不同的temperature和引导词即可. 引导词例如: {\"role\":\"system\", \"content\":\"你在为用户出题, 请按照\'题目内容答案内容\'的格式出题\"}
引导词的设置可由后端或服务器端完成. 返回即为一条json对话或者流数据或者纯粹是AI回答的文本, 内容为AI的回答.
多轮对话处理
我们目前设定一个用户可以有若干个对话, 每个对话都有其唯一ID, 因此AI服务器只关注对话ID而不关注用户, 对于用户信息后端许通过参数传递或者直接放在与AI的对话中, 例如\"用户在数据结构领域历史正确率为70%, 请据此出一道题目, 使其符合并能提高用户水平\"
. 这种个性化工作可由后端或者AI服务器完成, 我们组在后期合并阶段可由双方同学合作完成.
用户对话信息存储是个大问题. 我们暂时决定将对话记录存到用户本地, 还有一种可选方案是存到后端服务器上的数据库中.总而言之, 其大概率不会存在AI服务器本地. 因此, 我们需要在对话开始时将内容加载进AI服务器中. 考虑到服务器关断后重启等情况, 我选择处理以下两种情况:
- 后端调用初始化函数, 在AI服务器缓存中注册对话ID, 并将后端给的历史记录存入服务器缓存中.
- 后端进行对话请求, 这时缓存中若没有此对话的ID记录, 则新建一个 ,若有, 则读取历史对话记录.
代码编写
类的初始化, 其中save函数负责将内存中历史记录传回后端, 考虑到前/后端自主维护用户对话记录的设计, 一般用不到.
class DeepSeekChatServer: def __init__(self): #flask框架 self.app = Flask(__name__) CORS(self.app) #对话内容内存,重要 self.conversations = {} # {conversation_id: [messages]} #api调用相关 #一般人用deepseek官网,这是我们学校的api #记得把/v1/chat/completions部分加上 #openai的库会自动给你添加而你自己写的request不会 self.api_base = \"http://学校给的api/v1/chat/completions\" self.api_key = \"sk-差点忘了删\" #填写你自己的key #model类型也按官网来,这个是自己api上的类型 self.AI_model=\"DeepSeek-R1\" # 注册路由 self.app.add_url_rule(\'/chat\', \'chat_endpoint\', self.chat_endpoint, methods=[\'POST\']) self.app.add_url_rule(\'/save\', \'save_conversation\', self.save_conversation, methods=[\'POST\']) self.app.add_url_rule(\'/load\', \'load_conversation\', self.load_conversation, methods=[\'POST\'])
读取历史记录与初始化对话记录
def get_or_create_conversation(self, conversation_id=None): cid = conversation_id or str(uuid.uuid4()) if cid not in self.conversations: self.conversations[cid] = [] return cid def init_conversation( self, conversation_id: Optional[str] = None, init_messages: Optional[List[Dict]] = None ) -> str: \"\"\"初始化或重置对话,严格校验消息格式\"\"\" cid = conversation_id or str(uuid.uuid4()) # 校验初始化消息 if init_messages is not None: if not isinstance(init_messages, list): raise ValueError(\"初始化消息必须是消息列表\") if not all(self._validate_message(msg) for msg in init_messages): raise ValueError(\"消息格式不合法\") self.conversations[cid] = init_messages.copy() if init_messages else [] return cid
传输信息合法性检验,如果是信任的后端传来的,建议不使用以提高效率.
def _validate_message(self, message: Dict) -> bool: required_keys = {\"role\", \"content\"} return ( isinstance(message, dict) and all(key in message for key in required_keys) and message[\"role\"] in (\"user\", \"assistant\", \"system\") and isinstance(message[\"content\"], str) and len(message[\"content\"].strip()) > 0 )
主要的请求处理部分, 支持流式和非流式输出, 让deepseek给我加了很多错误处理部分. 注意data, header等部分其实可以写到类的属性里以提高效率, 不用每次都在代码里生成, 但是我们的项目还没固定, 先这么写着.
def _build_headers(self): return { \"Content-Type\": \"application/json\", \"Authorization\": f\"Bearer {self.api_key}\", \"Accept\": \"application/json\" } def generate_response(self, conversation_id, stream=False): messages = self.conversations.get(conversation_id, []) data = { \"model\": self.AI_model, \"messages\": messages, \"temperature\": 0.7, \"stream\": stream } response = requests.post( self.api_base, headers=self._build_headers(), json=data, stream=stream ) if response.status_code != 200: return jsonify({\"error\": f\"API Error: {response.text}\"}), 500 if stream: def generate(): full_content = \"\" for line in response.iter_lines(): if not line: continue decoded_line = line.decode(\'utf-8\') if not decoded_line.startswith(\"data: \"): continue chunk = json.loads(decoded_line[6:]) if chunk==\"[DONE]\" : break try: if \"content\" in chunk[\"choices\"][0][\"delta\"]: delta = chunk[\"choices\"][0][\"delta\"][\"content\"] full_content += delta yield delta except json.JSONDecodeError: yield \"[数据解析错误]\" self.conversations[conversation_id].append({ \"role\": \"assistant\", \"content\": full_content }) return Response(generate(), mimetype=\'text/event-stream\') else: content = response.json()[\"choices\"][0][\"message\"][\"content\"] self.conversations[conversation_id].append({ \"role\": \"assistant\", \"content\": content }) return jsonify({ \"conversation_id\": conversation_id, \"response\": content })
向外暴露的接口与运行, 也让deepseek添加了很多错误处理.
def chat_endpoint(self): data = request.json try: # 校验基础参数 if \'message\' not in data or not data[\'message\'].strip(): return jsonify({\"error\": \"消息内容不能为空\"}), 400 # 处理对话ID cid = self.get_or_create_conversation(data.get(\'conversation_id\')) # 构造消息 new_message = { \"role\": \"user\", \"content\": data[\'message\'].strip() } if not self._validate_message(new_message): return jsonify({\"error\": \"无效的消息结构\"}), 400 # 添加消息 self.conversations[cid].append(new_message) return self.generate_response(cid, data.get(\'stream\', False)) except KeyError: return jsonify({\"error\": \"请求格式错误\"}), 400 def save_conversation(self): data = request.json cid = data[\'conversation_id\'] return jsonify({ \"conversation_id\": cid, \"messages\": self.conversations.get(cid, []) }) def load_conversation(self): data = request.json try: cid = self.init_conversation( data.get(\'conversation_id\'), data[\'messages\'] ) return jsonify({\"conversation_id\": cid}), 200 except KeyError: return jsonify({\"error\": \"缺少必要参数 messages\"}), 400 except ValueError as e: return jsonify({\"error\": str(e)}), 400 def run(self, host=\'127.0.0.1\', port=5000, debug=True): self.app.run(host=host, port=port, debug=debug)
运行代码
if __name__ == \'__main__\': server = DeepSeekChatServer() server.run()
完整代码
import osfrom typing import Dict, List, Optionalimport uuidimport jsonfrom flask import Flask, request, jsonify, Responsefrom flask_cors import CORSimport requestsclass DeepSeekChatServer: def __init__(self): #flask框架 self.app = Flask(__name__) CORS(self.app) #对话内容内存,重要 self.conversations = {} # {conversation_id: [messages]} #api调用相关 #一般人用deepseek官网,这是我们学校的api #记得把/v1/chat/completions部分加上 #openai的库会自动给你添加而你自己写的request不会 self.api_base = \"http://某个api:某个端口/v1/chat/completions\" self.api_key = \"sk-又差点忘了删\" self.AI_model=\"DeepSeek-R1\" # 注册路由 self.app.add_url_rule(\'/chat\', \'chat_endpoint\', self.chat_endpoint, methods=[\'POST\']) self.app.add_url_rule(\'/save\', \'save_conversation\', self.save_conversation, methods=[\'POST\']) self.app.add_url_rule(\'/load\', \'load_conversation\', self.load_conversation, methods=[\'POST\']) def get_or_create_conversation(self, conversation_id=None): cid = conversation_id or str(uuid.uuid4()) if cid not in self.conversations: self.conversations[cid] = [] return cid def init_conversation( self, conversation_id: Optional[str] = None, init_messages: Optional[List[Dict]] = None ) -> str: \"\"\"初始化或重置对话,严格校验消息格式\"\"\" cid = conversation_id or str(uuid.uuid4()) # 校验初始化消息 if init_messages is not None: if not isinstance(init_messages, list): raise ValueError(\"初始化消息必须是消息列表\") if not all(self._validate_message(msg) for msg in init_messages): raise ValueError(\"消息格式不合法\") self.conversations[cid] = init_messages.copy() if init_messages else [] return cid def _validate_message(self, message: Dict) -> bool: required_keys = {\"role\", \"content\"} return ( isinstance(message, dict) and all(key in message for key in required_keys) and message[\"role\"] in (\"user\", \"assistant\", \"system\") and isinstance(message[\"content\"], str) and len(message[\"content\"].strip()) > 0 ) def _build_headers(self): return { \"Content-Type\": \"application/json\", \"Authorization\": f\"Bearer {self.api_key}\", \"Accept\": \"application/json\" } def generate_response(self, conversation_id, stream=False): messages = self.conversations.get(conversation_id, []) data = { \"model\": self.AI_model, \"messages\": messages, \"temperature\": 0.7, \"stream\": stream } response = requests.post( self.api_base, headers=self._build_headers(), json=data, stream=stream ) if response.status_code != 200: return jsonify({\"error\": f\"API Error: {response.text}\"}), 500 if stream: def generate(): full_content = \"\" for line in response.iter_lines(): if not line: continue decoded_line = line.decode(\'utf-8\') if not decoded_line.startswith(\"data: \"): continue chunk = json.loads(decoded_line[6:]) if chunk==\"[DONE]\" : break try: if \"content\" in chunk[\"choices\"][0][\"delta\"]: delta = chunk[\"choices\"][0][\"delta\"][\"content\"] full_content += delta yield delta except json.JSONDecodeError: yield \"[数据解析错误]\" self.conversations[conversation_id].append({ \"role\": \"assistant\", \"content\": full_content }) return Response(generate(), mimetype=\'text/event-stream\') else: content = response.json()[\"choices\"][0][\"message\"][\"content\"] self.conversations[conversation_id].append({ \"role\": \"assistant\", \"content\": content }) return jsonify({ \"conversation_id\": conversation_id, \"response\": content }) def chat_endpoint(self): data = request.json try: # 校验基础参数 if \'message\' not in data or not data[\'message\'].strip(): return jsonify({\"error\": \"消息内容不能为空\"}), 400 # 处理对话ID cid = self.get_or_create_conversation(data.get(\'conversation_id\')) # 构造消息 new_message = { \"role\": \"user\", \"content\": data[\'message\'].strip() } if not self._validate_message(new_message): return jsonify({\"error\": \"无效的消息结构\"}), 400 # 添加消息 self.conversations[cid].append(new_message) return self.generate_response(cid, data.get(\'stream\', False)) except KeyError: return jsonify({\"error\": \"请求格式错误\"}), 400 def save_conversation(self): data = request.json cid = data[\'conversation_id\'] return jsonify({ \"conversation_id\": cid, \"messages\": self.conversations.get(cid, []) }) def load_conversation(self): data = request.json try: cid = self.init_conversation( data.get(\'conversation_id\'), data[\'messages\'] ) return jsonify({\"conversation_id\": cid}), 200 except KeyError: return jsonify({\"error\": \"缺少必要参数 messages\"}), 400 except ValueError as e: return jsonify({\"error\": str(e)}), 400 def run(self, host=\'127.0.0.1\', port=5000, debug=True): self.app.run(host=host, port=port, debug=debug)if __name__ == \'__main__\': server = DeepSeekChatServer() server.run()