MessageForm.php 14 KB


  1. <?php
  2. namespace backend\forms;
  3. use common\logic\LoginHandle;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. use components\service\AmqpConfig;
  6. use components\service\Redis;
  7. use components\Exception;
  8. use components\Curl;
  9. use common\helpers\KeyHelper;
  10. use yii\helpers\ArrayHelper;
  11. use Yii;
  12. class MessageForm extends BaseForm
  13. {
  14. /**
  15. * @var string Name of the queue to publish to; required in the 'send' and 'batch_send' scenarios.
  16. */
  17. public $queue;
  18. /**
  19. * @var array Payload to publish; validateArray() rejects empty or non-array values. sendMessage() encodes it as one message, batchSendMessage() publishes one message per element.
  20. */
  21. public $message;
  22. /**
  23. * @var string Name of the queue operated on in the 'message_list', 'purge', 'consume', 'delete' and 'ack' scenarios.
  24. */
  25. public $name;
  26. /**
  27. * @var integer Number of messages to fetch ('message_list': 1-1000, default 20) or to consume ('consume': 1-65000).
  28. */
  29. public $count;
  30. /**
  31. * @var bool Whether fetched messages are requeued; forwarded as 'requeue' by getMessageList() (default true).
  32. */
  33. public $requeue;
  34. /**
  35. * @var string Payload encoding requested by getMessageList(): 'auto' (default) or 'base64'.
  36. */
  37. public $encoding;
  38. /**
  39. * @var string Message id. Not referenced by the rules or methods of this class shown here.
  40. */
  41. public $mid;
  42. /**
  43. * @var array Message ids to acknowledge or delete; required in the 'ack' and 'delete' scenarios.
  44. */
  45. public $mids;
  46. /**
  47. * @var string Consume type: 'server' or 'client' (default 'client'); selects the callback used by consumeMessage().
  48. */
  49. public $type;
  50. /**
  51. * @var bool Automatic acknowledgement flag. Not referenced by the rules or methods of this class shown here.
  52. */
  53. public $ack;
  54. /**
  55. * @var bool Force-delete flag; validated in the 'delete' scenario (default false) but not read by deleteMessage().
  56. */
  57. public $forced;
  58. /**
  59. * @var bool Set to true by closureAck() once no ids remain in $_rows; ends the wait loops.
  60. */
  61. private $_stop = false;
  62. /**
  63. * @var string Id of the message most recently built by _handleMessage().
  64. */
  65. private $_mid;
  66. /**
  67. * @var string Body of the message most recently built by _handleMessage().
  68. */
  69. private $_body;
  70. /**
  71. * @var AMQPMessage Message object most recently built by _handleMessage().
  72. */
  73. private $_message;
  74. /**
  75. * @var array Working set: rows published by batchSendMessage(), or ids still pending in ackMessage()/deleteMessage().
  76. */
  77. private $_rows;
  78. /**
  79. * @var array Per-message results collected by the consume callbacks.
  80. */
  81. private $_result;
  82. public function rules()
  83. {
  84. return [
  85. [['name'], 'required', 'on' => ['message_list', 'purge']],
  86. [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume', 'delete', 'ack']],
  87. /* 发送消息 */
  88. [['queue'], 'trim', 'on' => ['send', 'batch_send']],
  89. [['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
  90. ['message', 'validateArray', 'on' => ['send', 'batch_send']],
  91. /* 获取消息列表 */
  92. ['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 1205,
  93. 'tooSmall' => 1206, 'tooBig' => 1207, 'on' => ['message_list']],
  94. ['count', 'default', 'value' => 20, 'on' => ['message_list']],
  95. ['requeue', 'boolean', 'message' => 1209, 'on' => ['message_list']],
  96. ['requeue', 'default', 'value' => true, 'on' => ['message_list']],
  97. ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 1210, 'on' => ['message_list']],
  98. ['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],
  99. /* 消费 */
  100. [['name', 'count'], 'required', 'on' => ['consume']],
  101. ['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 1205,
  102. 'tooSmall' => 1206, 'on' => ['consume']],
  103. ['type', 'default', 'value' => 'client', 'on' => 'consume'],
  104. ['type', 'in', 'range' => ['server', 'client'], 'message' => 1302, 'on' => 'consume'],
  105. /* delete & ack */
  106. [['mids', 'name'], 'required', 'on' => ['delete', 'ack']],
  107. ['mids', 'validateArray', 'on' => ['delete', 'ack']],
  108. ['name', 'string', 'on' => ['delete', 'ack']],
  109. ['forced', 'default', 'value' => false, 'on' => 'delete'],
  110. ['forced', 'boolean', 'on' => 'delete'],
  111. ];
  112. }
  113. public function validateArray($attribute)
  114. {
  115. if (!$this->$attribute || !is_array($this->$attribute))
  116. throw new Exception(1003, "{$attribute} 必须是数组");
  117. }
  118. /**
  119. * [发送消息]
  120. * @author: libingke
  121. * @return array
  122. * @throws Exception
  123. * @version 1.1
  124. */
  125. public function sendMessage()
  126. {
  127. try {
  128. $connect = $this->getConnect();
  129. $channel = $connect->channel();
  130. $this->_handleMessage($this->message);
  131. //预声明
  132. $channel->queue_declare($this->queue,
  133. false, true, false, false);
  134. $channel->basic_publish($this->_message, '', $this->queue);
  135. //获取返回结果
  136. list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
  137. false, true, false, false);
  138. $data = [
  139. 'message_total' => $message_count,
  140. 'queue_name' => $q_name,
  141. 'message_add' => [
  142. 'mid' => $this->_mid,
  143. 'body' => $this->_body
  144. ]
  145. ];
  146. if (($get = $channel->basic_get($q_name)) !== null) {
  147. $data['basic_get'] = [
  148. 'mid' => $get->get('message_id'),
  149. 'body' => $get->body
  150. ];
  151. }
  152. $channel->close();
  153. $connect->close();
  154. Redis::set($q_name, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
  155. return $data;
  156. } catch (\Exception $e) {
  157. throw new Exception(1001, $e->getMessage());
  158. }
  159. }
  160. /**
  161. * [构造消息体]
  162. * @param $data
  163. */
  164. private function _handleMessage($data)
  165. {
  166. $this->_mid = KeyHelper::getUniqueId('message_send');
  167. $this->_body = call_user_func_array([$this, '_messageBody'], [$data]);
  168. $properties = [
  169. 'message_id' => $this->_mid,
  170. 'correlation_id'=> $this->_mid,
  171. 'consumer_tag' => $this->_mid
  172. ];
  173. $this->_message = new AMQPMessage($this->_body, $properties);
  174. }
  175. private function _messageBody($data)
  176. {
  177. return json_encode($data);
  178. }
  179. /**
  180. * [批量发送消息]
  181. * @author: libingke
  182. * @return array
  183. * @throws Exception
  184. */
  185. public function batchSendMessage()
  186. {
  187. try {
  188. $connect = $this->getConnect();
  189. $channel = $connect->channel();
  190. //预声明
  191. $channel->queue_declare($this->queue,
  192. false, true, false, false);
  193. foreach ($this->message as $k => $v) {
  194. $this->_handleMessage($v);
  195. $this->_rows[] = [
  196. 'mid' => $this->_mid,
  197. 'body' => $this->_body
  198. ];
  199. $channel->batch_basic_publish($this->_message, '', $this->queue);
  200. Redis::set($this->queue, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
  201. }
  202. $channel->publish_batch();
  203. //获取返回结果
  204. list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
  205. false, true, false, false);
  206. $data = [
  207. 'message_total' => $message_count,
  208. 'queue_name' => $q_name,
  209. 'add_count' => count($this->_rows),
  210. 'add_rows' => $this->_rows
  211. ];
  212. if (($get = $channel->basic_get($q_name)) !== null) {
  213. $data['basic_get'] = [
  214. 'mid' => $get->get('message_id'),
  215. 'body' => $get->body
  216. ];
  217. }
  218. $channel->close();
  219. $connect->close();
  220. return $data;
  221. } catch (\Exception $e) {
  222. throw new Exception(1001, $e->getMessage());
  223. }
  224. }
  225. /**
  226. * [获取消息列表]
  227. * @author: libingke
  228. */
  229. public function getMessageList()
  230. {
  231. $authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
  232. $vhost = urlencode(Yii::$app->Amqp->vhost);
  233. $url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port .
  234. "/api/queues/{$vhost}/" . $this->name . "/get";
  235. $postParams = [
  236. 'name' => $this->name,
  237. 'count' => $this->count,
  238. 'encoding' => $this->encoding,
  239. 'requeue' => $this->requeue,
  240. 'truncate' => "50000",
  241. 'vhost' => '/',
  242. ];
  243. $curl = new Curl();
  244. $curl->setOption(CURLOPT_USERPWD, $authStr);
  245. $curl->setRawPostData(json_encode($postParams));
  246. $result = json_decode($curl->post($url), true);
  247. if ($curl->responseCode != 200)
  248. throw new Exception(1002);
  249. if ($curl->errorText)
  250. throw new Exception(1002, $curl->errorText);
  251. if (isset($result['error']) && is_string($result['error']))
  252. throw new Exception(1002, $result['error']);
  253. ArrayHelper::multisort($result,'message_count',SORT_ASC);
  254. //print_r($result);exit();
  255. $rows = [];
  256. foreach ($result as $k => $v) {
  257. $rows[$k]['mid'] = $v['properties']['message_id'];
  258. $rows[$k]['body'] = $v['payload'];
  259. $rows[$k]['before'] = $v['message_count'];
  260. }
  261. unset($result);
  262. return ['count' => count($rows), 'rows' => $rows];
  263. }
  264. /**
  265. * consumeMessage
  266. * @author: libingke
  267. * @return array
  268. * @throws Exception
  269. */
  270. public function consumeMessage()
  271. {
  272. $q_name = $this->name;
  273. if ($this->type == 'server') {
  274. $function = '_closure' . ucfirst($q_name);
  275. if ( !method_exists($this, $function) )
  276. throw new Exception(1303);
  277. } else {
  278. $function = 'closureConsume';
  279. }
  280. $connect = $this->getConnect();
  281. $channel = $connect->channel();
  282. $channel->queue_declare($q_name,
  283. false, true, false, false);
  284. list(, $total, ) = $channel->queue_declare($q_name,
  285. false, true, false, false);
  286. if ($total == 0)
  287. throw new Exception(1300);
  288. try {
  289. $min = min($this->count, $total);
  290. $callback = function ($msg) use($q_name, $function) {call_user_func_array([$this, $function], [$msg, $q_name]);};
  291. $channel->basic_qos(0, $min, null);
  292. $channel->basic_consume($q_name,
  293. '', false, false, false, false, $callback);
  294. for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
  295. if ($i > $min)
  296. break;
  297. $channel->wait();
  298. }
  299. $channel->close();
  300. $connect->close();
  301. } catch (\Exception $e) {
  302. throw new Exception(1001, $e->getMessage());
  303. }
  304. return $this->_result;
  305. }
  306. /**
  307. * closureConsume for consumeMessage
  308. * @param $msg
  309. * @param $queue
  310. */
  311. protected function closureConsume($msg, $queue)
  312. {
  313. $data = ['mid' => '', 'body' => '', 'error' => ''];
  314. try {
  315. $data['mid'] = $msg->get('message_id');
  316. $data['body'] = $msg->body;
  317. Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND);
  318. } catch (\Exception $e) {
  319. $data['error'] = $e->getMessage();
  320. } finally {
  321. $this->_result[] = $data;
  322. }
  323. }
  324. /**
  325. * ackMessage
  326. * @author: libingke
  327. * @return array
  328. * @throws Exception
  329. */
  330. public function ackMessage()
  331. {
  332. //帅选合法待应答消息id
  333. foreach ($this->mids as $mid)
  334. if ($mid && is_string($mid))
  335. $this->_rows[$mid] = $mid;
  336. if (!is_array($this->_rows) || count($this->_rows) == 0)
  337. throw new Exception(1301);
  338. $ack = $this->_rows;
  339. $q_name = $this->name;
  340. $connect = $this->getConnect();
  341. $channel = $connect->channel();
  342. $channel->queue_declare($q_name,
  343. false, true, false, false);
  344. list(, $total, ) = $channel->queue_declare($q_name,
  345. false, true, false, false);
  346. try {
  347. $callback = function ($msg) use($q_name) {
  348. call_user_func_array([$this, 'closureAck'], [$msg, $q_name, AmqpConfig::STATUS_HAND_OK, true]);
  349. };
  350. $channel->basic_qos(0, $total, null);
  351. $channel->basic_consume($q_name,
  352. '', false, false, false, false, $callback);
  353. for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
  354. if ($i > $total)
  355. break;
  356. $channel->wait();
  357. }
  358. $channel->close();
  359. $connect->close();
  360. } catch (\Exception $e) {
  361. throw new Exception(1101, $e->getMessage());
  362. }
  363. $ackCount = count($ack) - count($this->_rows);
  364. $data = ['queue' => $q_name, 'ack_count' => $ackCount];
  365. count($this->_rows) ? $data['ack_fail'] = array_keys($this->_rows) : null;
  366. return $data;
  367. }
  368. /**
  369. * deleteMessage
  370. * @author: libingke
  371. * @return array
  372. * @throws Exception
  373. */
  374. public function deleteMessage()
  375. {
  376. //帅选合法待应答消息id
  377. foreach ($this->mids as $mid)
  378. if ($mid && is_string($mid))
  379. $this->_rows[$mid] = $mid;
  380. if (!is_array($this->_rows) || count($this->_rows) == 0)
  381. throw new Exception(1301);
  382. $delete = $this->_rows;
  383. Redis::batchDel($this->name, $delete, 'status');
  384. $q_name = $this->name;
  385. $connect = $this->getConnect();
  386. $channel = $connect->channel();
  387. $channel->queue_declare($q_name,
  388. false, true, false, false);
  389. list(, $total, ) = $channel->queue_declare($q_name,
  390. false, true, false, false);
  391. try {
  392. $callback = function ($msg) use($q_name) {call_user_func_array([$this, 'closureAck'], [$msg]);};
  393. $channel->basic_qos(0, $total, null);
  394. $channel->basic_consume($q_name,
  395. '', false, false, false, false, $callback);
  396. for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
  397. if ($i > $total)
  398. break;
  399. $channel->wait();
  400. }
  401. $channel->close();
  402. $connect->close();
  403. } catch (\Exception $e) {
  404. throw new Exception(1101, $e->getMessage());
  405. }
  406. $deleteCount = count($delete) - count($this->_rows);
  407. $data = ['queue' => $q_name, 'delete_count' => $deleteCount];
  408. count($this->_rows) ? $data['delete_fail'] = array_keys($this->_rows) : null;
  409. return $data;
  410. }
  411. /**
  412. * closureAck for ackMessage && deleteMessage
  413. * @author: libingke
  414. * @param $msg
  415. * @param string $queue
  416. * @param bool $status
  417. * @param bool $check
  418. */
  419. protected function closureAck($msg, $queue = '', $status = false, $check = false)
  420. {
  421. try {
  422. $mid = $msg->get('message_id');
  423. if (in_array($mid, $this->_rows)) {
  424. if ($check == true && Redis::get($queue, $mid, 'status') != AmqpConfig::STATUS_HAND) {
  425. goto end;
  426. }
  427. unset($this->_rows[$mid]);
  428. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  429. if ($status != false && $queue != '')
  430. Redis::set($queue, $mid, 'status', $status);
  431. }
  432. end:
  433. } finally {
  434. if (count($this->_rows) == 0)
  435. $this->_stop = true;
  436. }
  437. }
  438. /**
  439. * [清空消息]
  440. * @author: libingke
  441. * @return array
  442. */
  443. public function purge()
  444. {
  445. try {
  446. $connect = $this->getConnect();
  447. $channel = $connect->channel();
  448. $delete = $channel->queue_purge($this->name);
  449. if ($delete > 0)
  450. Redis::purge($this->name);
  451. $channel->close();
  452. $connect->close();
  453. return [
  454. 'queue' => $this->name,
  455. 'count' => $delete
  456. ];
  457. } catch (\Exception $e) {
  458. throw new Exception(1001, $e->getMessage());
  459. }
  460. }
  461. private function _closureLogin($msg, $queue)
  462. {
  463. $data = ['mid' => '', 'body' => '', 'response' => '', 'error' => ''];
  464. try {
  465. $data['mid'] = $msg->get('message_id');
  466. $data['body'] = $msg->body;
  467. if ($data['mid']) {
  468. $handle = new LoginHandle();
  469. $data['response'] = $handle->login($msg->body, $queue, $data['mid']);
  470. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  471. Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND_OK);
  472. Redis::set($queue, $data['mid'], 'result', $data['response']);
  473. Redis::expire($queue, $data['mid'], 'result', 3600);
  474. }
  475. } catch (\Exception $e) {
  476. $data['error'] = $e->getMessage();
  477. } finally {
  478. $this->_result[] = $data;
  479. }
  480. }
  481. }