123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560 |
- <?php
- namespace backend\forms;
- use common\logic\LoginHandle;
- use PhpAmqpLib\Message\AMQPMessage;
- use components\service\AmqpConfig;
- use components\service\Redis;
- use components\Exception;
- use components\Curl;
- use common\helpers\KeyHelper;
- use yii\helpers\ArrayHelper;
- use Yii;
- class MessageForm extends BaseForm
- {
-
- public $queue;
-
- public $message;
-
- public $name;
-
- public $count;
-
- public $requeue;
-
- public $encoding;
-
- public $mid;
-
- public $mids;
-
- public $type;
-
- public $ack;
-
- public $forced;
-
- private $_stop = false;
-
- private $_mid;
-
- private $_body;
-
- private $_message;
-
- private $_rows;
-
- private $_result;
- public function rules()
- {
- return [
- [['name'], 'required', 'on' => ['message_list', 'purge']],
- [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume', 'delete', 'ack']],
-
- [['queue'], 'trim', 'on' => ['send', 'batch_send']],
- [['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
- ['message', 'validateArray', 'on' => ['send', 'batch_send']],
-
- ['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 1205,
- 'tooSmall' => 1206, 'tooBig' => 1207, 'on' => ['message_list']],
- ['count', 'default', 'value' => 20, 'on' => ['message_list']],
- ['requeue', 'boolean', 'message' => 1209, 'on' => ['message_list']],
- ['requeue', 'default', 'value' => true, 'on' => ['message_list']],
- ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 1210, 'on' => ['message_list']],
- ['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],
-
- [['name', 'count'], 'required', 'on' => ['consume']],
- ['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 1205,
- 'tooSmall' => 1206, 'on' => ['consume']],
- ['type', 'default', 'value' => 'client', 'on' => 'consume'],
- ['type', 'in', 'range' => ['server', 'client'], 'message' => 1302, 'on' => 'consume'],
-
- [['mids', 'name'], 'required', 'on' => ['delete', 'ack']],
- ['mids', 'validateArray', 'on' => ['delete', 'ack']],
- ['name', 'string', 'on' => ['delete', 'ack']],
- ['forced', 'default', 'value' => false, 'on' => 'delete'],
- ['forced', 'boolean', 'on' => 'delete'],
- ];
- }
- public function validateArray($attribute)
- {
- if (!$this->$attribute || !is_array($this->$attribute))
- throw new Exception(1003, "{$attribute} 必须是数组");
- }
-
- public function sendMessage()
- {
- try {
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $this->_handleMessage($this->message);
-
- $channel->queue_declare($this->queue,
- false, true, false, false);
- $channel->basic_publish($this->_message, '', $this->queue);
-
- list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
- false, true, false, false);
- $data = [
- 'message_total' => $message_count,
- 'queue_name' => $q_name,
- 'message_add' => [
- 'mid' => $this->_mid,
- 'body' => $this->_body
- ]
- ];
- if (($get = $channel->basic_get($q_name)) !== null) {
- $data['basic_get'] = [
- 'mid' => $get->get('message_id'),
- 'body' => $get->body
- ];
- }
- $channel->close();
- $connect->close();
- Redis::set($q_name, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
- return $data;
- } catch (\Exception $e) {
- throw new Exception(1001, $e->getMessage());
- }
- }
-
- private function _handleMessage($data)
- {
- $this->_mid = KeyHelper::getUniqueId('message_send');
- $this->_body = call_user_func_array([$this, '_messageBody'], [$data]);
- $properties = [
- 'message_id' => $this->_mid,
- 'correlation_id'=> $this->_mid,
- 'consumer_tag' => $this->_mid
- ];
- $this->_message = new AMQPMessage($this->_body, $properties);
- }
- private function _messageBody($data)
- {
- return json_encode($data);
- }
-
- public function batchSendMessage()
- {
- try {
- $connect = $this->getConnect();
- $channel = $connect->channel();
-
- $channel->queue_declare($this->queue,
- false, true, false, false);
- foreach ($this->message as $k => $v) {
- $this->_handleMessage($v);
- $this->_rows[] = [
- 'mid' => $this->_mid,
- 'body' => $this->_body
- ];
- $channel->batch_basic_publish($this->_message, '', $this->queue);
- Redis::set($this->queue, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
- }
- $channel->publish_batch();
-
- list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
- false, true, false, false);
- $data = [
- 'message_total' => $message_count,
- 'queue_name' => $q_name,
- 'add_count' => count($this->_rows),
- 'add_rows' => $this->_rows
- ];
- if (($get = $channel->basic_get($q_name)) !== null) {
- $data['basic_get'] = [
- 'mid' => $get->get('message_id'),
- 'body' => $get->body
- ];
- }
- $channel->close();
- $connect->close();
- return $data;
- } catch (\Exception $e) {
- throw new Exception(1001, $e->getMessage());
- }
- }
-
- public function getMessageList()
- {
- $authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
- $vhost = urlencode(Yii::$app->Amqp->vhost);
- $url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port .
- "/api/queues/{$vhost}/" . $this->name . "/get";
- $postParams = [
- 'name' => $this->name,
- 'count' => $this->count,
- 'encoding' => $this->encoding,
- 'requeue' => $this->requeue,
- 'truncate' => "50000",
- 'vhost' => '/',
- ];
- $curl = new Curl();
- $curl->setOption(CURLOPT_USERPWD, $authStr);
- $curl->setRawPostData(json_encode($postParams));
- $result = json_decode($curl->post($url), true);
- if ($curl->responseCode != 200)
- throw new Exception(1002);
- if ($curl->errorText)
- throw new Exception(1002, $curl->errorText);
- if (isset($result['error']) && is_string($result['error']))
- throw new Exception(1002, $result['error']);
- ArrayHelper::multisort($result,'message_count',SORT_ASC);
-
- $rows = [];
- foreach ($result as $k => $v) {
- $rows[$k]['mid'] = $v['properties']['message_id'];
- $rows[$k]['body'] = $v['payload'];
- $rows[$k]['before'] = $v['message_count'];
- }
- unset($result);
- return ['count' => count($rows), 'rows' => $rows];
- }
-
- public function consumeMessage()
- {
- $q_name = $this->name;
- if ($this->type == 'server') {
- $function = '_closure' . ucfirst($q_name);
- if ( !method_exists($this, $function) )
- throw new Exception(1303);
- } else {
- $function = 'closureConsume';
- }
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $channel->queue_declare($q_name,
- false, true, false, false);
- list(, $total, ) = $channel->queue_declare($q_name,
- false, true, false, false);
- if ($total == 0)
- throw new Exception(1300);
- try {
- $min = min($this->count, $total);
- $callback = function ($msg) use($q_name, $function) {call_user_func_array([$this, $function], [$msg, $q_name]);};
- $channel->basic_qos(0, $min, null);
- $channel->basic_consume($q_name,
- '', false, false, false, false, $callback);
- for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
- if ($i > $min)
- break;
- $channel->wait();
- }
- $channel->close();
- $connect->close();
- } catch (\Exception $e) {
- throw new Exception(1001, $e->getMessage());
- }
- return $this->_result;
- }
-
- protected function closureConsume($msg, $queue)
- {
- $data = ['mid' => '', 'body' => '', 'error' => ''];
- try {
- $data['mid'] = $msg->get('message_id');
- $data['body'] = $msg->body;
- Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND);
- } catch (\Exception $e) {
- $data['error'] = $e->getMessage();
- } finally {
- $this->_result[] = $data;
- }
- }
-
- public function ackMessage()
- {
-
- foreach ($this->mids as $mid)
- if ($mid && is_string($mid))
- $this->_rows[$mid] = $mid;
- if (!is_array($this->_rows) || count($this->_rows) == 0)
- throw new Exception(1301);
- $ack = $this->_rows;
- $q_name = $this->name;
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $channel->queue_declare($q_name,
- false, true, false, false);
- list(, $total, ) = $channel->queue_declare($q_name,
- false, true, false, false);
- try {
- $callback = function ($msg) use($q_name) {
- call_user_func_array([$this, 'closureAck'], [$msg, $q_name, AmqpConfig::STATUS_HAND_OK]);
- };
- $channel->basic_qos(0, $total, null);
- $channel->basic_consume($q_name,
- '', false, false, false, false, $callback);
- for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
- if ($i > $total)
- break;
- $channel->wait();
- }
- $channel->close();
- $connect->close();
- } catch (\Exception $e) {
- throw new Exception(1101, $e->getMessage());
- }
- $ackCount = count($ack) - count($this->_rows);
- $data = ['queue' => $q_name, 'ack_count' => $ackCount];
- count($this->_rows) ? $data['ack_fail'] = array_keys($this->_rows) : null;
- return $data;
- }
-
- public function deleteMessage()
- {
-
- foreach ($this->mids as $mid)
- if ($mid && is_string($mid))
- $this->_rows[$mid] = $mid;
- if (!is_array($this->_rows) || count($this->_rows) == 0)
- throw new Exception(1301);
- $delete = $this->_rows;
- Redis::batchDel($this->name, $delete, 'status');
- $q_name = $this->name;
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $channel->queue_declare($q_name,
- false, true, false, false);
- list(, $total, ) = $channel->queue_declare($q_name,
- false, true, false, false);
- try {
- $callback = function ($msg) use($q_name) {call_user_func_array([$this, 'closureAck'], [$msg]);};
- $channel->basic_qos(0, $total, null);
- $channel->basic_consume($q_name,
- '', false, false, false, false, $callback);
- for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
- if ($i > $total)
- break;
- $channel->wait();
- }
- $channel->close();
- $connect->close();
- } catch (\Exception $e) {
- throw new Exception(1101, $e->getMessage());
- }
- $deleteCount = count($delete) - count($this->_rows);
- $data = ['queue' => $q_name, 'delete_count' => $deleteCount];
- count($this->_rows) ? $data['delete_fail'] = array_keys($this->_rows) : null;
- return $data;
- }
-
- protected function closureAck($msg, $queue = '', $status = false)
- {
- try {
- $mid = $msg->get('message_id');
- if (in_array($mid, $this->_rows)) {
- unset($this->_rows[$mid]);
- $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
- if ($status != false && $queue != '')
- Redis::set($queue, $mid, 'status', $status);
- }
- } finally {
- if (count($this->_rows) == 0)
- $this->_stop = true;
- }
- }
-
- public function purge()
- {
- try {
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $delete = $channel->queue_purge($this->name);
- if ($delete > 0)
- Redis::purge($this->name);
- $channel->close();
- $connect->close();
- return [
- 'queue' => $this->name,
- 'count' => $delete
- ];
- } catch (\Exception $e) {
- throw new Exception(1001, $e->getMessage());
- }
- }
- private function _closureLogin($msg, $queue)
- {
- $data = ['mid' => '', 'body' => '', 'response' => '', 'error' => ''];
- try {
- $data['mid'] = $msg->get('message_id');
- $data['body'] = $msg->body;
- if ($data['mid']) {
- $handle = new LoginHandle();
- $data['response'] = $handle->login($msg->body, $queue, $data['mid']);
- $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
- Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND_OK);
- Redis::set($queue, $data['mid'], 'result', $data['response']);
- Redis::expire($queue, $data['mid'], 'result', 3600);
- }
- } catch (\Exception $e) {
- $data['error'] = $e->getMessage();
- } finally {
- $this->_result[] = $data;
- }
- }
- }
|