RabbitUserController.php 5.8 KB

  1. <?php
  2. /**
  3. * @link http://www.yiiframework.com/
  4. * @copyright Copyright (c) 2008 Yii Software LLC
  5. * @license http://www.yiiframework.com/license/
  6. */
  7. namespace console\controllers;
  8. use components\ApiHandler;
  9. use common\models\RabbitLog;
  10. use components\RabbitBase;
  11. use components\RabbitMqBase;
  12. use components\RabbitMqServer;
  13. use yii\console\Controller;
  14. use components\PhpClient;
  15. use PhpAmqpLib\Message\AMQPMessage;
  16. use PhpAmqpLib\Connection\AMQPStreamConnection;
  17. use yii\helpers\ArrayHelper;
  26. class RabbitUserController extends Controller
  27. {
  28. public function actionGo()
  29. {
  30. // $RabbitMqServer = new RabbitMqServer();
  31. // $responses = $RabbitMqServer->setRabbit('{json:1}','','yang');//在这里传入发送给服务端脚本的内容
  32. $RabbitMqServer = new RabbitMqBase();
  33. RabbitMqBase::setRabbitMq('{json:2}','','yang');
  34. }
  35. public function actionLogin()
  36. {
  37. //从redis中取数结果
  38. $mobile='13236390680';
  39. $redis = \Yii::$app->redis;
  40. $key = 'get-one-login-user-id-by-phone-' . $mobile;
  41. $result = $redis->get($key); ;
  42. $redis->set($key,$result);
  43. var_dump($result);
  44. }
  45. public function actionGet()
  46. {
  47. $res = RabbitMqBase::getRabbitMq('','');
  48. var_dump($res);
  49. }
  50. public function actionShow()
  51. {
  52. $RabbitMqServer = new RabbitBase();
  53. $RabbitMqServer->call('nihao');
  54. }
  55. public function actionWorker()
  56. {
  57. $connection = new AMQPStreamConnection(\Yii::$app->params['rabbithost'], 5673, 'guest', 'guest');
  58. // $connection = new AMQPStreamConnection('', 5673, 'guest', 'guest');
  59. $channel = $connection->channel();
  60. $channel->queue_declare('login', false, true, false, false);
  61. //交互格式形式json php yii rabbittask/new-task "{"\"text\":\"广州\""}"
  62. // $city='{"text":"广州"}';
  63. // var_dump($city);
  64. // $city=json_decode($city,true);
  65. // var_dump($city);
  66. $callback = function ($msg) {
  67. //do sth
  68. //$msg->body
  69. //连接数据库
  70. //begin 持久化
  71. $model = new RabbitLog();
  72. $model->msg = json_encode($msg,TRUE);
  73. $model->msg_body =json_encode($msg->body,TRUE);
  74. $model->channel = json_encode($msg->delivery_info['channel'],TRUE);
  75. $model->queue = "login";
  76. $model->msg_delivery_tag =json_encode($msg->delivery_info['delivery_tag'],TRUE);
  77. $model->addtime = date('y-m-d h:i:s',time());
  78. $model->describe = "this is rabbit consume message";
  79. $model->save(false);
  80. //请求接口登入
  81. //模拟接口 吧数据取出来
  82. //$msg->body //string
  83. $userInfo = ApiHandler::getInstance()->post('/mock/trueExam.do', [
  84. 'id' => 'ffff-1513343606669-10163178175-0418',//
  85. 'data'=>$msg->body//还想加参数过去
  86. ]);
  87. //
  88. //调试
  89. var_dump('data:-----'.$msg->body);
  90. var_dump('userInfo:-----'.$userInfo);
  91. $postRequestObj=json_decode($msg->body);
  92. $mobile=$postRequestObj->mobile;
  93. //从msg->body 取出
  94. $redis = \Yii::$app->redis;
  95. $key = 'get-one-login-user-id-by-phone-' . $mobile;
  96. $redis->set($key,$userInfo);
  97. //通知
  98. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  99. };
  100. $channel->basic_qos(null, 1, null);
  101. $channel->basic_consume('login', '', false, false, false, false, $callback);
  102. while (count($channel->callbacks)) {
  103. $channel->wait();
  104. }
  105. $channel->close();
  106. $connection->close();
  107. }
  108. //根据消费的句柄消费后删除消息
  109. public function actionAckmsg($queue='login',$msgid='13236390684',$num=1)
  110. {
  111. $connection = new AMQPStreamConnection(\Yii::$app->params['rabbithost'], 5673, 'guest', 'guest');
  112. // $connection = new AMQPStreamConnection('', 5673, 'guest', 'guest');
  113. $channel = $connection->channel();
  114. $channel->queue_declare($queue, false, true, false, false);
  115. $callback = function ($msg) {
  116. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  117. };
  118. //设置堆积数量
  119. $channel->basic_qos(null, 65535, null);
  120. //设置ack通知手动
  121. $channel->basic_consume($queue, '', false, false, false, false, $callback);
  122. //继续消费
  123. while (count($channel->callbacks)) {
  124. $i=0;
  125. $i++;
  126. if($i<$num);
  127. $channel->wait();
  128. }
  129. $channel->close();
  130. $connection->close();
  131. return true;
  132. }
  133. /*
  134. * demo
  135. *
  136. * */
  137. public function actionInsert()
  138. {
  139. $model = new RabbitLog();
  140. $model->msg = 1;
  141. $model->msg_body = 1;
  142. $model->channel = 1;
  143. $model->queue = 1;
  144. $model->msg_delivery_tag = 1;
  145. $model->addtime =date('y-m-d h:i:s',time());
  146. $model->describe = "this is rabbit consume message";
  147. $model->save(false);
  148. echo 'ok consume one '. "\n" ;
  149. }
  150. public function actionSelect()
  151. {
  152. $model = new RabbitLog();
  153. $model->msg = 1;
  154. $model->msg_body = 1;
  155. $model->channel = 1;
  156. $model->queue = 1;
  157. $model->msg_delivery_tag = 1;
  158. $model->addtime =date('y-m-d h:i:s',time());
  159. $model->describe = "this is rabbit consume message";
  160. $model->save(false);
  161. echo 'ok consume one '. "\n" ;
  162. }
  163. }