PhpClient.php 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. <?php
  2. namespace components;
  3. use PhpAmqpLib\Message\AMQPMessage;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  /**
   * RabbitMQ helper built on php-amqplib.
   *
   * Offers two usage styles:
   *  - an instance acts as an RPC client: call() publishes a request and
   *    collects the reply chunks delivered to a private reply queue until
   *    the END_MARKER chunk arrives;
   *  - the static CallMq() / CallUserMq() helpers open a short-lived
   *    connection and publish one persistent message to a durable queue.
   *
   * NOTE(review): the constructor takes the broker host from the Yii
   * parameter 'rabbithost', whereas the static helpers use the hard-coded
   * HOST constant - confirm both are meant to reach the same broker.
   * NOTE(review): the credentials are hard-coded; consider moving them to
   * application configuration.
   */
  class PhpClient {
      /** Connection opened by the constructor (AMQPStreamConnection). */
      private $connection;
      /** Channel used both for publishing requests and consuming replies. */
      private $channel;
      /** Name of the server-generated reply queue. */
      private $callback_queue;
      /** Body of the most recent reply chunk matching the current request. */
      private $response;
      /** Correlation id of the request currently in flight. */
      private $corr_id;
      /** Concatenation of every reply chunk received for the current request. */
      private $result = '';

      // Broker address used by the static publish helpers only.
      const HOST = "192.168.74.1";
      // Broker port (RabbitMQ's default would be 5672).
      const PORT = 5673;
      // Username.
      const USER = "guest";
      // Password.
      const PASS = "guest";
      // Reply body that marks the end of an RPC response stream.
      const END_MARKER = "end";
      // Payload sent by the static helpers when the caller supplies none.
      const DEFAULT_PAYLOAD = "Hello World!";

      /**
       * Connects to the broker and starts consuming from a private reply queue.
       */
      public function __construct() {
          // Establish the connection; the host comes from the Yii application params.
          $this->connection = new AMQPStreamConnection(
              \Yii::$app->params['rabbithost'], self::PORT, self::USER, self::PASS);
          $this->channel = $this->connection->channel();

          // Server-named, non-durable, exclusive reply queue.
          list($this->callback_queue, ,) = $this->channel->queue_declare(
              "", false, false, true, false);

          // Fourth argument is no_ack. Replies are never acknowledged by this
          // class, so consume in auto-ack mode instead of leaving every
          // delivery unacknowledged on the broker.
          $this->channel->basic_consume(
              $this->callback_queue, '', false, true, false, false,
              array($this, 'on_response'));
      }

      /**
       * Consumer callback: records a reply chunk that belongs to the request
       * currently in flight and ignores everything else.
       *
       * @param object $rep delivered message; must expose get('correlation_id')
       *                    and a public body property
       */
      public function on_response($rep) {
          if ($rep->get('correlation_id') !== $this->corr_id) {
              return;
          }
          $this->result .= $rep->body;
          $this->response = $rep->body;
      }

      /**
       * Publishes $n as an RPC request and blocks until the END_MARKER reply
       * chunk arrives.
       *
       * @param mixed     $n       request payload; cast to string before sending
       * @param int|float $timeout seconds to wait for each reply chunk; 0 (the
       *                           default) waits indefinitely, as before. A
       *                           non-zero value is passed to the channel's
       *                           wait(), which raises php-amqplib's timeout
       *                           exception when it elapses.
       * @return string every reply chunk concatenated in arrival order; the
       *                terminating END_MARKER chunk is part of the string
       *                because it is appended like any other chunk
       */
      public function call($n, $timeout = 0) {
          // Start from a clean slate so a reused instance does not return the
          // previous call's reply glued in front of the new one.
          $this->response = null;
          $this->result = '';
          $this->corr_id = uniqid();

          $msg = new AMQPMessage(
              (string) $n,
              array(
                  'correlation_id' => $this->corr_id,
                  'reply_to' => $this->callback_queue,
              )
          );
          // Default exchange, routing key 'queue' (the original comment calls
          // this the message name).
          $this->channel->basic_publish($msg, '', 'queue');

          while ($this->response !== self::END_MARKER) {
              $this->channel->wait(null, false, $timeout);
          }
          return $this->result;
      }

      /**
       * Publishes $n as a persistent message to the durable 'task_queue' queue.
       *
       * @param mixed $n payload; DEFAULT_PAYLOAD is sent when it is blank
       * @return bool always true (failures surface as exceptions)
       */
      public static function CallMq($n){
          return self::publishPersistent('task_queue', $n);
      }

      /**
       * Publishes $n as a persistent message to the durable 'login' queue.
       *
       * @param mixed $n payload; DEFAULT_PAYLOAD is sent when it is blank
       * @return bool always true (failures surface as exceptions)
       */
      public static function CallUserMq($n){
          return self::publishPersistent('login', $n);
      }

      /**
       * Opens a short-lived connection, declares $queue as durable and
       * publishes one persistent message to it through the default exchange.
       *
       * @param string $queue queue name, also used as the routing key
       * @param mixed  $n     payload, normalised by resolvePayload()
       * @return bool always true
       */
      private static function publishPersistent($queue, $n) {
          // Establish the connection.
          $connection = new AMQPStreamConnection(
              self::HOST, self::PORT, self::USER, self::PASS);
          try {
              $channel = $connection->channel();
              $channel->queue_declare($queue, false, true, false, false);
              $msg = new AMQPMessage(
                  self::resolvePayload($n),
                  array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
              );
              $channel->basic_publish($msg, '', $queue);
              $channel->close();
          } catch (\Exception $e) {
              // Do not leak the socket when declaring or publishing fails; a
              // secondary failure while closing must not mask the real error.
              try {
                  $connection->close();
              } catch (\Exception $ignored) {
              }
              throw $e;
          }
          $connection->close();
          return true;
      }

      /**
       * Chooses the message body for the static publish helpers.
       *
       * Only genuinely blank input (null, '', false, empty array) falls back
       * to DEFAULT_PAYLOAD. Unlike empty(), a payload of "0" or 0 is kept.
       *
       * @param mixed $n caller-supplied payload
       * @return mixed scalars are returned as strings, anything else unchanged
       */
      private static function resolvePayload($n) {
          if ($n === null || $n === '' || $n === false || $n === array()) {
              return self::DEFAULT_PAYLOAD;
          }
          return is_scalar($n) ? (string) $n : $n;
      }
  }