WorkerMsgController.php 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. <?php
  2. namespace console\controllers;
  3. use backend\forms\MessageForm;
  4. use backend\models\Jobs;
  5. use common\logic\Amqp\Cache;
  6. use common\logic\Amqp\Connect;
  7. use components\Curl;
  /**
   * Console worker that consumes queued messages and processes them.
   *
   * Class WorkerMsgController
   * @package console\controllers
   */
  class WorkerMsgController extends BaseController
  {
      /**
       * AMQP connection, created on the first call to getConn() and reused afterwards.
       *
       * @var \PhpAmqpLib\Connection\AMQPStreamConnection|null
       */
      private $_conn = null;

      /**
       * Returns the AMQP connection, opening it with the Connect::* settings on first use.
       *
       * @return \PhpAmqpLib\Connection\AMQPStreamConnection
       */
      protected function getConn()
      {
          if ($this->_conn === null) {
              $this->_conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(
                  Connect::HOST,
                  Connect::PORT,
                  Connect::USER,
                  Connect::PASS
              );
          }

          return $this->_conn;
      }

      /**
       * Worker: login processing.
       *
       * Looks up the queue name in the Jobs table, consumes messages from that queue one
       * at a time, forwards each decoded message body to the login endpoint and stores
       * the outcome in Cache under the message's correlation id:
       *   - "<correlation_id>"         => handling status (Cache::STATUS_HAND_OK / _FAIL)
       *   - "result_<correlation_id>"  => response payload, or an error description
       *
       * Every delivery is acknowledged once its result has been stored, including
       * messages whose body cannot be decoded; otherwise an unacknowledged message
       * would block this consumer because the prefetch count is 1.
       *
       * @author: libingke
       * @return int|null 1 when no queue name is configured; otherwise nothing is
       *                  returned once the consume loop ends.
       *                  NOTE(review): assumes the framework treats an integer action
       *                  result as the process exit status - confirm for BaseController.
       */
      public function actionLogin()
      {
          $queue = Jobs::find()->select('queue')->where(['sign' => 'queue'])->scalar();
          if ($queue === false || $queue === null || $queue === '') {
              // Without a queue name there is nothing to consume from; fail loudly
              // instead of handing a non-string value to basic_consume().
              echo "No queue configured for sign 'queue'", "\n";

              return 1;
          }

          $conn = $this->getConn();
          $channel = $conn->channel();

          $callback = function ($msg) {
              // Defaults describe a failed handling; they are overwritten on success.
              $cacheTime = 600;
              $status = Cache::STATUS_HAND_FAIL;
              $corr_id = $msg->get('correlation_id');
              $post = json_decode($msg->body, true);

              //todo log
              if (!is_array($post)) {
                  // Body is not a JSON object/array: report a format error to the caller.
                  $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => '消息格式错误'];
              } else {
                  // Forward the login request to the member login endpoint.
                  $loginUrl = 'https://dev407.33.cn/admin/member/login';
                  $post['redirect_uri'] = 'https://zpapi.licai.cn';

                  $curl = new Curl();
                  $curl->setPostParams($post);
                  $result = json_decode($curl->post($loginUrl), true);

                  if (is_array($result) && isset($result['code'])) {
                      // Keep the result for as long as the endpoint says it is valid,
                      // falling back to 864000 (presumably seconds - confirm against
                      // Cache::setData) when no expiry is supplied.
                      $cacheTime = isset($result['data']['expires_in']) ? $result['data']['expires_in'] : 864000;
                      $status = Cache::STATUS_HAND_OK;
                      $data = $result;
                  } else {
                      // No usable response: surface the transport error text instead.
                      $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => $curl->errorText];
                  }

                  echo "Done: " . $corr_id, "\n";
              }

              Cache::setData($corr_id, $status);
              Cache::setData('result_' . $corr_id, $data, $cacheTime);

              // Acknowledge only after the result is stored, and on both paths, so a
              // malformed message is removed from the queue rather than left unacked.
              $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
          };

          // Prefetch count of 1: at most one unacknowledged message per consumer.
          $channel->basic_qos(null, 1, null);
          // Third-from-last flags: no_local=false, no_ack=false (manual acks), exclusive=false, nowait=false.
          $channel->basic_consume($queue, '', false, false, false, false, $callback);

          // Block on the channel for as long as a consumer callback is registered.
          while (count($channel->callbacks)) {
              $channel->wait();
          }

          $channel->close();
          $conn->close();
      }
  }