0
0

TopicForm.php 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. <?php
  2. namespace backend\forms;
  3. use components\Curl;
  4. use components\Exception;
  5. use Yii;
  6. /**
  7. * Class QueueForm
  8. * @package backend\forms
  9. */
  10. class TopicForm extends BaseForm
  11. {
  12. /**
  13. * @var string 名称
  14. */
  15. public $name;
  16. public function rules()
  17. {
  18. return [
  19. //create_queue delete_queue
  20. [['name'], 'required', 'message' => 1100, 'on' => ['create_topic', 'delete_topic']],
  21. ['name', 'string', 'message' => 1101, 'on' => ['create_topic', 'delete_topic']],
  22. ['name', 'filter', 'filter' => 'trim', 'on' => ['create_topic', 'delete_topic']],
  23. ];
  24. }
  25. /**
  26. * [创建队列]
  27. * @author: libingke
  28. * @return array
  29. * @throws Exception
  30. */
  31. public function createTopic()
  32. {
  33. try {
  34. $connect = $this->getConnect();
  35. $channel = $connect->channel();
  36. list($topic,,) = $channel->exchange_declare($this->name, 'topic', false, true, false);
  37. } catch (\Exception $e) {
  38. throw new Exception(1001, $e->getMessage());
  39. }
  40. return $data = ['TopicName' => empty($topic)?$this->name:$topic];
  41. }
  42. /**
  43. * [删除队列]
  44. * @author: libingke
  45. * @return array
  46. * @throws Exception
  47. */
  48. public function deleteTopic()
  49. {
  50. try {
  51. $connect = $this->getConnect();
  52. $channel = $connect->channel();
  53. $channel->exchange_delete($this->name, false, false );
  54. return [
  55. 'name' => $this->name,
  56. 'result' => '删除成功'
  57. ];
  58. } catch (\Exception $e) {
  59. throw new Exception(1001, $e->getMessage());
  60. }
  61. }
  62. /**
  63. * [获取消息列表]
  64. * @author: libingke
  65. */
  66. public function getQueueList()
  67. {
  68. $authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
  69. $url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port . "/api/queues";
  70. $curl = new Curl();
  71. $curl->setOption(CURLOPT_USERPWD, $authStr);
  72. $result = json_decode($curl->get($url), true);
  73. if ($curl->responseCode != 200)
  74. throw new Exception(1002);
  75. if ($curl->errorText)
  76. throw new Exception(1002, $curl->errorText);
  77. if (isset($result['error']) && is_string($result['error']))
  78. throw new Exception(1002, $result['error']);
  79. $rows = [];
  80. foreach ($result as $k => $v) {
  81. //$name = $v['name'];
  82. $name = $v['name'];
  83. $rows[$name]['name'] = $name;
  84. $rows[$name]['messages_count'] = $v['messages'];
  85. $rows[$name]['message_bytes'] = $v['message_bytes'];
  86. $rows[$name]['messages_ready'] = $v['messages_ready'];
  87. //$rows[$name]['message_stats'] = $v['message_stats'];
  88. $rows[$name]['consumers_count'] = $v['consumers'];
  89. $rows[$name]['auto_delete'] = $v['auto_delete'];
  90. $rows[$name]['durable'] = $v['durable'];
  91. //$rows[$name]['arguments'] = $v['arguments'];
  92. $rows[$name]['state'] = $v['state'];
  93. $rows[$name]['idle_since'] = $v['idle_since'];
  94. }
  95. unset($result);
  96. return ['count' => count($rows), 'rows' => $rows];
  97. }
  98. }