123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- <?php
- namespace common\logic\Amqp;
- use PhpAmqpLib\Message\AMQPMessage;
- class Message extends Connect
- {
- private $_connect;
- private $_channel;
- private $_callback_queue;
- private $_result;
- private $_response;
- private $_corr_id;
- public function __construct($queueName)
- {
- $this->_connect = self::connect();
- $this->_channel = $this->_connect->channel();
- $this->_callback_queue = $queueName;
- }
-
- public function send($body, $routing_key)
- {
- $properties = [
- 'content_type' => 'text/plain',
- 'correlation_id' => uniqid(),
- 'reply_to' => $this->_callback_queue
- ];
- if (is_string($body)) {
- $msg = new AMQPMessage( (string) $body, $properties);
- $this->_channel->basic_publish($msg, '', $routing_key);
- }
- return true;
- }
-
- public function batchSend($body, $routing_key = '')
- {
- }
-
- public function receive($queue)
- {
- $callback = function($msg) {
- echo $msg->get('correlation_id');
- echo " [x] Received ", $msg->body, "\n";
- $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
- };
- $this->_channel->basic_qos(null, 1, null);
- $this->_channel->basic_consume(
- $queue, '', false, false, false, false,
- $callback
- );
-
- return true;
- }
-
- public function batchReceive($queue, $arr)
- {
- }
-
- public function delete($message)
- {
- }
- }
|