> 技术文档 > 规则分配脚本

规则分配脚本


需求:

1.根据用户编写的要报规则,去mysql库里SysManage_Rule表获取已经启用的规则作为条件(例如[{“field”: “关键词”, “logic”: “AND”, “value”: “阿尔法”, “operator”: “=”,, “assign_user”: “user222”}])条件即为:关键词=阿尔法
2.根据此条件去lxdb的all_report表进行查询,查询逻辑是每10min获取最新数据,满足条件的对all_report表的report_handler字段打上分配人名以及yb_importanceid填上要报规则的id
要报规则页面:
规则分配脚本
SysManage_Rule表
规则分配脚本

解决办法

规则分配脚本(supervisor运行):

import argparseimport osimport djangoimport timefrom datetime import datetime, timedeltaimport sysimport json# 添加项目路径,确保 Django 配置正确sys.path.append(\'/home/rpadmin/web/yb-backend\')# 初始化 Django 环境os.environ.setdefault(\"DJANGO_SETTINGS_MODULE\", \"yb.settings\")django.setup()# 导入自定义模块和参数from DBreport.functions.get_table_mapping import get_gather_tablefrom wrapper.Params import TOREPORT, GATHER_TABLEfrom DBconn.DBfunctions.dynamicQueryBase import DynamicQueryBaseEnginefrom SysManage.models import SysmanageRule# 字段名映射:中文字段名 -> 英文字段名field_mapping = { \"报文要素\": \"text_content\", \"语种\": \"audio_languagename\", \"关键词\": \"keyword\", \"文种\": \"text_lang\", \"实体\": \"text_entity\", \"来源手段\": \"hj_means\"}# 初始记录时间,第一次运行查很早之前的数据last_run_time = datetime.now() - timedelta(minutes=1600000)def replace_field_names(data, mapping): new_data = {} for user, rules in data.items(): new_rules = [] for rule in rules: new_rule = rule.copy() if new_rule.get(\'field\') in mapping: new_rule[\'field\'] = mapping[new_rule[\'field\']] new_rules.append(new_rule) new_data[user] = new_rules return new_datadef run_rule_dispatch(): global last_run_time # 获取数据库连接和数据表对象 db_conn = get_gather_table() gather_table = db_conn.load_table(GATHER_TABLE) print(\"[调试] all_report 字段列表:\", [col.name for col in gather_table.columns]) # 查询所有启用状态的规则 rules = SysmanageRule.objects.filter(rule_status=1) for rule in rules: rule_id = rule.id rule_name = rule.rule_name rule_content = rule.rule_content # 检查规则内容格式是否为列表 if isinstance(rule_content, list): conditions = rule_content else: print(f\"[跳过] 规则《{rule_name}》内容应为列表格式\") continue # 替换条件中的字段名 conditions = replace_field_names({rule_name: conditions}, field_mapping)[rule_name] filters = [] keyword_value = None # 保存关键词值(如果存在) for cond in conditions: field = cond.get(\"field\") operator = cond.get(\"operator\") value = cond.get(\"value\") if cond.get(\"assign_user\"): assign_user = cond[\"assign_user\"] if not field or not operator or value is None: continue # 如果字段为 keyword 且是等于操作,延迟处理 if field == \"keyword\" and operator == \"=\": keyword_value = value else: filters.append((field, operator, value)) # 增加时间过滤:只查上次运行之后新增的数据 filters.append((\'hj_createtime\', \'>=\', last_run_time)) # 关键词特殊处理:模糊匹配 audio_asr 或 trans_sidebyside 的 original_content/ translat_content 字段 if keyword_value is not None: # audio_asr 模糊匹配 filters_audio = filters + [(\'audio_asr\', \'like\', f\"%{keyword_value}%\")] # 执行原始查询(不带关键词)用于后续文本内容筛选 raw_trans_records = db_conn.execute_dynamic_query( gather_table, filters, limit=5, # 仅预览前5条,必要时可调整或移除 ignore_fields=[] ) matched_trans_data = [] for row in raw_trans_records[\'data\']: try:  trans_data = row.get(\'trans_sidebyside\')  if isinstance(trans_data, str): trans_data = json.loads(trans_data) # JSON 反序列化  if isinstance(trans_data, dict): original = trans_data.get(\"original_content\", \"\") translated = trans_data.get(\"translat_content\", \"\") if keyword_value in original or keyword_value in translated: matched_trans_data.append(row) except Exception:  continue # 执行 audio_asr 匹配查询 result1 = db_conn.execute_dynamic_query(gather_table, filters_audio, ignore_fields=[]) # 合并两类结果(根据 lxid 去重) combined_data = {row[\'lxid\']: row for row in result1[\'data\']} for row in matched_trans_data: combined_data.setdefault(row[\'lxid\'], row) total_count = len(combined_data) print(f\"[调试] 条件命中记录数:{total_count}\") if total_count: first_row = list(combined_data.values())[0] print(\"[调试] 命中示例记录:\", first_row) # 构造更新内容 updates = { \'report_handler\': assign_user, \'yb_importanceid\': str(rule_id), \'yb_importancename\': rule_name, \'assign_rule_type\': TOREPORT } # 遍历命中记录逐条更新 affected = 0 for row in combined_data.values(): row_filter = [(\'lxid\', \'=\', row[\'lxid\'])] try:  count = DynamicQueryBaseEngine.update_record(db_conn, gather_table, updates, row_filter)  affected += count except Exception as e:  print(f\"[×] 更新失败:{str(e)}\") print(f\"[✓] 规则《{rule_name}》执行成功,分配人:{assign_user},更新{affected}条\") continue # 正常流程处理:无关键词匹配逻辑 print(f\"[调试] 规则《{rule_name}》筛选条件:{filters}\") result_data = db_conn.execute_dynamic_query( gather_table, filters, limit=5, ignore_fields=[] ) print(f\"[调试] 条件命中记录数:{result_data[\'total_count\']}\") if result_data[\'data\']: print(\"[调试] 命中示例记录:\", result_data[\'data\'][0]) updates = { \'report_handler\': assign_user, \'yb_importanceid\': str(rule_id), \'yb_importancename\': rule_name, \'assign_rule_type\': TOREPORT } try: affected = DynamicQueryBaseEngine.update_record(db_conn, gather_table, updates, filters) print(f\"[✓] 规则《{rule_name}》执行成功,分配人:{assign_user},更新{affected}条\") except Exception as e: print(f\"[×] 规则《{rule_name}》执行失败:{str(e)}\") # 关闭连接,记录当前时间用于下轮过滤 db_conn.close() last_run_time = datetime.now()if __name__ == \"__main__\": while True: print(f\"\\n[调试] 开始执行调度任务,当前时间:{datetime.now().strftime(\'%Y-%m-%d %H:%M:%S\')}\") run_rule_dispatch() print(f\"[调试] 任务执行完成,等待10分钟...\\n\") time.sleep(600) # 每10分钟运行一次

