SpingBoot+ScheduledFuture 实现动态定时任务
目录
- 1、前言
- 2、数据库设计
- 3、业务代码实现
-
- 3.1、TaskMapper.xml
- 3.2、TaskMapper
- 3.3、TaskService
-
- 3.4、TaskController
- 4、任务核心代码
-
- 4.1、TaskThread
- 4.2、TaskManager
- 4.3、TaskRunnable
- 4.4、TaskBusinessService
- 5、接口测试
-
- 5.1、新增任务
- 5.2、更新任务
- 5.3、任务详情
- 6、执行原理
- 7、源码参考
1、前言
1、动态任务的实现主要通过ThreadPoolTaskScheduler
与ScheduledFuture
类进行实现,并且配合自定义封装类实现完整的动态任务流程;
2、通过数据库记录具体的任务信息;
3、以下说明为核心代码的编写,具体完整代码可以参考gitee:《scheduled-future-demo》,建议下载源码后,再结合文档说明进行理解;
2、数据库设计
创建一个数据库叫做:task
,创建表sys_task
CREATE TABLE `sys_task` ( `id` bigint NOT NULL COMMENT '任务ID', `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '' COMMENT '任务名称', `invoke_target` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '目标字符串', `cron_expression` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '' COMMENT 'cron执行表达式', `policy` int DEFAULT NULL COMMENT '计划执行错误策略(1手动,2-自动)', `situation` int DEFAULT NULL COMMENT '执行情况(1-执行中,2-已暂停)', `version` int DEFAULT '0' COMMENT '执行版本(每执行一次加一)', `last_run_time` timestamp NULL DEFAULT NULL COMMENT '上次执行时间', `next_run_time` timestamp NULL DEFAULT NULL COMMENT '下次执行时间', `status` int DEFAULT '0' COMMENT '状态(1正常 1暂停)', `del_flag` int DEFAULT NULL COMMENT '删除标志(1-存在,2-删除)', `remark` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '' COMMENT '备注信息', PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='定时任务调度表'
3、业务代码实现
只进行核心业务类的编写,包括TaskMapper.xml
、TaskMapper
、TaskService
、TaskController
;
3.1、TaskMapper.xml
<mapper namespace="com.lhz.mapper.task.TaskMapper"> <resultMap type="com.lhz.model.entity.Task" id="BaseResultMap"> <id property="id" column="id" jdbcType="VARCHAR"/> <result property="name" column="name" jdbcType="VARCHAR"/> <result property="invokeTarget" column="invoke_target" jdbcType="VARCHAR"/> <result property="cronExpression" column="cron_expression" jdbcType="VARCHAR"/> <result property="policy" column="policy" jdbcType="INTEGER"/> <result property="situation" column="situation" jdbcType="INTEGER"/> <result property="version" column="version" jdbcType="INTEGER"/> <result property="lastRunTime" column="last_run_time" jdbcType="TIMESTAMP"/> <result property="nextRunTime" column="next_run_time" jdbcType="TIMESTAMP"/> <result property="status" column="status" jdbcType="INTEGER"/> <result property="delFlag" column="del_flag" jdbcType="INTEGER"/> <result property="remark" column="remark" jdbcType="VARCHAR"/> </resultMap> <sql id="Base_Column_List"> id,name,cycle,invoke_target,cron_expression,policy,situation,version,last_run_time,next_run_time,status,del_flag,remark </sql> <select id="taskList" resultMap="BaseResultMap"> select <include refid="Base_Column_List"/> from sys_task where `status` = 0 </select> <select id="selectTaskById" parameterType="string" resultMap="BaseResultMap"> select <include refid="Base_Column_List"/> from sys_task where id =#{id} </select> <insert id="insert" parameterType="com.lhz.model.entity.Task"> insert into sys_task(id, name, cycle, invoke_target, cron_expression, situation, version, policy, last_run_time, next_run_time, status, del_flag,remark) values (#{id}, #{name}, #{cycle}, #{invokeTarget}, #{cronExpression}, #{situation}, #{version}, #{policy}, #{lastRunTime}, #{nextRunTime}, #{status}, #{delFlag}, #{remark}) </insert> <update id="updateVersion"> update sys_task <set> version = #{version} + 1, <if test="task.lastRunTime != null"> last_run_time = #{task.lastRunTime}, </if> <if test="task.nextRunTime != null"> next_run_time = #{task.nextRunTime}, </if> <if test="task.situation != null"> situation = #{task.situation}, </if> </set> where id = #{task.id} and version = #{version} </update> <update id="update" parameterType="com.lhz.model.entity.Task"> update sys_task <set> <if test="id != null"> id = #{id}, </if> <if test="name != null"> name = #{name}, </if> <if test="situation != null"> situation = #{situation}, </if> <if test="status != null"> status = #{status}, </if> <if test="invokeTarget != null"> invoke_target = #{invokeTarget}, </if> <if test="cronExpression != null"> cron_expression = #{cronExpression}, </if> <if test="situation != null"> situation = #{situation}, </if> <if test="cycle != null"> cycle = #{cycle}, </if> <if test="policy != null"> policy = #{policy}, </if> <if test="version != null"> version = #{version}, </if> <if test="lastRunTime != null"> last_run_time = #{lastRunTime}, </if> <if test="nextRunTime != null"> next_run_time = #{nextRunTime}, </if> </set> where id = #{id,jdbcType=INTEGER} </update> <delete id="deleteTask" parameterType="string"> delete from sys_task where id = #{id} </delete> <insert id="insertTaskLog" parameterType="com.lhz.model.entity.TaskLog"> insert into sys_task_log value (#{id}, #{taskId}, #{time}, #{status}, #{exceptionInfo}, #{createTime}) </insert></mapper>
3.2、TaskMapper
import com.lhz.model.entity.Task;import org.apache.ibatis.annotations.Mapper;import org.apache.ibatis.annotations.Param;import java.util.List;@Mapperpublic interface TaskMapper { List<Task> taskList(); Task selectTaskById(String id); int insert(Task task); int update(Task task); int updateVersion(@Param("task") Task task, @Param("version") Integer version); int deleteTask(String id);}
3.3、TaskService
import com.lhz.mapper.task.TaskMapper;import com.lhz.model.constant.TaskRunTypeConstant;import com.lhz.model.entity.Task;import com.lhz.model.param.TaskParam;import com.lhz.model.vo.TaskVo;import com.lhz.task.TaskManager;import com.lhz.utils.CronUtils;import com.lhz.utils.UUIDUtils;import org.springframework.beans.BeanUtils;import org.springframework.scheduling.annotation.Async;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.util.Date;import java.util.List;@Servicepublic class TaskServiceImpl implements TaskService { @Resource private TaskMapper taskMapper; @Resource private TaskManager taskManager; /*** *任务列表查询 * @return o */ public Object taskList() { return taskMapper.taskList(); } /** * 新增列表 * * @param param 新增参数 */ public void addTask(TaskParam param) { // 解析表达式,此表达式由后端根据规则进行解析,可以直接由前端进行传递 String cron = CronUtils.dateConvertToCron(param); //查询执行周期 Date nextTime = CronUtils.nextCurrentTime(cron); //生成实体 Task task = new Task(); BeanUtils.copyProperties(param, task); task.setId(UUIDUtils.getUuId()); task.setDelFlag(0); task.setCronExpression(cron); task.setNextRunTime(nextTime); // 执行策略(1手动-暂停状态(2),2-自动-执行中状态(1)) Integer situation = param.getPolicy() == 1 ? 2 : 1; task.setSituation(situation); //设置版本好为0 task.setVersion(0); //正常 task.setStatus(0); //插入数据库 taskMapper.insert(task); // 执行任务 String runType = param.getPolicy() == 1 ? TaskRunTypeConstant.USER_RUN : TaskRunTypeConstant.SYSTEM_RUN; taskManager.start(task, runType); } /** * 修改任务 * * @param param 修改参数 */ public void updateTask(TaskParam param) { Task task = taskMapper.selectTaskById(param.getId()); if (task == null) { throw new RuntimeException("更新失败,任务不存在"); } //解析表达式 String cron = CronUtils.dateConvertToCron(param); //查询执行周期 Date nextTime = CronUtils.nextCurrentTime(cron); //生成实体 BeanUtils.copyProperties(param, task); task.setCronExpression(cron); task.setNextRunTime(nextTime); // 执行策略(1手动-暂停状态(2),2-自动-执行中状态(1)) int situation = param.getPolicy() == 1 ? 2 : 1; task.setSituation(situation); task.setStatus(0);//正常 //插入数据库 taskMapper.update(task); // 执行任务 String runType = param.getPolicy() == 1 ? TaskRunTypeConstant.USER_RUN : TaskRunTypeConstant.SYSTEM_RUN; taskManager.start(task, runType); } /** * 执行任务 * * @param id 任务id */ public void invokeTask(String id) { Task task = taskMapper.selectTaskById(id); if (task == null) { throw new RuntimeException("执行失败,任务不存在"); } // 执行 taskManager.start(task, TaskRunTypeConstant.SYSTEM_RUN); } /** * 暂停任务 * * @param id 任务id */ public void stopTask(String id) { Task task = taskMapper.selectTaskById(id); if (task == null) { throw new RuntimeException("暂停任务失败,任务不存在"); } taskManager.stop(id); } /** * 删除任务 * * @param id 任务id */ public void deleteTask(String id) { Task task = taskMapper.selectTaskById(id); if (task == null) { throw new RuntimeException("删除任务失败,任务不存在"); } taskManager.stop(id); //数据库删除 taskMapper.deleteTask(id); } /** * 禁用任务 * * @param id 任务id */ public void forbidTask(String id) { Task task = taskMapper.selectTaskById(id); if (task == null) { throw new RuntimeException("禁用失败,任务不存在"); } //停止任务 taskManager.stop(id); //禁用 task.setStatus(1); taskMapper.update(task); } /** * 查询详情 * * @param id 任务id */public TaskVo getTaskById(String id) { Task task = taskMapper.selectTaskById(id); TaskVo taskVo = new TaskVo(); BeanUtils.copyProperties(task, taskVo); List<String> nextExecution = (List<String>) CronUtils.getNextExecution(task.getCronExpression(), 8, true); taskVo.setNext(nextExecution); return taskVo; }}
3.4、TaskController
import com.lhz.model.entity.Task;import com.lhz.model.param.TaskParam;import com.lhz.model.vo.TaskVo;import com.lhz.service.TaskService;import io.swagger.annotations.*;import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;@RestController("task")@Api(tags = "任务管理")public class TaskController { @Resource private TaskService taskService; /** * 查询任务列表 * * @return */ @GetMapping("/list") @ApiOperation(value = "任务列表", notes = "任务列表", response = Task.class, responseContainer = "List") public Object taskList() { return taskService.taskList(); } /** * 新增任务 * * @param param */ @PostMapping("/add") @ApiOperation(value = "新增任务", notes = "新增任务") public void addTask(@RequestBody TaskParam param) { taskService.addTask(param); } /** * 更新任务 * * @param param */ @PutMapping("/update") @ApiOperation(value = "更新任务", notes = "更新任务") public void updateTask(@RequestBody TaskParam param) { taskService.updateTask(param); } /** * 删除任务 * * @param id */ @DeleteMapping("delete/{id}") @ApiOperation(value = "删除任务", notes = "删除任务") @ApiImplicitParam(name = "id", value = "任务id", paramType = "path", required = true, dataType = "String") public void deleteTask(@PathVariable("id") String id) { taskService.deleteTask(id); } /** * 暂停任务 * * @param id */ @PostMapping("stop/{id}") @ApiOperation(value = "暂停任务", notes = "暂停任务") @ApiImplicitParam(name = "id", value = "任务id", paramType = "path", required = true, dataType = "String") public void stopTask(@PathVariable("id") String id) { taskService.stopTask(id); } /** * 执行任务 * * @param id */ @PostMapping("invoke/{id}") @ApiOperation(value = "执行任务", notes = "执行任务") @ApiImplicitParam(name = "id", value = "任务id", paramType = "path", required = true, dataType = "String") public void invokeTask(@PathVariable("id") String id) { taskService.invokeTask(id); } /** * 查询详情 * * @param id * @return */ @GetMapping("info/{id}") @ApiOperation(value = "查询详情", notes = "查询详情", response = TaskVo.class) @ApiImplicitParam(name = "id", value = "任务id", paramType = "path", required = true, dataType = "String") public TaskVo getTaskById(@PathVariable("id") String id) { return taskService.getTaskById(id); }}
4、任务核心代码
4.1、TaskThread
该类主要配置执行任务的线程,设置了线程前缀;
import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;@Configurationpublic class TaskThread { @Bean("taskScheduler") public ThreadPoolTaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setThreadNamePrefix("TaskThread - "); return scheduler; }}
4.2、TaskManager
该类主要是对任务进行管理,初始化、启动、停止、清除操作,具体源码如下:
import com.lhz.mapper.task.TaskMapper;import com.lhz.model.constant.TaskPolicyConstant;import com.lhz.model.constant.TaskRunTypeConstant;import com.lhz.model.constant.TaskRunnableConstant;import com.lhz.model.entity.Task;import com.lhz.utils.CronUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;import org.springframework.scheduling.support.CronTrigger;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.util.Date;import java.util.List;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ScheduledFuture;import java.util.stream.Collectors;@Componentpublic class TaskManager { private final Logger logger = LoggerFactory.getLogger(TaskManager.class); /** * ConcurrentHashMap保证线程安全 */ private final Map<String, ScheduledFuture<?>> taskScheduledMap = new ConcurrentHashMap<>(); /** * 任务线程最大值,可以根据情况调整 */ private static final int MaxPoolSize = 20; /** * 引用自定义配置Bean对象 */ @Resource(name = "taskScheduler") private ThreadPoolTaskScheduler threadPoolTaskScheduler; @Resource private TaskMapper taskMapper; public void initTask() { // 读取自动执行任务列表 List<Task> tasks = taskMapper.taskList(); // 过滤获取自动执行或者运行中的任务 List<Task> startTasks = tasks.stream().filter(t -> t.getPolicy() == 2 || t.getSituation() == 1).collect(Collectors.toList()); logger.info("初始化任务列表:{}", startTasks.size()); // 设置线程大小,根据当前任务的并发情况设置,最大值为 MaxPoolSize, int poolSize = Math.min(startTasks.size(), MaxPoolSize); threadPoolTaskScheduler.setPoolSize(poolSize); for (Task task : startTasks) { this.start(task, TaskRunTypeConstant.SYSTEM_RUN); } } public void destroyTask() { logger.info("######## 结束任务 #########"); //查询运行中的任务,进行停止操作 clear(); } public void start(Task task, String runType) { String taskId = task.getId(); Integer policy = task.getPolicy(); String cron = task.getCronExpression(); //创建任务 logger.info("======== 创建任务:{} ========", taskId); //如果线程已存在则先关闭,再开启新的 if (taskScheduledMap.get(taskId) != null) { logger.info("重复任务:" + taskId); close(taskId); } //执行start(自动策略或者人为触发) if (policy.equals(TaskPolicyConstant.AUTO) || runType.equals(TaskRunTypeConstant.SYSTEM_RUN)) { logger.info("======== 执行任务:{} ========", taskId); // 每次执行时,将读取的下次执行修改为真实的下次执行下级 Date nextCurrentTime = CronUtils.nextCurrentTime(task.getCronExpression()); task.setNextRunTime(nextCurrentTime); TaskRunnable taskRunnable = new TaskRunnable(taskId); // 将task对象加入内存 TaskRunnableConstant.taskMap.put(taskId, task); // 执行TaskRunnable的run方法 ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(taskRunnable, new CronTrigger(cron)); taskScheduledMap.put(taskId, schedule); // 更新任务状态为已执行 // 执行中 task.setSituation(1); taskMapper.update(task); } } /** * 关闭redis的任务 * * @param taskId */ public void close(String taskId) { if (taskScheduledMap.get(taskId) != null) { ScheduledFuture<?> scheduledFuture = taskScheduledMap.get(taskId); scheduledFuture.cancel(true); taskScheduledMap.remove(taskId); TaskRunnableConstant.taskMap.remove(taskId); logger.info("关闭任务:" + taskId); } } public void stop(String taskId) { if (taskScheduledMap.get(taskId) != null) { ScheduledFuture<?> scheduledFuture = taskScheduledMap.get(taskId); // 调用取消 // 如果参数为true并且任务正在运行,那么这个任务将被取消 // 如果参数为false并且任务正在运行,那么这个任务将不会被取消 scheduledFuture.cancel(true); taskScheduledMap.remove(taskId); TaskRunnableConstant.taskMap.remove(taskId); logger.info("停止任务:" + taskId); //修改任务状况为停止 Task task = new Task(); task.setId(taskId); //已暂停 task.setSituation(2); taskMapper.update(task); } } public void clear() { for (Map.Entry<String, ScheduledFuture<?>> entry : taskScheduledMap.entrySet()) { String taskId = entry.getKey(); stop(taskId); } }}
4.3、TaskRunnable
import com.lhz.MyApplicationContext;import com.lhz.mapper.task.TaskMapper;import com.lhz.model.constant.TaskRunnableConstant;import com.lhz.model.entity.Task;import com.lhz.utils.CronUtils;import com.lhz.utils.JobInvokeUtil;import com.lhz.utils.RedisUtils;import com.lhz.utils.UUIDUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.lang.reflect.Method;import java.util.Date;import java.util.List;import java.util.concurrent.TimeUnit;public class TaskRunnable implements Runnable { private final Logger log = LoggerFactory.getLogger(TaskManager.class); /** * 创建任务时,传递任任务id */ private final String id; public TaskRunnable(String id) { this.id = id; } /** * 是否进行校验时间差,第一次执行任务时,不校验时间差 */ private boolean checkTime = false; @Override public void run() { String lockKey = TaskRunnableConstant.TASK_LOCK_KEY + id; RedisUtils taskRedis = MyApplicationContext.getBean(RedisUtils.class); // redis 锁验证的value String value = UUIDUtils.getUuId(); //获取当前执行时间戳 long currentTime = System.currentTimeMillis(); // 获取task Task currentTask = MyApplicationContext.getBean(TaskMapper.class).selectTaskById(id); try { //判断任务执行时间和实际时间差 Date nextRunTime = currentTask.getNextRunTime(); log.info("任务:{},当前:{},下次:{}", id, new Date(), nextRunTime); // 时间差 long diffTime = Math.abs(currentTime - nextRunTime.getTime()); //执行时,允许200ms误差,为了防止服务器时间钟摆出现误差 if (diffTime > 200 && checkTime) { String msg = "任务执行异常,时间节点错误!"; //开发中出现了错误情况,可以采用发生邮箱提醒给开发者 log.error(msg); // 抛出异常记录错误日志 throw new RuntimeException(msg); } //通过表达式找到需要执行的方法 String invokeTarget = currentTask.getInvokeTarget(); //获取bean String beanName = JobInvokeUtil.getBeanName(invokeTarget); // 获取调用方法 String methodName = JobInvokeUtil.getMethodName(invokeTarget); // 获取参数 List<Object[]> methodParams = JobInvokeUtil.getMethodParams(invokeTarget); // 默认第一个参数 加上 id 参数 methodParams.add(0, new Object[]{id, String.class}); // 通过反射找到对应执行方法 Object bean = MyApplicationContext.getBean(beanName); Method method = bean.getClass().getDeclaredMethod(methodName, JobInvokeUtil.getMethodParamsType(methodParams)); // 执行任务 long startTime = System.currentTimeMillis(); method.invoke(bean, JobInvokeUtil.getMethodParamsValue(methodParams)); // 更新任务 updateTask(currentTask); // 记录日志 TaskLogRecord.recordTaskLog(id, startTime, null); } catch (Exception e) { e.printStackTrace(); // 更新任务 updateTask(currentTask); // 出现异常记录异常日志,并且可以发生邮箱给开发者 TaskLogRecord.recordTaskLog(id, 0, e); } finally { // 当任务执行完成后,后续开启时间校验 checkTime = true; } } private void updateTask(Task currentTask) { String taskId = currentTask.getId(); if (TaskRunnableConstant.taskMap.get(taskId) != null) { String cron = currentTask.getCronExpression(); String invokeTarget = currentTask.getInvokeTarget(); Date nextRunTime = currentTask.getNextRunTime(); // 查询执行周期 Date nextTime = CronUtils.nextCurrentTime(cron); //修改任务状况为执行中 Task task = new Task(); task.setId(taskId); task.setCronExpression(cron); task.setInvokeTarget(invokeTarget); //上次执行时间为,本次的下次执行时间 task.setLastRunTime(nextRunTime); task.setNextRunTime(nextTime); //执行中 task.setSituation(1); MyApplicationContext.getBean(TaskMapper.class).update(task); log.info("更新任务执行情况!"); } }}
4.4、TaskBusinessService
该类为具体的任务执行类,由注册@Service(“name”)Bean名称及具体方法名称构成,对应Sql中invoke_target
字段的值,在TaskRunnable
中通过反射执行该类。
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.scheduling.annotation.Async;import org.springframework.stereotype.Service;import java.util.Random;@Service("taskBusinessService")public class TaskBusinessService { private final Logger logger = LoggerFactory.getLogger(TaskBusinessService.class); /** * 为了记录异常 所有的方法必须在try内 * 每个方法默认接收一个taskId */ @Async public void taskA(String id) { try { logger.info("======执行业务代码 --- this is A ======"); } catch (Exception e) { e.printStackTrace(); } }}
5、接口测试
该工程整合swagger进行接口测试实现,具体的接口测试通过swagger发起请求
5.1、新增任务
新增一个任务,corn为:0/5 * * * * ?
,每5S执行一次;
{"invokeTarget": "taskBusinessService.taskA","name": "测试任务", "corn":"0/5 * * * * ?"}
测试结果:每5S会执行一次,运行情况入截图:
5.2、更新任务
更新一个任务,corn为:0/3 * * * * ?
,每3S执行一次;
{"id":"f91b2d8c42ee4a50bc9de13454bc8ccb";"invokeTarget": "taskBusinessService.taskA","name": "测试任务", "corn":"0/3 * * * * ?"}
测试结果:由原来的每5S会执行一次变成了3S一次,运行情况入截图:
5.3、任务详情
查看任务时详情,以及后续几次触发时间:
6、执行原理
1、通过类ThreadPoolTaskScheduler的schedule方法传入一个Runnable及cron表达式参数,启动一个线程并且异步定时执行该线程的内容,启动后返回ScheduledFuture对象。
2、通过类ScheduledFuture、HashMap对当前定时的线程进行储存、取消操作。
3、在修改定时任务时,先判断当前的线程名称是否已经存在
如果存在则抛异常或者删除原来的再添加新的(具体看业务)
如果不存在则直接添加
4、在实现的Runnable接口中,通过反射机制,找到具体执行任务的Bean对象及方法。
5、线程开启也可以在项目启动时就进行处理。
6、每次重新启动后线程都将被清空,如果想要保存之前的定时任务情况,可以将定时任务写进数据库,在启动项目时进行初始化加载。
7、注意:就算开了异步线程,当一个定时任务还没有执行完时,就算到了执行周期也不会再次执行该任务。
8、处理流程:
7、源码参考
具体完整代码可以参考gitee:《scheduled-future-demo》,建议下载源码后,再结合文档说明进行理解;