MessageForm.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619
  1. <?php
  2. namespace backend\forms;
  3. use common\logic\LoginHandle;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. use components\service\AmqpConfig;
  6. use components\service\Redis;
  7. use components\Exception;
  8. use components\Curl;
  9. use common\helpers\KeyHelper;
  10. use yii\helpers\ArrayHelper;
  11. use Yii;
  12. class MessageForm extends BaseForm
  13. {
  /**
   * @var string Name of the queue to publish to (scenarios: send, batch_send).
   */
  public $queue;

  /**
   * @var array Payload to publish; must be a non-empty array (see validateArray()).
   *            In batch_send every element is published as its own message.
   */
  public $message;

  /**
   * @var string Name of the queue to operate on
   *             (scenarios: message_list, purge, consume, delete, ack).
   */
  public $name;

  /**
   * @var string Topic exchange name (scenario: sendtopic).
   */
  public $topicName;

  /**
   * @var string|null Routing key used when publishing to the topic exchange;
   *                  sendMessageTopic() falls back to '*' when it is null.
   */
  public $routingKey;

  /**
   * @var integer Number of messages to fetch or consume
   *              (1..1000 for message_list, 1..65000 for consume).
   */
  public $count;

  /**
   * @var bool Whether listed messages are put back on the queue (scenario: message_list).
   */
  public $requeue;

  /**
   * @var string Payload encoding requested for message_list: 'auto' or 'base64'.
   */
  public $encoding;

  /**
   * @var string Consume id. Not read by any method visible in this file.
   */
  public $mid;

  /**
   * @var array Message ids to acknowledge or delete (scenarios: ack, delete).
   */
  public $mids;

  /**
   * @var string Consume type: 'server' or 'client' (defaults to 'client').
   */
  public $type;

  /**
   * @var bool Auto-acknowledge flag. Not read by any method visible in this file.
   */
  public $ack;

  /**
   * @var bool Force-delete flag (scenario: delete). Validated by rules() but not
   *           read by any method visible in this file.
   */
  public $forced;

  /**
   * @var bool Set by closureAck() once every requested id has been handled;
   *           ends the wait loops in the consume/ack/delete methods.
   */
  private $_stop = false;

  /**
   * @var string Id generated for the message currently being built.
   */
  private $_mid;

  /**
   * @var string JSON body of the message currently being built.
   */
  private $_body;

  /**
   * @var AMQPMessage Message object currently being built.
   */
  private $_message;

  /**
   * @var array Working set: published rows in batchSendMessage(), or the
   *            id => id map of messages still to be acked/deleted.
   */
  private $_rows;

  /**
   * @var array Per-message results collected by the consume callbacks.
   */
  private $_result;
  /**
   * Validation rules, grouped by scenario.
   *
   * The numeric 'message' / 'tooSmall' / 'tooBig' values are presumably
   * project error codes rather than display text - confirm against the
   * error catalogue.
   *
   * Observations for reviewers:
   *  - routingKey, mid and ack have no rule in any scenario;
   *  - the consume 'count' rule sets 'max' but, unlike message_list, no 'tooBig'.
   *
   * @return array
   */
  public function rules()
  {
      $send = ['send', 'batch_send'];
      $topic = ['sendtopic'];
      $list = ['message_list'];
      $ackOrDelete = ['delete', 'ack'];

      return [
          [['name'], 'required', 'on' => ['message_list', 'purge']],
          [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume', 'delete', 'ack']],

          // Publish to a queue.
          [['queue'], 'trim', 'on' => $send],
          [['queue', 'message'], 'required', 'on' => $send],
          ['message', 'validateArray', 'on' => $send],

          // Publish to subscribers through a topic exchange.
          [['topicName'], 'trim', 'on' => $topic],
          [['topicName', 'message'], 'required', 'on' => $topic],
          ['message', 'validateArray', 'on' => $topic],

          // List messages.
          [
              'count', 'integer',
              'min' => 1, 'max' => 1000,
              'message' => 1205, 'tooSmall' => 1206, 'tooBig' => 1207,
              'on' => $list,
          ],
          ['count', 'default', 'value' => 20, 'on' => $list],
          ['requeue', 'boolean', 'message' => 1209, 'on' => $list],
          ['requeue', 'default', 'value' => true, 'on' => $list],
          ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 1210, 'on' => $list],
          ['encoding', 'default', 'value' => 'auto', 'on' => $list],

          // Consume.
          [['name', 'count'], 'required', 'on' => ['consume']],
          [
              'count', 'integer',
              'min' => 1, 'max' => 65000,
              'message' => 1205, 'tooSmall' => 1206,
              'on' => ['consume'],
          ],
          ['type', 'default', 'value' => 'client', 'on' => 'consume'],
          ['type', 'in', 'range' => ['server', 'client'], 'message' => 1302, 'on' => 'consume'],

          // Delete and acknowledge.
          [['mids', 'name'], 'required', 'on' => $ackOrDelete],
          ['mids', 'validateArray', 'on' => $ackOrDelete],
          ['name', 'string', 'on' => $ackOrDelete],
          ['forced', 'default', 'value' => false, 'on' => 'delete'],
          ['forced', 'boolean', 'on' => 'delete'],
      ];
  }
  /**
   * Inline validator: the attribute must hold a non-empty array.
   *
   * Unlike a typical inline validator this does not record a model error;
   * it aborts by throwing.
   *
   * @param string $attribute name of the attribute being validated
   * @throws Exception code 1003 when the value is empty or not an array
   */
  public function validateArray($attribute)
  {
      $value = $this->$attribute;
      if (is_array($value) && $value) {
          return;
      }

      throw new Exception(1003, "{$attribute} 必须是数组");
  }
  /**
   * Publish $this->message as a single message to the queue $this->queue.
   *
   * The queue is declared before publishing, the message status is recorded
   * in Redis, and the queue is then re-declared to read back its message count.
   * The head of the queue is also fetched and reported under 'basic_get'; that
   * fetch is never acked here, so the broker presumably returns the message to
   * the queue when the channel closes - confirm against the broker's behaviour.
   *
   * @author libingke
   * @return array keys: message_total, queue_name, message_add (mid, body) and,
   *               when the queue is not empty, basic_get (mid, body)
   * @throws Exception code 1001 wrapping any failure raised during the operation
   * @version 1.1
   */
  public function sendMessage()
  {
      $connect = null;
      $channel = null;
      try {
          $connect = $this->getConnect();
          $channel = $connect->channel();
          $this->_handleMessage($this->message);

          // Declare first so the queue exists before we publish to it.
          list($q_name) = $channel->queue_declare($this->queue, false, true, false, false);
          $channel->basic_publish($this->_message, '', $this->queue);

          // Record the status as soon as the publish has been issued, so a failure
          // in the purely informational steps below cannot leave it unrecorded.
          Redis::set($q_name, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);

          // Re-declare only to read the current message count.
          list(, $message_count, ) = $channel->queue_declare($this->queue, false, true, false, false);

          $data = [
              'message_total' => $message_count,
              'queue_name' => $q_name,
              'message_add' => [
                  'mid' => $this->_mid,
                  'body' => $this->_body
              ]
          ];

          $get = $channel->basic_get($q_name);
          if ($get !== null) {
              $data['basic_get'] = [
                  'mid' => $get->get('message_id'),
                  'body' => $get->body
              ];
          }

          $channel->close();
          $connect->close();

          return $data;
      } catch (\Exception $e) {
          // Release whatever was opened; the original failure is what gets reported.
          foreach (array_filter([$channel, $connect]) as $resource) {
              try {
                  $resource->close();
              } catch (\Exception $ignored) {
                  // Best-effort cleanup only.
              }
          }
          throw new Exception(1001, $e->getMessage());
      }
  }
  /**
   * Publish $this->message to the topic exchange $this->topicName.
   *
   * The exchange is declared as type 'topic' before publishing. When
   * $this->routingKey is null the message is published with the routing key '*'.
   * No Redis status is recorded for messages sent this way.
   *
   * @return array keys: topicName, routingKey, message_add (mid, body)
   * @throws Exception code 1001 wrapping any failure raised during the operation
   */
  public function sendMessageTopic()
  {
      $connect = null;
      $channel = null;
      try {
          $connect = $this->getConnect();
          $channel = $connect->channel();
          $this->_handleMessage($this->message);

          $routingKey = is_null($this->routingKey) ? '*' : $this->routingKey;
          // Exchange actually published to versus the name reported back to the
          // caller; they only differ when topicName is null.
          $exchange = is_null($this->topicName) ? '' : $this->topicName;
          $reportedTopic = is_null($this->topicName) ? '*' : $this->topicName;

          $channel->exchange_declare($this->topicName, 'topic', false, true, false);
          $channel->basic_publish($this->_message, $exchange, $routingKey);

          $data = [
              'topicName' => $reportedTopic,
              'routingKey' => $routingKey,
              'message_add' => [
                  'mid' => $this->_mid,
                  'body' => $this->_body
              ]
          ];

          $channel->close();
          $connect->close();

          return $data;
      } catch (\Exception $e) {
          // Release whatever was opened; the original failure is what gets reported.
          foreach (array_filter([$channel, $connect]) as $resource) {
              try {
                  $resource->close();
              } catch (\Exception $ignored) {
                  // Best-effort cleanup only.
              }
          }
          throw new Exception(1001, $e->getMessage());
      }
  }
  /**
   * Build the outgoing message for $data.
   *
   * Generates a fresh message id, encodes the body and stores the id, the body
   * and the resulting AMQPMessage on the instance ($_mid, $_body, $_message).
   * The same id is passed as message_id, correlation_id and consumer_tag.
   *
   * @param mixed $data payload to encode
   */
  private function _handleMessage($data)
  {
      $this->_mid = KeyHelper::getUniqueId('message_send');
      $this->_body = $this->_messageBody($data);

      $mid = $this->_mid;
      $this->_message = new AMQPMessage($this->_body, [
          'message_id' => $mid,
          'correlation_id' => $mid,
          'consumer_tag' => $mid,
      ]);
  }
  /**
   * Encode a payload as the JSON message body.
   *
   * @param mixed $data payload to encode
   * @return string JSON text
   * @throws \RuntimeException when the payload cannot be JSON-encoded; without
   *                           this check json_encode()'s false would be used as
   *                           the message body
   */
  private function _messageBody($data)
  {
      $json = json_encode($data);
      if ($json === false) {
          throw new \RuntimeException('Unable to JSON-encode message body: ' . json_last_error_msg());
      }

      return $json;
  }
  /**
   * Publish every element of $this->message as its own message to $this->queue.
   *
   * Messages are queued with batch_basic_publish() and sent with a single
   * publish_batch(). Redis statuses are written only after the batch has been
   * sent, so a failed publish does not leave messages marked as sent.
   * The head of the queue is also fetched and reported under 'basic_get'; that
   * fetch is never acked here.
   *
   * @author libingke
   * @return array keys: message_total, queue_name, add_count, add_rows and,
   *               when the queue is not empty, basic_get (mid, body)
   * @throws Exception code 1001 wrapping any failure raised during the operation
   */
  public function batchSendMessage()
  {
      $connect = null;
      $channel = null;
      // Start clean so rows left over from an earlier call are not reported again.
      $this->_rows = [];
      try {
          $connect = $this->getConnect();
          $channel = $connect->channel();

          // Declare first so the queue exists before we publish to it.
          $channel->queue_declare($this->queue, false, true, false, false);

          foreach ($this->message as $item) {
              $this->_handleMessage($item);
              $this->_rows[] = [
                  'mid' => $this->_mid,
                  'body' => $this->_body
              ];
              $channel->batch_basic_publish($this->_message, '', $this->queue);
          }
          $channel->publish_batch();

          foreach ($this->_rows as $row) {
              Redis::set($this->queue, $row['mid'], 'status', AmqpConfig::STATUS_SEND_OK);
          }

          // Re-declare only to read the current message count.
          list($q_name, $message_count, ) = $channel->queue_declare($this->queue, false, true, false, false);

          $data = [
              'message_total' => $message_count,
              'queue_name' => $q_name,
              'add_count' => count($this->_rows),
              'add_rows' => $this->_rows
          ];

          $get = $channel->basic_get($q_name);
          if ($get !== null) {
              $data['basic_get'] = [
                  'mid' => $get->get('message_id'),
                  'body' => $get->body
              ];
          }

          $channel->close();
          $connect->close();

          return $data;
      } catch (\Exception $e) {
          // Release whatever was opened; the original failure is what gets reported.
          foreach (array_filter([$channel, $connect]) as $resource) {
              try {
                  $resource->close();
              } catch (\Exception $ignored) {
                  // Best-effort cleanup only.
              }
          }
          throw new Exception(1001, $e->getMessage());
      }
  }
  /**
   * List messages of the queue $this->name through the broker's HTTP API
   * (POST {host}:{api_port}/api/queues/{vhost}/{name}/get).
   *
   * Connection details are read from the Yii::$app->Amqp component. The
   * returned entries are sorted ascending by their 'message_count' field.
   *
   * @author libingke
   * @return array keys: count (number of rows) and rows; each row has
   *               mid (null when the message has no message_id), body and
   *               before (the entry's 'message_count')
   * @throws Exception code 1002 on a transport error, an error reported by the
   *                   API, a non-200 response or an undecodable response body
   */
  public function getMessageList()
  {
      $amqp = Yii::$app->Amqp;
      $authStr = $amqp->user . ':' . $amqp->pass;

      // Both path segments are user/config supplied, so encode each of them;
      // rawurlencode() is the path-safe variant (space becomes %20, not '+').
      $url = $amqp->host . ':' . $amqp->api_port
          . '/api/queues/' . rawurlencode($amqp->vhost)
          . '/' . rawurlencode($this->name) . '/get';

      $postParams = [
          'name' => $this->name,
          'count' => $this->count,
          'encoding' => $this->encoding,
          'requeue' => $this->requeue,
          'truncate' => "50000",
          // Keep the body consistent with the vhost used in the URL.
          'vhost' => $amqp->vhost,
      ];

      $curl = new Curl();
      $curl->setOption(CURLOPT_USERPWD, $authStr);
      $curl->setRawPostData(json_encode($postParams));
      $result = json_decode($curl->post($url), true);

      // Report the most specific failure first: transport error, then the error
      // text returned by the API, then the bare status code.
      if ($curl->errorText) {
          throw new Exception(1002, $curl->errorText);
      }
      if (isset($result['error']) && is_string($result['error'])) {
          throw new Exception(1002, $result['error']);
      }
      if ($curl->responseCode != 200 || !is_array($result)) {
          throw new Exception(1002);
      }

      ArrayHelper::multisort($result, 'message_count', SORT_ASC);

      $rows = [];
      foreach ($result as $k => $v) {
          $rows[$k] = [
              // Messages published by other producers may carry no message_id.
              'mid' => isset($v['properties']['message_id']) ? $v['properties']['message_id'] : null,
              'body' => $v['payload'],
              'before' => $v['message_count'],
          ];
      }

      return ['count' => count($rows), 'rows' => $rows];
  }
  /**
   * Consume up to $this->count messages from the queue $this->name.
   *
   * For type 'server' each message is handed to the method named
   * '_closure' . ucfirst(queue name), which must exist on this class; for any
   * other type it is handed to closureConsume(). At most
   * min($this->count, messages currently in the queue) deliveries are waited for.
   *
   * NOTE(review): wait() is called without a timeout, so this presumably blocks
   * if fewer deliveries arrive than were counted - confirm whether a timeout
   * is wanted.
   *
   * @author libingke
   * @return array one entry per handled message, as collected by the callback
   * @throws Exception code 1303 when no server handler exists for the queue,
   *                   1300 when the queue is empty, 1001 when consuming fails
   */
  public function consumeMessage()
  {
      $q_name = $this->name;
      if ($this->type == 'server') {
          $function = '_closure' . ucfirst($q_name);
          if (!method_exists($this, $function)) {
              throw new Exception(1303);
          }
      } else {
          $function = 'closureConsume';
      }

      $this->_result = [];
      $connect = null;
      $channel = null;
      $released = false;
      try {
          $connect = $this->getConnect();
          $channel = $connect->channel();

          // A single declare both ensures the queue exists and reports its size.
          list(, $total, ) = $channel->queue_declare($q_name, false, true, false, false);
          if ($total == 0) {
              throw new Exception(1300);
          }

          try {
              $min = min($this->count, $total);
              $callback = function ($msg) use ($q_name, $function) {
                  $this->$function($msg, $q_name);
              };

              $channel->basic_qos(0, $min, null);
              $channel->basic_consume($q_name, '', false, false, false, false, $callback);

              for ($i = 1; $i <= $min && count($channel->callbacks) && $this->_stop !== true; $i++) {
                  $channel->wait();
              }

              $channel->close();
              $connect->close();
              $released = true;
          } catch (\Exception $e) {
              throw new Exception(1001, $e->getMessage());
          }
      } finally {
          // Covers every early exit (empty queue, declare failure, consume
          // failure): never leave the channel or connection open.
          if (!$released) {
              foreach (array_filter([$channel, $connect]) as $resource) {
                  try {
                      $resource->close();
                  } catch (\Exception $ignored) {
                      // Best-effort cleanup only.
                  }
              }
          }
      }

      return $this->_result;
  }
  /**
   * Default consume callback used by consumeMessage().
   *
   * Records the message id and body, marks the message as STATUS_HAND in Redis
   * and appends the record to $_result. The message is not acked here. Any
   * exception is captured in the record's 'error' field instead of propagating.
   *
   * @param AMQPMessage $msg   delivered message
   * @param string      $queue name of the queue the message came from
   */
  protected function closureConsume($msg, $queue)
  {
      $record = ['mid' => '', 'body' => '', 'error' => ''];

      try {
          $record['mid'] = $msg->get('message_id');
          $record['body'] = $msg->body;
          Redis::set($queue, $record['mid'], 'status', AmqpConfig::STATUS_HAND);
      } catch (\Exception $failure) {
          $record['error'] = $failure->getMessage();
      } finally {
          // Always report the message, whether or not handling succeeded.
          $this->_result[] = $record;
      }
  }
  /**
   * Acknowledge the messages whose ids are listed in $this->mids on the queue
   * $this->name.
   *
   * Consumes up to as many deliveries as the queue currently holds and lets
   * closureAck() ack those whose id is requested and whose Redis status is
   * STATUS_HAND; acked messages are marked STATUS_HAND_OK.
   *
   * @author libingke
   * @return array keys: queue, ack_count and, when some ids were not acked,
   *               ack_fail (list of those ids)
   * @throws Exception code 1301 when $this->mids contains no usable id,
   *                   1101 when consuming fails
   */
  public function ackMessage()
  {
      // Start clean so state left by an earlier call on this instance cannot
      // leak into this one.
      $this->_rows = [];
      $this->_stop = false;

      // Keep only valid ids: non-empty strings, de-duplicated via the key.
      foreach ($this->mids as $mid) {
          if ($mid && is_string($mid)) {
              $this->_rows[$mid] = $mid;
          }
      }
      if (count($this->_rows) == 0) {
          throw new Exception(1301);
      }

      $ack = $this->_rows;
      $q_name = $this->name;

      $connect = null;
      $channel = null;
      $released = false;
      try {
          $connect = $this->getConnect();
          $channel = $connect->channel();

          // A single declare both ensures the queue exists and reports its size.
          list(, $total, ) = $channel->queue_declare($q_name, false, true, false, false);

          try {
              $callback = function ($msg) use ($q_name) {
                  $this->closureAck($msg, $q_name, AmqpConfig::STATUS_HAND_OK, true);
              };

              $channel->basic_qos(0, $total, null);
              $channel->basic_consume($q_name, '', false, false, false, false, $callback);

              for ($i = 1; $i <= $total && count($channel->callbacks) && $this->_stop !== true; $i++) {
                  $channel->wait();
              }

              $channel->close();
              $connect->close();
              $released = true;
          } catch (\Exception $e) {
              throw new Exception(1101, $e->getMessage());
          }
      } finally {
          // Never leave the channel or connection open on a failure path.
          if (!$released) {
              foreach (array_filter([$channel, $connect]) as $resource) {
                  try {
                      $resource->close();
                  } catch (\Exception $ignored) {
                      // Best-effort cleanup only.
                  }
              }
          }
      }

      // Whatever is still in $_rows was requested but never acked.
      $data = [
          'queue' => $q_name,
          'ack_count' => count($ack) - count($this->_rows),
      ];
      if (count($this->_rows)) {
          $data['ack_fail'] = array_keys($this->_rows);
      }

      return $data;
  }
  /**
   * Delete the messages whose ids are listed in $this->mids from the queue
   * $this->name.
   *
   * The Redis status entries of all requested ids are removed first. The
   * method then consumes up to as many deliveries as the queue currently
   * holds and lets closureAck() ack (and thereby remove) the requested ones.
   *
   * @author libingke
   * @return array keys: queue, delete_count and, when some ids were not found,
   *               delete_fail (list of those ids)
   * @throws Exception code 1301 when $this->mids contains no usable id,
   *                   1101 when consuming fails
   */
  public function deleteMessage()
  {
      // Start clean so state left by an earlier call on this instance cannot
      // leak into this one.
      $this->_rows = [];
      $this->_stop = false;

      // Keep only valid ids: non-empty strings, de-duplicated via the key.
      foreach ($this->mids as $mid) {
          if ($mid && is_string($mid)) {
              $this->_rows[$mid] = $mid;
          }
      }
      if (count($this->_rows) == 0) {
          throw new Exception(1301);
      }

      $delete = $this->_rows;
      Redis::batchDel($this->name, $delete, 'status');
      $q_name = $this->name;

      $connect = null;
      $channel = null;
      $released = false;
      try {
          $connect = $this->getConnect();
          $channel = $connect->channel();

          // A single declare both ensures the queue exists and reports its size.
          list(, $total, ) = $channel->queue_declare($q_name, false, true, false, false);

          try {
              $callback = function ($msg) {
                  $this->closureAck($msg);
              };

              $channel->basic_qos(0, $total, null);
              $channel->basic_consume($q_name, '', false, false, false, false, $callback);

              for ($i = 1; $i <= $total && count($channel->callbacks) && $this->_stop !== true; $i++) {
                  $channel->wait();
              }

              $channel->close();
              $connect->close();
              $released = true;
          } catch (\Exception $e) {
              throw new Exception(1101, $e->getMessage());
          }
      } finally {
          // Never leave the channel or connection open on a failure path.
          if (!$released) {
              foreach (array_filter([$channel, $connect]) as $resource) {
                  try {
                      $resource->close();
                  } catch (\Exception $ignored) {
                      // Best-effort cleanup only.
                  }
              }
          }
      }

      // Whatever is still in $_rows was requested but never found on the queue.
      $data = [
          'queue' => $q_name,
          'delete_count' => count($delete) - count($this->_rows),
      ];
      if (count($this->_rows)) {
          $data['delete_fail'] = array_keys($this->_rows);
      }

      return $data;
  }
  /**
   * Delivery callback shared by ackMessage() and deleteMessage().
   *
   * Acks the delivered message when its id is one of the ids still pending in
   * $_rows, removes that id from $_rows and optionally writes a new Redis
   * status. Sets $_stop once no pending ids remain so the caller's wait loop
   * can end. Messages that are not requested (or carry no message id) are left
   * untouched.
   *
   * @author libingke
   * @param AMQPMessage $msg    delivered message
   * @param string      $queue  queue name used for the Redis status key
   * @param mixed       $status status to store after acking; false to store nothing
   * @param bool        $check  when true, only ack messages whose current Redis
   *                            status equals AmqpConfig::STATUS_HAND
   */
  protected function closureAck($msg, $queue = '', $status = false, $check = false)
  {
      try {
          try {
              $mid = $msg->get('message_id');
          } catch (\OutOfBoundsException $e) {
              // A message published without a message_id cannot be one of ours;
              // skip it instead of aborting the whole ack/delete run.
              return;
          }

          // $_rows is keyed by id, so an exact key lookup avoids the loose
          // comparison of in_array(), which could match a different id than the
          // key removed below.
          if (!is_string($mid) || !isset($this->_rows[$mid])) {
              return;
          }

          if ($check == true && Redis::get($queue, $mid, 'status') != AmqpConfig::STATUS_HAND) {
              return;
          }

          unset($this->_rows[$mid]);
          $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

          if ($status != false && $queue != '') {
              Redis::set($queue, $mid, 'status', $status);
          }
      } finally {
          // Runs on every exit path, including the early returns above.
          if (count($this->_rows) == 0) {
              $this->_stop = true;
          }
      }
  }
  /**
   * Remove all messages from the queue $this->name.
   *
   * When at least one message was purged, the Redis data kept for the queue is
   * purged as well.
   *
   * @author libingke
   * @return array keys: queue (queue name) and count (number of purged messages)
   * @throws Exception code 1001 wrapping any failure raised during the operation
   */
  public function purge()
  {
      $connect = null;
      $channel = null;
      try {
          $connect = $this->getConnect();
          $channel = $connect->channel();

          $delete = $channel->queue_purge($this->name);
          if ($delete > 0) {
              Redis::purge($this->name);
          }

          $channel->close();
          $connect->close();

          return [
              'queue' => $this->name,
              'count' => $delete
          ];
      } catch (\Exception $e) {
          // Release whatever was opened; the original failure is what gets reported.
          foreach (array_filter([$channel, $connect]) as $resource) {
              try {
                  $resource->close();
              } catch (\Exception $ignored) {
                  // Best-effort cleanup only.
              }
          }
          throw new Exception(1001, $e->getMessage());
      }
  }
  /**
   * Server-side consume callback for the "login" queue; consumeMessage()
   * resolves it by name as '_closure' . ucfirst(queue name).
   *
   * Passes the message body to LoginHandle::login(), acks the message, marks it
   * STATUS_HAND_OK in Redis and stores the response under 'result' with a
   * 3600 expiry. Messages with an empty id are recorded but neither handled
   * nor acked. Any exception is captured in the record's 'error' field.
   *
   * @param AMQPMessage $msg   delivered message
   * @param string      $queue name of the queue the message came from
   */
  private function _closureLogin($msg, $queue)
  {
      $record = ['mid' => '', 'body' => '', 'response' => '', 'error' => ''];

      try {
          $record['mid'] = $msg->get('message_id');
          $record['body'] = $msg->body;

          // Nothing to do without an id; the finally block still records the entry.
          if (!$record['mid']) {
              return;
          }

          $handler = new LoginHandle();
          $record['response'] = $handler->login($msg->body, $queue, $record['mid']);

          $delivery = $msg->delivery_info;
          $delivery['channel']->basic_ack($delivery['delivery_tag']);

          Redis::set($queue, $record['mid'], 'status', AmqpConfig::STATUS_HAND_OK);
          Redis::set($queue, $record['mid'], 'result', $record['response']);
          Redis::expire($queue, $record['mid'], 'result', 3600);
      } catch (\Exception $failure) {
          $record['error'] = $failure->getMessage();
      } finally {
          $this->_result[] = $record;
      }
  }
  520. }