irpas技术客

RabbitMQ延时队列实现(PHP)_王大爷~_php rabbitmq延时

irpas 2672

linux 下安装RabbitMQ 转载: https://blog.csdn.net/qq_39135287/article/details/95725385

本教程为 windows 示例: 转载: https://·/p/a6f21317722a

自测: 服务 + 延迟队列插件 (注意版本) RabbitMq Server 3.7.4 rabbitmq_delayed_message_exchange-3.8.0.ez [适用于3.7 ~ 3.8]

插件地址: https://github.rc1844.workers.dev/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

下载完毕: 放在plugins目录

常用命令: 进入到RabbitMq Server 3.7.4 \ sbin 目录下

rabbitmq-service start // 启动 rabbitmq-service stop // 停止 rabbitmq-plugins enable rabbitmq_management // 启用管理界面 (web管理界面) rabbitmq-plugins enable rabbitmq_delayed_message_exchange // 启用延迟队列插件

安装完插件, 重新启动rabbitmq服务, 出现如下图所示, 即插件安装成功

PHP项目代码

protected $connection; //连接 protected $channel; //频道 protected $config = [ 'host' => 'localhost', 'port' => 5672, 'user' => 'guest', 'pass' => 'guest', 'vhost' => '/', ]; // 过期时间 const TIMEOUT_5_S = 5; // 5s private $exchange_logs = "logs"; private $exchange_direct = "direct"; private $exchange_delayed = "delayed"; private $queue_delayed = "delayedQueue"; CONST EXCHANGETYPE_FANOUT = "fanout"; CONST EXCHANGETYPE_DIRECT = "direct"; CONST EXCHANGETYPE_DELAYED = "x-delayed-message"; // 生命连接 protected function _initialize() { $this->connection = new AMQPStreamConnection($this->config['host'], $this->config['port'], $this->config['user'], $this->config['pass'], $this->config['vhost']); $this->channel = $this->connection->channel(); // 声明Exchange $this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false,false,false,new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT])); $this->channel->queue_declare($this->queue_delayed, false, true, false, false); $this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed,$this->queue_delayed); } // 创建消息 , 发送消息第三行 会调用此方法 public function createMessageDelay($msg,$time) { $delayConfig = [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'application_headers' => new AMQPTable(['x-delay' => $time * 1000]) ]; $msg = new AMQPMessage($msg,$delayConfig); return $msg; } /** * delay send message 发送消息 */ // public function sendDelay($msg,$time = self::TIMEOUT_10_S) { public function sendDelay() { $msg = '555555'; $time = 10; $msg = $this->createMessageDelay($msg,$time); $this->channel->basic_publish($msg,$this->exchange_delayed,$this->queue_delayed); $this->channel->close(); $this->connection->close(); } /** * delay consum 监听 */ public function consumDelay(){ $callback = function($msg){ echo ' [x] ', $msg->body, "\n"; $this->channel->basic_ack($msg->delivery_info['delivery_tag'],false); }; $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback); echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; while (count($this->channel->callbacks)) { $this->channel->wait(); } $this->channel->close(); $this->connection->close(); }

测试类 1: 接收监听 2: 执行发送

ps:

mq修改完代码, 接收接听需要 重新监听

ps 有序发送

如下图, rabbitmq 发送数据是有序的, 先执行test, 在执行test2, 发现接收的顺序为: aaa, bbb 既aaa10分钟后被消费了, bbb才能被消费, 类似队列, 先进先出, 场景下单30分钟后取消

ps 无序发送

如下图, rabbitmq 发送数据是有序的, 先执行test, 在执行test2, 发现接收的顺序为: bbb, aaa 既aaa 5分钟后被消费了, bbb 被消费, 符合自定义延迟时间场景.

RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种: Direct Exchange:见文知意,直连交换机意思是此交换机需要绑定一个队列,要求该消息与一个特定的路由键完全匹配。简单点说就是一对一的,点对点的发送。 Fanout Exchange:这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。简单点说就是发布订阅。 Topic Exchange:直接翻译的话叫做主题交换机,如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:"*" 、 “#”。需要注意的是通配符前面必须要加上".“符号。 *符号:有且只匹配一个词。比如 a.*可以匹配到"a.b”、“a.c”,但是匹配不了"a.b.c"。 #符号:匹配一个或多个词。比如"rabbit.#“既可以匹配到"rabbit.a.b”、“rabbit.a”,也可以匹配到"rabbit.a.b.c"。 Headers Exchange:这种交换机用的相对没这么多。它跟上面三种有点区别,它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配。如上图所示,交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #PHP #rabbitmq延时 #Linux #下安装RabbitMQ转载 #Windows #示例转载