# jz.inc.php
<?php
namespace rmqTest;
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Channel\AMQPChannel;
/**
* Class JzRabbitMQExample
* @package rmqTest
*/
class JzRabbitMQExample
{
    /**
     * Channel opened on the connection; used for all broker operations.
     *
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * Stream connection to the broker.
     *
     * @var AMQPStreamConnection
     */
    protected $connection;

    /**
     * Name of the topic exchange declared by the constructor.
     *
     * @var string
     */
    protected $exchange;

    /**
     * JzRabbitMQExample constructor.
     *
     * Connects to the broker, opens a channel and declares a topic exchange.
     * The connection parameters default to the values that were previously
     * hard-coded, so existing callers passing only $exchange are unaffected.
     *
     * @param string $exchange Name of the topic exchange to declare.
     * @param string $host     Broker host name.
     * @param int    $port     Broker port.
     * @param string $user     Broker user name.
     * @param string $password Broker password.
     */
    public function __construct(
        $exchange = 'topic-jz',
        $host = 'localhost',
        $port = 5672,
        $user = 'guest',
        $password = 'guest'
    ) {
        $this->connection = new AMQPStreamConnection($host, $port, $user, $password);
        $this->channel = $this->connection->channel();
        $this->exchange = $exchange;
        // Flags after the type: passive = false, durable = false, auto_delete = false.
        $this->channel->exchange_declare($exchange, 'topic', false, false, false);
    }

    /**
     * Closes the channel and then the underlying connection.
     *
     * The connection is closed in a finally block so it is released even when
     * closing the channel throws.
     */
    public function close()
    {
        try {
            $this->channel->close();
        } finally {
            $this->connection->close();
        }
    }
}
/**
* Class JzProducer
* @package rmqTest
*/
class JzProducer extends JzRabbitMQExample
{
    /**
     * Publishes one message to the exchange and echoes a confirmation line.
     *
     * @param string $data       Message body.
     * @param string $routingKey Routing key the message is published with.
     */
    public function sendMessage($data, $routingKey)
    {
        $this->channel->basic_publish(
            new AMQPMessage($data),
            $this->exchange,
            $routingKey
        );

        echo " [x] Sent " . $routingKey . ':' . $data . " \n";
    }
}
/**
* Class JzConsumer
*/
class JzConsumer extends JzRabbitMQExample
{
    /**
     * Preconfigured binding keys, indexed by queue name.
     *
     * @var array
     */
    protected $queues = [
        'search' => ["*.add", "*.edit", "*.delete"],
        'points' => ["*.add", '*.delete'],
        'user-count' => ["*.add", "*.delete"]
    ];

    /**
     * Declares a queue and binds it to the exchange.
     *
     * When $queueName is a key of $this->queues, the preconfigured binding
     * keys replace whatever was passed in $keys.
     *
     * @param string $queueName Name of the queue to declare.
     * @param array  $keys      Binding keys used when the queue is not preconfigured.
     */
    protected function declareQueue($queueName, $keys = [])
    {
        // Flags: passive = false, durable = false, exclusive = false, auto_delete = false.
        $this->channel->queue_declare($queueName, false, false, false, false);

        if (isset($this->queues[$queueName])) {
            $keys = $this->queues[$queueName];
        }

        foreach ($keys as $key) {
            $this->channel->queue_bind($queueName, $this->exchange, $key);
        }
    }

    /**
     * Binds the queue and consumes from it, blocking while the channel still
     * has registered consumer callbacks.
     *
     * @param string $queueName Name of the queue to consume from.
     * @param array  $keys      Binding keys; optional, matching declareQueue(),
     *                          and ignored for preconfigured queues.
     */
    public function receive($queueName, $keys = [])
    {
        // Bind the queue.
        $this->declareQueue($queueName, $keys);

        $callback = [$this, 'process'];
        // Arguments after the queue name: consumer_tag = '', no_local = false,
        // no_ack = true, exclusive = false, nowait = false.
        $this->channel->basic_consume($queueName, '', false, true, false, false, $callback);

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    /**
     * Consumer callback: prints the message body and dumps its delivery tag.
     *
     * Must stay public because it is invoked through the [$this, 'process']
     * callable registered in receive().
     *
     * @param AMQPMessage $msg Delivered message.
     */
    public function process($msg)
    {
        echo ' [x] ', $msg->body, "\n";
        var_dump($msg->delivery_info['delivery_tag']);
        echo "\n";
    }
}