0
0

MessageForm.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632
  1. <?php
  2. namespace backend\forms;
  3. use components\service\AmqpConfig;
  4. use PhpAmqpLib\Exception\AMQPProtocolChannelException;
  5. use common\helpers\KeyHelper;
  6. use common\logic\Amqp\Cache;
  7. use components\Curl;
  8. use components\Exception;
  9. use PhpAmqpLib\Message\AMQPMessage;
  10. use yii\helpers\ArrayHelper;
  11. use Yii;
  12. class MessageForm extends BaseForm
  13. {
  14. /* send */
  15. public $queue;
  16. public $message;
  17. /* message_list */
  18. public $name;
  19. /**
  20. * @var integer 数量
  21. */
  22. public $count;
  23. /**
  24. * @var bool 是否重新排序
  25. */
  26. public $requeue;
  27. /**
  28. * @var string 编码
  29. */
  30. public $encoding;
  31. /**
  32. * @var string 消费id
  33. */
  34. public $mid;
  35. /**
  36. * @var string 消费类型
  37. */
  38. public $type;
  39. /**
  40. * @var bool 是否空消费
  41. */
  42. public $do_nothing = false;
  43. /**
  44. * @var bool 自动应答
  45. */
  46. public $ack = false;
  47. /**
  48. * @var bool
  49. */
  50. private $_stop = false;
  51. /**
  52. * @var string
  53. */
  54. private $_mid;
  55. /**
  56. * @var string
  57. */
  58. private $_body;
  59. /**
  60. * @var AMQPMessage
  61. */
  62. private $_message;
  63. /**
  64. * @var array
  65. */
  66. private $_rows;
  67. /**
  68. * @var array
  69. */
  70. private $_result;
  71. const TYPE_MID = 'mid';//消费某条
  72. const TYPE_COUNT = 'count';//(从第一条开始) 消费条数
  73. const TYPE_MC = 'mid_count';//(从某条开始) 消费条数
  74. public function rules()
  75. {
  76. return [
  77. /* 发送消息 */
  78. [['queue'], 'trim', 'on' => ['send', 'batch_send']],
  79. [['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
  80. ['message', 'validateMessage', 'on' => ['send', 'batch_send']],
  81. /* 获取消息列表 */
  82. [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume']],
  83. [['name'], 'required', 'on' => ['message_list', 'purge']],
  84. ['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 2012,
  85. 'tooSmall' => 2013, 'tooBig' => 2014, 'on' => ['message_list']],
  86. ['count', 'default', 'value' => 20, 'on' => ['message_list']],
  87. ['requeue', 'boolean', 'message' => 2010, 'on' => ['message_list']],
  88. ['requeue', 'default', 'value' => true, 'on' => ['message_list']],
  89. ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 2011, 'on' => ['message_list']],
  90. ['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],
  91. /* 消费 */
  92. [['type', 'name'], 'required', 'on' => ['consume']],
  93. ['type', 'in', 'range' => [static::TYPE_MID, static::TYPE_COUNT, static::TYPE_MC], 'on' => 'consume'],
  94. ['type', 'validateType', 'on' => 'consume'],
  95. ['mid', 'string', 'on' => 'consume'],
  96. ['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 2012,
  97. 'tooSmall' => 2013, 'on' => ['consume']],
  98. ['do_nothing', 'default', 'value' => false, 'on' => 'consume'],
  99. ['do_nothing', 'boolean', 'on' => 'consume'],
  100. ['ack', 'default', 'value' => false, 'on' => 'consume'],
  101. ['ack', 'boolean', 'on' => 'consume'],
  102. ];
  103. }
  104. public function validateMessage($attribute)
  105. {
  106. if (!$this->$attribute)
  107. throw new Exception(2001);
  108. if (!is_array($this->$attribute))
  109. throw new Exception(2002, "{$attribute} 必须是数组");
  110. }
  111. public function validateType($attribute)
  112. {
  113. if ($this->$attribute == static::TYPE_MC) {
  114. if ($this->mid == null)
  115. throw new Exception(2016);
  116. if ($this->count == null)
  117. throw new Exception(2017);
  118. } else {
  119. if ($this->{$this->$attribute} == null)
  120. throw new Exception(2016, $this->$attribute . " 不能为空");
  121. }
  122. }
  123. /**
  124. * [连接AMQP]
  125. * @author: libingke
  126. * @return \PhpAmqpLib\Connection\AMQPStreamConnection
  127. */
  128. protected function getConnect()
  129. {
  130. return new \PhpAmqpLib\Connection\AMQPStreamConnection(
  131. Yii::$app->Amqp->host,
  132. Yii::$app->Amqp->port,
  133. Yii::$app->Amqp->user,
  134. Yii::$app->Amqp->pass,
  135. Yii::$app->Amqp->vhost
  136. );
  137. }
  138. /**
  139. * [发送消息]
  140. * @author: libingke
  141. * @return array
  142. * @throws Exception
  143. * @version 1.0
  144. */
  145. public function sendMessage()
  146. {
  147. $body = call_user_func_array([$this, '_messageBody'], [$this->attributes]);
  148. $mid = KeyHelper::getUniqueId('message_send');
  149. $properties = [
  150. 'content_type' => 'text/plain',
  151. 'message_id' => $mid,
  152. 'correlation_id' => $mid,
  153. 'consumer_tag' => $mid
  154. ];
  155. $data = [];
  156. $e_name = 'message.default';
  157. $k_route = 'route.default';
  158. $q_name = $this->queue;
  159. $connect = Yii::$app->Amqp->AMQPConnection();
  160. $channel = new \AMQPChannel($connect);
  161. $exchange = new \AMQPExchange($channel);
  162. $exchange->setName($e_name);
  163. $exchange->setType(AMQP_EX_TYPE_DIRECT);
  164. $exchange->setFlags(AMQP_DURABLE);//持久化
  165. $exchange->declareExchange();//声明交换机
  166. $queue = new \AMQPQueue($channel);
  167. $queue->setName($q_name);
  168. $queue->declareQueue(); //声明队列
  169. $queue->bind($e_name, $k_route);
  170. $r = $exchange->publish($body, $k_route, AMQP_NOPARAM, $properties);
  171. if ($r == true) {
  172. $data['message_total'] = $queue->declareQueue();
  173. $data['queue_name'] = $this->queue;
  174. $data['message_new'] = [
  175. 'mid' => $mid,
  176. 'body' => $body
  177. ];
  178. if ($data['message_total'] > 1) {
  179. $data['message_first'] = [
  180. 'mid' => $queue->get()->getMessageId(),
  181. 'body' => $queue->get()->getBody()
  182. ];
  183. }
  184. Cache::setData(
  185. 'queue:mid:'.$mid,
  186. Cache::STATUS_SEND_OK
  187. );
  188. } else {
  189. throw new Exception(2103);
  190. }
  191. return $data;
  192. }
  193. /**
  194. * [发送消息]
  195. * @author: libingke
  196. * @return array
  197. * @throws Exception
  198. * @version 1.1
  199. */
  200. public function sendMessageV1_1()
  201. {
  202. try {
  203. //connect
  204. $connect = $this->getConnect();
  205. $channel = $connect->channel();
  206. $this->_handleMessage($this->message);
  207. //预声明
  208. $channel->queue_declare($this->queue,
  209. false, true, false, false);
  210. $channel->basic_publish($this->_message, '', $this->queue);
  211. //获取返回结果
  212. list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
  213. false, true, false, false);
  214. $data = [
  215. 'message_total' => $message_count,
  216. 'queue_name' => $q_name,
  217. 'message_add' => [
  218. 'mid' => $this->_mid,
  219. 'body' => $this->_body
  220. ]
  221. ];
  222. if (($get = $channel->basic_get($q_name)) !== null) {
  223. $data['basic_get'] = [
  224. 'mid' => $get->get('message_id'),
  225. 'body' => $get->body
  226. ];
  227. }
  228. $channel->close();
  229. $connect->close();
  230. $statusKey = KeyHelper::getMessageStatusKey($this->_mid, $q_name);
  231. Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
  232. return $data;
  233. } catch (\Exception $e) {
  234. throw new Exception(1000, $e->getMessage());
  235. }
  236. }
  237. /**
  238. * [构造消息体]
  239. * @author: libingke
  240. * @param $data
  241. */
  242. private function _handleMessage($data)
  243. {
  244. $this->_mid = KeyHelper::getUniqueId('message_send');
  245. $this->_body = call_user_func_array([$this, '_messageBody'], [$data]);
  246. $properties = [
  247. 'message_id' => $this->_mid,
  248. 'correlation_id'=> $this->_mid,
  249. 'consumer_tag' => $this->_mid
  250. ];
  251. $this->_message = new AMQPMessage($this->_body, $properties);
  252. }
  253. private function _messageBody($data)
  254. {
  255. return json_encode($data);
  256. }
  257. /**
  258. * [批量发送消息]
  259. * @author: libingke
  260. * @return array
  261. * @throws Exception
  262. */
  263. public function batchSendMessage()
  264. {
  265. try {
  266. //connect
  267. $connect = $this->getConnect();
  268. $channel = $connect->channel();
  269. //预声明
  270. $channel->queue_declare($this->queue,
  271. false, true, false, false);
  272. //batch_basic_publish todo
  273. foreach ($this->message as $k => $v) {
  274. $this->_handleMessage($v);
  275. $this->_rows[] = [
  276. 'mid' => $this->_mid,
  277. 'body' => $this->_body
  278. ];
  279. $channel->batch_basic_publish($this->_message, '', $this->queue);
  280. $statusKey = KeyHelper::getMessageStatusKey($this->_mid, $this->queue);
  281. Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
  282. }
  283. $channel->publish_batch();
  284. //获取返回结果
  285. list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
  286. false, true, false, false);
  287. $data = [
  288. 'message_total' => $message_count,
  289. 'queue_name' => $q_name,
  290. 'add_count' => count($this->_rows),
  291. 'add_rows' => $this->_rows
  292. ];
  293. if (($get = $channel->basic_get($q_name)) !== null) {
  294. $data['basic_get'] = [
  295. 'mid' => $get->get('message_id'),
  296. 'body' => $get->body
  297. ];
  298. }
  299. $channel->close();
  300. $connect->close();
  301. return $data;
  302. } catch (\Exception $e) {
  303. throw new Exception(1000, $e->getMessage());
  304. }
  305. }
  306. /**
  307. * [获取消息列表]
  308. * @author: libingke
  309. */
  310. public function getMessageList()
  311. {
  312. /* api错误码 */
  313. $badCode = 2000;
  314. /* 登录验证 */
  315. $authStr = Yii::$app->Amqp->getConfig('user') . ':' . Yii::$app->Amqp->getConfig('pass');
  316. /* URL */
  317. $vhost = urlencode(Yii::$app->Amqp->getConfig('vhost'));
  318. $url = Yii::$app->Amqp->getConfig('host') . ':' . Yii::$app->Amqp->getConfig('api_port') .
  319. "/api/queues/{$vhost}/" . $this->name . "/get";
  320. $postParams = [
  321. 'name' => $this->name,
  322. 'count' => $this->count,
  323. 'encoding' => $this->encoding,
  324. 'requeue' => $this->requeue,
  325. 'truncate' => "50000",
  326. 'vhost' => '/',
  327. ];
  328. $curl = new Curl();
  329. $curl->setOption(CURLOPT_USERPWD, $authStr);
  330. $curl->setRawPostData(json_encode($postParams));
  331. $result = json_decode($curl->post($url), true);
  332. if ($curl->responseCode != 200)
  333. throw new Exception($badCode);
  334. if ($curl->errorText)
  335. throw new Exception($badCode, $curl->errorText);
  336. if (isset($result['error']) && is_string($result['error']))
  337. throw new Exception($badCode, $result['error']);
  338. ArrayHelper::multisort($result,'message_count',SORT_ASC);
  339. //print_r($result);exit();
  340. $rows = [];
  341. foreach ($result as $k => $v) {
  342. $rows[$k]['before_count'] = $v['message_count'];
  343. $rows[$k]['payload'] = $v['payload'];
  344. $rows[$k]['payload_encoding']= $v['payload_encoding'];
  345. $rows[$k]['message_id'] = $v['properties']['message_id'];
  346. }
  347. unset($result);
  348. return ['count' => count($rows), 'rows' => $rows];
  349. }
  350. /**
  351. * [空处理]
  352. */
  353. private function _consumeEmpty($mid, $body, $ack, $error, $stop = false)
  354. {
  355. $this->_stop = $stop;
  356. $this->_result[] = [
  357. 'mid' => $mid,
  358. 'result' => 'success: do nothing!',
  359. 'ack' => $ack,
  360. 'error' => $error
  361. ];
  362. }
  363. /**
  364. * [处理逻辑1]
  365. * @author: libingke
  366. */
  367. private function _consumeLogin($mid, $body, $ack, $error, $stop = false)
  368. {
  369. $this->_stop = $stop;
  370. $this->_result[] = [
  371. 'mid' => $mid,
  372. 'result' => '已处理',
  373. 'ack' => $ack,
  374. 'error' => $error
  375. ];
  376. }
  377. /**
  378. * [消费某条消息]
  379. * @author: libingke
  380. * @param $mid
  381. * @return array
  382. * @throws Exception
  383. */
  384. protected function consumeByMid($mid)
  385. {
  386. $q_name = $this->name;
  387. //选择执行回调
  388. if ($this->do_nothing != true) {
  389. $function = '_consume' . ucfirst($q_name);
  390. if (!method_exists($this, $function) || !is_callable(array($this, $function)))
  391. throw new Exception(2015);
  392. } else {
  393. //if do nothing handle empty
  394. $function = '_consumeEmpty';
  395. }
  396. $connect = $this->getConnect();
  397. $channel = $connect->channel();
  398. $channel->queue_declare($q_name,
  399. false, true, false, false);
  400. list(, $count, ) = $channel->queue_declare($q_name,
  401. false, true, false, false);
  402. $callback = function ($msg) use($function, $mid, $q_name) {
  403. try {
  404. $message_id = $msg->get('message_id');
  405. $ack = $this->ack == true ? true : false;//是否应答
  406. if ($mid == $message_id) {
  407. call_user_func_array(
  408. [$this, $function],
  409. [$message_id, $msg->body, $ack, '', true]
  410. );
  411. $statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
  412. if ($ack === true) {
  413. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  414. Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
  415. } else {
  416. Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
  417. }
  418. }
  419. } catch (\Exception $e) {
  420. //$e->getMessage();
  421. }
  422. };
  423. try {
  424. $channel->basic_qos(0, $count, null);
  425. $channel->basic_consume($q_name,
  426. '', false, false, false, false, $callback);
  427. $i = 0;
  428. while (count($channel->callbacks)) {
  429. $i ++;
  430. if ($i > $count || $this->_stop == true)
  431. break;
  432. $channel->wait();
  433. }
  434. } catch (\Exception $e) {
  435. throw new Exception(2104, $e->getMessage());
  436. }
  437. $channel->close();
  438. $connect->close();
  439. if ($this->_result == null)
  440. throw new Exception(2102);
  441. return $this->_result;
  442. }
  443. /**
  444. * [根据数量消费]
  445. * @author: libingke
  446. * @param string | int $startPos 开始位置
  447. * @param int $count 数量
  448. */
  449. protected function consumeByCount($count)
  450. {
  451. $q_name = $this->name;
  452. //选择执行回调
  453. if ($this->do_nothing != true) {
  454. $function = '_consume' . ucfirst($q_name);
  455. if (!method_exists($this, $function) || !is_callable(array($this, $function)))
  456. throw new Exception(2015);
  457. } else {
  458. //if do nothing handle empty
  459. $function = '_consumeEmpty';
  460. }
  461. $connect = $this->getConnect();
  462. $channel = $connect->channel();
  463. $channel->queue_declare($q_name,
  464. false, true, false, false);
  465. list(, $total, ) = $channel->queue_declare($q_name,
  466. false, true, false, false);
  467. $callback = function ($msg) use($function, $q_name) {
  468. $ack = $this->ack == true ? true : false;//是否应答
  469. try {
  470. $message_id = $msg->get('message_id');
  471. call_user_func_array(
  472. [$this, $function],
  473. [$message_id, $msg->body, $ack, '', false]
  474. );
  475. //更新状态
  476. $statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
  477. if ($ack === true) {
  478. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  479. Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
  480. } else {
  481. Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
  482. }
  483. } catch (\Exception $e) {
  484. //消息体出错机制
  485. call_user_func_array(
  486. [$this, $function],
  487. ['', $msg->body, $ack, $e->getMessage(), false]
  488. );
  489. }
  490. };
  491. try {
  492. $min = min($count, $total);
  493. $channel->basic_qos(0, $min, null);
  494. $channel->basic_consume($q_name,
  495. '', false, false, false, false, $callback);
  496. $i = 0;
  497. while (count($channel->callbacks)) {
  498. $i ++;
  499. if ($i > $min || $this->_stop == true)
  500. break;
  501. $channel->wait();
  502. }
  503. } catch (\Exception $e) {
  504. throw new Exception(2104, $e->getMessage());
  505. }
  506. $channel->close();
  507. $connect->close();
  508. return $this->_result;
  509. }
  510. /**
  511. * [消费消息]
  512. * @author: libingke
  513. */
  514. public function consumeMessage()
  515. {
  516. switch ($this->type)
  517. {
  518. case static::TYPE_MID:
  519. $data = $this->consumeByMid($this->mid);
  520. break;
  521. case static::TYPE_COUNT:
  522. $data = $this->consumeByCount($this->count);
  523. break;
  524. case static::TYPE_MC:
  525. throw new Exception(1000, '未开发');
  526. break;
  527. default:
  528. return "It's not possible to get there.";
  529. }
  530. return $data;
  531. }
  532. /**
  533. * [清空消息]
  534. * @author: libingke
  535. * @return array
  536. */
  537. public function purge()
  538. {
  539. $data = [];
  540. $connect = Yii::$app->Amqp->AMQPConnection();
  541. $channel = new \AMQPChannel($connect);
  542. $queue = new \AMQPQueue($channel);
  543. $queue->setName($this->name);
  544. $queue->purge();
  545. return $data;
  546. }
  547. }