123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- <?php
- namespace components;
- use PhpAmqpLib\Message\AMQPMessage;
- use PhpAmqpLib\Connection\AMQPStreamConnection;
- class PhpClient {
- private $connection;
- private $channel;
- private $callback_queue;
- private $response;
- private $corr_id;
- private $result = '';
- // CONST HOST = "172.30.118.225";
- CONST HOST = "192.168.74.1";
- // CONST HOST = "localhost";
- CONST PORT = 5673; //默认5672
- CONST USER = "guest"; //用户名
- CONST PASS = "guest";//密码
- public function __construct() {
- $this->connection = new AMQPStreamConnection(
- \Yii::$app->params['rabbithost'], self::PORT, self::USER, self::PASS); //建立连接
- $this->channel = $this->connection->channel();
- list($this->callback_queue, ,) = $this->channel->queue_declare(
- "", false, false, true, false);
- $this->channel->basic_consume(
- $this->callback_queue, '', false, false, false, false,
- array($this, 'on_response'));
- }
- public function on_response($rep) {
- if($rep->get('correlation_id') == $this->corr_id) {
- $this->result .= $rep->body;
- $this->response = $rep->body;
- }
- }
- public function call($n) {
- $this->response = null;
- $this->corr_id = uniqid();
- $msg = new AMQPMessage(
- (string) $n,
- array('correlation_id' => $this->corr_id,
- 'reply_to' => $this->callback_queue)
- );
- $this->channel->basic_publish($msg, '', 'queue'); //这里的queue是消息名称
- while($this->response != "end") {
- $this->channel->wait();
- }
- return $this->result;
- }
- public static function CallMq($n){
- $connection = new AMQPStreamConnection(
- self::HOST, self::PORT, self::USER, self::PASS); //建立连接
- $channel = $connection->channel();
- $channel->queue_declare('task_queue', false, true, false, false);
- $data=empty($n)?"Hello World!":$n;
- $msg = new AMQPMessage($data,
- array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
- );
- $channel->basic_publish($msg, '', 'task_queue');
- $channel->close();
- $connection->close();
- return true;
- }
- public static function CallUserMq($n){
- $connection = new AMQPStreamConnection(
- self::HOST, self::PORT, self::USER, self::PASS); //建立连接
- $channel = $connection->channel();
- $channel->queue_declare('login', false, true, false, false);
- $data=empty($n)?"Hello World!":$n;
- $msg = new AMQPMessage($data,
- array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
- );
- $channel->basic_publish($msg, '', 'login');
- $channel->close();
- $connection->close();
- return true;
- }
- }
|