lin 6 years ago
parent
commit
4b1eae984c

+ 3 - 6
backend/controllers/MessageController.php

@@ -36,17 +36,14 @@ class MessageController extends BaseController
 	 * @author: libingke
 	 * @return array
 	 */
-	public function actionSend1()
+	public function actionSendtopic()
 	{
 		$model = new MessageForm();
-		$model->setScenario('send');
+		$model->setScenario('sendtopic');
 		$model->load(['MessageForm' => Yii::$app->request->post()]);
-
-		die;
-
 		$data = [];
 		if ($model->validate()) {
-			$data = $model->sendMessage();
+            $data = $model->sendMessageTopic();
 		} else {
 			$model->handleError();//处理验证失败
 		}

+ 98 - 0
backend/controllers/SubscribeController.php

@@ -0,0 +1,98 @@
+<?php
+namespace backend\controllers;
+
+use backend\forms\QueueForm;
+use backend\forms\SubscribeForm;
+use backend\forms\TopicForm;
+use components\Exception;
+use yii\helpers\ArrayHelper;
+use Yii;
+
+class SubscribeController extends BaseController
+{
+    /**
+     * behaviors
+     * @return array
+     */
+    public function behaviors()
+    {
+        return ArrayHelper::merge(parent::behaviors(), [
+            'verbs' => [
+                'class' => \yii\filters\VerbFilter::className(),
+                'actions' => [
+                    'list' => ['GET'],
+                    'create' => ['POST'],
+                    'delete' => ['POST'],
+                ],
+            ],
+        ]);
+    }
+
+    /**
+     * [创建队列]
+     * @author: libingke
+     * @return array
+     * @throws Exception
+     */
+    public function actionCreate()
+    {
+        $model = new SubscribeForm();
+        $model->setScenario('create_subscribe');
+        $model->load(['SubscribeForm' => Yii::$app->request->post()]);
+
+        $data = [];
+        if ($model->validate()) {
+            $data = $model->Subscribe();
+        } else {
+            $model->handleError();//处理验证失败
+        }
+
+        return [
+            'code' => 200,
+            'message' => Yii::t('error', 200),
+            'data' => $data
+        ];
+    }
+
+    /**
+     * 删除队列
+     * @author: libingke
+     * @return array
+     * @throws Exception
+     */
+    public function actionDelete()
+    {
+        $model = new TopicForm();
+        $model->setScenario('delete_topic');
+        $model->load(['TopicForm' => Yii::$app->request->post()]);
+
+        $data = [];
+        if ($model->validate()) {
+            $data = $model->deleteTopic();
+        } else {
+            $model->handleError();
+        }
+
+        return [
+            'code' => 200,
+            'message' => Yii::t('error', 200),
+            'data' => $data
+        ];
+    }
+
+    /**
+     * 获取队列列表
+     * @author: libingke
+     */
+    public function actionList()
+    {
+        $model = new TopicForm();
+        $data = $model->getQueueList();
+
+        return [
+            'code' => 200,
+            'message' => Yii::t('error', 200),
+            'data' => $data
+        ];
+    }
+}

+ 97 - 0
backend/controllers/TopicController.php

@@ -0,0 +1,97 @@
+<?php
+namespace backend\controllers;
+
+use backend\forms\QueueForm;
+use backend\forms\TopicForm;
+use components\Exception;
+use yii\helpers\ArrayHelper;
+use Yii;
+
+class TopicController extends BaseController
+{
+    /**
+     * behaviors
+     * @return array
+     */
+    public function behaviors()
+    {
+        return ArrayHelper::merge(parent::behaviors(), [
+            'verbs' => [
+                'class' => \yii\filters\VerbFilter::className(),
+                'actions' => [
+                    'list' => ['GET'],
+                    'create' => ['POST'],
+                    'delete' => ['POST'],
+                ],
+            ],
+        ]);
+    }
+
+    /**
+     * [创建队列]
+     * @author: libingke
+     * @return array
+     * @throws Exception
+     */
+    public function actionCreate()
+    {
+        $model = new TopicForm();
+        $model->setScenario('create_topic');
+        $model->load(['TopicForm' => Yii::$app->request->post()]);
+
+        $data = [];
+        if ($model->validate()) {
+            $data = $model->createTopic();
+        } else {
+            $model->handleError();//处理验证失败
+        }
+
+        return [
+            'code' => 200,
+            'message' => Yii::t('error', 200),
+            'data' => $data
+        ];
+    }
+
+    /**
+     * 删除队列
+     * @author: libingke
+     * @return array
+     * @throws Exception
+     */
+    public function actionDelete()
+    {
+        $model = new TopicForm();
+        $model->setScenario('delete_topic');
+        $model->load(['TopicForm' => Yii::$app->request->post()]);
+
+        $data = [];
+        if ($model->validate()) {
+            $data = $model->deleteTopic();
+        } else {
+            $model->handleError();
+        }
+
+        return [
+            'code' => 200,
+            'message' => Yii::t('error', 200),
+            'data' => $data
+        ];
+    }
+
+    /**
+     * 获取队列列表
+     * @author: libingke
+     */
+    public function actionList()
+    {
+        $model = new TopicForm();
+        $data = $model->getQueueList();
+
+        return [
+            'code' => 200,
+            'message' => Yii::t('error', 200),
+            'data' => $data
+        ];
+    }
+}

+ 55 - 1
backend/forms/MessageForm.php

@@ -29,7 +29,20 @@ class MessageForm extends BaseForm
 	 */
 	public $name;
 
-	/**
+    /**
+     * @var 主题 交换机
+     */
+    public $topicName;
+
+
+    /**
+     * @var  路由
+     */
+    public $routingKey;
+
+
+
+    /**
 	 * @var integer 数量
 	 */
 	public $count;
@@ -110,6 +123,14 @@ class MessageForm extends BaseForm
 			[['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
 			['message', 'validateArray', 'on' => ['send', 'batch_send']],
 
+
+            /* 发送到订阅者 */
+            [['topicName'], 'trim', 'on' => ['sendtopic']],
+            [['topicName', 'message'], 'required', 'on' => ['sendtopic']],
+            ['message', 'validateArray', 'on' => ['sendtopic']],
+
+
+
 			/* 获取消息列表 */
 			['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 1205,
 				'tooSmall' => 1206, 'tooBig' => 1207, 'on' => ['message_list']],
@@ -188,6 +209,39 @@ class MessageForm extends BaseForm
 		}
 	}
 
+
+
+    public function sendMessageTopic()
+    {
+        try {
+            $connect = $this->getConnect();
+            $channel = $connect->channel();
+            $this->_handleMessage($this->message);
+            //发送信息到某个主题 $exchange
+            $channel->exchange_declare($this->topicName, 'topic', false, true, false);
+            //消息发送到
+            $channel->basic_publish($this->_message, is_null($this->topicName)?'':$this->topicName, is_null($this->routingKey)?'*':$this->routingKey);
+
+            $data = [
+                'topicName' => is_null($this->topicName)?'*':$this->topicName,
+                'routingKey' => is_null($this->routingKey)?'*':$this->routingKey,
+                'message_add' => [
+                    'mid' => $this->_mid,
+                    'body' => $this->_body
+                ]
+            ];
+            $channel->close();
+            $connect->close();
+            //订阅的不需要标记
+           // Redis::set($q_name, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
+
+            return $data;
+
+        } catch (\Exception $e) {
+            throw new Exception(1001, $e->getMessage());
+        }
+    }
+
 	/**
 	 * [构造消息体]
 	 * @param $data

+ 136 - 0
backend/forms/SubscribeForm.php

@@ -0,0 +1,136 @@
+<?php
+
+namespace backend\forms;
+
+use components\Curl;
+use components\Exception;
+use Yii;
+
+/**
+ * Class QueueForm
+ * @package backend\forms
+ */
+class SubscribeForm extends BaseForm
+{
+	/**
+	 * @var string 主题名字
+	 */
+	public $topicName;
+
+    /**
+     * @var string 订阅名字
+     */
+    public $subscriptionName;
+
+    /**
+     * @var string 订阅的协议
+     */
+    public $protocol;
+
+    /**
+     * @var string 接收通知的 endpoint
+     */
+    public $endpoint;
+
+    /**
+     * @var string 订阅接收消息的过滤策略
+     */
+    public $bindingKey;
+
+	public function rules()
+	{
+		return [
+            //create_subscribe  create_subscribe
+			[['topicName','subscriptionName', 'endpoint'], 'required', 'message' => 1500, 'on' => ['create_subscribe', 'delete_subscribe']],
+			['topicName', 'string', 'message' => 1501, 'on' => ['create_subscribe', 'delete_subscribe']],
+			['topicName', 'filter', 'filter' => 'trim', 'on' => ['create_subscribe', 'delete_subscribe']],
+		];
+	}
+
+	/**
+	 * [创建队列]
+	 * @author: libingke
+	 * @return array
+	 * @throws Exception
+	 */
+	public function Subscribe()
+	{
+		try {
+			$connect = $this->getConnect();
+			$channel = $connect->channel();
+			list($subscriptionName,,) =  $channel->queue_bind($this->endpoint, $this->topicName, is_null($this->bindingKey)?'*':$this->bindingKey);
+
+        } catch (\Exception $e) {
+			throw new Exception(1001, $e->getMessage());
+		}
+		return $data = [
+		    'subscriptionName' => empty($subscriptionName)?$this->subscriptionName:$subscriptionName,
+            'bindingKey' => $this->topicName,
+            'endpoint' => $this->endpoint,
+            ];
+	}
+
+	/**
+	 * [删除队列]
+	 * @author: libingke
+	 * @return array
+	 * @throws Exception
+	 */
+	public function Unsubscribe()
+	{
+		try {
+			$connect = $this->getConnect();
+			$channel = $connect->channel();
+			$channel->exchange_delete($this->name, false, false );
+			return [
+				'name' => $this->name,
+				'result' => '删除成功'
+			];
+
+		} catch (\Exception $e) {
+			throw new Exception(1001, $e->getMessage());
+		}
+	}
+
+	/**
+	 * [获取消息列表]
+	 * @author: libingke
+	 */
+	public function getQueueList()
+	{
+		$authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
+		$url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port . "/api/queues";
+
+		$curl = new Curl();
+		$curl->setOption(CURLOPT_USERPWD, $authStr);
+		$result = json_decode($curl->get($url), true);
+		if ($curl->responseCode != 200)
+			throw new Exception(1002);
+
+		if ($curl->errorText)
+			throw new Exception(1002, $curl->errorText);
+
+		if (isset($result['error']) && is_string($result['error']))
+			throw new Exception(1002, $result['error']);
+
+		$rows = [];
+		foreach ($result as $k => $v) {
+			//$name = $v['name'];
+			$name = $v['name'];
+			$rows[$name]['name'] 			= $name;
+			$rows[$name]['messages_count']	= $v['messages'];
+			$rows[$name]['message_bytes']	= $v['message_bytes'];
+			$rows[$name]['messages_ready']	= $v['messages_ready'];
+			//$rows[$name]['message_stats']	= $v['message_stats'];
+			$rows[$name]['consumers_count']	= $v['consumers'];
+			$rows[$name]['auto_delete']		= $v['auto_delete'];
+			$rows[$name]['durable']			= $v['durable'];
+			//$rows[$name]['arguments']		= $v['arguments'];
+			$rows[$name]['state']			= $v['state'];
+			$rows[$name]['idle_since']		= $v['idle_since'];
+		}
+		unset($result);
+
+		return ['count' => count($rows), 'rows' => $rows];
+	}
+}

+ 111 - 0
backend/forms/TopicForm.php

@@ -0,0 +1,111 @@
+<?php
+
+namespace backend\forms;
+
+use components\Curl;
+use components\Exception;
+use Yii;
+
+/**
+ * Class QueueForm
+ * @package backend\forms
+ */
+class TopicForm extends BaseForm
+{
+	/**
+	 * @var string 名称
+	 */
+	public $name;
+
+	public function rules()
+	{
+		return [
+			//create_queue  delete_queue
+			[['name'], 'required', 'message' => 1100, 'on' => ['create_topic', 'delete_topic']],
+			['name', 'string', 'message' => 1101, 'on' => ['create_topic', 'delete_topic']],
+			['name', 'filter', 'filter' => 'trim', 'on' => ['create_topic', 'delete_topic']],
+		];
+	}
+
+	/**
+	 * [创建队列]
+	 * @author: libingke
+	 * @return array
+	 * @throws Exception
+	 */
+	public function createTopic()
+	{
+		try {
+			$connect = $this->getConnect();
+			$channel = $connect->channel();
+			list($topic,,) = $channel->exchange_declare($this->name, 'topic', false, true, false);
+        } catch (\Exception $e) {
+			throw new Exception(1001, $e->getMessage());
+		}
+		return $data = ['TopicName' => empty($topic)?$this->name:$topic];
+	}
+
+	/**
+	 * [删除队列]
+	 * @author: libingke
+	 * @return array
+	 * @throws Exception
+	 */
+	public function deleteTopic()
+	{
+		try {
+			$connect = $this->getConnect();
+			$channel = $connect->channel();
+			$channel->exchange_delete($this->name, false, false );
+			return [
+				'name' => $this->name,
+				'result' => '删除成功'
+			];
+
+		} catch (\Exception $e) {
+			throw new Exception(1001, $e->getMessage());
+		}
+	}
+
+	/**
+	 * [获取消息列表]
+	 * @author: libingke
+	 */
+	public function getQueueList()
+	{
+		$authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
+		$url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port . "/api/queues";
+
+		$curl = new Curl();
+		$curl->setOption(CURLOPT_USERPWD, $authStr);
+		$result = json_decode($curl->get($url), true);
+		if ($curl->responseCode != 200)
+			throw new Exception(1002);
+
+		if ($curl->errorText)
+			throw new Exception(1002, $curl->errorText);
+
+		if (isset($result['error']) && is_string($result['error']))
+			throw new Exception(1002, $result['error']);
+
+		$rows = [];
+		foreach ($result as $k => $v) {
+			//$name = $v['name'];
+			$name = $v['name'];
+			$rows[$name]['name'] 			= $name;
+			$rows[$name]['messages_count']	= $v['messages'];
+			$rows[$name]['message_bytes']	= $v['message_bytes'];
+			$rows[$name]['messages_ready']	= $v['messages_ready'];
+			//$rows[$name]['message_stats']	= $v['message_stats'];
+			$rows[$name]['consumers_count']	= $v['consumers'];
+			$rows[$name]['auto_delete']		= $v['auto_delete'];
+			$rows[$name]['durable']			= $v['durable'];
+			//$rows[$name]['arguments']		= $v['arguments'];
+			$rows[$name]['state']			= $v['state'];
+			$rows[$name]['idle_since']		= $v['idle_since'];
+		}
+		unset($result);
+
+		return ['count' => count($rows), 'rows' => $rows];
+	}
+}

+ 5 - 0
common/messages/zh-CN/errorCode.php

@@ -112,6 +112,11 @@ return [
 
 	//ack 1400~1499
 
+
+    //subscribe 1500~1599
+    '1500' => '订阅者名称不能为空',
+    '1501' => '主题名称不能为空',
+    '1502' => '接收者不能为空',
 	/**
 	 * 其它模块
 	 */