123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- <?php
- namespace common\logic\Amqp;
- use common\helpers\KeyHelper;
- use PhpAmqpLib\Message\AMQPMessage;
- use Yii;
/**
 * Publishes and consumes AMQP messages through a channel opened on the
 * connection returned by the inherited connect() method.
 *
 * Class Message
 * @package common\logic\Amqp
 * @author libingke
 */
class Message extends Connect
{
    /** @var mixed Connection object returned by self::connect(). */
    private $_connect;

    /** @var mixed Channel opened on $_connect; every publish/consume call goes through it. */
    private $_channel;

    /** @var string Queue name sent as the `reply_to` property of messages published by send(). */
    private $_callback_queue;

    /** @var mixed Not referenced by any method of this class yet. */
    private $_response;

    /** @var string Correlation ID generated by the most recent send() call. */
    private $_corr_id;

    /**
     * Opens a channel on the connection and remembers the reply queue name.
     *
     * @param string $queueName Queue name attached as `reply_to` to messages published by send().
     */
    public function __construct($queueName = '')
    {
        $this->_connect = self::connect();
        $this->_channel = $this->_connect->channel();
        $this->_callback_queue = $queueName;
    }

    /**
     * Publishes a single message and returns its correlation ID.
     *
     * A new correlation ID is generated on every call via
     * KeyHelper::getUniqueId('message_send') and attached to the message
     * together with the reply queue given to the constructor.
     *
     * NOTE: a body that is not a string is NOT published, yet a fresh
     * correlation ID is still generated and returned. Callers that need to
     * know whether anything was sent must validate the body themselves.
     *
     * @author libingke
     * @param string $body Message payload.
     * @param string $routing_key Routing key; the message is published with an empty exchange name.
     * @return mixed The correlation ID produced by KeyHelper::getUniqueId().
     */
    public function send($body, $routing_key)
    {
        $this->_corr_id = KeyHelper::getUniqueId('message_send');

        if (is_string($body)) {
            $msg = new AMQPMessage($body, [
                'correlation_id' => $this->_corr_id,
                'reply_to' => $this->_callback_queue,
            ]);
            $this->_channel->basic_publish($msg, '', $routing_key);
        }

        return $this->_corr_id;
    }

    /**
     * Publishes several messages in one batch.
     *
     * Each item may be a plain string, which is wrapped in an AMQPMessage
     * without extra properties, or an AMQPMessage instance, which is queued
     * as-is. All items are validated before anything is handed to the
     * channel, so an invalid item never leaves a half-filled batch behind.
     *
     * @author libingke
     * @param array|\Traversable $info Payloads to publish (strings or AMQPMessage instances).
     * @param string $queue Routing key; messages are published with an empty exchange name.
     * @return void
     * @throws \InvalidArgumentException When an item is neither a string nor an AMQPMessage.
     */
    public function batchSend($info, $queue)
    {
        $messages = [];
        foreach ($info as $item) {
            if ($item instanceof AMQPMessage) {
                $messages[] = $item;
            } elseif (is_string($item)) {
                // The channel's batch API works on message objects, not raw strings.
                $messages[] = new AMQPMessage($item);
            } else {
                throw new \InvalidArgumentException(
                    'batchSend() items must be strings or AMQPMessage instances, got ' . gettype($item)
                );
            }
        }

        foreach ($messages as $message) {
            $this->_channel->batch_basic_publish($message, '', $queue);
        }
        $this->_channel->publish_batch();
    }

    /**
     * Consumes messages from a queue (still marked as todo by the author).
     *
     * Registers a consumer whose callback echoes every message body and then
     * acknowledges the delivery. The call blocks, waiting on the channel, for
     * as long as the channel still has registered callbacks.
     *
     * @author libingke
     * @param string $queue Name of the queue to consume from.
     * @return bool Always true once the wait loop has ended.
     */
    public function consume($queue)
    {
        $callback = function ($msg) {
            echo " [x] Received ", $msg->body, "\n";
            // The consumer is registered with no_ack = false, so each delivery is acknowledged explicitly.
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        // Prefetch count of 1: at most one unacknowledged message is delivered at a time.
        $this->_channel->basic_qos(null, 1, null);
        $this->_channel->basic_consume(
            $queue, '', false, false, false, false,
            $callback
        );

        while (count($this->_channel->callbacks)) {
            $this->_channel->wait();
        }

        return true;
    }

    /**
     * Batch consumption of messages.
     *
     * Not implemented yet: the body is empty, so the call currently returns
     * null whatever arguments are passed.
     *
     * @author libingke
     * @param string $queue Name of the queue.
     * @param mixed $arr Currently unused.
     * @return void
     */
    public function batchConsume($queue, $arr)
    {
    }

    /**
     * Deletes a message.
     *
     * Not implemented yet: the body is empty and the argument is ignored.
     *
     * @author libingke
     * @param mixed $message Currently unused.
     * @return void
     */
    public function delete($message)
    {
    }
}
|