> 技术文档 > Flowable 高级扩展:自定义元素与性能优化实战

Flowable 高级扩展:自定义元素与性能优化实战

在前五篇文章中,我们从基础概念、流程设计、API 实战、Spring Boot 集成,到外部系统协同,逐步构建了 Flowable 的应用体系。但企业级复杂场景中,原生功能往往难以满足定制化需求 —— 比如需要特殊的审批规则网关、与决策引擎联动实现动态路由,或是在高并发场景下优化流程引擎性能。本文将聚焦 Flowable 的高级扩展能力,详解如何自定义流程元素、集成规则引擎,并掌握大型系统中的性能调优策略。

一、自定义流程元素:打造专属流程组件

Flowable 的扩展性体现在其对 BPMN 2.0 规范的灵活实现,允许我们通过自定义解析器和行为处理器,创建满足业务需求的专属流程元素。

1.1 自定义任务类型:动态审批任务

以 \"多级审批任务\" 为例,实现一个支持自动计算审批链的自定义任务:

步骤 1:定义自定义任务的 XML 标签

在 BPMN 文件中使用自定义扩展标签:

<definitions xmlns=\"http://www.omg.org/spec/BPMN/20100524/MODEL\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:custom=\"http://flowable.org/custom\"  targetNamespace=\"http://flowable.org/processdef\">         10000}\"/> <custom:approvalLevel rule=\"DEPT_MANAGER\" condition=\"${amount       
步骤 2:实现自定义解析器

解析自定义标签并转化为流程行为:

