123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- <?php
- namespace console\controllers;
- use backend\models\Queue;
- use common\logic\Amqp\Cache;
- use common\logic\Amqp\Connect;
- use components\Curl;
- use Yii;
- /**
- * 消息处理进程
- * Class MessageController
- * @package console\controllers
- */
- class WorkerMsgController extends BaseController
- {
- /**
- * @var null
- */
- private $_conn = null;
- /**
- * @return \PhpAmqpLib\Connection\AMQPStreamConnection
- */
- protected function getConn()
- {
- if ($this->_conn == null) {
- $this->_conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(
- Connect::HOST,
- Connect::PORT,
- Connect::USER,
- Connect::PASS
- );
- }
- return $this->_conn;
- }
- /**
- * [worker:登录处理]
- * @author: libingke
- */
- public function actionLogin()
- {
- $queue = Queue::find()->select('queue')->where(['sign' => 'licai_login'])->scalar();
- if (!$queue)
- exit("no login queue\r\n");
- $conn = $this->getConn();
- $channel = $conn->channel();
- $callback = function ($msg) use($queue) {
- $cacheTime = 600;
- $status = Cache::STATUS_HAND_FAIL;
- $corr_id = $msg->get('correlation_id');
- $post = json_decode($msg->body, true);
- //todo log
- if (!is_array($post)) {
- $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => '消息格式错误'];
- } else {
- //handle
- if (isset($post['url'])) {
- $url = $post['url'];
- unset($post['url']);
- $post['redirect_uri'] = 'https://zpapi.licai.cn';
- $curl = new Curl();
- $curl->setPostParams($post);
- $result = json_decode($curl->post($url), true);
- if (is_array($result) && isset($result['code'])) {
- $cacheTime = isset($result['data']['expires_in']) ? $result['data']['expires_in'] : 600;
- $status = Cache::STATUS_HAND_OK;
- $data = $result;
- } else {
- $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => $curl->errorText];
- }
- echo "Done: " . $corr_id, "\n";
- $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
- } else {
- $data = ['code' => Cache::STATUS_HAND_FAIL, 'message' => '消息格式错误[url error]'];
- }
- }
- Cache::setData("{$queue}:mid:{$corr_id}", $status);
- Cache::setData("result:{$queue}:mid:{$corr_id}", json_encode($data), $cacheTime);
- };
- $channel->basic_qos(null, 2, null);
- $channel->basic_consume($queue, '', false, false, false, false, $callback);
- while (count($channel->callbacks)) {
- $channel->wait();
- }
- $channel->close();
- $conn->close();
- }
- public function callback($msg)
- {
- echo "$msg---" . PHP_EOL;
- }
- public function actionTest()
- {
- $connect = new \PhpAmqpLib\Connection\AMQPStreamConnection(
- '121.196.226.188',
- '5672',
- 'lbk',
- '123456',
- Yii::$app->Amqp->vhost
- );
- $channel = $connect->channel();
- $channel->queue_declare('login', false, true, false, false);
- $callback = function ($msg) {
- call_user_func_array([$this, 'callback'], ['22']);
- //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
- };
- $channel->basic_qos(0, 10, null);
- $channel->basic_consume('login', '', false, false, false, false, $callback);
- $i = 0;
- while (count($channel->callbacks)) {
- $channel->wait();
- $i++;
- echo $i . PHP_EOL;
- list($q_name, $message_count, $t) = $channel->queue_declare('login',
- false, true, false, false);
- var_dump($message_count, $t);
- }
- $channel->close();
- $connect->close();
- }
- }
|