> 文档中心 > 【xxl-job源码篇】xxl-job的10种路由策略源码解读

【xxl-job源码篇】xxl-job的10种路由策略源码解读

文章目录

    • 📖导读
    • ➡️执行流程
    • 🚩路由策略
      • 第一个`ExecutorRouteFirst`
      • 最后一个`ExecutorRouteLast`
      • 轮询`ExecutorRouteRound`
      • 随机`ExecutorRouteRandom`
      • 一致性HASH`ExecutorRouteConsistentHash`
      • 最不经常使用`ExecutorRouteLFU`
      • 最久未使用`ExecutorRouteLRU`
      • 故障转移`ExecutorRouteFailover`
      • 忙碌转移`ExecutorRouteBusyover`
    • ✅结语

📖导读

为了保证任务稳定执行,xxl-job支持注册多个executor到注册中心,以保证任务能够稳定的执行,那么这些executor会以怎样的策略去执行呢,本章将从源码层面去解析xxl-job的策略的执行原理。

xxl-job为我们提供了如下策略

  • 第一个
  • 最后一个
  • 轮询
  • 随机
  • 一致性HASH
  • 最不经常使用
  • 最久未使用
  • 故障转移
  • 忙碌转移
  • 分片执行

以上策略可以帮助我们在各种场景下都能应对自如

➡️执行流程

策略的执行流程如下

  1. 执行任务
  2. 选择策略
  3. 通过策略选举出要执行的服务端地址
  4. 通过http调度executor去执行任务

首先我们定位到com.xxl.job.admin.core.thread.JobTriggerPoolHelper

该类就是负责任务执行的helper,它本身是饿汉单例模式,当初始化时会调用toStart方法,通过单例实例调用了start方法,我们看一下该方法源码

