项目实战第一讲:如何优雅地记录操作日志
操作日志几乎存在于每个系统中,而这些系统都有记录操作日志的一套 API。操作日志和系统日志不一样,操作日志必须要做到简单易懂。所以如何让操作日志不跟业务逻辑耦合,如何让操作日志的内容易于理解,如何让操作日志的接入更加简单?上面这些都是本文要回答的问题。本文是项目实战的第一讲:我们将围绕着如何“优雅”地记录操作日志展开描述。
文章目录
-
-
- 1、项目背景
- 2、主要技术
- 3、项目职责
- 4、项目实现
- 5、项目结果
-
1、项目背景
项目背景:后台类目增加操作日志,在配置在出现问题时,追溯原因
面临的挑战:
①数据量大
- 做好异步限流处理
2、主要技术
SSM+Maven+Springboot(2.0.3.RELEASE)+Git+Dubbo+RocketMQ+Redis+Apollo+MQ
- ①主要用maven构建项目模块和管理项目,控制jar包版本;
- ②用Dubbo+zookeeper搭建分布式SOA架构;
- ③使用MQ解耦业务逻辑和记录日志;
3、项目职责
- ①使用切面编程,将DAO层创建、变更接口加上切入点,执行操作;
- ②为了防止后发的消息先消费,导致日志记录乱序,使用了顺序消息发送变更信息;
- ③字段diff,新旧数据比对得出多个修改记录;
- ④使用Apollo配置控制开关是否打开
4、项目实现
可选方案:
方案1:使用canal监听,流程图如下所示
- 注意:只能处理数据表中单行数据,如果有批量数据合并落日志系统的需求,该方案并不合适
方案2:使用AOP切面,流程如下所示 - 由于批量的数据变更合并成一条日志没法通过canal实现,因此采用切面编程的方式
日志记录流程图:
表结构:
CREATE TABLE `zcy_operator_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `biz_type` varchar(20) NOT NULL COMMENT '业务类别', `biz_id` bigint(20) DEFAULT '0' COMMENT '业务ID', `sub_biz_type` varchar(100) NULL COMMENT '子业务类别', `remark` longtext DEFAULT '' COMMENT '操作内容', `status` tinyint(1) DEFAULT '1' COMMENT '状态,1-生效,-3-删除', `operate_type` tinyint(1) DEFAULT NULL COMMENT '1 create 新增; 2 update 编辑; 3 offLine 下架;4 freeze 冻结; 5 unfreeze 解冻;6 delete 删除; 7 online上架; 8 open启用;9 close 停用', `config_id` int(11) DEFAULT '1' COMMENT '配置id', `operator_name` varchar(100) NOT NULL COMMENT '创建人名称', `creator_id` bigint(20) DEFAULT NULL COMMENT '创建人id', `created_at` datetime DEFAULT NULL COMMENT '创建时间', `update_id` bigint(20) DEFAULT NULL COMMENT '更新人id', `updated_at` datetime DEFAULT NULL COMMENT '更新时间', `r_add_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '数据行创建时间', `r_modified_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据行最后修改时间', PRIMARY KEY (`id`), KEY `idx_biz_id (`biz_id`), KEY `idx_config_Id` (`config_id`), KEY `idx_biz_type` (`biz_type`) KEY `idx_addTime` (`r_add_time`), KEY `idx_modifiedTime` (`r_modified_time`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='操作日志';
整体流程图如下所示:
使用AOP记录日志的Demo:
// 步骤1:在DAO层加注解 @LogRecord(paramName = "groups", type = BizTypeEnum.CATEGORY_ATTRIBUTE_GROUP, paramType = LogRecordParamTypeEnum.DOMAIN_BEAN, operateType = OperateTypeEnum.CREATE)public Integer creates(List<CategoryAttributeGroup> groups) { return super.creates(groups);}// 步骤2:切面@Aspect@Slf4j@Componentpublic class LogRecordAspect { / * 旧值,放在ThreadLocal里面,线程安全 */ private static final ThreadLocal<String> OLD_VALUE = new ThreadLocal<>(); / * 注入实例对象,Spring的自动装配, * 获取Map的value的类型,查找匹配该类型的所有bean * key为 beanName * value为 bean实例 */ @Resource private Map<String, OperateLogUpdateService> operateLogUpdateServiceMap; / * 前置处理逻辑 * 查询旧值并返回 * * @param point 切入点 * @param logRecord 日志注解 */ @Before("@annotation(logRecord)") public void recordLogBefore(JoinPoint point, LogRecord logRecord) { log.info("方法开始执行完成,aop查询旧数据,处理service:{}", logRecord.type().getBeanName()); try { OperateLogUpdateService operateLogUpdateService = operateLogUpdateServiceMap.get(logRecord.type().getBeanName()); if (operateLogUpdateService == null) { //不支持该业务类别 log.warn("不支持该业务类别, BizType : {}", logRecord.type()); } //前置逻辑:查询旧数据,对应一些操作(创建),无需查询旧值,对于更新操作,需要旧值 List<OperateTypeEnum> noNeedOldValue = OperateTypeEnum.getNoNeedOldValue(); if (!noNeedOldValue.contains(logRecord.operateType())) { //从切入点获取数据 Map<String, Object> paramValue = LogOperateUtils.getNameAndValue(point); //返回值为json格式 String oldValue = Objects.nonNull(operateLogUpdateService) ? operateLogUpdateService.getBizData(paramValue.get(logRecord.paramName()), logRecord.operateType(), logRecord.paramType()) : null; OLD_VALUE.set(oldValue); } } catch (Exception e) { log.warn("[日志系统]切面前置处理逻辑抛错了,业务类别:{},cause:{}", logRecord.type(), e); } } / * 后置操作 * 加上开关,如果开关关闭,不走后置处理逻辑 * AfterReturning 返回通知方法 连接点方法成功执行后,返回通知方法才会执行,如果连接点方法出现异常,则返回通知方法不执行 * After 后置通知方法 后置方法在连接点方法完成之后执行,无论连接点方法执行成功还是出现异常,都将执行后置方法 * @param point 切入点 * @param logRecord 日志注解 */ @AfterReturning("@annotation(logRecord)") public void recordLogAfter(JoinPoint point, LogRecord logRecord) { log.info("方法开始执行完成,aop查询新数据,处理service:{}", logRecord.type().getBeanName()); try { //后置更新,插入日志 OperationRecordLogDto operationRecordLogDto = new OperationRecordLogDto(); //通过该参数知道是哪条业务线,确定父业务和子业务 if (BizTypeEnum.getParentBizType().contains(logRecord.type().getCode())) { operationRecordLogDto.setBizType(logRecord.type().getCode()); operationRecordLogDto.setSubBizType(null); } else if (BizTypeEnum.getChildBizType().contains(logRecord.type().getCode())) { operationRecordLogDto.setBizType(logRecord.type().pCode()); operationRecordLogDto.setSubBizType(logRecord.type().getCode()); } else { throw new ServiceException("[日志系统]该业务未定义,请检查!"); } //操作类型,由注解获得 operationRecordLogDto.setOperateType(logRecord.operateType()); //从切入点中获取 Map<String, Object> paramValue = LogOperateUtils.getNameAndValue(point); operationRecordLogDto.setOldValue(OLD_VALUE.get()); //configid由joinPoint传入,如果没有,则为1L operationRecordLogDto.setConfigId(1L); OperateLogUpdateService operateLogUpdateService = operateLogUpdateServiceMap.get(logRecord.type().getBeanName()); //各条业务线自定义插入日志的逻辑 operateLogUpdateService.insertLog(paramValue.get(logRecord.paramName()), operationRecordLogDto, logRecord.paramType()); } catch (Exception e) { log.warn("[日志系统]切面后置处理逻辑抛错了,业务类别:{},cause:{}", logRecord.type(), e); } finally { OLD_VALUE.remove(); } }}// 步骤3 发顺序消息和接受消息ZMQResponse zmqResponse = zmqTemplate.sendOrderMsg(MqConfigContant.ITEM_STANDARD_LOG_RECORD_TOPIC, MqConfigContant.ITEM_STANDARD_LOG_RECORD_TAG, null, message, shardingKey);private void consumeLogRecord(String message) { //message为二进制 LogRecordMq logRecordmq = JSON.parseObject(message, LogRecordMq.class); log.info("创建日志信息:{}", JSON.toJSONString(logRecordmq)); if (Objects.isNull(logRecordmq)) { log.warn("log info not found"); return; } //参数类型 LogRecordParamTypeEnum typeEnum = logRecordmq.getTypeEnum(); //业务类别 BizTypeEnum bizType = logRecordmq.getBizType(); //操作详情 OperationRecordLogDto detail = logRecordmq.getValue(); //调用子类 OperateLogUpdateService operateLogUpdateService = operateLogUpdateServiceMap.get(bizType.getBeanName()); operateLogUpdateService.logOperate(detail, bizType, typeEnum);}// 步骤4 记录日志/ * 日志操作 怎么操作由实现类自行定义 * 提供给切面后置处理逻辑使用 * @param paramValue 日志参数 * @param paramType 业务类型 * @param typeEnum 参数类型 */void logOperate(OperationRecordLogDto paramValue, BizTypeEnum paramType, LogRecordParamTypeEnum typeEnum);
问题1、如何优雅地查看操作日志?
- 对于业务类别,定义了父子类型,方面汇总日志
/ * code码,会落库 */ private String code; / * 父级业务 */ private String pCode;
问题2、需要跟旧数据比对得出多个修改记录,该如何优雅的搞定?(支持字段级别对比)
- Object 对象多字段 diff 的功能,可以生成多字段修改的操作日志文案
Demo
/ * 比较两个对象的差异值 * @param first: 第一个对象 * @param second: 第一个对象 * @param excludeFields: 不需要比较的字段 * @param bothExistFieldOnly: 是否仅比较共有的字段 * @return */ public static List<FieldCompareInfo> getBeanDiffFields(Object first, Object second, List<String> excludeFields, boolean bothExistFieldOnly) { return FIELD_BASE_EQUATOR.getDiffFields(first, second, excludeFields, bothExistFieldOnly); }public List<FieldCompareInfo> getDiffFields(Object first, Object second, List<String> excludeFields, boolean bothExistFieldOnly) { // 1. 同一个对象 if (first == second) { return Lists.newArrayList(); } // 2. 是否简单数据类型 if (isSimpleField(first, second)) { return compareSimpleField(first, second); } Set<String> allFieldNames; // 获取所有字段 Map<String, Field> firstFields = getAllFields(first); Map<String, Field> secondFields = getAllFields(second); // 到此处,first和second不会同时为null if (first == null) { allFieldNames = secondFields.keySet(); } else if (second == null) { allFieldNames = firstFields.keySet(); } else { allFieldNames = getAllFieldNames(firstFields.keySet(), secondFields.keySet(),bothExistFieldOnly); } List<FieldCompareInfo> diffFields = Lists.newArrayList(); for (String fieldName : allFieldNames) { try { Field firstField = firstFields.getOrDefault(fieldName, null); Field secondField = secondFields.getOrDefault(fieldName, null); Object firstVal = null; Class<?> firstType = null; Class<?> secondType = null; Object secondVal = null; if (firstField != null) { firstField.setAccessible(true); firstVal = firstField.get(first); firstType = firstField.getType(); } if (secondField != null) { secondField.setAccessible(true); secondVal = secondField.get(second); secondType = secondField.getType(); } FieldCompareInfo fieldInfo = new FieldCompareInfo(fieldName, firstType, secondType,firstVal,secondVal, FieldCompareInfo.calculateChangeType(firstVal , secondVal)); if (!isFieldEquals(fieldInfo,excludeFields)) { diffFields.add(fieldInfo); } } catch (IllegalAccessException e) { throw new IllegalStateException("获取属性进行比对发生异常: " + fieldName, e); } } return diffFields; }
- 对比JsonPath工具
问题 3、有什么办法保证和主线程的上下文继承,除了手动传参这种侵入性太强的方法以外?
- 使用阿里 TransmittableThreadLocal
- 可以参考这篇文章 todo
问题4、消息的全链路追踪?
- traceId 消息增强的好处
@Configuration@ComponentScan(basePackages = { "cn.gov.zcy.service.component.mq", "cn.gov.zcy.common.mq"})@Slf4jpublic class MqConfig implements ApplicationRunner, ApplicationContextAware { @Autowired private ZmqConsumerManager consumerManager; private ApplicationContext applicationContext; / * Callback used to run the bean. * * @param args incoming application arguments * @throws Exception on error */ @Override public void run(ApplicationArguments args) throws Exception { log.info("[ZMQ 启动消费者加载]"); Map<String, Object> consumerMap = applicationContext.getBeansWithAnnotation(Consumer.class); Map<String, MessageCallback> callbackMap = Maps.newHashMap(); consumerMap.forEach((beanName, callback) -> { try { MessageCallback messageCallback = (MessageCallback) callback; Consumer consumer = ClassUtil.findClassAnnotation(callback,Consumer.class); if (Objects.nonNull(consumer)) { callbackMap.put(consumer.id(), ImConsumerEnhanceUtil.enhanceAll(consumer.id(), messageCallback)); }else{ throw new IllegalStateException("not get Consumer annotation from bean:"+beanName); } } catch (Exception e) { log.error("[ZMQ 启动消费者加载失败], cid: {}, callback: {}", beanName, callback, e); throw e; } }); log.info("[ZMQ 启动消费者加载成功]消费者集合为:{}", callbackMap); consumerManager.listenMessage(callbackMap); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; }}// 增强:trace、消费时延public static MessageCallback enhanceAll(String cid, MessageCallback messageCallback) { //多次增强,后面/外面的先执行 return ImTraceEnhanceUtil.wrapper(consumeDelay(cid, messageCallback));} / * 包装,使其具有消费延时通知能力 * @param cid * @param messageCallback * @return 增强后的 */public static MessageCallback consumeDelay(String cid, MessageCallback messageCallback) { return msg -> { //记录消息消费时延 Long delayLimitMs = getDelayLimitMs(cid); long consumeDelay; if (delayLimitMs != null && msg.getBornTimestamp() > 0L && (consumeDelay = (System.currentTimeMillis() - msg.getBornTimestamp())) > delayLimitMs) { log.info("消息消费延时 cid:{} , delay:{} ms", cid, consumeDelay); } return messageCallback.messageArrived(msg); };}private static Long getDelayLimitMs(String cid) { //格式:cid:ms String json = ApolloConfigUtil.getString(RECORD_CONSUME_DELAY); try { if (!StringUtils.hasText(json)) { return null; } else { Map<String, Long> cidDelayMap = ImJsonUtils.jsonToStrToLongMap(json); return cidDelayMap != null ? cidDelayMap.get(cid) : null; } } catch (Exception e) { log.error("getDelayLimitMs error", e); } return null;}/ * 包装,使其具有trace能力 * @param messageCallback * @return 增强后的 */public static MessageCallback wrapper(MessageCallback messageCallback) { return msg -> { //消息的traceId(从消息userProperties中传递而来)只放在了MDC,此处从MDC获取 //cn.gov.zcy.zmq.util.ZMQMixAll.convertMsgFromMQ2ZMQ(org.apache.rocketmq.common.message.MessageExt) String traceId = MDC.get("traceId"); //考虑到发多个消息,此处追加消息id traceId = traceId + "||msg" + msg.getMsgId(); TraceIdUtil.initTraceId(traceId); //消息消费时,初始没有span。不需要赋值,后续会使用新的traceId生成 try { return messageCallback.messageArrived(msg); } finally { TraceIdUtil.clearTraceId(); TRACER.removeParentSpan(); } };}
Action1:在这个项目中的收获?
- ①创新点:使用AOP记录操作日志,解耦业务逻辑与记录日志
- ②拓展性:符合面向对象编程,拓展性良好
Action2:可以优化的地方?
- ①在消费中,处理日志落库时,有很多定制化逻辑,可以考虑后续拓展;
Action3:为什么使用AOP记录操作日志,而不是其他方案?
方案1、使用 Canal 监听数据库增量日志来记录操作日志
- 方式:监听数据库Binlog
- 优点:这种方式的优点是和业务逻辑完全分离,对dbChange也能监听
- 缺点:如果调用其他团队RPC接口修改对方业务,没法监听对方数据库; 没法聚合日志
方案2 通过日志文件MDC的方式记录(使用场景:traceId增强)
- 方式:使用Slf4J的MDC工具类(MDC 可以看成是一个与当前线程绑定的哈希表,可以往其中添加键值对)
- 如何使用:在用户的拦截器中把用户的标识Put到MDC中
@Componentpublic class UserInterceptor extends HandlerInterceptorAdapter { @Override public boolean preHandle(HttpServletRequest request,HttpServletResponse response, Object handler) throws Exception { // 获取到用户标识 String userNo = getUserNo(request); // 把用户 ID 放到 MDC 上下文中 MDC.put("userId", userNo); return super.preHandle(request, response, handler); } private String getUserNo(HttpServletRequest request) { // 通过 SSO 或者 Cookie 或者 Auth 信息获取到 当前登陆的用户信息 return null; }}
把 userId 格式化到日志中,使用 %X{userId} 可以取到 MDC 中用户标识
”%d{yyyy-MM-dd HH:mm:ss.SSS} %t %-5level %X{userId} %logger{30}.%method:%L - %msg%n”
方案3、通过 LogUtil 的方式记录日志
LogUtil.log(orderNo, " 订单创建 ", " 小明 ") 模板LogUtil.log(orderNo, " 订单创建,订单号 "+"NO.11089999", " 小明 ");String template = " 用户 %s 修改了订单的配送地址:从 "%s" 修改到 "%s"";LogUtil.log(orderNo, String.format(tempalte, " 小明 ", " 金灿灿小区 ", " 银盏盏小区 "), " 小明 ");
缺点:记录日志代码耦合在业务代码中,违背了单一职责原则,影响了可读性和可维护性。
优点 | 缺点 | 适用场景 | |
---|---|---|---|
切面编程 |
开发较为简单 对业务无侵入 |
1、先执行切面,日志落库,然后执行业务逻辑,批量任务执行时耗时长 2、直接操作数据库时,日志没法落库 |
批量操作 |
canal监听 |
监听数据表,只要有数据变更,都可落库日志 对业务无侵入,速度快 |
不支持批量操作 监听粒度为字段,后续如需字段拓展需修改代码 |
单个操作 |
public class LogRecordContext { private static final InheritableThreadLocal<Stack<Map<String, Object>>> variableMapStack = new InheritableThreadLocal<>(); // 其他省略 ....}
Action4、对于实例类目勾选太多的场景
- 处理方法:截取前500条数据,后面使用“。。。”展示
Action5、日志落库的逻辑处理
- 1、切面前置处理逻辑:读取旧数据
- 使用 ThreadLocal 保存数据
- 2、执行切面业务逻辑
- 3、执行后置处理逻辑:
- 发消息通知日志系统执行操作,收到消息后,先进行数据比对,有数据变更,执行落库逻辑
Action7、为什么将某些切入点放置在service层?
- 如果放在rest接口上,rest接口的入参千奇百怪,不好做切面;
- 如果放在dao层,可能一次请求会调用n次dao层,造成不必要的性能开销,数据也不准确例如:后台类目创建接口:会调用update更新父类目 调用create来创建当前类目。
5、项目结果
上线时间:2021年3月份上线,共计1年时间,日志数据为10万级别
积极主动地做事,这样无论工作还是生活,都会上升一个台阶