> 技术文档 > 企业微信自建应用实现接收消息和发送消息功能(python)_企业微信 接收消息 python

企业微信自建应用实现接收消息和发送消息功能(python)_企业微信 接收消息 python

# 这一周我不断的琢磨企业微信自建应用并且实现了自建应用的消息接收和发送功能

1.笔记,记录

第一步:打开企业微信后台 https://work.weixin.qq.com

1.1 如果没有企业可以在这里申请,如果有可以直接扫码登录

1.2 打开后台-应用管理-自建应用-创建应用——填写自建应用的logo,应用名称,应用介绍等信息。

1.3 获取自建应用的AgentId,Secret,以及我的企业-企业ID信息

第二步:初步测试发送消息功能(注意:将刚刚保存好的信息正确填入到相应的代码段)

import json #用于处理json数据import urllib.parse # 用于对URL进行解析和构建import requests #用于发送HTTTP请求corpid = \'\' # 企业IDagentid = # 应用IDcorpsecret = \'\' # 应用Secrettouser = \'\' # 接收消息的用户# 企业微信API的基础URLbase = \'https://qyapi.weixin.qq.com\'# 请求登录凭证(access-token)# 构造函数(获取access-token的API URL)access_token_api = urllib.parse.urljoin(base, \'/cgi-bin/gettoken\') # 使用urllib.parse.urljoin来构建获取access-token的完整url# 定义请求参数(包括企业ID,应用密钥)params = {\'corpid\': corpid, \'corpsecret\': corpsecret}# 发送GET请求获取access-token,并且将json响应转化为python字典response = requests.get(url=access_token_api, params=params).json()# 从响应中获取access-tokenaccess_token = response[\'access_token\']# 发送消息# 构建发送消息的完整URL,包含access-tokenmessage_send_api = urllib.parse.urljoin(base, f\'/cgi-bin/message/send?access_token={access_token}\')# 定义要发送的消息数据(文本格式)data = {\'touser\': touser, \'msgtype\': \'text\', \'agentid\': agentid, \'text\': {\'content\': \'测试数据:hello world!\'}}# 发送POST请求以发送消息, 并将json响应转化为python字典response = requests.post(url=message_send_api, data=json.dumps(data)).json()# 当请求返回值为0时(异常处理)if response[\'errcode\'] == 0: print(\'发送成功\')else: print(response)

安装请求处理包 :

pip install requests

发送消息:

第三步:下载微信官方的加解密文件(注:下载对应的我这里下载的时python的加解密包)

下载地址:https://developer.work.weixin.qq.com/document/path/90468

3.1 下载加解密文件并且进行解压安装相关的依赖包

在这个加解密文件中主要使用(WXBizMsgCrypt3.py)文件,解压配置依赖包导入到项目文件中。

