0
0

PhpClient.php 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. <?php
  2. namespace components;
  3. use PhpAmqpLib\Message\AMQPMessage;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. class PhpClient {
  6. private $connection;
  7. private $channel;
  8. private $callback_queue;
  9. private $response;
  10. private $corr_id;
  11. private $result = '';
  12. // CONST HOST = "172.30.118.225";
  13. CONST HOST = "localhost";
  14. CONST PORT = 5673; //默认5672
  15. CONST USER = "guest"; //用户名
  16. CONST PASS = "guest";//密码
  17. public function __construct() {
  18. $this->connection = new AMQPStreamConnection(
  19. \Yii::$app->params['rabbithost'], self::PORT, self::USER, self::PASS); //建立连接
  20. $this->channel = $this->connection->channel();
  21. list($this->callback_queue, ,) = $this->channel->queue_declare(
  22. "", false, false, true, false);
  23. $this->channel->basic_consume(
  24. $this->callback_queue, '', false, false, false, false,
  25. array($this, 'on_response'));
  26. }
  27. public function on_response($rep) {
  28. if($rep->get('correlation_id') == $this->corr_id) {
  29. $this->result .= $rep->body;
  30. $this->response = $rep->body;
  31. }
  32. }
  33. public function call($n) {
  34. $this->response = null;
  35. $this->corr_id = uniqid();
  36. $msg = new AMQPMessage(
  37. (string) $n,
  38. array('correlation_id' => $this->corr_id,
  39. 'reply_to' => $this->callback_queue)
  40. );
  41. $this->channel->basic_publish($msg, '', 'queue'); //这里的queue是消息名称
  42. while($this->response != "end") {
  43. $this->channel->wait();
  44. }
  45. return $this->result;
  46. }
  47. public static function CallMq($n){
  48. $connection = new AMQPStreamConnection(
  49. self::HOST, self::PORT, self::USER, self::PASS); //建立连接
  50. $channel = $connection->channel();
  51. $channel->queue_declare('task_queue', false, true, false, false);
  52. $data=empty($n)?"Hello World!":$n;
  53. $msg = new AMQPMessage($data,
  54. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  55. );
  56. $channel->basic_publish($msg, '', 'task_queue');
  57. $channel->close();
  58. $connection->close();
  59. return true;
  60. }
  61. public static function CallUserMq($n){
  62. $connection = new AMQPStreamConnection(
  63. self::HOST, self::PORT, self::USER, self::PASS); //建立连接
  64. $channel = $connection->channel();
  65. $channel->queue_declare('login', false, true, false, false);
  66. $data=empty($n)?"Hello World!":$n;
  67. $msg = new AMQPMessage($data,
  68. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  69. );
  70. $channel->basic_publish($msg, '', 'login');
  71. $channel->close();
  72. $connection->close();
  73. return true;
  74. }
  75. }