Browse Source

服务端消费逻辑@登录

lbk 6 years ago
parent
commit
59b36e8ac6

+ 32 - 0
backend/controllers/QueryController.php

@@ -55,4 +55,36 @@ class QueryController extends BaseController
 			'data' => ['status' => $r, 'status_mark' => AmqpConfig::getMarkById($r)]
 		];
 	}
+
+	/**
+	 * 查询消息结果
+	 * @author: libingke
+	 * @return mixed
+	 * @throws Exception
+	 */
+	public function actionMessageResult()
+	{
+		$params = Yii::$app->request->queryParams;
+		if (!isset($params['queue']))
+			throw new Exception(1100);
+
+		if (!is_string($params['queue']) || !$params['queue'])
+			throw new Exception(1101);
+
+		if (!isset($params['mid']))
+			throw new Exception(1203);
+
+		if (!is_string($params['mid']) || !$params['mid'])
+			throw new Exception(1204);
+
+		$s = Redis::get($params['queue'], $params['mid'], 'status');
+		if ($s != AmqpConfig::STATUS_HAND_OK)
+			throw new Exception(1005);
+
+		$r = Redis::get($params['queue'], $params['mid'], 'status');
+		if ($r == null)
+			throw new Exception(1004);
+
+		return json_decode($r, true);
+	}
 }

+ 36 - 1
backend/forms/MessageForm.php

@@ -2,6 +2,7 @@
 
 namespace backend\forms;
 
+use common\logic\LoginHandle;
 use PhpAmqpLib\Message\AMQPMessage;
 use components\service\AmqpConfig;
 use components\service\Redis;
@@ -122,6 +123,8 @@ class MessageForm extends BaseForm
 			[['name', 'count'], 'required', 'on' => ['consume']],
 			['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 1205,
 				'tooSmall' => 1206, 'on' => ['consume']],
+			['type', 'default', 'value' => 'client', 'on' => 'consume'],
+			['type', 'in', 'range' => ['server', 'client'], 'message' => 1302, 'on' => 'consume'],
 
 			/* delete & ack */
 			[['mids', 'name'], 'required', 'on' => ['delete', 'ack']],
@@ -315,6 +318,14 @@ class MessageForm extends BaseForm
 	public function consumeMessage()
 	{
 		$q_name = $this->name;
+		if ($this->type == 'server') {
+			$function = '_closure' . ucfirst($q_name);
+			if ( !method_exists($this, $function) )
+				throw new Exception(1303);
+
+		} else {
+			$function = 'closureConsume';
+		}
 
 		$connect = $this->getConnect();
 		$channel = $connect->channel();
@@ -329,7 +340,7 @@ class MessageForm extends BaseForm
 		try {
 			$min = min($this->count, $total);
 
-			$callback = function ($msg) use($q_name) {call_user_func_array([$this, 'closureConsume'], [$msg, $q_name]);};
+			$callback = function ($msg) use($q_name, $function) {call_user_func_array([$this, $function], [$msg, $q_name]);};
 			$channel->basic_qos(0, $min, null);
 			$channel->basic_consume($q_name,
 				'', false, false, false, false, $callback);
@@ -522,4 +533,28 @@ class MessageForm extends BaseForm
 			throw new Exception(1001, $e->getMessage());
 		}
 	}
+
+	private function _closureLogin($msg, $queue)
+	{
+		$data = ['mid' => '', 'body' => '', 'response' => '', 'error' => ''];
+		try {
+			$data['mid'] = $msg->get('message_id');
+			$data['body'] = $msg->body;
+			if ($data['mid']) {
+				$handle = new LoginHandle();
+				$data['response'] = $handle->login($msg->body, $queue, $data['mid']);
+
+				$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
+				Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND_OK);
+				Redis::set($queue, $data['mid'], 'result', $data['response']);
+				Redis::expire($queue, $data['mid'], 'result', 3600);
+			}
+
+		} catch (\Exception $e) {
+			$data['error'] = $e->getMessage();
+
+		} finally {
+			$this->_result[] = $data;
+		}
+	}
 }

+ 46 - 0
common/logic/LoginHandle.php

@@ -0,0 +1,46 @@
+<?php
+namespace common\logic;
+
+use components\Curl;
+use components\Exception;
+
+/**
+ * 登陆
+ * Class LoginHandle
+ * @package common\logic
+ */
+class LoginHandle
+{
+	const URL = 'https://zpapi.licai.cn';
+
+	/**
+	 * login
+	 * @author: libingke
+	 * @param $body
+	 * @return array
+	 */
+	public function login($body)
+	{
+		$post = json_decode($body, true);
+		if (!is_array($post))
+			throw new Exception(1304, '消息格式错误');
+
+		$post['redirect_uri'] = 'https://zpapi.licai.cn';
+
+		$curl = new Curl();
+		$curl->setPostParams($post);
+		$result = json_decode($curl->post(static::URL), true);
+		if ($curl->responseCode != 200)
+			throw new Exception($curl->responseCode);
+
+		if ($curl->errorText)
+			throw new Exception(1304, $curl->errorText);
+
+		if (isset($result['error']) && is_string($result['error']))
+			throw new Exception(1304, $result['error']);
+
+		$json = json_encode($result);
+
+		return $json;
+	}
+}

+ 4 - 0
common/messages/zh-CN/errorCode.php

@@ -85,6 +85,7 @@ return [
 	'1002' => 'rabbitMQ API 错误',
 	'1003' => '表单验证失败',
 	'1004' => '未查询到结果',
+	'1005' => '该消息并未返回结果',
 
 	//queue 1100~1199
 	'1100' => '队列名称不能为空',
@@ -105,6 +106,9 @@ return [
 	//consume 1300~1399
 	'1300' => '队列中没有消息',
 	'1301' => '队列中没有可删除的消息',
+	'1302' => '非法的消费方式',
+	'1303' => '未找到该队列的消费回调',
+	'1304' => '消费回调错误',
 
 	//ack 1400~1499
 ];