['send', 'batch_send']],
            [['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
            ['message', 'validateMessage', 'on' => ['send', 'batch_send']],

            /* Fetch message list */
            [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume']],
            [['name'], 'required', 'on' => ['message_list', 'purge']],
            ['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 2012, 'tooSmall' => 2013, 'tooBig' => 2014, 'on' => ['message_list']],
            ['count', 'default', 'value' => 20, 'on' => ['message_list']],
            ['requeue', 'boolean', 'message' => 2010, 'on' => ['message_list']],
            ['requeue', 'default', 'value' => true, 'on' => ['message_list']],
            ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 2011, 'on' => ['message_list']],
            ['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],

            /* Consume */
            [['type', 'name'], 'required', 'on' => ['consume']],
            ['type', 'in', 'range' => [static::TYPE_MID, static::TYPE_COUNT, static::TYPE_MC], 'on' => 'consume'],
            ['type', 'validateType', 'on' => 'consume'],
            ['mid', 'string', 'on' => 'consume'],
            ['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 2012, 'tooSmall' => 2013, 'on' => ['consume']],
            ['do_nothing', 'default', 'value' => false, 'on' => 'consume'],
            ['do_nothing', 'boolean', 'on' => 'consume'],
            ['ack', 'default', 'value' => false, 'on' => 'consume'],
            ['ack', 'boolean', 'on' => 'consume'],
        ];
    }

    /**
     * Inline validator for the message attribute.
     *
     * Throws instead of adding a model error: code 2001 when the value is
     * empty (falsy), code 2002 when it is not an array.
     *
     * @param string $attribute name of the attribute being validated
     * @throws Exception
     */
    public function validateMessage($attribute)
    {
        if (!$this->$attribute)
            throw new Exception(2001);

        if (!is_array($this->$attribute))
            throw new Exception(2002, "{$attribute} 必须是数组");
    }

    /**
     * Inline validator for the consume type.
     *
     * For TYPE_MC both `mid` (code 2016) and `count` (code 2017) must be set.
     * For any other type, the model attribute whose name equals the type value
     * must be set (code 2016).
     *
     * @param string $attribute name of the attribute being validated
     * @throws Exception
     */
    public function validateType($attribute)
    {
        if ($this->$attribute == static::TYPE_MC) {
            if ($this->mid == null)
                throw new Exception(2016);

            if ($this->count == null)
                throw new Exception(2017);
        } else {
            // The type value doubles as the name of the attribute that must be present.
            if ($this->{$this->$attribute} == null)
                throw new Exception(2016, $this->$attribute . " 不能为空");
        }
    }

    /**
     * [Connect to AMQP]
     *
     * Opens a new php-amqplib stream connection from the settings exposed by
     * the `Amqp` application component.
     *
     * @author: libingke
     * @return \PhpAmqpLib\Connection\AMQPStreamConnection
     */
    protected function getConnect()
    {
        return new \PhpAmqpLib\Connection\AMQPStreamConnection(
            Yii::$app->Amqp->host,
            Yii::$app->Amqp->port,
            Yii::$app->Amqp->user,
            Yii::$app->Amqp->pass,
            Yii::$app->Amqp->vhost
        );
    }

    /**
     * [Send a message]
     *
     * Publishes the JSON-encoded model attributes through the durable direct
     * exchange `message.default` (routing key `route.default`) after declaring
     * the queue named by `$this->queue` and binding it to that exchange.
     *
     * On success the result holds the value returned by declareQueue() as
     * `message_total`, the queue name, the new message and, when more than one
     * message is reported, the message currently returned by get() as
     * `message_first`. The send status is then stored via Cache::setData().
     *
     * NOTE(review): every queue declared here is bound to the same exchange
     * with the same routing key, so presumably a publish is delivered to all
     * queues bound this way - confirm that this is intended.
     *
     * @author: libingke
     * @return array
     * @throws Exception code 2103 when publish() does not report success
     * @version 1.0
     */
    public function sendMessage()
    {
        $body = $this->_messageBody($this->attributes);
        $mid = KeyHelper::getUniqueId('message_send');
        $properties = [
            'content_type' => 'text/plain',
            'message_id' => $mid,
            'correlation_id' => $mid,
            'consumer_tag' => $mid
        ];

        $e_name = 'message.default';
        $k_route = 'route.default';
        $q_name = $this->queue;

        $connect = Yii::$app->Amqp->AMQPConnection();
        $channel = new \AMQPChannel($connect);

        $exchange = new \AMQPExchange($channel);
        $exchange->setName($e_name);
        $exchange->setType(AMQP_EX_TYPE_DIRECT);
        $exchange->setFlags(AMQP_DURABLE); // durable exchange
        $exchange->declareExchange();

        $queue = new \AMQPQueue($channel);
        $queue->setName($q_name);
        $queue->declareQueue();
        $queue->bind($e_name, $k_route);

        if ($exchange->publish($body, $k_route, AMQP_NOPARAM, $properties) != true) {
            throw new Exception(2103);
        }

        $data = [
            'message_total' => $queue->declareQueue(),
            'queue_name' => $this->queue,
            'message_new' => [
                'mid' => $mid,
                'body' => $body
            ],
        ];

        if ($data['message_total'] > 1) {
            // Fetch once: each get() call pulls another message, so reading the id
            // and the body through two separate calls would mix two messages.
            $first = $queue->get();
            if ($first instanceof \AMQPEnvelope) {
                $data['message_first'] = [
                    'mid' => $first->getMessageId(),
                    'body' => $first->getBody()
                ];
            }
        }

        Cache::setData(
            'queue:mid:' . $mid,
            Cache::STATUS_SEND_OK
        );

        return $data;
    }

    /**
     * [Send a message]
     *
     * php-amqplib based variant: builds the message from `$this->message`,
     * declares the queue named by `$this->queue` and publishes to it.
     * Any \Exception raised inside is rethrown as Exception code 1000.
     *
     * @author: libingke
     * @return array
     * @throws Exception
     * @version 1.1
     */
    public function sendMessageV1_1()
    {
        try {
            // Connect
            $connect = $this->getConnect();
            $channel = $connect->channel();

            $this->_handleMessage($this->message);

            // Pre-declare the queue before publishing
            $channel->queue_declare($this->queue, false, true, false, false);
$channel->basic_publish($this->_message, '', $this->queue);

            // Declare again to read back the queue name and its message count
            list($q_name, $message_count, ) = $channel->queue_declare($this->queue, false, true, false, false);
            $data = [
                'message_total' => $message_count,
                'queue_name' => $q_name,
                'message_add' => [
                    'mid' => $this->_mid,
                    'body' => $this->_body
                ]
            ];

            $get = $channel->basic_get($q_name);
            if ($get !== null) {
                $data['basic_get'] = [
                    // The fetched message may come from another producer and carry no
                    // message_id; get() would throw for a missing property.
                    'mid' => $get->has('message_id') ? $get->get('message_id') : null,
                    'body' => $get->body
                ];
            }
            $channel->close();
            $connect->close();

            $statusKey = KeyHelper::getMessageStatusKey($this->_mid, $q_name);
            Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);

            return $data;
        } catch (\Exception $e) {
            throw new Exception(1000, $e->getMessage());
        }
    }

    /**
     * [Build the message]
     *
     * Generates a new message id, JSON-encodes the payload and stores the id,
     * the encoded body and the resulting AMQPMessage on the instance
     * (`_mid`, `_body`, `_message`).
     *
     * @author: libingke
     * @param mixed $data payload to encode
     */
    private function _handleMessage($data)
    {
        $this->_mid = KeyHelper::getUniqueId('message_send');
        $this->_body = $this->_messageBody($data);

        $properties = [
            'message_id' => $this->_mid,
            'correlation_id'=> $this->_mid,
            'consumer_tag' => $this->_mid
        ];
        $this->_message = new AMQPMessage($this->_body, $properties);
    }

    /**
     * Encodes a payload as JSON.
     *
     * @param mixed $data
     * @return string|false result of json_encode()
     */
    private function _messageBody($data)
    {
        return json_encode($data);
    }

    /**
     * [Send messages in a batch]
     *
     * Builds one message per element of `$this->message`, queues them with
     * batch_basic_publish() and flushes them with publish_batch(). The status
     * key of each message is written only after the batch has been flushed.
     * Any \Exception raised inside is rethrown as Exception code 1000.
     *
     * @author: libingke
     * @return array
     * @throws Exception
     */
    public function batchSendMessage()
    {
        try {
            // Connect
            $connect = $this->getConnect();
            $channel = $connect->channel();

            // Pre-declare the queue before publishing
            $channel->queue_declare($this->queue, false, true, false, false);

            // Start from a clean list so rows of an earlier call on this instance
            // are not reported again.
            $this->_rows = [];

            foreach ($this->message as $k => $v) {
                $this->_handleMessage($v);
                $this->_rows[] = [
                    'mid' => $this->_mid,
                    'body' => $this->_body
                ];
                $channel->batch_basic_publish($this->_message, '', $this->queue);
            }
            $channel->publish_batch();

            // Record the status only once the batch was actually handed to the broker.
            foreach ($this->_rows as $row) {
                $statusKey = KeyHelper::getMessageStatusKey($row['mid'], $this->queue);
                Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
            }

            // Declare again to read back the queue name and its message count
            list($q_name, $message_count, ) = $channel->queue_declare($this->queue, false, true, false, false);
            $data = [
                'message_total' => $message_count,
                'queue_name' => $q_name,
'add_count' => count($this->_rows),
                'add_rows' => $this->_rows
            ];

            $get = $channel->basic_get($q_name);
            if ($get !== null) {
                $data['basic_get'] = [
                    // The fetched message may carry no message_id; get() would throw
                    // for a missing property.
                    'mid' => $get->has('message_id') ? $get->get('message_id') : null,
                    'body' => $get->body
                ];
            }
            $channel->close();
            $connect->close();

            return $data;
        } catch (\Exception $e) {
            throw new Exception(1000, $e->getMessage());
        }
    }

    /**
     * [Fetch the message list]
     *
     * Posts to `/api/queues/{vhost}/{queue}/get` on the host and `api_port`
     * taken from the `Amqp` component configuration, using HTTP basic auth, and
     * returns the fetched messages sorted by `message_count` ascending.
     *
     * @author: libingke
     * @return array ['count' => int, 'rows' => array] where each row has
     *               before_count, payload, payload_encoding and message_id
     *               (null when the message has no message_id property)
     * @throws Exception code 2000 on transport errors, API errors, non-200
     *                   responses or an undecodable response body
     */
    public function getMessageList()
    {
        /* API error code */
        $badCode = 2000;

        $amqp = Yii::$app->Amqp;

        /* Basic-auth credentials */
        $authStr = $amqp->getConfig('user') . ':' . $amqp->getConfig('pass');

        /* URL - both path segments are encoded so names cannot alter the path */
        $vhost = $amqp->getConfig('vhost');
        $url = $amqp->getConfig('host') . ':' . $amqp->getConfig('api_port')
            . '/api/queues/' . rawurlencode($vhost) . '/' . rawurlencode($this->name) . '/get';

        $postParams = [
            'name' => $this->name,
            'count' => $this->count,
            'encoding' => $this->encoding,
            'requeue' => $this->requeue,
            'truncate' => "50000",
            // Same vhost as the one addressed in the URL
            'vhost' => $vhost,
        ];

        $curl = new Curl();
        $curl->setOption(CURLOPT_USERPWD, $authStr);
        $curl->setRawPostData(json_encode($postParams));
        $result = json_decode($curl->post($url), true);

        // Most specific failure first, so the caller gets the available detail
        // instead of a bare error code.
        if ($curl->errorText)
            throw new Exception($badCode, $curl->errorText);

        if (isset($result['error']) && is_string($result['error']))
            throw new Exception($badCode, $result['error']);

        if ($curl->responseCode != 200)
            throw new Exception($badCode);

        // A 200 response whose body is not a JSON array cannot be listed.
        if (!is_array($result))
            throw new Exception($badCode);

        ArrayHelper::multisort($result, 'message_count', SORT_ASC);

        $rows = [];
        foreach ($result as $k => $v) {
            $rows[$k] = [
                'before_count' => $v['message_count'],
                'payload' => $v['payload'],
                'payload_encoding' => $v['payload_encoding'],
                'message_id' => isset($v['properties']['message_id']) ? $v['properties']['message_id'] : null,
            ];
        }
        unset($result);

        return ['count' => count($rows), 'rows' => $rows];
    }

    /**
     * [No-op handler]
     *
     * Used when `do_nothing` is set: stores the stop flag and appends a
     * "do nothing" entry to the result list. `$body` is not used.
     *
     * @param string $mid   message id
     * @param string $body  message body
     * @param bool   $ack   whether the message is acknowledged
     * @param string $error error text, empty when there is none
     * @param bool   $stop  value stored in `_stop`
     */
    private function _consumeEmpty($mid, $body, $ack, $error, $stop = false)
    {
        $this->_stop = $stop;
        $this->_result[] = [
            'mid' => $mid,
            'result' => 'success: do nothing!',
            'ack' => $ack,
            'error' => $error
        ];
    }

    /**
     * [Handler 1]
     *
     * Stores the stop flag and appends a "handled" entry to the result list.
     * Has the name the consume methods build for a queue called "login"
     * ('_consume' . ucfirst(queue name)).
     *
     * @author: libingke
     */
    private function
_consumeLogin($mid, $body, $ack, $error, $stop = false)
    {
        $this->_stop = $stop;
        $this->_result[] = [
            'mid' => $mid,
            'result' => '已处理',
            'ack' => $ack,
            'error' => $error
        ];
    }

    /**
     * [Consume one specific message]
     *
     * Consumes from the queue named by `$this->name` until the message whose
     * message_id equals `$mid` has been handled or the number of messages
     * reported by queue_declare() (capped at 65535) has been received.
     * The matching message is acknowledged only when `$this->ack` is set; its
     * status key in redis is updated in either case.
     *
     * @author: libingke
     * @param string $mid id of the message to consume
     * @return array the collected handler results
     * @throws Exception code 2015 when no handler exists for the queue,
     *                   code 2104 when consuming fails,
     *                   code 2102 when no result was produced
     */
    protected function consumeByMid($mid)
    {
        $q_name = $this->name;

        // Pick the handler to run
        if ($this->do_nothing != true) {
            $function = '_consume' . ucfirst($q_name);
            if (!method_exists($this, $function) || !is_callable(array($this, $function)))
                throw new Exception(2015);
        } else {
            // do_nothing: use the no-op handler
            $function = '_consumeEmpty';
        }

        $connect = $this->getConnect();
        $channel = $connect->channel();

        // One declaration both ensures the queue exists and reports its size.
        list(, $count, ) = $channel->queue_declare($q_name, false, true, false, false);

        // basic.qos carries prefetch-count as a 16-bit value; larger values are
        // rejected by the client, and 0 would mean "no limit".
        $limit = min($count, 65535);

        $callback = function ($msg) use ($function, $mid, $q_name) {
            try {
                $message_id = $msg->get('message_id');
                $ack = ($this->ack == true); // whether to acknowledge

                // Compare as strings: a loose comparison treats numeric-looking ids
                // such as "1e3" and "1000" as equal.
                if ((string) $mid === (string) $message_id) {
                    $this->$function($message_id, $msg->body, $ack, '', true);

                    $statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
                    if ($ack === true) {
                        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                        Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
                    } else {
                        Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
                    }
                }
            } catch (\Exception $e) {
                // Still best-effort (the message is skipped), but leave a trace.
                Yii::warning($e->getMessage(), __CLASS__ . '::consumeByMid');
            }
        };

        if ($limit > 0) {
            try {
                $channel->basic_qos(0, $limit, null);
                $channel->basic_consume($q_name, '', false, false, false, false, $callback);

                $i = 0;
                while (count($channel->callbacks)) {
                    $i++;
                    if ($i > $limit || $this->_stop == true)
                        break;
                    $channel->wait();
                }
            } catch (\Exception $e) {
                throw new Exception(2104, $e->getMessage());
            }
        }

        $channel->close();
        $connect->close();

        if ($this->_result == null)
            throw new Exception(2102);

        return $this->_result;
    }

    /**
     * [Consume by count]
     *
     * Consumes at most `$count` messages from the queue named by
     * `$this->name`, fewer when the queue reports fewer messages.
     *
     * @author: libingke
     * @param int $count number of messages to consume
     * @return array|null the collected handler results
     * @throws Exception code 2015 when no handler exists for the queue,
     *                   code 2104 when consuming fails
     */
    protected function consumeByCount($count)
    {
        $q_name =
$this->name;

        // Pick the handler to run
        if ($this->do_nothing != true) {
            $function = '_consume' . ucfirst($q_name);
            if (!method_exists($this, $function) || !is_callable(array($this, $function)))
                throw new Exception(2015);
        } else {
            // do_nothing: use the no-op handler
            $function = '_consumeEmpty';
        }

        $connect = $this->getConnect();
        $channel = $connect->channel();

        // One declaration both ensures the queue exists and reports its size.
        list(, $total, ) = $channel->queue_declare($q_name, false, true, false, false);

        $callback = function ($msg) use ($function, $q_name) {
            $ack = ($this->ack == true); // whether to acknowledge
            try {
                $message_id = $msg->get('message_id');
                $this->$function($message_id, $msg->body, $ack, '', false);

                // Update the message status
                $statusKey = KeyHelper::getMessageStatusKey($message_id, $q_name);
                if ($ack === true) {
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                    Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_HAND_OK);
                } else {
                    Yii::$app->redis->set($statusKey, AmqpConfig::STATUS_NO_ACK);
                }
            } catch (\Exception $e) {
                // Report the failure through the handler, without a message id
                $this->$function('', $msg->body, $ack, $e->getMessage(), false);
            }
        };

        $min = min($count, $total);

        // Nothing to consume: do not register a consumer at all. A prefetch
        // count of 0 would mean "no limit" rather than "no messages".
        if ($min > 0) {
            try {
                $channel->basic_qos(0, $min, null);
                $channel->basic_consume($q_name, '', false, false, false, false, $callback);

                $i = 0;
                while (count($channel->callbacks)) {
                    $i++;
                    if ($i > $min || $this->_stop == true)
                        break;
                    $channel->wait();
                }
            } catch (\Exception $e) {
                throw new Exception(2104, $e->getMessage());
            }
        }

        $channel->close();
        $connect->close();

        return $this->_result;
    }

    /**
     * [Consume messages]
     *
     * Dispatches on `$this->type`: TYPE_MID consumes the message identified by
     * `$this->mid`, TYPE_COUNT consumes `$this->count` messages, TYPE_MC is not
     * implemented and throws Exception code 1000.
     *
     * @author: libingke
     * @return array|string handler results; a fixed string for an unknown type
     * @throws Exception
     */
    public function consumeMessage()
    {
        switch ($this->type) {
            case static::TYPE_MID:
                return $this->consumeByMid($this->mid);

            case static::TYPE_COUNT:
                return $this->consumeByCount($this->count);

            case static::TYPE_MC:
                throw new Exception(1000, '未开发');

            default:
                return "It's not possible to get there.";
        }
    }

    /**
     * [Purge messages]
     *
     * Purges the queue named by `$this->name`.
     *
     * @author: libingke
     * @return array always an empty array
     */
    public
function purge()
    {
        // Open a channel on the connection provided by the Amqp component
        $amqpConnection = Yii::$app->Amqp->AMQPConnection();
        $amqpChannel = new \AMQPChannel($amqpConnection);

        // Address the queue by name and drop its messages
        $target = new \AMQPQueue($amqpChannel);
        $target->setName($this->name);
        $target->purge();

        // The result of purge() is not reported; callers get an empty array
        return [];
    }
}