irpas技术客

java操作rabbitmq消费者示例_luffy5459_java rabbitmq 消费

网络 2224

? ? rabbitmq作为消息队列,在实际应用中很常见,生产者将消息发送到某个队列,消费者消费这个队列。

? ? ?消息在队列中,消费者要消费,需要监听队列,简单的来说,就是注册一个方法到消息通道,这个方法就会在有消息的时候执行。

? ? ?下面通过java来操作rabbitmq,给出代码示例。

? ? ? 这里介绍三种消费者示例方法

? ? ? 1、最简单的注册。

? ? ? ? ? ? 直接通过channel.basicConsume()设置callback。

? ? ? 2、模拟一个队列消费者,启动线程。

? ? ? ? ? ? ?自定义一个消费者,并且将消费者当做一个线程启动。

? ? ? ?以上只需要amqp-client依赖?

<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency>

? ? ? 3、利用springboot对amqp的支持,通过自定义MessageListener然后,绑定到Container。

? ? ? ? ? ? ?这是最常见的做法,自定义监听器,设置消息处理方法。将监听器注入消息监听容器,最后启动容器。

? ? ?这里需要加入spring-boot-starter-amqp依赖。

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.4.RELEASE</version> </dependency>

? ? 下面给出以上介绍的三种消费者示例代码:

? ? ? 1、这个做法比较原始,也是最容易看懂的。

package com.xxx.huali.hualitest.amqp; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class SimpleConsumer { private static final String host = "192.168.226.100"; private static final String username = "huali"; private static final String password = "huali"; private static final String queue_name1 = "activate"; private static final String exchange_name = "core.down.topic"; public static void main(String[] args) { Connection conn = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost("/mec"); //connection conn = factory.newConnection(); //channel channel = conn.createChannel(); //queue channel.queueDeclare(queue_name1, true, false, false, null); //exchange channel.exchangeDeclare(exchange_name, "topic",true); channel.queueBind(queue_name1, exchange_name, "TOPIC.*.*.*.*"); Consumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException { System.out.println("received message -> "+new String(body)); } }; channel.basicConsume(queue_name1, true, callback); } catch (Exception e) { e.printStackTrace(); } } }

? ? ? 2、这种方式在第一种的基础上,增加了线程,启动线程,设置监听,有点像第三种方式。

package com.xxx.huali.hualitest.amqp; import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; public class QueueConsumer implements Consumer,Runnable{ private static final String host = "192.168.226.100"; private static final String username = "huali"; private static final String password = "huali"; private static final String queue_name1 = "activate"; private static final String exchange_name = "core.down.topic"; Connection conn = null; Channel channel = null; public QueueConsumer(){ try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost("/mec"); //connection conn = factory.newConnection(); //channel channel = conn.createChannel(); //queue channel.queueDeclare(queue_name1, true, false, false, null); //exchange channel.exchangeDeclare(exchange_name, "topic",true); channel.queueBind(queue_name1, exchange_name, "TOPIC.*.*.*.*"); } catch (Exception e) { e.printStackTrace(); } } public void close() throws IOException{ try { channel.close(); } catch (Exception e) { e.printStackTrace(); } this.conn.close(); } @Override public void run() { try { channel.basicConsume(queue_name1, true, this); } catch (IOException e) { e.printStackTrace(); } } @Override public void handleConsumeOk(String consumerTag) {} @Override public void handleCancelOk(String consumerTag) {} @Override public void handleCancel(String consumerTag) throws IOException {} @Override public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { System.out.println("client received msg -> "+new String(body)); } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {} @Override public void handleRecoverOk(String consumerTag) {} }

? ? 启动主函数:

package com.xxx.huali.hualitest.amqp; public class ConsumerMain { public static void main(String[] args) { QueueConsumer consumer = new QueueConsumer(); Thread thread = new Thread(consumer); thread.start(); } }

? ? ? 3、 第三种方式需要借助spring-amqp的支持。它在实际开发中最常见,在springboot中,ConnectionFactory都直接配置了,连代码都不需要码了,我们只需要配置一个消息监听器就可以了。

package com.xxx.springbootamqp.test; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class MyMessageListener implements MessageListener{ @Override public void onMessage(Message message) { System.out.println("received message -> "+new String(message.getBody())); } }

启动主函数:

package com.xxx.springbootamqp.test; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; public class ConsumerMain { private static final String host = "192.168.226.100"; private static final String username = "huali"; private static final String password = "huali"; private static final String queue_name1 = "activate"; public static void main(String[] args) { try { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost("/mec"); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory); container.setMessageListener(new MyMessageListener()); container.setQueueNames(queue_name1); container.start(); } catch (Exception e) { e.printStackTrace(); } } }

? ? 这段代码严格来说,并不完整,没有声明队列,交换机,以及队列和交换机的绑定关系,因为消费者一旦绑定了交换机和队列的关系,后面就不需要重复设置了,这个关系就在rabbitmq中形成了,只要不手动删除,他自己不会改变。 因为这是第三个示例,前面的两个示例已经对队列和交换机做了设置和绑定。

? ? 这里需要启动容器,类似第二种的启动线程,其实这里也是利用了线程池,算是第二种方式的增强版。而且消费者封装成了BlockingQueueConsumer,更加符合队列消费者的语义。

? ? 有一点需要说明的是,这里采用的队列是和交换机进行绑定的,而交换机的类型是主题交换机,消息生产者只需要将消息发送到交换机对应的路由key上即可。也就是routingKey。??

channel.basicPublish(exchange_name, "TOPIC.4.128.1.a1OHmhQSyrxMEC_1", null, message.getBytes());

? ? 代码里面的?TOPIC.4.128.1.a1OHmhQSyrxMEC_1就是routingKey。

? ? ?消费者自己绑定主题交换机和队列,一个消费者,可以申请一个队列,两个消费者申请两个队列,这样,消息就路由到了多个消费者那里,而且互不干扰。

//queue channel.queueDeclare(queue_name1, true, false, false, null); //exchange channel.exchangeDeclare(exchange_name, "topic",true); channel.queueBind(queue_name1, exchange_name, "TOPIC.*.*.*.*");

? ? 最后附上一个简单的生产者代码示例:

package com.xxx.huali.hualitest.amqp; import java.io.IOException; import java.util.Date; import java.util.concurrent.TimeoutException; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Publisher { private static final String host = "192.168.226.100"; private static final String username = "huali"; private static final String password = "huali"; private static final String exchange_name = "core.down.topic"; public static void main(String[] args) { Connection conn = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost("/mec"); //connection conn = factory.newConnection(); //channel channel = conn.createChannel(); //exchange channel.exchangeDeclare(exchange_name, "topic",true); JSONObject object = new JSONObject(); String device_id = "a1OHmhQSyrxMEC_1"; object.put("device_id", device_id); object.put("opt_type", 128); JSONObject data = new JSONObject(); data.put("auzcode_devname", device_id); data.put("regist_result", 1); long time_stamp = new Date().getTime(); object.put("data", data); object.put("time_stamp", time_stamp); String message = object.toJSONString(); //for(int i=0;i<5;i++) { // System.out.println("i="+i); channel.basicPublish(exchange_name, "TOPIC.4.128.1.a1OHmhQSyrxMEC_1", null, message.getBytes()); //} System.out.println("done"); } catch (Exception e) { e.printStackTrace(); }finally { try { channel.close(); conn.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } }

? ? 生产者代码很简单,最关键的一句就是channel.basicPublish()那句发送消息代码。它只关心发送到哪个主题,哪个路由上,不关心队列。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #JAVA #rabbitmq #消费