PhpClient.php 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. <?php
  2. namespace components\RabbitMQ;
  3. use PhpAmqpLib\Message\AMQPMessage;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. class PhpClient
  6. {
  7. private $connection;
  8. private $channel;
  9. private $callback_queue;
  10. private $response;
  11. private $corr_id;
  12. private $result = '';
  13. CONST HOST = "121.196.226.188";
  14. CONST PORT = 5672;
  15. CONST USER = "lbk";
  16. CONST PASS = "123456";
  17. public function __construct()
  18. {
  19. $this->connection = new AMQPStreamConnection(
  20. self::HOST, self::PORT, self::USER, self::PASS); //建立连接
  21. $this->channel = $this->connection->channel();
  22. list($this->callback_queue, ,) = $this->channel->queue_declare(
  23. "", false, false, true, false);
  24. $this->channel->basic_consume(
  25. $this->callback_queue, '', false, false, false, false,
  26. array($this, 'on_response'));
  27. }
  28. public function on_response($rep) {
  29. if($rep->get('correlation_id') == $this->corr_id) {
  30. $this->result .= $rep->body;
  31. $this->response = $rep->body;
  32. }
  33. }
  34. public function call($n) {
  35. $this->response = null;
  36. $this->corr_id = uniqid();
  37. $msg = new AMQPMessage(
  38. (string) $n,
  39. array('correlation_id' => $this->corr_id,
  40. 'reply_to' => $this->callback_queue)
  41. );
  42. $this->channel->basic_publish($msg, '', 'test'); //这里的queue是消息名称
  43. var_dump($this->response, $this->result);die;
  44. while($this->response != "end") {
  45. $this->channel->wait();
  46. }
  47. return $this->result;
  48. }
  49. public static function CallMq($n){
  50. $connection = new AMQPStreamConnection(
  51. self::HOST, self::PORT, self::USER, self::PASS); //建立连接
  52. $channel = $connection->channel();
  53. $channel->queue_declare('task_queue', false, true, false, false);
  54. $data=empty($n)?"Hello World!":$n;
  55. $msg = new AMQPMessage($data,
  56. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  57. );
  58. $channel->basic_publish($msg, '', 'task_queue');
  59. $channel->close();
  60. $connection->close();
  61. return true;
  62. }
  63. public static function CallUserMq($n){
  64. $connection = new AMQPStreamConnection(
  65. self::HOST, self::PORT, self::USER, self::PASS); //建立连接
  66. $channel = $connection->channel();
  67. $channel->queue_declare('login', false, true, false, false);
  68. $data=empty($n)?"Hello World!":$n;
  69. $msg = new AMQPMessage($data,
  70. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  71. );
  72. $channel->basic_publish($msg, '', 'login');
  73. $channel->close();
  74. $connection->close();
  75. return true;
  76. }
  77. }