SubscribeForm.php 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. <?php
  2. namespace backend\forms;
  3. use components\Curl;
  4. use components\Exception;
  5. use Yii;
  6. /**
  7. * Class QueueForm
  8. * @package backend\forms
  9. */
  10. class SubscribeForm extends BaseForm
  11. {
  12. /**
  13. * @var string 主题名字
  14. */
  15. public $topicName;
  16. /**
  17. * @var string 订阅名字
  18. */
  19. public $subscriptionName;
  20. /**
  21. * @var string 订阅的协议
  22. */
  23. public $protocol;
  24. /**
  25. * @var string 接收通知的 endpoint
  26. */
  27. public $endpoint;
  28. /**
  29. * @var string 订阅接收消息的过滤策略
  30. */
  31. public $bindingKey;
  32. public function rules()
  33. {
  34. return [
  35. //create_subscribe create_subscribe
  36. [['topicName','subscriptionName', 'endpoint'], 'required', 'message' => 1500, 'on' => ['create_subscribe', 'delete_subscribe']],
  37. ['topicName', 'string', 'message' => 1501, 'on' => ['create_subscribe', 'delete_subscribe']],
  38. ['topicName', 'filter', 'filter' => 'trim', 'on' => ['create_subscribe', 'delete_subscribe']],
  39. ];
  40. }
  41. /**
  42. * [创建队列]
  43. * @author: libingke
  44. * @return array
  45. * @throws Exception
  46. */
  47. public function Subscribe()
  48. {
  49. try {
  50. $connect = $this->getConnect();
  51. $channel = $connect->channel();
  52. list($subscriptionName,,) = $channel->queue_bind($this->endpoint, $this->topicName, is_null($this->bindingKey)?'*':$this->bindingKey);
  53. } catch (\Exception $e) {
  54. throw new Exception(1001, $e->getMessage());
  55. }
  56. return $data = [
  57. 'subscriptionName' => empty($subscriptionName)?$this->subscriptionName:$subscriptionName,
  58. 'bindingKey' => $this->topicName,
  59. 'endpoint' => $this->endpoint,
  60. ];
  61. }
  62. /**
  63. * [删除队列]
  64. * @author: libingke
  65. * @return array
  66. * @throws Exception
  67. */
  68. public function Unsubscribe()
  69. {
  70. try {
  71. $connect = $this->getConnect();
  72. $channel = $connect->channel();
  73. $channel->exchange_delete($this->name, false, false );
  74. return [
  75. 'name' => $this->name,
  76. 'result' => '删除成功'
  77. ];
  78. } catch (\Exception $e) {
  79. throw new Exception(1001, $e->getMessage());
  80. }
  81. }
  82. /**
  83. * [获取消息列表]
  84. * @author: libingke
  85. */
  86. public function getQueueList()
  87. {
  88. $authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
  89. $url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port . "/api/queues";
  90. $curl = new Curl();
  91. $curl->setOption(CURLOPT_USERPWD, $authStr);
  92. $result = json_decode($curl->get($url), true);
  93. if ($curl->responseCode != 200)
  94. throw new Exception(1002);
  95. if ($curl->errorText)
  96. throw new Exception(1002, $curl->errorText);
  97. if (isset($result['error']) && is_string($result['error']))
  98. throw new Exception(1002, $result['error']);
  99. $rows = [];
  100. foreach ($result as $k => $v) {
  101. //$name = $v['name'];
  102. $name = $v['name'];
  103. $rows[$name]['name'] = $name;
  104. $rows[$name]['messages_count'] = $v['messages'];
  105. $rows[$name]['message_bytes'] = $v['message_bytes'];
  106. $rows[$name]['messages_ready'] = $v['messages_ready'];
  107. //$rows[$name]['message_stats'] = $v['message_stats'];
  108. $rows[$name]['consumers_count'] = $v['consumers'];
  109. $rows[$name]['auto_delete'] = $v['auto_delete'];
  110. $rows[$name]['durable'] = $v['durable'];
  111. //$rows[$name]['arguments'] = $v['arguments'];
  112. $rows[$name]['state'] = $v['state'];
  113. $rows[$name]['idle_since'] = $v['idle_since'];
  114. }
  115. unset($result);
  116. return ['count' => count($rows), 'rows' => $rows];
  117. }
  118. }