WorkerMsgController.php 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. <?php
  2. namespace console\controllers;
  3. use backend\forms\MessageForm;
  4. use common\logic\Amqp\Cache;
  5. use common\logic\Amqp\Connect;
  6. use components\Curl;
  7. /**
  8. * 消息处理进程
  9. * Class MessageController
  10. * @package console\controllers
  11. */
  12. class WorkerMsgController extends BaseController
  13. {
  14. /**
  15. * @var null
  16. */
  17. private $_conn = null;
  18. /**
  19. * @return \PhpAmqpLib\Connection\AMQPStreamConnection
  20. */
  21. protected function getConn()
  22. {
  23. if ($this->_conn == null) {
  24. $this->_conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(
  25. Connect::HOST,
  26. Connect::PORT,
  27. Connect::USER,
  28. Connect::PASS
  29. );
  30. }
  31. return $this->_conn;
  32. }
  33. /**
  34. * [worker:登录处理]
  35. * @author: libingke
  36. */
  37. public function actionLogin()
  38. {
  39. $queue = MessageForm::getQueueName('login');
  40. $conn = $this->getConn();
  41. $channel = $conn->channel();
  42. $callback = function ($msg) {
  43. $cacheTime = 600;
  44. $status = Cache::STATUS_HAND_FAIL;
  45. $corr_id = $msg->get('correlation_id');
  46. $post = json_decode($msg->body, true);
  47. //todo log
  48. if (!is_array($post)) {
  49. $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => '消息格式错误'];
  50. } else {
  51. //handle
  52. $loginUrl = 'https://dev407.33.cn/admin/member/login';
  53. $post['redirect_uri'] = 'https://zpapi.licai.cn';
  54. $curl = new Curl();
  55. $curl->setPostParams($post);
  56. $result = json_decode($curl->post($loginUrl), true);
  57. if (is_array($result) && isset($result['code'])) {
  58. $cacheTime = isset($result['data']['expires_in']) ? $result['data']['expires_in'] : 864000;
  59. $status = Cache::STATUS_HAND_OK;
  60. $data = $result;
  61. } else {
  62. $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => $curl->errorText];
  63. }
  64. echo "Done: " . $corr_id, "\n";
  65. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  66. }
  67. Cache::setData($corr_id, $status);
  68. Cache::setData('result_' . $corr_id, $data, $cacheTime);
  69. };
  70. $channel->basic_qos(null, 1, null);
  71. $channel->basic_consume($queue, '', false, false, false, false, $callback);
  72. while (count($channel->callbacks)) {
  73. $channel->wait();
  74. }
  75. $channel->close();
  76. $conn->close();
  77. }
  78. }