1
0

SubscribeForm.php 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. <?php
  2. namespace backend\forms;
  3. use components\Curl;
  4. use components\Exception;
  5. use Yii;
  /**
   * Class SubscribeForm
   *
   * Form model for binding an endpoint to a topic (subscribe), removing that
   * binding (unsubscribe) and listing the bindings reported by the broker's
   * HTTP API.
   *
   * NOTE(review): the channel returned by getConnect()->channel() is presumably
   * a php-amqplib AMQPChannel (queue_bind / queue_unbind); getConnect() is
   * inherited and not visible here - confirm.
   *
   * @package backend\forms
   */
  class SubscribeForm extends BaseForm
  {
      /**
       * @var string Topic name; passed as the exchange argument when binding/unbinding.
       */
      public $topicName;

      /**
       * @var string Subscription name.
       */
      public $subscriptionName;

      /**
       * @var string Subscription protocol.
       */
      public $protocol;

      /**
       * @var string Endpoint that receives notifications; passed as the queue
       *             argument when binding/unbinding.
       */
      public $endpoint;

      /**
       * @var string|null Filter policy (routing key) for the subscription.
       *                  When null, '*' is used.
       */
      public $bindingKey;

      /**
       * Validation rules shared by the create_subscribe and delete_subscribe scenarios.
       *
       * The 'message' values are numeric codes rather than texts.
       *
       * @return array
       */
      public function rules()
      {
          $scenarios = ['create_subscribe', 'delete_subscribe'];

          return [
              [['topicName', 'subscriptionName', 'endpoint'], 'required', 'message' => 1500, 'on' => $scenarios],
              ['topicName', 'string', 'message' => 1501, 'on' => $scenarios],
              ['topicName', 'filter', 'filter' => 'trim', 'on' => $scenarios],
          ];
      }

      /**
       * Create a subscription by binding the endpoint to the topic.
       *
       * @author: libingke
       * @return array Keys: 'subscriptionName', 'bindingKey' (the routing key
       *               actually used for the bind) and 'endpoint'.
       * @throws Exception Code 1001 with the underlying message when the bind fails.
       */
      public function Subscribe()
      {
          // A missing filter policy falls back to '*'.
          $bindingKey = is_null($this->bindingKey) ? '*' : $this->bindingKey;

          try {
              $channel = $this->getConnect()->channel();
              $reply = $channel->queue_bind($this->endpoint, $this->topicName, $bindingKey);
          } catch (\Exception $e) {
              throw new Exception(1001, $e->getMessage());
          }

          // The bind reply is not guaranteed to be an array carrying a name, so
          // only use its first element when it really is one; otherwise keep
          // the name supplied by the caller.
          $boundName = (is_array($reply) && isset($reply[0])) ? $reply[0] : null;

          return [
              'subscriptionName' => empty($boundName) ? $this->subscriptionName : $boundName,
              'bindingKey' => $bindingKey,
              'endpoint' => $this->endpoint,
          ];
      }

      /**
       * Delete a subscription by removing the endpoint's binding to the topic.
       *
       * Only the binding for the configured routing key ('*' when none is set)
       * is removed, mirroring what Subscribe() creates.
       *
       * @author: libingke
       * @return array Keys: 'name' (the endpoint) and 'result' (status text).
       * @throws Exception Code 1001 with the underlying message when the unbind fails.
       */
      public function Unsubscribe()
      {
          $bindingKey = is_null($this->bindingKey) ? '*' : $this->bindingKey;

          try {
              $channel = $this->getConnect()->channel();
              $channel->queue_unbind($this->endpoint, $this->topicName, $bindingKey);
          } catch (\Exception $e) {
              throw new Exception(1001, $e->getMessage());
          }

          return [
              'name' => $this->endpoint,
              'result' => '删除成功',
          ];
      }

      /**
       * Fetch the list of bindings from the broker's HTTP API (/api/bindings).
       *
       * NOTE(review): rows are keyed by destination, so when several bindings
       * share a destination only the last one is kept and 'count' is the number
       * of distinct destinations. Kept as-is for backward compatibility.
       *
       * @author: libingke
       * @return array Keys: 'count' (number of rows) and 'rows' (binding data
       *               indexed by destination name).
       * @throws Exception Code 1002 when the request fails, the API reports an
       *                   error, the status is not 200 or the body is not a JSON array.
       */
      public function getSubscribeList()
      {
          $amqp = Yii::$app->Amqp;
          $authStr = $amqp->user . ':' . $amqp->pass;
          $url = $amqp->host . ':' . $amqp->api_port . '/api/bindings';

          $curl = new Curl();
          $curl->setOption(CURLOPT_USERPWD, $authStr);
          $result = json_decode($curl->get($url), true);

          // Report the most specific failure first: transport error, then the
          // error text returned by the API, then the bare HTTP status.
          if ($curl->errorText) {
              throw new Exception(1002, $curl->errorText);
          }
          if (is_array($result) && isset($result['error']) && is_string($result['error'])) {
              throw new Exception(1002, $result['error']);
          }
          if ($curl->responseCode != 200) {
              throw new Exception(1002);
          }
          // An empty or non-JSON body decodes to a non-array and cannot be iterated.
          if (!is_array($result)) {
              throw new Exception(1002);
          }

          $rows = [];
          foreach ($result as $binding) {
              $name = $binding['destination'];
              $rows[$name] = [
                  'destination' => $name,
                  'source' => $binding['source'],
                  // The binding source is exposed a second time under 'topicName'.
                  'topicName' => $binding['source'],
                  'destination_type' => $binding['destination_type'],
                  'routing_key' => $binding['routing_key'],
                  'arguments' => $binding['arguments'],
                  'properties_key' => $binding['properties_key'],
              ];
          }

          return ['count' => count($rows), 'rows' => $rows];
      }
  }