123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565 |
- <?php
- namespace backend\forms;
- use common\logic\LoginHandle;
- use PhpAmqpLib\Message\AMQPMessage;
- use components\service\AmqpConfig;
- use components\service\Redis;
- use components\Exception;
- use components\Curl;
- use common\helpers\KeyHelper;
- use yii\helpers\ArrayHelper;
- use Yii;
- class MessageForm extends BaseForm
- {
- /**
- * @var
- */
- public $queue;
- /**
- * @var
- */
- public $message;
- /**
- * @var
- */
- public $name;
- /**
- * @var integer 数量
- */
- public $count;
- /**
- * @var bool 是否重新排序
- */
- public $requeue;
- /**
- * @var string 编码
- */
- public $encoding;
- /**
- * @var string 消费id
- */
- public $mid;
- /**
- * @var array 消费ids
- */
- public $mids;
- /**
- * @var string 消费类型
- */
- public $type;
- /**
- * @var bool 自动应答
- */
- public $ack;
- /**
- * @var bool 强制删除
- */
- public $forced;
- /**
- * @var bool
- */
- private $_stop = false;
- /**
- * @var string
- */
- private $_mid;
- /**
- * @var string
- */
- private $_body;
- /**
- * @var AMQPMessage
- */
- private $_message;
- /**
- * @var array
- */
- private $_rows;
- /**
- * @var array
- */
- private $_result;
- public function rules()
- {
- return [
- [['name'], 'required', 'on' => ['message_list', 'purge']],
- [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume', 'delete', 'ack']],
- /* 发送消息 */
- [['queue'], 'trim', 'on' => ['send', 'batch_send']],
- [['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
- ['message', 'validateArray', 'on' => ['send', 'batch_send']],
- /* 获取消息列表 */
- ['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 1205,
- 'tooSmall' => 1206, 'tooBig' => 1207, 'on' => ['message_list']],
- ['count', 'default', 'value' => 20, 'on' => ['message_list']],
- ['requeue', 'boolean', 'message' => 1209, 'on' => ['message_list']],
- ['requeue', 'default', 'value' => true, 'on' => ['message_list']],
- ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 1210, 'on' => ['message_list']],
- ['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],
- /* 消费 */
- [['name', 'count'], 'required', 'on' => ['consume']],
- ['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 1205,
- 'tooSmall' => 1206, 'on' => ['consume']],
- ['type', 'default', 'value' => 'client', 'on' => 'consume'],
- ['type', 'in', 'range' => ['server', 'client'], 'message' => 1302, 'on' => 'consume'],
- /* delete & ack */
- [['mids', 'name'], 'required', 'on' => ['delete', 'ack']],
- ['mids', 'validateArray', 'on' => ['delete', 'ack']],
- ['name', 'string', 'on' => ['delete', 'ack']],
- ['forced', 'default', 'value' => false, 'on' => 'delete'],
- ['forced', 'boolean', 'on' => 'delete'],
- ];
- }
- public function validateArray($attribute)
- {
- if (!$this->$attribute || !is_array($this->$attribute))
- throw new Exception(1003, "{$attribute} 必须是数组");
- }
- /**
- * [发送消息]
- * @author: libingke
- * @return array
- * @throws Exception
- * @version 1.1
- */
- public function sendMessage()
- {
- try {
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $this->_handleMessage($this->message);
- //预声明
- $channel->queue_declare($this->queue,
- false, true, false, false);
- $channel->basic_publish($this->_message, '', $this->queue);
- //获取返回结果
- list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
- false, true, false, false);
- $data = [
- 'message_total' => $message_count,
- 'queue_name' => $q_name,
- 'message_add' => [
- 'mid' => $this->_mid,
- 'body' => $this->_body
- ]
- ];
- if (($get = $channel->basic_get($q_name)) !== null) {
- $data['basic_get'] = [
- 'mid' => $get->get('message_id'),
- 'body' => $get->body
- ];
- }
- $channel->close();
- $connect->close();
- Redis::set($q_name, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
- return $data;
- } catch (\Exception $e) {
- throw new Exception(1001, $e->getMessage());
- }
- }
- /**
- * [构造消息体]
- * @param $data
- */
- private function _handleMessage($data)
- {
- $this->_mid = KeyHelper::getUniqueId('message_send');
- $this->_body = call_user_func_array([$this, '_messageBody'], [$data]);
- $properties = [
- 'message_id' => $this->_mid,
- 'correlation_id'=> $this->_mid,
- 'consumer_tag' => $this->_mid
- ];
- $this->_message = new AMQPMessage($this->_body, $properties);
- }
- private function _messageBody($data)
- {
- return json_encode($data);
- }
- /**
- * [批量发送消息]
- * @author: libingke
- * @return array
- * @throws Exception
- */
- public function batchSendMessage()
- {
- try {
- $connect = $this->getConnect();
- $channel = $connect->channel();
- //预声明
- $channel->queue_declare($this->queue,
- false, true, false, false);
- foreach ($this->message as $k => $v) {
- $this->_handleMessage($v);
- $this->_rows[] = [
- 'mid' => $this->_mid,
- 'body' => $this->_body
- ];
- $channel->batch_basic_publish($this->_message, '', $this->queue);
- Redis::set($this->queue, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
- }
- $channel->publish_batch();
- //获取返回结果
- list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
- false, true, false, false);
- $data = [
- 'message_total' => $message_count,
- 'queue_name' => $q_name,
- 'add_count' => count($this->_rows),
- 'add_rows' => $this->_rows
- ];
- if (($get = $channel->basic_get($q_name)) !== null) {
- $data['basic_get'] = [
- 'mid' => $get->get('message_id'),
- 'body' => $get->body
- ];
- }
- $channel->close();
- $connect->close();
- return $data;
- } catch (\Exception $e) {
- throw new Exception(1001, $e->getMessage());
- }
- }
- /**
- * [获取消息列表]
- * @author: libingke
- */
- public function getMessageList()
- {
- $authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
- $vhost = urlencode(Yii::$app->Amqp->vhost);
- $url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port .
- "/api/queues/{$vhost}/" . $this->name . "/get";
- $postParams = [
- 'name' => $this->name,
- 'count' => $this->count,
- 'encoding' => $this->encoding,
- 'requeue' => $this->requeue,
- 'truncate' => "50000",
- 'vhost' => '/',
- ];
- $curl = new Curl();
- $curl->setOption(CURLOPT_USERPWD, $authStr);
- $curl->setRawPostData(json_encode($postParams));
- $result = json_decode($curl->post($url), true);
- if ($curl->responseCode != 200)
- throw new Exception(1002);
- if ($curl->errorText)
- throw new Exception(1002, $curl->errorText);
- if (isset($result['error']) && is_string($result['error']))
- throw new Exception(1002, $result['error']);
- ArrayHelper::multisort($result,'message_count',SORT_ASC);
- //print_r($result);exit();
- $rows = [];
- foreach ($result as $k => $v) {
- $rows[$k]['mid'] = $v['properties']['message_id'];
- $rows[$k]['body'] = $v['payload'];
- $rows[$k]['before'] = $v['message_count'];
- }
- unset($result);
- return ['count' => count($rows), 'rows' => $rows];
- }
- /**
- * consumeMessage
- * @author: libingke
- * @return array
- * @throws Exception
- */
- public function consumeMessage()
- {
- $q_name = $this->name;
- if ($this->type == 'server') {
- $function = '_closure' . ucfirst($q_name);
- if ( !method_exists($this, $function) )
- throw new Exception(1303);
- } else {
- $function = 'closureConsume';
- }
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $channel->queue_declare($q_name,
- false, true, false, false);
- list(, $total, ) = $channel->queue_declare($q_name,
- false, true, false, false);
- if ($total == 0)
- throw new Exception(1300);
- try {
- $min = min($this->count, $total);
- $callback = function ($msg) use($q_name, $function) {call_user_func_array([$this, $function], [$msg, $q_name]);};
- $channel->basic_qos(0, $min, null);
- $channel->basic_consume($q_name,
- '', false, false, false, false, $callback);
- for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
- if ($i > $min)
- break;
- $channel->wait();
- }
- $channel->close();
- $connect->close();
- } catch (\Exception $e) {
- throw new Exception(1001, $e->getMessage());
- }
- return $this->_result;
- }
- /**
- * closureConsume for consumeMessage
- * @param $msg
- * @param $queue
- */
- protected function closureConsume($msg, $queue)
- {
- $data = ['mid' => '', 'body' => '', 'error' => ''];
- try {
- $data['mid'] = $msg->get('message_id');
- $data['body'] = $msg->body;
- Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND);
- } catch (\Exception $e) {
- $data['error'] = $e->getMessage();
- } finally {
- $this->_result[] = $data;
- }
- }
- /**
- * ackMessage
- * @author: libingke
- * @return array
- * @throws Exception
- */
- public function ackMessage()
- {
- //帅选合法待应答消息id
- foreach ($this->mids as $mid)
- if ($mid && is_string($mid))
- $this->_rows[$mid] = $mid;
- if (!is_array($this->_rows) || count($this->_rows) == 0)
- throw new Exception(1301);
- $ack = $this->_rows;
- $q_name = $this->name;
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $channel->queue_declare($q_name,
- false, true, false, false);
- list(, $total, ) = $channel->queue_declare($q_name,
- false, true, false, false);
- try {
- $callback = function ($msg) use($q_name) {
- call_user_func_array([$this, 'closureAck'], [$msg, $q_name, AmqpConfig::STATUS_HAND_OK, true]);
- };
- $channel->basic_qos(0, $total, null);
- $channel->basic_consume($q_name,
- '', false, false, false, false, $callback);
- for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
- if ($i > $total)
- break;
- $channel->wait();
- }
- $channel->close();
- $connect->close();
- } catch (\Exception $e) {
- throw new Exception(1101, $e->getMessage());
- }
- $ackCount = count($ack) - count($this->_rows);
- $data = ['queue' => $q_name, 'ack_count' => $ackCount];
- count($this->_rows) ? $data['ack_fail'] = array_keys($this->_rows) : null;
- return $data;
- }
- /**
- * deleteMessage
- * @author: libingke
- * @return array
- * @throws Exception
- */
- public function deleteMessage()
- {
- //帅选合法待应答消息id
- foreach ($this->mids as $mid)
- if ($mid && is_string($mid))
- $this->_rows[$mid] = $mid;
- if (!is_array($this->_rows) || count($this->_rows) == 0)
- throw new Exception(1301);
- $delete = $this->_rows;
- Redis::batchDel($this->name, $delete, 'status');
- $q_name = $this->name;
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $channel->queue_declare($q_name,
- false, true, false, false);
- list(, $total, ) = $channel->queue_declare($q_name,
- false, true, false, false);
- try {
- $callback = function ($msg) use($q_name) {call_user_func_array([$this, 'closureAck'], [$msg]);};
- $channel->basic_qos(0, $total, null);
- $channel->basic_consume($q_name,
- '', false, false, false, false, $callback);
- for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
- if ($i > $total)
- break;
- $channel->wait();
- }
- $channel->close();
- $connect->close();
- } catch (\Exception $e) {
- throw new Exception(1101, $e->getMessage());
- }
- $deleteCount = count($delete) - count($this->_rows);
- $data = ['queue' => $q_name, 'delete_count' => $deleteCount];
- count($this->_rows) ? $data['delete_fail'] = array_keys($this->_rows) : null;
- return $data;
- }
- /**
- * closureAck for ackMessage && deleteMessage
- * @author: libingke
- * @param $msg
- * @param string $queue
- * @param bool $status
- * @param bool $check
- */
- protected function closureAck($msg, $queue = '', $status = false, $check = false)
- {
- try {
- $mid = $msg->get('message_id');
- if (in_array($mid, $this->_rows)) {
- if ($check == true && Redis::get($queue, $mid, 'status') != AmqpConfig::STATUS_HAND) {
- goto end;
- }
- unset($this->_rows[$mid]);
- $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
- if ($status != false && $queue != '')
- Redis::set($queue, $mid, 'status', $status);
- }
- end:
- } finally {
- if (count($this->_rows) == 0)
- $this->_stop = true;
- }
- }
- /**
- * [清空消息]
- * @author: libingke
- * @return array
- */
- public function purge()
- {
- try {
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $delete = $channel->queue_purge($this->name);
- if ($delete > 0)
- Redis::purge($this->name);
- $channel->close();
- $connect->close();
- return [
- 'queue' => $this->name,
- 'count' => $delete
- ];
- } catch (\Exception $e) {
- throw new Exception(1001, $e->getMessage());
- }
- }
- private function _closureLogin($msg, $queue)
- {
- $data = ['mid' => '', 'body' => '', 'response' => '', 'error' => ''];
- try {
- $data['mid'] = $msg->get('message_id');
- $data['body'] = $msg->body;
- if ($data['mid']) {
- $handle = new LoginHandle();
- $data['response'] = $handle->login($msg->body, $queue, $data['mid']);
- $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
- Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND_OK);
- Redis::set($queue, $data['mid'], 'result', $data['response']);
- Redis::expire($queue, $data['mid'], 'result', 3600);
- }
- } catch (\Exception $e) {
- $data['error'] = $e->getMessage();
- } finally {
- $this->_result[] = $data;
- }
- }
- }
|