> 技术文档 > 【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

初次使用飞书机器人,一开始以为只能使用常规的webhook进行通知,后来才发现自定义机器人功能非常强大,同时也有个小功能可以借用自定义机器人去更好的完成,以下是探索使用的记录

一、建立企业自建应用

1、登录 飞书开放平台
2、创建企业自建应用(输入名称、描述、图标等,点击创建即创建完毕了)

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

二、自定义机器人配置

1、侧边栏选择-添加应用能力-选择“机器人”

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

2、添加成功后,可以在侧边栏设置机器人功能
1)如何开始使用:这里设置机器人卡片上的机器人使用说明

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置
2)机器人自定义菜单:这里设置与机器人私聊界面的快捷菜单
菜单状态设置为开启,再设置具体的菜单展现形式。
【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

3)配置菜单:
可以配置具体的功能按钮
录入按钮名称后,可以设置这个按钮的具体功能,目前有3个响应动作:
①跳转至指定链接:类似于一个超链接

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

②推送事件,这个可以推送用自定义事件的ID,功能简单来讲就是:点击按钮时直接调用后端服务,从而进行处理及响应。(需要提前配置“【事件】配置”,后面会讲到)

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

③发送文字消息:点了这个按钮,会发送“名称”处录入的文字消息,和自己输入文字没啥区别,就是快一点,主要作用也是在于打了文字之后,机器人通过【事件】接收消息,并发送给后端服务,后端可以对不同的消息进行处理及响应,和第二点推送事件效果类似,差异在于两者推送事件时,请求体中的event_type,一个是消息:im.message.receive_v1,一个是自定义事件:application.bot.menu_v6

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

三、事件与回调

侧边栏选择-开发配置-事件与回调

1、事件配置
①选择订阅方式,分为两种:一种为长连接,一种为将事件发送至开发者服务器。
a.长连接:需要使用官方SDK。
b.将事件发送至 开发者服务器:这个需要提前部署个接口在飞书可访问的服务器上,按要求返回响应内容,就配置成功了。后续请求地址路径不能改了,改了就要重新验证。

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

②、添加事件
当机器人被触发相关事件后,向后端服务发送具体消息body,后端根据body,做出具体的事务处理及响应。

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

③、开通事件权限
添加某些事件后,还需要开通权限,可以按需开通

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

2、回调配置
①、订阅方式与事件的订阅方式相似,不再累述了
②、添加回调
回调的可选内容比较少,我选择的时卡片回传交互,可以在用户点击卡片按钮时请求后端,另外使用卡片回调时,后端的提示可以通过前端toast进行提示,比较友好

