MessageController.php 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. <?php
  2. namespace backend\controllers;
  3. use components\Exception;
  4. use common\logic\Amqp\Message;
  5. use common\logic\Amqp\Queue;
  6. use Yii;
  7. class MessageController extends BaseController
  8. {
  9. public function batch_basic_publish(
  10. $msg,
  11. $exchange = '',
  12. $routing_key = '',
  13. $mandatory = false,
  14. $immediate = false,
  15. $ticket = null
  16. ) {
  17. var_dump($exchange);
  18. $a[] = func_get_args();
  19. var_dump($a);die;
  20. }
  21. /**
  22. * 发送消息 (接受多条)
  23. * @author: libingke
  24. * @return array
  25. * @throws Exception
  26. */
  27. public function actionSend()
  28. {
  29. //if (!Yii::$app->request->isPost)
  30. // throw new Exception('1001');
  31. $params = Yii::$app->request->post();
  32. //test data todo delete
  33. $params = [
  34. 'body' => json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn']),
  35. 'queue' => 'y1',
  36. ];
  37. if (!isset($params['body']))
  38. throw new Exception('2001');
  39. if (!is_string($params['body']))
  40. throw new Exception('2002');
  41. if (!isset($params['queue']))
  42. throw new Exception('2003');
  43. if (!is_string($params['queue']))
  44. throw new Exception('2004');
  45. $body = $params['body'];
  46. $queue = $params['queue'];
  47. try {
  48. $queueModel = (new Queue())->create($queue);
  49. if ($queueModel['status'] == 1) {
  50. $message = new Message($queue);
  51. $message->send($body, $queue);
  52. $code = 200;
  53. } else {
  54. $code = 2100;
  55. }
  56. } catch (\common\logic\Amqp\Exception $e) {
  57. throw new Exception($e->getCode(), $e->getMessage());
  58. }
  59. return [
  60. 'code' => $code,
  61. 'message' => Yii::t('error', $code)
  62. ];
  63. }
  64. /**
  65. * 发送消息 (接受多条)
  66. * @author: libingke
  67. */
  68. public function actionBatchSend()
  69. {
  70. $params = Yii::$app->request->post();
  71. //test data start todo delete
  72. $test[] = [
  73. 'queue' => 'y1',
  74. 'infos' => [
  75. json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y1-m1']),
  76. json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y1-m2'])
  77. ],
  78. ];
  79. $test[] = [
  80. 'queue' => 'y2',
  81. 'infos' => [
  82. json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y2-m1']),
  83. json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y2-m2'])
  84. ],
  85. ];
  86. $params['data'] = json_encode($test);
  87. //test data end todo delete
  88. if (!isset($params['data']))
  89. throw new Exception('2005');
  90. $data = json_decode($params['data'], true);
  91. if (!is_array($data) || !$data)
  92. throw new Exception('2006');
  93. $errCount = 0;
  94. foreach ($data as $items) {
  95. if (!isset($items['queue'])
  96. || !is_string($items['queue'])
  97. || !isset($items['infos'])
  98. || !is_array($items['infos'])
  99. || !$items['infos'] )
  100. {
  101. $errCount ++;
  102. continue;
  103. }
  104. try {
  105. //todo check
  106. //if queue exist
  107. $queueModel = (new Queue())->create($items['queue']);
  108. if ($queueModel['status'] == 1) {
  109. $message = new Message($items['queue']);
  110. foreach ($items['infos'] as $info) {
  111. $message->batchSend($info, $items['queue']);
  112. }
  113. } else {
  114. $errCount ++;
  115. }
  116. } catch (\common\logic\Amqp\Exception $e) {
  117. $errCount ++;
  118. //throw new Exception($e->getCode(), $e->getMessage());
  119. }
  120. }
  121. return [
  122. 'code' => 200,
  123. 'message' => Yii::t('error', 200),
  124. 'data' => ['error_count' => $errCount]
  125. ];
  126. }
  127. /**
  128. * 消费消息 todo
  129. * @author: libingke
  130. */
  131. public function actionReceive()
  132. {
  133. //获取接收参数 利用model验证或者判断
  134. $params = Yii::$app->request->post();
  135. $post = [
  136. 'message1' => 'message 1',
  137. 'message2' => 'message 2',
  138. 'message3' => 'message 3',
  139. 'queue' => 'y1',
  140. ];
  141. try {
  142. $queue = (new Queue())->create($post['queue']);
  143. if ($queue['status'] == 1) {
  144. $message = new Message($queue['result']);
  145. $message->receive($post['queue']);
  146. return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')];
  147. } else {
  148. return ['code' => $queue['status'], 'message' => $queue['result']];
  149. }
  150. } catch (\common\logic\Amqp\Exception $e) {
  151. $result = ['code' => $e->getCode(), 'message' => $e->getMessage()];
  152. }
  153. return $result;
  154. }
  155. /**
  156. * 批量消费消息
  157. * @author: libingke
  158. */
  159. public function actionBatchReceive()
  160. {
  161. //获取接收参数 利用model验证或者判断
  162. $params = Yii::$app->request->post();
  163. $post = [
  164. 'message1' => 'message 1',
  165. 'message2' => 'message 2',
  166. 'message3' => 'message 3',
  167. 'queue' => 'y1',
  168. ];
  169. try {
  170. $queue = (new Queue())->create($post['queue']);
  171. if ($queue['status'] == 1) {
  172. $message = new Message($queue['result']);
  173. $message->receive($post['queue']);
  174. return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')];
  175. } else {
  176. return ['code' => $queue['status'], 'message' => $queue['result']];
  177. }
  178. } catch (\common\logic\Amqp\Exception $e) {
  179. $result = ['code' => $e->getCode(), 'message' => $e->getMessage()];
  180. }
  181. return $result;
  182. }
  183. /**
  184. * 删除消息
  185. * @author: libingke
  186. */
  187. public function actionDelete()
  188. {
  189. //获取接收参数 利用model验证或者判断
  190. $params = Yii::$app->request->post();
  191. $post = [
  192. 'message1' => 'message 1',
  193. 'message2' => 'message 2',
  194. 'message3' => 'message 3',
  195. 'queue' => 'y1',
  196. ];
  197. try {
  198. $queue = (new Queue())->create($post['queue']);
  199. if ($queue['status'] == 1) {
  200. $message = new Message($queue['result']);
  201. $message->delete($post['message1']);
  202. return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')];
  203. } else {
  204. return ['code' => $queue['status'], 'message' => $queue['result']];
  205. }
  206. } catch (\common\logic\Amqp\Exception $e) {
  207. $result = ['code' => $e->getCode(), 'message' => $e->getMessage()];
  208. }
  209. return $result;
  210. }
    /**
     * Delete messages in batch.
     *
     * Not implemented yet: the method body is empty.
     *
     * @author: libingke
     */
    public function actionBatchDelete()
    {
    }
  218. }