123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- <?php
- namespace common\logic\MQMessage;
- use PhpAmqpLib\Message\AMQPMessage;
- use PhpAmqpLib\Connection\AMQPStreamConnection;
/**
 * Thin RabbitMQ helper built on php-amqplib.
 *
 * An instance opens one connection and one channel, declares an exclusive
 * server-named reply queue and registers {@see Message::on_response()} as the
 * consumer of that queue. The static helpers CallMq() / CallUserMq() are
 * self-contained: they open and close their own connection per call.
 *
 * Connection settings are read from the HOST, PORT, USER and PASS constants
 * referenced through self:: (presumably declared on the parent Config class,
 * which is not visible here).
 */
class Message extends Config
{
    /** @var AMQPStreamConnection Connection opened by the constructor. */
    private $connection;

    /** @var mixed Channel obtained from $connection->channel(). */
    private $channel;

    /** @var string Name of the exclusive reply queue declared in the constructor. */
    private $callback_queue;

    /** @var string|null Body of the last reply whose correlation id matched. */
    private $response = null;

    /** @var string|null Correlation id of the most recent send()/batchSend(). */
    private $corr_id;

    /** @var mixed Last operation result; also accumulates matching reply bodies. */
    private $result = '';

    /**
     * Connects to the broker and starts consuming from a private reply queue.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            self::HOST,
            self::PORT,
            self::USER,
            self::PASS
        );
        $this->channel = $this->connection->channel();

        // Empty name lets the broker pick one; the 4th argument marks the
        // queue as exclusive to this connection.
        list($this->callback_queue, ,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            false
        );

        // NOTE(review): no_ack is false here, yet on_response() never
        // acknowledges the delivery - confirm whether that is intended.
        $this->channel->basic_consume(
            $this->callback_queue,
            '',
            false,
            false,
            false,
            false,
            array($this, 'on_response')
        );
    }

    /**
     * Publishes one string message to the default exchange, then closes the
     * channel and the connection (the instance cannot be reused afterwards).
     *
     * The generated correlation id is remembered so that on_response() can
     * recognise a reply addressed to this request.
     *
     * @param mixed  $body        Message body; anything that is not a string is ignored.
     * @param string $routing_key Routing key (queue name on the default exchange).
     *
     * @return mixed null after a message was published (basic_publish() returns
     *               nothing); otherwise the previously stored result, '' initially.
     */
    public function send($body, $routing_key = 'task_queue')
    {
        try {
            if (is_string($body)) {
                $this->corr_id = uniqid();
                $msg = new AMQPMessage($body, [
                    'correlation_id' => $this->corr_id,
                    'reply_to' => $this->callback_queue,
                ]);
                $this->result = $this->channel->basic_publish($msg, '', $routing_key);
            }
        } finally {
            // Release the channel and connection even when publishing throws.
            $this->shutdown();
        }

        return $this->result;
    }

    /**
     * Publishes every string/int/float element of $body as its own message in
     * a single batch, then closes the channel and the connection.
     *
     * All messages of one call share a single correlation id. Elements of any
     * other type are skipped because they have no sensible string form.
     *
     * @param mixed  $body        Array of message bodies; a non-array is ignored.
     * @param string $routing_key Routing key applied to every message.
     *
     * @return mixed null after an array was processed; otherwise the previously
     *               stored result, '' initially.
     */
    public function batchSend($body, $routing_key = '')
    {
        try {
            if (is_array($body)) {
                $this->corr_id = uniqid();
                $properties = [
                    'correlation_id' => $this->corr_id,
                    'reply_to' => $this->callback_queue,
                ];

                $queued = 0;
                foreach ($body as $item) {
                    if (!is_string($item) && !is_int($item) && !is_float($item)) {
                        continue;
                    }
                    $this->channel->batch_basic_publish(
                        new AMQPMessage((string) $item, $properties),
                        '',
                        $routing_key
                    );
                    $queued++;
                }

                // batch_basic_publish() only buffers; publish_batch() is what
                // actually writes the buffered messages to the broker.
                if ($queued > 0) {
                    $this->channel->publish_batch();
                }
                $this->result = null;
            }
        } finally {
            // Release the channel and connection even when publishing throws.
            $this->shutdown();
        }

        return $this->result;
    }

