123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- <?php
- /**
- * @link http://www.yiiframework.com/
- * @copyright Copyright (c) 2008 Yii Software LLC
- * @license http://www.yiiframework.com/license/
- */
- namespace console\controllers;
- use components\ApiHandler;
- use common\models\RabbitLog;
- use components\RabbitBase;
- use components\RabbitMqBase;
- use components\RabbitMqServer;
- use yii\console\Controller;
- use components\PhpClient;
- use PhpAmqpLib\Message\AMQPMessage;
- use PhpAmqpLib\Connection\AMQPStreamConnection;
- use yii\helpers\ArrayHelper;
- /**
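- * Console commands for experimenting with RabbitMQ queues, Redis and the RabbitLog table.
- *
- * The actions pass sample payloads to the RabbitMq helper components, consume the "login" queue, acknowledge queued messages, read a cached login result from Redis, and insert sample RabbitLog rows.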
- *
- * @author Qiang Xue <qiang.xue@gmail.com>
- * @since 2.0
- */
class RabbitUserController extends Controller
{
    /** TCP port of the RabbitMQ broker used by the consumer actions. */
    const RABBIT_PORT = 5673;

    /** Broker user name. */
    const RABBIT_USER = 'guest';

    /** Broker password. */
    const RABBIT_PASSWORD = 'guest';

    /** Name of the queue consumed by actionWorker(). */
    const LOGIN_QUEUE = 'login';

    /** Prefix of the Redis key that holds the login result for one mobile number. */
    const LOGIN_CACHE_KEY_PREFIX = 'get-one-login-user-id-by-phone-';

    /** Largest value the AMQP 0-9-1 prefetch-count field (a 16-bit short) can hold. */
    const MAX_PREFETCH = 65535;

    /** Unambiguous timestamp format: four-digit year, 24-hour clock. */
    const TIMESTAMP_FORMAT = 'Y-m-d H:i:s';

    /**
     * Passes the sample payload '{json:2}' to RabbitMqBase::setRabbitMq().
     *
     * @return void
     */
    public function actionGo()
    {
        // The instance itself is never used. It is still constructed because the
        // RabbitMqBase constructor is not visible here and may perform set-up the
        // static call depends on. NOTE(review): confirm, then drop if it does not.
        new RabbitMqBase();
        RabbitMqBase::setRabbitMq('{json:2}', '', 'yang');
    }

    /**
     * Reads the cached login result for a fixed test mobile number from Redis,
     * writes it back under the same key, and dumps it.
     *
     * @return void
     */
    public function actionLogin()
    {
        $mobile = '13236390680';
        $redis = \Yii::$app->redis;
        $key = self::LOGIN_CACHE_KEY_PREFIX . $mobile;

        $result = $redis->get($key);
        // Only write back a value that actually exists: storing the null returned
        // for a missing key would create an empty entry that looks like a result.
        if ($result !== null) {
            $redis->set($key, $result);
        }
        var_dump($result);
    }

    /**
     * Dumps whatever RabbitMqBase::getRabbitMq() returns for empty arguments.
     *
     * @return void
     */
    public function actionGet()
    {
        var_dump(RabbitMqBase::getRabbitMq('', ''));
    }

    /**
     * Invokes RabbitBase::call() with the sample payload 'nihao'.
     *
     * @return void
     */
    public function actionShow()
    {
        $client = new RabbitBase();
        $client->call('nihao');
    }

    /**
     * Consumes the "login" queue one message at a time until the consumer is
     * cancelled.
     *
     * Each message is logged to RabbitLog, forwarded to the mock login API, and
     * the API response is stored in Redis under the mobile number found in the
     * message body. The body is expected to be a JSON object with a "mobile"
     * field, e.g. {"mobile":"13236390680"}.
     *
     * @return void
     */
    public function actionWorker()
    {
        $controller = $this;
        $this->consume(self::LOGIN_QUEUE, 1, function ($msg) use ($controller) {
            $controller->handleLoginMessage($msg);
        });
    }

    /**
     * Acknowledges (and thereby removes) up to $num messages from $queue.
     *
     * The previous implementation reset its counter on every iteration and
     * guarded nothing with an empty `if`, so $num was ignored and the loop never
     * ended on its own. The count is now tracked in the delivery callback.
     *
     * @param string     $queue name of the durable queue to drain
     * @param string     $msgid unused; kept so existing command lines keep working
     * @param int|string $num   maximum number of messages to acknowledge
     * @return bool always true
     */
    public function actionAckmsg($queue = 'login', $msgid = '13236390684', $num = 1)
    {
        $limit = (int) $num;
        if ($limit <= 0) {
            // Nothing was asked for; a prefetch of 0 would mean "unlimited".
            return true;
        }

        $acked = 0;
        $ackOnly = function ($msg) use (&$acked) {
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            $acked++;
        };
        $belowLimit = function () use (&$acked, $limit) {
            return $acked < $limit;
        };

        // Never let the broker push more messages than will be acknowledged;
        // anything delivered but un-acked would be requeued as "redelivered".
        $prefetch = min($limit, self::MAX_PREFETCH);
        $this->consume($queue, $prefetch, $ackOnly, $belowLimit);

        return true;
    }

    /**
     * Demo: inserts one sample RabbitLog row and prints a confirmation.
     *
     * @return void
     */
    public function actionInsert()
    {
        $this->insertSampleLog();
    }

    /**
     * Demo: despite its name this action performs the same insert as
     * actionInsert(); it does not query anything.
     *
     * @return void
     */
    public function actionSelect()
    {
        $this->insertSampleLog();
    }

    /**
     * Processes one message from the login queue: log, call the API, cache, ack.
     *
     * Public only so the consumer closure can call it on PHP versions without
     * automatic $this binding; it is not a console action.
     *
     * @param AMQPMessage $msg delivered message
     * @return void
     */
    public function handleLoginMessage($msg)
    {
        $this->logConsumedMessage($msg);

        $userInfo = ApiHandler::getInstance()->post('/mock/trueExam.do', [
            'id' => 'ffff-1513343606669-10163178175-0418',
            'data' => $msg->body,
        ]);

        // Debug output.
        var_dump('data:-----' . $msg->body);
        var_dump('userInfo:-----' . $userInfo);

        $request = json_decode($msg->body);
        if (is_object($request) && isset($request->mobile) && is_scalar($request->mobile)) {
            \Yii::$app->redis->set(self::LOGIN_CACHE_KEY_PREFIX . $request->mobile, $userInfo);
        } else {
            // A body without a usable "mobile" cannot be keyed; skip the cache
            // write rather than storing the response under the bare prefix.
            fwrite(STDERR, "login message has no usable \"mobile\" field; cache write skipped\n");
        }

        // Acknowledge so the broker removes the message from the queue.
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }

    /**
     * Persists a consumed message to the RabbitLog table (validation skipped).
     *
     * @param AMQPMessage $msg delivered message
     * @return void
     */
    private function logConsumedMessage($msg)
    {
        $deliveryInfo = $msg->delivery_info;

        // JSON_HEX_TAG (= 1) is what the former boolean TRUE argument coerced to;
        // it is spelled out so the stored text stays byte-identical.
        $log = new RabbitLog();
        $log->msg = json_encode($msg, JSON_HEX_TAG);
        $log->msg_body = json_encode($msg->body, JSON_HEX_TAG);
        $log->channel = json_encode($deliveryInfo['channel'], JSON_HEX_TAG);
        $log->queue = self::LOGIN_QUEUE;
        $log->msg_delivery_tag = json_encode($deliveryInfo['delivery_tag'], JSON_HEX_TAG);
        $log->addtime = date(self::TIMESTAMP_FORMAT);
        $log->describe = "this is rabbit consume message";
        $log->save(false);
    }

    /**
     * Inserts a RabbitLog row filled with placeholder values and prints a
     * confirmation line.
     *
     * @return void
     */
    private function insertSampleLog()
    {
        $log = new RabbitLog();
        $log->msg = 1;
        $log->msg_body = 1;
        $log->channel = 1;
        $log->queue = 1;
        $log->msg_delivery_tag = 1;
        $log->addtime = date(self::TIMESTAMP_FORMAT);
        $log->describe = "this is rabbit consume message";
        $log->save(false);

        echo 'ok consume one ' . "\n";
    }

    /**
     * Connects to the broker at params['rabbithost'], declares $queue as durable,
     * and dispatches deliveries to $onMessage with manual acknowledgement.
     *
     * The loop runs while the channel still has a registered consumer and, when
     * given, while $shouldContinue() returns true. The connection is closed even
     * if a handler throws.
     *
     * @param string        $queue          queue to declare and consume
     * @param int           $prefetchCount  maximum number of un-acked deliveries
     * @param callable      $onMessage      invoked with each delivered message
     * @param callable|null $shouldContinue optional predicate checked before each wait
     * @return void
     */
    private function consume($queue, $prefetchCount, $onMessage, $shouldContinue = null)
    {
        $connection = new AMQPStreamConnection(
            \Yii::$app->params['rabbithost'],
            self::RABBIT_PORT,
            self::RABBIT_USER,
            self::RABBIT_PASSWORD
        );

        try {
            $channel = $connection->channel();
            // passive=false, durable=true, exclusive=false, auto_delete=false
            $channel->queue_declare($queue, false, true, false, false);
            $channel->basic_qos(null, $prefetchCount, null);
            // consumer_tag='', no_local=false, no_ack=false (manual ack),
            // exclusive=false, nowait=false
            $channel->basic_consume($queue, '', false, false, false, false, $onMessage);

            while (count($channel->callbacks)
                && ($shouldContinue === null || call_user_func($shouldContinue))
            ) {
                $channel->wait();
            }

            $channel->close();
        } finally {
            $connection->close();
        }
    }
}
|