['message_list', 'purge']], [['name'], 'trim', 'on' => ['message_list', 'purge', 'consume', 'delete', 'ack']], /* 发送消息 */ [['queue'], 'trim', 'on' => ['send', 'batch_send']], [['queue', 'message'], 'required', 'on' => ['send', 'batch_send']], ['message', 'validateArray', 'on' => ['send', 'batch_send']], /* 获取消息列表 */ ['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 1205, 'tooSmall' => 1206, 'tooBig' => 1207, 'on' => ['message_list']], ['count', 'default', 'value' => 20, 'on' => ['message_list']], ['requeue', 'boolean', 'message' => 1209, 'on' => ['message_list']], ['requeue', 'default', 'value' => true, 'on' => ['message_list']], ['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 1210, 'on' => ['message_list']], ['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']], /* 消费 */ [['name', 'count'], 'required', 'on' => ['consume']], ['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 1205, 'tooSmall' => 1206, 'on' => ['consume']], ['type', 'default', 'value' => 'client', 'on' => 'consume'], ['type', 'in', 'range' => ['server', 'client'], 'message' => 1302, 'on' => 'consume'], /* delete & ack */ [['mids', 'name'], 'required', 'on' => ['delete', 'ack']], ['mids', 'validateArray', 'on' => ['delete', 'ack']], ['name', 'string', 'on' => ['delete', 'ack']], ['forced', 'default', 'value' => false, 'on' => 'delete'], ['forced', 'boolean', 'on' => 'delete'], ]; } public function validateArray($attribute) { if (!$this->$attribute || !is_array($this->$attribute)) throw new Exception(1003, "{$attribute} 必须是数组"); } /** * [发送消息] * @author: libingke * @return array * @throws Exception * @version 1.1 */ public function sendMessage() { try { $connect = $this->getConnect(); $channel = $connect->channel(); $this->_handleMessage($this->message); //预声明 $channel->queue_declare($this->queue, false, true, false, false); $channel->basic_publish($this->_message, '', $this->queue); //获取返回结果 list($q_name, $message_count, ) = $channel->queue_declare($this->queue, false, true, false, false); $data = [ 'message_total' => $message_count, 'queue_name' => $q_name, 'message_add' => [ 'mid' => $this->_mid, 'body' => $this->_body ] ]; if (($get = $channel->basic_get($q_name)) !== null) { $data['basic_get'] = [ 'mid' => $get->get('message_id'), 'body' => $get->body ]; } $channel->close(); $connect->close(); Redis::set($q_name, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK); return $data; } catch (\Exception $e) { throw new Exception(1001, $e->getMessage()); } } /** * [构造消息体] * @param $data */ private function _handleMessage($data) { $this->_mid = KeyHelper::getUniqueId('message_send'); $this->_body = call_user_func_array([$this, '_messageBody'], [$data]); $properties = [ 'message_id' => $this->_mid, 'correlation_id'=> $this->_mid, 'consumer_tag' => $this->_mid ]; $this->_message = new AMQPMessage($this->_body, $properties); } private function _messageBody($data) { return json_encode($data); } /** * [批量发送消息] * @author: libingke * @return array * @throws Exception */ public function batchSendMessage() { try { $connect = $this->getConnect(); $channel = $connect->channel(); //预声明 $channel->queue_declare($this->queue, false, true, false, false); foreach ($this->message as $k => $v) { $this->_handleMessage($v); $this->_rows[] = [ 'mid' => $this->_mid, 'body' => $this->_body ]; $channel->batch_basic_publish($this->_message, '', $this->queue); Redis::set($this->queue, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK); } $channel->publish_batch(); //获取返回结果 list($q_name, $message_count, ) = $channel->queue_declare($this->queue, false, true, false, false); $data = [ 'message_total' => $message_count, 'queue_name' => $q_name, 'add_count' => count($this->_rows), 'add_rows' => $this->_rows ]; if (($get = $channel->basic_get($q_name)) !== null) { $data['basic_get'] = [ 'mid' => $get->get('message_id'), 'body' => $get->body ]; } $channel->close(); $connect->close(); return $data; } catch (\Exception $e) { throw new Exception(1001, $e->getMessage()); } } /** * [获取消息列表] * @author: libingke */ public function getMessageList() { $authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass; $vhost = urlencode(Yii::$app->Amqp->vhost); $url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port . "/api/queues/{$vhost}/" . $this->name . "/get"; $postParams = [ 'name' => $this->name, 'count' => $this->count, 'encoding' => $this->encoding, 'requeue' => $this->requeue, 'truncate' => "50000", 'vhost' => '/', ]; $curl = new Curl(); $curl->setOption(CURLOPT_USERPWD, $authStr); $curl->setRawPostData(json_encode($postParams)); $result = json_decode($curl->post($url), true); if ($curl->responseCode != 200) throw new Exception(1002); if ($curl->errorText) throw new Exception(1002, $curl->errorText); if (isset($result['error']) && is_string($result['error'])) throw new Exception(1002, $result['error']); ArrayHelper::multisort($result,'message_count',SORT_ASC); //print_r($result);exit(); $rows = []; foreach ($result as $k => $v) { $rows[$k]['mid'] = $v['properties']['message_id']; $rows[$k]['body'] = $v['payload']; $rows[$k]['before'] = $v['message_count']; } unset($result); return ['count' => count($rows), 'rows' => $rows]; } /** * consumeMessage * @author: libingke * @return array * @throws Exception */ public function consumeMessage() { $q_name = $this->name; if ($this->type == 'server') { $function = '_closure' . ucfirst($q_name); if ( !method_exists($this, $function) ) throw new Exception(1303); } else { $function = 'closureConsume'; } $connect = $this->getConnect(); $channel = $connect->channel(); $channel->queue_declare($q_name, false, true, false, false); list(, $total, ) = $channel->queue_declare($q_name, false, true, false, false); if ($total == 0) throw new Exception(1300); try { $min = min($this->count, $total); $callback = function ($msg) use($q_name, $function) {call_user_func_array([$this, $function], [$msg, $q_name]);}; $channel->basic_qos(0, $min, null); $channel->basic_consume($q_name, '', false, false, false, false, $callback); for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) { if ($i > $min) break; $channel->wait(); } $channel->close(); $connect->close(); } catch (\Exception $e) { throw new Exception(1001, $e->getMessage()); } return $this->_result; } /** * closureConsume for consumeMessage * @param $msg * @param $queue */ protected function closureConsume($msg, $queue) { $data = ['mid' => '', 'body' => '', 'error' => '']; try { $data['mid'] = $msg->get('message_id'); $data['body'] = $msg->body; Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND); } catch (\Exception $e) { $data['error'] = $e->getMessage(); } finally { $this->_result[] = $data; } } /** * ackMessage * @author: libingke * @return array * @throws Exception */ public function ackMessage() { //帅选合法待应答消息id foreach ($this->mids as $mid) if ($mid && is_string($mid)) $this->_rows[$mid] = $mid; if (!is_array($this->_rows) || count($this->_rows) == 0) throw new Exception(1301); $ack = $this->_rows; $q_name = $this->name; $connect = $this->getConnect(); $channel = $connect->channel(); $channel->queue_declare($q_name, false, true, false, false); list(, $total, ) = $channel->queue_declare($q_name, false, true, false, false); try { $callback = function ($msg) use($q_name) { call_user_func_array([$this, 'closureAck'], [$msg, $q_name, AmqpConfig::STATUS_HAND_OK, true]); }; $channel->basic_qos(0, $total, null); $channel->basic_consume($q_name, '', false, false, false, false, $callback); for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) { if ($i > $total) break; $channel->wait(); } $channel->close(); $connect->close(); } catch (\Exception $e) { throw new Exception(1101, $e->getMessage()); } $ackCount = count($ack) - count($this->_rows); $data = ['queue' => $q_name, 'ack_count' => $ackCount]; count($this->_rows) ? $data['ack_fail'] = array_keys($this->_rows) : null; return $data; } /** * deleteMessage * @author: libingke * @return array * @throws Exception */ public function deleteMessage() { //帅选合法待应答消息id foreach ($this->mids as $mid) if ($mid && is_string($mid)) $this->_rows[$mid] = $mid; if (!is_array($this->_rows) || count($this->_rows) == 0) throw new Exception(1301); $delete = $this->_rows; Redis::batchDel($this->name, $delete, 'status'); $q_name = $this->name; $connect = $this->getConnect(); $channel = $connect->channel(); $channel->queue_declare($q_name, false, true, false, false); list(, $total, ) = $channel->queue_declare($q_name, false, true, false, false); try { $callback = function ($msg) use($q_name) {call_user_func_array([$this, 'closureAck'], [$msg]);}; $channel->basic_qos(0, $total, null); $channel->basic_consume($q_name, '', false, false, false, false, $callback); for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) { if ($i > $total) break; $channel->wait(); } $channel->close(); $connect->close(); } catch (\Exception $e) { throw new Exception(1101, $e->getMessage()); } $deleteCount = count($delete) - count($this->_rows); $data = ['queue' => $q_name, 'delete_count' => $deleteCount]; count($this->_rows) ? $data['delete_fail'] = array_keys($this->_rows) : null; return $data; } /** * closureAck for ackMessage && deleteMessage * @author: libingke * @param $msg * @param string $queue * @param bool $status * @param bool $check */ protected function closureAck($msg, $queue = '', $status = false, $check = false) { try { $mid = $msg->get('message_id'); if (in_array($mid, $this->_rows)) { if ($check == true && Redis::get($queue, $mid, 'status') != AmqpConfig::STATUS_HAND) { goto end; } unset($this->_rows[$mid]); $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); if ($status != false && $queue != '') Redis::set($queue, $mid, 'status', $status); } end: } finally { if (count($this->_rows) == 0) $this->_stop = true; } } /** * [清空消息] * @author: libingke * @return array */ public function purge() { try { $connect = $this->getConnect(); $channel = $connect->channel(); $delete = $channel->queue_purge($this->name); if ($delete > 0) Redis::purge($this->name); $channel->close(); $connect->close(); return [ 'queue' => $this->name, 'count' => $delete ]; } catch (\Exception $e) { throw new Exception(1001, $e->getMessage()); } } private function _closureLogin($msg, $queue) { $data = ['mid' => '', 'body' => '', 'response' => '', 'error' => '']; try { $data['mid'] = $msg->get('message_id'); $data['body'] = $msg->body; if ($data['mid']) { $handle = new LoginHandle(); $data['response'] = $handle->login($msg->body, $queue, $data['mid']); $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND_OK); Redis::set($queue, $data['mid'], 'result', $data['response']); Redis::expire($queue, $data['mid'], 'result', 3600); } } catch (\Exception $e) { $data['error'] = $e->getMessage(); } finally { $this->_result[] = $data; } } }