irpas技术客

java 自定义类 模拟实现消息队列 消费发送功能(升级版)_napcleon1_java 队列消费

irpas 3621

消息队列, 用于消息发送消费、自研 rpc 的 IO 线程处理等场景. 是多消息出入、单线程消费的结构 (也支持多线程消费, 但不是本代码的设计初衷).

大概流程 1.创建消息分组 2.添加一条消息 3.消息加入分组队列 并通知队列 执行消息 4.消息管理器获取消息运行 5.直到消息全部执行完毕 停止执行 6.等待消息管理器收到新的消息 再次运行

原版地址 https://blog.csdn.net/napcleon1/article/details/105402879 上一版存在一些问题: 1. while(true) 循环会一直占用资源, 导致服务器效率低下 2. 单线程故障率高 3. 多任务模式下 (类似 kafka 的多个 topic) 会创建多个 while(true) 线程, 极其占用资源.

下面直接上代码吧 首先是消息管理器 CommonMessageService 和数据管理类 DataManager; 消息存放在一个 list 中, list 本身是普通的 ArrayList, 线程安全由 DataManager 里的锁来保证

import com.github.niupengyu.core.exception.SysException;
import com.github.niupengyu.core.util.DateUtil;
import com.github.niupengyu.core.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;

/**
 * Message manager: entry point of the in-memory message queue.
 *
 * <p>Groups (comparable to Kafka topics) are registered under a key; each group is
 * backed by one {@link MultipleMessageService}. All groups created through
 * {@link #addGroup} share the executor held by this manager.
 *
 * <p>The group registry is a {@link ConcurrentHashMap}, so groups may be registered
 * and looked up from several producer threads. Group keys must therefore be non-null.
 */
public class CommonMessageService {

    /** Registered groups by key. Package-private, as in the original design. */
    Map<String, MultipleMessageService> multipleMessageServiceMap = new ConcurrentHashMap<>();

    private Logger logger = LoggerFactory.getLogger(CommonMessageService.class);

    private final ExecutorService pools;

    /**
     * Creates a manager backed by its own thread pool.
     *
     * @param corePoolSize core size of the pool; the work queue is an unbounded
     *                     {@link LinkedBlockingQueue}
     */
    public CommonMessageService(int corePoolSize) {
        pools = new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
    }

    /** Creates a manager backed by its own thread pool with a core size of 3. */
    public CommonMessageService() {
        this(3);
    }

    /**
     * Creates a manager that runs all groups on a caller-supplied executor.
     *
     * @param pools executor used by every group of this manager
     */
    public CommonMessageService(ExecutorService pools) {
        this.pools = pools;
    }

    /**
     * Registers a group, similar to creating a Kafka topic. Does nothing when a group
     * with the same key already exists.
     *
     * @param key        group key, must not be null
     * @param dataRunner default handler for messages of this group
     * @param name       name handed to the group's data manager (used as its logger name)
     * @param maxQueues  maximum number of queued messages; values &lt;= 0 mean unbounded
     */
    public void addGroup(String key, DataRunner dataRunner, String name, int maxQueues) {
        Objects.requireNonNull(key, "key");
        // computeIfAbsent makes "check, then register" atomic for concurrent callers.
        multipleMessageServiceMap.computeIfAbsent(key,
                k -> new MultipleMessageService(name, maxQueues, dataRunner, pools));
    }

    /**
     * Forgets all groups and shuts the shared executor down.
     * The original author notes that this method is not fully worked out.
     */
    public void end() {
        multipleMessageServiceMap.clear();
        pools.shutdown();
    }

    /**
     * Queues a message for the group's default handler (not necessarily run immediately).
     *
     * @param key  group key
     * @param data message payload
     * @throws SysException if no group is registered under {@code key}
     */
    public void add(String key, Object data) {
        requireGroup(key).add(data);
    }

    /**
     * Hands a message to the executor right away, bypassing the group's queue.
     *
     * @param key  group key
     * @param data message payload
     * @throws SysException if no group is registered under {@code key}
     */
    public void addNow(String key, Object data) {
        requireGroup(key).addNow(data);
    }

