_conn == null) { $this->_conn = new \PhpAmqpLib\Connection\AMQPStreamConnection( Connect::HOST, Connect::PORT, Connect::USER, Connect::PASS ); } return $this->_conn; } /** * [worker:登录处理] * @author: libingke */ public function actionLogin() { $queue = Jobs::find()->select('queue')->where(['sign' => 'queue'])->scalar(); $conn = $this->getConn(); $channel = $conn->channel(); $callback = function ($msg) { $cacheTime = 600; $status = Cache::STATUS_HAND_FAIL; $corr_id = $msg->get('correlation_id'); $post = json_decode($msg->body, true); //todo log if (!is_array($post)) { $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => '消息格式错误']; } else { //handle $loginUrl = 'https://dev407.33.cn/admin/member/login'; $post['redirect_uri'] = 'https://zpapi.licai.cn'; $curl = new Curl(); $curl->setPostParams($post); $result = json_decode($curl->post($loginUrl), true); if (is_array($result) && isset($result['code'])) { $cacheTime = isset($result['data']['expires_in']) ? $result['data']['expires_in'] : 864000; $status = Cache::STATUS_HAND_OK; $data = $result; } else { $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => $curl->errorText]; } echo "Done: " . $corr_id, "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } Cache::setData($corr_id, $status); Cache::setData('result_' . $corr_id, $data, $cacheTime); }; $channel->basic_qos(null, 1, null); $channel->basic_consume($queue, '', false, false, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $conn->close(); } }