irpas技术客

Springboot 整合 阿里云消息队列RabbitMQ版服务_君燕尾

未知 7462

因为公司的需要服务都是用的阿里云相关的产品,最近自己工作中也涉及到了消息队列这一块的业务,索性自己也来从零开始对接阿里云的消息队列服务。

准备

本着学习的前提,寻找是否免费的或者做活动的服务,能白嫖的就白嫖,果然被我找到了。

进入阿里云官方首页,找到精选活动->阿里云使用中心 点击进入

????????2.进入页面搜索消息队列

????????3.? 具体队列的相关配置步骤可参考官方文档:快速入门概述 - 消息队列RabbitMQ版 - 阿里云

????????4. 本来Rocket版、Kafka版都想学习的,但只有rabbit版的免费,但也够意思了毕竟不要钱(注:虽然免费但后面还留了一个很大的坑等着踩呢)

开始

????????1. 创建一个springboot项目 命名为:rabbitmq-aliyun

????????2.在application.yml中配置相关mq常量(常量可从前面自己创建的阿里云mq控制台获取)

server: port: 8080 aliyun: rabbitmq: accessKey: 密匙key accessKeySecret: 密匙密码 username: 静态用户名 password: 静态密码 vHost: 虚拟机名称 exchange: 交换机名称 exType: 交换机类型 queue: 队列名称 BindingKey: 路由key host: 介入点(公网接入点)

? ? ? ? 注:本地测试必须使用公网接入点? ,但是我们使用的免费rabbitMq服务并没有公网接入点,只有VPC接入点

?所以自己按照官方教程一步一步实现,总是连接超时,代码检查了无数遍也没有找到问题所在。(官方教程也没有表明用哪一个接入点地址,进了这个大坑)

最后只能需求官方客户帮助:

本着,不花钱的原则,但是使用VPC接入点 还得购买 阿里云ecs服务,岂不是还得花更多的钱。

最后只能升级服务,并且选择支持公网

?

所以说,白嫖也并没有完全白嫖,还得花钱,要么买ecs服务,要么升配队列服务

3.创建配置数据映射对象?RabbitMqConfigDTO.class

@Configuration @ConfigurationProperties("aliyun.rabbitmq") @Data public class RabbitMqConfigDTO { /** * 账户密匙key */ private String accessKey; /** * 账户密匙 */ private String accessKeySecret; /** * 静态用户名 */ private String username; /** * 静态用户名密码 */ private String password; /** * 虚拟机名称 */ private String vHost; /** * 交换机名 */ private String exchange; /** * 交换机类型 */ private String exType; /** * 队列名 */ private String queue; /** * 绑定规则key */ private String BindingKey; /** * 接入点地址 */ private String host; }

????????4. 创建spring工具类?SpringContextHolder.class 用于获取bean对象

