connection = new AMQPStreamConnection( \Yii::$app->params['rabbithost'], self::PORT, self::USER, self::PASS); //建立连接 $this->channel = $this->connection->channel(); list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false); $this->channel->basic_consume( $this->callback_queue, '', false, false, false, false, array($this, 'on_response')); } public function on_response($rep) { if($rep->get('correlation_id') == $this->corr_id) { $this->result .= $rep->body; $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string) $n, array('correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue) ); $this->channel->basic_publish($msg, '', 'queue'); //这里的queue是消息名称 while($this->response != "end") { $this->channel->wait(); } return $this->result; } public static function CallMq($n){ $connection = new AMQPStreamConnection( self::HOST, self::PORT, self::USER, self::PASS); //建立连接 $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); $data=empty($n)?"Hello World!":$n; $msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, '', 'task_queue'); $channel->close(); $connection->close(); return true; } public static function CallUserMq($n){ $connection = new AMQPStreamConnection( self::HOST, self::PORT, self::USER, self::PASS); //建立连接 $channel = $connection->channel(); $channel->queue_declare('login', false, true, false, false); $data=empty($n)?"Hello World!":$n; $msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, '', 'login'); $channel->close(); $connection->close(); return true; } }