123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- <?php
- namespace components\components\messageQueue;
- use Yii;
- use yii\base\Component;
- /**
- * MessageInterface
- * 消息队列 接口
- * -----------------
- * @author Verdient。
- * @version 1.0.0
- */
- class MessageInterface extends Component {
- /**
- * @var public String $key
- * 签名密钥
- * -----------------------
- * @method Config
- * @author Verdient。
- */
- public $key;
- /**
- * @var public $protocol
- * 访问协议
- * ---------------------
- * @method Config
- * @author Verdient。
- */
- public $protocol = '';
- /**
- * @var public $host
- * 域名
- * -----------------
- * @method Config
- * @author Verdient。
- */
- public $host = '';
- /**
- * @var public $port
- * 端口
- * -----------------
- * @method Config
- * @author Verdient。
- */
- public $port = '';
- /**
- * @var public $_urlPrefix
- * URL 路径前缀
- * -----------------------
- * @author Verdient。
- */
- protected $_urlPrefix;
- /**
- * init()
- * 初始化
- * ------
- * @param String $publicKey 公钥
- * -----------------------------
- * @return Array / false
- * @author Verdient。
- */
- public function init(){
- parent::init();
- if(!$this->protocol){
- $this->protocol = Yii::$app->request->isSecureConnection ? 'https' : 'http';
- }
- if(!$this->host){
- $this->host = explode(':', $_SERVER['HTTP_HOST'])[0];
- }
- $this->_urlPrefix = $this->protocol . '://' . $this->host . ($this->port ? (($this->protocol == 'https' && $this->port == 443) || ($this->protocol == 'http' && $this->port == 80) ? '' : (':' . $this->port)) : '');
- }
- /**
- * createQueue(String $name)
- * 创建队列
- * -------------------------
- * @param String $name 队列名称
- * ---------------------------
- * @return Array
- * @author Verdient。
- */
- public function createQueue($name){
- return $this->_formatResponse($this->_send('post', '/queue/create', ['name' => $name]));
- }
- /**
- * queueList()
- * 队列列表
- * -----------
- * @return Array
- * @author Verdient。
- */
- public function queueList(){
- return $this->_formatResponse($this->_send('get', '/queue/list'));
- }
- /**
- * deleteQueue(String $name)
- * 删除队列
- * -------------------------
- * @param String $name 队列名称
- * ---------------------------
- * @return Array
- * @author Verdient。
- */
- public function deleteQueue($name){
- return $this->_formatResponse($this->_send('post', '/queue/delete', ['name' => $name]));
- }
- /**
- * sendMessage(String $name, Array $message)
- * 发送消息
- * -----------------------------------------
- * @param String $name 队列名称
- * @param Array $message 消息
- * ---------------------------
- * @return Array
- * @author Verdient。
- */
- public function sendMessage($name, Array $message){
- return $this->_formatResponse($this->_send('post', '/message/send', ['queue' => $name, 'message' => $message]));
- }
- /**
- * batchSendMessage(String $name, Array $message)
- * 批量发送消息
- * ----------------------------------------------
- * @param String $name 队列名称
- * @param Array $message 消息
- * ---------------------------
- * @return Array
- * @author Verdient。
- */
- public function batchSendMessage($name, Array $message){
- return $this->_formatResponse($this->_send('post', '/message/batch-send', ['queue' => $name, 'message' => $message]));
- }
- /**
- * messageList(String $name)
- * 消息列表
- * -------------------------
- * @param String $name 队列名称
- * ---------------------------
- * @return Array
- * @author Verdient。
- */
- public function messageList($name){
- return $this->_formatResponse($this->_send('get', '/message/list', [], ['name' => $name]));
- }
- /**
- * flushMessage(String $name)
- * 清空消息
- * --------------------------
- * @param String $name 队列名称
- * ---------------------------
- * @return Array
- * @author Verdient。
- */
- public function flushMessage($name){
- return $this->_formatResponse($this->_send('post', '/message/purge', ['name' => $name]));
- }
- /**
- * consumeMessage(String $name, Integer $count)
- * 消费消息
- * --------------------------------------------
- * @param String $name 队列名称
- * @param Integer $count 消费数量
- * ------------------------------
- * @return Array
- * @author Verdient。
- */
- public function consumeMessage($name, $count){
- return $this->_formatResponse($this->_send('post', '/message/purge', ['name' => $name, 'count' => $count]));
- }
- /**
- * deleteMessage(String $name, Array $ids)
- * 删除消息
- * ---------------------------------------
- * @param String $name 队列名称
- * @param Array $ids 消息编号集合
- * -------------------------------
- * @return Array
- * @author Verdient。
- */
- public function deleteMessage($name, Array $ids){
- return $this->_formatResponse($this->_send('post', '/message/delete', ['name' => $name, 'mids' => $ids]));
- }
- /**
- * ackMessage(String $name, Array $ids)
- * 应答消息
- * ------------------------------------
- * @param String $name 队列名称
- * @param Array $ids 消息编号集合
- * -----------------------------
- * @return Array
- * @author Verdient。
- */
- public function ackMessage($name, Array $ids){
- return $this->_formatResponse($this->_send('post', '/message/ack', ['name' => $name, 'mids' => $ids]));
- }
- /**
- * messageStatus(String $name, String $id)
- * 消息状态
- * ---------------------------------------
- * @param String $name 队列名称
- * @param String $id 消息编号
- * ----------------------------
- * @author Verdient。
- */
- public function messageStatus($name, $id){
- return $this->_formatResponse($this->_send('get', '/query/message-status', [], ['queue' => $name, 'mid' => $id]));
- }
- /**
- * _send(String $method, String $url[, Array $data = []])
- * 发送数据
- * ------------------------------------------------------
- * @param String $method 访问方法
- * @param String $url URL路径
- * @param Array $data 发送的数据
- * ------------------------------
- * @return Array
- * @author Verdient。
- */
- protected function _send($method, $url, Array $data = [], Array $query = []){
- $method = strtolower($method);
- if(!empty($query)){
- $url .= '?' . http_build_query($query);
- }
- $url = $this->_urlPrefix . $url;
- $signature = new Signature();
- $signature->key = $this->key;
- $signature->verb = $method;
- if($method == 'post'){
- if(!empty($data)){
- Yii::$app->cUrl->setData($data, 'JSON');
- }
- $signature->content = $data;
- }
- Yii::$app->cUrl->setHeader([
- 'Authorization' => 'FzmMQ client_test:' . $signature->signature,
- 'Verb' => $signature->verb,
- 'Date' => $signature->date,
- 'Content-Md5' => $signature->contentMd5,
- 'Signature-Method' => $signature->signatureMethod,
- 'Signature-Version' => $signature->signatureVersion
- ]);
- $options = Yii::$app->cUrl->getOptions();
- $response = Yii::$app->cUrl->$method($url, 'JSON');
- Yii::trace(['url' => $url, 'cUrl' => $options, 'response' => $response], __METHOD__);
- return $response;
- }
- /**
- * _formatResponse(Mixed $response)
- * 格式化响应
- * --------------------------------
- * @param Mixed $response 响应内容
- * ------------------------------
- * @return Array
- * @author Verdient。
- */
- protected function _formatResponse($response){
- $flag = false;
- if(is_array($response) && isset($response['code']) && $response['code'] == 200){
- $flag = true;
- }
- return $flag ? [
- 'result' => $flag,
- 'data' => isset($response['data']) ? $response['data'] : null,
- 'code' => $response['code']
- ] : [
- 'result' => $flag,
- 'message' => isset($response['message']) ? $response['message'] : 'Unknown Error',
- 'code' => isset($response['code']) ? $response['code'] : 500
- ];
- }
- }
|