return {\'toast\': {\'type\': \'success\', \'content\': \'验证码已发送\'}}

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

3、加密策略:用于加密事件或回调的请求内容,校验请求来源。当订阅方式为“将事件发送至开发者服务器”或“将回调发送至开发者服务器”时生效
我并没有使用(主要是调试会比较麻烦),想用的话见官方文档:
配置 Encrypt Key

四、版本管理与发布

侧边栏选择-应用发布-版本管理与发布
如果修改的内容需要通过发布才能生效,页面会有提示:

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

点击创建本进入发布内容编辑页面(或者通过创建版本按钮进入)
该页面主要编辑版本的修改内容,以及可用范围,涉及可用范围变更时,可能需要系统管理员进行审核

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

为了避免这种麻烦的流程,我只把可用范围定位了本人所在的部门,这样就避免了的审核。
发布成功后,飞书的开发小助手会提示发布成功。

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

五、飞书卡片

飞书卡片是交互的重要媒介,飞书提供了飞书卡片搭建工具,方便快速制作卡片
飞书卡片搭建工具

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置
注意一下,已发布的卡片就不能删除了,我只能写“废弃”了

以我的卡片描述下具体功能:

1、左侧工具栏添加容器(不添加容器,直接使用文本框等,可能会有问题),我这边使用了表单容器,用于提交手机号、验证码。一共用到了2个表单容器,一个提交手机号,一个提交验证码。

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

2、设置输入框的属性,主要是一些前端的显示属性

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

输入框不支持事件哦,点击事件,进去后发现添加不了事件。

3、设置提交按钮的属性值:
其中表单标识符后续需要用到,用于辨识是哪个按钮提交的数据

button_name = callback_body[\'event\'][\'action\'][\'name\']

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

4、设置提交按钮的事件
点击创建事件,可以设置点击时的具体动作,我们这里选择请求回调,前端填写的手机号将被赋值到input_mobile这个变量上(变量可以在事件处直接定义,也可以在外面\"X\"标记处进行定义)

submit_data = callback_body[\'event\'][\'action\'].get(\'form_value\')if button_name == \'button_get_code\': if submit_data: mobile = submit_data.get(\'input_mobile\') or \'\'

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

5、同理设置短信验证码的输入框与提交框,我这边需要把已经获取到的手机号传入卡片,一起呈现在前端,需要把变量值和卡片一起发送。
【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

return await feishu_send_msg_internal( receive_id_type = \'open_id\', send_msg = SendMessageRequestModel(  receive_id = open_id,  msg_type = \'interactive\',  content = get_format_card(\'AAqjQxxxxxx\', exist_mobile = exist_mobile) ), redis_pool = redis_pool )
# 返回格式化的卡片字符串def get_format_card(template_id: str, **template_variable) -> str: json_data = { \'type\': \'template\', \'data\': { \'template_id\': template_id, \'template_variable\': template_variable } } return json.dumps(json_data, ensure_ascii = False)

6、其他类型的前端组件传入变量也是相同操作

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

return await feishu_send_msg_internal( receive_id_type = \'open_id\', send_msg = SendMessageRequestModel(  receive_id = open_id,  msg_type = \'interactive\',  content = get_format_card(\'AAqjQZxxxxxx\', query_status_array = [  {\'run_time\': morning_time, \'run_status\': status_dict[morning_status]},  {\'run_time\': afternoon_time, \'run_status\': status_dict[afternoon_status]},  ]) ), redis_pool = redis_pool )

7、循环容器的使用
主要是生成数据列表,使用方法如下:
【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置
【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

在变量设置内设置好模拟数据,即可在卡片编辑页面,看到预览样式
【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置
文本内容也可以设置样式,加粗,颜色,等等
【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

后端组织好数据后,通过发送消息接口发给用户

 # 考试列表字典 task_list = request_info.get(\'task_list\', {}).get(\'list\') if task_list: # 飞书卡片内得字段值与响应体得字段值对照 key_mapping = { \'title\': \'name\', \'create_user_name\': \'avatar\', \'makeup_count\': \'count\', \'score\': \'score\', \'total_score\': \'total_score\', \'begin_time\': \'start_time\', \'end_time\': \'end_time\', \'task_id\': \'task_id\', \'id\': \'exam_id\' } def convert_value(key, value): if key in [\'begin_time\', \'end_time\']:  return datetime.datetime.fromtimestamp(int(value) / 1000).strftime(\'%Y-%m-%d %H:%M:%S\') return value exam_list = [{key_mapping.get(old_key, old_key): convert_value(old_key, value) for old_key, value in item.items() if old_key in key_mapping.keys()} for item in task_list] # 发送飞书卡片 send_message(open_id, is_text = False, template_id = \'AAqSDQnjaoppy\', exam_list = exam_list)

7、设计完成后,可以把卡片发送给自己,看下具体UI效果

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置
8、全部完成后即可以发布了,发布前需要绑定之前添加的机器人

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

9、后续使用卡片时,使用卡片的唯一ID即可(如何发送消息后面讲)

【飞书机器人】自动执行后台任务并通知_飞书机器人自动推送怎么设置

六、发送飞书消息

我使用的飞书消息为私聊消息,群组消息,以及卡片消息,都是通过飞书提供的发送消息接口发送的。
发送消息

其实也没什么好多说的,就是调用飞书提供的接口,发送各种消息,以下是我的代码:

async def feishu_send_msg_internal( redis_pool: aioredis.Redis, receive_id_type: ReceiveIdType, send_msg: SendMessageRequestModel ) -> dict: try: access_token = await get_access_token_internal(redis_pool) except Exception as e: raise Exception(f\'获取tenant_access_token失败:{e}\') url = \'https://open.feishu.cn/open-apis/im/v1/messages\' headers = { \'Content-Type\': \'application/json\', \'Authorization\': f\'Bearer {access_token}\' } params = {\"receive_id_type\": receive_id_type} json_data = send_msg.model_dump() try: response = await fetch(method = \'POST\', url = url, headers = headers, params = params, data = json_data) try: if response[\'code\'] == 0: return {\'code\': 0, \'msg\': \'发送成功\'} else: raise Exception(f\'发送飞书消息失败:{response}\') except KeyError as e: raise Exception(f\'解析飞书返回数据失败,{e}:{response}\') except Exception as e: raise Exception(f\'请求发送飞书消息接口失败:{e}\')

以下是使用飞书sdk的发送消息的代码,并做了一定修改:

# 发送个人即时消息def send_message( receive_id: str, is_text: bool = True, is_image: bool = False, image_key: str = None, is_personal: bool = True, universal_text: str = None, target: Union[List[str], str] = None, template_id: str = None, client: Client = callback_client, max_retries: int = 3, retry_interval: int = 2, **template_variable ): \"\"\" 飞书发送消息 :param receive_id:接收者ID :param is_text:是否为文本消息,不是则为卡片消息 :param is_image: 新增发送图片消息参数 :param image_key: 图片的key,通过create_image方法上传图片后获取 :param is_personal:是否发给个人,不是则发给群组 :param universal_text:发送的文本消息内容 :param target:发送文本消息时需要@的用户/用户列表 :param template_id:飞书卡片模板ID :param client:飞书sdk客户端 :param max_retries:请求最大重试次数 :param retry_interval:重试间隔时间 :param template_variable:卡片模板变量 :return: \"\"\" request: CreateMessageRequest = CreateMessageRequest.builder() \\ .receive_id_type(\'open_id\' if is_personal else \'chat_id\') \\ .request_body( CreateMessageRequestBody.builder() .receive_id(receive_id) .msg_type((\'text\' if is_text else \'interactive\') if not is_image else \'image\') .content((get_format_text(universal_text, target) if is_text else get_format_card(template_id, **template_variable)) if not is_image else get_format_image(image_key)) .build() ) \\ .build() for retry_count in range(max_retries): try: # 发起请求 response: CreateMessageResponse = client.im.v1.message.create(request) if response.success(): return response.data.message_id else: lark.logger.error(  f\"client.im.v1.message.create failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \\n{json.dumps(json.loads(response.raw.content), indent = 4, ensure_ascii = False)}\"  ) return None except Exception as e: if retry_count < max_retries - 1: time.sleep(retry_interval) continue else: lark.logger.error(f\"send message failed after {max_retries} retries, error: {e}\") return None# 返回格式化的text字符串def get_format_text(universal_text: str, target: Union[List[str], str] = None) -> str: if isinstance(target, list): full_str = \'\'.join([f\'<at user_id=\"{open_id}\">\' for open_id in target]) return json.dumps({\'text\': universal_text + full_str}, ensure_ascii = False) elif isinstance(target, str): return json.dumps({\'text\': universal_text + f\'<at user_id=\"{target}\">\'}, ensure_ascii = False) else: return json.dumps({\'text\': universal_text}, ensure_ascii = False)# 返回格式化的卡片字符串def get_format_card(template_id: str, need_str: bool = True, **template_variable) -> Union[str, dict]: json_data = { \'type\': \'template\', \'data\': { \'template_id\': template_id, \'template_variable\': template_variable } } if need_str: return json.dumps(json_data, ensure_ascii = False) else: return json_data# 返回格式化的图片字符串def get_format_image(image_key: str) -> str: return json.dumps({\'image_key\': image_key}, ensure_ascii = False)

七、关于长连接,发送至开发者服务器的详细说明

1、发送至开发者服务器:
之前提到过,这个借助于开发者自己部署的服务,去和飞书服务器进行交互,我这边使用的是fastapi。
下面是项目启动的main.py

import asyncioimport timeimport uvicornfrom apscheduler.schedulers.asyncio import AsyncIOSchedulerfrom fastapi import FastAPI, Requestfrom fastapi.staticfiles import StaticFilesfrom fastapi.middleware.cors import CORSMiddlewarefrom apis import receive_code_router, send_code_router, trigger_login_router, remove_bind_routerfrom backgroundTask import background_task, gen_today_timesfrom caches.cache import create_pool, close_poolfrom feishu.feishuAccessToken import feishu_access_token_routerfrom feishu.feishuCallback import feishu_callback_routerfrom feishu.feishuEvent import feishu_event_routerfrom feishu.feishuGetGroup import feishu_get_group_routerfrom feishu.feishuSendMsg import feishu_send_msg_routerfrom feishu.feishuGetGroupMember import feishu_get_group_member_routerfrom feishu.feishuSubscribe import feishu_subscribe_router# openApi文档设置app = FastAPI( title = \'GXT Auto Login API Docs\', description = \'自动登录使用的接口,使用的fastApi的后端接口文档\', version = \'1.0.0\', docs_url = \'/docs\', redoc_url = \'/redoc\' )# 挂载static文件app.mount( path = \'/static\', app = StaticFiles(directory = \'static\'), name = \'static\' )# 初始化后台任务调度器scheduler: AsyncIOScheduler = AsyncIOScheduler()@app.on_event(\'startup\')async def startup_event(): # 项目启动时创建redis连接池 create_pool_task = asyncio.create_task(create_pool()) await asyncio.wait([create_pool_task]) redis_pool = create_pool_task.result() # 设置使用代理初始值 await redis_pool.set(\'use_proxy\', 1) # 爬取代理地址的网站,代理地址可用性太差,没几个能用的,放弃使用代理池了 # scheduler.add_job(proxy_pool_task, \'cron\', hour = 22, minute = 0, second = 0, args = [redis_pool]) # 启动时先添加生成第二天任务运行时间的任务 scheduler.add_job(gen_today_times, \'cron\', hour = 1, minute = 0, second = 0, args = [redis_pool]) # 添加后台任务,3分钟执行一次,任务内部判断是否需要执行 scheduler.add_job(background_task, \'cron\', minute = \'*/3\', args = [redis_pool]) scheduler.start()@app.on_event(\'shutdown\')async def shutdown_event(): await close_pool() scheduler.shutdown()# 设置CORS跨域app.add_middleware( CORSMiddleware, allow_origins = [\"*\"], allow_credentials = True, allow_methods = [\"GET\", \"POST\"], allow_headers = [\"*\"], )# 挂载routerapp.include_router(feishu_event_router, prefix = \'\', tags = [\'飞书接口\'])app.include_router(feishu_callback_router, prefix = \'\', tags = [\'飞书接口\'])if __name__ == \'__main__\': uvicorn.run( app = \'main:app\', host = \'0.0.0.0\', port = 16666, # reload = True )

2、飞书长连接
长连接的模式就类似于一个服务,启动服务后,再调用相关sdk接口,即可进行交互,自己无需再启动其他的后端服务,如django,flask等。当然启动长连接服务时,我有考虑使用异步,但是目前的sdk并不支持,所以其性能犹未可知。
下面是长连接的服务器启动代码:

import concurrent.futuresfrom functools import partialimport lark_oapi as larkfrom apscheduler.schedulers.background import BackgroundSchedulerfrom caches.cache import create_pool, close_poolfrom caches.cacheConfig import *from feishu.feishuConfig import APP_ID, APP_SECRETfrom feishu.feishuEvent import do_p2_im_message_receive_v1, do_p2_im_chat_member_bot_added_v1, do_p2_im_chat_member_user_added_v1, do_chat_create, do_card_action_triggerfrom timedTask.autoLogin import auto_login_taskfrom timedTask.genTodayTimes import gen_today_timesfrom timedTask.syncRedisData import sync_redis_dataif __name__ == \'__main__\': # 创建redis连接池 redis_client = create_pool() # 删除原有的代理设置,恢复成默认状态 redis_client.delete(USE_PROXY_KEY) redis_client.delete(USE_TIMED_TASK_KEY) # 由于需要在事件中传入redis_client,所以需要使用partial函数进行封装,同时也不能分文件写,需要在main.py中定义,统一管理 do_p2_im_message_receive_v1_with_redis = partial(do_p2_im_message_receive_v1, redis_client) do_card_action_trigger_with_redis = partial(do_card_action_trigger, redis_client) # 注册飞书事件 # https://open.feishu.cn/document/uAjLw4CM/ukTMukTMukTM/server-side-sdk/python--sdk/handle-events event_handler = lark.EventDispatcherHandler.builder(\"\", \"\") \\ .register_p2_im_message_receive_v1(do_p2_im_message_receive_v1_with_redis) \\ .register_p2_im_chat_member_bot_added_v1(do_p2_im_chat_member_bot_added_v1) \\ .register_p2_im_chat_member_user_added_v1(do_p2_im_chat_member_user_added_v1) \\ .register_p1_customized_event(\'p2p_chat_create\', do_chat_create) \\ .register_p2_card_action_trigger(do_card_action_trigger_with_redis) \\ .build() # 创建飞书事件客户端 event_client = lark.ws.Client( APP_ID, APP_SECRET, event_handler = event_handler, log_level = lark.LogLevel.DEBUG ) # 创建线程池 executor = concurrent.futures.ThreadPoolExecutor(max_workers = 30) def start_client(): event_client.start() # 启动定时任务 scheduler = BackgroundScheduler() scheduler.add_job(sync_redis_data, \'cron\', hour = 3, minute = 0, second = 0) scheduler.add_job(gen_today_times, \'cron\', hour = 1, minute = 0, second = 0, args = [redis_client]) scheduler.add_job(auto_login_task, \'interval\', minutes = 3, args = [redis_client]) scheduler.start() # 启动事件监听 future = executor.submit(start_client) try: future.result() except KeyboardInterrupt: executor.shutdown() scheduler.shutdown() close_pool()

八、关于标题提到的定时任务

在上面的启动代码中已经写到了,使用apscheduler可以实现后台任务的执行,其支持各种定时任务,也兼容异步框架,也不再累述了。