123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- <?php
- namespace backend\forms;
- use backend\models\Queue;
- use common\helpers\KeyHelper;
- use components\Curl;
- use components\Exception;
- use Yii;
/**
 * Form model for managing message queues: declares/deletes queues on the AMQP
 * broker and keeps the matching Queue records in sync, and lists queues through
 * the broker's HTTP management API.
 *
 * Scenarios used by rules(): 'create_queue' and 'delete_queue'.
 */
class QueueForm extends BaseForm
{
    /** @var string Sign stored on the Queue record; createQueue() rejects an already-used sign (code 2008). */
    public $sign;

    /** @var string Queue name passed to the broker when declaring the queue. */
    public $queue;

    /** @var string Free-form remark; defaults to 'sys' in the 'create_queue' scenario when left empty. */
    public $remark = '';

    /** @var array Extra configuration; stored serialized on the Queue record. */
    public $config = [];

    /** @var mixed Identifier of the Queue record to delete (required in 'delete_queue'). */
    public $qid;

    /** @var mixed Not referenced by any code in this class. */
    public $force;

    /**
     * Validation rules, grouped by scenario.
     *
     * @return array
     */
    public function rules()
    {
        return [
            // create_queue
            [['config', 'remark'], 'safe', 'on' => 'create_queue'],
            [['sign', 'queue'], 'required', 'on' => 'create_queue'],
            ['sign', 'filter', 'filter' => 'trim', 'on' => 'create_queue'],
            ['sign', 'string', 'on' => 'create_queue'],
            ['remark', 'default', 'value' => 'sys', 'on' => 'create_queue'],

            // delete_queue
            [['qid'], 'required', 'on' => 'delete_queue'],
        ];
    }

    /**
     * Declares a durable queue on the broker and stores a Queue record for it.
     *
     * The returned array always contains 'queue' (the name reported by the
     * broker). 'qid' and 'sign' are added, and the array is key-sorted, only
     * when the record was saved. Database errors and failed saves are logged
     * and otherwise leave the result with 'queue' only.
     *
     * @return array
     * @throws Exception code 2000 when the broker rejects the declaration,
     *                   code 2008 when a record with the same sign already exists.
     */
    public function createQueue()
    {
        $connection = null;
        try {
            $connection = $this->openAmqpConnection();
            $channel = $connection->channel();
            // Arguments: passive=false, durable=true, exclusive=false, auto_delete=false.
            list($queue) = $channel->queue_declare($this->queue, false, true, false, false);
        } catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
            throw new Exception(2000, $e->getMessage());
        } finally {
            // Always release the socket, whether or not the declaration succeeded.
            $this->closeAmqpConnection($connection);
        }

        $data = ['queue' => $queue];
        try {
            if (Queue::findOne(['sign' => $this->sign])) {
                throw new Exception(2008);
            }

            $one = new Queue();
            $one->qid = KeyHelper::getUniqueId('queue-add');
            $one->sign = $this->sign;
            $one->queue = $this->queue;
            $one->status = Queue::STATUS_YES;
            $one->remark = $this->remark;
            // NOTE(review): config originates from form input; make sure it is never
            // passed to unserialize() without restricting allowed classes.
            $one->config = serialize($this->config);

            if ($one->save()) {
                $data['qid'] = $one->qid;
                $data['sign'] = $one->sign;
                ksort($data);
            } else {
                Yii::warning('Queue record was not saved: ' . json_encode($one->getErrors()), __METHOD__);
            }
        } catch (\yii\db\Exception $e) {
            // Best-effort persistence: keep returning the broker result, but do not
            // lose the database error.
            Yii::error($e->getMessage(), __METHOD__);
        }

