_conn == null) { $this->_conn = new \PhpAmqpLib\Connection\AMQPStreamConnection( Connect::HOST, Connect::PORT, Connect::USER, Connect::PASS ); } return $this->_conn; }

    /**
     * [worker: login handling]
     *
     * Looks up the queue name registered under the sign 'licai_login' and consumes it.
     * Each message body is expected to be a JSON object containing a `url` key; the
     * remaining fields (plus a fixed `redirect_uri`) are POSTed to that URL.
     *
     * For every message two cache entries are written, keyed by the message's
     * correlation id:
     *   - "{queue}:mid:{correlation_id}"        => handling status
     *   - "result:{queue}:mid:{correlation_id}" => JSON-encoded result or error payload
     *
     * Every delivery is acknowledged once its outcome has been cached. Previously only
     * messages that contained a `url` were acknowledged, so malformed messages stayed
     * unacked and, with a prefetch count of 2, two of them were enough to stop the
     * broker from delivering anything further to this worker.
     *
     * @author: libingke
     */
    public function actionLogin()
    {
        $queue = Queue::find()->select('queue')->where(['sign' => 'licai_login'])->scalar();
        if (!$queue) {
            exit("no login queue\r\n");
        }

        $conn = $this->getConn();
        $channel = $conn->channel();

        $callback = function ($msg) use ($queue) {
            $cacheTime = 600;
            $status = Cache::STATUS_HAND_FAIL;
            $corr_id = $msg->get('correlation_id');
            $post = json_decode($msg->body, true);
            //todo log

            if (!is_array($post)) {
                $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => '消息格式错误'];
            } elseif (!isset($post['url'])) {
                $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => '消息格式错误[url error]'];
            } else {
                //handle
                // NOTE(review): the target URL is taken straight from the message body;
                // confirm that only trusted producers can publish to this queue.
                $url = $post['url'];
                unset($post['url']);
                $post['redirect_uri'] = 'https://zpapi.licai.cn';

                $curl = new Curl();
                $curl->setPostParams($post);
                $result = json_decode($curl->post($url), true);

                if (is_array($result) && isset($result['code'])) {
                    // Keep the cached result for as long as the remote side says it is valid.
                    $cacheTime = isset($result['data']['expires_in']) ? $result['data']['expires_in'] : 600;
                    $status = Cache::STATUS_HAND_OK;
                    $data = $result;
                } else {
                    $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => $curl->errorText];
                }

                echo "Done: " . $corr_id, "\n";
            }

            Cache::setData("{$queue}:mid:{$corr_id}", $status);
            Cache::setData("result:{$queue}:mid:{$corr_id}", json_encode($data), $cacheTime);

            // Acknowledge on every path (success, remote failure, malformed message) and
            // only after the outcome is stored, so unacked deliveries never pile up and
            // exhaust the prefetch window.
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        // At most two unacknowledged messages are handed to this consumer at a time.
        $channel->basic_qos(null, 2, null);
        $channel->basic_consume($queue, '', false, false, false, false, $callback);

        while (count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $conn->close();
    }

    /**
     * Prints the given value followed by "---" and a line break.
     *
     * @param mixed $msg value interpolated into the output string
     */
    public function callback($msg) { echo "$msg---" .
PHP_EOL; }

    /**
     * Debug consumer for the 'login' queue.
     *
     * Opens its own broker connection, declares the durable 'login' queue and consumes
     * from it with a prefetch count of 10. For each wait cycle it prints a running
     * counter and dumps the second and third values returned by re-declaring the queue.
     * Deliveries are not acknowledged here (the ack call is commented out).
     */
    public function actionTest()
    {
        // NOTE(review): broker host and credentials are hard-coded in source; consider
        // moving them to configuration.
        $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            '121.196.226.188',
            '5672',
            'lbk',
            '123456',
            Yii::$app->Amqp->vhost
        );
        $ch = $connection->channel();
        $ch->queue_declare('login', false, true, false, false);

        // The handler ignores the delivered message and always prints the fixed value '22'.
        $handler = function ($msg) {
            $this->callback('22');
            //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        $ch->basic_qos(0, 10, null);
        $ch->basic_consume('login', '', false, false, false, false, $handler);

        $iteration = 0;
        while (count($ch->callbacks)) {
            $ch->wait();
            $iteration++;
            echo $iteration . PHP_EOL;

            // Re-declaring the queue returns its current stats; the queue name is not needed.
            list(, $pending, $consumers) = $ch->queue_declare('login', false, true, false, false);
            var_dump($pending, $consumers);
        }

        $ch->close();
        $connection->close();
    }
}