序
因为公司的需要服务都是用的阿里云相关的产品,最近自己工作中也涉及到了消息队列这一块的业务,索性自己也来从零开始对接阿里云的消息队列服务。
准备本着学习的前提,寻找是否免费的或者做活动的服务,能白嫖的就白嫖,果然被我找到了。
进入阿里云官方首页,找到精选活动->阿里云使用中心 点击进入????????2.进入页面搜索消息队列
????????3.? 具体队列的相关配置步骤可参考官方文档:快速入门概述 - 消息队列RabbitMQ版 - 阿里云
????????4. 本来Rocket版、Kafka版都想学习的,但只有rabbit版的免费,但也够意思了毕竟不要钱(注:虽然免费但后面还留了一个很大的坑等着踩呢)
开始????????1. 创建一个springboot项目 命名为:rabbitmq-aliyun
????????2.在application.yml中配置相关mq常量(常量可从前面自己创建的阿里云mq控制台获取)
server: port: 8080 aliyun: rabbitmq: accessKey: 密匙key accessKeySecret: 密匙密码 username: 静态用户名 password: 静态密码 vHost: 虚拟机名称 exchange: 交换机名称 exType: 交换机类型 queue: 队列名称 BindingKey: 路由key host: 介入点(公网接入点)? ? ? ? 注:本地测试必须使用公网接入点? ,但是我们使用的免费rabbitMq服务并没有公网接入点,只有VPC接入点
?所以自己按照官方教程一步一步实现,总是连接超时,代码检查了无数遍也没有找到问题所在。(官方教程也没有表明用哪一个接入点地址,进了这个大坑)
最后只能需求官方客户帮助:
本着,不花钱的原则,但是使用VPC接入点 还得购买 阿里云ecs服务,岂不是还得花更多的钱。
最后只能升级服务,并且选择支持公网
?
所以说,白嫖也并没有完全白嫖,还得花钱,要么买ecs服务,要么升配队列服务
3.创建配置数据映射对象?RabbitMqConfigDTO.class
@Configuration @ConfigurationProperties("aliyun.rabbitmq") @Data public class RabbitMqConfigDTO { /** * 账户密匙key */ private String accessKey; /** * 账户密匙 */ private String accessKeySecret; /** * 静态用户名 */ private String username; /** * 静态用户名密码 */ private String password; /** * 虚拟机名称 */ private String vHost; /** * 交换机名 */ private String exchange; /** * 交换机类型 */ private String exType; /** * 队列名 */ private String queue; /** * 绑定规则key */ private String BindingKey; /** * 接入点地址 */ private String host; }????????4. 创建spring工具类?SpringContextHolder.class 用于获取bean对象
public class SpringContextHolder implements ApplicationContextAware { @Autowired private static ApplicationContext applicationContext; public SpringContextHolder() { } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextHolder.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext() { assertApplicationContext(); return applicationContext; } public static <T> T getBean(String beanName) { assertApplicationContext(); return (T) applicationContext.getBean(beanName); } public static <T> T getBean(Class<T> requiredType) { assertApplicationContext(); return applicationContext.getBean(requiredType); } private static void assertApplicationContext() { if (applicationContext == null) { throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!"); } }????????5. 创建rabbitMq工具类??RabbitMqUtil.class
@Slf4j @Component public class RabbitMqUtil { @Autowired private RabbitMqConfigDTO rabbitMqConfigDTO; //第三步 建一个静态的本类 private static RabbitMqUtil rabbitMqUtil; //第四步 初始化 @PostConstruct public void init() { rabbitMqUtil = this; } /** * 创建队列连接 * @return */ public static Connection getRabbitConnection(){ ConnectionFactory factory = new ConnectionFactory(); //公网接入点 factory.setHost(rabbitMqUtil.rabbitMqConfigDTO.getHost()); //静态用户名 factory.setUsername(rabbitMqUtil.rabbitMqConfigDTO.getUsername()); //静态密码 factory.setPassword(rabbitMqUtil.rabbitMqConfigDTO.getPassword()); //自动恢复 factory.setAutomaticRecoveryEnabled(true); //网络恢复时间 factory.setNetworkRecoveryInterval(5000); //虚拟机名称 factory.setVirtualHost(rabbitMqUtil.rabbitMqConfigDTO.getVHost()); //端口 factory.setPort(5672); //连接超时时间 factory.setConnectionTimeout(30*100); //设置握手超时时间 factory.setHandshakeTimeout(300000000); factory.setShutdownTimeout(0); //创建连接 Connection connection = null; try { connection =factory.newConnection(); }catch (Exception e){ log.error("rabbitMq连接异常", e); } return connection; } /** * 创建队列通道 * @param connection * @return */ public static Channel getRabbitChannel(Connection connection){ Channel channel = null; try { channel = connection.createChannel(); String exchange = rabbitMqUtil.rabbitMqConfigDTO.getExchange(); channel.exchangeDeclare(exchange,rabbitMqUtil.rabbitMqConfigDTO.getExType(), true, false,false, null); channel.queueDeclare(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>()); channel.queueBind(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), rabbitMqUtil.rabbitMqConfigDTO.getExchange(), rabbitMqUtil.rabbitMqConfigDTO.getBindingKey()); }catch (Exception e){ log.error("创建rabbitMq通道异常", e); } return channel; } }????????6.创建server接口类
public interface RabbitMqService { /** * 发送mq消息 * @return */ String sendMessage() throws IOException, TimeoutException; /** * 消费消息 * @return * @throws IOException * @throws TimeoutException */ String consumeMessage() throws IOException, TimeoutException; }????????7.创建实现类
@Service public class RabbitMqServiceImpl implements RabbitMqService { @Autowired private RabbitMqConfigDTO rabbitMqConfigDTO; @Override public String sendMessage() throws IOException { Connection connection = RabbitMqUtil.getRabbitConnection(); Channel channel = RabbitMqUtil.getRabbitChannel(connection); //开始发送消息 for(int i=0; i< 10 ; i++){ AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish(rabbitMqConfigDTO.getExchange(), "BindingKey", true, props, ("消息发送Body" + i).getBytes(StandardCharsets.UTF_8)); } connection.close(); return "消息发送成功"; } @Override public String consumeMessage() throws IOException, TimeoutException { Connection connection = RabbitMqUtil.getRabbitConnection(); Channel channel = RabbitMqUtil.getRabbitChannel(connection); String exchange = rabbitMqConfigDTO.getExchange(); channel.exchangeDeclare(exchange,rabbitMqConfigDTO.getExType(), true, false,false, null); channel.queueDeclare(rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>()); channel.queueBind(rabbitMqConfigDTO.getQueue(), rabbitMqConfigDTO.getExchange(), rabbitMqConfigDTO.getBindingKey()); // 开始消费消息。 channel.basicConsume(rabbitMqConfigDTO.getQueue(), false, "ConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //接收到的消息,进行业务逻辑处理。 System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId()); channel.basicAck(envelope.getDeliveryTag(), false); } }); connection.close(); return "消费成功"; } }?????????8.创建控制层
@RestController public class RabbitMqController { @Autowired private RabbitMqService rabbitMqService; @GetMapping("/sendMessage") public String sendMessage() throws IOException, TimeoutException { return rabbitMqService.sendMessage(); } @GetMapping("/consumeMessage") public String consumeMessage() throws IOException, TimeoutException { return rabbitMqService.consumeMessage(); } }????????9.项目整体结构
????????
?????10.完成启动项目
? ? ?11.点击获取源码
测试? 发送消息? ? ?2. 进入控制台查看?
?
?????????此时可以看到堆积10条消息,说明消息发送成功了
? ? ? ? 3. 消费消息
? ? ? ? ?4.再次进入控制台查看
????????????????堆积的消息已变为0说明消息已经被全部消费了
后序自己遇到不懂的,比如前面本地测试需要公网接入点的问题,百度了不少博客,也没有找到sprigboot 整合 阿里云 消息队列RabbitMQ版的博客已经解决方案,所以博主按自己的实际操作记录下来,希望对同样遇到问题的小伙伴有所帮助。
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。 |
标签: #springboot #整合