指定分配人接口(即向rule_content字段里添加assign_user):

@class_operation_logger(operation_name=\"/指定要报规则分配人\")class RuleAssignUserUpdateAPIView(APIView): \"\"\" 接收 assign_user 和 规则id,指定规则的 rule_content 中每一项的 assign_user POST /api/sys/update-assign-user/ Body: { \"id\": 2, \"assign_user\": \"user002\" } \"\"\" def post(self, request, *args, **kwargs): rule_id = request.data.get(\"id\") assign_user = request.data.get(\"assign_user\") if not rule_id or not assign_user: return ErrorResponse(data=False, msg=\"参数id 和 assign_user 都是必填的\") # 获取要更新的规则 rule = get_object_or_404(SysmanageRule, pk=rule_id) try: content_list = rule.rule_content or [] if not isinstance(content_list, list): raise ValueError(\"rule_content 必须是列表\") except Exception as e: return ErrorResponse(data=False, msg=f\"读取 rule_content 失败:{str(e)}\") # 创建或更新每一项的 assign_user for cond in content_list: cond[\'assign_user\'] = assign_user # 保存回库 rule.rule_content = content_list rule.save(update_fields=[\'rule_content\']) return SuccessResponse(msg=f\"id为 {rule_id} 的规则的分配人已更新为 {assign_user}\")

Supervisor工具详见文章Supervisor进程管理