    /**
     * Queues a message for the group, to be processed by the given handler.
     *
     * @param key        group key
     * @param dataRunner handler for this message
     * @param data       message payload
     * @throws SysException if no group is registered under {@code key}
     */
    public void add(String key, DataRunner dataRunner, Object data) {
        requireGroup(key).add(dataRunner, data);
    }

    /**
     * Hands a message with its own handler to the executor right away, bypassing the queue.
     *
     * @param key        group key
     * @param dataRunner handler for this message
     * @param data       message payload
     * @throws SysException if no group is registered under {@code key}
     */
    public void addNow(String key, DataRunner dataRunner, Object data) {
        requireGroup(key).addNow(dataRunner, data);
    }

    /**
     * Queues a message, registering the group first when it does not exist yet.
     *
     * @param key        group key, must not be null
     * @param dataRunner handler, used both as the group default and for this message
     * @param name       name for a newly created group
     * @param maxQueues  queue bound for a newly created group
     * @param data       message payload
     */
    public void addNew(String key, DataRunner dataRunner, String name, int maxQueues, Object data) {
        addGroup(key, dataRunner, name, maxQueues);
        add(key, dataRunner, data);
    }

    /**
     * Reports, per group key, the current queue length ("size") and whether the
     * consumer is idle ("stop").
     *
     * @return a new map from group key to that group's status map
     */
    public Map messageInfo() {
        Map<String, Object> data = new HashMap<>();
        for (Map.Entry<String, MultipleMessageService> entry : multipleMessageServiceMap.entrySet()) {
            MultipleMessageService group = entry.getValue();
            data.put(entry.getKey(),
                    StringUtil.createMap("size", group.size(), "stop", group.isStop()));
        }
        return data;
    }

    /**
     * Looks a group up with a single map access, so a concurrent {@link #end()} cannot
     * slip in between an existence check and the read.
     */
    private MultipleMessageService requireGroup(String key) {
        MultipleMessageService group = key == null ? null : multipleMessageServiceMap.get(key);
        if (group == null) {
            throw new SysException("任务分组不存在" + key);
        }
        return group;
    }
}

