Browse Source

修改删除,ack;优化;
@libingke
@2018/02/08

lbk 6 years ago
parent
commit
f782a5d1bb

+ 5 - 7
backend/config/main.php

@@ -15,7 +15,7 @@ return [
     'components' => [
         'request' => [
 			'baseUrl' => '',
-            'csrfParam' => '_csrf-backend',
+			'enableCsrfValidation' => false,
         ],
         'user' => [
             'identityClass' => 'common\models\User',
@@ -42,13 +42,11 @@ return [
         'db' =>   require __DIR__ . '/../../common/config/db.php',
 
         'urlManager' => [
-            'enablePrettyUrl' => true,  //美化url==ture
-            'enableStrictParsing' => false,  //不启用严格解析
-            'showScriptName' => false,   //隐藏index.php
+            'enablePrettyUrl' => true,
+            'enableStrictParsing' => false,
+            'showScriptName' => false,
             'rules' => [
-                /*'<module:\w+>/<controller:\w+>/<id:\d+>' => '<module>/<controller>/view',
-                '<controller:\w+>/<id:\d+>' => '<controller>/view',*/
-            ],
+			],
         ]
     ],
     'params' => $params,

+ 0 - 69
backend/controllers/AdminController.php

@@ -1,69 +0,0 @@
-<?php
-namespace backend\controllers;
-
-use Yii;
-use yii\web\Controller;
-use yii\filters\VerbFilter;
-use yii\filters\AccessControl;
-use common\models\LoginForm;
-
-use backend\controllers;
-use yii\data\ActiveDataProvider;
-use common\models\AdminLog;
-use common\models\Banner;
-
-
-/**
- * AdminLog Controller
- */
-class AdminController extends Controller
-{
-    public function actionIndex()
-    {
-        $dataProvider = new ActiveDataProvider([
-            'query' => AdminLog::find(),
-            'sort' => [
-                'defaultOrder' => [
-                    'addtime' => SORT_DESC
-                ]
-            ],
-        ]);
-        return $this->render('index',[
-            'dataProvider' => $dataProvider
-        ]);
-    }
-
-    public function actionView($id){
-        return $this->render('view',[
-            'model'=>AdminLog::findOne($id),
-        ]);
-    }
-
-    public function actionCreate()
-    {
-
-
-        $model = new Banner();
-        $model->status=Banner::STATUS_DISPLAY;
-
-        if ($model->load(Yii::$app->request->post()) && $model->save()) {
-            //保存操作记录
-            \common\models\AdminLog::saveLog('banner','create',$model->searchById($model->primaryKey),$model->primaryKey);
-            Yii::$app->session->setFlash('success','Banner【'.$model->title.'】发布成功');
-            return $this->redirect(['index']);
-        } else {
-            return $this->render('create', [
-                'model' => $model,
-            ]);
-        }
-    }
-
-    public function searchById($id){
-        if (($model = Banner::findOne($id)) !== null) {
-            return json_encode($model->toArray());
-        } else {
-            throw new \yii\web\NotFoundHttpException('The requested page does not exist.');
-        }
-    }
-
-}

+ 14 - 4
backend/controllers/BaseController.php

@@ -1,6 +1,7 @@
 <?php
 namespace backend\controllers;
 
+use components\Exception;
 use yii\web\Controller;
 
 /**
@@ -9,18 +10,27 @@ use yii\web\Controller;
  */
 class BaseController extends Controller
 {
+	/**
+	 * init
+	 */
 	public function init()
 	{
 		\Yii::$app->response->format = \yii\web\Response::FORMAT_JSON;
 		parent::init();
 	}
 
+	/**
+	 * beforeAction
+	 * @return bool
+	 * @throws Exception
+	 */
 	public function beforeAction($action)
 	{
-		//todo
-		/*if (Yii::$app->user->isGuest) {
+		try {
+			return parent::beforeAction($action);
 
-		}*/
-		return true;
+		} catch (\Exception $e) {
+			throw new Exception($e->statusCode, $e->getMessage());
+		}
 	}
 }

+ 66 - 38
backend/controllers/MessageController.php

@@ -2,63 +2,47 @@
 namespace backend\controllers;
 
 use backend\forms\MessageForm;
-use common\helpers\KeyHelper;
-use common\logic\Amqp\Cache;
 use components\Exception;
-use components\service\AmqpConfig;
 use Yii;
 
 class MessageController extends BaseController
 {
 	/**
-	 * [查询回执信息]
-	 * @author: libingke
-	 * @param $query
-	 * @param $queue
-	 * @return bool
-	 * @throws Exception
+	 * behaviors
+	 * @return array
 	 */
-	public function actionQueryReceipt($query, $queue)
+	public function behaviors()
 	{
-		if (is_string($query) && is_string($query)) {
-			$statusKey = KeyHelper::getMessageStatusKey($query, $queue);
-			$resultKey = KeyHelper::getMessageResultKey($query, $queue);
-			$status = Yii::$app->redis->get($statusKey);
-
-			if ($status !== false) {
-				$mark = AmqpConfig::getMarkById($status);//状态码 => 提示信息
-
-				if (Cache::STATUS_HAND_OK == $status) {
-					return json_decode(Yii::$app->redis->get($resultKey), true);//直接返回数据
-
-				} else {
-					throw new Exception(2101, $mark);//非正常时回执
-				}
-
-			} else {
-				throw new Exception(2102);//无效
-			}
-
-		} else {
-			throw new Exception(2102);//无效
-		}
+		return [
+			'verbs' => [
+				'class' => \yii\filters\VerbFilter::className(),
+				'actions' => [
+					'send' => ['POST'],
+					'batch-send' => ['POST'],
+					'list' => ['GET'],
+					'purge' => ['POST'],
+					'consume' => ['POST'],
+					'delete' => ['POST'],
+					'ack' => ['POST'],
+				],
+			],
+		];
 	}
 
 	/**
 	 * 发送消息
 	 * @author: libingke
 	 * @return array
-	 * @throws Exception
 	 */
 	public function actionSend()
 	{
 		$model = new MessageForm();
 		$model->setScenario('send');
-		$model->load(['MessageForm' => Yii::$app->request->get()]);
+		$model->load(['MessageForm' => Yii::$app->request->post()]);
 
 		$data = [];
 		if ($model->validate()) {
-			$data = $model->sendMessageV1_1();
+			$data = $model->sendMessage();
 		} else {
 			$model->handleError();//处理验证失败
 		}
@@ -79,7 +63,7 @@ class MessageController extends BaseController
 	{
 		$model = new MessageForm();
 		$model->setScenario('batch_send');
-		$model->load(['MessageForm' => Yii::$app->request->get()]);
+		$model->load(['MessageForm' => Yii::$app->request->post()]);
 
 		$data = [];
 		if ($model->validate()) {
@@ -98,6 +82,7 @@ class MessageController extends BaseController
 	/**
 	 * 获取队列消息列表
 	 * @author: libingke
+	 * @return array
 	 */
 	public function actionList()
 	{
@@ -127,7 +112,7 @@ class MessageController extends BaseController
 	{
 		$model = new MessageForm();
 		$model->setScenario('consume');
-		$model->load(['MessageForm' => Yii::$app->request->get()]);
+		$model->load(['MessageForm' => Yii::$app->request->post()]);
 
 		$data = [];
 		if ($model->validate()) {
@@ -143,24 +128,67 @@ class MessageController extends BaseController
 		];
 	}
 
+	/**
+	 * 手动应答
+	 * @author: libingke
+	 */
+	public function actionAck()
+	{
+		$model = new MessageForm();
+		$model->setScenario('ack');
+		$model->load(['MessageForm' => Yii::$app->request->post()]);
+
+		$data = [];
+		if ($model->validate()) {
+			$data = $model->ackMessage();
+		} else {
+			$model->handleError();
+		}
+
+		return [
+			'code' => 200,
+			'message' => Yii::t('error', 200),
+			'data' => $data
+		];
+	}
+
 	/**
 	 * 删除消息
 	 * @author: libingke
+	 * @return array
+	 * @throws Exception
 	 */
     public function actionDelete()
     {
+		$model = new MessageForm();
+		$model->setScenario('delete');
+		$model->load(['MessageForm' => Yii::$app->request->post()]);
+
+		$data = [];
+		if ($model->validate()) {
+			$data = $model->deleteMessage();
+		} else {
+			$model->handleError();
+		}
 
+		return [
+			'code' => 200,
+			'message' => Yii::t('error', 200),
+			'data' => $data
+		];
     }
 
+
 	/**
 	 * 清空消息
 	 * @author: libingke
+	 * @return array
 	 */
 	public function actionPurge()
 	{
 		$model = new MessageForm();
 		$model->setScenario('purge');
-		$model->load(['MessageForm' => Yii::$app->request->get()]);
+		$model->load(['MessageForm' => Yii::$app->request->post()]);
 
 		$data = [];
 		if ($model->validate()) {

+ 58 - 0
backend/controllers/QueryController.php

@@ -0,0 +1,58 @@
+<?php
+namespace backend\controllers;
+
+use components\Exception;
+use components\service\AmqpConfig;
+use components\service\Redis;
+use Yii;
+
+class QueryController extends BaseController
+{
+	/**
+	 * behaviors
+	 * @return array
+	 */
+	public function behaviors()
+	{
+		return [
+			'verbs' => [
+				'class' => \yii\filters\VerbFilter::className(),
+				'actions' => [
+					'message-status' => ['GET'],
+				],
+			],
+		];
+	}
+
+	/**
+	 * 查询消息状态
+	 * @author: libingke
+	 * @return mixed
+	 * @throws Exception
+	 */
+	public function actionMessageStatus()
+	{
+		$params = Yii::$app->request->queryParams;
+		if (!isset($params['queue']))
+			throw new Exception(1100);
+
+		if (!is_string($params['queue']) || !$params['queue'])
+			throw new Exception(1101);
+
+		if (!isset($params['mid']))
+			throw new Exception(1203);
+
+		if (!is_string($params['mid']) || !$params['mid'])
+			throw new Exception(1204);
+
+		$r = Redis::get($params['queue'], $params['mid'], 'status');
+		if ($r == null)
+			throw new Exception(1004);
+
+		return [
+			'code' => 200,
+			'message' => Yii::t('error', 200),
+			'data' => ['status' => $r, 'status_mark' => AmqpConfig::getMarkById($r)]
+		];
+	}
+}

+ 19 - 7
backend/controllers/QueueController.php

@@ -7,6 +7,24 @@ use Yii;
 
 class QueueController extends BaseController
 {
+	/**
+	 * behaviors
+	 * @return array
+	 */
+	public function behaviors()
+	{
+		return [
+			'verbs' => [
+				'class' => \yii\filters\VerbFilter::className(),
+				'actions' => [
+					'list' => ['GET'],
+					'create' => ['POST'],
+					'delete' => ['POST'],
+				],
+			],
+		];
+	}
+
 	/**
 	 * [创建队列]
 	 * @author: libingke
@@ -15,9 +33,6 @@ class QueueController extends BaseController
 	 */
 	public function actionCreate()
 	{
-		if (!Yii::$app->request->isPost)
-			throw new Exception('1001');
-
 		$model = new QueueForm();
 		$model->setScenario('create_queue');
 		$model->load(['QueueForm' => Yii::$app->request->post()]);
@@ -44,9 +59,6 @@ class QueueController extends BaseController
 	 */
 	public function actionDelete()
 	{
-		if (!Yii::$app->request->isPost)
-			throw new Exception('1001');
-
 		$model = new QueueForm();
 		$model->setScenario('delete_queue');
 		$model->load(['QueueForm' => Yii::$app->request->post()]);
@@ -65,7 +77,6 @@ class QueueController extends BaseController
 		];
 	}
 
-
 	/**
 	 * 获取队列列表
 	 * @author: libingke
@@ -74,6 +85,7 @@ class QueueController extends BaseController
 	{
 		$model = new QueueForm();
 		$data = $model->getQueueList();
+
 		return [
 			'code' => 200,
 			'message' => Yii::t('error', 200),

+ 0 - 101
backend/controllers/RabbitController.php

@@ -1,101 +0,0 @@
-<?php
-namespace backend\controllers;
-
-use common\models\RabbitLog;
-use Yii;
-use yii\web\Controller;
-use yii\filters\VerbFilter;
-use yii\filters\AccessControl;
-use common\models\LoginForm;
-
-use backend\controllers;
-use yii\data\ActiveDataProvider;
-use common\models\Banner;
-
-
-/**
- * RabbitLog Controller
- */
-class RabbitController extends Controller
-{
-
-    /**
-     * @inheritdoc
-     */
-    public function behaviors()
-    {
-        return [
-            'access' => [
-                'class' => AccessControl::className(),
-                'rules' => [
-                    [
-                        'actions' => ['login','view', 'error', 'cjyy'],
-                        'allow' => true,
-                    ],
-                    [
-                        'actions' => ['logout', 'index'],
-                        'allow' => true,
-                        'roles' => ['@'],
-                    ],
-                ],
-            ],
-            'verbs' => [
-                'class' => VerbFilter::className(),
-                'actions' => [
-                    'logout' => ['post'],
-                ],
-            ],
-        ];
-    }
-
-
-    public function actionIndex()
-    {
-        $dataProvider = new ActiveDataProvider([
-            'query' => RabbitLog::find(),
-            'sort' => [
-                'defaultOrder' => [
-                    'addtime' => SORT_DESC
-                ]
-            ],
-        ])
- ;
-        return $this->render('index',[
-            'dataProvider' => $dataProvider
-        ]);
-    }
-
-    public function actionView($id){
-        return $this->render('view',[
-            'model'=>RabbitLog::findOne($id),
-        ]);
-    }
-
-    public function actionCreate()
-    {
-
-
-        $model = new Banner();
-        $model->status=Banner::STATUS_DISPLAY;
-
-        if ($model->load(Yii::$app->request->post()) && $model->save()) {
-            //保存操作记录
-            \common\models\RabbitLog::saveLog('banner','create',$model->searchById($model->primaryKey),$model->primaryKey);
-            Yii::$app->session->setFlash('success','Banner【'.$model->title.'】发布成功');
-            return $this->redirect(['index']);
-        } else {
-            return $this->render('create', [
-                'model' => $model,
-            ]);
-        }
-    }
-
-    public function searchById($id){
-        if (($model = RabbitLog::findOne($id)) !== null) {
-            return json_encode($model->toArray());
-        } else {
-            throw new \yii\web\NotFoundHttpException('The requested page does not exist.');
-        }
-    }
-
-}

+ 1 - 11
backend/controllers/SiteController.php

@@ -22,7 +22,7 @@ class SiteController extends Controller
                 'class' => AccessControl::className(),
                 'rules' => [
                     [
-                        'actions' => ['login', 'error', 'cjyy','index'],
+                        'actions' => ['login', 'error', 'index'],
                         'allow' => true,
                     ],
                     [
@@ -64,16 +64,6 @@ class SiteController extends Controller
     }
 
 
-    /**
-     * Displays homepage.
-     *
-     * @return string
-     */
-    public function actionCjyy()
-    {
-        return $this->render('index');
-    }
-
     /**
      * Login action.
      *

+ 23 - 0
backend/forms/BaseForm.php

@@ -1,6 +1,7 @@
 <?php
 namespace backend\forms;
 
+use Yii;
 use yii\base\Model;
 use components\Exception;
 
@@ -20,4 +21,26 @@ class BaseForm extends Model
 			throw new Exception(1003, $result);
 		}
 	}
+
+	/**
+	 * getConnect
+	 * @author: libingke
+	 * @return \PhpAmqpLib\Connection\AMQPStreamConnection
+	 * @throws Exception
+	 */
+	public function getConnect()
+	{
+		try {
+			return new \PhpAmqpLib\Connection\AMQPStreamConnection(
+				Yii::$app->Amqp->host,
+				Yii::$app->Amqp->port,
+				Yii::$app->Amqp->user,
+				Yii::$app->Amqp->pass,
+				Yii::$app->Amqp->vhost
+			);
+
+		} catch (\Exception $e) {
+			throw new Exception(1001, $e->getMessage());
+		}
+	}
 }

+ 202 - 309
backend/forms/MessageForm.php

@@ -2,25 +2,30 @@
 
 namespace backend\forms;
 
+use PhpAmqpLib\Message\AMQPMessage;
 use components\service\AmqpConfig;
-use PhpAmqpLib\Exception\AMQPProtocolChannelException;
-use common\helpers\KeyHelper;
-use common\logic\Amqp\Cache;
-use components\Curl;
+use components\service\Redis;
 use components\Exception;
-use PhpAmqpLib\Message\AMQPMessage;
+use components\Curl;
+use common\helpers\KeyHelper;
 use yii\helpers\ArrayHelper;
 use Yii;
 
 class MessageForm extends BaseForm
 {
-	/* send */
+	/**
+	 * @var
+	 */
 	public $queue;
 
+	/**
+	 * @var
+	 */
 	public $message;
 
-
-	/* message_list */
+	/**
+	 * @var
+	 */
 	public $name;
 
 	/**
@@ -44,27 +49,30 @@ class MessageForm extends BaseForm
 	public $mid;
 
 	/**
-	 * @var string 消费类型
+	 * @var array 消费ids
 	 */
-	public $type;
+	public $mids;
 
 	/**
-	 * @var bool 是否空消费
+	 * @var string 消费类型
 	 */
-	public $do_nothing = false;
+	public $type;
 
 	/**
 	 * @var bool 自动应答
 	 */
-	public $ack = false;
+	public $ack;
 
+	/**
+	 * @var bool 强制删除
+	 */
+	public $forced;
 
 	/**
 	 * @var bool
 	 */
 	private $_stop = false;
 
-
 	/**
 	 * @var string
 	 */
@@ -90,142 +98,44 @@ class MessageForm extends BaseForm
 	 */
 	private $_result;
 
-	const TYPE_MID	 = 'mid';//消费某条
-	const TYPE_COUNT = 'count';//(从第一条开始) 消费条数
-	const TYPE_MC	 = 'mid_count';//(从某条开始) 消费条数
-
 	public function rules()
 	{
 		return [
+			[['name'], 'required', 'on' => ['message_list', 'purge']],
+			[['name'], 'trim', 'on' => ['message_list', 'purge', 'consume', 'delete', 'ack']],
+
 			/* 发送消息 */
 			[['queue'], 'trim', 'on' => ['send', 'batch_send']],
 			[['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
-			['message', 'validateMessage', 'on' => ['send', 'batch_send']],
+			['message', 'validateArray', 'on' => ['send', 'batch_send']],
 
 			/* 获取消息列表 */
-			[['name'], 'trim', 'on' => ['message_list', 'purge', 'consume']],
-			[['name'], 'required', 'on' => ['message_list', 'purge']],
-			['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 2012,
-				'tooSmall' => 2013, 'tooBig' => 2014, 'on' => ['message_list']],
+			['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 1205,
+				'tooSmall' => 1206, 'tooBig' => 1207, 'on' => ['message_list']],
 			['count', 'default', 'value' => 20, 'on' => ['message_list']],
-			['requeue', 'boolean', 'message' => 2010, 'on' => ['message_list']],
+			['requeue', 'boolean', 'message' => 1209, 'on' => ['message_list']],
 			['requeue', 'default', 'value' => true, 'on' => ['message_list']],
-			['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 2011, 'on' => ['message_list']],
+			['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 1210, 'on' => ['message_list']],
 			['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],
 
 			/* 消费 */
-			[['type', 'name'], 'required', 'on' => ['consume']],
-			['type', 'in', 'range' => [static::TYPE_MID, static::TYPE_COUNT, static::TYPE_MC], 'on' => 'consume'],
-			['type', 'validateType', 'on' => 'consume'],
-			['mid', 'string', 'on' => 'consume'],
-			['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 2012,
-				'tooSmall' => 2013, 'on' => ['consume']],
-			['do_nothing', 'default', 'value' => false, 'on' => 'consume'],
-			['do_nothing', 'boolean',  'on' => 'consume'],
-			['ack', 'default', 'value' => false, 'on' => 'consume'],
-			['ack', 'boolean',  'on' => 'consume'],
+			[['name', 'count'], 'required', 'on' => ['consume']],
+			['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 1205,
+				'tooSmall' => 1206, 'on' => ['consume']],
+
+			/* delete & ack */
+			[['mids', 'name'], 'required', 'on' => ['delete', 'ack']],
+			['mids', 'validateArray', 'on' => ['delete', 'ack']],
+			['name', 'string', 'on' => ['delete', 'ack']],
+			['forced', 'default', 'value' => false, 'on' => 'delete'],
+			['forced', 'boolean',  'on' => 'delete'],
 		];
 	}
 
-	public function validateMessage($attribute)
+	public function validateArray($attribute)
 	{
-		if (!$this->$attribute)
-			throw new Exception(2001);
-
-		if (!is_array($this->$attribute))
-			throw new Exception(2002, "{$attribute} 必须是数组");
-	}
-
-	public function validateType($attribute)
-	{
-		if ($this->$attribute == static::TYPE_MC) {
-			if ($this->mid == null)
-				throw new Exception(2016);
-			if ($this->count == null)
-				throw new Exception(2017);
-
-		} else {
-			if ($this->{$this->$attribute} == null)
-				throw new Exception(2016, $this->$attribute . " 不能为空");
-		}
-	}
-
-	/**
-	 * [连接AMQP]
-	 * @author: libingke
-	 * @return \PhpAmqpLib\Connection\AMQPStreamConnection
-	 */
-	protected function getConnect()
-	{
-		return new \PhpAmqpLib\Connection\AMQPStreamConnection(
-			Yii::$app->Amqp->host,
-			Yii::$app->Amqp->port,
-			Yii::$app->Amqp->user,
-			Yii::$app->Amqp->pass,
-			Yii::$app->Amqp->vhost
-		);
-	}
-
-	/**
-	 * [发送消息]
-	 * @author: libingke
-	 * @return array
-	 * @throws Exception
-	 * @version 1.0
-	 */
-	public function sendMessage()
-	{
-		$body = call_user_func_array([$this, '_messageBody'], [$this->attributes]);
-		$mid = KeyHelper::getUniqueId('message_send');
-		$properties = [
-			'content_type' => 'text/plain',
-			'message_id' => $mid,
-			'correlation_id' => $mid,
-			'consumer_tag' => $mid
-		];
-
-		$data = [];
-		$e_name = 'message.default';
-		$k_route = 'route.default';
-		$q_name = $this->queue;
-
-		$connect  = Yii::$app->Amqp->AMQPConnection();
-		$channel  = new \AMQPChannel($connect);
-		$exchange = new \AMQPExchange($channel);
-		$exchange->setName($e_name);
-		$exchange->setType(AMQP_EX_TYPE_DIRECT);
-		$exchange->setFlags(AMQP_DURABLE);//持久化
-		$exchange->declareExchange();//声明交换机
-
-		$queue = new \AMQPQueue($channel);
-		$queue->setName($q_name);
-		$queue->declareQueue(); //声明队列
-		$queue->bind($e_name, $k_route);
-		$r = $exchange->publish($body, $k_route, AMQP_NOPARAM, $properties);
-		if ($r == true) {
-			$data['message_total'] = $queue->declareQueue();
-			$data['queue_name'] = $this->queue;
-			$data['message_new'] = [
-				'mid' => $mid,
-				'body' => $body
-			];
-
-			if ($data['message_total'] > 1) {
-				$data['message_first'] = [
-					'mid' => $queue->get()->getMessageId(),
-					'body' => $queue->get()->getBody()
-				];
-			}
-
-			Cache::setData(
-				'queue:mid:'.$mid,
-				Cache::STATUS_SEND_OK
-			);
-		} else {
-			throw new Exception(2103);
-		}
-
-		return $data;
+		if (!$this->$attribute || !is_array($this->$attribute))
+			throw new Exception(1003, "{$attribute} 必须是数组");
 	}
 
 	/**
@@ -235,10 +145,9 @@ class MessageForm extends BaseForm
 	 * @throws Exception
 	 * @version 1.1
 	 */
-	public function sendMessageV1_1()
+	public function sendMessage()
 	{
 		try {
-			//connect
 			$connect = $this->getConnect();
 			$channel = $connect->channel();
 			$this->_handleMessage($this->message);
@@ -267,19 +176,17 @@ class MessageForm extends BaseForm
 			$channel->close();
 			$connect->close();
 
-			$statusKey = KeyHelper::getMessageStatusKey($this->_mid, $q_name);
-			Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
+			Redis::set($q_name, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
 
 			return $data;
 
 		} catch (\Exception $e) {
-			throw new Exception(1000, $e->getMessage());
+			throw new Exception(1001, $e->getMessage());
 		}
 	}
 
 	/**
 	 * [构造消息体]
-	 * @author: libingke
 	 * @param $data
 	 */
 	private function _handleMessage($data)
@@ -308,13 +215,12 @@ class MessageForm extends BaseForm
 	public function batchSendMessage()
 	{
 		try {
-			//connect
 			$connect = $this->getConnect();
 			$channel = $connect->channel();
 			//预声明
 			$channel->queue_declare($this->queue,
 				false, true, false, false);
-			//batch_basic_publish todo
+
 			foreach ($this->message as $k => $v) {
 				$this->_handleMessage($v);
 				$this->_rows[] = [
@@ -323,8 +229,7 @@ class MessageForm extends BaseForm
 				];
 				$channel->batch_basic_publish($this->_message, '', $this->queue);
 
-				$statusKey = KeyHelper::getMessageStatusKey($this->_mid, $this->queue);
-				Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
+				Redis::set($this->queue, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
 			}
 			$channel->publish_batch();
 
@@ -350,7 +255,7 @@ class MessageForm extends BaseForm
 			return $data;
 
 		} catch (\Exception $e) {
-			throw new Exception(1000, $e->getMessage());
+			throw new Exception(1001, $e->getMessage());
 		}
 	}
 
@@ -360,14 +265,11 @@ class MessageForm extends BaseForm
 	 */
 	public function getMessageList()
 	{
-		/* api错误码 */
-		$badCode = 2000;
-		/* 登录验证 */
-		$authStr = Yii::$app->Amqp->getConfig('user') . ':' . Yii::$app->Amqp->getConfig('pass');
-		/* URL */
-		$vhost = urlencode(Yii::$app->Amqp->getConfig('vhost'));
-		$url = Yii::$app->Amqp->getConfig('host') . ':' . Yii::$app->Amqp->getConfig('api_port') .
+		$authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
+		$vhost = urlencode(Yii::$app->Amqp->vhost);
+		$url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port .
 			"/api/queues/{$vhost}/" . $this->name . "/get";
+
 		$postParams = [
 			'name' => $this->name,
 			'count' => $this->count,
@@ -382,20 +284,22 @@ class MessageForm extends BaseForm
 		$curl->setRawPostData(json_encode($postParams));
 		$result = json_decode($curl->post($url), true);
 		if ($curl->responseCode != 200)
-			throw new Exception($badCode);
+			throw new Exception(1002);
+
 		if ($curl->errorText)
-			throw new Exception($badCode, $curl->errorText);
+			throw new Exception(1002, $curl->errorText);
+
 		if (isset($result['error']) && is_string($result['error']))
-			throw new Exception($badCode, $result['error']);
+			throw new Exception(1002, $result['error']);
 
 		ArrayHelper::multisort($result,'message_count',SORT_ASC);
 		//print_r($result);exit();
 		$rows = [];
 		foreach ($result as $k => $v) {
-			$rows[$k]['before_count']	= $v['message_count'];
-			$rows[$k]['payload']			= $v['payload'];
-			$rows[$k]['payload_encoding']= $v['payload_encoding'];
-			$rows[$k]['message_id']		= $v['properties']['message_id'];
+			$rows[$k]['mid']	= $v['properties']['message_id'];
+			$rows[$k]['body']	= $v['payload'];
+			$rows[$k]['before']	= $v['message_count'];
+
 		}
 		unset($result);
 
@@ -403,131 +307,139 @@ class MessageForm extends BaseForm
 	}
 
 	/**
-	 * [空处理]
+	 * consumeMessage
+	 * @author: libingke
+	 * @return array
+	 * @throws Exception
 	 */
-	private function _consumeEmpty($mid, $body, $ack, $error, $stop = false)
+	public function consumeMessage()
 	{
-		$this->_stop = $stop;
-		$this->_result[] = [
-			'mid' => $mid,
-			'result' => 'success: do nothing!',
-			'ack' => $ack,
-			'error' => $error
-		];
+		$q_name = $this->name;
+
+		$connect = $this->getConnect();
+		$channel = $connect->channel();
+		$channel->queue_declare($q_name,
+			false, true, false, false);
+		list(, $total, ) = $channel->queue_declare($q_name,
+			false, true, false, false);
+
+		if ($total == 0)
+			throw new Exception(1300);
+
+		try {
+			$min = min($this->count, $total);
+
+			$callback = function ($msg) use($q_name) {call_user_func_array([$this, 'closureConsume'], [$msg, $q_name]);};
+			$channel->basic_qos(0, $min, null);
+			$channel->basic_consume($q_name,
+				'', false, false, false, false, $callback);
+			for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
+				if ($i > $min)
+					break;
+
+				$channel->wait();
+			}
+
+			$channel->close();
+			$connect->close();
+
+		} catch (\Exception $e) {
+			throw new Exception(1001, $e->getMessage());
+		}
+
+		return $this->_result;
 	}
 
 	/**
-	 * [处理逻辑1]
-	 * @author: libingke
+	 * closureConsume for consumeMessage
+	 * @param $msg
+	 * @param $queue
 	 */
-	private function _consumeLogin($mid, $body, $ack, $error, $stop = false)
+	protected function closureConsume($msg, $queue)
 	{
-		$this->_stop = $stop;
-		$this->_result[] = [
-			'mid' => $mid,
-			'result' => '已处理',
-			'ack' => $ack,
-			'error' => $error
-		];
+		$data = ['mid' => '', 'body' => '',	'error' => ''];
+		try {
+			$data['mid'] = $msg->get('message_id');
+			$data['body'] = $msg->body;
+			Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND);
+
+		} catch (\Exception $e) {
+			$data['error'] = $e->getMessage();
+
+		} finally {
+			$this->_result[] = $data;
+		}
 	}
 
 	/**
-	 * [消费某条消息]
+	 * ackMessage
 	 * @author: libingke
-	 * @param $mid
 	 * @return array
 	 * @throws Exception
 	 */
-	protected function consumeByMid($mid)
+	public function ackMessage()
 	{
-		$q_name = $this->name;
-
-		//选择执行回调
-		if ($this->do_nothing != true) {
-			$function = '_consume' . ucfirst($q_name);
-			if (!method_exists($this, $function) || !is_callable(array($this, $function)))
-				throw new Exception(2015);
-		} else {
-			//if do nothing handle empty
-			$function = '_consumeEmpty';
-		}
+		//帅选合法待应答消息id
+		foreach ($this->mids as $mid)
+			if ($mid && is_string($mid))
+				$this->_rows[$mid] = $mid;
+		if (!is_array($this->_rows) || count($this->_rows) == 0)
+			throw new Exception(1301);
+		$ack = $this->_rows;
 
+		$q_name = $this->name;
 		$connect = $this->getConnect();
 		$channel = $connect->channel();
 		$channel->queue_declare($q_name,
 			false, true, false, false);
-		list(, $count, ) = $channel->queue_declare($q_name,
+		list(, $total, ) = $channel->queue_declare($q_name,
 			false, true, false, false);
-		$callback = function ($msg) use($function, $mid, $q_name) {
-			try {
-				$message_id = $msg->get('message_id');
-				$ack = $this->ack == true ? true : false;//是否应答
-				if ($mid == $message_id) {
-					call_user_func_array(
-						[$this, $function],
-						[$message_id, $msg->body, $ack, '',	true]
-					);
-
-					$statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
-					if ($ack === true) {
-						$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
-						Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
-					} else {
-						Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
-					}
-				}
-			} catch (\Exception $e) {
-				//$e->getMessage();
-			}
-		};
 
 		try {
-			$channel->basic_qos(0, $count, null);
+			$callback = function ($msg) use($q_name) {
+				call_user_func_array([$this, 'closureAck'], [$msg, $q_name, AmqpConfig::STATUS_HAND_OK]);
+			};
+			$channel->basic_qos(0, $total, null);
 			$channel->basic_consume($q_name,
 				'', false, false, false, false, $callback);
-
-			$i = 0;
-			while (count($channel->callbacks)) {
-				$i ++;
-				if ($i > $count || $this->_stop == true)
+			for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
+				if ($i > $total)
 					break;
 
 				$channel->wait();
 			}
 
+			$channel->close();
+			$connect->close();
+
 		} catch (\Exception $e) {
-			throw new Exception(2104, $e->getMessage());
+			throw new Exception(1101, $e->getMessage());
 		}
 
-		$channel->close();
-		$connect->close();
-
-		if ($this->_result == null)
-			throw new Exception(2102);
-
-		return $this->_result;
+		$ackCount = count($ack) - count($this->_rows);
+		$data = ['queue' => $q_name, 'ack_count' => $ackCount];
+		count($this->_rows) ? $data['ack_fail'] = array_keys($this->_rows) : null;
+		return $data;
 	}
 
 	/**
-	 * [根据数量消费]
+	 * deleteMessage
 	 * @author: libingke
-	 * @param string | int $startPos 开始位置
-	 * @param int $count 数量
+	 * @return array
+	 * @throws Exception
 	 */
-	protected function consumeByCount($count)
+	public function deleteMessage()
 	{
-		$q_name = $this->name;
-
-		//选择执行回调
-		if ($this->do_nothing != true) {
-			$function = '_consume' . ucfirst($q_name);
-			if (!method_exists($this, $function) || !is_callable(array($this, $function)))
-				throw new Exception(2015);
-		} else {
-			//if do nothing handle empty
-			$function = '_consumeEmpty';
-		}
+		//帅选合法待应答消息id
+		foreach ($this->mids as $mid)
+			if ($mid && is_string($mid))
+				$this->_rows[$mid] = $mid;
+		if (!is_array($this->_rows) || count($this->_rows) == 0)
+			throw new Exception(1301);
+		$delete = $this->_rows;
+		Redis::batchDel($this->name, $delete, 'status');
 
+		$q_name = $this->name;
 		$connect = $this->getConnect();
 		$channel = $connect->channel();
 		$channel->queue_declare($q_name,
@@ -535,82 +447,53 @@ class MessageForm extends BaseForm
 		list(, $total, ) = $channel->queue_declare($q_name,
 			false, true, false, false);
 
-		$callback = function ($msg) use($function, $q_name) {
-			$ack = $this->ack == true ? true : false;//是否应答
-			try {
-				$message_id = $msg->get('message_id');
-				call_user_func_array(
-					[$this, $function],
-					[$message_id, $msg->body, $ack,	'', false]
-				);
-
-				//更新状态
-				$statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
-				if ($ack === true) {
-					$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
-					Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
-				} else {
-					Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
-				}
-
-			} catch (\Exception $e) {
-				//消息体出错机制
-				call_user_func_array(
-					[$this, $function],
-					['', $msg->body, $ack, $e->getMessage(), false]
-				);
-			}
-		};
-
 		try {
-			$min = min($count, $total);
-			$channel->basic_qos(0, $min, null);
+			$callback = function ($msg) use($q_name) {call_user_func_array([$this, 'closureAck'], [$msg]);};
+			$channel->basic_qos(0, $total, null);
 			$channel->basic_consume($q_name,
 				'', false, false, false, false, $callback);
-
-			$i = 0;
-			while (count($channel->callbacks)) {
-				$i ++;
-				if ($i > $min || $this->_stop == true)
+			for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
+				if ($i > $total)
 					break;
 
 				$channel->wait();
 			}
 
+			$channel->close();
+			$connect->close();
+
 		} catch (\Exception $e) {
-			throw new Exception(2104, $e->getMessage());
+			throw new Exception(1101, $e->getMessage());
 		}
 
-		$channel->close();
-		$connect->close();
-		return $this->_result;
+		$deleteCount = count($delete) - count($this->_rows);
+		$data = ['queue' => $q_name, 'delete_count' => $deleteCount];
+		count($this->_rows) ? $data['delete_fail'] = array_keys($this->_rows) : null;
+		return $data;
 	}
 
 	/**
-	 * [消费消息]
+	 * closureAck for ackMessage && deleteMessage
 	 * @author: libingke
+	 * @param $msg
+	 * @param $queue
+	 * @param bool $status
 	 */
-	public function consumeMessage()
+	protected function closureAck($msg, $queue = '', $status = false)
 	{
-		switch ($this->type)
-		{
-			case static::TYPE_MID:
-				$data = $this->consumeByMid($this->mid);
-				break;
-
-			case static::TYPE_COUNT:
-				$data = $this->consumeByCount($this->count);
-				break;
-
-			case static::TYPE_MC:
-				throw new Exception(1000, '未开发');
-				break;
-
-			default:
-				return "It's not possible to get there.";
-		}
+		try {
+			$mid = $msg->get('message_id');
+			if (in_array($mid, $this->_rows)) {
+				unset($this->_rows[$mid]);
+				$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
+				if ($status != false && $queue != '')
+					Redis::set($queue, $mid, 'status', $status);
+			}
 
-		return $data;
+		} finally {
+			if (count($this->_rows) == 0)
+				$this->_stop = true;
+		}
 	}
 
 	/**
@@ -620,13 +503,23 @@ class MessageForm extends BaseForm
 	 */
 	public function purge()
 	{
-		$data = [];
-		$connect  = Yii::$app->Amqp->AMQPConnection();
-		$channel  = new \AMQPChannel($connect);
-		$queue = new \AMQPQueue($channel);
-		$queue->setName($this->name);
-		$queue->purge();
-		return $data;
-	}
+		try {
+			$connect = $this->getConnect();
+			$channel = $connect->channel();
+			$delete = $channel->queue_purge($this->name);
+			if ($delete > 0)
+				Redis::purge($this->name);
 
+			$channel->close();
+			$connect->close();
+
+			return [
+				'queue' => $this->name,
+				'count' => $delete
+			];
+
+		} catch (\Exception $e) {
+			throw new Exception(1001, $e->getMessage());
+		}
+	}
 }

+ 32 - 106
backend/forms/QueueForm.php

@@ -2,8 +2,6 @@
 
 namespace backend\forms;
 
-use backend\models\Queue;
-use common\helpers\KeyHelper;
 use components\Curl;
 use components\Exception;
 use Yii;
@@ -14,33 +12,18 @@ use Yii;
  */
 class QueueForm extends BaseForm
 {
-	public $sign;
-
-	public $queue;
-
-	public $remark = '';
-
 	/**
-	 * @var array
+	 * @var string 名称
 	 */
-	public $config = [];
-
-	public $qid;
-
-	public $force;//强制删除:用于删除库里没有的队列
+	public $name;
 
 	public function rules()
 	{
 		return [
-			//create_queue
-			[['config', 'remark'], 'safe', 'on' => 'create_queue'],
-			[['sign', 'queue'], 'required', 'on' => 'create_queue'],
-			['sign', 'filter', 'filter' => 'trim', 'on' => 'create_queue'],
-			['sign', 'string', 'on' => 'create_queue'],
-			['remark', 'default', 'value' => 'sys', 'on' => 'create_queue'],
-
-			//delete_queue
-			[['qid'], 'required', 'on' => 'delete_queue'],
+			//create_queue  delete_queue
+			[['name'], 'required', 'message' => 1100, 'on' => ['create_queue', 'delete_queue']],
+			['name', 'string', 'message' => 1101, 'on' => ['create_queue', 'delete_queue']],
+			['name', 'filter', 'filter' => 'trim', 'on' => ['create_queue', 'delete_queue']],
 		];
 	}
 
@@ -53,46 +36,16 @@ class QueueForm extends BaseForm
 	public function createQueue()
 	{
 		try {
-			$connect = new \PhpAmqpLib\Connection\AMQPStreamConnection(
-				Yii::$app->Amqp->host,
-				Yii::$app->Amqp->port,
-				Yii::$app->Amqp->user,
-				Yii::$app->Amqp->pass,
-				Yii::$app->Amqp->vhost
-			);
+			$connect = $this->getConnect();
 			$channel = $connect->channel();
-			list($queue,,) = $channel->queue_declare($this->queue,
-				false, true, false, false);
-
-		} catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
-			throw new Exception(2000, $e->getMessage());
-		}
+			list($queue,,) = $channel->queue_declare($this->name,
+				false, true, false, false, false);
 
-		$data = ['queue' => $queue];
-		try {
-			$one = Queue::findOne(['sign' => $this->sign]);
-			if ($one)
-				throw new Exception(2008);
-
-			//save
-			$one = new Queue();
-			$one->qid = KeyHelper::getUniqueId('queue-add');
-			$one->sign = $this->sign;
-			$one->queue = $this->queue;
-			$one->status = Queue::STATUS_YES;
-			$one->remark = $this->remark;
-			$one->config = serialize($this->config);
-			if ($one->save()) {
-				$data['qid'] = $one->qid;
-				$data['sign'] = $one->sign;
-				ksort($data);
-			}
-
-		} catch (\yii\db\Exception $e) {
-			//todo db没有记录成功
+		} catch (\Exception $e) {
+			throw new Exception(1001, $e->getMessage());
 		}
 
-		return $data;
+		return $data = ['name' => $queue];
 	}
 
 	/**
@@ -103,47 +56,21 @@ class QueueForm extends BaseForm
 	 */
 	public function deleteQueue()
 	{
-		$cb = function($name) {
-			try {
-				$connect = new \PhpAmqpLib\Connection\AMQPStreamConnection(
-					Yii::$app->Amqp->host,
-					Yii::$app->Amqp->port,
-					Yii::$app->Amqp->user,
-					Yii::$app->Amqp->pass,
-					Yii::$app->Amqp->vhost
-				);
-				$channel = $connect->channel();
-				$message = $channel->queue_delete($name,
-					false, false, false, false
-				);
-
-			} catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
-				$message = $e->getMessage();
-			}
-			return $message == 0 ? '' : $message;
-		};
-
-		$data = ['delete_count' => 0, 'delete_message' => ''];
 		try {
-			$one = Queue::findOne(['qid' => $this->qid, 'status' => Queue::STATUS_YES]);
-
-			if ($one) {
-				$one->status = Queue::STATUS_NO;
-				if (($c = $cb($one->queue)) === '') {
-					$one->update();
-					$data['delete_count'] ++;
-				}
-
-				$data['delete_message'] = $c;
-			} else {
-				throw new Exception(2009);
-			}
-
-		} catch (\yii\db\Exception $e) {
-			$data['delete_message'] = $e->getMessage();
-		}
+			$connect = $this->getConnect();
+			$channel = $connect->channel();
+			$channel->queue_delete($this->name,
+				false, false, false, false
+			);
 
-		return $data;
+			return [
+				'name' => $this->name,
+				'result' => '删除成功'
+			];
+
+		} catch (\Exception $e) {
+			throw new Exception(1001, $e->getMessage());
+		}
 	}
 
 	/**
@@ -152,21 +79,20 @@ class QueueForm extends BaseForm
 	 */
 	public function getQueueList()
 	{
-		$badCode = 2000;
-		$authStr = Yii::$app->Amqp->getConfig('user') . ':' . Yii::$app->Amqp->getConfig('pass');
-		/* URL */
-		$vhost = urlencode(Yii::$app->Amqp->getConfig('vhost'));
-		$url = Yii::$app->Amqp->getConfig('host') . ':' . Yii::$app->Amqp->getConfig('api_port') . "/api/queues";
+		$authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
+		$url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port . "/api/queues";
 
 		$curl = new Curl();
 		$curl->setOption(CURLOPT_USERPWD, $authStr);
 		$result = json_decode($curl->get($url), true);
 		if ($curl->responseCode != 200)
-			throw new Exception($badCode);
+			throw new Exception(1002);
+
 		if ($curl->errorText)
-			throw new Exception($badCode, $curl->errorText);
+			throw new Exception(1002, $curl->errorText);
+
 		if (isset($result['error']) && is_string($result['error']))
-			throw new Exception($badCode, $result['error']);
+			throw new Exception(1002, $result['error']);
 
 		$rows = [];
 		foreach ($result as $k => $v) {

+ 1 - 1
common/config/main.php

@@ -25,7 +25,7 @@ return [
             'class' => 'yii\redis\Connection',
             'hostname' => '127.0.0.1',
             'port' => 6378,
-            'database' => 1,
+            'database' => 0,
             'password' => 'redis12078',
         ],
 

+ 27 - 26
common/messages/zh-CN/errorCode.php

@@ -75,35 +75,36 @@ return [
 	'598' => 'Network read timeout error',
 	'599' => 'Network connect timeout error',
 
+	/**
+	 * 队列模块
+	 */
 
+	/* common 1000~1099 */
 	'1000' => 'Exception',
-	'1001' => '非POST提交',
-	'1002' => '非GET提交',
+	'1001' => 'rabbitMQ 错误',
+	'1002' => 'rabbitMQ API 错误',
 	'1003' => '表单验证失败',
+	'1004' => '未查询到结果',
+
+	//queue 1100~1199
+	'1100' => '队列名称不能为空',
+	'1101' => '队列名称必须是有效的字符串',
+
+	//message 1200~1299
+	'1201' => '消息主体不能为空',
+	'1202' => '消息格式错误',
+	'1203' => '消息id不能为空',
+	'1204' => '消息id必须是有效的字符串',
+	'1205' => '获取数量必须是整数',
+	'1206' => '获取数量太小,必须大于0',
+	'1207' => '获取数量太大,必须小于1000 [或联系开发解除限制]',
+	'1208' => '未找到该队列对应的消费回调',
+	'1209' => 'requeue只能是布尔值',
+	'1210' => 'encoding格式错误',
 
-	//message
-	'2000' => 'rabbitMQ连接失败',
-	'2001' => '消息主体不能为空',
-	'2002' => '消息格式错误',
-	'2003' => '队列名称不能为空',
-	'2004' => '队列名称格式错误',
-	'2005' => 'data不能为空',
-	'2006' => 'data格式错误,必须是合法json',
-	'2007' => 'data格式错误,必须包含 \'body\',\'infos\'',
-	'2008' => '队列标识sign已存在',
-	'2009' => '队列id不存在或已删除',
-	'2010' => 'requeue只能是布尔值',
-	'2011' => 'encoding格式错误',
-	'2012' => '获取数量必须是整数',
-	'2013' => '获取数量太小,必须大于0',
-	'2014' => '获取数量太大,必须小于1000 [或联系开发解除限制]',
-	'2015' => '未找到该队列对应的消费回调',
-	'2016' => '消息id不能为空',
-	'2017' => '数量不能为空',
+	//consume 1300~1399
+	'1300' => '队列中没有消息',
+	'1301' => '队列中没有可删除的消息',
 
-	'2100' => '队列创建失败',
-	'2101' => '消息处理中',
-	'2102' => '未知消息',
-	'2103' => '消息发送至队列过程中失败',
-	'2104' => '队列不存在',
+	//ack 1400~1499
 ];

+ 6 - 12
components/service/AmqpConfig.php

@@ -17,23 +17,17 @@ class AmqpConfig
 	public $api_port = '15672';
 
 	/* status */
-	const STATUS_SEND_FAIL	= -1;//发送失败
 	const STATUS_SEND_OK	= 1;//已发送,待处理
-	const STATUS_WAIT		= 2;//等待中
-	const STATUS_HAND		= 3;//处理中
-	const STATUS_NO_ACK		= 4;//已处理,未应答
-	const STATUS_HAND_OK	= 99;//处理成功
-	const STATUS_HAND_FAIL	= -99;//处理失败
+	const STATUS_HAND		= 2;//处理中
+	const STATUS_HAND_OK	= 10;//处理成功
+	const STATUS_HAND_FAIL	= 11;//处理失败
 
 	public static $statusMark = [
-		-1	=> '发送失败',
 		0	=> '未知状态',
 		1	=> '已发送,待处理',
-		2	=> '等待中',
-		3	=> '处理中',
-		4	=> '已处理,未应答',
-		99	=> '处理成功',
-		-99 => '处理失败',
+		2	=> '处理中',
+		10	=> '处理成功',
+		11	=> '处理失败',
 	];
 
 	public static function getMarkById($id)

+ 130 - 0
components/service/Redis.php

@@ -0,0 +1,130 @@
+<?php
+
+namespace components\service;
+
+use Yii;
+
+/**
+ * redis(方便统计队列)
+ * Class Redis
+ * @package components\service
+ */
+class Redis
+{
+	const KEYS_STATUS = 'statistic:status';
+
+	const TYPE_STATUS = 'status';
+
+	const TYPE_RESULT = 'result';
+
+
+	/**
+	 * [设置缓存]
+	 * @author: libingke
+	 * @param $queue
+	 * @param $mid
+	 * @param $type
+	 * @param $val
+	 * @return mixed
+	 */
+	public static function set($queue, $mid, $type, $val)
+	{
+		$key = static::getKey($queue, $mid, $type);
+		if ($type == static::TYPE_STATUS)
+			Yii::$app->redis->hmset(static::KEYS_STATUS, $mid, $val);//统计状态
+
+		return Yii::$app->redis->set($key, $val);
+	}
+
+	/**
+	 * [获取状态keys]
+	 * @author: libingke
+	 * @return mixed
+	 */
+	public static function getStatusList()
+	{
+		return Yii::$app->redis->hgetall(static::KEYS_STATUS);
+	}
+
+	/**
+	 * [获取缓存]
+	 * @author: libingke
+	 * @param $queue
+	 * @param $mid
+	 * @param $type
+	 * @return mixed
+	 */
+	public static function get($queue, $mid, $type)
+	{
+		$key = static::getKey($queue, $mid, $type);
+		return Yii::$app->redis->get($key);
+	}
+
+	/**
+	 * [删除缓存]
+	 * @author: libingke
+	 * @param $queue
+	 * @param $mid
+	 * @param $type
+	 * @return mixed
+	 */
+	public static function del($queue, $mid, $type)
+	{
+		$key = static::getKey($queue, $mid, $type);
+		return Yii::$app->redis->del($key);
+	}
+
+	/**
+	 * [失效设置]
+	 * @author: libingke
+	 * @param $queue
+	 * @param $mid
+	 * @param $type
+	 * @param int $expiration
+	 * @return mixed
+	 */
+	public static function expire($queue, $mid, $type, $expiration = 86400 * 7)
+	{
+		$key = static::getKey($queue, $mid, $type);
+		return Yii::$app->redis->expire($key, $expiration);
+	}
+
+	/**
+	 * [获取key]
+	 * @author: libingke
+	 * @param $queue
+	 * @param $mid
+	 * @param $type
+	 * @return string
+	 * @throws \Exception
+	 */
+	public static function getKey($queue, $mid, $type)
+	{
+		if ( $queue && $mid && in_array($type, [static::TYPE_STATUS, static::TYPE_RESULT]) )
+			return "queue:$queue:$mid:$type";
+
+		throw new \Exception("参数设置错误");
+	}
+
+	/**
+	 * purge 清空消息
+	 * @author: libingke
+	 * @param $queue
+	 */
+	public static function purge($queue)
+	{
+
+	}
+
+	/**
+	 * 批量删除 key
+	 * @author: libingke
+	 * @param $queue
+	 * @param array $delete
+	 * @param $type
+	 */
+	public static function batchDel($queue, $delete, $type)
+	{
+
+	}
+}