_connect = self::connect(); $this->_channel = $this->_connect->channel(); $this->_callback_queue = $queueName; }

    /**
     * Send a single message.
     *
     * A fresh correlation id is obtained from KeyHelper::getUniqueId('message_send'),
     * stored on the instance and attached to the message together with the
     * configured callback queue as `reply_to`. The message is published to the
     * '' exchange using $routing_key.
     *
     * NOTE(review): when $body is not a string nothing is published, yet the
     * newly generated correlation id is still returned. Callers that wait for a
     * reply should make sure they pass a string.
     *
     * @author: libingke
     * @param string $body        Message body
     * @param string $routing_key Routing key used for the publish
     * @return mixed The correlation id generated for this call
     */
    public function send($body, $routing_key)
    {
        $this->_corr_id = KeyHelper::getUniqueId('message_send');

        $properties = [
            // 'content_type' => 'text/plain',
            'correlation_id' => $this->_corr_id,
            'reply_to' => $this->_callback_queue,
        ];

        if (is_string($body)) {
            // $body is already known to be a string, so no cast is needed.
            $msg = new AMQPMessage($body, $properties);
            $this->_channel->basic_publish($msg, '', $routing_key);
        }

        return $this->_corr_id;
    }

    /**
     * Send several messages as one batch.
     *
     * Every item of $info is queued with batch_basic_publish() on the ''
     * exchange using $queue as routing key; the batch is flushed once with
     * publish_batch() after the loop.
     *
     * String items are wrapped in an AMQPMessage before being queued, because
     * the channel's batch API works with message objects rather than raw
     * payloads. Items of any other type are handed to the channel unchanged,
     * so callers that already pass AMQPMessage instances keep working.
     *
     * @author: libingke
     * @param array|\Traversable $info  Message bodies (strings) or prepared messages
     * @param string             $queue Routing key used for every message
     */
    public function batchSend($info, $queue)
    {
        foreach ($info as $item) {
            $message = is_string($item) ? new AMQPMessage($item) : $item;
            $this->_channel->batch_basic_publish($message, '', $queue);
        }

        $this->_channel->publish_batch();
    }

    /**
     * Consume messages from a queue (todo).
     *
     * Registers a callback that echoes each message body and acknowledges the
     * delivery, then keeps waiting on the channel for as long as the channel
     * has callbacks registered. This call therefore blocks until the channel
     * no longer has any consumer callbacks.
     *
     * @author: libingke
     * @param string $queue Queue name
     * @return bool Always true once the wait loop has ended
     */
    public function consume($queue)
    {
        $callback = function ($msg) {
            echo " [x] Received ", $msg->body, "\n";
            // Acknowledge explicitly: the consumer below is registered with no_ack = false.
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        // Prefetch count of 1: at most one unacknowledged message at a time.
        $this->_channel->basic_qos(null, 1, null);
        $this->_channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        while (count($this->_channel->callbacks)) {
            $this->_channel->wait();
        }

        return true;
    }

    /**
     * Consume messages in batch.
     *
     * Not implemented yet: the body is empty, so nothing is consumed and no
     * value is returned.
     *
     * @author: libingke
     * @param string $queue Queue name
     * @param mixed  $arr   Currently unused (originally documented as a string)
     * @return void
     */
    public function batchConsume($queue, $arr)
    {
    }

    /**
     * Delete a message.
     *
     * Not implemented yet: the body is empty and $message is ignored.
     *
     * @author: libingke
     * @param mixed $message Currently unused
     * @return void
     */
    public function delete($message)
    {
    }
}