MessageController.php 5.9 KB


  1. <?php
  2. namespace backend\controllers;
  3. use backend\forms\MessageForm;
  4. use common\logic\Amqp\Cache;
  5. use components\Exception;
  6. use common\logic\Amqp\Message;
  7. use common\logic\Amqp\Queue;
  8. use Yii;
  9. class MessageController extends BaseController
  10. {
  11. /**
  12. * [查询回执信息]
  13. * @author: libingke
  14. */
  15. public function actionQueryReceipt($query)
  16. {
  17. if (is_string($query)) {
  18. $status = Cache::getData($query);
  19. if ($status !== false) {
  20. $mark = Cache::getMarkById($status);//状态码 => 提示信息
  21. if (Cache::STATUS_HAND_OK == $status) {
  22. return Cache::getData('result_' . $query);//直接返回数据
  23. } else {
  24. throw new Exception(2101, $mark);//非正常时回执
  25. }
  26. } else {
  27. throw new Exception(2102);//无效
  28. }
  29. } else {
  30. throw new Exception(2102);//无效
  31. }
  32. }
  33. /**
  34. * 发送消息
  35. * @author: libingke
  36. * @return array
  37. * @throws Exception
  38. */
  39. public function actionSend()
  40. {
  41. /* 选择对应表单 start */
  42. $form = Yii::$app->request->get('form');
  43. switch ($form)
  44. {
  45. //发送登录消息
  46. case 'licai_login':
  47. $scenario = 'login';
  48. break;
  49. case '':
  50. throw new Exception(2201);
  51. break;
  52. default:
  53. throw new Exception(2202);
  54. }
  55. /* 选择对应表单 end */
  56. if (!Yii::$app->request->isPost)
  57. throw new Exception('1001');
  58. //验证
  59. $model = new MessageForm();
  60. $model->setScenario($scenario);
  61. $model->load(['MessageForm' => Yii::$app->request->post()]);
  62. $data = [];
  63. if ($model->validate()) {
  64. $data['queueName'] = MessageForm::getQueueName($scenario);
  65. $data['requestId'] = $model->sendMessage(); //send
  66. } else {
  67. $model->handleError();//处理验证失败
  68. }
  69. return [
  70. 'code' => 200,
  71. 'message' => 'OK',
  72. 'data' => $data
  73. ];
  74. }
  75. /**
  76. * 发送消息 (接受多条)
  77. * @author: libingke
  78. */
  79. public function actionBatchSend()
  80. {
  81. $params = Yii::$app->request->post();
  82. //test data start todo delete
  83. $test[] = [
  84. 'queue' => 'y1',
  85. 'infos' => [
  86. json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y1-m1']),
  87. json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y1-m2'])
  88. ],
  89. ];
  90. $test[] = [
  91. 'queue' => 'y2',
  92. 'infos' => [
  93. json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y2-m1']),
  94. json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y2-m2'])
  95. ],
  96. ];
  97. $params['data'] = json_encode($test);
  98. //test data end todo delete
  99. if (!isset($params['data']))
  100. throw new Exception('2005');
  101. $data = json_decode($params['data'], true);
  102. if (!is_array($data) || !$data)
  103. throw new Exception('2006');
  104. $errCount = 0;
  105. foreach ($data as $items) {
  106. if (!isset($items['queue'])
  107. || !is_string($items['queue'])
  108. || !isset($items['infos'])
  109. || !is_array($items['infos'])
  110. || !$items['infos'] )
  111. {
  112. $errCount ++;
  113. continue;
  114. }
  115. try {
  116. //todo check
  117. //if queue exist
  118. $queueModel = (new Queue())->create($items['queue']);
  119. if ($queueModel['status'] == 1) {
  120. $message = new Message($items['queue']);
  121. foreach ($items['infos'] as $info) {
  122. $message->batchSend($info, $items['queue']);
  123. }
  124. } else {
  125. $errCount ++;
  126. }
  127. } catch (\common\logic\Amqp\Exception $e) {
  128. $errCount ++;
  129. //throw new Exception($e->getCode(), $e->getMessage());
  130. }
  131. }
  132. return [
  133. 'code' => 200,
  134. 'message' => Yii::t('error', 200),
  135. 'data' => ['error_count' => $errCount]
  136. ];
  137. }
  138. /**
  139. * 消费消息 todo
  140. * @author: libingke
  141. */
  142. public function actionReceive()
  143. {
  144. //获取接收参数 利用model验证或者判断
  145. $params = Yii::$app->request->post();
  146. $post = [
  147. 'message1' => 'message 1',
  148. 'message2' => 'message 2',
  149. 'message3' => 'message 3',
  150. 'queue' => 'y1',
  151. ];
  152. try {
  153. $queue = (new Queue())->create($post['queue']);
  154. if ($queue['status'] == 1) {
  155. $message = new Message($queue['result']);
  156. $message->receive($post['queue']);
  157. return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')];
  158. } else {
  159. return ['code' => $queue['status'], 'message' => $queue['result']];
  160. }
  161. } catch (\common\logic\Amqp\Exception $e) {
  162. $result = ['code' => $e->getCode(), 'message' => $e->getMessage()];
  163. }
  164. return $result;
  165. }
  166. /**
  167. * 批量消费消息
  168. * @author: libingke
  169. */
  170. public function actionBatchReceive()
  171. {
  172. //获取接收参数 利用model验证或者判断
  173. $params = Yii::$app->request->post();
  174. $post = [
  175. 'message1' => 'message 1',
  176. 'message2' => 'message 2',
  177. 'message3' => 'message 3',
  178. 'queue' => 'y1',
  179. ];
  180. try {
  181. $queue = (new Queue())->create($post['queue']);
  182. if ($queue['status'] == 1) {
  183. $message = new Message($queue['result']);
  184. $message->receive($post['queue']);
  185. return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')];
  186. } else {
  187. return ['code' => $queue['status'], 'message' => $queue['result']];
  188. }
  189. } catch (\common\logic\Amqp\Exception $e) {
  190. $result = ['code' => $e->getCode(), 'message' => $e->getMessage()];
  191. }
  192. return $result;
  193. }
  194. /**
  195. * 删除消息
  196. * @author: libingke
  197. */
  198. public function actionDelete()
  199. {
  200. //获取接收参数 利用model验证或者判断
  201. $params = Yii::$app->request->post();
  202. $post = [
  203. 'message1' => 'message 1',
  204. 'message2' => 'message 2',
  205. 'message3' => 'message 3',
  206. 'queue' => 'y1',
  207. ];
  208. try {
  209. $queue = (new Queue())->create($post['queue']);
  210. if ($queue['status'] == 1) {
  211. $message = new Message($queue['result']);
  212. $message->delete($post['message1']);
  213. return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')];
  214. } else {
  215. return ['code' => $queue['status'], 'message' => $queue['result']];
  216. }
  217. } catch (\common\logic\Amqp\Exception $e) {
  218. $result = ['code' => $e->getCode(), 'message' => $e->getMessage()];
  219. }
  220. return $result;
  221. }
  222. /**
  223. * 批量删除消息
  224. * @author: libingke
  225. */
  226. public function actionBatchDelete()
  227. {
  228. }
  229. }