0
0

MessageForm.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525
  1. <?php
  2. namespace backend\forms;
  3. use PhpAmqpLib\Message\AMQPMessage;
  4. use components\service\AmqpConfig;
  5. use components\service\Redis;
  6. use components\Exception;
  7. use components\Curl;
  8. use common\helpers\KeyHelper;
  9. use yii\helpers\ArrayHelper;
  10. use Yii;
  11. class MessageForm extends BaseForm
  12. {
  13. /**
  14. * @var
  15. */
  16. public $queue;
  17. /**
  18. * @var
  19. */
  20. public $message;
  21. /**
  22. * @var
  23. */
  24. public $name;
  25. /**
  26. * @var integer 数量
  27. */
  28. public $count;
  29. /**
  30. * @var bool 是否重新排序
  31. */
  32. public $requeue;
  33. /**
  34. * @var string 编码
  35. */
  36. public $encoding;
  37. /**
  38. * @var string 消费id
  39. */
  40. public $mid;
  41. /**
  42. * @var array 消费ids
  43. */
  44. public $mids;
  45. /**
  46. * @var string 消费类型
  47. */
  48. public $type;
  49. /**
  50. * @var bool 自动应答
  51. */
  52. public $ack;
  53. /**
  54. * @var bool 强制删除
  55. */
  56. public $forced;
  57. /**
  58. * @var bool
  59. */
  60. private $_stop = false;
  61. /**
  62. * @var string
  63. */
  64. private $_mid;
  65. /**
  66. * @var string
  67. */
  68. private $_body;
  69. /**
  70. * @var AMQPMessage
  71. */
  72. private $_message;
  73. /**
  74. * @var array
  75. */
  76. private $_rows;
  77. /**
  78. * @var array
  79. */
  80. private $_result;
  81. public function rules()
  82. {
  83. return [
  84. [['name'], 'required', 'on' => ['message_list', 'purge']],
  85. [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume', 'delete', 'ack']],
  86. /* 发送消息 */
  87. [['queue'], 'trim', 'on' => ['send', 'batch_send']],
  88. [['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
  89. ['message', 'validateArray', 'on' => ['send', 'batch_send']],
  90. /* 获取消息列表 */
  91. ['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 1205,
  92. 'tooSmall' => 1206, 'tooBig' => 1207, 'on' => ['message_list']],
  93. ['count', 'default', 'value' => 20, 'on' => ['message_list']],
  94. ['requeue', 'boolean', 'message' => 1209, 'on' => ['message_list']],
  95. ['requeue', 'default', 'value' => true, 'on' => ['message_list']],
  96. ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 1210, 'on' => ['message_list']],
  97. ['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],
  98. /* 消费 */
  99. [['name', 'count'], 'required', 'on' => ['consume']],
  100. ['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 1205,
  101. 'tooSmall' => 1206, 'on' => ['consume']],
  102. /* delete & ack */
  103. [['mids', 'name'], 'required', 'on' => ['delete', 'ack']],
  104. ['mids', 'validateArray', 'on' => ['delete', 'ack']],
  105. ['name', 'string', 'on' => ['delete', 'ack']],
  106. ['forced', 'default', 'value' => false, 'on' => 'delete'],
  107. ['forced', 'boolean', 'on' => 'delete'],
  108. ];
  109. }
  110. public function validateArray($attribute)
  111. {
  112. if (!$this->$attribute || !is_array($this->$attribute))
  113. throw new Exception(1003, "{$attribute} 必须是数组");
  114. }
  115. /**
  116. * [发送消息]
  117. * @author: libingke
  118. * @return array
  119. * @throws Exception
  120. * @version 1.1
  121. */
  122. public function sendMessage()
  123. {
  124. try {
  125. $connect = $this->getConnect();
  126. $channel = $connect->channel();
  127. $this->_handleMessage($this->message);
  128. //预声明
  129. $channel->queue_declare($this->queue,
  130. false, true, false, false);
  131. $channel->basic_publish($this->_message, '', $this->queue);
  132. //获取返回结果
  133. list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
  134. false, true, false, false);
  135. $data = [
  136. 'message_total' => $message_count,
  137. 'queue_name' => $q_name,
  138. 'message_add' => [
  139. 'mid' => $this->_mid,
  140. 'body' => $this->_body
  141. ]
  142. ];
  143. if (($get = $channel->basic_get($q_name)) !== null) {
  144. $data['basic_get'] = [
  145. 'mid' => $get->get('message_id'),
  146. 'body' => $get->body
  147. ];
  148. }
  149. $channel->close();
  150. $connect->close();
  151. Redis::set($q_name, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
  152. return $data;
  153. } catch (\Exception $e) {
  154. throw new Exception(1001, $e->getMessage());
  155. }
  156. }
  157. /**
  158. * [构造消息体]
  159. * @param $data
  160. */
  161. private function _handleMessage($data)
  162. {
  163. $this->_mid = KeyHelper::getUniqueId('message_send');
  164. $this->_body = call_user_func_array([$this, '_messageBody'], [$data]);
  165. $properties = [
  166. 'message_id' => $this->_mid,
  167. 'correlation_id'=> $this->_mid,
  168. 'consumer_tag' => $this->_mid
  169. ];
  170. $this->_message = new AMQPMessage($this->_body, $properties);
  171. }
  172. private function _messageBody($data)
  173. {
  174. return json_encode($data);
  175. }
  176. /**
  177. * [批量发送消息]
  178. * @author: libingke
  179. * @return array
  180. * @throws Exception
  181. */
  182. public function batchSendMessage()
  183. {
  184. try {
  185. $connect = $this->getConnect();
  186. $channel = $connect->channel();
  187. //预声明
  188. $channel->queue_declare($this->queue,
  189. false, true, false, false);
  190. foreach ($this->message as $k => $v) {
  191. $this->_handleMessage($v);
  192. $this->_rows[] = [
  193. 'mid' => $this->_mid,
  194. 'body' => $this->_body
  195. ];
  196. $channel->batch_basic_publish($this->_message, '', $this->queue);
  197. Redis::set($this->queue, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
  198. }
  199. $channel->publish_batch();
  200. //获取返回结果
  201. list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
  202. false, true, false, false);
  203. $data = [
  204. 'message_total' => $message_count,
  205. 'queue_name' => $q_name,
  206. 'add_count' => count($this->_rows),
  207. 'add_rows' => $this->_rows
  208. ];
  209. if (($get = $channel->basic_get($q_name)) !== null) {
  210. $data['basic_get'] = [
  211. 'mid' => $get->get('message_id'),
  212. 'body' => $get->body
  213. ];
  214. }
  215. $channel->close();
  216. $connect->close();
  217. return $data;
  218. } catch (\Exception $e) {
  219. throw new Exception(1001, $e->getMessage());
  220. }
  221. }
  222. /**
  223. * [获取消息列表]
  224. * @author: libingke
  225. */
  226. public function getMessageList()
  227. {
  228. $authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
  229. $vhost = urlencode(Yii::$app->Amqp->vhost);
  230. $url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port .
  231. "/api/queues/{$vhost}/" . $this->name . "/get";
  232. $postParams = [
  233. 'name' => $this->name,
  234. 'count' => $this->count,
  235. 'encoding' => $this->encoding,
  236. 'requeue' => $this->requeue,
  237. 'truncate' => "50000",
  238. 'vhost' => '/',
  239. ];
  240. $curl = new Curl();
  241. $curl->setOption(CURLOPT_USERPWD, $authStr);
  242. $curl->setRawPostData(json_encode($postParams));
  243. $result = json_decode($curl->post($url), true);
  244. if ($curl->responseCode != 200)
  245. throw new Exception(1002);
  246. if ($curl->errorText)
  247. throw new Exception(1002, $curl->errorText);
  248. if (isset($result['error']) && is_string($result['error']))
  249. throw new Exception(1002, $result['error']);
  250. ArrayHelper::multisort($result,'message_count',SORT_ASC);
  251. //print_r($result);exit();
  252. $rows = [];
  253. foreach ($result as $k => $v) {
  254. $rows[$k]['mid'] = $v['properties']['message_id'];
  255. $rows[$k]['body'] = $v['payload'];
  256. $rows[$k]['before'] = $v['message_count'];
  257. }
  258. unset($result);
  259. return ['count' => count($rows), 'rows' => $rows];
  260. }
  261. /**
  262. * consumeMessage
  263. * @author: libingke
  264. * @return array
  265. * @throws Exception
  266. */
  267. public function consumeMessage()
  268. {
  269. $q_name = $this->name;
  270. $connect = $this->getConnect();
  271. $channel = $connect->channel();
  272. $channel->queue_declare($q_name,
  273. false, true, false, false);
  274. list(, $total, ) = $channel->queue_declare($q_name,
  275. false, true, false, false);
  276. if ($total == 0)
  277. throw new Exception(1300);
  278. try {
  279. $min = min($this->count, $total);
  280. $callback = function ($msg) use($q_name) {call_user_func_array([$this, 'closureConsume'], [$msg, $q_name]);};
  281. $channel->basic_qos(0, $min, null);
  282. $channel->basic_consume($q_name,
  283. '', false, false, false, false, $callback);
  284. for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
  285. if ($i > $min)
  286. break;
  287. $channel->wait();
  288. }
  289. $channel->close();
  290. $connect->close();
  291. } catch (\Exception $e) {
  292. throw new Exception(1001, $e->getMessage());
  293. }
  294. return $this->_result;
  295. }
  296. /**
  297. * closureConsume for consumeMessage
  298. * @param $msg
  299. * @param $queue
  300. */
  301. protected function closureConsume($msg, $queue)
  302. {
  303. $data = ['mid' => '', 'body' => '', 'error' => ''];
  304. try {
  305. $data['mid'] = $msg->get('message_id');
  306. $data['body'] = $msg->body;
  307. Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND);
  308. } catch (\Exception $e) {
  309. $data['error'] = $e->getMessage();
  310. } finally {
  311. $this->_result[] = $data;
  312. }
  313. }
  314. /**
  315. * ackMessage
  316. * @author: libingke
  317. * @return array
  318. * @throws Exception
  319. */
  320. public function ackMessage()
  321. {
  322. //帅选合法待应答消息id
  323. foreach ($this->mids as $mid)
  324. if ($mid && is_string($mid))
  325. $this->_rows[$mid] = $mid;
  326. if (!is_array($this->_rows) || count($this->_rows) == 0)
  327. throw new Exception(1301);
  328. $ack = $this->_rows;
  329. $q_name = $this->name;
  330. $connect = $this->getConnect();
  331. $channel = $connect->channel();
  332. $channel->queue_declare($q_name,
  333. false, true, false, false);
  334. list(, $total, ) = $channel->queue_declare($q_name,
  335. false, true, false, false);
  336. try {
  337. $callback = function ($msg) use($q_name) {
  338. call_user_func_array([$this, 'closureAck'], [$msg, $q_name, AmqpConfig::STATUS_HAND_OK]);
  339. };
  340. $channel->basic_qos(0, $total, null);
  341. $channel->basic_consume($q_name,
  342. '', false, false, false, false, $callback);
  343. for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
  344. if ($i > $total)
  345. break;
  346. $channel->wait();
  347. }
  348. $channel->close();
  349. $connect->close();
  350. } catch (\Exception $e) {
  351. throw new Exception(1101, $e->getMessage());
  352. }
  353. $ackCount = count($ack) - count($this->_rows);
  354. $data = ['queue' => $q_name, 'ack_count' => $ackCount];
  355. count($this->_rows) ? $data['ack_fail'] = array_keys($this->_rows) : null;
  356. return $data;
  357. }
  358. /**
  359. * deleteMessage
  360. * @author: libingke
  361. * @return array
  362. * @throws Exception
  363. */
  364. public function deleteMessage()
  365. {
  366. //帅选合法待应答消息id
  367. foreach ($this->mids as $mid)
  368. if ($mid && is_string($mid))
  369. $this->_rows[$mid] = $mid;
  370. if (!is_array($this->_rows) || count($this->_rows) == 0)
  371. throw new Exception(1301);
  372. $delete = $this->_rows;
  373. Redis::batchDel($this->name, $delete, 'status');
  374. $q_name = $this->name;
  375. $connect = $this->getConnect();
  376. $channel = $connect->channel();
  377. $channel->queue_declare($q_name,
  378. false, true, false, false);
  379. list(, $total, ) = $channel->queue_declare($q_name,
  380. false, true, false, false);
  381. try {
  382. $callback = function ($msg) use($q_name) {call_user_func_array([$this, 'closureAck'], [$msg]);};
  383. $channel->basic_qos(0, $total, null);
  384. $channel->basic_consume($q_name,
  385. '', false, false, false, false, $callback);
  386. for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
  387. if ($i > $total)
  388. break;
  389. $channel->wait();
  390. }
  391. $channel->close();
  392. $connect->close();
  393. } catch (\Exception $e) {
  394. throw new Exception(1101, $e->getMessage());
  395. }
  396. $deleteCount = count($delete) - count($this->_rows);
  397. $data = ['queue' => $q_name, 'delete_count' => $deleteCount];
  398. count($this->_rows) ? $data['delete_fail'] = array_keys($this->_rows) : null;
  399. return $data;
  400. }
  401. /**
  402. * closureAck for ackMessage && deleteMessage
  403. * @author: libingke
  404. * @param $msg
  405. * @param $queue
  406. * @param bool $status
  407. */
  408. protected function closureAck($msg, $queue = '', $status = false)
  409. {
  410. try {
  411. $mid = $msg->get('message_id');
  412. if (in_array($mid, $this->_rows)) {
  413. unset($this->_rows[$mid]);
  414. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  415. if ($status != false && $queue != '')
  416. Redis::set($queue, $mid, 'status', $status);
  417. }
  418. } finally {
  419. if (count($this->_rows) == 0)
  420. $this->_stop = true;
  421. }
  422. }
  423. /**
  424. * [清空消息]
  425. * @author: libingke
  426. * @return array
  427. */
  428. public function purge()
  429. {
  430. try {
  431. $connect = $this->getConnect();
  432. $channel = $connect->channel();
  433. $delete = $channel->queue_purge($this->name);
  434. if ($delete > 0)
  435. Redis::purge($this->name);
  436. $channel->close();
  437. $connect->close();
  438. return [
  439. 'queue' => $this->name,
  440. 'count' => $delete
  441. ];
  442. } catch (\Exception $e) {
  443. throw new Exception(1001, $e->getMessage());
  444. }
  445. }
  446. }