        return $data;
    }

    /**
     * Deletes the broker queue belonging to the active Queue record identified
     * by $qid and marks that record as inactive.
     *
     * @return array ['delete_count' => int, 'delete_message' => string]; the
     *               message is '' on success, otherwise the broker, validation
     *               or database error text.
     * @throws Exception code 2009 when no active record matches $qid.
     */
    public function deleteQueue()
    {
        $data = ['delete_count' => 0, 'delete_message' => ''];

        try {
            $one = Queue::findOne(['qid' => $this->qid, 'status' => Queue::STATUS_YES]);
            if (!$one) {
                throw new Exception(2009);
            }

            $error = $this->deleteBrokerQueue($one->queue);
            if ($error === '') {
                $one->status = Queue::STATUS_NO;
                if ($one->update() !== false) {
                    $data['delete_count']++;
                } else {
                    // The broker queue is gone but the record could not be updated.
                    $error = 'Queue record was not updated: ' . json_encode($one->getErrors());
                    Yii::warning($error, __METHOD__);
                }
            }
            $data['delete_message'] = $error;
        } catch (\yii\db\Exception $e) {
            $data['delete_message'] = $e->getMessage();
        }

        return $data;
    }

    /**
     * Fetches the queue list from the broker's HTTP management API.
     *
     * NOTE(review): the request goes to "/api/queues" without a vhost segment,
     * so queues of every vhost visible to the configured user are returned and
     * rows sharing a name overwrite each other (last one wins). Confirm whether
     * the listing should be restricted to the configured vhost.
     *
     * @return array ['count' => int, 'rows' => array keyed by queue name]
     * @throws Exception code 2000 on transport errors, non-200 responses,
     *                   API-reported errors or an undecodable response body.
     */
    public function getQueueList()
    {
        $badCode = 2000;
        $amqp = Yii::$app->Amqp;
        $authStr = $amqp->getConfig('user') . ':' . $amqp->getConfig('pass');
        $url = $amqp->getConfig('host') . ':' . $amqp->getConfig('api_port') . "/api/queues";

        $curl = new Curl();
        $curl->setOption(CURLOPT_USERPWD, $authStr);
        $body = $curl->get($url);

        // Report the transport error text first; a failed transfer would otherwise
        // be hidden behind the generic non-200 error.
        if ($curl->errorText) {
            throw new Exception($badCode, $curl->errorText);
        }
        if ($curl->responseCode != 200) {
            throw new Exception($badCode);
        }

        $result = is_string($body) ? json_decode($body, true) : null;
        if (isset($result['error']) && is_string($result['error'])) {
            throw new Exception($badCode, $result['error']);
        }
        if (!is_array($result)) {
            // Body was not valid JSON (or not a JSON array/object).
            throw new Exception($badCode);
        }

        // Output key => key in the management API response.
        $fields = [
            'messages_count'  => 'messages',
            'message_bytes'   => 'message_bytes',
            'messages_ready'  => 'messages_ready',
            'consumers_count' => 'consumers',
            'auto_delete'     => 'auto_delete',
            'durable'         => 'durable',
            'state'           => 'state',
            'idle_since'      => 'idle_since',
        ];

        $rows = [];
        foreach ($result as $item) {
            if (!is_array($item) || !isset($item['name'])) {
                continue;
            }
            $name = $item['name'];
            $row = ['name' => $name];
            foreach ($fields as $outKey => $srcKey) {
                // Fields absent from the response become null instead of raising a notice.
                $row[$outKey] = isset($item[$srcKey]) ? $item[$srcKey] : null;
            }
            $rows[$name] = $row;
        }

        return ['count' => count($rows), 'rows' => $rows];
    }

    /**
     * Opens a broker connection using the application's Amqp component settings.
     *
     * @return \PhpAmqpLib\Connection\AMQPStreamConnection
     */
    private function openAmqpConnection()
    {
        $amqp = Yii::$app->Amqp;

        return new \PhpAmqpLib\Connection\AMQPStreamConnection(
            $amqp->host,
            $amqp->port,
            $amqp->user,
            $amqp->pass,
            $amqp->vhost
        );
    }

    /**
     * Closes a broker connection, logging (not propagating) close failures so
     * they cannot mask the error that is already being handled.
     *
     * @param \PhpAmqpLib\Connection\AMQPStreamConnection|null $connection
     * @return void
     */
    private function closeAmqpConnection($connection)
    {
        if ($connection === null) {
            return;
        }
        try {
            $connection->close();
        } catch (\Exception $e) {
            Yii::warning('Failed to close AMQP connection: ' . $e->getMessage(), __METHOD__);
        }
    }

    /**
     * Deletes a queue on the broker.
     *
     * Success is determined by the absence of an exception, not by the value
     * returned from queue_delete() (which is the number of purged messages).
     *
     * @param string $name Queue name.
     * @return string '' on success, otherwise the broker's error message.
     */
    private function deleteBrokerQueue($name)
    {
        $connection = null;
        try {
            $connection = $this->openAmqpConnection();
            $connection->channel()->queue_delete($name, false, false, false, false);

            return '';
        } catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
            $message = $e->getMessage();

            // Never return '' for a failure: callers treat '' as success.
            return $message !== '' ? $message : get_class($e);
        } finally {
            $this->closeAmqpConnection($connection);
        }
    }
}
|