1500, 'on' => ['create_subscribe', 'delete_subscribe']], ['topicName', 'string', 'message' => 1501, 'on' => ['create_subscribe', 'delete_subscribe']], ['topicName', 'filter', 'filter' => 'trim', 'on' => ['create_subscribe', 'delete_subscribe']], ]; }

    /**
     * [Create a subscription]
     *
     * Opens a channel on the connection returned by getConnect() and calls
     * queue_bind() with $this->endpoint, $this->topicName and the binding key,
     * in that order. A null $this->bindingKey is replaced by '*'.
     *
     * @author: libingke
     * @return array Keys: subscriptionName, bindingKey, endpoint.
     * @throws Exception Code 1001 with the message of whatever failed while binding.
     */
    public function Subscribe()
    {
        try {
            $connect = $this->getConnect();
            $channel = $connect->channel();
            $routingKey = is_null($this->bindingKey) ? '*' : $this->bindingKey;

            // Only the first element of the reply (if any) is used; when the
            // reply is empty the configured subscriptionName is reported instead.
            list($subscriptionName) = $channel->queue_bind($this->endpoint, $this->topicName, $routingKey);
        } catch (\Exception $e) {
            throw new Exception(1001, $e->getMessage());
        }

        return [
            'subscriptionName' => empty($subscriptionName) ? $this->subscriptionName : $subscriptionName,
            // NOTE(review): the topic name is reported under 'bindingKey', not
            // $this->bindingKey. Kept as-is for caller compatibility; confirm intent.
            'bindingKey' => $this->topicName,
            'endpoint' => $this->endpoint,
        ];
    }

    /**
     * [Delete a subscription]
     *
     * Calls queue_unbind() twice for $this->endpoint / $this->topicName:
     * once with `false` as the routing key and once with the binding key
     * ('*' when $this->bindingKey is null).
     *
     * @author: libingke
     * @return array Keys: name (the endpoint) and result (a fixed success message).
     * @throws Exception Code 1001 with the message of whatever failed while unbinding.
     */
    public function Unsubscribe()
    {
        try {
            $connect = $this->getConnect();
            $channel = $connect->channel();

            // NOTE(review): presumably clears a binding made with an empty
            // routing key before removing the keyed one; confirm this is needed.
            $channel->queue_unbind($this->endpoint, $this->topicName, false);

            $routingKey = is_null($this->bindingKey) ? '*' : $this->bindingKey;
            $channel->queue_unbind($this->endpoint, $this->topicName, $routingKey);

            return [
                'name' => $this->endpoint,
                'result' => '删除成功',
            ];
        } catch (\Exception $e) {
            throw new Exception(1001, $e->getMessage());
        }
    }

    /**
     * [Get the list of subscriptions (bindings)]
     *
     * Fetches "<host>:<api_port>/api/bindings" with the Amqp component's
     * user/pass as HTTP credentials and reshapes the decoded JSON.
     *
     * Rows are keyed by each entry's 'destination', so when several entries
     * share a destination only the last one is kept.
     *
     * @author: libingke
     * @return array Keys: count (number of rows) and rows (destination => details).
     * @throws Exception Code 1002. The message is the transport error text or the
     *                   'error' string from the response body when one is available.
     */
    public function getSubscribeList()
    {
        $amqp = Yii::$app->Amqp;
        $authStr = $amqp->user . ':' . $amqp->pass;
        $url = $amqp->host . ':' . $amqp->api_port . "/api/bindings";

        $curl = new Curl();
        $curl->setOption(CURLOPT_USERPWD, $authStr);
        $result = json_decode($curl->get($url), true);

        // Most specific failures first, so the caller gets a message whenever
        // one exists; the bare status check is the fallback.
        if ($curl->errorText) {
            throw new Exception(1002, $curl->errorText);
        }
        if (isset($result['error']) && is_string($result['error'])) {
            throw new Exception(1002, $result['error']);
        }
        // A body that did not decode to an array cannot be iterated below.
        if ($curl->responseCode != 200 || !is_array($result)) {
            throw new Exception(1002);
        }

        $rows = [];
        foreach ($result as $v) {
            $name = $v['destination'];
            $rows[$name] = [
                'destination' => $name,
                'source' => $v['source'],
                'topicName' => $v['source'],
                'destination_type' => $v['destination_type'],
                'routing_key' => $v['routing_key'],
                'arguments' => $v['arguments'],
                'properties_key' => $v['properties_key'],
            ];
        }

        return ['count' => count($rows), 'rows' => $rows];
    }
}