基于redis实现延时任务
redis的zset是有序集合,默认根据score升序排序。并且可以根据scope范围查询,因此可以启动一个线程循环执行范围查询,获取当前时间之前的数据,即要执行任务,(因为不是严格按照时间匹配的,因此可能会有一点时间偏差,但一般情况下不会有影响),处理完后删除缓存。考虑到线程有可能会异常退出(比如redis连接异常等),因此使用监听者模式设计了线程重启方案,监听者会监听线程,当线程出现异常时监听者会重启线程。下面是具体代码。
@Configurationpublic class RedisTemplateConfig { @Bean(name = "myredis") public RedisTemplate redisTemplate(RedisConnectionFactory factory){ RedisTemplate<String, Object> redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(factory); // 使用StringRedisSerializer来序列化和反序列化redis的key值 StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringRedisSerializer); redisTemplate.setHashKeySerializer(stringRedisSerializer); // 使用GenericFastJsonRedisSerializer 来序列化和反序列化redis的value值 GenericFastJsonRedisSerializer genericFastJsonRedisSerializer = new GenericFastJsonRedisSerializer(); redisTemplate.setValueSerializer(genericFastJsonRedisSerializer); redisTemplate.setHashValueSerializer(genericFastJsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; }}
public class DelayTaskConsumerThread extends Observable implements Runnable { @Override public void run() { System.out.println("线程启动:"+Thread.currentThread().getId()); while (true){ try { long time = System.currentTimeMillis(); consumeMsg("testDelayTask",time); } catch (Exception e) { e.printStackTrace(); System.out.println("线程异常:"+Thread.currentThread().getId()); doBusiness(); break; } } } private void consumeMsg(String key,long time){ RedisTemplate redisTemplate = (RedisTemplate) SpringContextUtil.getBean("myredis"); Set set = redisTemplate.opsForZSet().rangeByScore(key,0l, time); if (set.size() > 0){ System.out.println("线程:"+Thread.currentThread().getId()+","+"获取到["+set.size()+"]条数据,[time="+time+"]"); Iterator iterator = set.iterator(); String next=""; while (iterator.hasNext()){ try { next = (String) iterator.next(); System.out.println("线程:"+Thread.currentThread().getId()+","+"消费消息:[value="+ next +"]"); }catch (Exception e){ //这里可以对消费异常的数据做进一步处理 throw e; }finally { redisTemplate.opsForZSet().remove(key,next); } } }else { System.out.println("线程:"+Thread.currentThread().getId()+","+"没有消息[time="+time+"]"); } } public void doBusiness(){ if(true){ super.setChanged(); } notifyObservers(); }}
public class DelayTaskConsumerListener implements Observer { @Override public void update(Observable o, Object arg) { System.out.println("DelayTaskConsumerThread线程异常退出,重新启动"); DelayTaskConsumerThread run = new DelayTaskConsumerThread(); run.addObserver(this); ThreadPoolUtil.execute(run); System.out.println("DelayTaskConsumerThread线程已重启"); }}
@Componentpublic class DelayTaskConsumerRunner implements ApplicationRunner { @Autowired @Qualifier(value = "myredis") private RedisTemplate redisTemplate; @Override public void run(ApplicationArguments args) { redisTemplate.delete("testDelayTask"); DelayTaskConsumerThread taskConsumerThread = new DelayTaskConsumerThread(); DelayTaskConsumerListener taskConsumerListener = new DelayTaskConsumerListener(); taskConsumerThread.addObserver(taskConsumerListener); ThreadPoolUtil.execute(taskConsumerThread); }}
@Componentpublic class SpringContextUtil implements ApplicationContextAware { / * 上下文对象实例 */ private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextUtil.applicationContext = applicationContext; } / * 获取applicationContext * * @return */ public static ApplicationContext getApplicationContext() { return applicationContext; } / * 通过name获取 Bean. * * @param name * @return */ public static Object getBean(String name) { return getApplicationContext().getBean(name); } / * 通过class获取Bean. * * @param clazz * @param * @return */ public static <T> T getBean(Class<T> clazz) { return getApplicationContext().getBean(clazz); } / * 通过name,以及Clazz返回指定的Bean * * @param name * @param clazz * @param * @return */ public static <T> T getBean(String name, Class<T> clazz) { return getApplicationContext().getBean(name, clazz); }}
@GetMapping(value = "/test", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public CommonResponse test() { produceMsg("testDelayTask", UUID.randomUUID().toString(),System.currentTimeMillis());//这里的时间戳实际是需要从页面选择的任务执行时间,这里为了方便直接使用了系统当前时间戳 return new CommonResponse(); } / * * @param key * @param value * @param score 设置的任务执行时间 */ private void produceMsg(String key,String value,long score){ Boolean add = redisTemplate.opsForZSet().add(key, value, score); if (add){ System.out.println("发送消息:[value="+value+",score="+score+"]"); }else { System.out.println("消息发送失败:[value="+value+",score="+score+"]"); } }