// 自定义扩展属性类@Datapublic class ApprovalLevelExtension { private String rule; // 审批人规则(如\"DEPT_MANAGER,FINANCE_MANAGER\") private String condition; // 条件表达式}// 自定义解析器public class CustomUserTaskParser extends UserTaskParser { @Override protected void parseExtensionElements(Element element, ScopeImpl scope, BpmnModel bpmnModel) { super.parseExtensionElements(element, scope, bpmnModel); // 解析自定义审批级别标签 List approvalLevelElements = element.elements(\"approvalLevel\"); if (approvalLevelElements != null && !approvalLevelElements.isEmpty()) { List extensions = new ArrayList(); for (Element approvalElement : approvalLevelElements) { ApprovalLevelExtension extension = new ApprovalLevelExtension(); extension.setRule(approvalElement.attributeValue(\"rule\")); extension.setCondition(approvalElement.attributeValue(\"condition\")); extensions.add(extension); } // 存储到流程定义的扩展属性中 scope.setVariable(\"approvalLevels\", extensions); } }}// 注册自定义解析器@Configurationpublic class CustomParserConfig { @Bean public EngineConfigurationConfigurer customParserConfigurer() { return configuration -> { // 获取BPMN解析器 BpmnParser bpmnParser = configuration.getBpmnParser(); // 替换用户任务解析器为自定义实现 Map parseHandlers = bpmnParser.getBpmnParseHandlers(); parseHandlers.put(\"userTask\", new CustomUserTaskParseHandler()); configuration.setBpmnParser(bpmnParser); }; }}
步骤 3:实现自定义任务行为
public class MultiLevelApprovalBehavior extends UserTaskActivityBehavior { private List approvalLevels; public MultiLevelApprovalBehavior(UserTask userTask, List approvalLevels) { super(userTask); this.approvalLevels = approvalLevels; } @Override public void execute(DelegateExecution execution) { // 1. 计算符合条件的审批级别 List approverRoles = new ArrayList(); for (ApprovalLevelExtension level : approvalLevels) { // 评估条件表达式 boolean conditionMet = evaluateCondition(level.getCondition(), execution); if (conditionMet) { // 解析审批角色(如\"DEPT_MANAGER,FINANCE_MANAGER\") String[] roles = level.getRule().split(\",\"); approverRoles.addAll(Arrays.asList(roles)); } } // 2. 从角色获取实际审批人(从组织架构服务查询) List approvers = new ArrayList(); for (String role : approverRoles) { List users = organizationService.getUsersByRole(role); approvers.addAll(users); } // 3. 创建多实例任务(依次审批) if (!approvers.isEmpty()) { execution.setVariable(\"approvers\", approvers); execution.setVariable(\"currentApproverIndex\", 0); // 创建第一个审批任务 createApprovalTask(execution, approvers.get(0)); } else { // 无审批人时直接推进流程 leave(execution); } } // 创建审批任务 private void createApprovalTask(DelegateExecution execution, String assignee) { TaskEntity task = TaskEntity.createAndInsert(); task.setName(userTask.getName() + \"(审批人:\" + assignee + \")\"); task.setAssignee(assignee); task.setExecutionId(execution.getId()); task.setProcessInstanceId(execution.getProcessInstanceId()); task.setTaskDefinitionKey(userTask.getId()); task.setCreateTime(new Date()); } // 评估条件表达式 private boolean evaluateCondition(String condition, DelegateExecution execution) { if (StringUtils.isEmpty(condition)) { return true; // 无条件时默认满足 } ExpressionManager expressionManager = Context.getProcessEngineConfiguration().getExpressionManager(); Expression expression = expressionManager.createExpression(condition); Object result = expression.getValue(execution); return result instanceof Boolean && (Boolean) result; } // 处理审批完成后的逻辑(需在任务完成监听器中调用) public void onApprovalComplete(DelegateExecution execution) { List approvers = (List) execution.getVariable(\"approvers\"); Integer currentIndex = (Integer) execution.getVariable(\"currentApproverIndex\"); // 检查是否有下一个审批人 if (currentIndex + 1 < approvers.size()) { // 创建下一个审批任务 int nextIndex = currentIndex + 1; execution.setVariable(\"currentApproverIndex\", nextIndex); createApprovalTask(execution, approvers.get(nextIndex)); } else { // 所有审批人已完成,推进流程 leave(execution); } }}

1.2 自定义网关:规则驱动的路由

实现基于规则引擎的动态网关,根据业务规则自动选择流程路径:

public class RuleBasedGateway extends GatewayActivityBehavior { @Autowired private DecisionService decisionService; // 规则引擎服务 @Override public void execute(DelegateExecution execution) { // 1. 准备决策参数 Map decisionInput = new HashMap(); decisionInput.put(\"orderAmount\", execution.getVariable(\"amount\")); decisionInput.put(\"customerLevel\", execution.getVariable(\"customerLevel\")); decisionInput.put(\"orderType\", execution.getVariable(\"orderType\")); // 2. 调用规则引擎获取路由结果(如返回\"expressDelivery\"或\"standardDelivery\") String route = decisionService.evaluate(\"orderDeliveryRoute\", decisionInput); // 3. 选择对应的流出线 String sequenceFlowId = \"flow_\" + route; // 假设线ID与规则结果对应 SequenceFlow selectedFlow = null; for (SequenceFlow flow : gateway.getOutgoingFlows()) { if (flow.getId().equals(sequenceFlowId)) { selectedFlow = flow; break; } } // 4. 推进流程 if (selectedFlow != null) { execution.setVariable(\"selectedRoute\", route); leave(execution, selectedFlow); } else { throw new FlowableException(\"未找到匹配的路由:\" + route); } }}

二、Flowable 与规则引擎 DMN 集成

DMN(决策模型和 notation)是流程引擎的最佳拍档,适合处理复杂的业务规则判断(如风控规则、定价策略)。

2.1 DMN 决策表设计

在 Flowable Modeler 中设计决策表(orderPricing.dmn):

订单金额

客户等级

折扣率

>10000

VIP

0.8

>10000

普通

0.9

<=10000

VIP

0.9

<=10000

普通

1.0

2.2 在流程中调用 DMN 决策

public class PricingServiceTask implements JavaDelegate { @Autowired private DecisionService decisionService; @Override public void execute(DelegateExecution execution) { // 准备决策输入 Map variables = new HashMap(); variables.put(\"orderAmount\", execution.getVariable(\"amount\")); variables.put(\"customerLevel\", execution.getVariable(\"customerLevel\")); // 调用DMN决策表 DmnDecisionResult result = decisionService.evaluateDecisionByKey( \"orderPricing\", // 决策表ID variables ); // 提取决策结果 if (result != null && !result.isEmpty()) { DmnDecisionResultItem item = result.getFirstResult(); BigDecimal discount = (BigDecimal) item.get(\"discountRate\"); // 计算最终价格 BigDecimal amount = (BigDecimal) execution.getVariable(\"amount\"); BigDecimal finalPrice = amount.multiply(discount); // 设置流程变量 execution.setVariable(\"discountRate\", discount); execution.setVariable(\"finalPrice\", finalPrice); log.info(\"订单定价完成:原价{},客户等级{},折扣{},最终价格{}\", amount, variables.get(\"customerLevel\"), discount, finalPrice); } }}

2.3 动态规则与流程的联动

实现规则变更无需重启流程引擎:

@Servicepublic class DynamicDecisionService { @Autowired private RepositoryService repositoryService; @Autowired private DmnRepositoryService dmnRepositoryService; // 部署新的决策表版本 public void deployNewDecisionVersion(MultipartFile dmnFile) { try { // 部署新决策表 DmnDeployment deployment = dmnRepositoryService.createDeployment() .addInputStream(\"dynamic-decision.dmn\", dmnFile.getInputStream()) .deploy(); // 激活新决策表,同时停用旧版本 List definitions = dmnRepositoryService .createDmnDecisionDefinitionQuery() .deploymentId(deployment.getId()) .list(); for (DmnDecisionDefinition definition : definitions) { // 激活新版本 dmnRepositoryService.activateDmnDecisionDefinitionById(definition.getId()); // 停用旧版本 List oldVersions = dmnRepositoryService  .createDmnDecisionDefinitionQuery()  .decisionDefinitionKey(definition.getKey())  .decisionDefinitionIdNotEquals(definition.getId())  .list(); oldVersions.forEach(old ->  dmnRepositoryService.suspendDmnDecisionDefinitionById(old.getId()) ); } } catch (Exception e) { log.error(\"部署决策表失败\", e); throw new RuntimeException(\"决策表部署失败\", e); } }}

三、流程引擎监控与运维

大型系统中,Flowable 的监控与运维至关重要,需实时掌握引擎状态并快速排查问题。

3.1 核心指标收集

通过 Spring Boot Actuator 暴露 Flowable 监控指标:

@Componentpublic class FlowableMetricsCollector implements MeterBinder { @Autowired private RuntimeService runtimeService; @Autowired private HistoryService historyService; @Autowired private ManagementService managementService; @Override public void bindTo(MeterRegistry registry) { // 1. 流程实例指标 Gauge.builder(\"flowable.process.instances.running\",  () -> runtimeService.createProcessInstanceQuery().active().count()) .description(\"运行中的流程实例数量\") .register(registry); Gauge.builder(\"flowable.process.instances.completed\", () -> historyService.createHistoricProcessInstanceQuery().finished().count()) .description(\"已完成的流程实例数量\") .register(registry); // 2. 任务指标 Gauge.builder(\"flowable.tasks.pending\", () -> managementService.createTaskQuery().active().count()) .description(\"待处理任务数量\") .register(registry); // 3. 流程定义指标 Gauge.builder(\"flowable.process.definitions\", () -> repositoryService.createProcessDefinitionQuery().count()) .description(\"流程定义总数\") .register(registry); // 4. 按流程定义分组的指标 Map processInstanceCounts = runtimeService.createProcessInstanceQuery() .groupByProcessDefinitionKey() .countByProcessDefinitionKey(); processInstanceCounts.forEach((processKey, count) -> Gauge.builder(\"flowable.process.instances.by.key\", () -> count) .tag(\"processKey\", processKey) .register(registry) ); }}

3.2 告警机制实现

基于监控指标设置告警阈值:

@Componentpublic class FlowableAlertManager { @Autowired private MeterRegistry meterRegistry; @Autowired private AlertService alertService; @Scheduled(fixedRate = 60000) // 每分钟检查一次 public void checkAlertThresholds() { // 1. 检查待办任务数量(超过1000时告警) checkMetricThreshold( \"flowable.tasks.pending\", 1000.0, \"待办任务数量超过阈值\", \"当前待办任务数量:%s,阈值:1000\" ); // 2. 检查流程实例运行时间(超过24小时的实例) checkLongRunningProcesses(); // 3. 检查数据库连接池状态 checkDbConnectionPool(); } // 检查指标阈值 private void checkMetricThreshold(String metricName, double threshold, String title, String messageFormat) { Meter meter = meterRegistry.find(metricName).gauge(); if (meter != null) { double value = (double) meter.measure().iterator().next().getValue(); if (value > threshold) { alertService.sendAlert(  title,  String.format(messageFormat, value) ); } } } // 检查长时间运行的流程实例 private void checkLongRunningProcesses() { // 超过24小时的流程实例 Date twentyFourHoursAgo = new Date(System.currentTimeMillis() - 24 * 60 * 60 * 1000); List longRunning = runtimeService.createProcessInstanceQuery() .startedBefore(twentyFourHoursAgo) .active() .listPage(0, 10); if (!longRunning.isEmpty()) { StringBuilder message = new StringBuilder(); message.append(\"发现\").append(longRunning.size()).append(\"个运行超过24小时的流程实例:\\n\"); longRunning.forEach(instance ->  message.append(\"- 流程ID:\").append(instance.getId())  .append(\",定义:\").append(instance.getProcessDefinitionKey())  .append(\",启动时间:\").append(instance.getStartTime()).append(\"\\n\") ); alertService.sendAlert(\"长时间运行的流程实例告警\", message.toString()); } } // 检查数据库连接池 private void checkDbConnectionPool() { // 实现数据库连接池状态检查逻辑 // ... }}

四、大型系统中的 Flowable 性能调优

当流程实例达到十万甚至百万级时,性能优化成为核心挑战。

4.1 数据库优化策略

分表策略(针对历史表)

Flowable 的历史表(ACT_HI_*)数据量增长最快,可采用分表策略:

// 自定义历史数据管理器,实现分表逻辑public class ShardingHistoryManager extends DefaultHistoryManager { @Autowired private ShardingJdbcTemplate shardingJdbcTemplate; // 分库分表JDBC模板 @Override public void recordActivityEnd(DelegateExecution execution, String activityId, String activityName) { // 按月份分表(表名格式:ACT_HI_ACTINST_202407) String tableName = \"ACT_HI_ACTINST_\" + new SimpleDateFormat(\"yyyyMM\").format(new Date()); // 检查分表是否存在,不存在则创建 ensureTableExists(tableName); // 插入数据到对应分表 HistoricActivityInstanceEntity activityInstance = createHistoricActivityInstance(execution, activityId, activityName); shardingJdbcTemplate.update( \"INSERT INTO \" + tableName + \" (ID_, PROC_INST_ID_, ...) VALUES (?, ?, ...)\", activityInstance.getId(), activityInstance.getProcessInstanceId(), // ... 其他字段 ); } // 确保分表存在 private void ensureTableExists(String tableName) { // 检查表是否存在,不存在则创建(DDL语句需与原表结构一致) // ... }}// 配置自定义历史管理器@Configurationpublic class ShardingConfig { @Bean public EngineConfigurationConfigurer shardingConfigurer() { return configuration -> { configuration.setHistoryManager(new ShardingHistoryManager()); }; }}
索引优化

为高频查询字段添加索引:

-- 流程实例查询索引CREATE INDEX IDX_PROCINST_BUSKEY ON ACT_RU_EXECUTION(BUSINESS_KEY_);CREATE INDEX IDX_PROCINST_PROCDEF ON ACT_RU_EXECUTION(PROC_DEF_ID_);-- 任务查询索引CREATE INDEX IDX_TASK_ASSIGNEE ON ACT_RU_TASK(ASSIGNEE_);CREATE INDEX IDX_TASK_PROCINST ON ACT_RU_TASK(PROC_INST_ID_);CREATE INDEX IDX_TASK_CREATE_TIME ON ACT_RU_TASK(CREATE_TIME_);-- 历史实例索引CREATE INDEX IDX_HI_PROCINST_END_TIME ON ACT_HI_PROCINST(END_TIME_);CREATE INDEX IDX_HI_PROCINST_START_TIME ON ACT_HI_PROCINST(START_TIME_);

4.2 缓存策略优化

流程定义缓存
@Configurationpublic class FlowableCacheConfig { @Bean public CacheManager flowableCacheManager() { // 配置Caffeine缓存 CaffeineCacheManager cacheManager = new CaffeineCacheManager(); cacheManager.setCaffeine(Caffeine.newBuilder() .expireAfterWrite(1, TimeUnit.HOURS) // 1小时过期 .maximumSize(1000) // 最大缓存1000个流程定义 ); return cacheManager; } @Bean public EngineConfigurationConfigurer cacheConfigurer() { return configuration -> { // 配置流程定义缓存 configuration.setProcessDefinitionCache(flowableCacheManager().getCache(\"processDefinitions\")); // 配置决策表缓存 configuration.setDmnDecisionCache(flowableCacheManager().getCache(\"dmnDecisions\")); // 配置缓存刷新策略 configuration.setCacheManager(flowableCacheManager()); }; }}
流程变量懒加载

避免一次性加载所有流程变量:

// 自定义变量服务,实现懒加载public class LazyVariableService { @Autowired private ManagementService managementService; // 按需加载变量 public Object getVariableLazy(String executionId, String variableName) { // 先查缓存 Object cachedValue = variableCache.get(executionId + \":\" + variableName); if (cachedValue != null) { return cachedValue; } // 缓存未命中,查询数据库 List variables = managementService.createNativeVariableInstanceQuery() .sql(\"SELECT * FROM ACT_RU_VARIABLE WHERE EXECUTION_ID_ = ? AND NAME_ = ?\") .parameter(executionId) .parameter(variableName) .list();  if (!variables.isEmpty()) { Object value = variables.get(0).getValue(); // 放入缓存(10分钟过期) variableCache.put(executionId + \":\" + variableName, value, 10, TimeUnit.MINUTES); return value; } return null; }}

4.3 异步处理优化

将非核心流程操作异步化:

// 异步处理流程归档@Servicepublic class AsyncArchiveService { @Autowired private HistoryService historyService; @Autowired private AsyncTaskExecutor asyncTaskExecutor; // 异步任务执行器 // 异步归档已完成流程 public void asyncArchiveCompletedProcesses() { // 查询30天前已完成的流程 Date cutoffDate = new Date(System.currentTimeMillis() - 30L * 24 * 60 * 60 * 1000); List processInstanceIds = historyService.createHistoricProcessInstanceQuery() .finished() .processInstanceEndTimeBefore(cutoffDate) .list() .stream() .map(HistoricProcessInstance::getId) .collect(Collectors.toList()); // 分批异步归档 for (List batch : Lists.partition(processInstanceIds, 100)) { asyncTaskExecutor.execute(() -> archiveProcessBatch(batch)); } } // 归档一批流程 private void archiveProcessBatch(List processInstanceIds) { // 归档逻辑(如迁移到归档库、删除原表数据) // ... }}

五、实战案例:电商订单流程的高级扩展

以电商平台的订单流程为例,综合应用上述高级特性:

