1500, 'on' => ['create_subscribe', 'delete_subscribe']], ['topicName', 'string', 'message' => 1501, 'on' => ['create_subscribe', 'delete_subscribe']], ['topicName', 'filter', 'filter' => 'trim', 'on' => ['create_subscribe', 'delete_subscribe']], ]; } /** * [创建队列] * @author: libingke * @return array * @throws Exception */ public function Subscribe() { try { $connect = $this->getConnect(); $channel = $connect->channel(); list($subscriptionName,,) = $channel->queue_bind($this->endpoint, $this->topicName, is_null($this->bindingKey)?'*':$this->bindingKey); } catch (\Exception $e) { throw new Exception(1001, $e->getMessage()); } return $data = [ 'subscriptionName' => empty($subscriptionName)?$this->subscriptionName:$subscriptionName, 'bindingKey' => $this->topicName, 'endpoint' => $this->endpoint, ]; } /** * [删除队列] * @author: libingke * @return array * @throws Exception */ public function Unsubscribe() { try { $connect = $this->getConnect(); $channel = $connect->channel(); $channel->exchange_delete($this->name, false, false ); return [ 'name' => $this->name, 'result' => '删除成功' ]; } catch (\Exception $e) { throw new Exception(1001, $e->getMessage()); } } /** * [获取消息列表] * @author: libingke */ public function getQueueList() { $authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass; $url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port . "/api/queues"; $curl = new Curl(); $curl->setOption(CURLOPT_USERPWD, $authStr); $result = json_decode($curl->get($url), true); if ($curl->responseCode != 200) throw new Exception(1002); if ($curl->errorText) throw new Exception(1002, $curl->errorText); if (isset($result['error']) && is_string($result['error'])) throw new Exception(1002, $result['error']); $rows = []; foreach ($result as $k => $v) { //$name = $v['name']; $name = $v['name']; $rows[$name]['name'] = $name; $rows[$name]['messages_count'] = $v['messages']; $rows[$name]['message_bytes'] = $v['message_bytes']; $rows[$name]['messages_ready'] = $v['messages_ready']; //$rows[$name]['message_stats'] = $v['message_stats']; $rows[$name]['consumers_count'] = $v['consumers']; $rows[$name]['auto_delete'] = $v['auto_delete']; $rows[$name]['durable'] = $v['durable']; //$rows[$name]['arguments'] = $v['arguments']; $rows[$name]['state'] = $v['state']; $rows[$name]['idle_since'] = $v['idle_since']; } unset($result); return ['count' => count($rows), 'rows' => $rows]; } }