Преглед изворни кода

20180130分支更新@libingke

lbk пре 6 година
родитељ
комит
1f33939499

+ 84 - 28
backend/controllers/MessageController.php

@@ -1,7 +1,8 @@
 <?php
 namespace backend\controllers;
 
-use common\logic\MQMessage\Message;
+use common\logic\Amqp\Message;
+use common\logic\Amqp\Queue;
 use Yii;
 
 class MessageController extends BaseController
@@ -12,23 +13,32 @@ class MessageController extends BaseController
 	 */
 	public function actionSend()
 	{
-		//oauth 验证 todo
-
 		//获取接收参数  利用model验证或者判断
 		$params = Yii::$app->request->post();
 		$post = [
-			'message' => 'message 1',
-			'queue' => 'task_queue',
+			'message1' => 'message 1',
+			'message2' => 'message 2',
+			'message3' => 'message 3',
+			'queue' => 'y1',
 		];
 
 		try {
-			$model = new Message();
-			$data = $model->send($post['message'], $post['queue']);
+			$queue = (new Queue())->create($post['queue']);
+			if ($queue['status'] == 1) {
+				$message = new Message($queue['result']);
+				$message->send($post['message1'], $post['queue']);
+				$message->send($post['message2'], $post['queue']);
+				$message->send($post['message3'], $post['queue']);
 
-			$result = ['code' => 200, 'message' => Yii::t('common', 'OK'), 'data' => $data];
+				return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')];
 
-		} catch (\common\logic\MQMessage\Exception $e) {
-			$result = ['code' => $e->getCode(), 'message' => $e->getMessage(), 'data' => []];
+			} else {
+
+				return ['code' => $queue['status'], 'message' => $queue['result']];
+			}
+
+		} catch (\common\logic\Amqp\Exception $e) {
+			$result = ['code' => $e->getCode(), 'message' => $e->getMessage()];
 		}
 
 		return $result;
@@ -52,7 +62,7 @@ class MessageController extends BaseController
 
 			$result = ['code' => 200, 'message' => Yii::t('common', 'OK'), 'data' => $data];
 
-		} catch (\common\logic\MQMessage\Exception $e) {
+		} catch (\common\logic\Amqp\Exception $e) {
 			$result = ['code' => $e->getCode(), 'message' => $e->getMessage(), 'data' => []];
 		}
 