#!/usr/bin/env python# -*- encoding:utf-8 -*-\"\"\" 对企业微信发送给企业后台的消息加解密示例代码.@copyright: Copyright (c) 1998-2014 Tencent Inc.\"\"\"# ------------------------------------------------------------------------import loggingimport base64import randomimport hashlibimport timeimport structfrom Crypto.Cipher import AESimport xml.etree.cElementTree as ETimport socketimport ierror\"\"\"Crypto.Cipher包已不再维护,开发者可以通过以下命令下载安装最新版的加解密工具包 pip install pycryptodome\"\"\"class FormatException(Exception): passdef throw_exception(message, exception_class=FormatException): \"\"\"my define raise exception function\"\"\" raise exception_class(message)class SHA1: \"\"\"计算企业微信的消息签名接口\"\"\" def getSHA1(self, token, timestamp, nonce, encrypt): \"\"\"用SHA1算法生成安全签名 @param token: 票据 @param timestamp: 时间戳 @param encrypt: 密文 @param nonce: 随机字符串 @return: 安全签名 \"\"\" try: sortlist = [token, timestamp, nonce, encrypt] sortlist.sort() sha = hashlib.sha1() sha.update(\"\".join(sortlist).encode()) return ierror.WXBizMsgCrypt_OK, sha.hexdigest() except Exception as e: logger = logging.getLogger() logger.error(e) return ierror.WXBizMsgCrypt_ComputeSignature_Error, Noneclass XMLParse: \"\"\"提供提取消息格式中的密文及生成回复消息格式的接口\"\"\" # xml消息模板 AES_TEXT_RESPONSE_TEMPLATE = \"\"\"%(timestamp)s\"\"\" def extract(self, xmltext): \"\"\"提取出xml数据包中的加密消息 @param xmltext: 待提取的xml字符串 @return: 提取出的加密消息字符串 \"\"\" try: xml_tree = ET.fromstring(xmltext) encrypt = xml_tree.find(\"Encrypt\") return ierror.WXBizMsgCrypt_OK, encrypt.text except Exception as e: logger = logging.getLogger() logger.error(e) return ierror.WXBizMsgCrypt_ParseXml_Error, None def generate(self, encrypt, signature, timestamp, nonce): \"\"\"生成xml消息 @param encrypt: 加密后的消息密文 @param signature: 安全签名 @param timestamp: 时间戳 @param nonce: 随机字符串 @return: 生成的xml字符串 \"\"\" resp_dict = { \'msg_encrypt\': encrypt, \'msg_signaturet\': signature, \'timestamp\': timestamp, \'nonce\': nonce, } resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict return resp_xmlclass PKCS7Encoder(): \"\"\"提供基于PKCS7算法的加解密接口\"\"\" block_size = 32 def encode(self, text): \"\"\" 对需要加密的明文进行填充补位 @param text: 需要进行填充补位操作的明文 @return: 补齐明文字符串 \"\"\" text_length = len(text) # 计算需要填充的位数 amount_to_pad = self.block_size - (text_length % self.block_size) if amount_to_pad == 0: amount_to_pad = self.block_size # 获得补位所用的字符 pad = chr(amount_to_pad) return text + (pad * amount_to_pad).encode() def decode(self, decrypted): \"\"\"删除解密后明文的补位字符 @param decrypted: 解密后的明文 @return: 删除补位字符后的明文 \"\"\" pad = ord(decrypted[-1]) if pad  32: pad = 0 return decrypted[:-pad]class Prpcrypt(object): \"\"\"提供接收和推送给企业微信消息的加解密接口\"\"\" def __init__(self, key): # self.key = base64.b64decode(key+\"=\") self.key = key # 设置加解密模式为AES的CBC模式 self.mode = AES.MODE_CBC def encrypt(self, text, receiveid): \"\"\"对明文进行加密 @param text: 需要加密的明文 @return: 加密得到的字符串 \"\"\" # 16位随机字符串添加到明文开头 text = text.encode() text = self.get_random_str() + struct.pack(\"I\", socket.htonl(len(text))) + text + receiveid.encode() # 使用自定义的填充方式对明文进行补位填充 pkcs7 = PKCS7Encoder() text = pkcs7.encode(text) # 加密 cryptor = AES.new(self.key, self.mode, self.key[:16]) try: ciphertext = cryptor.encrypt(text) # 使用BASE64对加密后的字符串进行编码 return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext) except Exception as e: logger = logging.getLogger() logger.error(e) return ierror.WXBizMsgCrypt_EncryptAES_Error, None def decrypt(self, text, receiveid): \"\"\"对解密后的明文进行补位删除 @param text: 密文 @return: 删除填充补位后的明文 \"\"\" try: cryptor = AES.new(self.key, self.mode, self.key[:16]) # 使用BASE64对密文进行解码,然后AES-CBC解密 plain_text = cryptor.decrypt(base64.b64decode(text)) except Exception as e: logger = logging.getLogger() logger.error(e) return ierror.WXBizMsgCrypt_DecryptAES_Error, None try: pad = plain_text[-1] # 去掉补位字符串 # pkcs7 = PKCS7Encoder() # plain_text = pkcs7.encode(plain_text) # 去除16位随机字符串 content = plain_text[16:-pad] xml_len = socket.ntohl(struct.unpack(\"I\", content[: 4])[0]) xml_content = content[4: xml_len + 4] from_receiveid = content[xml_len + 4:] except Exception as e: logger = logging.getLogger() logger.error(e) return ierror.WXBizMsgCrypt_IllegalBuffer, None if from_receiveid.decode(\'utf8\') != receiveid: return ierror.WXBizMsgCrypt_ValidateCorpid_Error, None return 0, xml_content def get_random_str(self): \"\"\" 随机生成16位字符串 @return: 16位字符串 \"\"\" return str(random.randint(1000000000000000, 9999999999999999)).encode()class WXBizMsgCrypt(object): # 构造函数 def __init__(self, sToken, sEncodingAESKey, sReceiveId): try: self.key = base64.b64decode(sEncodingAESKey + \"=\") assert len(self.key) == 32 except: throw_exception(\"[error]: EncodingAESKey unvalid !\", FormatException) # return ierror.WXBizMsgCrypt_IllegalAesKey,None self.m_sToken = sToken self.m_sReceiveId = sReceiveId # 验证URL # @param sMsgSignature: 签名串,对应URL参数的msg_signature # @param sTimeStamp: 时间戳,对应URL参数的timestamp # @param sNonce: 随机串,对应URL参数的nonce # @param sEchoStr: 随机串,对应URL参数的echostr # @param sReplyEchoStr: 解密之后的echostr,当return返回0时有效 # @return:成功0,失败返回对应的错误码 def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr): sha1 = SHA1() ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr) if ret != 0: return ret, None if not signature == sMsgSignature: return ierror.WXBizMsgCrypt_ValidateSignature_Error, None pc = Prpcrypt(self.key) ret, sReplyEchoStr = pc.decrypt(sEchoStr, self.m_sReceiveId) return ret, sReplyEchoStr def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None): # 将企业回复用户的消息加密打包 # @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串 # @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间 # @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce # sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串, # return:成功0,sEncryptMsg,失败返回对应的错误码None pc = Prpcrypt(self.key) ret, encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId) encrypt = encrypt.decode(\'utf8\') if ret != 0: return ret, None if timestamp is None: timestamp = str(int(time.time())) # 生成安全签名 sha1 = SHA1() ret, signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt) if ret != 0: return ret, None xmlParse = XMLParse() return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce) def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce): # 检验消息的真实性,并且获取解密后的明文 # @param sMsgSignature: 签名串,对应URL参数的msg_signature # @param sTimeStamp: 时间戳,对应URL参数的timestamp # @param sNonce: 随机串,对应URL参数的nonce # @param sPostData: 密文,对应POST请求的数据 # xml_content: 解密后的原文,当return返回0时有效 # @return: 成功0,失败返回对应的错误码 # 验证安全签名 xmlParse = XMLParse() ret, encrypt = xmlParse.extract(sPostData) if ret != 0: return ret, None sha1 = SHA1() ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt) if ret != 0: return ret, None if not signature == sMsgSignature: return ierror.WXBizMsgCrypt_ValidateSignature_Error, None pc = Prpcrypt(self.key) ret, xml_content = pc.decrypt(encrypt, self.m_sReceiveId) return ret, xml_content