  1. 自定义多级审批任务:订单金额超过 10 万元时,自动触发 \"部门经理→财务→总经理\" 的多级审批
  1. DMN 决策路由:根据客户等级和订单金额,决定物流方式(普通快递 / 加急配送 / VIP 专线)
  1. 分表存储:订单流程的历史数据按季度分表,提高查询性能
  1. 异步归档:完成的订单流程在 24 小时后自动异步归档到历史库
  1. 实时监控:通过 Grafana 监控订单流程的转化率、平均处理时间等指标
     <custom:approvalLevel rule=\"DEPT_MANAGER\" condition=\"${amount   100000 && amount   500000}\"/>                     

六、小结与下一篇预告

本文我们深入探讨了 Flowable 的高级扩展能力,包括:

  • 自定义流程元素(任务、网关)的实现方式
  • 与 DMN 规则引擎的集成,实现动态决策
  • 流程监控与告警机制的搭建
  • 大型系统中的性能优化策略(分表、缓存、异步处理)

这些技术足以支撑千万级流程实例的企业级系统。下一篇文章,我们将聚焦 Flowable 的实战落地经验,包括:

  • 流程引擎的选型决策(何时该用 Flowable,何时应选择轻量方案)
  • 实际项目中的常见坑点与解决方案
  • 流程引擎与低代码平台的结合
  • Flowable 的未来演进与社区生态

如果在高级扩展中遇到技术难题或有创新应用场景,欢迎在评论区分享讨论!