WorkerMsgController.php 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. <?php
  2. namespace console\controllers;
  3. use backend\models\Queue;
  4. use common\logic\Amqp\Cache;
  5. use common\logic\Amqp\Connect;
  6. use components\Curl;
  7. use Yii;
  8. /**
  9. * 消息处理进程
  10. * Class MessageController
  11. * @package console\controllers
  12. */
  13. class WorkerMsgController extends BaseController
  14. {
  15. /**
  16. * @var null
  17. */
  18. private $_conn = null;
  19. /**
  20. * @return \PhpAmqpLib\Connection\AMQPStreamConnection
  21. */
  22. protected function getConn()
  23. {
  24. if ($this->_conn == null) {
  25. $this->_conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(
  26. Connect::HOST,
  27. Connect::PORT,
  28. Connect::USER,
  29. Connect::PASS
  30. );
  31. }
  32. return $this->_conn;
  33. }
  34. /**
  35. * [worker:登录处理]
  36. * @author: libingke
  37. */
  38. public function actionLogin()
  39. {
  40. $queue = Queue::find()->select('queue')->where(['sign' => 'licai_login'])->scalar();
  41. if (!$queue)
  42. exit("no login queue\r\n");
  43. $conn = $this->getConn();
  44. $channel = $conn->channel();
  45. $callback = function ($msg) use($queue) {
  46. $cacheTime = 600;
  47. $status = Cache::STATUS_HAND_FAIL;
  48. $corr_id = $msg->get('correlation_id');
  49. $post = json_decode($msg->body, true);
  50. //todo log
  51. if (!is_array($post)) {
  52. $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => '消息格式错误'];
  53. } else {
  54. //handle
  55. if (isset($post['url'])) {
  56. $url = $post['url'];
  57. unset($post['url']);
  58. $post['redirect_uri'] = 'https://zpapi.licai.cn';
  59. $curl = new Curl();
  60. $curl->setPostParams($post);
  61. $result = json_decode($curl->post($url), true);
  62. if (is_array($result) && isset($result['code'])) {
  63. $cacheTime = isset($result['data']['expires_in']) ? $result['data']['expires_in'] : 600;
  64. $status = Cache::STATUS_HAND_OK;
  65. $data = $result;
  66. } else {
  67. $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => $curl->errorText];
  68. }
  69. echo "Done: " . $corr_id, "\n";
  70. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  71. } else {
  72. $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => '消息格式错误[url error]'];
  73. }
  74. }
  75. Cache::setData("{$queue}:mid:{$corr_id}", $status);
  76. Cache::setData("result:{$queue}:mid:{$corr_id}", json_encode($data), $cacheTime);
  77. };
  78. $channel->basic_qos(null, 2, null);
  79. $channel->basic_consume($queue, '', false, false, false, false, $callback);
  80. while (count($channel->callbacks)) {
  81. $channel->wait();
  82. }
  83. $channel->close();
  84. $conn->close();
  85. }
  86. public function callback($msg)
  87. {
  88. echo "$msg---" . PHP_EOL;
  89. }
  90. public function actionTest()
  91. {
  92. $connect = new \PhpAmqpLib\Connection\AMQPStreamConnection(
  93. '121.196.226.188',
  94. '5672',
  95. 'lbk',
  96. '123456',
  97. Yii::$app->Amqp->vhost
  98. );
  99. $channel = $connect->channel();
  100. $channel->queue_declare('login', false, true, false, false);
  101. $callback = function ($msg) {
  102. call_user_func_array([$this, 'callback'], ['22']);
  103. //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  104. };
  105. $channel->basic_qos(0, 10, null);
  106. $channel->basic_consume('login', '', false, false, false, false, $callback);
  107. $i = 0;
  108. while (count($channel->callbacks)) {
  109. $channel->wait();
  110. $i++;
  111. echo $i . PHP_EOL;
  112. list($q_name, $message_count, $t) = $channel->queue_declare('login',
  113. false, true, false, false);
  114. var_dump($message_count, $t);
  115. }
  116. $channel->close();
  117. $connect->close();
  118. }
  119. }