1100, 'on' => ['create_queue', 'delete_queue']], [['name1','name2'], 'required', 'message' => 1100, 'on' => ['update_queue']], ['name', 'string', 'message' => 1101, 'on' => ['create_queue', 'delete_queue']], ['name', 'filter', 'filter' => 'trim', 'on' => ['create_queue', 'delete_queue']], ]; }

    /**
     * [Create queue]
     *
     * Declares the queue named by $this->name on a fresh channel and returns
     * the queue name reported back by the declare call.
     *
     * @author: libingke
     * @return array ['name' => <declared queue name>]
     * @throws Exception code 1001 (with the underlying message) if connecting or declaring fails
     */
    public function createQueue()
    {
        try {
            $connect = $this->getConnect();
            $channel = $connect->channel();

            // Flags are presumably php-amqplib's (passive, durable, exclusive,
            // auto_delete, nowait) = (false, true, false, false, false) — TODO confirm.
            list($queue) = $channel->queue_declare($this->name, false, true, false, false, false);
        } catch (\Exception $e) {
            throw new Exception(1001, $e->getMessage());
        }

        return ['name' => $queue];
    }

    /**
     * [Delete queue]
     *
     * Deletes the queue named by $this->name.
     *
     * @author: libingke
     * @return array ['name' => <queue name>, 'result' => <status text>]
     * @throws Exception code 1001 (with the underlying message) if connecting or deleting fails
     */
    public function deleteQueue()
    {
        try {
            $connect = $this->getConnect();
            $channel = $connect->channel();
            $channel->queue_delete($this->name, false, false, false, false);
        } catch (\Exception $e) {
            throw new Exception(1001, $e->getMessage());
        }

        return [
            'name' => $this->name,
            'result' => '删除成功',
        ];
    }

    /**
     * [Get queue list]
     *
     * Fetches "/api/queues" from the host/port configured on the Amqp
     * component (HTTP basic auth with the component's user/pass) and returns
     * a summary of every queue, keyed by queue name.
     *
     * Fields the API response does not contain for a given queue are
     * returned as null instead of triggering an undefined-index error.
     *
     * @author: libingke
     * @return array ['count' => int, 'rows' => array<string, array>]
     * @throws Exception code 1002 on transport failure, API-reported error,
     *                   non-200 status, or a response that is not a JSON array
     */
    public function getQueueList()
    {
        $amqp = Yii::$app->Amqp;
        $authStr = $amqp->user . ':' . $amqp->pass;
        $url = $amqp->host . ':' . $amqp->api_port . "/api/queues";

        $curl = new Curl();
        $curl->setOption(CURLOPT_USERPWD, $authStr);
        $result = json_decode($curl->get($url), true);

        // Check the message-bearing failures first: a bare status-code check
        // placed ahead of them would hide the actual reason from the caller.
        if ($curl->errorText)
            throw new Exception(1002, $curl->errorText);

        if (isset($result['error']) && is_string($result['error']))
            throw new Exception(1002, $result['error']);

        if ($curl->responseCode != 200)
            throw new Exception(1002);

        // A 200 response whose body is not a JSON array cannot be iterated.
        if (!is_array($result))
            throw new Exception(1002);

        // Output key => key in the API response. The API's message_stats and
        // arguments fields are not included in the output.
        $fieldMap = [
            'messages_count'  => 'messages',
            'message_bytes'   => 'message_bytes',
            'messages_ready'  => 'messages_ready',
            'consumers_count' => 'consumers',
            'auto_delete'     => 'auto_delete',
            'durable'         => 'durable',
            'state'           => 'state',
            'idle_since'      => 'idle_since',
        ];

        $rows = [];
        foreach ($result as $v) {
            // Skip entries that cannot be keyed by queue name.
            if (!is_array($v) || !isset($v['name']))
                continue;

            $name = $v['name'];
            $row = ['name' => $name];
            foreach ($fieldMap as $outKey => $apiKey) {
                $row[$outKey] = isset($v[$apiKey]) ? $v[$apiKey] : null;
            }
            $rows[$name] = $row;
        }

        return ['count' => count($rows), 'rows' => $rows];
    }

    /**
     * [Update queue]
     *
     * Deletes the queue named by $this->name1 and then declares the queue
     * named by $this->name2. Nothing in this method moves messages from the
     * old queue to the new one.
     *
     * NOTE(review): the old queue is deleted before the new one is declared,
     * so if the declare fails the old queue is already gone — confirm this
     * ordering is intended.
     *
     * @author: hanguangxu
     * @return array ['name' => <newly declared queue name>]
     * @throws Exception code 1001 (with the underlying message) if connecting, deleting or declaring fails
     */
    public function updateQueue()
    {
        try {
            $connect = $this->getConnect();
            $channel = $connect->channel();

            $channel->queue_delete($this->name1, false, false, false, false);
            list($queue) = $channel->queue_declare($this->name2, false, true, false, false, false);
        } catch (\Exception $e) {
            throw new Exception(1001, $e->getMessage());
        }

        return ['name' => $queue];
    }
}