|
@@ -147,6 +147,43 @@ class RabbitUserController extends Controller
|
|
|
$channel->close();
|
|
|
$connection->close();
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ //根据消费的句柄消费后删除消息
|
|
|
+ public function actionAckmsg($queue='login',$msgid='13236390684',$num=1)
|
|
|
+ {
|
|
|
+
|
|
|
+
|
|
|
+ $connection = new AMQPStreamConnection(\Yii::$app->params['rabbithost'], 5673, 'guest', 'guest');
|
|
|
+// $connection = new AMQPStreamConnection('172.30.118.225', 5673, 'guest', 'guest');
|
|
|
+ $channel = $connection->channel();
|
|
|
+ $channel->queue_declare($queue, false, true, false, false);
|
|
|
+ $callback = function ($msg) {
|
|
|
+ $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
|
|
+ };
|
|
|
+
|
|
|
+
|
|
|
+ //设置堆积数量
|
|
|
+ $channel->basic_qos(null, 65535, null);
|
|
|
+ //设置ack通知手动
|
|
|
+ $channel->basic_consume($queue, '', false, false, false, false, $callback);
|
|
|
+
|
|
|
+
|
|
|
+ //继续消费
|
|
|
+ while (count($channel->callbacks)) {
|
|
|
+ $i=0;
|
|
|
+ $i++;
|
|
|
+ if($i<$num);
|
|
|
+ $channel->wait();
|
|
|
+ }
|
|
|
+
|
|
|
+ $channel->close();
|
|
|
+ $connection->close();
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
/*
|
|
|
* demo
|
|
|
*
|