|
@@ -2,25 +2,31 @@
|
|
|
|
|
|
namespace backend\forms;
|
|
namespace backend\forms;
|
|
|
|
|
|
|
|
+use common\logic\LoginHandle;
|
|
|
|
+use PhpAmqpLib\Message\AMQPMessage;
|
|
use components\service\AmqpConfig;
|
|
use components\service\AmqpConfig;
|
|
-use PhpAmqpLib\Exception\AMQPProtocolChannelException;
|
|
|
|
-use common\helpers\KeyHelper;
|
|
|
|
-use common\logic\Amqp\Cache;
|
|
|
|
-use components\Curl;
|
|
|
|
|
|
+use components\service\Redis;
|
|
use components\Exception;
|
|
use components\Exception;
|
|
-use PhpAmqpLib\Message\AMQPMessage;
|
|
|
|
|
|
+use components\Curl;
|
|
|
|
+use common\helpers\KeyHelper;
|
|
use yii\helpers\ArrayHelper;
|
|
use yii\helpers\ArrayHelper;
|
|
use Yii;
|
|
use Yii;
|
|
|
|
|
|
class MessageForm extends BaseForm
|
|
class MessageForm extends BaseForm
|
|
{
|
|
{
|
|
- /* send */
|
|
|
|
|
|
+ /**
|
|
|
|
+ * @var
|
|
|
|
+ */
|
|
public $queue;
|
|
public $queue;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * @var
|
|
|
|
+ */
|
|
public $message;
|
|
public $message;
|
|
|
|
|
|
-
|
|
|
|
- /* message_list */
|
|
|
|
|
|
+ /**
|
|
|
|
+ * @var
|
|
|
|
+ */
|
|
public $name;
|
|
public $name;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -44,27 +50,30 @@ class MessageForm extends BaseForm
|
|
public $mid;
|
|
public $mid;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * @var string 消费类型
|
|
|
|
|
|
+ * @var array 消费ids
|
|
*/
|
|
*/
|
|
- public $type;
|
|
|
|
|
|
+ public $mids;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * @var bool 是否空消费
|
|
|
|
|
|
+ * @var string 消费类型
|
|
*/
|
|
*/
|
|
- public $do_nothing = false;
|
|
|
|
|
|
+ public $type;
|
|
|
|
|
|
/**
|
|
/**
|
|
* @var bool 自动应答
|
|
* @var bool 自动应答
|
|
*/
|
|
*/
|
|
- public $ack = false;
|
|
|
|
|
|
+ public $ack;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * @var bool 强制删除
|
|
|
|
+ */
|
|
|
|
+ public $forced;
|
|
|
|
|
|
/**
|
|
/**
|
|
* @var bool
|
|
* @var bool
|
|
*/
|
|
*/
|
|
private $_stop = false;
|
|
private $_stop = false;
|
|
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* @var string
|
|
* @var string
|
|
*/
|
|
*/
|
|
@@ -90,142 +99,46 @@ class MessageForm extends BaseForm
|
|
*/
|
|
*/
|
|
private $_result;
|
|
private $_result;
|
|
|
|
|
|
- const TYPE_MID = 'mid';//消费某条
|
|
|
|
- const TYPE_COUNT = 'count';//(从第一条开始) 消费条数
|
|
|
|
- const TYPE_MC = 'mid_count';//(从某条开始) 消费条数
|
|
|
|
-
|
|
|
|
public function rules()
|
|
public function rules()
|
|
{
|
|
{
|
|
return [
|
|
return [
|
|
|
|
+ [['name'], 'required', 'on' => ['message_list', 'purge']],
|
|
|
|
+ [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume', 'delete', 'ack']],
|
|
|
|
+
|
|
/* 发送消息 */
|
|
/* 发送消息 */
|
|
[['queue'], 'trim', 'on' => ['send', 'batch_send']],
|
|
[['queue'], 'trim', 'on' => ['send', 'batch_send']],
|
|
[['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
|
|
[['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
|
|
- ['message', 'validateMessage', 'on' => ['send', 'batch_send']],
|
|
|
|
|
|
+ ['message', 'validateArray', 'on' => ['send', 'batch_send']],
|
|
|
|
|
|
/* 获取消息列表 */
|
|
/* 获取消息列表 */
|
|
- [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume']],
|
|
|
|
- [['name'], 'required', 'on' => ['message_list', 'purge']],
|
|
|
|
- ['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 2012,
|
|
|
|
- 'tooSmall' => 2013, 'tooBig' => 2014, 'on' => ['message_list']],
|
|
|
|
|
|
+ ['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 1205,
|
|
|
|
+ 'tooSmall' => 1206, 'tooBig' => 1207, 'on' => ['message_list']],
|
|
['count', 'default', 'value' => 20, 'on' => ['message_list']],
|
|
['count', 'default', 'value' => 20, 'on' => ['message_list']],
|
|
- ['requeue', 'boolean', 'message' => 2010, 'on' => ['message_list']],
|
|
|
|
|
|
+ ['requeue', 'boolean', 'message' => 1209, 'on' => ['message_list']],
|
|
['requeue', 'default', 'value' => true, 'on' => ['message_list']],
|
|
['requeue', 'default', 'value' => true, 'on' => ['message_list']],
|
|
- ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 2011, 'on' => ['message_list']],
|
|
|
|
|
|
+ ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 1210, 'on' => ['message_list']],
|
|
['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],
|
|
['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],
|
|
|
|
|
|
/* 消费 */
|
|
/* 消费 */
|
|
- [['type', 'name'], 'required', 'on' => ['consume']],
|
|
|
|
- ['type', 'in', 'range' => [static::TYPE_MID, static::TYPE_COUNT, static::TYPE_MC], 'on' => 'consume'],
|
|
|
|
- ['type', 'validateType', 'on' => 'consume'],
|
|
|
|
- ['mid', 'string', 'on' => 'consume'],
|
|
|
|
- ['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 2012,
|
|
|
|
- 'tooSmall' => 2013, 'on' => ['consume']],
|
|
|
|
- ['do_nothing', 'default', 'value' => false, 'on' => 'consume'],
|
|
|
|
- ['do_nothing', 'boolean', 'on' => 'consume'],
|
|
|
|
- ['ack', 'default', 'value' => false, 'on' => 'consume'],
|
|
|
|
- ['ack', 'boolean', 'on' => 'consume'],
|
|
|
|
|
|
+ [['name', 'count'], 'required', 'on' => ['consume']],
|
|
|
|
+ ['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 1205,
|
|
|
|
+ 'tooSmall' => 1206, 'on' => ['consume']],
|
|
|
|
+ ['type', 'default', 'value' => 'client', 'on' => 'consume'],
|
|
|
|
+ ['type', 'in', 'range' => ['server', 'client'], 'message' => 1302, 'on' => 'consume'],
|
|
|
|
+
|
|
|
|
+ /* delete & ack */
|
|
|
|
+ [['mids', 'name'], 'required', 'on' => ['delete', 'ack']],
|
|
|
|
+ ['mids', 'validateArray', 'on' => ['delete', 'ack']],
|
|
|
|
+ ['name', 'string', 'on' => ['delete', 'ack']],
|
|
|
|
+ ['forced', 'default', 'value' => false, 'on' => 'delete'],
|
|
|
|
+ ['forced', 'boolean', 'on' => 'delete'],
|
|
];
|
|
];
|
|
}
|
|
}
|
|
|
|
|
|
- public function validateMessage($attribute)
|
|
|
|
|
|
+ public function validateArray($attribute)
|
|
{
|
|
{
|
|
- if (!$this->$attribute)
|
|
|
|
- throw new Exception(2001);
|
|
|
|
-
|
|
|
|
- if (!is_array($this->$attribute))
|
|
|
|
- throw new Exception(2002, "{$attribute} 必须是数组");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public function validateType($attribute)
|
|
|
|
- {
|
|
|
|
- if ($this->$attribute == static::TYPE_MC) {
|
|
|
|
- if ($this->mid == null)
|
|
|
|
- throw new Exception(2016);
|
|
|
|
- if ($this->count == null)
|
|
|
|
- throw new Exception(2017);
|
|
|
|
-
|
|
|
|
- } else {
|
|
|
|
- if ($this->{$this->$attribute} == null)
|
|
|
|
- throw new Exception(2016, $this->$attribute . " 不能为空");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * [连接AMQP]
|
|
|
|
- * @author: libingke
|
|
|
|
- * @return \PhpAmqpLib\Connection\AMQPStreamConnection
|
|
|
|
- */
|
|
|
|
- protected function getConnect()
|
|
|
|
- {
|
|
|
|
- return new \PhpAmqpLib\Connection\AMQPStreamConnection(
|
|
|
|
- Yii::$app->Amqp->host,
|
|
|
|
- Yii::$app->Amqp->port,
|
|
|
|
- Yii::$app->Amqp->user,
|
|
|
|
- Yii::$app->Amqp->pass,
|
|
|
|
- Yii::$app->Amqp->vhost
|
|
|
|
- );
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * [发送消息]
|
|
|
|
- * @author: libingke
|
|
|
|
- * @return array
|
|
|
|
- * @throws Exception
|
|
|
|
- * @version 1.0
|
|
|
|
- */
|
|
|
|
- public function sendMessage()
|
|
|
|
- {
|
|
|
|
- $body = call_user_func_array([$this, '_messageBody'], [$this->attributes]);
|
|
|
|
- $mid = KeyHelper::getUniqueId('message_send');
|
|
|
|
- $properties = [
|
|
|
|
- 'content_type' => 'text/plain',
|
|
|
|
- 'message_id' => $mid,
|
|
|
|
- 'correlation_id' => $mid,
|
|
|
|
- 'consumer_tag' => $mid
|
|
|
|
- ];
|
|
|
|
-
|
|
|
|
- $data = [];
|
|
|
|
- $e_name = 'message.default';
|
|
|
|
- $k_route = 'route.default';
|
|
|
|
- $q_name = $this->queue;
|
|
|
|
-
|
|
|
|
- $connect = Yii::$app->Amqp->AMQPConnection();
|
|
|
|
- $channel = new \AMQPChannel($connect);
|
|
|
|
- $exchange = new \AMQPExchange($channel);
|
|
|
|
- $exchange->setName($e_name);
|
|
|
|
- $exchange->setType(AMQP_EX_TYPE_DIRECT);
|
|
|
|
- $exchange->setFlags(AMQP_DURABLE);//持久化
|
|
|
|
- $exchange->declareExchange();//声明交换机
|
|
|
|
-
|
|
|
|
- $queue = new \AMQPQueue($channel);
|
|
|
|
- $queue->setName($q_name);
|
|
|
|
- $queue->declareQueue(); //声明队列
|
|
|
|
- $queue->bind($e_name, $k_route);
|
|
|
|
- $r = $exchange->publish($body, $k_route, AMQP_NOPARAM, $properties);
|
|
|
|
- if ($r == true) {
|
|
|
|
- $data['message_total'] = $queue->declareQueue();
|
|
|
|
- $data['queue_name'] = $this->queue;
|
|
|
|
- $data['message_new'] = [
|
|
|
|
- 'mid' => $mid,
|
|
|
|
- 'body' => $body
|
|
|
|
- ];
|
|
|
|
-
|
|
|
|
- if ($data['message_total'] > 1) {
|
|
|
|
- $data['message_first'] = [
|
|
|
|
- 'mid' => $queue->get()->getMessageId(),
|
|
|
|
- 'body' => $queue->get()->getBody()
|
|
|
|
- ];
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- Cache::setData(
|
|
|
|
- 'queue:mid:'.$mid,
|
|
|
|
- Cache::STATUS_SEND_OK
|
|
|
|
- );
|
|
|
|
- } else {
|
|
|
|
- throw new Exception(2103);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return $data;
|
|
|
|
|
|
+ if (!$this->$attribute || !is_array($this->$attribute))
|
|
|
|
+ throw new Exception(1003, "{$attribute} 必须是数组");
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -235,10 +148,9 @@ class MessageForm extends BaseForm
|
|
* @throws Exception
|
|
* @throws Exception
|
|
* @version 1.1
|
|
* @version 1.1
|
|
*/
|
|
*/
|
|
- public function sendMessageV1_1()
|
|
|
|
|
|
+ public function sendMessage()
|
|
{
|
|
{
|
|
try {
|
|
try {
|
|
- //connect
|
|
|
|
$connect = $this->getConnect();
|
|
$connect = $this->getConnect();
|
|
$channel = $connect->channel();
|
|
$channel = $connect->channel();
|
|
$this->_handleMessage($this->message);
|
|
$this->_handleMessage($this->message);
|
|
@@ -267,19 +179,17 @@ class MessageForm extends BaseForm
|
|
$channel->close();
|
|
$channel->close();
|
|
$connect->close();
|
|
$connect->close();
|
|
|
|
|
|
- $statusKey = KeyHelper::getMessageStatusKey($this->_mid, $q_name);
|
|
|
|
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
|
|
|
|
|
|
+ Redis::set($q_name, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
|
|
|
|
|
|
return $data;
|
|
return $data;
|
|
|
|
|
|
} catch (\Exception $e) {
|
|
} catch (\Exception $e) {
|
|
- throw new Exception(1000, $e->getMessage());
|
|
|
|
|
|
+ throw new Exception(1001, $e->getMessage());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* [构造消息体]
|
|
* [构造消息体]
|
|
- * @author: libingke
|
|
|
|
* @param $data
|
|
* @param $data
|
|
*/
|
|
*/
|
|
private function _handleMessage($data)
|
|
private function _handleMessage($data)
|
|
@@ -308,13 +218,12 @@ class MessageForm extends BaseForm
|
|
public function batchSendMessage()
|
|
public function batchSendMessage()
|
|
{
|
|
{
|
|
try {
|
|
try {
|
|
- //connect
|
|
|
|
$connect = $this->getConnect();
|
|
$connect = $this->getConnect();
|
|
$channel = $connect->channel();
|
|
$channel = $connect->channel();
|
|
//预声明
|
|
//预声明
|
|
$channel->queue_declare($this->queue,
|
|
$channel->queue_declare($this->queue,
|
|
false, true, false, false);
|
|
false, true, false, false);
|
|
- //batch_basic_publish todo
|
|
|
|
|
|
+
|
|
foreach ($this->message as $k => $v) {
|
|
foreach ($this->message as $k => $v) {
|
|
$this->_handleMessage($v);
|
|
$this->_handleMessage($v);
|
|
$this->_rows[] = [
|
|
$this->_rows[] = [
|
|
@@ -323,8 +232,7 @@ class MessageForm extends BaseForm
|
|
];
|
|
];
|
|
$channel->batch_basic_publish($this->_message, '', $this->queue);
|
|
$channel->batch_basic_publish($this->_message, '', $this->queue);
|
|
|
|
|
|
- $statusKey = KeyHelper::getMessageStatusKey($this->_mid, $this->queue);
|
|
|
|
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
|
|
|
|
|
|
+ Redis::set($this->queue, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
|
|
}
|
|
}
|
|
$channel->publish_batch();
|
|
$channel->publish_batch();
|
|
|
|
|
|
@@ -350,7 +258,7 @@ class MessageForm extends BaseForm
|
|
return $data;
|
|
return $data;
|
|
|
|
|
|
} catch (\Exception $e) {
|
|
} catch (\Exception $e) {
|
|
- throw new Exception(1000, $e->getMessage());
|
|
|
|
|
|
+ throw new Exception(1001, $e->getMessage());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -360,14 +268,11 @@ class MessageForm extends BaseForm
|
|
*/
|
|
*/
|
|
public function getMessageList()
|
|
public function getMessageList()
|
|
{
|
|
{
|
|
- /* api错误码 */
|
|
|
|
- $badCode = 2000;
|
|
|
|
- /* 登录验证 */
|
|
|
|
- $authStr = Yii::$app->Amqp->getConfig('user') . ':' . Yii::$app->Amqp->getConfig('pass');
|
|
|
|
- /* URL */
|
|
|
|
- $vhost = urlencode(Yii::$app->Amqp->getConfig('vhost'));
|
|
|
|
- $url = Yii::$app->Amqp->getConfig('host') . ':' . Yii::$app->Amqp->getConfig('api_port') .
|
|
|
|
|
|
+ $authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
|
|
|
|
+ $vhost = urlencode(Yii::$app->Amqp->vhost);
|
|
|
|
+ $url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port .
|
|
"/api/queues/{$vhost}/" . $this->name . "/get";
|
|
"/api/queues/{$vhost}/" . $this->name . "/get";
|
|
|
|
+
|
|
$postParams = [
|
|
$postParams = [
|
|
'name' => $this->name,
|
|
'name' => $this->name,
|
|
'count' => $this->count,
|
|
'count' => $this->count,
|
|
@@ -382,20 +287,22 @@ class MessageForm extends BaseForm
|
|
$curl->setRawPostData(json_encode($postParams));
|
|
$curl->setRawPostData(json_encode($postParams));
|
|
$result = json_decode($curl->post($url), true);
|
|
$result = json_decode($curl->post($url), true);
|
|
if ($curl->responseCode != 200)
|
|
if ($curl->responseCode != 200)
|
|
- throw new Exception($badCode);
|
|
|
|
|
|
+ throw new Exception(1002);
|
|
|
|
+
|
|
if ($curl->errorText)
|
|
if ($curl->errorText)
|
|
- throw new Exception($badCode, $curl->errorText);
|
|
|
|
|
|
+ throw new Exception(1002, $curl->errorText);
|
|
|
|
+
|
|
if (isset($result['error']) && is_string($result['error']))
|
|
if (isset($result['error']) && is_string($result['error']))
|
|
- throw new Exception($badCode, $result['error']);
|
|
|
|
|
|
+ throw new Exception(1002, $result['error']);
|
|
|
|
|
|
ArrayHelper::multisort($result,'message_count',SORT_ASC);
|
|
ArrayHelper::multisort($result,'message_count',SORT_ASC);
|
|
//print_r($result);exit();
|
|
//print_r($result);exit();
|
|
$rows = [];
|
|
$rows = [];
|
|
foreach ($result as $k => $v) {
|
|
foreach ($result as $k => $v) {
|
|
- $rows[$k]['before_count'] = $v['message_count'];
|
|
|
|
- $rows[$k]['payload'] = $v['payload'];
|
|
|
|
- $rows[$k]['payload_encoding']= $v['payload_encoding'];
|
|
|
|
- $rows[$k]['message_id'] = $v['properties']['message_id'];
|
|
|
|
|
|
+ $rows[$k]['mid'] = $v['properties']['message_id'];
|
|
|
|
+ $rows[$k]['body'] = $v['payload'];
|
|
|
|
+ $rows[$k]['before'] = $v['message_count'];
|
|
|
|
+
|
|
}
|
|
}
|
|
unset($result);
|
|
unset($result);
|
|
|
|
|
|
@@ -403,131 +310,147 @@ class MessageForm extends BaseForm
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * [空处理]
|
|
|
|
|
|
+ * consumeMessage
|
|
|
|
+ * @author: libingke
|
|
|
|
+ * @return array
|
|
|
|
+ * @throws Exception
|
|
*/
|
|
*/
|
|
- private function _consumeEmpty($mid, $body, $ack, $error, $stop = false)
|
|
|
|
|
|
+ public function consumeMessage()
|
|
{
|
|
{
|
|
- $this->_stop = $stop;
|
|
|
|
- $this->_result[] = [
|
|
|
|
- 'mid' => $mid,
|
|
|
|
- 'result' => 'success: do nothing!',
|
|
|
|
- 'ack' => $ack,
|
|
|
|
- 'error' => $error
|
|
|
|
- ];
|
|
|
|
|
|
+ $q_name = $this->name;
|
|
|
|
+ if ($this->type == 'server') {
|
|
|
|
+ $function = '_closure' . ucfirst($q_name);
|
|
|
|
+ if ( !method_exists($this, $function) )
|
|
|
|
+ throw new Exception(1303);
|
|
|
|
+
|
|
|
|
+ } else {
|
|
|
|
+ $function = 'closureConsume';
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ $connect = $this->getConnect();
|
|
|
|
+ $channel = $connect->channel();
|
|
|
|
+ $channel->queue_declare($q_name,
|
|
|
|
+ false, true, false, false);
|
|
|
|
+ list(, $total, ) = $channel->queue_declare($q_name,
|
|
|
|
+ false, true, false, false);
|
|
|
|
+
|
|
|
|
+ if ($total == 0)
|
|
|
|
+ throw new Exception(1300);
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ $min = min($this->count, $total);
|
|
|
|
+
|
|
|
|
+ $callback = function ($msg) use($q_name, $function) {call_user_func_array([$this, $function], [$msg, $q_name]);};
|
|
|
|
+ $channel->basic_qos(0, $min, null);
|
|
|
|
+ $channel->basic_consume($q_name,
|
|
|
|
+ '', false, false, false, false, $callback);
|
|
|
|
+ for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
|
|
|
|
+ if ($i > $min)
|
|
|
|
+ break;
|
|
|
|
+
|
|
|
|
+ $channel->wait();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ $channel->close();
|
|
|
|
+ $connect->close();
|
|
|
|
+
|
|
|
|
+ } catch (\Exception $e) {
|
|
|
|
+ throw new Exception(1001, $e->getMessage());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return $this->_result;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * [处理逻辑1]
|
|
|
|
- * @author: libingke
|
|
|
|
|
|
+ * closureConsume for consumeMessage
|
|
|
|
+ * @param $msg
|
|
|
|
+ * @param $queue
|
|
*/
|
|
*/
|
|
- private function _consumeLogin($mid, $body, $ack, $error, $stop = false)
|
|
|
|
|
|
+ protected function closureConsume($msg, $queue)
|
|
{
|
|
{
|
|
- $this->_stop = $stop;
|
|
|
|
- $this->_result[] = [
|
|
|
|
- 'mid' => $mid,
|
|
|
|
- 'result' => '已处理',
|
|
|
|
- 'ack' => $ack,
|
|
|
|
- 'error' => $error
|
|
|
|
- ];
|
|
|
|
|
|
+ $data = ['mid' => '', 'body' => '', 'error' => ''];
|
|
|
|
+ try {
|
|
|
|
+ $data['mid'] = $msg->get('message_id');
|
|
|
|
+ $data['body'] = $msg->body;
|
|
|
|
+ Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND);
|
|
|
|
+
|
|
|
|
+ } catch (\Exception $e) {
|
|
|
|
+ $data['error'] = $e->getMessage();
|
|
|
|
+
|
|
|
|
+ } finally {
|
|
|
|
+ $this->_result[] = $data;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * [消费某条消息]
|
|
|
|
|
|
+ * ackMessage
|
|
* @author: libingke
|
|
* @author: libingke
|
|
- * @param $mid
|
|
|
|
* @return array
|
|
* @return array
|
|
* @throws Exception
|
|
* @throws Exception
|
|
*/
|
|
*/
|
|
- protected function consumeByMid($mid)
|
|
|
|
|
|
+ public function ackMessage()
|
|
{
|
|
{
|
|
- $q_name = $this->name;
|
|
|
|
-
|
|
|
|
- //选择执行回调
|
|
|
|
- if ($this->do_nothing != true) {
|
|
|
|
- $function = '_consume' . ucfirst($q_name);
|
|
|
|
- if (!method_exists($this, $function) || !is_callable(array($this, $function)))
|
|
|
|
- throw new Exception(2015);
|
|
|
|
- } else {
|
|
|
|
- //if do nothing handle empty
|
|
|
|
- $function = '_consumeEmpty';
|
|
|
|
- }
|
|
|
|
|
|
+ //帅选合法待应答消息id
|
|
|
|
+ foreach ($this->mids as $mid)
|
|
|
|
+ if ($mid && is_string($mid))
|
|
|
|
+ $this->_rows[$mid] = $mid;
|
|
|
|
+ if (!is_array($this->_rows) || count($this->_rows) == 0)
|
|
|
|
+ throw new Exception(1301);
|
|
|
|
+ $ack = $this->_rows;
|
|
|
|
|
|
|
|
+ $q_name = $this->name;
|
|
$connect = $this->getConnect();
|
|
$connect = $this->getConnect();
|
|
$channel = $connect->channel();
|
|
$channel = $connect->channel();
|
|
$channel->queue_declare($q_name,
|
|
$channel->queue_declare($q_name,
|
|
false, true, false, false);
|
|
false, true, false, false);
|
|
- list(, $count, ) = $channel->queue_declare($q_name,
|
|
|
|
|
|
+ list(, $total, ) = $channel->queue_declare($q_name,
|
|
false, true, false, false);
|
|
false, true, false, false);
|
|
- $callback = function ($msg) use($function, $mid, $q_name) {
|
|
|
|
- try {
|
|
|
|
- $message_id = $msg->get('message_id');
|
|
|
|
- $ack = $this->ack == true ? true : false;//是否应答
|
|
|
|
- if ($mid == $message_id) {
|
|
|
|
- call_user_func_array(
|
|
|
|
- [$this, $function],
|
|
|
|
- [$message_id, $msg->body, $ack, '', true]
|
|
|
|
- );
|
|
|
|
-
|
|
|
|
- $statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
|
|
|
|
- if ($ack === true) {
|
|
|
|
- $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
|
|
|
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
|
|
|
|
- } else {
|
|
|
|
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } catch (\Exception $e) {
|
|
|
|
- //$e->getMessage();
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
|
|
|
|
try {
|
|
try {
|
|
- $channel->basic_qos(0, $count, null);
|
|
|
|
|
|
+ $callback = function ($msg) use($q_name) {
|
|
|
|
+ call_user_func_array([$this, 'closureAck'], [$msg, $q_name, AmqpConfig::STATUS_HAND_OK]);
|
|
|
|
+ };
|
|
|
|
+ $channel->basic_qos(0, $total, null);
|
|
$channel->basic_consume($q_name,
|
|
$channel->basic_consume($q_name,
|
|
'', false, false, false, false, $callback);
|
|
'', false, false, false, false, $callback);
|
|
-
|
|
|
|
- $i = 0;
|
|
|
|
- while (count($channel->callbacks)) {
|
|
|
|
- $i ++;
|
|
|
|
- if ($i > $count || $this->_stop == true)
|
|
|
|
|
|
+ for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
|
|
|
|
+ if ($i > $total)
|
|
break;
|
|
break;
|
|
|
|
|
|
$channel->wait();
|
|
$channel->wait();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ $channel->close();
|
|
|
|
+ $connect->close();
|
|
|
|
+
|
|
} catch (\Exception $e) {
|
|
} catch (\Exception $e) {
|
|
- throw new Exception(2104, $e->getMessage());
|
|
|
|
|
|
+ throw new Exception(1101, $e->getMessage());
|
|
}
|
|
}
|
|
|
|
|
|
- $channel->close();
|
|
|
|
- $connect->close();
|
|
|
|
-
|
|
|
|
- if ($this->_result == null)
|
|
|
|
- throw new Exception(2102);
|
|
|
|
-
|
|
|
|
- return $this->_result;
|
|
|
|
|
|
+ $ackCount = count($ack) - count($this->_rows);
|
|
|
|
+ $data = ['queue' => $q_name, 'ack_count' => $ackCount];
|
|
|
|
+ count($this->_rows) ? $data['ack_fail'] = array_keys($this->_rows) : null;
|
|
|
|
+ return $data;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * [根据数量消费]
|
|
|
|
|
|
+ * deleteMessage
|
|
* @author: libingke
|
|
* @author: libingke
|
|
- * @param string | int $startPos 开始位置
|
|
|
|
- * @param int $count 数量
|
|
|
|
|
|
+ * @return array
|
|
|
|
+ * @throws Exception
|
|
*/
|
|
*/
|
|
- protected function consumeByCount($count)
|
|
|
|
|
|
+ public function deleteMessage()
|
|
{
|
|
{
|
|
- $q_name = $this->name;
|
|
|
|
-
|
|
|
|
- //选择执行回调
|
|
|
|
- if ($this->do_nothing != true) {
|
|
|
|
- $function = '_consume' . ucfirst($q_name);
|
|
|
|
- if (!method_exists($this, $function) || !is_callable(array($this, $function)))
|
|
|
|
- throw new Exception(2015);
|
|
|
|
- } else {
|
|
|
|
- //if do nothing handle empty
|
|
|
|
- $function = '_consumeEmpty';
|
|
|
|
- }
|
|
|
|
|
|
+ //帅选合法待应答消息id
|
|
|
|
+ foreach ($this->mids as $mid)
|
|
|
|
+ if ($mid && is_string($mid))
|
|
|
|
+ $this->_rows[$mid] = $mid;
|
|
|
|
+ if (!is_array($this->_rows) || count($this->_rows) == 0)
|
|
|
|
+ throw new Exception(1301);
|
|
|
|
+ $delete = $this->_rows;
|
|
|
|
+ Redis::batchDel($this->name, $delete, 'status');
|
|
|
|
|
|
|
|
+ $q_name = $this->name;
|
|
$connect = $this->getConnect();
|
|
$connect = $this->getConnect();
|
|
$channel = $connect->channel();
|
|
$channel = $connect->channel();
|
|
$channel->queue_declare($q_name,
|
|
$channel->queue_declare($q_name,
|
|
@@ -535,82 +458,53 @@ class MessageForm extends BaseForm
|
|
list(, $total, ) = $channel->queue_declare($q_name,
|
|
list(, $total, ) = $channel->queue_declare($q_name,
|
|
false, true, false, false);
|
|
false, true, false, false);
|
|
|
|
|
|
- $callback = function ($msg) use($function, $q_name) {
|
|
|
|
- $ack = $this->ack == true ? true : false;//是否应答
|
|
|
|
- try {
|
|
|
|
- $message_id = $msg->get('message_id');
|
|
|
|
- call_user_func_array(
|
|
|
|
- [$this, $function],
|
|
|
|
- [$message_id, $msg->body, $ack, '', false]
|
|
|
|
- );
|
|
|
|
-
|
|
|
|
- //更新状态
|
|
|
|
- $statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
|
|
|
|
- if ($ack === true) {
|
|
|
|
- $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
|
|
|
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
|
|
|
|
- } else {
|
|
|
|
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- } catch (\Exception $e) {
|
|
|
|
- //消息体出错机制
|
|
|
|
- call_user_func_array(
|
|
|
|
- [$this, $function],
|
|
|
|
- ['', $msg->body, $ack, $e->getMessage(), false]
|
|
|
|
- );
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
-
|
|
|
|
try {
|
|
try {
|
|
- $min = min($count, $total);
|
|
|
|
- $channel->basic_qos(0, $min, null);
|
|
|
|
|
|
+ $callback = function ($msg) use($q_name) {call_user_func_array([$this, 'closureAck'], [$msg]);};
|
|
|
|
+ $channel->basic_qos(0, $total, null);
|
|
$channel->basic_consume($q_name,
|
|
$channel->basic_consume($q_name,
|
|
'', false, false, false, false, $callback);
|
|
'', false, false, false, false, $callback);
|
|
-
|
|
|
|
- $i = 0;
|
|
|
|
- while (count($channel->callbacks)) {
|
|
|
|
- $i ++;
|
|
|
|
- if ($i > $min || $this->_stop == true)
|
|
|
|
|
|
+ for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
|
|
|
|
+ if ($i > $total)
|
|
break;
|
|
break;
|
|
|
|
|
|
$channel->wait();
|
|
$channel->wait();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ $channel->close();
|
|
|
|
+ $connect->close();
|
|
|
|
+
|
|
} catch (\Exception $e) {
|
|
} catch (\Exception $e) {
|
|
- throw new Exception(2104, $e->getMessage());
|
|
|
|
|
|
+ throw new Exception(1101, $e->getMessage());
|
|
}
|
|
}
|
|
|
|
|
|
- $channel->close();
|
|
|
|
- $connect->close();
|
|
|
|
- return $this->_result;
|
|
|
|
|
|
+ $deleteCount = count($delete) - count($this->_rows);
|
|
|
|
+ $data = ['queue' => $q_name, 'delete_count' => $deleteCount];
|
|
|
|
+ count($this->_rows) ? $data['delete_fail'] = array_keys($this->_rows) : null;
|
|
|
|
+ return $data;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * [消费消息]
|
|
|
|
|
|
+ * closureAck for ackMessage && deleteMessage
|
|
* @author: libingke
|
|
* @author: libingke
|
|
|
|
+ * @param $msg
|
|
|
|
+ * @param $queue
|
|
|
|
+ * @param bool $status
|
|
*/
|
|
*/
|
|
- public function consumeMessage()
|
|
|
|
|
|
+ protected function closureAck($msg, $queue = '', $status = false)
|
|
{
|
|
{
|
|
- switch ($this->type)
|
|
|
|
- {
|
|
|
|
- case static::TYPE_MID:
|
|
|
|
- $data = $this->consumeByMid($this->mid);
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case static::TYPE_COUNT:
|
|
|
|
- $data = $this->consumeByCount($this->count);
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case static::TYPE_MC:
|
|
|
|
- throw new Exception(1000, '未开发');
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- default:
|
|
|
|
- return "It's not possible to get there.";
|
|
|
|
- }
|
|
|
|
|
|
+ try {
|
|
|
|
+ $mid = $msg->get('message_id');
|
|
|
|
+ if (in_array($mid, $this->_rows)) {
|
|
|
|
+ unset($this->_rows[$mid]);
|
|
|
|
+ $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
|
|
|
+ if ($status != false && $queue != '')
|
|
|
|
+ Redis::set($queue, $mid, 'status', $status);
|
|
|
|
+ }
|
|
|
|
|
|
- return $data;
|
|
|
|
|
|
+ } finally {
|
|
|
|
+ if (count($this->_rows) == 0)
|
|
|
|
+ $this->_stop = true;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -620,13 +514,47 @@ class MessageForm extends BaseForm
|
|
*/
|
|
*/
|
|
public function purge()
|
|
public function purge()
|
|
{
|
|
{
|
|
- $data = [];
|
|
|
|
- $connect = Yii::$app->Amqp->AMQPConnection();
|
|
|
|
- $channel = new \AMQPChannel($connect);
|
|
|
|
- $queue = new \AMQPQueue($channel);
|
|
|
|
- $queue->setName($this->name);
|
|
|
|
- $queue->purge();
|
|
|
|
- return $data;
|
|
|
|
|
|
+ try {
|
|
|
|
+ $connect = $this->getConnect();
|
|
|
|
+ $channel = $connect->channel();
|
|
|
|
+ $delete = $channel->queue_purge($this->name);
|
|
|
|
+ if ($delete > 0)
|
|
|
|
+ Redis::purge($this->name);
|
|
|
|
+
|
|
|
|
+ $channel->close();
|
|
|
|
+ $connect->close();
|
|
|
|
+
|
|
|
|
+ return [
|
|
|
|
+ 'queue' => $this->name,
|
|
|
|
+ 'count' => $delete
|
|
|
|
+ ];
|
|
|
|
+
|
|
|
|
+ } catch (\Exception $e) {
|
|
|
|
+ throw new Exception(1001, $e->getMessage());
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private function _closureLogin($msg, $queue)
|
|
|
|
+ {
|
|
|
|
+ $data = ['mid' => '', 'body' => '', 'response' => '', 'error' => ''];
|
|
|
|
+ try {
|
|
|
|
+ $data['mid'] = $msg->get('message_id');
|
|
|
|
+ $data['body'] = $msg->body;
|
|
|
|
+ if ($data['mid']) {
|
|
|
|
+ $handle = new LoginHandle();
|
|
|
|
+ $data['response'] = $handle->login($msg->body, $queue, $data['mid']);
|
|
|
|
+
|
|
|
|
+ $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
|
|
|
+ Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND_OK);
|
|
|
|
+ Redis::set($queue, $data['mid'], 'result', $data['response']);
|
|
|
|
+ Redis::expire($queue, $data['mid'], 'result', 3600);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ } catch (\Exception $e) {
|
|
|
|
+ $data['error'] = $e->getMessage();
|
|
|
|
+
|
|
|
|
+ } finally {
|
|
|
|
+ $this->_result[] = $data;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|