Browse Source

登录队列相关代码
@libingke
@2018/01/31

lbk 6 years ago
parent
commit
517a19d8bd

+ 2 - 0
backend/controllers/BaseController.php

@@ -24,4 +24,6 @@ class BaseController extends Controller
 		}*/
 		return true;
 	}
+
+
 }

+ 62 - 49
backend/controllers/MessageController.php

@@ -1,6 +1,8 @@
 <?php
 namespace backend\controllers;
 
+use backend\forms\MessageForm;
+use common\logic\Amqp\Cache;
 use components\Exception;
 use common\logic\Amqp\Message;
 use common\logic\Amqp\Queue;
@@ -8,69 +10,80 @@ use Yii;
 
 class MessageController extends BaseController
 {
-	public function batch_basic_publish(
-		$msg,
-		$exchange = '',
-		$routing_key = '',
-		$mandatory = false,
-		$immediate = false,
-		$ticket = null
-	) {
-		var_dump($exchange);
-		$a[] = func_get_args();
-		var_dump($a);die;
-	}
-
 	/**
-	 * 发送消息 (接受多条)
+	 * [查询回执信息]
 	 * @author: libingke
-	 * @return array
-	 * @throws Exception
 	 */
-	public function actionSend()
+	public function actionQueryReceipt($query)
 	{
-		//if (!Yii::$app->request->isPost)
-		//	throw new Exception('1001');
-
-		$params = Yii::$app->request->post();
-		//test data todo delete
-		$params = [
-			'body' => json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn']),
-			'queue' => 'y1',
-		];
-
-		if (!isset($params['body']))
-			throw new Exception('2001');
+		if (is_string($query)) {
+			$status = Cache::getData($query);
 
-		if (!is_string($params['body']))
-			throw new Exception('2002');
+			if ($status !== false) {
+				$mark = Cache::getMarkById($status);//状态码 => 提示信息
 
-		if (!isset($params['queue']))
-			throw new Exception('2003');
+				if (Cache::STATUS_HAND_OK == $status) {
+					return Cache::getData('result_' . $query);//直接返回数据
 
-		if (!is_string($params['queue']))
-			throw new Exception('2004');
-
-		$body	= $params['body'];
-		$queue	= $params['queue'];
+				} else {
+					throw new Exception(2101, $mark);//非正常时回执
+				}
 
-		try {
-			$queueModel = (new Queue())->create($queue);
-			if ($queueModel['status'] == 1) {
-				$message = new Message($queue);
-				$message->send($body, $queue);
-				$code = 200;
 			} else {
-				$code = 2100;
+				throw new Exception(2102);//无效
 			}
 
-		} catch (\common\logic\Amqp\Exception $e) {
-			throw new Exception($e->getCode(), $e->getMessage());
+		} else {
+			throw new Exception(2102);//无效
+		}
+	}
+
+	/**
+	 * 发送消息
+	 * @author: libingke
+	 * @return array
+	 * @throws Exception
+	 */
+	public function actionSend()
+	{
+		/* 选择对应表单 start */
+		$form = Yii::$app->request->get('form');
+		switch ($form)
+		{
+			//发送登录消息
+			case 'licai_login':
+				$scenario = 'login';
+				break;
+
+			case '':
+				throw new Exception(2201);
+				break;
+
+			default:
+				throw new Exception(2202);
+		}
+		/* 选择对应表单 end */
+
+		if (!Yii::$app->request->isPost)
+			throw new Exception('1001');
+
+		//验证
+		$model = new MessageForm();
+		$model->setScenario($scenario);
+		$model->load(['MessageForm' => Yii::$app->request->post()]);
+
+		$data = [];
+		if ($model->validate()) {
+			$data['queueName'] = MessageForm::getQueueName($scenario);
+			$data['requestId'] = $model->sendMessage();	//send
+		} else {
+			$model->handleError();//处理验证失败
 		}
 
 		return [
-			'code' => $code,
-			'message' => Yii::t('error', $code)
+			'code' => 200,
+			'message' => 'OK',
+			'data' => $data
 		];
 	}
 

+ 18 - 0
backend/forms/BaseForm.php

@@ -0,0 +1,18 @@
+<?php
+namespace backend\forms;
+
+use yii\base\Model;
+use components\Exception;
+
+/**
+ * Class BaseForm
+ * @package backend\forms
+ */
+class BaseForm extends Model
+{
+	public function handleError()
+	{
+		$errors = $this->getFirstErrors();
+		throw new Exception(1003, reset($errors));
+	}
+}

+ 83 - 0
backend/forms/MessageForm.php

@@ -0,0 +1,83 @@
+<?php
+
+namespace backend\forms;
+
+use common\logic\Amqp\Cache;
+use common\logic\Amqp\Message;
+use components\Exception;
+
+class MessageForm extends BaseForm
+{
+	public $email;
+
+	public $password;
+
+	public $type;
+
+	/**
+	 * 登录方式
+	 */
+	const TYPE_SMS	 = 'sms';
+
+	const TYPE_EMAIL = 'email';
+
+	/**
+	 * 绑定队列名称
+	 * @var array
+	 */
+	public static $queueMap = [
+		'login'	=> 'login'
+	];
+
+	public function rules()
+	{
+		return [
+			[['email', 'password', 'type'], 'safe', 'on' => 'login'],
+			[['email', 'password', 'type'], 'trim', 'on' => 'login'],
+			[['email', 'password'], 'required', 'on' => 'login'],
+			['email', 'email', 'on' => 'login'],
+			['type', 'default', 'value' => static::TYPE_EMAIL, 'on' => 'login'],
+			['type', 'in', 'range' => [static::TYPE_EMAIL, static::TYPE_SMS], 'on' => 'login']
+		];
+	}
+
+	public static function getQueueName($scenario)
+	{
+		if (isset(self::$queueMap[$scenario])) {
+			return self::$queueMap[$scenario];
+		}
+
+		return '';
+	}
+
+	/**
+	 * [发送消息]
+	 * @author: libingke
+	 * @return array
+	 * @throws Exception
+	 */
+	public function sendMessage()
+	{
+		$body = json_encode([
+			'email'		=> $this->email,
+			'password'	=> $this->password,
+			'type'		=> $this->type
+		]);
+		$queue = MessageForm::getQueueName($this->scenario);
+
+		$data = [];
+		try {
+			$message = new Message($queue);
+			$corrId = $message->send($body, $queue);
+
+			Cache::setData($corrId, Cache::STATUS_SEND_OK);	//set: status = send ok
+
+			$data[] = $corrId;
+
+		} catch (\common\logic\Amqp\Exception $e) {
+			throw new Exception($e->getCode(), $e->getMessage());
+		}
+
+		return $data;
+	}
+}

+ 1 - 0
backend/forms/READ.md

@@ -0,0 +1 @@
+表单验证

+ 2 - 2
common/config/main.php

@@ -12,7 +12,7 @@ return [
             'class' => 'yii\caching\FileCache',
         ],
 
-        'redis' => [
+        /*'redis' => [
             'class' => 'yii\redis\Connection',
             'hostname' => 'localhost',
             'port' => 6378,
@@ -20,7 +20,7 @@ return [
             'password' => 'airent-redis~123',
 
 
-        ],
+        ],*/
 
 		'i18n' => [
 			'translations' => [

+ 92 - 0
common/logic/Amqp/Cache.php

@@ -0,0 +1,92 @@
+<?php
+namespace common\logic\Amqp;
+
+use Yii;
+
+/**
+ * 缓存设置
+ * Class Cache
+ * @package common\logic\Amqp
+ */
+class Cache
+{
+	const CACHE_HEADER	= 'amqp_v1_msg_';
+
+	const STATUS_SEND_FAIL	= -1;//发送失败
+	const STATUS_SEND_OK	= 1;//已发送,待处理
+	const STATUS_WAIT		= 2;//等待中
+	const STATUS_HAND		= 3;//处理中
+	const STATUS_HAND_OK	= 99;//处理成功
+	const STATUS_HAND_FAIL	= -99;//处理失败
+
+	public static $statusMark = [
+		-1	=> '发送失败',
+		0	=> '未知状态',
+		1	=> '已发送,待处理',
+		2	=> '等待中',
+		3	=> '处理中',
+		99	=> '处理成功',
+		-99 => '处理失败',
+	];
+
+	/**
+	 * [设置缓存状态]
+	 * @author: libingke
+	 * @param $key
+	 * @param $value
+	 * @param bool $time
+	 * @return bool
+	 */
+	public static function setData($key, $value, $time = false)
+	{
+		$cache = Yii::$app->redis;
+		if ($key) {
+			if (is_numeric($time)) {
+				return $cache->set(static::CACHE_HEADER . $key, $value, $time);
+			}
+
+			return $cache->set(static::CACHE_HEADER . $key, $value);
+		}
+
+		return false;
+	}
+
+	/**
+	 * [获取缓存状态]
+	 * @author: libingke
+	 * @param $key
+	 * @return bool
+	 */
+	public static function getData($key)
+	{
+		$cache = Yii::$app->redis;
+		return $cache->get(static::CACHE_HEADER . $key);
+	}
+
+	/**
+	 * [删除缓存状态]
+	 * @author: libingke
+	 * @param $key
+	 * @return mixed
+	 */
+	public static function deleteData($key)
+	{
+		$cache = Yii::$app->redis;
+		return $cache->delete(static::CACHE_HEADER . $key);
+	}
+
+	/**
+	 * [查询状态]
+	 * @author: libingke
+	 * @param $id
+	 * @return mixed|null
+	 */
+	public static function getMarkById($id)
+	{
+		if (isset(self::$statusMark[$id]) && is_numeric($id)) {
+			return self::$statusMark[$id];
+		}
+
+		return self::$statusMark[0];
+	}
+}

+ 7 - 4
common/logic/Amqp/Message.php

@@ -3,6 +3,7 @@
 namespace common\logic\Amqp;
 
 use PhpAmqpLib\Message\AMQPMessage;
+use Yii;
 
 /**
  * 处理消息体逻辑
@@ -22,7 +23,9 @@ class Message extends Connect
 
 	private $_response;
 
-	public function __construct($queueName)
+	private $_corr_id;
+
+	public function __construct($queueName = '')
 	{
 		$this->_connect = self::connect();
 
@@ -40,9 +43,10 @@ class Message extends Connect
 	 */
 	public function send($body, $routing_key)
 	{
+		$this->_corr_id = uniqid();
 		$properties = [
 			'content_type' => 'text/plain',
-			'correlation_id' => uniqid(),
+			'correlation_id' => $this->_corr_id,
 			'reply_to' => $this->_callback_queue
 		];
 
@@ -51,10 +55,9 @@ class Message extends Connect
 			$this->_channel->basic_publish($msg, '', $routing_key);
 		}
 
-		return true;
+		return $this->_corr_id;
 	}
 
-
 	/**
 	 * [批量发送信息]
 	 * @author: libingke

+ 7 - 0
common/messages/zh-CN/errorCode.php

@@ -77,6 +77,8 @@ return [
 
 
 	'1001' => '非POST提交',
+	'1002' => '非GET提交',
+	'1003' => '表单验证失败',
 
 	//message
 
@@ -89,4 +91,9 @@ return [
 	'2007' => 'data格式错误,必须包含 \'body\',\'infos\'',
 
 	'2100' => '队列创建失败',
+	'2101' => '消息处理中',
+	'2102' => '未知消息',
+
+	'2201' => 'form不能为空',
+	'2202' => '不支持的请求类型',
 ];

+ 618 - 0
components/Curl.php

@@ -0,0 +1,618 @@
+<?php
+
+namespace components;
+
+use Yii;
+use yii\base\Exception;
+use yii\helpers\Json;
+
+class Curl
+{
+	/**
+	 * @var string
+	 * * Holds response data right after sending a request.
+	 */
+	public $httpType = 'http';
+
+	/**
+	 * @var string|boolean
+	 * Holds response data right after sending a request.
+	 */
+	public $response = null;
+	/**
+	 * @var null|integer
+	 * Error code holder: https://curl.haxx.se/libcurl/c/libcurl-errors.html
+	 */
+	public $errorCode = null;
+	/**
+	 * @var null|string
+	 * Error text holder: http://php.net/manual/en/function.curl-strerror.php
+	 */
+	public $errorText = null;
+	/**
+	 * @var integer HTTP-Status Code
+	 * This value will hold HTTP-Status Code. False if request was not successful.
+	 */
+	public $responseCode = null;
+	/**
+	 * @var string|null HTTP Response Charset
+	 * (taken from Content-type header)
+	 */
+	public $responseCharset = null;
+	/**
+	 * @var int HTTP Response Length
+	 * (taken from Content-length header, or strlen() of downloaded content)
+	 */
+	public $responseLength = -1;
+	/**
+	 * @var string|null HTTP Response Content Type
+	 * (taken from Content-type header)
+	 */
+	public $responseType = null;
+	/**
+	 * @var array|null HTTP Response headers
+	 * Lists response header in an array if CURLOPT_HEADER is set to true.
+	 */
+	public $responseHeaders = null;
+	/**
+	 * @var array HTTP-Status Code
+	 * Custom options holder
+	 */
+	protected $_options = [];
+	/**
+	 * @var array
+	 * Hold array of get params to send with the request
+	 */
+	protected $_getParams = [];
+	/**
+	 * @var array
+	 * Hold array of post params to send with the request
+	 */
+	protected $_postParams = [];
+	/**
+	 * @var resource|null
+	 * Holds cURL-Handler
+	 */
+	public $curl = null;
+	/**
+	 * @var string
+	 * hold base URL
+	 */
+	protected $_baseUrl = '';
+	/**
+	 * @var array default curl options
+	 * Default curl options
+	 */
+	protected $_defaultOptions = [
+
+		CURLOPT_USERAGENT      => 'Yii2-Curl-Agent',
+		CURLOPT_TIMEOUT        => 10,
+		CURLOPT_CONNECTTIMEOUT => 10,
+		CURLOPT_RETURNTRANSFER => true,
+		CURLOPT_HEADER         => true,
+
+		CURLOPT_SSL_VERIFYPEER => false,
+		CURLOPT_SSL_VERIFYHOST => 2,
+	];
+	// ############################################### class methods // ##############################################
+	/**
+	 * Start performing GET-HTTP-Request
+	 *
+	 * @param string  $url
+	 * @param boolean $raw if response body contains JSON and should be decoded
+	 *
+	 * @return mixed response
+	 */
+	public function get($url, $raw = true)
+	{
+		$this->_baseUrl = $url;
+		return $this->_httpRequest('GET', $raw);
+	}
+	/**
+	 * Start performing HEAD-HTTP-Request
+	 *
+	 * @param string $url
+	 *
+	 * @return mixed response
+	 */
+	public function head($url)
+	{
+		$this->_baseUrl = $url;
+		return $this->_httpRequest('HEAD');
+	}
+	/**
+	 * Start performing POST-HTTP-Request
+	 *
+	 * @param string  $url
+	 * @param boolean $raw if response body contains JSON and should be decoded
+	 *
+	 * @return mixed response
+	 */
+	public function post($url, $raw = true)
+	{
+		$this->_baseUrl = $url;
+		return $this->_httpRequest('POST', $raw);
+	}
+	/**
+	 * Start performing PUT-HTTP-Request
+	 *
+	 * @param string  $url
+	 * @param boolean $raw if response body contains JSON and should be decoded
+	 *
+	 * @return mixed response
+	 */
+	public function put($url, $raw = true)
+	{
+		$this->_baseUrl = $url;
+		return $this->_httpRequest('PUT', $raw);
+	}
+	/**
+	 * Start performing PATCH-HTTP-Request
+	 *
+	 * @param string  $url
+	 * @param boolean $raw if response body contains JSON and should be decoded
+	 *
+	 * @return mixed response
+	 */
+	public function patch($url, $raw = true)
+	{
+		$this->_baseUrl = $url;
+		$this->setHeaders([
+			'X-HTTP-Method-Override' => 'PATCH'
+		]);
+		return $this->_httpRequest('POST',$raw);
+	}
+	/**
+	 * Start performing DELETE-HTTP-Request
+	 *
+	 * @param string  $url
+	 * @param boolean $raw if response body contains JSON and should be decoded
+	 *
+	 * @return mixed response
+	 */
+	public function delete($url, $raw = true)
+	{
+		$this->_baseUrl = $url;
+		return $this->_httpRequest('DELETE', $raw);
+	}
+	/**
+	 * Set curl option
+	 *
+	 * @param string $key
+	 * @param mixed  $value
+	 *
+	 * @return $this
+	 */
+	public function setOption($key, $value)
+	{
+		//set value
+		if (array_key_exists($key, $this->_defaultOptions) && $key !== CURLOPT_WRITEFUNCTION) {
+			$this->_defaultOptions[$key] = $value;
+		} else {
+			$this->_options[$key] = $value;
+		}
+		//return self
+		return $this;
+	}
+	/**
+	 * Set get params
+	 *
+	 * @param array $params
+	 * @return $this
+	 */
+	public function setGetParams($params)
+	{
+		if (is_array($params)) {
+			foreach ($params as $key => $value) {
+				$this->_getParams[$key] = $value;
+			}
+		}
+		//return self
+		return $this;
+	}
+	/**
+	 * Set get params
+	 *
+	 * @param array $params
+	 * @return $this
+	 */
+	public function setPostParams($params)
+	{
+		if (is_array($params)) {
+			$this->setOption(
+				CURLOPT_POSTFIELDS,
+				http_build_query($params)
+			);
+		}
+		//return self
+		return $this;
+	}
+	/**
+	 * Set raw post data allows you to post any data format.
+	 *
+	 * @param mixed $data
+	 * @return $this
+	 */
+	public function setRawPostData($data)
+	{
+		$this->setOption(
+			CURLOPT_POSTFIELDS,
+			$data
+		);
+		//return self
+		return $this;
+	}
+	/**
+	 * Set get params
+	 *
+	 * @param string $data
+	 * @return $this
+	 */
+	public function setRequestBody($data)
+	{
+		if (is_string($data)) {
+			$this->setOption(
+				CURLOPT_POSTFIELDS,
+				$data
+			);
+		}
+		//return self
+		return $this;
+	}
+	/**
+	 * Get URL - return URL parsed with given params
+	 *
+	 * @return string The full URL with parsed get params
+	 */
+	public function getUrl()
+	{
+		if (Count($this->_getParams) > 0) {
+			return $this->_baseUrl.'?'.http_build_query($this->_getParams);
+		} else {
+			return $this->_baseUrl;
+		}
+	}
+	/**
+	 * Set curl options
+	 *
+	 * @param array $options
+	 *
+	 * @return $this
+	 */
+	public function setOptions($options)
+	{
+		$this->_options = $options + $this->_options;
+		return $this;
+	}
+	/**
+	 * Set multiple headers for request.
+	 *
+	 * @param array $headers
+	 *
+	 * @return $this
+	 */
+	public function setHeaders($headers)
+	{
+		if (is_array($headers)) {
+			//init
+			$parsedHeader = [];
+			//collect currently set headers
+			foreach ($this->getRequestHeaders() as $header => $value) {
+				array_push($parsedHeader, $header.':'.$value);
+			}
+			//parse header into right format key:value
+			foreach ($headers as $header => $value) {
+				array_push($parsedHeader, $header.':'.$value);
+			}
+			//set headers
+			$this->setOption(
+				CURLOPT_HTTPHEADER,
+				$parsedHeader
+			);
+		}
+		return $this;
+	}
+	/**
+	 * Set a single header for request.
+	 *
+	 * @param string $header
+	 * @param string $value
+	 *
+	 * @return $this
+	 */
+	public function setHeader($header, $value)
+	{
+		//init
+		$parsedHeader = [];
+		//collect currently set headers
+		foreach ($this->getRequestHeaders() as $headerToSet => $valueToSet) {
+			array_push($parsedHeader, $headerToSet.':'.$valueToSet);
+		}
+		//add override new header
+		if (strlen($header) > 0) {
+			array_push($parsedHeader, $header.':'.$value);
+		}
+		//set headers
+		$this->setOption(
+			CURLOPT_HTTPHEADER,
+			$parsedHeader
+		);
+		return $this;
+	}
+	/**
+	 * Unset a single header.
+	 *
+	 * @param string $header
+	 *
+	 * @return $this
+	 */
+	public function unsetHeader($header)
+	{
+		//init
+		$parsedHeader = [];
+		//collect currently set headers and filter "unset" header param.
+		foreach ($this->getRequestHeaders() as $headerToSet => $valueToSet) {
+			if ($header !== $headerToSet) {
+				array_push($parsedHeader, $headerToSet.':'.$valueToSet);
+			}
+		}
+		//set headers
+		$this->setOption(
+			CURLOPT_HTTPHEADER,
+			$parsedHeader
+		);
+		return $this;
+	}
+	/**
+	 * Get all request headers as key:value array
+	 *
+	 * @return array
+	 */
+	public function getRequestHeaders()
+	{
+		//Init
+		$requestHeaders = $this->getOption(CURLOPT_HTTPHEADER);
+		$parsedRequestHeaders = [];
+		if (is_array($requestHeaders)) {
+			foreach ($requestHeaders as $headerValue) {
+				list ($key, $value) = explode(':', $headerValue, 2);
+				$parsedRequestHeaders[$key] = $value;
+			}
+		}
+		return $parsedRequestHeaders;
+	}
+	/**
+	 * Get specific request header as key:value array
+	 *
+	 * @param string $headerKey
+	 *
+	 * @return string|null
+	 */
+	public function getRequestHeader($headerKey)
+	{
+		//Init
+		$parsedRequestHeaders = $this->getRequestHeaders();
+		return isset($parsedRequestHeaders[$headerKey]) ? $parsedRequestHeaders[$headerKey] : null;
+	}
+	/**
+	 * Unset a single curl option
+	 *
+	 * @param string $key
+	 *
+	 * @return $this
+	 */
+	public function unsetOption($key)
+	{
+		//reset a single option if its set already
+		if (isset($this->_options[$key])) {
+			unset($this->_options[$key]);
+		}
+		return $this;
+	}
+	/**
+	 * Unset all curl option, excluding default options.
+	 *
+	 * @return $this
+	 */
+	public function unsetOptions()
+	{
+		//reset all options
+		if (isset($this->_options)) {
+			$this->_options = [];
+		}
+		return $this;
+	}
+	/**
+	 * Total reset of options, responses, etc.
+	 *
+	 * @return $this
+	 */
+	public function reset()
+	{
+		if ($this->curl !== null) {
+			curl_close($this->curl); //stop curl
+		}
+		//reset all options
+		if (isset($this->_options)) {
+			$this->_options = [];
+		}
+		//reset response & status params
+		$this->curl = null;
+		$this->errorCode = null;
+		$this->response = null;
+		$this->responseCode = null;
+		$this->responseCharset = null;
+		$this->responseLength = -1;
+		$this->responseType = null;
+		$this->errorText = null;
+		$this->_postParams = [];
+		$this->_getParams = [];
+		return $this;
+	}
+	/**
+	 * Return a single option
+	 *
+	 * @param string|integer $key
+	 * @return mixed|boolean
+	 */
+	public function getOption($key)
+	{
+		//get merged options depends on default and user options
+		$mergesOptions = $this->getOptions();
+		//return value or false if key is not set.
+		return isset($mergesOptions[$key]) ? $mergesOptions[$key] : false;
+	}
+	/**
+	 * Return merged curl options and keep keys!
+	 *
+	 * @return array
+	 */
+	public function getOptions()
+	{
+		return $this->_options + $this->_defaultOptions;
+	}
+	/**
+	 * Get curl info according to http://php.net/manual/de/function.curl-getinfo.php
+	 *
+	 * @param null $opt
+	 * @return array|mixed
+	 */
+	public function getInfo($opt = null)
+	{
+		if ($this->curl !== null && $opt === null) {
+			return curl_getinfo($this->curl);
+		} elseif ($this->curl !== null && $opt !== null) {
+			return curl_getinfo($this->curl, $opt);
+		} else {
+			return [];
+		}
+	}
+	/**
+	 * Performs HTTP request
+	 *
+	 * @param string  $method
+	 * @param boolean $raw if response body contains JSON and should be decoded -> helper.
+	 *
+	 * @throws Exception if request failed
+	 *
+	 * @return mixed
+	 */
+	protected function _httpRequest($method, $raw = false)
+	{
+		//set request type and writer function
+		$this->setOption(CURLOPT_CUSTOMREQUEST, strtoupper($method));
+		//check if method is head and set no body
+		if ($method === 'HEAD') {
+			$this->setOption(CURLOPT_NOBODY, true);
+			$this->unsetOption(CURLOPT_WRITEFUNCTION);
+		}
+		//setup error reporting and profiling
+		if (YII_DEBUG) {
+			Yii::trace('Start sending cURL-Request: '.$this->getUrl().'\n', __METHOD__);
+			Yii::beginProfile($method.' '.$this->_baseUrl.'#'.md5(serialize($this->getOption(CURLOPT_POSTFIELDS))), __METHOD__);
+		}
+		/**
+		 * proceed curl
+		 */
+		$curlOptions =  $this->getOptions();
+		$this->curl = curl_init($this->getUrl());
+		curl_setopt_array($this->curl, $curlOptions);
+		$response = curl_exec($this->curl);
+		//check if curl was successful
+		if ($response === false) {
+			//set error code
+			$this->errorCode = curl_errno($this->curl);
+			$this->errorText = curl_strerror($this->errorCode);
+			switch ($this->errorCode) {
+				// 7, 28 = timeout
+				case 7:
+				case 28:
+					$this->responseCode = 'timeout';
+					return false;
+					break;
+				default:
+					return false;
+					break;
+			}
+		}
+		//extract header / body data if CURLOPT_HEADER are set to true
+		if (isset($curlOptions[CURLOPT_HEADER]) && $curlOptions[CURLOPT_HEADER]) {
+			$this->response = $this->_extractCurlBody($response);
+			$this->responseHeaders = $this->_extractCurlHeaders($response);
+		} else {
+			$this->response = $response;
+		}
+		// Extract additional curl params
+		$this->_extractAdditionalCurlParameter();
+		//end yii debug profile
+		if (YII_DEBUG) {
+			Yii::endProfile($method.' '.$this->getUrl().'#'.md5(serialize($this->getOption(CURLOPT_POSTFIELDS))), __METHOD__);
+		}
+		//check responseCode and return data/status
+		if ($this->getOption(CURLOPT_CUSTOMREQUEST) === 'HEAD') {
+			return true;
+		} else {
+			$this->response = $raw ? $this->response : Json::decode($this->response);
+			return $this->response;
+		}
+	}
+	/**
+	 * Extract additional curl params protected class helper
+	 */
+	protected function _extractAdditionalCurlParameter ()
+	{
+		/**
+		 * retrieve response code
+		 */
+		$this->responseCode = curl_getinfo($this->curl, CURLINFO_HTTP_CODE);
+		/**
+		 * try extract response type & charset.
+		 */
+		$this->responseType = curl_getinfo($this->curl, CURLINFO_CONTENT_TYPE);
+		if (!is_null($this->responseType) && count(explode(';', $this->responseType)) > 1) {
+			list($this->responseType, $possibleCharset) = explode(';', $this->responseType);
+			//extract charset
+			if (preg_match('~^charset=(.+?)$~', trim($possibleCharset), $matches) && isset($matches[1])) {
+				$this->responseCharset = strtolower($matches[1]);
+			}
+		}
+		/**
+		 * try extract response length
+		 */
+		$this->responseLength = curl_getinfo($this->curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD);
+		if((int)$this->responseLength == -1) {
+			$this->responseLength = strlen($this->response);
+		}
+	}
+	/**
+	 * Extract body curl data from response
+	 *
+	 * @param string $response
+	 * @return string
+	 */
+	protected function _extractCurlBody ($response)
+	{
+		return substr($response, $this->getInfo(CURLINFO_HEADER_SIZE));
+	}
+	/**
+	 * Extract header curl data from response
+	 *
+	 * @param string $response
+	 * @return array
+	 */
+	protected function _extractCurlHeaders ($response)
+	{
+		//Init
+		$headers = [];
+		$headerText = substr($response, 0, strpos($response, "\r\n\r\n"));
+		foreach (explode("\r\n", $headerText) as $i => $line) {
+			if ($i === 0) {
+				$headers['http_code'] = $line;
+			} else {
+				list ($key, $value) = explode(':', $line, 2);
+				$headers[$key] = ltrim($value);
+			}
+		}
+		return $headers;
+	}
+}

+ 1 - 1
components/ErrorHandler.php

@@ -63,7 +63,7 @@ class ErrorHandler extends baseErrorHandler
 		}
 
 		if (is_array($response->data)) {
-			$response->data['error'] = $response->data['name'];
+			//$response->data['error'] = $response->data['name'];
 			unset($response->data['name'], $response->data['type']);
 			ksort($response->data);
 			$response->data['data'] = [];

+ 22 - 0
console/controllers/BaseController.php

@@ -0,0 +1,22 @@
+<?php
+namespace console\controllers;
+
+use yii\console\Controller;
+
+/**
+ * 继承控制器
+ * Class BaseController
+ * @package console\controllers
+ */
+class BaseController extends Controller
+{
+	public function beforeAction($action)
+	{
+		//todo start log
+	}
+
+	public function afterAction($action, $result)
+	{
+		//todo end log
+	}
+}

+ 106 - 0
console/controllers/MainController.php

@@ -0,0 +1,106 @@
+<?php
+namespace console\controllers;
+
+use yii\console\Controller;
+
+/**
+ * 主进程:保持运行
+ * Class MainController
+ * @package console\controllers
+ * @author: libingke
+ *
+ * @note 必须将主进程添加到crontab脚本里,最好间隔时间内运行
+ * 如=>  1 * * * *  php /项目路径/yii main >/dev/null 2>&1 &
+ */
+class MainController extends Controller
+{
+	const DURATION = 10;	//主进程检查间隔时间
+
+	/**
+	 * 子进程
+	 * @var array ['route' => '', 'params' => '']
+	 */
+	private $_task = [
+		['route' => 'worker-msg/login', 'params' => '']
+	];
+
+	/**
+	 * [init]
+	 */
+	public function actionIndex()
+	{
+		//如果Main进程已经在运行了,则不启动,保持主进程只有一个
+		if ($this->checkMainProcess()) {
+			echo "Main process is running. Please check it."; exit;
+		}
+
+		//每N秒检查一次任务列表
+		while (1) {
+			$this->checkTasks();
+			sleep(self::DURATION);
+		};
+	}
+
+	/**
+	 * 扫描所有任务
+	 */
+	public function checkTasks()
+	{
+		echo "\n Checking tasks......";
+		$list = $this->_task;
+		foreach ($list as $task) {
+			$this->execTask($task['route']);
+		}
+	}
+
+	/**
+	 * 执行任务
+	 * @param $program
+	 * @return bool
+	 */
+	public function execTask($program)
+	{
+		//现在只支持单进程,如果以后支持多个进程同时运行,需要修改这里的逻辑
+		if ($this->checkProcess($program)) {
+			return false;
+		}
+		$dir = dirname(\Yii::$app->getBasePath());
+		//将任务放到后台执行
+		$cmd = "nohup php ".$dir."/yii " . $program." >/dev/null 2>&1 &";
+		echo "\n ".$cmd;
+		exec($cmd);
+	}
+
+	/**
+	 * 杀掉任务
+	 * @param $program
+	 */
+	public function killTask($program)
+	{
+		//避免误杀到其它(短字符的)进程
+		if (strlen($program) < 4)
+			return false;
+		$cmd = "ps -ef | grep " . $program . " | awk '{print $2}' | xargs kill -9";
+		exec($cmd);
+		return !$this->checkProcess($program);
+	}
+
+	/**
+	 * 检查是否进程中已经有别的主进程
+	 * @param $process
+	 * @return bool
+	 */
+	public function checkMainProcess()
+	{
+		$cmd = "ps -ef |grep -v grep|grep -v 'sh -c' | grep main";
+		exec($cmd, $output);
+		return count($output) > 1;
+	}
+
+	public function checkProcess($process)
+	{
+		$cmd = "ps -ef |grep -v grep|grep -v 'sh -c' | grep " . $process;
+		exec($cmd, $output);
+		return count($output) > 0;
+	}
+}

+ 90 - 0
console/controllers/WorkerMsgController.php

@@ -0,0 +1,90 @@
+<?php
+
+namespace console\controllers;
+
+use backend\forms\MessageForm;
+use common\logic\Amqp\Cache;
+use common\logic\Amqp\Connect;
+use components\Curl;
+
+/**
+ * 消息处理进程
+ * Class MessageController
+ * @package console\controllers
+ */
+class WorkerMsgController extends BaseController
+{
+	/**
+	 * @var null
+	 */
+	private $_conn = null;
+	/**
+	 * @return \PhpAmqpLib\Connection\AMQPStreamConnection
+	 */
+	protected function getConn()
+	{
+		if ($this->_conn == null) {
+			$this->_conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(
+				Connect::HOST,
+				Connect::PORT,
+				Connect::USER,
+				Connect::PASS
+			);
+		}
+		return $this->_conn;
+	}
+
+	/**
+	 * [worker:登录处理]
+	 * @author: libingke
+	 */
+    public function actionLogin()
+    {
+		$queue = MessageForm::getQueueName('login');
+    	$conn = $this->getConn();
+		$channel = $conn->channel();
+
+        $callback = function ($msg) {
+        	$cacheTime = 600;
+			$status = Cache::STATUS_HAND_FAIL;
+			$corr_id = $msg->get('correlation_id');
+
+        	$post = json_decode($msg->body, true);
+        	//todo log
+        	if (!is_array($post)) {
+        		$data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => '消息格式错误'];
+			} else {
+        		//handle
+				$loginUrl = 'https://dev407.33.cn/admin/member/login';
+				$post['redirect_uri'] = 'https://zpapi.licai.cn';
+				$curl = new Curl();
+				$curl->setPostParams($post);
+				$result = json_decode($curl->post($loginUrl), true);
+
+				if (is_array($result) && isset($result['code'])) {
+					$cacheTime = isset($result['data']['expires_in']) ? $result['data']['expires_in'] : 864000;
+					$status = Cache::STATUS_HAND_OK;
+					$data = $result;
+				} else {
+					$data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => $curl->errorText];
+				}
+
+				echo "Done: " . $corr_id, "\n";
+				$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
+			}
+
+			Cache::setData($corr_id, $status);
+			Cache::setData('result_' . $corr_id, $data, $cacheTime);
+        };
+
+        $channel->basic_qos(null, 1, null);
+        $channel->basic_consume($queue, '', false, false, false, false, $callback);
+
+        while (count($channel->callbacks)) {
+            $channel->wait();
+        }
+
+        $channel->close();
+        $conn->close();
+    }
+}