public void start(){    // 快执行线程池    fastTriggerPool = new ThreadPoolExecutor(     10,     XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),     60L,     TimeUnit.SECONDS,     new LinkedBlockingQueue<Runnable>(1000),     new ThreadFactory() {  @Override  public Thread newThread(Runnable r) {      return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());  }     });    // 慢执行线程池    slowTriggerPool = new ThreadPoolExecutor(     10,     XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),     60L,     TimeUnit.SECONDS,     new LinkedBlockingQueue<Runnable>(2000),     new ThreadFactory() {  @Override  public Thread newThread(Runnable r) {      return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());  }     });}

可以看到此处初始化了一个快线程池,一个慢线程池,为什么要用两个线程池呢,我们先看addTrigger方法

public void addTrigger(final int jobId,  final TriggerTypeEnum triggerType,  final int failRetryCount,  final String executorShardingParam,  final String executorParam,  final String addressList) {    // 默认使用fastTriggerPool    ThreadPoolExecutor triggerPool_ = fastTriggerPool;    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);    // 如果发现任务一分钟内有大于10次的慢执行,换slowTriggerPool线程池    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { triggerPool_ = slowTriggerPool;    }    // 线程池执行    triggerPool_.execute(new Runnable() { @Override public void run() {     long start = System.currentTimeMillis();     try {  // 触发  XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);     } catch (Exception e) {  logger.error(e.getMessage(), e);     } finally {  // 到达下一个周期则清理上一个周期数据  long minTim_now = System.currentTimeMillis()/60000;  if (minTim != minTim_now) {      minTim = minTim_now;      jobTimeoutCountMap.clear();  }  // 记录慢任务执行次数  long cost = System.currentTimeMillis()-start;  if (cost > 500) {      // 执行时间超过500毫秒,则认定为慢任务      AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));      if (timeoutCount != null) {   // 记录慢任务慢的次数   timeoutCount.incrementAndGet();      }  }     } }    });}

执行任务时,首先判断这个任务是否是个慢任务,如果是个慢任务且慢执行的次数超过了10次将会使用slowTriggerPool慢线程池,它的统计周期为60秒,这里是个优化点,当有大量的任务被执行时,为了防止任务被阻塞,尽可能的会先让执行快的任务优先执行

我们顺着进入到XxlJobTrigger.trigger方法

public static void trigger(int jobId,   // 任务id      TriggerTypeEnum triggerType, // 执行来源      int failRetryCount,  // 失败重试次数      String executorShardingParam,    // 分片广播参数      String executorParam,    // 执行入参      String addressList) {    // 可用执行器的地址,用逗号分割    // 获得任务对象    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);    if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); return;    }    // 设置任务入参    if (executorParam != null) { jobInfo.setExecutorParam(executorParam);    }    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();    // 获得执行器    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());    // 录入执行器地址    if (addressList!=null && addressList.trim().length()>0) { group.setAddressType(1); group.setAddressList(addressList.trim());    }    // 分片广播的逻辑    int[] shardingParam = null;    if (executorShardingParam!=null){ String[] shardingArr = executorShardingParam.split("/"); if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {     shardingParam = new int[2];     shardingParam[0] = Integer.valueOf(shardingArr[0]);     shardingParam[1] = Integer.valueOf(shardingArr[1]); }    }    // 如果是分片广播则特殊处理    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)     && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()     && shardingParam==null) { for (int i = 0; i < group.getRegistryList().size(); i++) {     // 分片广播会通知每一个执行器     processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); }    } else { if (shardingParam == null) {     shardingParam = new int[]{0, 1}; } // 其他执行策略 processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);    }}

可以看到分片广播是做了特殊处理的,进入processTrigger后就可以看到正式的调度了

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){    // 并行还是串行    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy    // 路由执行策略    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy    // 分片广播参数    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;    // 存储任务执行日志    XxlJobLog jobLog = new XxlJobLog();    jobLog.setJobGroup(jobInfo.getJobGroup());    jobLog.setJobId(jobInfo.getId());    jobLog.setTriggerTime(new Date());    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());    // 初始化请求参数    TriggerParam triggerParam = new TriggerParam();    triggerParam.setJobId(jobInfo.getId());    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());    triggerParam.setExecutorParams(jobInfo.getExecutorParam());    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());    triggerParam.setLogId(jobLog.getId());    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());    triggerParam.setGlueType(jobInfo.getGlueType());    triggerParam.setGlueSource(jobInfo.getGlueSource());    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());    triggerParam.setBroadcastIndex(index);    triggerParam.setBroadcastTotal(total);    // 决策路由执行地址    String address = null;    ReturnT<String> routeAddressResult = null;    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) { if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {     // 分片广播逻辑     if (index < group.getRegistryList().size()) {  address = group.getRegistryList().get(index);     } else {  address = group.getRegistryList().get(0);     } } else {     // 通过我们指定的策略选择地址     routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());     if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {  address = routeAddressResult.getContent();     } }    } else { routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));    }    // 触发远程执行    ReturnT<String> triggerResult = null;    if (address != null) { triggerResult = runExecutor(triggerParam, address);    } else { triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);    }    // 日志信息拼接    StringBuffer triggerMsgSb = new StringBuffer();    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());    triggerMsgSb.append("
"
).append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp()); triggerMsgSb.append("
"
).append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":") .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") ); triggerMsgSb.append("
"
).append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList()); triggerMsgSb.append("
"
).append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()); if (shardingParam != null) { triggerMsgSb.append("("+shardingParam+")"); } triggerMsgSb.append("
"
).append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle()); triggerMsgSb.append("
"
).append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout()); triggerMsgSb.append("
"
).append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount); triggerMsgSb.append("

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<<
"
) .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"

"
:"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():""); // 存储任务执行日志信息 jobLog.setExecutorAddress(address); jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorShardingParam(shardingParam); jobLog.setExecutorFailRetryCount(finalFailRetryCount); //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());}

此处逻辑包含请求地址的决策,执行记录的存储,远程调度

🚩路由策略

通过以上代码我们可以看到决策最终调度的是ExecutorRouteStrategyEnum中的router下的route方法,route方法传入了当前可用的所有executor地址和TriggerParam请求参数。

public enum ExecutorRouteStrategyEnum {    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);    ExecutorRouteStrategyEnum(String title, ExecutorRouter router) { this.title = title; this.router = router;    }    // 策略名    private String title;    // executor抽象类    private ExecutorRouter router;    public String getTitle() { return title;    }    public ExecutorRouter getRouter() { return router;    }    public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem){ if (name != null) {     for (ExecutorRouteStrategyEnum item: ExecutorRouteStrategyEnum.values()) {  if (item.name().equals(name)) {      return item;  }     } } return defaultItem;    }}

