【xxl-job源码篇】xxl-job的10种路由策略源码解读
文章目录
-
- 📖导读
- ➡️执行流程
- 🚩路由策略
-
- 第一个`ExecutorRouteFirst`
- 最后一个`ExecutorRouteLast`
- 轮询`ExecutorRouteRound`
- 随机`ExecutorRouteRandom`
- 一致性HASH`ExecutorRouteConsistentHash`
- 最不经常使用`ExecutorRouteLFU`
- 最久未使用`ExecutorRouteLRU`
- 故障转移`ExecutorRouteFailover`
- 忙碌转移`ExecutorRouteBusyover`
- ✅结语
📖导读
为了保证任务稳定执行,xxl-job支持注册多个executor到注册中心,以保证任务能够稳定的执行,那么这些executor会以怎样的策略去执行呢,本章将从源码层面去解析xxl-job的策略的执行原理。
xxl-job为我们提供了如下策略
- 第一个
- 最后一个
- 轮询
- 随机
- 一致性HASH
- 最不经常使用
- 最久未使用
- 故障转移
- 忙碌转移
- 分片执行
以上策略可以帮助我们在各种场景下都能应对自如
➡️执行流程
策略的执行流程如下
- 执行任务
- 选择策略
- 通过策略选举出要执行的服务端地址
- 通过http调度executor去执行任务
首先我们定位到com.xxl.job.admin.core.thread.JobTriggerPoolHelper
类
该类就是负责任务执行的helper,它本身是饿汉单例模式
,当初始化时会调用toStart
方法,通过单例实例调用了start
方法,我们看一下该方法源码
public void start(){ // 快执行线程池 fastTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()); } }); // 慢执行线程池 slowTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()); } });}
可以看到此处初始化了一个快线程池,一个慢线程池,为什么要用两个线程池呢,我们先看addTrigger
方法
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam, final String addressList) { // 默认使用fastTriggerPool ThreadPoolExecutor triggerPool_ = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); // 如果发现任务一分钟内有大于10次的慢执行,换slowTriggerPool线程池 if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { triggerPool_ = slowTriggerPool; } // 线程池执行 triggerPool_.execute(new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); try { // 触发 XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { // 到达下一个周期则清理上一个周期数据 long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { minTim = minTim_now; jobTimeoutCountMap.clear(); } // 记录慢任务执行次数 long cost = System.currentTimeMillis()-start; if (cost > 500) { // 执行时间超过500毫秒,则认定为慢任务 AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)); if (timeoutCount != null) { // 记录慢任务慢的次数 timeoutCount.incrementAndGet(); } } } } });}
执行任务时,首先判断这个任务是否是个慢任务,如果是个慢任务且慢执行的次数超过了10次将会使用slowTriggerPool
慢线程池,它的统计周期为60秒,这里是个优化点,当有大量的任务被执行时,为了防止任务被阻塞,尽可能的会先让执行快的任务优先执行。
我们顺着进入到XxlJobTrigger.trigger
方法
public static void trigger(int jobId, // 任务id TriggerTypeEnum triggerType, // 执行来源 int failRetryCount, // 失败重试次数 String executorShardingParam, // 分片广播参数 String executorParam, // 执行入参 String addressList) { // 可用执行器的地址,用逗号分割 // 获得任务对象 XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId); if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); return; } // 设置任务入参 if (executorParam != null) { jobInfo.setExecutorParam(executorParam); } int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount(); // 获得执行器 XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup()); // 录入执行器地址 if (addressList!=null && addressList.trim().length()>0) { group.setAddressType(1); group.setAddressList(addressList.trim()); } // 分片广播的逻辑 int[] shardingParam = null; if (executorShardingParam!=null){ String[] shardingArr = executorShardingParam.split("/"); if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) { shardingParam = new int[2]; shardingParam[0] = Integer.valueOf(shardingArr[0]); shardingParam[1] = Integer.valueOf(shardingArr[1]); } } // 如果是分片广播则特殊处理 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { for (int i = 0; i < group.getRegistryList().size(); i++) { // 分片广播会通知每一个执行器 processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); } } else { if (shardingParam == null) { shardingParam = new int[]{0, 1}; } // 其他执行策略 processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]); }}
可以看到分片广播是做了特殊处理的,进入processTrigger
后就可以看到正式的调度了
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){ // 并行还是串行 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy // 路由执行策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy // 分片广播参数 String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null; // 存储任务执行日志 XxlJobLog jobLog = new XxlJobLog(); jobLog.setJobGroup(jobInfo.getJobGroup()); jobLog.setJobId(jobInfo.getId()); jobLog.setTriggerTime(new Date()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); // 初始化请求参数 TriggerParam triggerParam = new TriggerParam(); triggerParam.setJobId(jobInfo.getId()); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorParams(jobInfo.getExecutorParam()); triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout()); triggerParam.setLogId(jobLog.getId()); triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime()); triggerParam.setGlueType(jobInfo.getGlueType()); triggerParam.setGlueSource(jobInfo.getGlueSource()); triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); triggerParam.setBroadcastIndex(index); triggerParam.setBroadcastTotal(total); // 决策路由执行地址 String address = null; ReturnT<String> routeAddressResult = null; if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) { if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { // 分片广播逻辑 if (index < group.getRegistryList().size()) { address = group.getRegistryList().get(index); } else { address = group.getRegistryList().get(0); } } else { // 通过我们指定的策略选择地址 routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); } } } else { routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty")); } // 触发远程执行 ReturnT<String> triggerResult = null; if (address != null) { triggerResult = runExecutor(triggerParam, address); } else { triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null); } // 日志信息拼接 StringBuffer triggerMsgSb = new StringBuffer(); triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle()); triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp()); triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":") .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") ); triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList()); triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()); if (shardingParam != null) { triggerMsgSb.append("("+shardingParam+")"); } triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle()); triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout()); triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount); triggerMsgSb.append("
>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<<
") .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"
":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():""); // 存储任务执行日志信息 jobLog.setExecutorAddress(address); jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorShardingParam(shardingParam); jobLog.setExecutorFailRetryCount(finalFailRetryCount); //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());}
此处逻辑包含请求地址的决策,执行记录的存储,远程调度
🚩路由策略
通过以上代码我们可以看到决策最终调度的是ExecutorRouteStrategyEnum
中的router
下的route
方法,route方法传入了当前可用的所有executor地址和TriggerParam请求参数。
public enum ExecutorRouteStrategyEnum { FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()), LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()), ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()), RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()), CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()), LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()), LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()), FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()), BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()), SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null); ExecutorRouteStrategyEnum(String title, ExecutorRouter router) { this.title = title; this.router = router; } // 策略名 private String title; // executor抽象类 private ExecutorRouter router; public String getTitle() { return title; } public ExecutorRouter getRouter() { return router; } public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem){ if (name != null) { for (ExecutorRouteStrategyEnum item: ExecutorRouteStrategyEnum.values()) { if (item.name().equals(name)) { return item; } } } return defaultItem; }}
可以看到这是一个enum类,提供了ExecutorRouter
抽象类,如果需要实现新的策略,只需要继承该类,实现route
方法并增加一个新的范型即可
接下来我们通过源码解读一下各个策略分别是如何实现的
第一个ExecutorRouteFirst
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){ return new ReturnT<String>(addressList.get(0));}
最后一个ExecutorRouteLast
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { return new ReturnT<String>(addressList.get(addressList.size()-1));}
轮询ExecutorRouteRound
轮询并非是从第一个开始,而是随机选择开始的位置,每次通过自增后取模来定位到下一个地址,为了防止integer无限增大,每24小时会清除一次位置信息,重新随机定位。
private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>();private static long CACHE_VALID_TIME = 0;private static int count(int jobId) { if (System.currentTimeMillis() > CACHE_VALID_TIME) { // 缓存超时清除所有任务的位置 routeCountEachJob.clear(); // 缓存24小时 CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24; } // 获得任务的随机数 AtomicInteger count = routeCountEachJob.get(jobId); if (count == null || count.get() > 1000000) { // 初始化时主动Random一次,缓解首次压力 count = new AtomicInteger(new Random().nextInt(100)); } else { // 获得下一个任务 count.addAndGet(1); } routeCountEachJob.put(jobId, count); return count.get();}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { // 通过取模来定位到下一个执行的地址 String address = addressList.get(count(triggerParam.getJobId())%addressList.size()); return new ReturnT<String>(address);}
随机ExecutorRouteRandom
很简单,就是随机数取一个
private static Random localRandom = new Random();@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = addressList.get(localRandom.nextInt(addressList.size())); return new ReturnT<String>(address);}
一致性HASHExecutorRouteConsistentHash
为了保证任务能够均匀的分散在各个机器上,采用了一致性hash算法
,并预设了100个虚拟节点,使地址能够尽量均匀分布
public String hashJob(int jobId, List<String> addressList) { // ------A1------A2-------A3------ // -----------J1------------------ // 使用treemap使之有序 TreeMap<Long, String> addressRing = new TreeMap<Long, String>(); // 遍历所有地址 for (String address: addressList) { // 生成100个虚拟节点 for (int i = 0; i < VIRTUAL_NODE_NUM; i++) { long addressHash = hash("SHARD-" + address + "-NODE-" + i); addressRing.put(addressHash, address); } } // hash节点位置 long jobHash = hash(String.valueOf(jobId)); // 获取到在hash环中的位置 SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash); if (!lastRing.isEmpty()) { // 如果不在hash环最后面则拿到下一个最近的节点 return lastRing.get(lastRing.firstKey()); } // 如果在hash环最后的位置则取环中第一个节点 return addressRing.firstEntry().getValue();}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = hashJob(triggerParam.getJobId(), addressList); return new ReturnT<String>(address);}
最不经常使用ExecutorRouteLFU
原理是维护了一个以任务id为单位的地址计数器,当第一次进入时,不知道谁使用最少,以随机的形式先给各个地址初始化一个数,最大的计数器值不超过地址总量。
// 外层key为jobId,value-key为地址,value-value为计数器private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();private static long CACHE_VALID_TIME = 0;public String route(int jobId, List<String> addressList) { if (System.currentTimeMillis() > CACHE_VALID_TIME) { // 超过一天后重置缓存 jobLfuMap.clear(); // 缓存周期为一天 CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24; } // 初始化结构 HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId); // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList; if (lfuItemMap == null) { lfuItemMap = new HashMap<String, Integer>(); jobLfuMap.putIfAbsent(jobId, lfuItemMap); // 避免重复覆盖 } for (String address: addressList) { if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) { lfuItemMap.put(address, new Random().nextInt(addressList.size())); // 初始化时主动Random一次,缓解首次压力 } } // 移除无效的地址 List<String> delKeys = new ArrayList<>(); for (String existKey: lfuItemMap.keySet()) { if (!addressList.contains(existKey)) { delKeys.add(existKey); } } if (delKeys.size() > 0) { for (String delKey: delKeys) { lfuItemMap.remove(delKey); } } // 根据value进行排序 List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet()); Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() { @Override public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) { return o1.getValue().compareTo(o2.getValue()); } }); // 第0个就是使用最少的 Map.Entry<String, Integer> addressItem = lfuItemList.get(0); String minAddress = addressItem.getKey(); // 将本次使用的地址计数器做+1操作 addressItem.setValue(addressItem.getValue() + 1); return addressItem.getKey();}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = route(triggerParam.getJobId(), addressList); return new ReturnT<String>(address);}
最久未使用ExecutorRouteLRU
维护了一个以任务id为单位的map,kv都是地址,实现原理是利用了LinkedHashMap存储排序的特性。
accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
// key=jobId,value-key=address,value-value=addressprivate static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();private static long CACHE_VALID_TIME = 0;public String route(int jobId, List<String> addressList) { // 每24小时清除一次缓存 if (System.currentTimeMillis() > CACHE_VALID_TIME) { jobLRUMap.clear(); CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24; } // init lru LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId); if (lruItem == null) { lruItem = new LinkedHashMap<String, String>(16, 0.75f, true); jobLRUMap.putIfAbsent(jobId, lruItem); } // 初始化地址kv都是地址 for (String address: addressList) { if (!lruItem.containsKey(address)) { lruItem.put(address, address); } } // 移除无效的地址 List<String> delKeys = new ArrayList<>(); for (String existKey: lruItem.keySet()) { if (!addressList.contains(existKey)) { delKeys.add(existKey); } } if (delKeys.size() > 0) { for (String delKey: delKeys) { lruItem.remove(delKey); } } // 直接拿到第一个即可 String eldestKey = lruItem.entrySet().iterator().next().getKey(); String eldestValue = lruItem.get(eldestKey); return eldestValue;}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = route(triggerParam.getJobId(), addressList); return new ReturnT<String>(address);}
故障转移ExecutorRouteFailover
当调度的机器出现无法调度的情况时,则切换为另一台机器
实现原理就是通过调用机器的beat接口查看机器的返回状态来判定是否存活,如果不存活则循环下一个继续该步骤,直到找到可用机器或者无可用机器为止
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { StringBuffer beatResultSB = new StringBuffer(); for (String address : addressList) { // beat ReturnT<String> beatResult = null; try { ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); // 通过调度接口的形式检查机器是否存活 beatResult = executorBiz.beat(); } catch (Exception e) { logger.error(e.getMessage(), e); beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e ); } // 记录检查日志 beatResultSB.append( (beatResultSB.length()>0)?"
":"") .append(I18nUtil.getString("jobconf_beat") + ":") .append("
address:").append(address) .append("
code:").append(beatResult.getCode()) .append("
msg:").append(beatResult.getMsg()); if (beatResult.getCode() == ReturnT.SUCCESS_CODE) { // 机器正常时则返回该机器地址 beatResult.setMsg(beatResultSB.toString()); beatResult.setContent(address); return beatResult; } } // 没有可用的存活机器 return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());}
忙碌转移ExecutorRouteBusyover
当机器的excutor处于忙碌的状态时,则转移至不忙碌的机器
实现原理就是通过调用机器的idleBeat接口查看机器的返回状态来判定是否忙碌,如果处于忙碌或不可用状态则循环下一个继续该步骤,直到找到空闲且可用的机器或者没有可用机器为止
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { StringBuffer idleBeatResultSB = new StringBuffer(); for (String address : addressList) { ReturnT<String> idleBeatResult = null; try { ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); // 调度接口查看是否忙碌 idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId())); } catch (Exception e) { logger.error(e.getMessage(), e); idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e ); } idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"
":"") .append(I18nUtil.getString("jobconf_idleBeat") + ":") .append("
address:").append(address) .append("
code:").append(idleBeatResult.getCode()) .append("
msg:").append(idleBeatResult.getMsg()); if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) { idleBeatResult.setMsg(idleBeatResultSB.toString()); idleBeatResult.setContent(address); return idleBeatResult; } }// 没有可用的存活且空闲的机器 return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());}
✅结语
通过本章我们了解了xxl-job的任务执行流程,以及全部路由策略的实现原理
【xxl-job源码篇01】xxl-job源码解读 神奇的时间轮 触发流程解读
【xxl-job源码篇02】注册中心 自研RPC netty的应用?
【xxl-job源码篇03】xxl-job日志系统源码解读