0
0

RabbitBase.php 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. <?php
  2. namespace components;
  3. use PhpAmqpLib\Message\AMQPMessage;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. class RabbitBase {
  6. private $connection;
  7. private $channel;
  8. private $callback_queue='yang';
  9. private $response;
  10. private $corr_id;
  11. private $result = '';
  12. // CONST HOST = "172.30.118.225";
  13. CONST HOST = "localhost";
  14. CONST PORT = 5673; //默认5672
  15. CONST USER = "guest"; //用户名
  16. CONST PASS = "guest";//密码
  17. public function __construct() {
  18. $this->connection = new AMQPStreamConnection(
  19. \Yii::$app->params['rabbithost'], self::PORT, self::USER, self::PASS); //建立连接
  20. $this->channel = $this->connection->channel();
  21. list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);
  22. // list($this->callback_queue, ,) = $this->channel->queue_declare("", false, true, false, false);
  23. $this->channel->basic_consume(
  24. $this->callback_queue, '', false, false, false, false,
  25. array($this, 'on_response'));
  26. }
  27. public function on_response($rep) {
  28. if($rep->get('correlation_id') == $this->corr_id) {
  29. $this->result .= $rep->body;
  30. $this->response = $rep->body;
  31. }
  32. }
  33. public function call($n) {
  34. $this->response = null;
  35. $this->corr_id = uniqid();
  36. $msg = new AMQPMessage(
  37. (string) $n,
  38. array('correlation_id' => $this->corr_id,
  39. 'reply_to' => $this->callback_queue)
  40. );
  41. $this->channel->basic_publish($msg, '', 'yang'); //这里的queue是消息名称
  42. //这个是监听rpc模式
  43. //while($this->response != "end") {
  44. // $this->channel->wait();
  45. //}
  46. return $this->result;
  47. }
  48. public static function CallMq($n){
  49. $connection = new AMQPStreamConnection(
  50. \Yii::$app->params['rabbithost'], self::PORT, self::USER, self::PASS); //建立连接
  51. $channel = $connection->channel();
  52. $channel->queue_declare('login', false, true, false, false);
  53. $data=empty($n)?"Hello World!":$n;
  54. $msg = new AMQPMessage($data,
  55. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  56. );
  57. $channel->basic_publish($msg, '', 'login');
  58. $channel->close();
  59. $connection->close();
  60. return true;
  61. }
  62. }