@@ -65,20 +75,30 @@ class MessageController extends BaseController
 	 */
 	public function actionReceive()
 	{
-		//$params = Yii::$app->request->post();
+		//获取接收参数  利用model验证或者判断
+		$params = Yii::$app->request->post();
 		$post = [
-			'message' => 'message 1',
-			'queue' => 'task_queue',
+			'message1' => 'message 1',
+			'message2' => 'message 2',
+			'message3' => 'message 3',
+			'queue' => 'y1',
 		];
 
 		try {
-			$model = new Message();
-			$data = $model->receive($post['queue'], $post['message']);
+			$queue = (new Queue())->create($post['queue']);
+			if ($queue['status'] == 1) {
+				$message = new Message($queue['result']);
+				$message->receive($post['queue']);
 
-			$result = ['code' => 200, 'message' => Yii::t('common', 'OK'), 'data' => $data];
+				return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')];
 
-		} catch (\common\logic\MQMessage\Exception $e) {
-			$result = ['code' => $e->getCode(), 'message' => $e->getMessage(), 'data' => []];
+			} else {
+
+				return ['code' => $queue['status'], 'message' => $queue['result']];
+			}
+
+		} catch (\common\logic\Amqp\Exception $e) {
+			$result = ['code' => $e->getCode(), 'message' => $e->getMessage()];
 		}
 
 		return $result;
@@ -91,24 +111,33 @@ class MessageController extends BaseController
 	 */
 	public function actionBatchReceive()
 	{
-		//$params = Yii::$app->request->post();
+		//获取接收参数  利用model验证或者判断
+		$params = Yii::$app->request->post();
 		$post = [
-			'messages' => ['message 1', 'message 2'],
-			'queue' => 'task_queue',
+			'message1' => 'message 1',
+			'message2' => 'message 2',
+			'message3' => 'message 3',
+			'queue' => 'y1',
 		];
 
 		try {
-			$model = new Message();
-			$data = $model->batchReceive($post['queue'], $post['messages']);
+			$queue = (new Queue())->create($post['queue']);
+			if ($queue['status'] == 1) {
+				$message = new Message($queue['result']);
+				$message->receive($post['queue']);
 
-			$result = ['code' => 200, 'message' => Yii::t('common', 'OK'), 'data' => $data];
+				return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')];
 
-		} catch (\common\logic\MQMessage\Exception $e) {
-			$result = ['code' => $e->getCode(), 'message' => $e->getMessage(), 'data' => []];
+			} else {
+
+				return ['code' => $queue['status'], 'message' => $queue['result']];
+			}
+
+		} catch (\common\logic\Amqp\Exception $e) {
+			$result = ['code' => $e->getCode(), 'message' => $e->getMessage()];
 		}
 
 		return $result;
-
 	}
 
 	/**
@@ -117,6 +146,33 @@ class MessageController extends BaseController
 	 */
     public function actionDelete()
     {
+		//获取接收参数  利用model验证或者判断
+		$params = Yii::$app->request->post();
+		$post = [
+			'message1' => 'message 1',
+			'message2' => 'message 2',
+			'message3' => 'message 3',
+			'queue' => 'y1',
+		];
+
+		try {
+			$queue = (new Queue())->create($post['queue']);
+			if ($queue['status'] == 1) {
+				$message = new Message($queue['result']);
+				$message->delete($post['message1']);
+
+				return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')];
+
+			} else {
+
+				return ['code' => $queue['status'], 'message' => $queue['result']];
+			}
+
+		} catch (\common\logic\Amqp\Exception $e) {
+			$result = ['code' => $e->getCode(), 'message' => $e->getMessage()];
+		}
+
+		return $result;
 
     }
 

+ 38 - 0
common/logic/Amqp/Connect.php

@@ -0,0 +1,38 @@
+<?php
+namespace common\logic\Amqp;
+
+/**
+ * 连接配置
+ * Class Config
+ * @package common\logic\message
+ * @author libingke
+ */
+class Connect
+{
+	CONST HOST = "121.196.226.188";
+
+	CONST PORT = 5672;
+
+	CONST USER = "lbk";
+
+	CONST PASS = "123456";
+
+	private static $_conn = null;
+
+	private function __construct(){}
+
+	private function __clone(){}
+
+	public static function connect()
+	{
+		if (self::$_conn == null) {
+			self::$_conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(
+				self::HOST,
+				self::PORT,
+				self::USER,
+				self::PASS
+			);
+		}
+		return self::$_conn;
+	}
+}

+ 2 - 1
common/logic/MQMessage/Exception.php → common/logic/Amqp/Exception.php

@@ -1,11 +1,12 @@
 <?php
-namespace common\logic\MQMessage;
+namespace common\logic\Amqp;
 
 use PhpAmqpLib\Exception\AMQPProtocolChannelException as BaseException;
 
 /**
  * Class Exception
  * @package common\logic\message
+ * @author libingke
  */
 class Exception extends BaseException
 {

+ 118 - 0
common/logic/Amqp/Message.php

@@ -0,0 +1,118 @@
+<?php
+
+namespace common\logic\Amqp;
+
+use PhpAmqpLib\Message\AMQPMessage;
+
+/**
+ * 处理消息体逻辑
+ * Class Message
+ * @package common\logic\MQMessage
+ * @author libingke
+ */
+class Message extends Connect
+{
+	private $_connect;
+
+	private $_channel;
+
+	private $_callback_queue;
+
+	private $_result;
+
+	private $_response;
+
+	private $_corr_id;
+
+	public function __construct($queueName)
+	{
+		$this->_connect = self::connect();
+
+		$this->_channel = $this->_connect->channel();
+
+		$this->_callback_queue = $queueName;
+	}
+
+	/**
+	 * [发送信息]
+	 * @author: libingke
+	 * @param string $body 消息主体
+	 * @param string $routing_key
+	 * @return bool
+	 */
+	public function send($body, $routing_key)
+	{
+		$properties = [
+			'content_type' => 'text/plain',
+			'correlation_id' => uniqid(),
+			'reply_to' => $this->_callback_queue
+		];
+
+		if (is_string($body)) {
+			$msg = new AMQPMessage( (string) $body, $properties);
+			$this->_channel->basic_publish($msg, '', $routing_key);
+		}
+
+		return true;
+	}
+
+
+	/**
+	 * [批量发送信息]
+	 * @author: libingke
+	 * @param string $body 消息主体
+	 * @param string $routing_key
+	 * @return array 返回数据
+	 */
+	public function batchSend($body, $routing_key = '')
+	{
+	}
+
+	/**
+	 * [消费信息]
+	 * @author: libingke
+	 * @param string $queue 队列名称
+	 * @param string $str
+	 * @return bool
+	 */
+	public function receive($queue)
+	{
+		$callback = function($msg) {
+			echo $msg->get('correlation_id');
+			echo " [x] Received ", $msg->body, "\n";
+			$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
+		};
+
+		$this->_channel->basic_qos(null, 1, null);
+		$this->_channel->basic_consume(
+			$queue, '', false, false, false, false,
+			$callback
+		);
+
+		/*while(count($this->_channel->callbacks)) {
+			$this->_channel->wait();
+		}*/
+
+		return true;
+	}
+
+	/**
+	 * [批量消费信息]
+	 * @author: libingke
+	 * @param string $queue 队列名称
+	 * @param string $arr
+	 * @return array 返回数据
+	 */
+	public function batchReceive($queue, $arr)
+	{
+	}
+
+	/**
+	 * [删除信息]
+	 * @author: libingke
+	 */
+	public function delete($message)
+	{
+
+	}
+}

+ 70 - 0
common/logic/Amqp/Queue.php

@@ -0,0 +1,70 @@
+<?php
+
+namespace common\logic\Amqp;
+
+use PhpAmqpLib\Exception\AMQPProtocolChannelException;
+
+/**
+ * 队列逻辑
+ * Class Queue
+ * @package common\logic\Amqp
+ * @author libingke
+ */
+class Queue extends Connect
+{
+    private $_connect;
+
+    private $_channel;
+
+    public function __construct()
+	{
+        $this->_connect = self::connect();
+
+		$this->_channel = $this->_connect->channel();
+    }
+
+	/**
+	 * [创建队列]
+	 * @author: libingke
+	 * @param string $queueName
+	 * @return array
+	 */
+    public function create($queueName)
+	{
+		try {
+			list($callback_queue, ,) = $this->_channel->queue_declare(
+				(string) $queueName,
+				false,
+				0,//持久化
+				false,
+				false
+			);
+
+			return ['status' => 1, 'result' => $callback_queue];
+
+		} catch (AMQPProtocolChannelException $e) {
+
+			return ['status' => 0, 'result' => $e->getMessage()];
+		}
+	}
+
+	/**
+	 * [删除队列]
+	 * @author: libingke
+	 * @param string $queueName
+	 * @return array
+	 */
+	public function delete($queueName)
+	{
+		try {
+			$this->_channel->queue_delete((string) $queueName);
+
+			return ['status' => 1, 'result' => 'OK'];
+
+		} catch (AMQPProtocolChannelException $e) {
+
+			return ['status' => 0, 'result' => $e->getMessage()];
+		}
+	}
+
+}

+ 0 - 18
common/logic/MQMessage/Config.php

@@ -1,18 +0,0 @@
-<?php
-namespace common\logic\MQMessage;
-
-/**
- * 连接配置
- * Class Config
- * @package common\logic\message
- */
-class Config
-{
-	CONST HOST = "121.196.226.188";
-
-	CONST PORT = 5672;
-
-	CONST USER = "lbk";
-
-	CONST PASS = "123456";
-}

+ 0 - 207
common/logic/MQMessage/Message.php

@@ -1,207 +0,0 @@
-<?php
-
-namespace common\logic\MQMessage;
-
-use PhpAmqpLib\Message\AMQPMessage;
-use PhpAmqpLib\Connection\AMQPStreamConnection;
-
-/**
- * 处理消息体逻辑
- * Class Message
- * @package common\logic\MQMessage
- */
-class Message extends Config
-{
-    private $connection;
-
-    private $channel;
-
-    private $callback_queue;
-
-    private $response = null;
-
-    private $corr_id;
-
-    private $result = '';
-
-    public function __construct()
-	{
-        $this->connection = new AMQPStreamConnection(
-        	self::HOST,
-			self::PORT,
-			self::USER,
-			self::PASS
-		);
-
-		$this->channel = $this->connection->channel();
-		list($this->callback_queue, ,) = $this->channel->queue_declare(
-			'', false, false, true, false
-		);
-
-		$this->channel->basic_consume(
-			$this->callback_queue, '', false, false, false, false,
-			array($this, 'on_response')
-		);
-    }
-
-	/**
-	 * [发送信息]
-	 * @author: libingke
-	 * @param string $body 消息主体
-	 * @param string $routing_key
-	 * @return array 返回数据
-	 */
-	public function send($body, $routing_key = 'task_queue')
-	{
-		$properties = [
-			'correlation_id' => uniqid(),
-			'reply_to' => $this->callback_queue
-		];
-
-		if (is_string($body)) {
-			$msg = new AMQPMessage( (string) $body,$properties);
-			$this->result = $this->channel->basic_publish($msg, '', $routing_key);
-		}
-
-		$this->channel->close();
-		$this->connection->close();
-
-		return $this->result;
-	}
-
-
-	/**
-	 * [批量发送信息]
-	 * @author: libingke
-	 * @param string $body 消息主体
-	 * @param string $routing_key
-	 * @return array 返回数据
-	 */
-	public function batchSend($body, $routing_key = '')
-	{
-		$properties = [
-			'correlation_id' => uniqid(),
-			'reply_to' => $this->callback_queue
-		];
-
-		if (is_array($body)) {
-			$msg = new AMQPMessage((string) $body, $properties);
-			$this->result = $this->channel->batch_basic_publish($msg, '', $routing_key);
-		}
-
-		$this->channel->close();
-		$this->connection->close();
-
-		return $this->result;
-	}
-
-	/**
-	 * [消费信息]
-	 * @author: libingke
-	 * @param string $queue 队列名称
-	 * @param string $str
-	 * @return array 返回数据
-	 */
-	public function receive($queue, $str)
-	{
-		$callback = function($msg) {
-			echo " [x] Received ", $msg->body, "\n";
-		};
-
-		$this->result = $this->channel->basic_consume($queue, '', false, true, false, false, $callback);
-
-		while(count($this->channel->callbacks)) {
-			$this->channel->wait();
-		}
-
-		return $this->result;
-	}
-
-	/**
-	 * [批量消费信息]
-	 * @author: libingke
-	 * @param string $queue 队列名称
-	 * @param string $arr
-	 * @return array 返回数据
-	 */
-	public function batchReceive($queue, $arr)
-	{
-		$callback = function($msg) {
-			echo " [x] Received ", $msg->body, "\n";
-		};
-
-		$this->result = $this->channel->basic_consume($queue, '', false, true, false, false, $callback);
-
-		while(count($this->channel->callbacks)) {
-			$this->channel->wait();
-		}
-
-		return $this->result;
-	}
-
-	/**
-	 * [删除信息]
-	 * @author: libingke
-	 */
-	public function delete()
-	{
-
-	}
-
-	public function on_response($rep)
-	{
-		if($rep->get('correlation_id') == $this->corr_id) {
-			$this->result .= $rep->body;
-			$this->response = $rep->body;
-		}
-	}
-
-    public static function CallMq($n)
-	{
-
-        $connection =   new AMQPStreamConnection(
-            self::HOST, self::PORT, self::USER, self::PASS
-		);
-        $channel = $connection->channel();
-
-        $channel->queue_declare('task_queue', false, true, false, false);
-
-        $data=empty($n)?"Hello World!":$n;
-
-        $msg = new AMQPMessage($data,
-            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
-        );
-
-        $channel->basic_publish($msg, '', 'task_queue');
-
-        $channel->close();
-        $connection->close();
-
-        return true;
-    }
-
-
-    public static function CallUserMq($n)
-	{
-
-        $connection =   new AMQPStreamConnection(
-            self::HOST, self::PORT, self::USER, self::PASS
-		);
-        $channel = $connection->channel();
-
-        $channel->queue_declare('login', false, true, false, false);
-
-        $data=empty($n)?"Hello World!":$n;
-
-        $msg = new AMQPMessage($data,
-            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
-        );
-
-        $channel->basic_publish($msg, '', 'login');
-
-        $channel->close();
-        $connection->close();
-
-        return true;
-    }
-}