应用场景
http://www.rabbitmq.com/tutorials/tutorial-five-php.html
Amqp协议
php客户端
http://www.rabbitmq.com/queues.html
有效期 http://www.rabbitmq.com/ttl.html
安装教程
1 2 3 4 |
yum install erlang --enablerepo=epel wget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_5_8/rabbitmq-server-3.5.8-1.noarch.rpm rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc yum install rabbitmq-server-3.5.8-1.noarch.rpm |
Web插件
1 2 |
rabbitmq-plugins enable rabbitmq_management rabbitmqctl change_password guest guest |
http://ip:15672/
guest guest
持久化
服务重启时, 是否能恢复队列中的数据.
- Exchange 持久 durable=True
- Queue 持久 durable=True
- Message 持久 delivery_mode = 2,
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# jz.inc.php <?php namespace rmqTest; require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Channel\AMQPChannel; /** * Class JzRabbitMQExample * @package rmqTest */ class JzRabbitMQExample { protected $channel; protected $connection; protected $exchange; /** * JzRabbitMQExample constructor. * @param string $exchange */ function __construct($exchange = 'topic-jz'){ $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $this->channel = $this->connection->channel();; $this->exchange = $exchange; $this->channel->exchange_declare($exchange, 'topic', false, false, false); } function close(){ $this->channel->close(); $this->channel->close(); } } /** * Class JzRabbitMQExample * @package rmqTest */ class JzProducer extends JzRabbitMQExample{ /** * @param string $data * @param string $routingKey */ function sendMessage($data, $routingKey){ $msg = new AMQPMessage($data); $this->channel->basic_publish($msg, $this->exchange, $routingKey); echo " [x] Sent ",$routingKey,':',$data," \n"; } } /** * Class JzConsumer */ class JzConsumer extends JzRabbitMQExample{ /** * @var array */ protected $queues = [ 'search' => ["*.add", "*.edit", "*.delete"], 'points' => ["*.add", '*.delete'], 'user-count' => ["*.add", "*.delete"] ]; protected function declareQueue($queueName, $keys = []){ $this->channel->queue_declare($queueName, false, false, false, false); if(isset($this->queues[$queueName])){ $keys = $this->queues[$queueName]; } foreach($keys as $key){ $this->channel->queue_bind($queueName, $this->exchange, $key); } } function receive($queueName, $keys) { // 绑定队列 $this->declareQueue($queueName, $keys); $callback = [$this, 'process']; $this->channel->basic_consume($queueName, '', false, true, false, false, $callback); while(count($this->channel->callbacks)) { $this->channel->wait(); } } /** * @param $msg */ function process($msg){ echo ' [x] ', $msg->body, "\n"; var_dump($msg->delivery_info['delivery_tag']); echo "\n"; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
<?php # jz-publish.php namespace rmqTest; require_once __DIR__ . '/jz.inc.php'; $routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info'; $data = implode(' ', array_slice($argv, 2)); if(empty($data)) $data = "Hello World!"; $mq = new JzProducer(); $mq->sendMessage($data, $routing_key); $mq->close(); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
<?php # jz-receive.php namespace rmqTest; require_once __DIR__ . '/jz.inc.php'; $queueName = isset($argv[1]) ? $argv[1] : ''; $keys = array_slice($argv, 2); if(empty($queueName) && empty($keys)) { file_put_contents('php://stderr', "Usage: $argv[0] queueName [binding_key]\n"); exit(1); } echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $mq = new JzConsumer(); $mq->receive($queueName, $keys); $mq->close(); |
1 2 3 4 5 6 |
# cli php jz-receive.php search php jz-receive.php use-count php jz-publish.php goods.edit 2 php jz-publish.php news.add 5 |