|
@@ -2,52 +2,168 @@
|
|
|
|
|
|
namespace backend\forms;
|
|
|
|
|
|
+use components\service\AmqpConfig;
|
|
|
+use PhpAmqpLib\Exception\AMQPProtocolChannelException;
|
|
|
+use common\helpers\KeyHelper;
|
|
|
use common\logic\Amqp\Cache;
|
|
|
-use common\logic\Amqp\Message;
|
|
|
+use components\Curl;
|
|
|
use components\Exception;
|
|
|
+use PhpAmqpLib\Message\AMQPMessage;
|
|
|
+use yii\helpers\ArrayHelper;
|
|
|
+use Yii;
|
|
|
|
|
|
class MessageForm extends BaseForm
|
|
|
{
|
|
|
- public $email;
|
|
|
+
|
|
|
+ public $queue;
|
|
|
|
|
|
- public $password;
|
|
|
+ public $message;
|
|
|
|
|
|
+
|
|
|
+
|
|
|
+ public $name;
|
|
|
+
|
|
|
+
|
|
|
+ * @var integer 数量
|
|
|
+ */
|
|
|
+ public $count;
|
|
|
+
|
|
|
+
|
|
|
+ * @var bool 是否重新排序
|
|
|
+ */
|
|
|
+ public $requeue;
|
|
|
+
|
|
|
+
|
|
|
+ * @var string 编码
|
|
|
+ */
|
|
|
+ public $encoding;
|
|
|
+
|
|
|
+
|
|
|
+ * @var string 消费id
|
|
|
+ */
|
|
|
+ public $mid;
|
|
|
+
|
|
|
+
|
|
|
+ * @var string 消费类型
|
|
|
+ */
|
|
|
public $type;
|
|
|
|
|
|
- public $send_arr;
|
|
|
+
|
|
|
+ * @var bool 是否空消费
|
|
|
+ */
|
|
|
+ public $do_nothing = false;
|
|
|
+
|
|
|
+
|
|
|
+ * @var bool 自动应答
|
|
|
+ */
|
|
|
+ public $ack = false;
|
|
|
+
|
|
|
|
|
|
- private $_queue = '';
|
|
|
+
|
|
|
+ * @var bool
|
|
|
+ */
|
|
|
+ private $_stop = false;
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ * @var string
|
|
|
+ */
|
|
|
+ private $_mid;
|
|
|
+
|
|
|
+
|
|
|
+ * @var string
|
|
|
+ */
|
|
|
+ private $_body;
|
|
|
+
|
|
|
+
|
|
|
+ * @var AMQPMessage
|
|
|
+ */
|
|
|
+ private $_message;
|
|
|
+
|
|
|
+
|
|
|
+ * @var array
|
|
|
+ */
|
|
|
+ private $_rows;
|
|
|
|
|
|
|
|
|
- * 登录方式
|
|
|
+ * @var array
|
|
|
*/
|
|
|
- const TYPE_SMS = 'sms';
|
|
|
+ private $_result;
|
|
|
|
|
|
- const TYPE_EMAIL = 'email';
|
|
|
+ const TYPE_MID = 'mid';
|
|
|
+ const TYPE_COUNT = 'count';
|
|
|
+ const TYPE_MC = 'mid_count';
|
|
|
|
|
|
public function rules()
|
|
|
{
|
|
|
return [
|
|
|
-
|
|
|
-
|
|
|
- [['email', 'password', 'type'], 'trim', 'on' => 'login'],
|
|
|
- [['email', 'password'], 'required', 'on' => 'login'],
|
|
|
- ['email', 'email', 'on' => 'login'],
|
|
|
- ['type', 'default', 'value' => static::TYPE_EMAIL, 'on' => 'login'],
|
|
|
- ['type', 'in', 'range' => [static::TYPE_EMAIL, static::TYPE_SMS], 'on' => 'login'],
|
|
|
-
|
|
|
- [['send_arr'], 'required', 'on' => 'logins']
|
|
|
+
|
|
|
+ [['queue'], 'trim', 'on' => ['send', 'batch_send']],
|
|
|
+ [['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
|
|
|
+ ['message', 'validateMessage', 'on' => ['send', 'batch_send']],
|
|
|
+
|
|
|
+
|
|
|
+ [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume']],
|
|
|
+ [['name'], 'required', 'on' => ['message_list', 'purge']],
|
|
|
+ ['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 2012,
|
|
|
+ 'tooSmall' => 2013, 'tooBig' => 2014, 'on' => ['message_list']],
|
|
|
+ ['count', 'default', 'value' => 20, 'on' => ['message_list']],
|
|
|
+ ['requeue', 'boolean', 'message' => 2010, 'on' => ['message_list']],
|
|
|
+ ['requeue', 'default', 'value' => true, 'on' => ['message_list']],
|
|
|
+ ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 2011, 'on' => ['message_list']],
|
|
|
+ ['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],
|
|
|
+
|
|
|
+
|
|
|
+ [['type', 'name'], 'required', 'on' => ['consume']],
|
|
|
+ ['type', 'in', 'range' => [static::TYPE_MID, static::TYPE_COUNT, static::TYPE_MC], 'on' => 'consume'],
|
|
|
+ ['type', 'validateType', 'on' => 'consume'],
|
|
|
+ ['mid', 'string', 'on' => 'consume'],
|
|
|
+ ['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 2012,
|
|
|
+ 'tooSmall' => 2013, 'on' => ['consume']],
|
|
|
+ ['do_nothing', 'default', 'value' => false, 'on' => 'consume'],
|
|
|
+ ['do_nothing', 'boolean', 'on' => 'consume'],
|
|
|
+ ['ack', 'default', 'value' => false, 'on' => 'consume'],
|
|
|
+ ['ack', 'boolean', 'on' => 'consume'],
|
|
|
];
|
|
|
}
|
|
|
|
|
|
- public function setQueue($scenario)
|
|
|
+ public function validateMessage($attribute)
|
|
|
{
|
|
|
- $this->_queue = $scenario;
|
|
|
+ if (!$this->$attribute)
|
|
|
+ throw new Exception(2001);
|
|
|
+
|
|
|
+ if (!is_array($this->$attribute))
|
|
|
+ throw new Exception(2002, "{$attribute} 必须是数组");
|
|
|
+ }
|
|
|
+
|
|
|
+ public function validateType($attribute)
|
|
|
+ {
|
|
|
+ if ($this->$attribute == static::TYPE_MC) {
|
|
|
+ if ($this->mid == null)
|
|
|
+ throw new Exception(2016);
|
|
|
+ if ($this->count == null)
|
|
|
+ throw new Exception(2017);
|
|
|
+
|
|
|
+ } else {
|
|
|
+ if ($this->{$this->$attribute} == null)
|
|
|
+ throw new Exception(2016, $this->$attribute . " 不能为空");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- public function getQueue()
|
|
|
+
|
|
|
+ * [连接AMQP]
|
|
|
+ * @author: libingke
|
|
|
+ * @return \PhpAmqpLib\Connection\AMQPStreamConnection
|
|
|
+ */
|
|
|
+ protected function getConnect()
|
|
|
{
|
|
|
- return $this->_queue;
|
|
|
+ return new \PhpAmqpLib\Connection\AMQPStreamConnection(
|
|
|
+ Yii::$app->Amqp->host,
|
|
|
+ Yii::$app->Amqp->port,
|
|
|
+ Yii::$app->Amqp->user,
|
|
|
+ Yii::$app->Amqp->pass,
|
|
|
+ Yii::$app->Amqp->vhost
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
|
|
@@ -55,29 +171,462 @@ class MessageForm extends BaseForm
|
|
|
* @author: libingke
|
|
|
* @return array
|
|
|
* @throws Exception
|
|
|
+ * @version 1.0
|
|
|
*/
|
|
|
public function sendMessage()
|
|
|
{
|
|
|
- $body = json_encode([
|
|
|
- 'email' => $this->email,
|
|
|
- 'password' => $this->password,
|
|
|
- 'type' => $this->type
|
|
|
- ]);
|
|
|
- $queue = $this->getQueue();
|
|
|
+ $body = call_user_func_array([$this, '_messageBody'], [$this->attributes]);
|
|
|
+ $mid = KeyHelper::getUniqueId('message_send');
|
|
|
+ $properties = [
|
|
|
+ 'content_type' => 'text/plain',
|
|
|
+ 'message_id' => $mid,
|
|
|
+ 'correlation_id' => $mid,
|
|
|
+ 'consumer_tag' => $mid
|
|
|
+ ];
|
|
|
|
|
|
$data = [];
|
|
|
+ $e_name = 'message.default';
|
|
|
+ $k_route = 'route.default';
|
|
|
+ $q_name = $this->queue;
|
|
|
+
|
|
|
+ $connect = Yii::$app->Amqp->AMQPConnection();
|
|
|
+ $channel = new \AMQPChannel($connect);
|
|
|
+ $exchange = new \AMQPExchange($channel);
|
|
|
+ $exchange->setName($e_name);
|
|
|
+ $exchange->setType(AMQP_EX_TYPE_DIRECT);
|
|
|
+ $exchange->setFlags(AMQP_DURABLE);
|
|
|
+ $exchange->declareExchange();
|
|
|
+
|
|
|
+ $queue = new \AMQPQueue($channel);
|
|
|
+ $queue->setName($q_name);
|
|
|
+ $queue->declareQueue();
|
|
|
+ $queue->bind($e_name, $k_route);
|
|
|
+ $r = $exchange->publish($body, $k_route, AMQP_NOPARAM, $properties);
|
|
|
+ if ($r == true) {
|
|
|
+ $data['message_total'] = $queue->declareQueue();
|
|
|
+ $data['queue_name'] = $this->queue;
|
|
|
+ $data['message_new'] = [
|
|
|
+ 'mid' => $mid,
|
|
|
+ 'body' => $body
|
|
|
+ ];
|
|
|
+
|
|
|
+ if ($data['message_total'] > 1) {
|
|
|
+ $data['message_first'] = [
|
|
|
+ 'mid' => $queue->get()->getMessageId(),
|
|
|
+ 'body' => $queue->get()->getBody()
|
|
|
+ ];
|
|
|
+ }
|
|
|
+
|
|
|
+ Cache::setData(
|
|
|
+ 'queue:mid:'.$mid,
|
|
|
+ Cache::STATUS_SEND_OK
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ throw new Exception(2103);
|
|
|
+ }
|
|
|
+
|
|
|
+ return $data;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [发送消息]
|
|
|
+ * @author: libingke
|
|
|
+ * @return array
|
|
|
+ * @throws Exception
|
|
|
+ * @version 1.1
|
|
|
+ */
|
|
|
+ public function sendMessageV1_1()
|
|
|
+ {
|
|
|
try {
|
|
|
- $message = new Message($queue);
|
|
|
- $corrId = $message->send($body, $queue);
|
|
|
+
|
|
|
+ $connect = $this->getConnect();
|
|
|
+ $channel = $connect->channel();
|
|
|
+ $this->_handleMessage($this->message);
|
|
|
+
|
|
|
+ $channel->queue_declare($this->queue,
|
|
|
+ false, true, false, false);
|
|
|
+ $channel->basic_publish($this->_message, '', $this->queue);
|
|
|
+
|
|
|
+ list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
|
|
|
+ false, true, false, false);
|
|
|
+
|
|
|
+ $data = [
|
|
|
+ 'message_total' => $message_count,
|
|
|
+ 'queue_name' => $q_name,
|
|
|
+ 'message_add' => [
|
|
|
+ 'mid' => $this->_mid,
|
|
|
+ 'body' => $this->_body
|
|
|
+ ]
|
|
|
+ ];
|
|
|
+ if (($get = $channel->basic_get($q_name)) !== null) {
|
|
|
+ $data['basic_get'] = [
|
|
|
+ 'mid' => $get->get('message_id'),
|
|
|
+ 'body' => $get->body
|
|
|
+ ];
|
|
|
+ }
|
|
|
+ $channel->close();
|
|
|
+ $connect->close();
|
|
|
|
|
|
- Cache::setData($corrId, Cache::STATUS_SEND_OK);
|
|
|
+ $statusKey = KeyHelper::getMessageStatusKey($this->_mid, $q_name);
|
|
|
+ Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
|
|
|
|
|
|
- $data[] = $corrId;
|
|
|
+ return $data;
|
|
|
|
|
|
- } catch (\common\logic\Amqp\Exception $e) {
|
|
|
- throw new Exception($e->getCode(), $e->getMessage());
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ throw new Exception(1000, $e->getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [构造消息体]
|
|
|
+ * @author: libingke
|
|
|
+ * @param $data
|
|
|
+ */
|
|
|
+ private function _handleMessage($data)
|
|
|
+ {
|
|
|
+ $this->_mid = KeyHelper::getUniqueId('message_send');
|
|
|
+ $this->_body = call_user_func_array([$this, '_messageBody'], [$data]);
|
|
|
+ $properties = [
|
|
|
+ 'message_id' => $this->_mid,
|
|
|
+ 'correlation_id'=> $this->_mid,
|
|
|
+ 'consumer_tag' => $this->_mid
|
|
|
+ ];
|
|
|
+ $this->_message = new AMQPMessage($this->_body, $properties);
|
|
|
+ }
|
|
|
+
|
|
|
+ private function _messageBody($data)
|
|
|
+ {
|
|
|
+ return json_encode($data);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [批量发送消息]
|
|
|
+ * @author: libingke
|
|
|
+ * @return array
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ public function batchSendMessage()
|
|
|
+ {
|
|
|
+ try {
|
|
|
+
|
|
|
+ $connect = $this->getConnect();
|
|
|
+ $channel = $connect->channel();
|
|
|
+
|
|
|
+ $channel->queue_declare($this->queue,
|
|
|
+ false, true, false, false);
|
|
|
+
|
|
|
+ foreach ($this->message as $k => $v) {
|
|
|
+ $this->_handleMessage($v);
|
|
|
+ $this->_rows[] = [
|
|
|
+ 'mid' => $this->_mid,
|
|
|
+ 'body' => $this->_body
|
|
|
+ ];
|
|
|
+ $channel->batch_basic_publish($this->_message, '', $this->queue);
|
|
|
+
|
|
|
+ $statusKey = KeyHelper::getMessageStatusKey($this->_mid, $this->queue);
|
|
|
+ Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
|
|
|
+ }
|
|
|
+ $channel->publish_batch();
|
|
|
+
|
|
|
+
|
|
|
+ list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
|
|
|
+ false, true, false, false);
|
|
|
+
|
|
|
+ $data = [
|
|
|
+ 'message_total' => $message_count,
|
|
|
+ 'queue_name' => $q_name,
|
|
|
+ 'add_count' => count($this->_rows),
|
|
|
+ 'add_rows' => $this->_rows
|
|
|
+ ];
|
|
|
+ if (($get = $channel->basic_get($q_name)) !== null) {
|
|
|
+ $data['basic_get'] = [
|
|
|
+ 'mid' => $get->get('message_id'),
|
|
|
+ 'body' => $get->body
|
|
|
+ ];
|
|
|
+ }
|
|
|
+ $channel->close();
|
|
|
+ $connect->close();
|
|
|
+
|
|
|
+ return $data;
|
|
|
+
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ throw new Exception(1000, $e->getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [获取消息列表]
|
|
|
+ * @author: libingke
|
|
|
+ */
|
|
|
+ public function getMessageList()
|
|
|
+ {
|
|
|
+
|
|
|
+ $badCode = 2000;
|
|
|
+
|
|
|
+ $authStr = Yii::$app->Amqp->getConfig('user') . ':' . Yii::$app->Amqp->getConfig('pass');
|
|
|
+
|
|
|
+ $vhost = urlencode(Yii::$app->Amqp->getConfig('vhost'));
|
|
|
+ $url = Yii::$app->Amqp->getConfig('host') . ':' . Yii::$app->Amqp->getConfig('api_port') .
|
|
|
+ "/api/queues/{$vhost}/" . $this->name . "/get";
|
|
|
+ $postParams = [
|
|
|
+ 'name' => $this->name,
|
|
|
+ 'count' => $this->count,
|
|
|
+ 'encoding' => $this->encoding,
|
|
|
+ 'requeue' => $this->requeue,
|
|
|
+ 'truncate' => "50000",
|
|
|
+ 'vhost' => '/',
|
|
|
+ ];
|
|
|
+
|
|
|
+ $curl = new Curl();
|
|
|
+ $curl->setOption(CURLOPT_USERPWD, $authStr);
|
|
|
+ $curl->setRawPostData(json_encode($postParams));
|
|
|
+ $result = json_decode($curl->post($url), true);
|
|
|
+ if ($curl->responseCode != 200)
|
|
|
+ throw new Exception($badCode);
|
|
|
+ if ($curl->errorText)
|
|
|
+ throw new Exception($badCode, $curl->errorText);
|
|
|
+ if (isset($result['error']) && is_string($result['error']))
|
|
|
+ throw new Exception($badCode, $result['error']);
|
|
|
+
|
|
|
+ ArrayHelper::multisort($result,'message_count',SORT_ASC);
|
|
|
+
|
|
|
+ $rows = [];
|
|
|
+ foreach ($result as $k => $v) {
|
|
|
+ $rows[$k]['before_count'] = $v['message_count'];
|
|
|
+ $rows[$k]['payload'] = $v['payload'];
|
|
|
+ $rows[$k]['payload_encoding']= $v['payload_encoding'];
|
|
|
+ $rows[$k]['message_id'] = $v['properties']['message_id'];
|
|
|
+ }
|
|
|
+ unset($result);
|
|
|
+
|
|
|
+ return ['count' => count($rows), 'rows' => $rows];
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [空处理]
|
|
|
+ */
|
|
|
+ private function _consumeEmpty($mid, $body, $ack, $error, $stop = false)
|
|
|
+ {
|
|
|
+ $this->_stop = $stop;
|
|
|
+ $this->_result[] = [
|
|
|
+ 'mid' => $mid,
|
|
|
+ 'result' => 'success: do nothing!',
|
|
|
+ 'ack' => $ack,
|
|
|
+ 'error' => $error
|
|
|
+ ];
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [处理逻辑1]
|
|
|
+ * @author: libingke
|
|
|
+ */
|
|
|
+ private function _consumeLogin($mid, $body, $ack, $error, $stop = false)
|
|
|
+ {
|
|
|
+ $this->_stop = $stop;
|
|
|
+ $this->_result[] = [
|
|
|
+ 'mid' => $mid,
|
|
|
+ 'result' => '已处理',
|
|
|
+ 'ack' => $ack,
|
|
|
+ 'error' => $error
|
|
|
+ ];
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [消费某条消息]
|
|
|
+ * @author: libingke
|
|
|
+ * @param $mid
|
|
|
+ * @return array
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ protected function consumeByMid($mid)
|
|
|
+ {
|
|
|
+ $q_name = $this->name;
|
|
|
+
|
|
|
+
|
|
|
+ if ($this->do_nothing != true) {
|
|
|
+ $function = '_consume' . ucfirst($q_name);
|
|
|
+ if (!method_exists($this, $function) || !is_callable(array($this, $function)))
|
|
|
+ throw new Exception(2015);
|
|
|
+ } else {
|
|
|
+
|
|
|
+ $function = '_consumeEmpty';
|
|
|
+ }
|
|
|
+
|
|
|
+ $connect = $this->getConnect();
|
|
|
+ $channel = $connect->channel();
|
|
|
+ $channel->queue_declare($q_name,
|
|
|
+ false, true, false, false);
|
|
|
+ list(, $count, ) = $channel->queue_declare($q_name,
|
|
|
+ false, true, false, false);
|
|
|
+ $callback = function ($msg) use($function, $mid, $q_name) {
|
|
|
+ try {
|
|
|
+ $message_id = $msg->get('message_id');
|
|
|
+ $ack = $this->ack == true ? true : false;
|
|
|
+ if ($mid == $message_id) {
|
|
|
+ call_user_func_array(
|
|
|
+ [$this, $function],
|
|
|
+ [$message_id, $msg->body, $ack, '', true]
|
|
|
+ );
|
|
|
+
|
|
|
+ $statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
|
|
|
+ if ($ack === true) {
|
|
|
+ $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
|
|
+ Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
|
|
|
+ } else {
|
|
|
+ Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (\Exception $e) {
|
|
|
+
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ try {
|
|
|
+ $channel->basic_qos(0, $count, null);
|
|
|
+ $channel->basic_consume($q_name,
|
|
|
+ '', false, false, false, false, $callback);
|
|
|
+
|
|
|
+ $i = 0;
|
|
|
+ while (count($channel->callbacks)) {
|
|
|
+ $i ++;
|
|
|
+ if ($i > $count || $this->_stop == true)
|
|
|
+ break;
|
|
|
+
|
|
|
+ $channel->wait();
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ throw new Exception(2104, $e->getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ $channel->close();
|
|
|
+ $connect->close();
|
|
|
+
|
|
|
+ if ($this->_result == null)
|
|
|
+ throw new Exception(2102);
|
|
|
+
|
|
|
+ return $this->_result;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [根据数量消费]
|
|
|
+ * @author: libingke
|
|
|
+ * @param string | int $startPos 开始位置
|
|
|
+ * @param int $count 数量
|
|
|
+ */
|
|
|
+ protected function consumeByCount($count)
|
|
|
+ {
|
|
|
+ $q_name = $this->name;
|
|
|
+
|
|
|
+
|
|
|
+ if ($this->do_nothing != true) {
|
|
|
+ $function = '_consume' . ucfirst($q_name);
|
|
|
+ if (!method_exists($this, $function) || !is_callable(array($this, $function)))
|
|
|
+ throw new Exception(2015);
|
|
|
+ } else {
|
|
|
+
|
|
|
+ $function = '_consumeEmpty';
|
|
|
+ }
|
|
|
+
|
|
|
+ $connect = $this->getConnect();
|
|
|
+ $channel = $connect->channel();
|
|
|
+ $channel->queue_declare($q_name,
|
|
|
+ false, true, false, false);
|
|
|
+ list(, $total, ) = $channel->queue_declare($q_name,
|
|
|
+ false, true, false, false);
|
|
|
+
|
|
|
+ $callback = function ($msg) use($function, $q_name) {
|
|
|
+ $ack = $this->ack == true ? true : false;
|
|
|
+ try {
|
|
|
+ $message_id = $msg->get('message_id');
|
|
|
+ call_user_func_array(
|
|
|
+ [$this, $function],
|
|
|
+ [$message_id, $msg->body, $ack, '', false]
|
|
|
+ );
|
|
|
+
|
|
|
+
|
|
|
+ $statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
|
|
|
+ if ($ack === true) {
|
|
|
+ $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
|
|
+ Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
|
|
|
+ } else {
|
|
|
+ Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (\Exception $e) {
|
|
|
+
|
|
|
+ call_user_func_array(
|
|
|
+ [$this, $function],
|
|
|
+ ['', $msg->body, $ack, $e->getMessage(), false]
|
|
|
+ );
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ try {
|
|
|
+ $min = min($count, $total);
|
|
|
+ $channel->basic_qos(0, $min, null);
|
|
|
+ $channel->basic_consume($q_name,
|
|
|
+ '', false, false, false, false, $callback);
|
|
|
+
|
|
|
+ $i = 0;
|
|
|
+ while (count($channel->callbacks)) {
|
|
|
+ $i ++;
|
|
|
+ if ($i > $min || $this->_stop == true)
|
|
|
+ break;
|
|
|
+
|
|
|
+ $channel->wait();
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ throw new Exception(2104, $e->getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ $channel->close();
|
|
|
+ $connect->close();
|
|
|
+ return $this->_result;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [消费消息]
|
|
|
+ * @author: libingke
|
|
|
+ */
|
|
|
+ public function consumeMessage()
|
|
|
+ {
|
|
|
+ switch ($this->type)
|
|
|
+ {
|
|
|
+ case static::TYPE_MID:
|
|
|
+ $data = $this->consumeByMid($this->mid);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case static::TYPE_COUNT:
|
|
|
+ $data = $this->consumeByCount($this->count);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case static::TYPE_MC:
|
|
|
+ throw new Exception(1000, '未开发');
|
|
|
+ break;
|
|
|
+
|
|
|
+ default:
|
|
|
+ return "It's not possible to get there.";
|
|
|
}
|
|
|
|
|
|
return $data;
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ * [清空消息]
|
|
|
+ * @author: libingke
|
|
|
+ * @return array
|
|
|
+ */
|
|
|
+ public function purge()
|
|
|
+ {
|
|
|
+ $data = [];
|
|
|
+ $connect = Yii::$app->Amqp->AMQPConnection();
|
|
|
+ $channel = new \AMQPChannel($connect);
|
|
|
+ $queue = new \AMQPQueue($channel);
|
|
|
+ $queue->setName($this->name);
|
|
|
+ $queue->purge();
|
|
|
+ return $data;
|
|
|
+ }
|
|
|
+
|
|
|
}
|