    /**
     * Consumes $queue, echoing each message body, for as long as the channel
     * has registered consumers.
     *
     * @param string $queue Queue to consume from.
     * @param mixed  $str   Unused; kept for backward compatibility.
     *
     * @return mixed Consumer tag returned by basic_consume(), once the loop ends.
     */
    public function receive($queue, $str)
    {
        return $this->consumeAndPrint($queue);
    }

    /**
     * Same behaviour as receive(); the second argument is likewise unused.
     *
     * @param string $queue Queue to consume from.
     * @param mixed  $arr   Unused; kept for backward compatibility.
     *
     * @return mixed Consumer tag returned by basic_consume(), once the loop ends.
     */
    public function batchReceive($queue, $arr)
    {
        return $this->consumeAndPrint($queue);
    }

    /**
     * Placeholder with no implementation.
     */
    public function delete()
    {
    }

    /**
     * Reply-queue consumer callback registered by the constructor.
     *
     * Stores the reply body, and appends it to the running result, only when
     * its correlation id is identical to the one issued by the last
     * send()/batchSend(). Nothing can match before a request has been sent.
     *
     * @param mixed $rep Delivered message (expected to expose get() and ->body).
     *
     * @return void
     */
    public function on_response($rep)
    {
        if ($this->corr_id === null) {
            return;
        }

        if ($rep->get('correlation_id') === $this->corr_id) {
            $this->result .= $rep->body;
            $this->response = $rep->body;
        }
    }

    /**
     * Publishes $n as a persistent message to the durable 'task_queue' queue
     * over a short-lived connection.
     *
     * @param mixed $n Payload; empty values other than a numeric zero are
     *                 replaced by "Hello World!".
     *
     * @return bool Always true when no exception was thrown.
     */
    public static function CallMq($n)
    {
        return self::publishPersistent('task_queue', $n);
    }

    /**
     * Publishes $n as a persistent message to the durable 'login' queue over a
     * short-lived connection.
     *
     * @param mixed $n Payload; empty values other than a numeric zero are
     *                 replaced by "Hello World!".
     *
     * @return bool Always true when no exception was thrown.
     */
    public static function CallUserMq($n)
    {
        return self::publishPersistent('login', $n);
    }

    /**
     * Registers an auto-acknowledging consumer that echoes each body and
     * blocks in wait() while the channel still has callbacks registered.
     *
     * @param string $queue Queue to consume from.
     *
     * @return mixed Consumer tag returned by basic_consume().
     */
    private function consumeAndPrint($queue)
    {
        $printer = static function ($msg) {
            echo " [x] Received ", $msg->body, "\n";
        };

        // 4th argument (no_ack) is true: deliveries are not acknowledged manually.
        $this->result = $this->channel->basic_consume($queue, '', false, true, false, false, $printer);

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }

        return $this->result;
    }

    /**
     * Closes the channel and then the connection; the connection is closed
     * even if closing the channel throws.
     *
     * @return void
     */
    private function shutdown()
    {
        try {
            $this->channel->close();
        } finally {
            $this->connection->close();
        }
    }

    /**
     * Shared implementation of CallMq() / CallUserMq(): declares a durable
     * queue and publishes one persistent message to it via the default
     * exchange, always closing the channel and connection afterwards.
     *
     * @param string $queue Queue name, also used as the routing key.
     * @param mixed  $n     Payload supplied by the caller.
     *
     * @return bool Always true when no exception was thrown.
     */
    private static function publishPersistent($queue, $n)
    {
        if (empty($n)) {
            // empty() is also true for "0" and 0, which are valid payloads;
            // only genuinely blank input falls back to the default text.
            $data = is_numeric($n) ? (string) $n : 'Hello World!';
        } else {
            $data = $n;
        }

        $connection = new AMQPStreamConnection(
            self::HOST,
            self::PORT,
            self::USER,
            self::PASS
        );

        try {
            $channel = $connection->channel();
            try {
                // 3rd argument (durable) is true.
                $channel->queue_declare($queue, false, true, false, false);
                $msg = new AMQPMessage(
                    $data,
                    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
                );
                $channel->basic_publish($msg, '', $queue);
            } finally {
                $channel->close();
            }
        } finally {
            $connection->close();
        }

        return true;
    }
}
|