irpas技术客

Springboot基于redisson实现延时队列_+二_redisson延时队列缺陷

大大的周 4101

总监:咳咳咳…

我:😱

总监:那个,最近很多微服务里面需要重试机制啊,你看看怎么搞一下?

我:我gzip压缩还没搞完呢!(Gateway网关和Feign调用开启gzip压缩)

总监:你加把劲,我是相信你的,搞完了告诉我一声。

我心里:我的🐟怎么办?


思路:

既然是需要重试机制,那么一定需要一个队列去的形式去存储该部分需要重试的数据。重试就需要有时间的概念,比如重试几次,多长时间重试一次。由此就想到了可不可以使用redis的延时队列呢,我们只需要设置时间,记录一下失败的次数,就可以通过回调来自动执行这个任务。

说干就干.

一:编写一个生产者类为QueueProducer import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; @Slf4j @Component public class QueueProducer { @Resource private RedissonClient redissonClient; /** * 添加到队列 * * @param t 实际业务参数参数 * @param delay 延迟时间数 * @param timeUnit 延迟时间单位 * @param queueName 队列名 * @param <T> 泛型 */ public <T> void addQueue(String queueName, long delay, TimeUnit timeUnit, T t) { log.info("添加到队列:{}, 延迟时间数:{}, 延迟时间单位:{}, 实际参数:{}", queueName, delay, timeUnit, t); RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName); RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(t, delay, timeUnit); } /** * 删除队列中的值 * @param queueName 队列名称 * @param value 要删除的值 * @param <T> 泛型 * @return 是否删除成功 */ public <T> Boolean delValue(String queueName,Object value){ RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName); RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); return delayedQueue.remove(value); } } 二:编写一个消费者的接口 public interface QueueConsumer<T> { /** * 延迟队列任务执行方法,实现该方法执行具体业务 * * @param t 具体任务参数 */ void execute(T t); } 三:编写一个核心的监听类 import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Map; @Slf4j @SuppressWarnings("all") @Component public class CommonQueueComponent implements ApplicationContextAware { /** * redisson延迟队列名:以应用名(服务名)为队列名 * 该名称自行定义 */ @Value("${spring.application.name}") private String queueName; @Resource private RedissonClient redissonClient; /** * 从应用上下文中获取具体的队列消费者,并执行业务 * * @param applicationContext 应用上下文 * @throws BeansException BeansException */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, QueueConsumer> map = applicationContext.getBeansOfType(QueueConsumer.class); map.values().forEach(this::startThread); } /** * 启动线程获取队列,并执行业务 * * @param queueName 队列名 * @param CommonDelayQueueConsumer 任务回调监听 * @param <T> 泛型 */ private <T> void startThread(QueueConsumer queueConsumer) { RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName); //由于此线程需要常驻,所以可以直接新建线程,不需要交给线程池管理 Thread thread = new Thread(() -> { log.info("启动队列名为:{}的监听线程", queueName); while (true) { try { T t = blockingFairQueue.take(); //此处不提供多线程处理,自己决定是否开启多线程(业务中需要开启多线程话,建议使用线程池!!!) queueConsumer.execute(t); } catch (Exception e) { log.error("队列监听线程错误,", e); } } }); thread.setName(queueName); thread.start(); } }

因为是微服务项目,一个服务只用一个队列,如果不是微服务项目,可自行修改队列名称位置。

四:测试添加延时数据 public class Test{ /** * 注入redis的无界延迟队列执行器 */ @Resource CommonQueueProducer commonQueueProducer; public void test(){ //"test-service"对应queueName commonDelayedQueueProducer.addQueue("test-service", 1, MINUTES, "test"); } } 五:实现消费者接口QueueConsumer @Component public class RedisCustomer implements QueueConsumer { @Override public void execute(Object object) { //该类监听的为CommonQueueComponent类中的queueName名称的redis中的key //实现延时返回后处理数据的业务 } }

嘿嘿嘿,貌似搞定了。。。

我:看着外面的黑黑的天空,眼泪不自觉的流了下来。

溜了溜了。。。。等等,点个👍再走也不迟!


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #redisson延时队列缺陷 #springboot #springcloud