connection = new AMQPStreamConnection(
            self::HOST,
            self::PORT,
            self::USER,
            self::PASS
        );
        $this->channel = $this->connection->channel();
        list($this->callback_queue, ,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            false
        );
        $this->channel->basic_consume(
            $this->callback_queue,
            '',
            false,
            false,
            false,
            false,
            array($this, 'on_response')
        );
    }

    /**
     * Publish a single message to the default exchange, then close the
     * channel and the connection.
     *
     * Nothing is published unless $body is a string. The channel and the
     * connection are closed in every case (including when publishing throws),
     * so this instance cannot publish again afterwards.
     *
     * @param string $body        Message payload.
     * @param string $routing_key Routing key used for the publish.
     * @return mixed Current value of $this->result: whatever basic_publish()
     *               returned when a message was sent, otherwise the value the
     *               property already held.
     */
    public function send($body, $routing_key = 'task_queue')
    {
        try {
            if (is_string($body)) {
                $msg = new AMQPMessage($body, $this->newReplyProperties());
                $this->result = $this->channel->basic_publish($msg, '', $routing_key);
            }
        } finally {
            $this->closeAmqp();
        }

        return $this->result;
    }

    /**
     * Publish every scalar entry of $body as its own message in one batch,
     * then close the channel and the connection.
     *
     * Each entry is cast to string and buffered with batch_basic_publish();
     * the buffer is flushed with publish_batch(). Non-scalar entries are
     * skipped. Nothing is published unless $body is an array. All messages of
     * one call share the same correlation id and reply queue.
     *
     * @param array  $body        List of message payloads.
     * @param string $routing_key Routing key used for every message.
     * @return mixed Current value of $this->result: whatever publish_batch()
     *               returned (null when nothing was buffered) if $body was an
     *               array, otherwise the value the property already held.
     */
    public function batchSend($body, $routing_key = '')
    {
        try {
            if (is_array($body)) {
                $properties = $this->newReplyProperties();
                $queued = 0;

                foreach ($body as $item) {
                    // Arrays/objects/null have no well-defined string payload here.
                    if (!is_scalar($item)) {
                        continue;
                    }

                    $this->channel->batch_basic_publish(
                        new AMQPMessage((string) $item, $properties),
                        '',
                        $routing_key
                    );
                    $queued++;
                }

                // batch_basic_publish() only buffers; the flush is what sends.
                $this->result = $queued > 0 ? $this->channel->publish_batch() : null;
            }
        } finally {
            $this->closeAmqp();
        }

        return $this->result;
    }

    /**
     * Consume messages from $queue, echoing each body to standard output.
     *
     * Loops on the channel for as long as it has registered callbacks.
     *
     * @param string $queue Queue name.
     * @param string $str   Unused.
     * @return mixed Value returned by basic_consume().
     */
    public function receive($queue, $str)
    {
        return $this->consumeAndEcho($queue);
    }

    /**
     * Consume messages from $queue, echoing each body to standard output.
     *
     * Currently behaves exactly like receive().
     *
     * @param string $queue Queue name.
     * @param string $arr   Unused.
     * @return mixed Value returned by basic_consume().
     */
    public function batchReceive($queue, $arr)
    {
        return $this->consumeAndEcho($queue);
    }

    /**
     * Delete a message. Not implemented: the method body is empty.
     */
    public function delete()
    {
    }

    /**
     * Consumer callback registered on the reply queue.
     *
     * When the reply's correlation id matches the id of the last publish,
     * the reply body is appended to $this->result and stored in
     * $this->response. Replies with any other correlation id are ignored.
     *
     * @param AMQPMessage $rep Incoming reply message.
     */
    public function on_response($rep)
    {
        // Strict comparison: loosely compared numeric-looking ids can collide.
        if ($rep->get('correlation_id') === $this->corr_id) {
            $this->result .= $rep->body;
            $this->response = $rep->body;
        }
    }

    /**
     * Publish a persistent message to the durable 'task_queue' queue over a
     * fresh connection.
     *
     * @param mixed $n Payload; "Hello World!" is sent when empty($n) is true.
     * @return bool Always true.
     */
    public static function CallMq($n)
    {
        return self::publishPersistentTo('task_queue', $n);
    }

    /**
     * Publish a persistent message to the durable 'login' queue over a fresh
     * connection.
     *
     * @param mixed $n Payload; "Hello World!" is sent when empty($n) is true.
     * @return bool Always true.
     */
    public static function CallUserMq($n)
    {
        return self::publishPersistentTo('login', $n);
    }

    /**
     * Build the message properties for a request/reply publish and remember
     * the generated correlation id so on_response() can match the reply.
     *
     * NOTE(review): assumes $corr_id is declared on the class (the property
     * list is not visible from here) - confirm.
     *
     * @return array Properties holding 'correlation_id' and 'reply_to'.
     */
    private function newReplyProperties()
    {
        $this->corr_id = uniqid();

        return [
            'correlation_id' => $this->corr_id,
            'reply_to' => $this->callback_queue,
        ];
    }

    /**
     * Close the channel first, then the connection.
     */
    private function closeAmqp()
    {
        $this->channel->close();
        $this->connection->close();
    }

    /**
     * Shared body of receive() and batchReceive(): register an echoing
     * consumer on $queue and wait on the channel while callbacks remain.
     *
     * @param string $queue Queue name.
     * @return mixed Value returned by basic_consume().
     */
    private function consumeAndEcho($queue)
    {
        $printer = function ($msg) {
            echo " [x] Received ", $msg->body, "\n";
        };

        $this->result = $this->channel->basic_consume($queue, '', false, true, false, false, $printer);

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }

        return $this->result;
    }

    /**
     * Shared body of CallMq() and CallUserMq(): open a connection, declare
     * $queue as durable, publish one persistent message and close again.
     *
     * @param string $queue Queue name, also used as the routing key.
     * @param mixed  $n     Payload; "Hello World!" is sent when empty($n) is true.
     * @return bool Always true.
     */
    private static function publishPersistentTo($queue, $n)
    {
        $connection = new AMQPStreamConnection(self::HOST, self::PORT, self::USER, self::PASS);
        $channel = $connection->channel();
        $channel->queue_declare($queue, false, true, false, false);

        $data = empty($n) ? "Hello World!" : $n;
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        $channel->basic_publish($msg, '', $queue);

        $channel->close();
        $connection->close();

        return true;
    }
}