可以看到这是一个enum类,提供了ExecutorRouter抽象类,如果需要实现新的策略,只需要继承该类,实现route方法并增加一个新的范型即可

接下来我们通过源码解读一下各个策略分别是如何实现的

第一个ExecutorRouteFirst

public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){    return new ReturnT<String>(addressList.get(0));}

最后一个ExecutorRouteLast

public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {    return new ReturnT<String>(addressList.get(addressList.size()-1));}

轮询ExecutorRouteRound

轮询并非是从第一个开始,而是随机选择开始的位置,每次通过自增后取模来定位到下一个地址,为了防止integer无限增大,每24小时会清除一次位置信息,重新随机定位。

private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>();private static long CACHE_VALID_TIME = 0;private static int count(int jobId) {    if (System.currentTimeMillis() > CACHE_VALID_TIME) { // 缓存超时清除所有任务的位置 routeCountEachJob.clear(); // 缓存24小时 CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;    }    // 获得任务的随机数    AtomicInteger count = routeCountEachJob.get(jobId);    if (count == null || count.get() > 1000000) { // 初始化时主动Random一次,缓解首次压力 count = new AtomicInteger(new Random().nextInt(100));    } else { // 获得下一个任务 count.addAndGet(1);    }    routeCountEachJob.put(jobId, count);    return count.get();}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {    // 通过取模来定位到下一个执行的地址    String address = addressList.get(count(triggerParam.getJobId())%addressList.size());    return new ReturnT<String>(address);}

随机ExecutorRouteRandom

很简单,就是随机数取一个

private static Random localRandom = new Random();@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {    String address = addressList.get(localRandom.nextInt(addressList.size()));    return new ReturnT<String>(address);}

一致性HASHExecutorRouteConsistentHash

为了保证任务能够均匀的分散在各个机器上,采用了一致性hash算法,并预设了100个虚拟节点,使地址能够尽量均匀分布

public String hashJob(int jobId, List<String> addressList) {    // ------A1------A2-------A3------    // -----------J1------------------    // 使用treemap使之有序    TreeMap<Long, String> addressRing = new TreeMap<Long, String>();    // 遍历所有地址    for (String address: addressList) { // 生成100个虚拟节点 for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {     long addressHash = hash("SHARD-" + address + "-NODE-" + i);     addressRing.put(addressHash, address); }    }    // hash节点位置    long jobHash = hash(String.valueOf(jobId));    // 获取到在hash环中的位置    SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);    if (!lastRing.isEmpty()) { // 如果不在hash环最后面则拿到下一个最近的节点 return lastRing.get(lastRing.firstKey());    }    // 如果在hash环最后的位置则取环中第一个节点    return addressRing.firstEntry().getValue();}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {    String address = hashJob(triggerParam.getJobId(), addressList);    return new ReturnT<String>(address);}

最不经常使用ExecutorRouteLFU

原理是维护了一个以任务id为单位的地址计数器,当第一次进入时,不知道谁使用最少,以随机的形式先给各个地址初始化一个数,最大的计数器值不超过地址总量。

// 外层key为jobId,value-key为地址,value-value为计数器private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();private static long CACHE_VALID_TIME = 0;public String route(int jobId, List<String> addressList) {    if (System.currentTimeMillis() > CACHE_VALID_TIME) { // 超过一天后重置缓存 jobLfuMap.clear(); // 缓存周期为一天 CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;    }    // 初始化结构    HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId);     // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;    if (lfuItemMap == null) { lfuItemMap = new HashMap<String, Integer>(); jobLfuMap.putIfAbsent(jobId, lfuItemMap);   // 避免重复覆盖    }    for (String address: addressList) { if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {     lfuItemMap.put(address, new Random().nextInt(addressList.size()));  // 初始化时主动Random一次,缓解首次压力 }    }    // 移除无效的地址    List<String> delKeys = new ArrayList<>();    for (String existKey: lfuItemMap.keySet()) { if (!addressList.contains(existKey)) {     delKeys.add(existKey); }    }    if (delKeys.size() > 0) { for (String delKey: delKeys) {     lfuItemMap.remove(delKey); }    }    // 根据value进行排序    List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());    Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() { @Override public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {     return o1.getValue().compareTo(o2.getValue()); }    });    // 第0个就是使用最少的    Map.Entry<String, Integer> addressItem = lfuItemList.get(0);    String minAddress = addressItem.getKey();    // 将本次使用的地址计数器做+1操作    addressItem.setValue(addressItem.getValue() + 1);    return addressItem.getKey();}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {    String address = route(triggerParam.getJobId(), addressList);    return new ReturnT<String>(address);}

最久未使用ExecutorRouteLRU

维护了一个以任务id为单位的map,kv都是地址,实现原理是利用了LinkedHashMap存储排序的特性。

accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;

// key=jobId,value-key=address,value-value=addressprivate static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();private static long CACHE_VALID_TIME = 0;public String route(int jobId, List<String> addressList) {    // 每24小时清除一次缓存    if (System.currentTimeMillis() > CACHE_VALID_TIME) { jobLRUMap.clear(); CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;    }    // init lru    LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);    if (lruItem == null) { lruItem = new LinkedHashMap<String, String>(16, 0.75f, true); jobLRUMap.putIfAbsent(jobId, lruItem);    }    // 初始化地址kv都是地址    for (String address: addressList) { if (!lruItem.containsKey(address)) {     lruItem.put(address, address); }    }    // 移除无效的地址    List<String> delKeys = new ArrayList<>();    for (String existKey: lruItem.keySet()) { if (!addressList.contains(existKey)) {     delKeys.add(existKey); }    }    if (delKeys.size() > 0) { for (String delKey: delKeys) {     lruItem.remove(delKey); }    }    // 直接拿到第一个即可    String eldestKey = lruItem.entrySet().iterator().next().getKey();    String eldestValue = lruItem.get(eldestKey);    return eldestValue;}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {    String address = route(triggerParam.getJobId(), addressList);    return new ReturnT<String>(address);}

故障转移ExecutorRouteFailover

当调度的机器出现无法调度的情况时,则切换为另一台机器

实现原理就是通过调用机器的beat接口查看机器的返回状态来判定是否存活,如果不存活则循环下一个继续该步骤,直到找到可用机器或者无可用机器为止

public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {    StringBuffer beatResultSB = new StringBuffer();    for (String address : addressList) { // beat ReturnT<String> beatResult = null; try {     ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);     // 通过调度接口的形式检查机器是否存活     beatResult = executorBiz.beat(); } catch (Exception e) {     logger.error(e.getMessage(), e);     beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e ); } // 记录检查日志 beatResultSB.append( (beatResultSB.length()>0)?"

"
:"") .append(I18nUtil.getString("jobconf_beat") + ":") .append("
address:"
).append(address) .append("
code:"
).append(beatResult.getCode()) .append("
msg:"
).append(beatResult.getMsg()); if (beatResult.getCode() == ReturnT.SUCCESS_CODE) { // 机器正常时则返回该机器地址 beatResult.setMsg(beatResultSB.toString()); beatResult.setContent(address); return beatResult; } } // 没有可用的存活机器 return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());}

忙碌转移ExecutorRouteBusyover

当机器的excutor处于忙碌的状态时,则转移至不忙碌的机器

实现原理就是通过调用机器的idleBeat接口查看机器的返回状态来判定是否忙碌,如果处于忙碌或不可用状态则循环下一个继续该步骤,直到找到空闲且可用的机器或者没有可用机器为止

public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {    StringBuffer idleBeatResultSB = new StringBuffer();    for (String address : addressList) { ReturnT<String> idleBeatResult = null; try {     ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);     // 调度接口查看是否忙碌     idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId())); } catch (Exception e) {     logger.error(e.getMessage(), e);     idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e ); } idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"

"
:"") .append(I18nUtil.getString("jobconf_idleBeat") + ":") .append("
address:"
).append(address) .append("
code:"
).append(idleBeatResult.getCode()) .append("
msg:"
).append(idleBeatResult.getMsg()); if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) { idleBeatResult.setMsg(idleBeatResultSB.toString()); idleBeatResult.setContent(address); return idleBeatResult; } }// 没有可用的存活且空闲的机器 return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());}

✅结语

通过本章我们了解了xxl-job的任务执行流程,以及全部路由策略的实现原理

【xxl-job源码篇01】xxl-job源码解读 神奇的时间轮 触发流程解读

【xxl-job源码篇02】注册中心 自研RPC netty的应用?

【xxl-job源码篇03】xxl-job日志系统源码解读