> 文档中心 > 基于redis实现延时任务

基于redis实现延时任务

redis的zset是有序集合,默认根据score升序排序,并且可以根据score范围查询。因此可以启动一个线程循环执行范围查询,获取score不大于当前时间的数据,即已到期、需要执行的任务(因为不是严格按照时间匹配的,所以可能会有一点时间偏差,但一般情况下不会有影响),处理完后再从缓存中删除。考虑到线程有可能会异常退出(比如redis连接异常等),因此使用监听者模式设计了线程重启方案:监听者会监听线程,当线程出现异常时由监听者重启线程。下面是具体代码。

@Configurationpublic class RedisTemplateConfig {    @Bean(name = "myredis")    public RedisTemplate redisTemplate(RedisConnectionFactory factory){ RedisTemplate<String, Object> redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(factory); // 使用StringRedisSerializer来序列化和反序列化redis的key值 StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringRedisSerializer); redisTemplate.setHashKeySerializer(stringRedisSerializer); // 使用GenericFastJsonRedisSerializer 来序列化和反序列化redis的value值 GenericFastJsonRedisSerializer genericFastJsonRedisSerializer = new GenericFastJsonRedisSerializer(); redisTemplate.setValueSerializer(genericFastJsonRedisSerializer); redisTemplate.setHashValueSerializer(genericFastJsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate;    }}
public class DelayTaskConsumerThread extends Observable implements Runnable {    @Override    public void run() { System.out.println("线程启动:"+Thread.currentThread().getId()); while (true){     try {  long time = System.currentTimeMillis();  consumeMsg("testDelayTask",time);     } catch (Exception e) {  e.printStackTrace();  System.out.println("线程异常:"+Thread.currentThread().getId());  doBusiness();  break;     } }    }    private void consumeMsg(String key,long time){ RedisTemplate redisTemplate = (RedisTemplate) SpringContextUtil.getBean("myredis"); Set set = redisTemplate.opsForZSet().rangeByScore(key,0l, time); if (set.size() > 0){     System.out.println("线程:"+Thread.currentThread().getId()+","+"获取到["+set.size()+"]条数据,[time="+time+"]");     Iterator iterator = set.iterator();     String next="";     while (iterator.hasNext()){  try {      next = (String) iterator.next();      System.out.println("线程:"+Thread.currentThread().getId()+","+"消费消息:[value="+ next +"]");  }catch (Exception e){      //这里可以对消费异常的数据做进一步处理      throw e;  }finally {      redisTemplate.opsForZSet().remove(key,next);  }     } }else {     System.out.println("线程:"+Thread.currentThread().getId()+","+"没有消息[time="+time+"]"); }    }    public void doBusiness(){ if(true){     super.setChanged(); } notifyObservers();    }}
/**
 * Observer that starts a new {@link DelayTaskConsumerThread} each time it is notified.
 */
public class DelayTaskConsumerListener implements Observer {

    /**
     * Handles a notification by launching a replacement consumer.
     *
     * @param o   the observable that sent the notification; not used
     * @param arg the notification argument; not used
     */
    @Override
    public void update(Observable o, Object arg) {
        System.out.println("DelayTaskConsumerThread线程异常退出,重新启动");
        launchReplacement();
        System.out.println("DelayTaskConsumerThread线程已重启");
    }

    /** Creates a new consumer, registers this listener on it and submits it for execution. */
    private void launchReplacement() {
        DelayTaskConsumerThread replacement = new DelayTaskConsumerThread();
        replacement.addObserver(this);
        ThreadPoolUtil.execute(replacement);
    }
}
/**
 * Application runner that clears the {@code testDelayTask} key and then starts the delayed-task
 * consumer with a listener attached.
 */
@Component
public class DelayTaskConsumerRunner implements ApplicationRunner {

    @Autowired
    @Qualifier(value = "myredis")
    private RedisTemplate redisTemplate;

    /**
     * Deletes the task key, then submits a new consumer observed by a new listener.
     *
     * @param args application arguments; not used
     */
    @Override
    public void run(ApplicationArguments args) {
        // NOTE(review): this drops whatever is still stored under the key at startup, including
        // tasks that have not run yet — confirm this is intended outside of a demo.
        redisTemplate.delete("testDelayTask");

        DelayTaskConsumerThread consumer = new DelayTaskConsumerThread();
        consumer.addObserver(new DelayTaskConsumerListener());
        ThreadPoolUtil.execute(consumer);
    }
}
@Componentpublic class SpringContextUtil implements ApplicationContextAware {    /     * 上下文对象实例     */    private static ApplicationContext applicationContext;    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextUtil.applicationContext = applicationContext;    }    /     * 获取applicationContext     *     * @return     */    public static ApplicationContext getApplicationContext() { return applicationContext;    }    /     * 通过name获取 Bean.     *     * @param name     * @return     */    public static Object getBean(String name) { return getApplicationContext().getBean(name);    }    /     * 通过class获取Bean.     *     * @param clazz     * @param      * @return     */    public static <T> T getBean(Class<T> clazz) { return getApplicationContext().getBean(clazz);    }    /     * 通过name,以及Clazz返回指定的Bean     *     * @param name     * @param clazz     * @param      * @return     */    public static <T> T getBean(String name, Class<T> clazz) { return getApplicationContext().getBean(name, clazz);    }}
@GetMapping(value = "/test", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)    public CommonResponse test() { produceMsg("testDelayTask", UUID.randomUUID().toString(),System.currentTimeMillis());//这里的时间戳实际是需要从页面选择的任务执行时间,这里为了方便直接使用了系统当前时间戳 return new CommonResponse();    }    /     *     * @param key     * @param value     * @param score 设置的任务执行时间     */    private void produceMsg(String key,String value,long score){ Boolean add = redisTemplate.opsForZSet().add(key, value, score); if (add){     System.out.println("发送消息:[value="+value+",score="+score+"]"); }else {     System.out.println("消息发送失败:[value="+value+",score="+score+"]"); }    }