1100, 'on' => ['create_topic', 'delete_topic']], ['name', 'string', 'message' => 1101, 'on' => ['create_topic', 'delete_topic']], ['name', 'filter', 'filter' => 'trim', 'on' => ['create_topic', 'delete_topic']], ]; } /** * [创建队列] * @author: libingke * @return array * @throws Exception */ public function createTopic() { try { $connect = $this->getConnect(); $channel = $connect->channel(); list($topic,,) = $channel->exchange_declare($this->name, 'topic', false, true, false); } catch (\Exception $e) { throw new Exception(1001, $e->getMessage()); } return $data = ['TopicName' => empty($topic)?$this->name:$topic]; } /** * [删除队列] * @author: libingke * @return array * @throws Exception */ public function deleteTopic() { try { $connect = $this->getConnect(); $channel = $connect->channel(); $channel->exchange_delete($this->name, false, false ); return [ 'name' => $this->name, 'result' => '删除成功' ]; } catch (\Exception $e) { throw new Exception(1001, $e->getMessage()); } } /** * [获取消息列表] * @author: libingke */ public function getTopicList() { $authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass; $url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port . "/api/exchanges"; $curl = new Curl(); $curl->setOption(CURLOPT_USERPWD, $authStr); $result = json_decode($curl->get($url), true); if ($curl->responseCode != 200) throw new Exception(1002); if ($curl->errorText) throw new Exception(1002, $curl->errorText); if (isset($result['error']) && is_string($result['error'])) throw new Exception(1002, $result['error']); $rows = []; foreach ($result as $k => $v) { if($v['type'] = 'topic'){ $name = $v['name']; $rows[$name]['name'] = $name; $rows[$name]['auto_delete'] = $v['auto_delete']; $rows[$name]['durable'] = $v['durable']; $rows[$name]['arguments'] = $v['arguments']; $rows[$name]['type'] = $v['type']; $rows[$name]['vhost'] = $v['vhost']; } } unset($result); return ['count' => count($rows), 'rows' => $rows]; } }