123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- <?php
- namespace backend\forms;
- use components\Curl;
- use components\Exception;
- use Yii;
- /**
- * Class QueueForm
- * @package backend\forms
- */
- class QueueForm extends BaseForm
- {
- /**
- * @var string 名称
- */
- public $name;
- public $name1;
- public $name2;
- public function rules()
- {
- return [
- //create_queue delete_queue
- [['name'], 'required', 'message' => 1100, 'on' => ['create_queue', 'delete_queue']],
- [['name1','name2'], 'required', 'message' => 1100, 'on' => ['update_queue']],
- ['name', 'string', 'message' => 1101, 'on' => ['create_queue', 'delete_queue']],
- ['name', 'filter', 'filter' => 'trim', 'on' => ['create_queue', 'delete_queue']],
- ];
- }
- /**
- * [创建队列]
- * @author: libingke
- * @return array
- * @throws Exception
- */
- public function createQueue()
- {
- try {
- $connect = $this->getConnect();
- $channel = $connect->channel();
- list($queue,,) = $channel->queue_declare($this->name,
- false, true, false, false, false);
- } catch (\Exception $e) {
- throw new Exception(1001, $e->getMessage());
- }
- return $data = ['name' => $queue];
- }
- /**
- * [删除队列]
- * @author: libingke
- * @return array
- * @throws Exception
- */
- public function deleteQueue()
- {
- try {
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $channel->queue_delete($this->name,
- false, false, false, false
- );
- return [
- 'name' => $this->name,
- 'result' => '删除成功'
- ];
- } catch (\Exception $e) {
- throw new Exception(1001, $e->getMessage());
- }
- }
- /**
- * [获取消息列表]
- * @author: libingke
- */
- public function getQueueList()
- {
- $authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
- $url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port . "/api/queues";
- $curl = new Curl();
- $curl->setOption(CURLOPT_USERPWD, $authStr);
- $result = json_decode($curl->get($url), true);
- if ($curl->responseCode != 200)
- throw new Exception(1002);
- if ($curl->errorText)
- throw new Exception(1002, $curl->errorText);
- if (isset($result['error']) && is_string($result['error']))
- throw new Exception(1002, $result['error']);
- $rows = [];
- foreach ($result as $k => $v) {
- //$name = $v['name'];
- $name = $v['name'];
- $rows[$name]['name'] = $name;
- $rows[$name]['messages_count'] = $v['messages'];
- $rows[$name]['message_bytes'] = $v['message_bytes'];
- $rows[$name]['messages_ready'] = $v['messages_ready'];
- //$rows[$name]['message_stats'] = $v['message_stats'];
- $rows[$name]['consumers_count'] = $v['consumers'];
- $rows[$name]['auto_delete'] = $v['auto_delete'];
- $rows[$name]['durable'] = $v['durable'];
- //$rows[$name]['arguments'] = $v['arguments'];
- $rows[$name]['state'] = $v['state'];
- $rows[$name]['idle_since'] = $v['idle_since'];
- }
- unset($result);
- return ['count' => count($rows), 'rows' => $rows];
- }
- /**
- * [修改队列]
- * @author: hanguangxu
- * @return array
- * @throws Exception
- */
- public function updateQueue()
- {
- try {
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $channel->queue_delete($this->name1,
- false, false, false, false
- );
- list($queue,,) = $channel->queue_declare($this->name2,
- false, true, false, false, false);
- } catch (\Exception $e) {
- throw new Exception(1001, $e->getMessage());
- }
- return $data = ['name' => $queue];
- }
- }
|