import com.github.niupengyu.core.exception.SysException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Data manager: guards one message queue with a lock and drains it on an executor.
 *
 * <p>{@link #run()} consumes messages until the queue is empty, then marks the manager
 * as stopped and returns; the owner is expected to call {@link #start()} again when
 * {@link #isStop()} reports {@code true} after a new message was added.
 *
 * <p>The queue and the stop flag are protected by the same lock. The consumer's
 * "queue is empty, so I stop" decision is therefore atomic with respect to
 * {@link #add}: a producer that checks {@link #isStop()} after {@code add} either has
 * its message consumed or sees the flag set.
 */
public class DataManager implements Runnable {

    private final Lock lock = new ReentrantLock();

    /** Signalled whenever a message is removed, waking producers blocked on a full queue. */
    private final Condition notFull = lock.newCondition();

    private Logger logger;

    private String name;

    /** {@code true} while no consumer is draining the queue. Guarded by {@link #lock}. */
    private boolean stop = true;

    private ExecutorService pools;

    private DataQueues<MessageBean> dataQueues;

    private int maxQueues = 0;

    public DataManager() {
    }

    /**
     * Initialises the manager; must be called before any other method.
     *
     * @param name       logger name for this manager
     * @param dataQueues backing queue
     * @param maxQueues  maximum queue length; values &lt;= 0 disable the bound
     * @param pools      executor on which messages are processed
     */
    public void init(String name, DataQueues dataQueues, int maxQueues, ExecutorService pools) {
        this.name = name;
        this.dataQueues = dataQueues;
        this.maxQueues = maxQueues;
        this.logger = LoggerFactory.getLogger(name);
        this.pools = pools;
    }

    /**
     * Appends a message, blocking while a bounded queue is full.
     *
     * <p>The wait uses {@link Condition#await(long, TimeUnit)}, which releases the lock
     * while waiting. Sleeping with the lock held, as the previous version did, kept
     * {@link #getMessage()} from ever removing an element, so a full queue never drained.
     *
     * @param messageobj message to queue
     * @throws SysException if the wait is interrupted (the interrupt flag is restored)
     *                      or the queue rejects the message
     */
    public void add(MessageBean messageobj) {
        lock.lock();
        try {
            int size = dataQueues.messageSize();
            while (maxQueues > 0 && size >= maxQueues) {
                logger.debug("等待 {}/{} ", size, maxQueues);
                // Timed wait: still re-checks every 500 ms in case the queue is drained
                // by code that does not signal this condition.
                notFull.await(500, TimeUnit.MILLISECONDS);
                size = dataQueues.messageSize();
            }
            dataQueues.add(messageobj);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SysException(e);
        } catch (Exception e) {
            throw new SysException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest queued message.
     *
     * @return the message, or {@code null} when the queue is empty
     * @throws SysException if the queue access fails
     */
    public MessageBean getMessage() {
        lock.lock();
        try {
            return takeLocked();
        } catch (Exception e) {
            throw new SysException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the current queue length.
     *
     * @return number of queued messages
     * @throws SysException if the queue access fails
     */
    public int messageSize() {
        lock.lock();
        try {
            return dataQueues.messageSize();
        } catch (Exception e) {
            throw new SysException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Tells whether the consumer is idle.
     *
     * @return {@code true} when no consumer is draining the queue
     */
    public boolean isStop() {
        lock.lock();
        try {
            return stop;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Sets the running state.
     *
     * @param stop {@code true} to mark the consumer as idle
     */
    public void setStop(boolean stop) {
        lock.lock();
        try {
            this.stop = stop;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumer loop: processes queued messages one by one and returns once the queue is
     * empty. A failing handler is logged and does not end the loop.
     */
    @Override
    public void run() {
        while (true) {
            MessageBean messageBean = nextOrMarkStopped();
            if (messageBean == null) {
                return;
            }
            try {
                messageBean.execute();
            } catch (Exception e) {
                // "{}" is needed for the message to appear; the trailing throwable is
                // logged with its stack trace.
                logger.error("消息处理异常 {}", messageBean, e);
            }
        }
    }

    /**
     * Shuts the executor down. The original author notes that this is not fully worked out.
     */
    public void end() {
        pools.shutdown();
    }

    /** Schedules the consumer loop on the executor. */
    public void start() {
        pools.execute(this);
    }

    /**
     * Processes a message on the executor right away, bypassing the queue.
     *
     * @param messageBean message to process
     */
    public void addNow(MessageBean messageBean) {
        pools.execute(() -> {
            try {
                messageBean.execute();
            } catch (Exception e) {
                logger.error("消息处理异常 {}", messageBean, e);
            }
        });
    }

    /** Takes the next message, or flags the manager as stopped, in one critical section. */
    private MessageBean nextOrMarkStopped() {
        lock.lock();
        try {
            MessageBean next = takeLocked();
            if (next == null) {
                stop = true;
            }
            return next;
        } catch (Exception e) {
            throw new SysException(e);
        } finally {
            lock.unlock();
        }
    }

    /** Removes the head of the queue and wakes one blocked producer. Caller must hold the lock. */
    private MessageBean takeLocked() {
        MessageBean next = dataQueues.getMessage();
        if (next != null) {
            notFull.signal();
        }
        return next;
    }
}

消息管理方法

import com.github.niupengyu.core.exception.SysException;

import java.util.ArrayList;
import java.util.List;

/**
 * First-in, first-out message store backed by an {@link ArrayList}.
 *
 * <p>This class performs no synchronization of its own; callers that share an instance
 * between threads have to guard it externally.
 *
 * @param <T> type of the stored messages
 */
public class DataQueues<T>{

    /** Pending messages, oldest first. */
    private List<T> message = new ArrayList<>();

    public DataQueues() {
    }

    /**
     * Appends one message to the end of the queue.
     *
     * @param messageobj message to append
     */
    public void add(T messageobj) {
        message.add(messageobj);
    }

    /**
     * Appends all given messages, in order, to the end of the queue.
     *
     * @param messageList messages to append
     * @throws SysException declared by the original signature
     */
    public void addList(List<T> messageList) throws SysException {
        message.addAll(messageList);
    }

    /**
     * Removes and returns the oldest message.
     *
     * @return the oldest message, or {@code null} when the queue is empty
     */
    public T getMessage() {
        if (message.isEmpty()) {
            return null;
        }
        return message.remove(0);
    }

    /**
     * Removes and returns up to {@code size} of the oldest messages.
     *
     * @param size maximum number of messages to take
     * @return the removed messages, oldest first, or {@code null} when the queue is empty
     */
    public List<T> getMessageList(int size) {
        int available = message.size();
        if (available <= 0) {
            return null;
        }
        int taken = available > size ? size : available;
        List<T> batch = new ArrayList<>(message.subList(0, taken));
        // Whatever was not taken becomes the new backing list (empty when all was taken).
        message = new ArrayList<>(message.subList(taken, available));
        return batch;
    }

    /**
     * Returns the number of queued messages.
     *
     * @return current queue length
     */
    public int messageSize() {
        return message.size();
    }
}

处理器 需要实现的接口

/**
 * Handler that processes the payload of one message.
 *
 * <p>The interface has a single abstract method and is marked
 * {@link FunctionalInterface}, so handlers can be written as lambdas or method
 * references and the compiler rejects a second abstract method being added by accident.
 *
 * @param <T> payload type
 */
@FunctionalInterface
public interface DataRunner<T> {

    /**
     * Processes one payload.
     *
     * @param messageBean payload to process
     * @throws Exception if processing fails
     */
    void execute(T messageBean) throws Exception;
}

封装消息实体

/**
 * Message envelope: a payload together with the handler that processes it.
 *
 * @param <T> payload type
 */
public class MessageBean<T> {

    private DataRunner<T> dataRunner;

    private T value;

    /**
     * @param dataRunner handler that will process {@code value}
     * @param value      payload
     */
    public MessageBean(DataRunner<T> dataRunner, T value) {
        this.dataRunner = dataRunner;
        this.value = value;
    }

    public DataRunner<T> getDataRunner() {
        return dataRunner;
    }

    public void setDataRunner(DataRunner<T> dataRunner) {
        this.dataRunner = dataRunner;
    }

    public T getValue() {
        return value;
    }

    public void setValue(T value) {
        this.value = value;
    }

    /**
     * Passes the payload to the handler.
     *
     * @throws Exception whatever the handler throws
     */
    public void execute() throws Exception {
        dataRunner.execute(value);
    }

    @Override
    public String toString() {
        return "MessageBean{" +
                "dataRunner=" + dataRunner +
                ", value=" + value +
                '}';
    }
}

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * One message group: wraps a {@link DataManager} and (re)starts its consumer whenever
 * a message is queued while the consumer is idle.
 *
 * @param <T> payload type of the messages in this group
 */
public class MultipleMessageService<T> {

    private Logger logger = LoggerFactory.getLogger(MultipleMessageService.class);

    /**
     * Serialises the "consumer idle? then start it" decision, so two producers adding
     * at the same moment cannot both start a consumer.
     */
    private final Object startGuard = new Object();

    private DataManager dataManager;

    private DataRunner<T> dataRunner;

    /**
     * Creates an unbounded group processed on its own single-thread executor.
     *
     * @param dataRunner default handler
     * @param name       name handed to the data manager
     */
    public MultipleMessageService(DataRunner dataRunner, String name) {
        this.init(name, dataRunner, new DataQueues<>(), 0, Executors.newSingleThreadExecutor());
    }

    /**
     * Creates an unbounded group processed on its own fixed-size executor.
     *
     * @param count      number of executor threads
     * @param dataRunner default handler
     * @param name       name handed to the data manager
     */
    public MultipleMessageService(int count, DataRunner dataRunner, String name) {
        this.init(name, dataRunner, new DataQueues<>(), 0, Executors.newFixedThreadPool(count));
    }

    /**
     * Creates a bounded group processed on its own fixed-size executor.
     *
     * @param count      number of executor threads
     * @param maxQueues  maximum queue length; values &lt;= 0 disable the bound
     * @param dataRunner default handler
     * @param name       name handed to the data manager
     */
    public MultipleMessageService(int count, int maxQueues, DataRunner dataRunner, String name) {
        this.init(name, dataRunner, new DataQueues<>(), maxQueues, Executors.newFixedThreadPool(count));
    }

    /**
     * Creates a group processed on a caller-supplied executor.
     *
     * @param name       name handed to the data manager
     * @param maxQueues  maximum queue length; values &lt;= 0 disable the bound
     * @param dataRunner default handler
     * @param pools      executor to run on
     */
    public MultipleMessageService(String name, int maxQueues, DataRunner dataRunner, ExecutorService pools) {
        this.init(name, dataRunner, new DataQueues<>(), maxQueues, pools);
    }

    // The public constructors accept a raw DataRunner; it is stored as DataRunner<T>.
    @SuppressWarnings("unchecked")
    private void init(String name, DataRunner dataRunner, DataQueues dataQueues, int maxQueues,
                      ExecutorService pools) {
        this.dataManager = new DataManager();
        this.dataRunner = dataRunner;
        dataManager.init(name, dataQueues, maxQueues, pools);
    }

    /** Shuts the underlying data manager down. */
    public void end() {
        dataManager.end();
    }

    /**
     * Queues a payload for the group's default handler and makes sure a consumer is running.
     *
     * @param o payload
     */
    public void add(T o) {
        enqueue(dataRunner, o);
    }

    /**
     * Processes a payload with the default handler right away, bypassing the queue.
     *
     * @param o payload
     */
    public void addNow(T o) {
        dataManager.addNow(new MessageBean<>(dataRunner, o));
    }

    /**
     * Queues a payload for the given handler and makes sure a consumer is running.
     *
     * @param dataRunner handler for this payload
     * @param o          payload
     */
    @SuppressWarnings("unchecked") // raw DataRunner kept in the signature for existing callers
    public void add(DataRunner dataRunner, T o) {
        enqueue(dataRunner, o);
    }

    /**
     * Processes a payload with the given handler right away, bypassing the queue.
     *
     * @param dataRunner handler for this payload
     * @param o          payload
     */
    @SuppressWarnings("unchecked") // raw DataRunner kept in the signature for existing callers
    public void addNow(DataRunner dataRunner, T o) {
        dataManager.addNow(new MessageBean<T>(dataRunner, o));
    }

    /**
     * Returns the current queue length.
     *
     * @return number of queued messages
     */
    public int size() {
        return dataManager.messageSize();
    }

    /**
     * Tells whether the consumer is idle.
     *
     * @return {@code true} when no consumer is draining the queue
     */
    public boolean isStop() {
        return dataManager.isStop();
    }

    /** Queues the message, then starts the consumer if it is idle. */
    private void enqueue(DataRunner<T> runner, T payload) {
        // May block on a bounded queue, so it stays outside the start guard.
        dataManager.add(new MessageBean<>(runner, payload));
        synchronized (startGuard) {
            if (dataManager.isStop()) {
                dataManager.setStop(false);
                try {
                    dataManager.start();
                } catch (RuntimeException e) {
                    // E.g. the executor rejected the task: restore the idle flag so a
                    // later add can try again instead of waiting on a consumer that
                    // was never scheduled.
                    dataManager.setStop(true);
                    throw e;
                }
            }
        }
    }
}

测试类

public static void main(String[] args) { 创建队列管理器 CommonMessageService commonMessageService= new CommonMessageService(); 添加分组 test 并添加工作线程实现类 commonMessageService.addGroup("test", new DataRunner<String>() { @Override public void execute(String messageBean) throws Exception { System.out.println(DateUtil.dateFormat()+" "+messageBean); Random random=new Random(); int i=random.nextInt(1); /*if(messageBean>5){ if(i%3==0){ throw new SysException("22222"); } }*/ Thread.sleep(1000); } },"test",0); // 测试 给test中添加 10条消息 for(int i=0;i<10;i++){ commonMessageService.add("test","消息 "+i); } }

下面是运行结果

欢迎交流沟通


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #JAVA #队列消费 #消息队列 #用户 #消息发送消费 #自研 #rpc调用等原版地址