'create_queue'],
            [['sign', 'queue'], 'required', 'on' => 'create_queue'],
            ['sign', 'filter', 'filter' => 'trim', 'on' => 'create_queue'],
            ['sign', 'string', 'on' => 'create_queue'],
            ['remark', 'default', 'value' => 'sys', 'on' => 'create_queue'],

            //delete_queue
            [['qid'], 'required', 'on' => 'delete_queue'],
        ];
    }

    /**
     * Create a queue on the AMQP broker and record it in the database.
     *
     * The queue named by $this->queue is declared as durable (passive=false,
     * durable=true, exclusive=false, auto_delete=false). Afterwards a Queue
     * row is stored for $this->sign. A database failure while storing the row
     * is logged and otherwise ignored, so the broker-side result is still
     * returned.
     *
     * @author: libingke
     * @return array ['queue' => name reported by the broker]; when the row was
     *               saved it also holds 'qid' and 'sign' (keys sorted)
     * @throws Exception code 2000 when the broker rejects the declaration,
     *                   code 2008 when a row with the same sign already exists
     */
    public function createQueue()
    {
        $connect = null;
        $channel = null;

        try {
            $connect = new \PhpAmqpLib\Connection\AMQPStreamConnection(
                Yii::$app->Amqp->host,
                Yii::$app->Amqp->port,
                Yii::$app->Amqp->user,
                Yii::$app->Amqp->pass,
                Yii::$app->Amqp->vhost
            );
            $channel = $connect->channel();
            list($queue,,) = $channel->queue_declare($this->queue, false, true, false, false);
        } catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
            $this->closeAmqpQuietly($channel, $connect);
            throw new Exception(2000, $e->getMessage());
        }

        // The broker work is done; release the connection before touching the database.
        $this->closeAmqpQuietly($channel, $connect);

        $data = ['queue' => $queue];

        try {
            $one = Queue::findOne(['sign' => $this->sign]);
            if ($one) {
                throw new Exception(2008);
            }

            // Persist the new queue record.
            $one = new Queue();
            $one->qid = KeyHelper::getUniqueId('queue-add');
            $one->sign = $this->sign;
            $one->queue = $this->queue;
            $one->status = Queue::STATUS_YES;
            $one->remark = $this->remark;
            $one->config = serialize($this->config);

            if ($one->save()) {
                $data['qid'] = $one->qid;
                $data['sign'] = $one->sign;
                ksort($data);
            }
        } catch (\yii\db\Exception $e) {
            // TODO: the database record was not stored although the broker queue exists.
            // Kept best-effort as before, but the failure is logged instead of vanishing.
            Yii::error($e->getMessage(), __METHOD__);
        }

        return $data;
    }

    /**
     * Delete a queue from the AMQP broker and mark its record as inactive.
     *
     * Looks up the active Queue row for $this->qid, deletes the broker queue
     * and, when the broker call succeeded, sets the row status to
     * Queue::STATUS_NO.
     *
     * @author: libingke
     * @return array ['delete_count' => int, 'delete_message' => string];
     *               delete_message is '' on success, otherwise the broker or
     *               database error text
     * @throws Exception code 2009 when no active row matches $this->qid
     */
    public function deleteQueue()
    {
        $data = ['delete_count' => 0, 'delete_message' => ''];

        try {
            $one = Queue::findOne(['qid' => $this->qid, 'status' => Queue::STATUS_YES]);
            if (!$one) {
                throw new Exception(2009);
            }

            $error = $this->removeBrokerQueue($one->queue);
            if ($error === null) {
                $one->status = Queue::STATUS_NO;
                $one->update();
                $data['delete_count']++;
            } else {
                $data['delete_message'] = $error;
            }
        } catch (\yii\db\Exception $e) {
            $data['delete_message'] = $e->getMessage();
        }

        return $data;
    }

    /**
     * Fetch the queue list from the broker's HTTP management API.
     *
     * Requests "<host>:<api_port>/api/queues" with basic credentials taken
     * from the Amqp component configuration and reduces every entry to a
     * fixed set of fields. Fields missing from an entry are returned as null.
     * `message_stats` and `arguments` are intentionally not exposed.
     *
     * NOTE(review): the request is not restricted to the configured vhost; the
     * previous code computed an url-encoded vhost but never used it. Rows are
     * keyed by queue name, so equal names in different vhosts overwrite each
     * other - confirm whether "/api/queues/<vhost>" was intended.
     *
     * @author: libingke
     * @return array ['count' => int, 'rows' => array keyed by queue name]
     * @throws Exception code 2000 on transport errors, API errors, non-200
     *                   responses or an undecodable body
     */
    public function getQueueList()
    {
        $badCode = 2000;
        $amqp = Yii::$app->Amqp;

        $authStr = $amqp->getConfig('user') . ':' . $amqp->getConfig('pass');
        $url = $amqp->getConfig('host') . ':' . $amqp->getConfig('api_port') . "/api/queues";

        $curl = new Curl();
        $curl->setOption(CURLOPT_USERPWD, $authStr);
        $result = json_decode($curl->get($url), true);

        // Most specific diagnostics first: a failed request also has a non-200
        // status, so checking the status first would hide the error text.
        if ($curl->errorText) {
            throw new Exception($badCode, $curl->errorText);
        }
        if (isset($result['error']) && is_string($result['error'])) {
            throw new Exception($badCode, $result['error']);
        }
        if ($curl->responseCode != 200) {
            throw new Exception($badCode);
        }
        if (!is_array($result)) {
            // A 200 response whose body is not a JSON list cannot be iterated.
            throw new Exception($badCode);
        }

        // Output key => key in the management API payload.
        $fieldMap = [
            'name'            => 'name',
            'messages_count'  => 'messages',
            'message_bytes'   => 'message_bytes',
            'messages_ready'  => 'messages_ready',
            'consumers_count' => 'consumers',
            'auto_delete'     => 'auto_delete',
            'durable'         => 'durable',
            'state'           => 'state',
            'idle_since'      => 'idle_since',
        ];

        $rows = [];
        foreach ($result as $item) {
            if (!isset($item['name'])) {
                continue;
            }

            $row = [];
            foreach ($fieldMap as $outKey => $srcKey) {
                // Not every field is guaranteed to be present in every entry.
                $row[$outKey] = isset($item[$srcKey]) ? $item[$srcKey] : null;
            }
            $rows[$item['name']] = $row;
        }

        return ['count' => count($rows), 'rows' => $rows];
    }

    /**
     * Delete a queue on the broker.
     *
     * The value returned by queue_delete() is the number of messages that
     * were dropped together with the queue, not a status, so success is
     * decided only by whether a channel exception was raised.
     *
     * @param string $name broker queue name
     * @return string|null null on success, otherwise the broker error message
     */
    private function removeBrokerQueue($name)
    {
        $connect = null;
        $channel = null;
        $error = null;

        try {
            $connect = new \PhpAmqpLib\Connection\AMQPStreamConnection(
                Yii::$app->Amqp->host,
                Yii::$app->Amqp->port,
                Yii::$app->Amqp->user,
                Yii::$app->Amqp->pass,
                Yii::$app->Amqp->vhost
            );
            $channel = $connect->channel();
            $channel->queue_delete($name, false, false, false, false);
        } catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
            $error = $e->getMessage();
            if ($error === '') {
                // Never let an empty message be mistaken for success.
                $error = get_class($e);
            }
        }

        $this->closeAmqpQuietly($channel, $connect);

        return $error;
    }

    /**
     * Best-effort shutdown of an AMQP channel and its connection.
     *
     * Cleanup failures are ignored on purpose so that they can never mask the
     * outcome of the operation that was actually requested.
     *
     * @param object|null $channel channel to close, if one was opened
     * @param object|null $connect connection to close, if one was opened
     * @return void
     */
    private function closeAmqpQuietly($channel, $connect)
    {
        if ($channel !== null) {
            try {
                $channel->close();
            } catch (\Exception $e) {
                // ignored: cleanup only
            }
        }

        if ($connect !== null) {
            try {
                $connect->close();
            } catch (\Exception $e) {
                // ignored: cleanup only
            }
        }
    }
}