_connect = self::connect(); $this->_channel = $this->_connect->channel(); $this->_callback_queue = $queueName; } /** * [发送信息] * @author: libingke * @param string $body 消息主体 * @param string $routing_key * @return bool */ public function send($body, $routing_key) { $properties = [ 'content_type' => 'text/plain', 'correlation_id' => uniqid(), 'reply_to' => $this->_callback_queue ]; if (is_string($body)) { $msg = new AMQPMessage( (string) $body, $properties); $this->_channel->basic_publish($msg, '', $routing_key); } return true; } /** * [批量发送信息] * @author: libingke * @param string $body 消息主体 * @param string $routing_key * @return array 返回数据 */ public function batchSend($body, $routing_key = '') { } /** * [消费信息] * @author: libingke * @param string $queue 队列名称 * @param string $str * @return bool */ public function receive($queue) { $callback = function($msg) { echo $msg->get('correlation_id'); echo " [x] Received ", $msg->body, "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $this->_channel->basic_qos(null, 1, null); $this->_channel->basic_consume( $queue, '', false, false, false, false, $callback ); /*while(count($this->_channel->callbacks)) { $this->_channel->wait(); }*/ return true; } /** * [批量消费信息] * @author: libingke * @param string $queue 队列名称 * @param string $arr * @return array 返回数据 */ public function batchReceive($queue, $arr) { } /** * [删除信息] * @author: libingke */ public function delete($message) { } }