php+RabbitMQ 3.8.16+简单延时队列实现

  1. 安装插件并重启RabbitMQ,参见:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
  2. 生产者实现(生产者不需要知道队列名称)(注意使用的是官方库”php-amqplib/php-amqplib”: “>=3.0”):
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

$channel->exchange_declare($exchange, 'x-delayed-message', false, true, false, false, false, new AMQPTable([
 'x-delayed-type' => AMQPExchangeType::FANOUT,
]));

$headers = new AMQPTable(['x-delay' => $delay]);
// json_encode($data) 为数据
$message = new AMQPMessage(json_encode($data), ['delivery_mode' => 2]);
$message->set('application_headers', $headers);
$channel->basic_publish($message, $exchange);

3.消费者实现(消费者的队列名称是随意的):

$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

// 防止交换机不存在 $exchange 为交换机名称
$channel->exchange_declare($exchange, 'x-delayed-message', false, true, false, false, false, new AMQPTable([
            'x-delayed-type' => AMQPExchangeType::FANOUT,
        ]));

// $queue 为队列名称
$channel->queue_declare($queue, false, false, false, false, false, new AMQPTable([
            'x-dead-letter-exchange' => 'delayed',
        ]));

# echo " [*] Waiting for messages. To exit press CTRL+C\n";

// 绑定交换机和队列
$channel->queue_bind($queue, $exchange);

        /*
    queue: Queue from where to get the messages
    consumer_tag: Consumer identifier
    no_local: Don't receive messages published by this consumer.
    no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
    exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
    nowait:
    callback: A PHP Callback
        function (AMQPMessage $message) {
            $headers = $message->get('application_headers');
            $nativeData = $headers->getNativeData();
            var_dump($nativeData['x-delay']);
            var_dump($message->getBody());
            $message->ack(); # 必须ack
        }
    */
// $callback 为处理数据的 callable
$channel->basic_consume($queue, '', false, false, false, false, $callback);

        register_shutdown_function(function ($channel, $connection) {
            $channel->close();
            $connection->close();
        }, $channel, $connection);

//循环监听
        while (count($channel->callbacks)) {
            try {
                $channel->wait(null, false, rand(33, 44));
            } catch (AMQPTimeoutException $e) {
            } catch (Throwable $e) {
                echo $e->getMessage(), "\n";
            }

        }