1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- <?php
- namespace console\controllers;
- use backend\forms\MessageForm;
- use backend\models\Jobs;
- use common\logic\Amqp\Cache;
- use common\logic\Amqp\Connect;
- use components\Curl;
/**
 * Message-processing worker.
 *
 * Class WorkerMsgController
 * @package console\controllers
 */
class WorkerMsgController extends BaseController
{
    /**
     * Lazily created AMQP connection; null until getConn() opens one.
     *
     * @var \PhpAmqpLib\Connection\AMQPStreamConnection|null
     */
    private $_conn = null;

    /**
     * Return the AMQP connection held by this controller, opening it on first use
     * with the host/port/credentials defined on Connect.
     *
     * @return \PhpAmqpLib\Connection\AMQPStreamConnection
     */
    protected function getConn()
    {
        if ($this->_conn === null) {
            $this->_conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(
                Connect::HOST,
                Connect::PORT,
                Connect::USER,
                Connect::PASS
            );
        }

        return $this->_conn;
    }

    /**
     * [worker: login handling]
     *
     * Consumes messages from the queue named by the Jobs row whose sign is 'queue'.
     * For each message the JSON body is forwarded as POST parameters to the login
     * endpoint, and two cache entries keyed by the message's correlation_id are
     * written: the handling status under the correlation id itself, and the result
     * payload under 'result_' . correlation id.
     *
     * Blocks for as long as the channel has registered consumer callbacks.
     *
     * @author: libingke
     * @return int|null 1 when no queue name could be looked up; otherwise nothing is returned
     */
    public function actionLogin()
    {
        $queue = Jobs::find()->select('queue')->where(['sign' => 'queue'])->scalar();
        if ($queue === false || $queue === null || $queue === '') {
            // Without a queue name there is nothing to consume; bail out before
            // opening a connection.
            echo "Error: no queue configured for sign 'queue'", "\n";
            return 1;
        }

        $conn = $this->getConn();
        $channel = $conn->channel();

        $callback = function ($msg) {
            // Defaults describe a failed handling; they are overwritten on success.
            $cacheTime = 600;
            $status = Cache::STATUS_HAND_FAIL;
            $corr_id = $msg->get('correlation_id');
            $post = json_decode($msg->body, true);
            //todo log

            if (!is_array($post)) {
                $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => '消息格式错误'];
            } else {
                //handle
                $loginUrl = 'https://dev407.33.cn/admin/member/login';
                $post['redirect_uri'] = 'https://zpapi.licai.cn';

                $curl = new Curl();
                $curl->setPostParams($post);
                $result = json_decode($curl->post($loginUrl), true);

                if (is_array($result) && isset($result['code'])) {
                    // Any decoded response carrying a 'code' key counts as handled;
                    // the value of 'code' itself is not inspected here.
                    $cacheTime = isset($result['data']['expires_in']) ? $result['data']['expires_in'] : 864000;
                    $status = Cache::STATUS_HAND_OK;
                    $data = $result;
                } else {
                    $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => $curl->errorText];
                }

                echo "Done: " . $corr_id, "\n";
            }

            // Acknowledge every message, including malformed ones. The consumer runs
            // with manual acks and a prefetch count of 1, so an unacknowledged message
            // would hold the only prefetch slot and stop further deliveries.
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

            Cache::setData($corr_id, $status);
            Cache::setData('result_' . $corr_id, $data, $cacheTime);
        };

        // Prefetch count 1: at most one unacknowledged message at a time.
        $channel->basic_qos(null, 1, null);
        // Fourth argument (no_ack) is false, so the callback must ack explicitly.
        $channel->basic_consume($queue, '', false, false, false, false, $callback);

        while (count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $conn->close();
        // Drop the cached handle so a later getConn() opens a fresh connection
        // instead of returning this closed one.
        $this->_conn = null;
    }
}
|