3.2 导入下载安装包中的(ierror.py)错误码提示文件

#!/usr/bin/env python# -*- coding: utf-8 -*-########################################################################## Author: jonyqin# Created Time: Thu 11 Sep 2014 01:53:58 PM CST# File Name: ierror.py# Description:定义错误码含义 ########################################################################## \"======================================================提供了错误码===================================================\"WXBizMsgCrypt_OK = 0WXBizMsgCrypt_ValidateSignature_Error = -40001WXBizMsgCrypt_ParseXml_Error = -40002WXBizMsgCrypt_ComputeSignature_Error = -40003WXBizMsgCrypt_IllegalAesKey = -40004WXBizMsgCrypt_ValidateCorpid_Error = -40005WXBizMsgCrypt_EncryptAES_Error = -40006WXBizMsgCrypt_DecryptAES_Error = -40007WXBizMsgCrypt_IllegalBuffer = -40008WXBizMsgCrypt_EncodeBase64_Error = -40009WXBizMsgCrypt_DecodeBase64_Error = -40010WXBizMsgCrypt_GenReturnXml_Error = -40011

第四步:编写发送和接收消息逻辑(fastapi)

在这里我使用fastapi实现了接口的开发,详细如下,逻辑代码中需要配置(接收消息设置里边的TOKEN,ENCODING_AES_KEY,CORP_ID等信息,这么后面讲怎么获取。

from fastapi import FastAPI, Request, HTTPExceptionfrom fastapi.responses import PlainTextResponse, Responsefrom WXBizMsgCrypt3 import WXBizMsgCryptimport xmltodictimport loggingimport timeimport xml.etree.ElementTree as ET# 配置日志logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)app = FastAPI()# 企业微信配置TOKEN = \"\" # 设置的TokenENCODING_AES_KEY = \"\" #设置密钥CORP_ID = \"\" # 企业IDtry: wxcpt = WXBizMsgCrypt(TOKEN, ENCODING_AES_KEY, CORP_ID)except Exception as e: logger.error(f\"初始化WXBizMsgCrypt失败: {str(e)}\") raise@app.get(\"/callback\")async def verify_url(msg_signature: str, timestamp: str, nonce: str, echostr: str): \"\"\"验证URL有效性\"\"\" try: logger.info(f\"收到验证请求: msg_signature={msg_signature}, timestamp={timestamp}, nonce={nonce}\") ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr) if ret == 0: logger.info(\"URL验证成功\") return PlainTextResponse(content=sEchoStr) else: logger.error(f\"URL验证失败,错误码: {ret}\") raise HTTPException(status_code=400, detail=\"验证失败\") except Exception as e: logger.error(f\"验证过程发生错误: {str(e)}\") raise HTTPException(status_code=500, detail=\"服务器内部错误\")@app.post(\"/callback\")async def receive_message(request: Request): \"\"\"接收并处理企业微信消息\"\"\" try: # 获取请求参数 body = await request.body() msg_signature = request.query_params.get(\"msg_signature\") timestamp = request.query_params.get(\"timestamp\") nonce = request.query_params.get(\"nonce\") if not all([msg_signature, timestamp, nonce]): raise HTTPException(status_code=400, detail=\"缺少必要的参数\") logger.info(f\"收到消息推送: msg_signature={msg_signature}, timestamp={timestamp}, nonce={nonce}\") # 解密消息 ret, sMsg = wxcpt.DecryptMsg(body, msg_signature, timestamp, nonce) if ret != 0: logger.error(f\"消息解密失败,错误码: {ret}\") raise HTTPException(status_code=400, detail=\"消息解密失败\") # 解析XML消息 xml_dict = xmltodict.parse(sMsg) logger.info(f\"解密后的消息内容: {xml_dict}\") # 提取消息内容 xml_content = xml_dict[\'xml\'] to_user_name = xml_content.get(\'ToUserName\') from_user_name = xml_content.get(\'FromUserName\') create_time = xml_content.get(\'CreateTime\') msg_type = xml_content.get(\'MsgType\') content = xml_content.get(\'Content\') msg_id = xml_content.get(\'MsgId\') agent_id = xml_content.get(\'AgentID\') logger.info(f\"收到消息: {content}\") # 构造回复消息 reply_content = content.replace(\'吗\', \'\').replace(\'?\', \'!\').replace(\'?\', \'!\') current_time = str(int(time.time())) reply_msg = f\"\"\"    {current_time}   {msg_id} {agent_id}  \"\"\" # 加密回复消息 ret, encrypted_msg = wxcpt.EncryptMsg(reply_msg, nonce, current_time) if ret != 0: logger.error(f\"消息加密失败,错误码: {ret}\") raise HTTPException(status_code=500, detail=\"消息加密失败\") logger.info(\"成功构造并加密回复消息\") return Response(content=encrypted_msg, media_type=\"application/xml\") except Exception as e: logger.error(f\"处理消息时发生错误: {str(e)}\") raise HTTPException(status_code=500, detail=\"服务器内部错误\")if __name__ == \"__main__\": import uvicorn uvicorn.run(app, host=\"127.0.0.1\", port=5000)

第五步:回调配置

5.1 进入企业微信后台配置-接收消息

这里需要我们提供一个处理请求回调的URL,随机生成Token,EncodingAESKey,并且需要把这写数据配置到上方代码中

5.2 配置URL和项目部署

首先需要创建一个域名

添加好的域名中指向项目文件并且配置反向代理(设置-配置文件)

保存起来,然后创建一个文件,把配置好的代码上传到创建的文件中,并且验证回调服务,让程序跑起来。

5.3 配置成功以后保存企业微信的URL以及其他配置信息

结束了。。。

接下来看效果:

日志文件

效果:

配电柜