public class SpringContextHolder implements ApplicationContextAware { @Autowired private static ApplicationContext applicationContext; public SpringContextHolder() { } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextHolder.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext() { assertApplicationContext(); return applicationContext; } public static <T> T getBean(String beanName) { assertApplicationContext(); return (T) applicationContext.getBean(beanName); } public static <T> T getBean(Class<T> requiredType) { assertApplicationContext(); return applicationContext.getBean(requiredType); } private static void assertApplicationContext() { if (applicationContext == null) { throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!"); } }

????????5. 创建rabbitMq工具类??RabbitMqUtil.class

@Slf4j @Component public class RabbitMqUtil { @Autowired private RabbitMqConfigDTO rabbitMqConfigDTO; //第三步 建一个静态的本类 private static RabbitMqUtil rabbitMqUtil; //第四步 初始化 @PostConstruct public void init() { rabbitMqUtil = this; } /** * 创建队列连接 * @return */ public static Connection getRabbitConnection(){ ConnectionFactory factory = new ConnectionFactory(); //公网接入点 factory.setHost(rabbitMqUtil.rabbitMqConfigDTO.getHost()); //静态用户名 factory.setUsername(rabbitMqUtil.rabbitMqConfigDTO.getUsername()); //静态密码 factory.setPassword(rabbitMqUtil.rabbitMqConfigDTO.getPassword()); //自动恢复 factory.setAutomaticRecoveryEnabled(true); //网络恢复时间 factory.setNetworkRecoveryInterval(5000); //虚拟机名称 factory.setVirtualHost(rabbitMqUtil.rabbitMqConfigDTO.getVHost()); //端口 factory.setPort(5672); //连接超时时间 factory.setConnectionTimeout(30*100); //设置握手超时时间 factory.setHandshakeTimeout(300000000); factory.setShutdownTimeout(0); //创建连接 Connection connection = null; try { connection =factory.newConnection(); }catch (Exception e){ log.error("rabbitMq连接异常", e); } return connection; } /** * 创建队列通道 * @param connection * @return */ public static Channel getRabbitChannel(Connection connection){ Channel channel = null; try { channel = connection.createChannel(); String exchange = rabbitMqUtil.rabbitMqConfigDTO.getExchange(); channel.exchangeDeclare(exchange,rabbitMqUtil.rabbitMqConfigDTO.getExType(), true, false,false, null); channel.queueDeclare(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>()); channel.queueBind(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), rabbitMqUtil.rabbitMqConfigDTO.getExchange(), rabbitMqUtil.rabbitMqConfigDTO.getBindingKey()); }catch (Exception e){ log.error("创建rabbitMq通道异常", e); } return channel; } }

????????6.创建server接口类

public interface RabbitMqService { /** * 发送mq消息 * @return */ String sendMessage() throws IOException, TimeoutException; /** * 消费消息 * @return * @throws IOException * @throws TimeoutException */ String consumeMessage() throws IOException, TimeoutException; }

????????7.创建实现类

@Service public class RabbitMqServiceImpl implements RabbitMqService { @Autowired private RabbitMqConfigDTO rabbitMqConfigDTO; @Override public String sendMessage() throws IOException { Connection connection = RabbitMqUtil.getRabbitConnection(); Channel channel = RabbitMqUtil.getRabbitChannel(connection); //开始发送消息 for(int i=0; i< 10 ; i++){ AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish(rabbitMqConfigDTO.getExchange(), "BindingKey", true, props, ("消息发送Body" + i).getBytes(StandardCharsets.UTF_8)); } connection.close(); return "消息发送成功"; } @Override public String consumeMessage() throws IOException, TimeoutException { Connection connection = RabbitMqUtil.getRabbitConnection(); Channel channel = RabbitMqUtil.getRabbitChannel(connection); String exchange = rabbitMqConfigDTO.getExchange(); channel.exchangeDeclare(exchange,rabbitMqConfigDTO.getExType(), true, false,false, null); channel.queueDeclare(rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>()); channel.queueBind(rabbitMqConfigDTO.getQueue(), rabbitMqConfigDTO.getExchange(), rabbitMqConfigDTO.getBindingKey()); // 开始消费消息。 channel.basicConsume(rabbitMqConfigDTO.getQueue(), false, "ConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //接收到的消息,进行业务逻辑处理。 System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId()); channel.basicAck(envelope.getDeliveryTag(), false); } }); connection.close(); return "消费成功"; } }

?????????8.创建控制层

@RestController public class RabbitMqController { @Autowired private RabbitMqService rabbitMqService; @GetMapping("/sendMessage") public String sendMessage() throws IOException, TimeoutException { return rabbitMqService.sendMessage(); } @GetMapping("/consumeMessage") public String consumeMessage() throws IOException, TimeoutException { return rabbitMqService.consumeMessage(); } }

????????9.项目整体结构

????????

?????10.完成启动项目

? ? ?11.点击获取源码

测试? 发送消息

? ? ?2. 进入控制台查看?

?

?????????此时可以看到堆积10条消息,说明消息发送成功了

? ? ? ? 3. 消费消息

? ? ? ? ?4.再次进入控制台查看

????????????????堆积的消息已变为0说明消息已经被全部消费了

后序

自己遇到不懂的,比如前面本地测试需要公网接入点的问题,百度了不少博客,也没有找到sprigboot 整合 阿里云 消息队列RabbitMQ版的博客已经解决方案,所以博主按自己的实际操作记录下来,希望对同样遇到问题的小伙伴有所帮助。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #springboot #整合