0
0

QueueForm.php 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. <?php
  2. namespace backend\forms;
  3. use backend\models\Queue;
  4. use common\helpers\KeyHelper;
  5. use components\Curl;
  6. use components\Exception;
  7. use Yii;
  /**
   * Class QueueForm
   *
   * Form model for managing message queues: declaring a queue on the AMQP
   * broker and recording it in the database, deleting a recorded queue, and
   * listing the queues reported by the broker's HTTP management API.
   *
   * @package backend\forms
   */
  class QueueForm extends BaseForm
  {
      /** @var string identifier checked for uniqueness before a queue is created */
      public $sign;

      /** @var string name of the queue to declare on the broker */
      public $queue;

      /** @var string free-text remark; defaults to 'sys' in the create_queue scenario */
      public $remark = '';

      /**
       * @var array extra configuration, stored serialized on the queue record
       */
      public $config = [];

      /** @var string id of the queue record to delete */
      public $qid;

      /**
       * Force delete: intended for deleting queues that have no record in the
       * database. NOTE(review): not referenced by any rule or method in this class.
       */
      public $force;

      /**
       * Validation rules for the create_queue and delete_queue scenarios.
       *
       * @return array
       */
      public function rules()
      {
          return [
              // create_queue
              [['config', 'remark'], 'safe', 'on' => 'create_queue'],
              [['sign', 'queue'], 'required', 'on' => 'create_queue'],
              ['sign', 'filter', 'filter' => 'trim', 'on' => 'create_queue'],
              ['sign', 'string', 'on' => 'create_queue'],
              ['remark', 'default', 'value' => 'sys', 'on' => 'create_queue'],
              // delete_queue
              [['qid'], 'required', 'on' => 'delete_queue'],
          ];
      }

      /**
       * [Create a queue]
       *
       * Declares a durable queue on the broker and stores a matching record.
       * The duplicate-sign check runs before the broker is touched, so a
       * rejected request no longer leaves an untracked queue behind.
       *
       * @author: libingke
       * @return array always contains 'queue'; 'qid' and 'sign' are added only
       *               when the database record was saved
       * @throws Exception code 2008 when a record with the same sign exists,
       *                   code 2000 when the broker rejects the declaration
       */
      public function createQueue()
      {
          if ($this->signExists()) {
              throw new Exception(2008);
          }

          $connect = null;
          $channel = null;
          try {
              $connect = $this->openConnection();
              $channel = $connect->channel();
              // Arguments: passive=false, durable=true, exclusive=false, auto_delete=false.
              list($queue, ,) = $channel->queue_declare(
                  $this->queue,
                  false,
                  true,
                  false,
                  false
              );
          } catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
              $this->closeQuietly($channel, $connect);
              throw new Exception(2000, $e->getMessage());
          }
          $this->closeQuietly($channel, $connect);

          $data = ['queue' => $queue];
          try {
              $one = new Queue();
              $one->qid = KeyHelper::getUniqueId('queue-add');
              $one->sign = $this->sign;
              $one->queue = $this->queue;
              $one->status = Queue::STATUS_YES;
              $one->remark = $this->remark;
              // NOTE(review): config comes from user input ('safe' rule); confirm that
              // whatever reads this column back does not unserialize() untrusted data.
              $one->config = serialize($this->config);
              if ($one->save()) {
                  $data['qid'] = $one->qid;
                  $data['sign'] = $one->sign;
                  ksort($data);
              } else {
                  // Still best-effort, but leave a trace instead of failing silently.
                  Yii::warning(
                      'Queue record was not saved: ' . json_encode($one->getErrors()),
                      __METHOD__
                  );
              }
          } catch (\yii\db\Exception $e) {
              // TODO: the queue exists on the broker but the DB record was not stored.
              Yii::error('Queue record could not be stored: ' . $e->getMessage(), __METHOD__);
          }

          return $data;
      }

      /**
       * [Delete a queue]
       *
       * Looks up the active record for $qid, deletes its queue on the broker and,
       * when the broker confirms, marks the record as inactive.
       *
       * @author: libingke
       * @return array ['delete_count' => int, 'delete_message' => string]; the
       *               message is '' on success, otherwise the broker/DB error text
       * @throws Exception code 2009 when no active record matches $qid
       */
      public function deleteQueue()
      {
          $data = ['delete_count' => 0, 'delete_message' => ''];
          try {
              $one = Queue::findOne(['qid' => $this->qid, 'status' => Queue::STATUS_YES]);
              if (!$one) {
                  throw new Exception(2009);
              }

              $error = $this->deleteBrokerQueue($one->queue);
              if ($error === '') {
                  $one->status = Queue::STATUS_NO;
                  if ($one->update() === false) {
                      Yii::warning('Queue record status was not updated for qid ' . $one->qid, __METHOD__);
                  }
                  $data['delete_count']++;
              }
              $data['delete_message'] = $error;
          } catch (\yii\db\Exception $e) {
              $data['delete_message'] = $e->getMessage();
          }

          return $data;
      }

      /**
       * [Get the queue list]
       *
       * Fetches /api/queues from the broker's HTTP management API and returns a
       * reduced row per queue, keyed by queue name. The request is not restricted
       * to the configured vhost, so queues with the same name in different vhosts
       * overwrite each other in the result.
       *
       * @author: libingke
       * @return array ['count' => int, 'rows' => array keyed by queue name]
       * @throws Exception code 2000 on transport errors, API errors, a non-200
       *                   response or an undecodable body
       */
      public function getQueueList()
      {
          $badCode = 2000;
          $amqp = Yii::$app->Amqp;
          $authStr = $amqp->getConfig('user') . ':' . $amqp->getConfig('pass');
          $url = $amqp->getConfig('host') . ':' . $amqp->getConfig('api_port') . "/api/queues";

          $curl = new Curl();
          $curl->setOption(CURLOPT_USERPWD, $authStr);
          $result = json_decode($curl->get($url), true);

          // Most specific failure first, so the caller sees the real cause rather
          // than a bare status-code failure.
          if ($curl->errorText) {
              throw new Exception($badCode, $curl->errorText);
          }
          if (isset($result['error']) && is_string($result['error'])) {
              throw new Exception($badCode, $result['error']);
          }
          if ($curl->responseCode != 200) {
              throw new Exception($badCode);
          }
          if (!is_array($result)) {
              // 200 with a body that is not a JSON array/object.
              throw new Exception($badCode);
          }

          // Output key => key in the API payload. Keys missing from the payload
          // (presumably e.g. idle_since on a busy queue) become null, not a notice.
          $fieldMap = [
              'messages_count'  => 'messages',
              'message_bytes'   => 'message_bytes',
              'messages_ready'  => 'messages_ready',
              'consumers_count' => 'consumers',
              'auto_delete'     => 'auto_delete',
              'durable'         => 'durable',
              'state'           => 'state',
              'idle_since'      => 'idle_since',
          ];

          $rows = [];
          foreach ($result as $item) {
              if (!is_array($item) || !isset($item['name'])) {
                  continue;
              }
              $name = $item['name'];
              $row = ['name' => $name];
              foreach ($fieldMap as $outKey => $apiKey) {
                  $row[$outKey] = isset($item[$apiKey]) ? $item[$apiKey] : null;
              }
              $rows[$name] = $row;
          }

          return ['count' => count($rows), 'rows' => $rows];
      }

      /**
       * Tells whether a queue record with the current sign already exists.
       * A database failure is logged and treated as "not found", keeping the
       * best-effort handling createQueue() applies to database errors.
       *
       * @return bool
       */
      private function signExists()
      {
          try {
              return Queue::findOne(['sign' => $this->sign]) !== null;
          } catch (\yii\db\Exception $e) {
              Yii::error('Duplicate-sign lookup failed: ' . $e->getMessage(), __METHOD__);
              return false;
          }
      }

      /**
       * Opens a broker connection from the Amqp application component settings.
       *
       * @return \PhpAmqpLib\Connection\AMQPStreamConnection
       */
      private function openConnection()
      {
          $amqp = Yii::$app->Amqp;

          return new \PhpAmqpLib\Connection\AMQPStreamConnection(
              $amqp->host,
              $amqp->port,
              $amqp->user,
              $amqp->pass,
              $amqp->vhost
          );
      }

      /**
       * Deletes a queue on the broker.
       *
       * queue_delete() returns the number of messages dropped with the queue, so
       * its return value must not be used as a success flag; success is simply
       * "no channel exception was raised".
       *
       * @param string $name queue name
       * @return string '' on success, otherwise the broker's error message
       */
      private function deleteBrokerQueue($name)
      {
          $connect = null;
          $channel = null;
          try {
              $connect = $this->openConnection();
              $channel = $connect->channel();
              $channel->queue_delete($name, false, false, false, false);
              $error = '';
          } catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
              $error = $e->getMessage();
              if ($error === '') {
                  // Never let an empty message be mistaken for success.
                  $error = get_class($e);
              }
          }
          $this->closeQuietly($channel, $connect);

          return $error;
      }

      /**
       * Closes the channel and the connection, ignoring (but logging) close errors
       * so that cleanup never masks the result of the actual operation.
       *
       * @param object|null $channel
       * @param object|null $connect
       * @return void
       */
      private function closeQuietly($channel, $connect)
      {
          foreach ([$channel, $connect] as $resource) {
              if ($resource === null) {
                  continue;
              }
              try {
                  $resource->close();
              } catch (\Exception $e) {
                  Yii::warning('AMQP close failed: ' . $e->getMessage(), __METHOD__);
              }
          }
      }
  }