* @since 2.0 */
class RabbitUserController extends Controller
{
    /**
     * Format used for RabbitLog::addtime: four-digit year and 24-hour clock.
     *
     * The previous format ('y-m-d h:i:s') used a two-digit year and a 12-hour
     * clock without an AM/PM marker, so afternoon timestamps were stored as
     * morning ones.
     */
    const LOG_TIME_FORMAT = 'Y-m-d H:i:s';

    /**
     * Sends a fixed sample payload through RabbitMqBase::setRabbitMq().
     */
    public function actionGo()
    {
        // NOTE(review): this instance is never used (the call below is static). It is kept only
        // in case the RabbitMqBase constructor has side effects; remove it if it does not.
        $RabbitMqServer = new RabbitMqBase();
        RabbitMqBase::setRabbitMq('{json:2}', '', 'yang');
    }

    /**
     * Reads the cached login result for a hard-coded phone number from Redis and dumps it.
     */
    public function actionLogin()
    {
        // Fetch the result from Redis.
        $mobile = '13236390680';
        $redis = \Yii::$app->redis;
        $key = 'get-one-login-user-id-by-phone-' . $mobile;

        $result = $redis->get($key);
        // NOTE(review): this writes back the value that was just read, which looks redundant.
        // It is preserved because removing it would change behaviour; confirm whether it is needed.
        $redis->set($key, $result);

        var_dump($result);
    }

    /**
     * Dumps whatever RabbitMqBase::getRabbitMq() returns for empty arguments.
     */
    public function actionGet()
    {
        $res = RabbitMqBase::getRabbitMq('', '');
        var_dump($res);
    }

    /**
     * Invokes RabbitBase::call() with a fixed sample argument.
     */
    public function actionShow()
    {
        // NOTE(review): this uses RabbitBase, while the other actions use RabbitMqBase; confirm
        // that both classes exist and that this is not a typo.
        $RabbitMqServer = new RabbitBase();
        $RabbitMqServer->call('nihao');
    }

    /**
     * Consumes the "login" queue one message at a time.
     *
     * For every delivery the callback:
     *  1. stores a RabbitLog row describing the delivery,
     *  2. posts the message body to the mock API,
     *  3. caches the API response in Redis under the phone number found in the body,
     *  4. acknowledges the message.
     *
     * Messages are expected to be JSON objects that contain a "mobile" property.
     */
    public function actionWorker()
    {
        list($connection, $channel) = $this->openChannel('login');

        $callback = function ($msg) {
            // Persist a record of the delivery before doing any work.
            // JSON_HEX_TAG is spelled out because the original passed TRUE as the flags
            // argument, which PHP coerces to 1 (== JSON_HEX_TAG); the stored output is unchanged.
            $this->saveLog(
                json_encode($msg, JSON_HEX_TAG),
                json_encode($msg->body, JSON_HEX_TAG),
                json_encode($msg->delivery_info['channel'], JSON_HEX_TAG),
                'login',
                json_encode($msg->delivery_info['delivery_tag'], JSON_HEX_TAG)
            );

            // Call the (mock) login API, forwarding the raw message body.
            // NOTE(review): the original formatting left it ambiguous whether the 'data'
            // entry was live code or commented out; confirm it is meant to be sent.
            $userInfo = ApiHandler::getInstance()->post('/mock/trueExam.do', [
                'id' => 'ffff-1513343606669-10163178175-0418',
                'data' => $msg->body,
            ]);

            // Debug output.
            var_dump('data:-----' . $msg->body);
            var_dump('userInfo:-----' . $userInfo);

            // Extract the phone number from the message body and cache the API response under it.
            // A body that is not valid JSON, or that lacks "mobile", is skipped here instead of
            // being written under a key with an empty phone suffix.
            $postRequestObj = json_decode($msg->body);
            if (isset($postRequestObj->mobile)) {
                $redis = \Yii::$app->redis;
                $key = 'get-one-login-user-id-by-phone-' . $postRequestObj->mobile;
                $redis->set($key, $userInfo);
            }

            // Acknowledge in every case so a malformed message cannot stall the queue
            // (the prefetch count is 1, so an un-acked message would block further deliveries).
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        // Deliver at most one un-acknowledged message at a time.
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume('login', '', false, false, false, false, $callback);

        while (count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

    /**
     * Consumes and acknowledges (thereby deleting) messages from a queue.
     *
     * The original loop reset its counter on every iteration and its `if` had an empty body,
     * so $num had no effect and the action never stopped on its own. The loop now stops once
     * $num messages have been acknowledged.
     *
     * @param string $queue queue to consume from
     * @param string $msgid currently unused; kept for interface compatibility
     * @param int    $num   number of messages to acknowledge before returning
     *
     * @return bool always true
     */
    public function actionAckmsg($queue = 'login', $msgid = '13236390684', $num = 1)
    {
        list($connection, $channel) = $this->openChannel($queue);

        // Route/CLI parameters arrive as strings, so normalise the limit once.
        $limit = (int) $num;
        $acked = 0;

        $callback = function ($msg) use (&$acked) {
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            $acked++;
        };

        // Allow a large backlog of un-acknowledged deliveries.
        $channel->basic_qos(null, 65535, null);
        // Manual acknowledgement mode.
        $channel->basic_consume($queue, '', false, false, false, false, $callback);

        // Keep consuming until the requested number of messages has been acknowledged.
        while ($acked < $limit && count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();

        return true;
    }

    /**
     * Demo: stores a RabbitLog row filled with placeholder values.
     */
    public function actionInsert()
    {
        $this->saveDemoLog();
    }

    /**
     * Demo: stores a RabbitLog row filled with placeholder values.
     *
     * NOTE(review): despite its name this action inserts a row exactly like actionInsert();
     * it does not query anything. Confirm the intended behaviour.
     */
    public function actionSelect()
    {
        $this->saveDemoLog();
    }

    /**
     * Opens an AMQP connection, creates a channel and declares the given queue.
     *
     * NOTE(review): the port and the guest/guest credentials are hard-coded; consider moving
     * them to the application params next to "rabbithost".
     *
     * @param string $queue queue name to declare
     *
     * @return array two-element list: the connection, then the channel
     */
    private function openChannel($queue)
    {
        $connection = new AMQPStreamConnection(\Yii::$app->params['rabbithost'], 5673, 'guest', 'guest');
        $channel = $connection->channel();
        $channel->queue_declare($queue, false, true, false, false);

        return [$connection, $channel];
    }

    /**
     * Populates and stores one RabbitLog row, stamping it with the current time.
     *
     * @param mixed $msg         value for the "msg" attribute
     * @param mixed $msgBody     value for the "msg_body" attribute
     * @param mixed $channel     value for the "channel" attribute
     * @param mixed $queue       value for the "queue" attribute
     * @param mixed $deliveryTag value for the "msg_delivery_tag" attribute
     */
    private function saveLog($msg, $msgBody, $channel, $queue, $deliveryTag)
    {
        $model = new RabbitLog();
        $model->msg = $msg;
        $model->msg_body = $msgBody;
        $model->channel = $channel;
        $model->queue = $queue;
        $model->msg_delivery_tag = $deliveryTag;
        $model->addtime = date(self::LOG_TIME_FORMAT);
        $model->describe = 'this is rabbit consume message';
        $model->save(false);
    }

    /**
     * Stores a placeholder RabbitLog row and prints a confirmation line.
     */
    private function saveDemoLog()
    {
        $this->saveLog(1, 1, 1, 1, 1);
        echo 'ok consume one ' . "\n";
    }
}