PhpClient.php 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. <?php
  2. namespace components;
  3. use PhpAmqpLib\Message\AMQPMessage;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. class PhpClient {
  6. private $connection;
  7. private $channel;
  8. private $callback_queue;
  9. private $response;
  10. private $corr_id;
  11. private $result = '';
  12. CONST HOST = "172.30.118.225";
  13. CONST PORT = 5673; //默认5672
  14. CONST USER = "guest"; //用户名
  15. CONST PASS = "guest";//密码
  16. public function __construct() {
  17. $this->connection = new AMQPStreamConnection(
  18. self::HOST, self::PORT, self::USER, self::PASS); //建立连接
  19. $this->channel = $this->connection->channel();
  20. list($this->callback_queue, ,) = $this->channel->queue_declare(
  21. "", false, false, true, false);
  22. $this->channel->basic_consume(
  23. $this->callback_queue, '', false, false, false, false,
  24. array($this, 'on_response'));
  25. }
  26. public function on_response($rep) {
  27. if($rep->get('correlation_id') == $this->corr_id) {
  28. $this->result .= $rep->body;
  29. $this->response = $rep->body;
  30. }
  31. }
  32. public function call($n) {
  33. $this->response = null;
  34. $this->corr_id = uniqid();
  35. $msg = new AMQPMessage(
  36. (string) $n,
  37. array('correlation_id' => $this->corr_id,
  38. 'reply_to' => $this->callback_queue)
  39. );
  40. $this->channel->basic_publish($msg, '', 'queue'); //这里的queue是消息名称
  41. while($this->response != "end") {
  42. $this->channel->wait();
  43. }
  44. return $this->result;
  45. }
  46. public static function CallMq($n){
  47. $connection = new AMQPStreamConnection(
  48. self::HOST, self::PORT, self::USER, self::PASS); //建立连接
  49. $channel = $connection->channel();
  50. $channel->queue_declare('task_queue', false, true, false, false);
  51. $data=empty($n)?"Hello World!":$n;
  52. $msg = new AMQPMessage($data,
  53. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  54. );
  55. $channel->basic_publish($msg, '', 'task_queue');
  56. $channel->close();
  57. $connection->close();
  58. return true;
  59. }
  60. }