MessageController.php 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. <?php
  2. namespace backend\controllers;
  3. use backend\models\Jobs;
  4. use backend\forms\MessageForm;
  5. use common\logic\Amqp\Cache;
  6. use common\logic\Amqp\Message;
  7. use common\logic\Amqp\Queue;
  8. use components\Exception;
  9. use Yii;
  10. class MessageController extends BaseController
  11. {
  /**
   * Look up the receipt cached for a given query key.
   *
   * The status stored under $query is read from the cache. When it equals
   * Cache::STATUS_HAND_OK, the value cached under 'result_' . $query is
   * returned unchanged. Any other status raises error 2101 together with the
   * text Cache::getMarkById() yields for that status. A non-string $query, or
   * a cache lookup that yields false, raises error 2102.
   *
   * @author libingke
   * @param mixed $query cache key of the receipt to look up
   * @return mixed the cached result data
   * @throws Exception 2101 for a non-OK status, 2102 for an invalid query
   */
  public function actionQueryReceipt($query)
  {
      // Only string keys are accepted.
      if (!is_string($query)) {
          throw new Exception(2102);
      }

      $cachedStatus = Cache::getData($query);
      if ($cachedStatus === false) {
          // A false lookup result is treated as an invalid query.
          throw new Exception(2102);
      }

      // Status code => hint text; resolved before the status check on purpose
      // so the call order matches the established behaviour.
      $hint = Cache::getMarkById($cachedStatus);
      if (Cache::STATUS_HAND_OK != $cachedStatus) {
          // Receipt exists but is not in the OK state.
          throw new Exception(2101, $hint);
      }

      // OK state: hand back the stored data directly.
      return Cache::getData('result_' . $query);
  }
  /**
   * Send a message for the job identified by the `sign` query parameter.
   *
   * The job returned by Jobs::fetchDataBySign() supplies the validation
   * scenario and the queue that are applied to a MessageForm. The POST body is
   * loaded into that form; when validation passes, the queue name and the
   * value returned by sendMessage() are placed in the response data.
   *
   * NOTE(review): when validation fails, handleError() is called and the
   * method then falls through to the 200 response with empty data. Presumably
   * handleError() throws - confirm.
   *
   * @author libingke
   * @return array envelope with `code`, `message` and `data` keys
   * @throws Exception 2201 when `sign` is empty, 2202 when the job has no
   *                   scenario, 1001 when the request is not a POST
   */
  public function actionSend()
  {
      $request = Yii::$app->request;

      $sign = $request->get('sign');
      if ($sign == '') {
          throw new Exception(2201);
      }

      $job = Jobs::fetchDataBySign($sign, false);
      if (!isset($job['scenario'])) {
          // The job must name a validation scenario.
          throw new Exception(2202);
      }
      $scenario = $job['scenario'];
      $queue = $job['queue'];

      if (!$request->isPost) {
          throw new Exception('1001');
      }

      $form = new MessageForm();
      $form->setScenario($scenario);
      $form->setQueue($queue);
      $form->load(['MessageForm' => $request->post()]);

      $payload = [];
      if (!$form->validate()) {
          // Validation failed: let the form deal with its errors.
          $form->handleError();
      } else {
          $payload = [
              'queueName' => $form->getQueue(),
              'requestId' => $form->sendMessage(),
          ];
      }

      return [
          'code' => 200,
          'message' => Yii::t('error', 200),
          'data' => $payload,
      ];
  }
  /**
   * Batch-send endpoint; at present it behaves exactly like actionSend().
   *
   * It reads the `sign` query parameter, resolves the job, validates the POST
   * body through MessageForm and sends a single message, returning the same
   * envelope as actionSend(). Delegating keeps the two endpoints from drifting
   * apart.
   *
   * @todo Implement real multi-queue batch dispatch. The input contract
   *       (a JSON `data` field listing queues and their messages) and the
   *       response shape still need to be decided before this endpoint can
   *       diverge from actionSend().
   *
   * @author libingke
   * @return array envelope with `code`, `message` and `data` keys
   * @throws Exception same error codes as actionSend()
   */
  public function actionBatchSend()
  {
      return $this->actionSend();
  }
  /**
   * Consume messages from the hard-coded `login` queue (work in progress).
   *
   * @todo Take the queue name from the request instead of a constant.
   *
   * @author libingke
   * @return array `code`/`message` pair: 200 on success, otherwise the code
   *               and message of the caught AMQP exception
   */
  public function actionConsume()
  {
      $queueName = 'login';

      try {
          (new Message($queueName))->consume($queueName);

          return ['code' => 200, 'message' => Yii::t('common', 'OK')];
      } catch (\common\logic\Amqp\Exception $e) {
          // AMQP failures are reported in the response body, not rethrown.
          return ['code' => $e->getCode(), 'message' => $e->getMessage()];
      }
  }
  /**
   * Receive messages in batch (placeholder implementation).
   *
   * The POST body is read but not used yet; a fixed fixture naming queue `y1`
   * stands in for it. The queue is created through Queue::create(); when the
   * returned status is 1 a Message built from the `result` entry receives from
   * that queue, otherwise the status/result pair is returned as the response.
   *
   * @todo Validate the request parameters (via a model or explicit checks)
   *       and use them in place of the fixture.
   *
   * @author libingke
   * @return array `code`/`message` pair
   */
  public function actionBatchReceive()
  {
      // Fetched for the upcoming validation step; intentionally unused for now.
      $params = Yii::$app->request->post();

      $fixture = [
          'message1' => 'message 1',
          'message2' => 'message 2',
          'message3' => 'message 3',
          'queue' => 'y1',
      ];

      try {
          $created = (new Queue())->create($fixture['queue']);
          if ($created['status'] != 1) {
              // Queue creation did not succeed: relay its status and result.
              return ['code' => $created['status'], 'message' => $created['result']];
          }

          $receiver = new Message($created['result']);
          $receiver->receive($fixture['queue']);

          return ['code' => 200, 'message' => Yii::t('common', 'OK')];
      } catch (\common\logic\Amqp\Exception $e) {
          // AMQP failures are reported in the response body, not rethrown.
          return ['code' => $e->getCode(), 'message' => $e->getMessage()];
      }
  }
  /**
   * Delete a message (placeholder implementation).
   *
   * The POST body is read but not used yet; a fixed fixture supplies queue
   * `y1` and the message to delete. The queue is created through
   * Queue::create(); when the returned status is 1 a Message built from the
   * `result` entry deletes the fixture's `message1` value, otherwise the
   * status/result pair is returned as the response.
   *
   * @todo Validate the request parameters (via a model or explicit checks)
   *       and use them in place of the fixture.
   *
   * @author libingke
   * @return array `code`/`message` pair
   */
  public function actionDelete()
  {
      // Fetched for the upcoming validation step; intentionally unused for now.
      $params = Yii::$app->request->post();

      $fixture = [
          'message1' => 'message 1',
          'message2' => 'message 2',
          'message3' => 'message 3',
          'queue' => 'y1',
      ];

      try {
          $created = (new Queue())->create($fixture['queue']);
          if ($created['status'] != 1) {
              // Queue creation did not succeed: relay its status and result.
              return ['code' => $created['status'], 'message' => $created['result']];
          }

          $remover = new Message($created['result']);
          $remover->delete($fixture['message1']);

          return ['code' => 200, 'message' => Yii::t('common', 'OK')];
      } catch (\common\logic\Amqp\Exception $e) {
          // AMQP failures are reported in the response body, not rethrown.
          return ['code' => $e->getCode(), 'message' => $e->getMessage()];
      }
  }
  /**
   * Delete messages in batch.
   *
   * Not implemented yet: the action performs no work and yields null.
   *
   * @author libingke
   * @return null
   */
  public function actionBatchDelete()
  {
      return null;
  }
  237. }