irpas技术客

SpringBoot + Mqtt协议,实现多个主题订阅及消息推送功能_程序猿vs码农_java订阅多个主题

大大的周 1576

MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。

mqtt 与 MQ 的区别: mqtt:一种通信协议,类似人类交谈中的汉语、英语、俄语中的一种语言规范 MQ:一种通信通道,也叫消息队列,类似人类交谈中的用电话、email、微信的一种通信方式

详细区别: 有三个基本概念:消息、消息协议、消息队列。

消息:信息的载体

消息协议:为了让消息发送者和消息接收者都能够明白消息所承载的信息(消息发送者需要知道如何构造消息;消息接收者需要知道如何解析消息),它们就需要按照一种统一的格式描述消息,这种统一的格式称之为消息协议。所以,有效的消息一定具有某一种格式;而没有格式的消息是没有意义的。

消息队列:消息从发送者到接收者的方式也有两种。一种为即时消息通讯,也就是说消息从一端发出后(消息发送者)立即就可以达到另一端(消息接收者),这种方式的具体实现就是我们已经介绍过的RPC(当然单纯的http通讯也满足这个定义);另一种为延迟消息通讯,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。 这个容器的一种具体实现就是消息队列,如RabbitMQ。

相关mqtt的理论东西,有兴趣的可以详情了解,下面我大概说下如何搭建服务。

1)添加pom依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>

2)application.yml文件中添加配置信息 ,订阅主题有多重形式,大致分为这几类 2.1)具体某个主题 ;test1 2.2)具体多个主题,可以用逗号分隔;test1,test2 2.3)主题层级分隔符: / ,单层通配符: + ,单层通配符 “+” 只匹配主题的一层; test/+ ,可以匹配到test/开头的,且没有第三级主题下的所有主题 2.4)主题层级分隔符: / ,多层通配符: # ,多层通配符"#"是一个匹配主题中任意层次数的通配符; test/# ,可以匹配到test/开头的所有主题

spring: mqtt: subscribe: url: tcp://IP:PORT username: emqx password: public client: inid: mqttClientInputId outid: mqttClientOutputId default: topic: mqtt/+/test1,mqtt/+/test2 completionTimeout: 3000 mqtt订阅消息配置文件 MqttSubscribeConfig.java,如下 package com.mqtt.test.config; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.Arrays; import java.util.List; import java.util.Map; @Slf4j @Configuration @IntegrationComponentScan public class MqttSubscribeConfig { @Value("${spring.mqtt.subscribe.username}") private String username; @Value("${spring.mqtt.subscribe.password}") private String password; @Value("${spring.mqtt.subscribe.url}") private String hostUrl; // 订阅主题的客户端ID @Value("${spring.mqtt.subscribe.client.inid}") private String clientId; @Value("${spring.mqtt.subscribe.default.topic}") private String defaultTopic; @Value("${spring.mqtt.subscribe.completionTimeout}") private int completionTimeout ; //连接超时 @Bean public MqttConnectOptions getReceiverMqttConnectOptionsForSub(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); List<String> hostList = Arrays.asList(hostUrl.trim().split(",")); String[] serverURIs = new String[hostList.size()]; hostList.toArray(serverURIs); mqttConnectOptions.setServerURIs(serverURIs); mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; } @Bean public MqttPahoClientFactory receiverMqttClientFactoryForSub() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptionsForSub()); return factory; } //接收通道 @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } //配置client,监听的topic @Bean public MessageProducer inbound() { List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); String[] topics = new String[topicList.size()]; topicList.toArray(topics); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,receiverMqttClientFactoryForSub(),topics); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } //通过通道获取数据 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String msg = message.getPayload().toString(); Map<String,Object> msgMap = JSONObject.parseObject(msg); // 这里可以处理接收的数据 log.info("\n----------------------------START---------------------------\n" + "接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg + "\n-----------------------------END----------------------------"); }; } }

4)mqtt配置推送消息的服务 ,MqttSendConfig.java

package com.mqtt.test.config; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import java.util.Arrays; import java.util.List; @Slf4j @Configuration @IntegrationComponentScan public class MqttSendConfig { @Value("${spring.mqtt.subscribe.username}") private String username; @Value("${spring.mqtt.subscribe.password}") private String password; @Value("${spring.mqtt.subscribe.url}") private String hostUrl; // 推送消息的客户端ID @Value("${spring.mqtt.subscribe.client.outid}") private String clientId; @Value("${spring.mqtt.subscribe.default.topic}") private String defaultTopic; @Value("${spring.mqtt.subscribe.completionTimeout}") private int completionTimeout ; //连接超时 @Bean public MqttConnectOptions getReceiverMqttConnectOptionsForSend(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); List<String> hostList = Arrays.asList(hostUrl.trim().split(",")); String[] serverURIs = new String[hostList.size()]; hostList.toArray(serverURIs); mqttConnectOptions.setServerURIs(serverURIs); mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; } @Bean public MqttPahoClientFactory receiverMqttClientFactoryForSend() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptionsForSend()); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, receiverMqttClientFactoryForSend()); messageHandler.setAsync(false); messageHandler.setDefaultTopic(defaultTopic); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }

5)mqtt推送服务的接口类,可以通过注入该接口调用消息推送功能,推送消息至指定主题

package com.mqtt.test.mqtt; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param message 消息主体 */ void publishMqttMessageWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param qos 对消息处理的几种机制。<br> 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br> * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br> * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 * @param message 消息主体 */ void publishMqttMessageWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos); }

注意事项; mqtt主题订阅和mqtt消息推送的配置中客户端ID是不能一样的,如果一样的话推送消息会报错,无法成功推送消息


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #java订阅多个主题 #标准ISOIEC #PRF #它工作在 #MQTT