* @since 2.0 */ class RabbitUserController extends Controller { public function actionWorker() { $connection = new AMQPStreamConnection('172.30.118.225', 5673, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); //交互格式形式json php yii rabbittask/new-task "{"\"text\":\"广州\""}" // $city='{"text":"广州"}'; // var_dump($city); // $city=json_decode($city,true); // var_dump($city); $callback = function ($msg) { //do sth //$msg->body //连接数据库 //begin 持久化 $model = new RabbitLog(); $model->msg = json_encode($msg,TRUE); $model->msg_body =json_encode($msg->body,TRUE); $model->channel = json_encode($msg->delivery_info['channel'],TRUE); $model->queue = "task_queue"; $model->msg_delivery_tag =json_encode($msg->delivery_info['delivery_tag'],TRUE); $model->addtime = date('y-m-d h:i:s',time()); $model->describe = "this is rabbit consume message"; $model->save(false); echo 'ok consume one '. "\n" ; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('task_queue', '', false, false, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } public function actionNewTask($n) { PhpClient::CallMq($n); } /* * demo * * */ public function actionInsert() { $model = new RabbitLog(); $model->msg = 1; $model->msg_body = 1; $model->channel = 1; $model->queue = 1; $model->msg_delivery_tag = 1; $model->addtime =date('y-m-d h:i:s',time()); $model->describe = "this is rabbit consume message"; $model->save(false); echo 'ok consume one '. "\n" ; } public function actionSelect() { $model = new RabbitLog(); $model->msg = 1; $model->msg_body = 1; $model->channel = 1; $model->queue = 1; $model->msg_delivery_tag = 1; $model->addtime =date('y-m-d h:i:s',time()); $model->describe = "this is rabbit consume message"; $model->save(false); echo 'ok consume one '. "\n" ; } }