123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632 |
- <?php
- namespace backend\forms;
- use components\service\AmqpConfig;
- use PhpAmqpLib\Exception\AMQPProtocolChannelException;
- use common\helpers\KeyHelper;
- use common\logic\Amqp\Cache;
- use components\Curl;
- use components\Exception;
- use PhpAmqpLib\Message\AMQPMessage;
- use yii\helpers\ArrayHelper;
- use Yii;
- class MessageForm extends BaseForm
- {
- /* send */
- public $queue;
- public $message;
- /* message_list */
- public $name;
- /**
- * @var integer 数量
- */
- public $count;
- /**
- * @var bool 是否重新排序
- */
- public $requeue;
- /**
- * @var string 编码
- */
- public $encoding;
- /**
- * @var string 消费id
- */
- public $mid;
- /**
- * @var string 消费类型
- */
- public $type;
- /**
- * @var bool 是否空消费
- */
- public $do_nothing = false;
- /**
- * @var bool 自动应答
- */
- public $ack = false;
- /**
- * @var bool
- */
- private $_stop = false;
- /**
- * @var string
- */
- private $_mid;
- /**
- * @var string
- */
- private $_body;
- /**
- * @var AMQPMessage
- */
- private $_message;
- /**
- * @var array
- */
- private $_rows;
- /**
- * @var array
- */
- private $_result;
- const TYPE_MID = 'mid';//消费某条
- const TYPE_COUNT = 'count';//(从第一条开始) 消费条数
- const TYPE_MC = 'mid_count';//(从某条开始) 消费条数
- public function rules()
- {
- return [
- /* 发送消息 */
- [['queue'], 'trim', 'on' => ['send', 'batch_send']],
- [['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
- ['message', 'validateMessage', 'on' => ['send', 'batch_send']],
- /* 获取消息列表 */
- [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume']],
- [['name'], 'required', 'on' => ['message_list', 'purge']],
- ['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 2012,
- 'tooSmall' => 2013, 'tooBig' => 2014, 'on' => ['message_list']],
- ['count', 'default', 'value' => 20, 'on' => ['message_list']],
- ['requeue', 'boolean', 'message' => 2010, 'on' => ['message_list']],
- ['requeue', 'default', 'value' => true, 'on' => ['message_list']],
- ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 2011, 'on' => ['message_list']],
- ['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],
- /* 消费 */
- [['type', 'name'], 'required', 'on' => ['consume']],
- ['type', 'in', 'range' => [static::TYPE_MID, static::TYPE_COUNT, static::TYPE_MC], 'on' => 'consume'],
- ['type', 'validateType', 'on' => 'consume'],
- ['mid', 'string', 'on' => 'consume'],
- ['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 2012,
- 'tooSmall' => 2013, 'on' => ['consume']],
- ['do_nothing', 'default', 'value' => false, 'on' => 'consume'],
- ['do_nothing', 'boolean', 'on' => 'consume'],
- ['ack', 'default', 'value' => false, 'on' => 'consume'],
- ['ack', 'boolean', 'on' => 'consume'],
- ];
- }
- public function validateMessage($attribute)
- {
- if (!$this->$attribute)
- throw new Exception(2001);
- if (!is_array($this->$attribute))
- throw new Exception(2002, "{$attribute} 必须是数组");
- }
- public function validateType($attribute)
- {
- if ($this->$attribute == static::TYPE_MC) {
- if ($this->mid == null)
- throw new Exception(2016);
- if ($this->count == null)
- throw new Exception(2017);
- } else {
- if ($this->{$this->$attribute} == null)
- throw new Exception(2016, $this->$attribute . " 不能为空");
- }
- }
- /**
- * [连接AMQP]
- * @author: libingke
- * @return \PhpAmqpLib\Connection\AMQPStreamConnection
- */
- protected function getConnect()
- {
- return new \PhpAmqpLib\Connection\AMQPStreamConnection(
- Yii::$app->Amqp->host,
- Yii::$app->Amqp->port,
- Yii::$app->Amqp->user,
- Yii::$app->Amqp->pass,
- Yii::$app->Amqp->vhost
- );
- }
- /**
- * [发送消息]
- * @author: libingke
- * @return array
- * @throws Exception
- * @version 1.0
- */
- public function sendMessage()
- {
- $body = call_user_func_array([$this, '_messageBody'], [$this->attributes]);
- $mid = KeyHelper::getUniqueId('message_send');
- $properties = [
- 'content_type' => 'text/plain',
- 'message_id' => $mid,
- 'correlation_id' => $mid,
- 'consumer_tag' => $mid
- ];
- $data = [];
- $e_name = 'message.default';
- $k_route = 'route.default';
- $q_name = $this->queue;
- $connect = Yii::$app->Amqp->AMQPConnection();
- $channel = new \AMQPChannel($connect);
- $exchange = new \AMQPExchange($channel);
- $exchange->setName($e_name);
- $exchange->setType(AMQP_EX_TYPE_DIRECT);
- $exchange->setFlags(AMQP_DURABLE);//持久化
- $exchange->declareExchange();//声明交换机
- $queue = new \AMQPQueue($channel);
- $queue->setName($q_name);
- $queue->declareQueue(); //声明队列
- $queue->bind($e_name, $k_route);
- $r = $exchange->publish($body, $k_route, AMQP_NOPARAM, $properties);
- if ($r == true) {
- $data['message_total'] = $queue->declareQueue();
- $data['queue_name'] = $this->queue;
- $data['message_new'] = [
- 'mid' => $mid,
- 'body' => $body
- ];
- if ($data['message_total'] > 1) {
- $data['message_first'] = [
- 'mid' => $queue->get()->getMessageId(),
- 'body' => $queue->get()->getBody()
- ];
- }
- Cache::setData(
- 'queue:mid:'.$mid,
- Cache::STATUS_SEND_OK
- );
- } else {
- throw new Exception(2103);
- }
- return $data;
- }
- /**
- * [发送消息]
- * @author: libingke
- * @return array
- * @throws Exception
- * @version 1.1
- */
- public function sendMessageV1_1()
- {
- try {
- //connect
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $this->_handleMessage($this->message);
- //预声明
- $channel->queue_declare($this->queue,
- false, true, false, false);
- $channel->basic_publish($this->_message, '', $this->queue);
- //获取返回结果
- list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
- false, true, false, false);
- $data = [
- 'message_total' => $message_count,
- 'queue_name' => $q_name,
- 'message_add' => [
- 'mid' => $this->_mid,
- 'body' => $this->_body
- ]
- ];
- if (($get = $channel->basic_get($q_name)) !== null) {
- $data['basic_get'] = [
- 'mid' => $get->get('message_id'),
- 'body' => $get->body
- ];
- }
- $channel->close();
- $connect->close();
- $statusKey = KeyHelper::getMessageStatusKey($this->_mid, $q_name);
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
- return $data;
- } catch (\Exception $e) {
- throw new Exception(1000, $e->getMessage());
- }
- }
- /**
- * [构造消息体]
- * @author: libingke
- * @param $data
- */
- private function _handleMessage($data)
- {
- $this->_mid = KeyHelper::getUniqueId('message_send');
- $this->_body = call_user_func_array([$this, '_messageBody'], [$data]);
- $properties = [
- 'message_id' => $this->_mid,
- 'correlation_id'=> $this->_mid,
- 'consumer_tag' => $this->_mid
- ];
- $this->_message = new AMQPMessage($this->_body, $properties);
- }
- private function _messageBody($data)
- {
- return json_encode($data);
- }
- /**
- * [批量发送消息]
- * @author: libingke
- * @return array
- * @throws Exception
- */
- public function batchSendMessage()
- {
- try {
- //connect
- $connect = $this->getConnect();
- $channel = $connect->channel();
- //预声明
- $channel->queue_declare($this->queue,
- false, true, false, false);
- //batch_basic_publish todo
- foreach ($this->message as $k => $v) {
- $this->_handleMessage($v);
- $this->_rows[] = [
- 'mid' => $this->_mid,
- 'body' => $this->_body
- ];
- $channel->batch_basic_publish($this->_message, '', $this->queue);
- $statusKey = KeyHelper::getMessageStatusKey($this->_mid, $this->queue);
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
- }
- $channel->publish_batch();
- //获取返回结果
- list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
- false, true, false, false);
- $data = [
- 'message_total' => $message_count,
- 'queue_name' => $q_name,
- 'add_count' => count($this->_rows),
- 'add_rows' => $this->_rows
- ];
- if (($get = $channel->basic_get($q_name)) !== null) {
- $data['basic_get'] = [
- 'mid' => $get->get('message_id'),
- 'body' => $get->body
- ];
- }
- $channel->close();
- $connect->close();
- return $data;
- } catch (\Exception $e) {
- throw new Exception(1000, $e->getMessage());
- }
- }
- /**
- * [获取消息列表]
- * @author: libingke
- */
- public function getMessageList()
- {
- /* api错误码 */
- $badCode = 2000;
- /* 登录验证 */
- $authStr = Yii::$app->Amqp->getConfig('user') . ':' . Yii::$app->Amqp->getConfig('pass');
- /* URL */
- $vhost = urlencode(Yii::$app->Amqp->getConfig('vhost'));
- $url = Yii::$app->Amqp->getConfig('host') . ':' . Yii::$app->Amqp->getConfig('api_port') .
- "/api/queues/{$vhost}/" . $this->name . "/get";
- $postParams = [
- 'name' => $this->name,
- 'count' => $this->count,
- 'encoding' => $this->encoding,
- 'requeue' => $this->requeue,
- 'truncate' => "50000",
- 'vhost' => '/',
- ];
- $curl = new Curl();
- $curl->setOption(CURLOPT_USERPWD, $authStr);
- $curl->setRawPostData(json_encode($postParams));
- $result = json_decode($curl->post($url), true);
- if ($curl->responseCode != 200)
- throw new Exception($badCode);
- if ($curl->errorText)
- throw new Exception($badCode, $curl->errorText);
- if (isset($result['error']) && is_string($result['error']))
- throw new Exception($badCode, $result['error']);
- ArrayHelper::multisort($result,'message_count',SORT_ASC);
- //print_r($result);exit();
- $rows = [];
- foreach ($result as $k => $v) {
- $rows[$k]['before_count'] = $v['message_count'];
- $rows[$k]['payload'] = $v['payload'];
- $rows[$k]['payload_encoding']= $v['payload_encoding'];
- $rows[$k]['message_id'] = $v['properties']['message_id'];
- }
- unset($result);
- return ['count' => count($rows), 'rows' => $rows];
- }
- /**
- * [空处理]
- */
- private function _consumeEmpty($mid, $body, $ack, $error, $stop = false)
- {
- $this->_stop = $stop;
- $this->_result[] = [
- 'mid' => $mid,
- 'result' => 'success: do nothing!',
- 'ack' => $ack,
- 'error' => $error
- ];
- }
- /**
- * [处理逻辑1]
- * @author: libingke
- */
- private function _consumeLogin($mid, $body, $ack, $error, $stop = false)
- {
- $this->_stop = $stop;
- $this->_result[] = [
- 'mid' => $mid,
- 'result' => '已处理',
- 'ack' => $ack,
- 'error' => $error
- ];
- }
- /**
- * [消费某条消息]
- * @author: libingke
- * @param $mid
- * @return array
- * @throws Exception
- */
- protected function consumeByMid($mid)
- {
- $q_name = $this->name;
- //选择执行回调
- if ($this->do_nothing != true) {
- $function = '_consume' . ucfirst($q_name);
- if (!method_exists($this, $function) || !is_callable(array($this, $function)))
- throw new Exception(2015);
- } else {
- //if do nothing handle empty
- $function = '_consumeEmpty';
- }
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $channel->queue_declare($q_name,
- false, true, false, false);
- list(, $count, ) = $channel->queue_declare($q_name,
- false, true, false, false);
- $callback = function ($msg) use($function, $mid, $q_name) {
- try {
- $message_id = $msg->get('message_id');
- $ack = $this->ack == true ? true : false;//是否应答
- if ($mid == $message_id) {
- call_user_func_array(
- [$this, $function],
- [$message_id, $msg->body, $ack, '', true]
- );
- $statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
- if ($ack === true) {
- $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
- } else {
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
- }
- }
- } catch (\Exception $e) {
- //$e->getMessage();
- }
- };
- try {
- $channel->basic_qos(0, $count, null);
- $channel->basic_consume($q_name,
- '', false, false, false, false, $callback);
- $i = 0;
- while (count($channel->callbacks)) {
- $i ++;
- if ($i > $count || $this->_stop == true)
- break;
- $channel->wait();
- }
- } catch (\Exception $e) {
- throw new Exception(2104, $e->getMessage());
- }
- $channel->close();
- $connect->close();
- if ($this->_result == null)
- throw new Exception(2102);
- return $this->_result;
- }
- /**
- * [根据数量消费]
- * @author: libingke
- * @param string | int $startPos 开始位置
- * @param int $count 数量
- */
- protected function consumeByCount($count)
- {
- $q_name = $this->name;
- //选择执行回调
- if ($this->do_nothing != true) {
- $function = '_consume' . ucfirst($q_name);
- if (!method_exists($this, $function) || !is_callable(array($this, $function)))
- throw new Exception(2015);
- } else {
- //if do nothing handle empty
- $function = '_consumeEmpty';
- }
- $connect = $this->getConnect();
- $channel = $connect->channel();
- $channel->queue_declare($q_name,
- false, true, false, false);
- list(, $total, ) = $channel->queue_declare($q_name,
- false, true, false, false);
- $callback = function ($msg) use($function, $q_name) {
- $ack = $this->ack == true ? true : false;//是否应答
- try {
- $message_id = $msg->get('message_id');
- call_user_func_array(
- [$this, $function],
- [$message_id, $msg->body, $ack, '', false]
- );
- //更新状态
- $statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
- if ($ack === true) {
- $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
- } else {
- Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
- }
- } catch (\Exception $e) {
- //消息体出错机制
- call_user_func_array(
- [$this, $function],
- ['', $msg->body, $ack, $e->getMessage(), false]
- );
- }
- };
- try {
- $min = min($count, $total);
- $channel->basic_qos(0, $min, null);
- $channel->basic_consume($q_name,
- '', false, false, false, false, $callback);
- $i = 0;
- while (count($channel->callbacks)) {
- $i ++;
- if ($i > $min || $this->_stop == true)
- break;
- $channel->wait();
- }
- } catch (\Exception $e) {
- throw new Exception(2104, $e->getMessage());
- }
- $channel->close();
- $connect->close();
- return $this->_result;
- }
- /**
- * [消费消息]
- * @author: libingke
- */
- public function consumeMessage()
- {
- switch ($this->type)
- {
- case static::TYPE_MID:
- $data = $this->consumeByMid($this->mid);
- break;
- case static::TYPE_COUNT:
- $data = $this->consumeByCount($this->count);
- break;
- case static::TYPE_MC:
- throw new Exception(1000, '未开发');
- break;
- default:
- return "It's not possible to get there.";
- }
- return $data;
- }
- /**
- * [清空消息]
- * @author: libingke
- * @return array
- */
- public function purge()
- {
- $data = [];
- $connect = Yii::$app->Amqp->AMQPConnection();
- $channel = new \AMQPChannel($connect);
- $queue = new \AMQPQueue($channel);
- $queue->setName($this->name);
- $queue->purge();
- return $data;
- }
- }
|