<?php

namespace backend\forms;

use common\logic\LoginHandle;
use PhpAmqpLib\Message\AMQPMessage;
use components\service\AmqpConfig;
use components\service\Redis;
use components\Exception;
use components\Curl;
use common\helpers\KeyHelper;
use yii\helpers\ArrayHelper;
use Yii;

class MessageForm extends BaseForm
{
	/**
	 * @var
	 */
	public $queue;

	/**
	 * @var
	 */
	public $message;

	/**
	 * @var
	 */
	public $name;

    /**
     * @var 主题 交换机
     */
    public $topicName;


    /**
     * @var  路由
     */
    public $routingKey;



    /**
	 * @var integer 数量
	 */
	public $count;

	/**
	 * @var bool 是否重新排序
	 */
	public $requeue;

	/**
	 * @var string 编码
	 */
	public $encoding;

	/**
	 * @var string 消费id
	 */
	public $mid;

	/**
	 * @var array 消费ids
	 */
	public $mids;

	/**
	 * @var string 消费类型
	 */
	public $type;

	/**
	 * @var bool 自动应答
	 */
	public $ack;

	/**
	 * @var bool 强制删除
	 */
	public $forced;

	/**
	 * @var bool
	 */
	private $_stop = false;

	/**
	 * @var string
	 */
	private $_mid;

	/**
	 * @var string
	 */
	private $_body;

	/**
	 * @var AMQPMessage
	 */
	private $_message;

	/**
	 * @var array
	 */
	private $_rows;

	/**
	 * @var array
	 */
	private $_result;

	public function rules()
	{
		return [
			[['name'], 'required', 'on' => ['message_list', 'purge']],
			[['name'], 'trim', 'on' => ['message_list', 'purge', 'consume', 'delete', 'ack']],

			/* 发送消息 */
			[['queue'], 'trim', 'on' => ['send', 'batch_send']],
			[['queue', 'message'], 'required', 'on' => ['send', 'batch_send']],
			['message', 'validateArray', 'on' => ['send', 'batch_send']],


            /* 发送到订阅者 */
            [['topicName'], 'trim', 'on' => ['sendtopic']],
            [['topicName', 'message'], 'required', 'on' => ['sendtopic']],
            ['message', 'validateArray', 'on' => ['sendtopic']],



			/* 获取消息列表 */
			['count', 'integer', 'min' => 1, 'max' => 1000, 'message' => 1205,
				'tooSmall' => 1206, 'tooBig' => 1207, 'on' => ['message_list']],
			['count', 'default', 'value' => 20, 'on' => ['message_list']],
			['requeue', 'boolean', 'message' => 1209, 'on' => ['message_list']],
			['requeue', 'default', 'value' => true, 'on' => ['message_list']],
			['encoding', 'in', 'range' => ['auto', 'base64'], 'message' => 1210, 'on' => ['message_list']],
			['encoding', 'default', 'value' => 'auto', 'on' => ['message_list']],

			/* 消费 */
			[['name', 'count'], 'required', 'on' => ['consume']],
			['count', 'integer', 'min' => 1, 'max' => 65000, 'message' => 1205,
				'tooSmall' => 1206, 'on' => ['consume']],
			['type', 'default', 'value' => 'client', 'on' => 'consume'],
			['type', 'in', 'range' => ['server', 'client'], 'message' => 1302, 'on' => 'consume'],

			/* delete & ack */
			[['mids', 'name'], 'required', 'on' => ['delete', 'ack']],
			['mids', 'validateArray', 'on' => ['delete', 'ack']],
			['name', 'string', 'on' => ['delete', 'ack']],
			['forced', 'default', 'value' => false, 'on' => 'delete'],
			['forced', 'boolean',  'on' => 'delete'],
		];
	}

	public function validateArray($attribute)
	{
		if (!$this->$attribute || !is_array($this->$attribute))
			throw new Exception(1003, "{$attribute} 必须是数组");
	}

	/**
	 * [发送消息]
	 * @author: libingke
	 * @return array
	 * @throws Exception
	 * @version 1.1
	 */
	public function sendMessage()
	{
		try {
			$connect = $this->getConnect();
			$channel = $connect->channel();
			$this->_handleMessage($this->message);
			//预声明
			$channel->queue_declare($this->queue,
				false, true, false, false);
			$channel->basic_publish($this->_message, '', $this->queue);
			//获取返回结果
			list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
				false, true, false, false);

			$data = [
				'message_total' => $message_count,
				'queue_name' => $q_name,
				'message_add' => [
					'mid' => $this->_mid,
					'body' => $this->_body
				]
			];
			if (($get = $channel->basic_get($q_name)) !== null) {
				$data['basic_get'] = [
					'mid' => $get->get('message_id'),
					'body' => $get->body
				];
			}
			$channel->close();
			$connect->close();

			Redis::set($q_name, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);

			return $data;

		} catch (\Exception $e) {
			throw new Exception(1001, $e->getMessage());
		}
	}



    public function sendMessageTopic()
    {
        try {
            $connect = $this->getConnect();
            $channel = $connect->channel();
            $this->_handleMessage($this->message);
            //发送信息到某个主题 $exchange
            $channel->exchange_declare($this->topicName, 'topic', false, true, false);
            //消息发送到
            $channel->basic_publish($this->_message, is_null($this->topicName)?'':$this->topicName, is_null($this->routingKey)?'*':$this->routingKey);

            $data = [
                'topicName' => is_null($this->topicName)?'*':$this->topicName,
                'routingKey' => is_null($this->routingKey)?'*':$this->routingKey,
                'message_add' => [
                    'mid' => $this->_mid,
                    'body' => $this->_body
                ]
            ];
            $channel->close();
            $connect->close();
            //订阅的不需要标记
           // Redis::set($q_name, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);

            return $data;

        } catch (\Exception $e) {
            throw new Exception(1001, $e->getMessage());
        }
    }

	/**
	 * [构造消息体]
	 * @param $data
	 */
	private function _handleMessage($data)
	{
		$this->_mid = KeyHelper::getUniqueId('message_send');
		$this->_body = call_user_func_array([$this, '_messageBody'], [$data]);
		$properties = [
			'message_id' 	=> $this->_mid,
			'correlation_id'=> $this->_mid,
			'consumer_tag'	=> $this->_mid
		];
		$this->_message = new AMQPMessage($this->_body, $properties);
	}

	private function _messageBody($data)
	{
		return json_encode($data);
	}

	/**
	 * [批量发送消息]
	 * @author: libingke
	 * @return array
	 * @throws Exception
	 */
	public function batchSendMessage()
	{
		try {
			$connect = $this->getConnect();
			$channel = $connect->channel();
			//预声明
			$channel->queue_declare($this->queue,
				false, true, false, false);

			foreach ($this->message as $k => $v) {
				$this->_handleMessage($v);
				$this->_rows[] = [
					'mid' => $this->_mid,
					'body' => $this->_body
				];
				$channel->batch_basic_publish($this->_message, '', $this->queue);

				Redis::set($this->queue, $this->_mid, 'status', AmqpConfig::STATUS_SEND_OK);
			}
			$channel->publish_batch();

			//获取返回结果
			list($q_name, $message_count, ) = $channel->queue_declare($this->queue,
				false, true, false, false);

			$data = [
				'message_total' => $message_count,
				'queue_name' => $q_name,
				'add_count' => count($this->_rows),
				'add_rows' => $this->_rows
			];
			if (($get = $channel->basic_get($q_name)) !== null) {
				$data['basic_get'] = [
					'mid' => $get->get('message_id'),
					'body' => $get->body
				];
			}
			$channel->close();
			$connect->close();

			return $data;

		} catch (\Exception $e) {
			throw new Exception(1001, $e->getMessage());
		}
	}

	/**
	 * [获取消息列表]
	 * @author: libingke
	 */
	public function getMessageList()
	{
		$authStr = Yii::$app->Amqp->user . ':' . Yii::$app->Amqp->pass;
		$vhost = urlencode(Yii::$app->Amqp->vhost);
		$url = Yii::$app->Amqp->host . ':' . Yii::$app->Amqp->api_port .
			"/api/queues/{$vhost}/" . $this->name . "/get";

		$postParams = [
			'name' => $this->name,
			'count' => $this->count,
			'encoding' => $this->encoding,
			'requeue' => $this->requeue,
			'truncate' => "50000",
			'vhost' => '/',
		];

		$curl = new Curl();
		$curl->setOption(CURLOPT_USERPWD, $authStr);
		$curl->setRawPostData(json_encode($postParams));
		$result = json_decode($curl->post($url), true);
		if ($curl->responseCode != 200)
			throw new Exception(1002);

		if ($curl->errorText)
			throw new Exception(1002, $curl->errorText);

		if (isset($result['error']) && is_string($result['error']))
			throw new Exception(1002, $result['error']);

		ArrayHelper::multisort($result,'message_count',SORT_ASC);
		//print_r($result);exit();
		$rows = [];
		foreach ($result as $k => $v) {
			$rows[$k]['mid']	= $v['properties']['message_id'];
			$rows[$k]['body']	= $v['payload'];
			$rows[$k]['before']	= $v['message_count'];

		}
		unset($result);

		return ['count' => count($rows), 'rows' => $rows];
	}

	/**
	 * consumeMessage
	 * @author: libingke
	 * @return array
	 * @throws Exception
	 */
	public function consumeMessage()
	{
		$q_name = $this->name;
		if ($this->type == 'server') {
			$function = '_closure' . ucfirst($q_name);
			if ( !method_exists($this, $function) )
				throw new Exception(1303);

		} else {
			$function = 'closureConsume';
		}

		$connect = $this->getConnect();
		$channel = $connect->channel();
		$channel->queue_declare($q_name,
			false, true, false, false);
		list(, $total, ) = $channel->queue_declare($q_name,
			false, true, false, false);

		if ($total == 0)
			throw new Exception(1300);

		try {
			$min = min($this->count, $total);

			$callback = function ($msg) use($q_name, $function) {call_user_func_array([$this, $function], [$msg, $q_name]);};
			$channel->basic_qos(0, $min, null);
			$channel->basic_consume($q_name,
				'', false, false, false, false, $callback);
			for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
				if ($i > $min)
					break;

				$channel->wait();
			}

			$channel->close();
			$connect->close();

		} catch (\Exception $e) {
			throw new Exception(1001, $e->getMessage());
		}

		return $this->_result;
	}

	/**
	 * closureConsume for consumeMessage
	 * @param $msg
	 * @param $queue
	 */
	protected function closureConsume($msg, $queue)
	{
		$data = ['mid' => '', 'body' => '',	'error' => ''];
		try {
			$data['mid'] = $msg->get('message_id');
			$data['body'] = $msg->body;
			Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND);

		} catch (\Exception $e) {
			$data['error'] = $e->getMessage();

		} finally {
			$this->_result[] = $data;
		}
	}

	/**
	 * ackMessage
	 * @author: libingke
	 * @return array
	 * @throws Exception
	 */
	public function ackMessage()
	{
		//帅选合法待应答消息id
		foreach ($this->mids as $mid)
			if ($mid && is_string($mid))
				$this->_rows[$mid] = $mid;
		if (!is_array($this->_rows) || count($this->_rows) == 0)
			throw new Exception(1301);
		$ack = $this->_rows;

		$q_name = $this->name;
		$connect = $this->getConnect();
		$channel = $connect->channel();
		$channel->queue_declare($q_name,
			false, true, false, false);
		list(, $total, ) = $channel->queue_declare($q_name,
			false, true, false, false);

		try {
			$callback = function ($msg) use($q_name) {
				call_user_func_array([$this, 'closureAck'], [$msg, $q_name, AmqpConfig::STATUS_HAND_OK, true]);
			};
			$channel->basic_qos(0, $total, null);
			$channel->basic_consume($q_name,
				'', false, false, false, false, $callback);
			for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
				if ($i > $total)
					break;

				$channel->wait();
			}

			$channel->close();
			$connect->close();

		} catch (\Exception $e) {
			throw new Exception(1101, $e->getMessage());
		}

		$ackCount = count($ack) - count($this->_rows);
		$data = ['queue' => $q_name, 'ack_count' => $ackCount];
		count($this->_rows) ? $data['ack_fail'] = array_keys($this->_rows) : null;
		return $data;
	}

	/**
	 * deleteMessage
	 * @author: libingke
	 * @return array
	 * @throws Exception
	 */
	public function deleteMessage()
	{
		//帅选合法待应答消息id
		foreach ($this->mids as $mid)
			if ($mid && is_string($mid))
				$this->_rows[$mid] = $mid;
		if (!is_array($this->_rows) || count($this->_rows) == 0)
			throw new Exception(1301);
		$delete = $this->_rows;
		Redis::batchDel($this->name, $delete, 'status');

		$q_name = $this->name;
		$connect = $this->getConnect();
		$channel = $connect->channel();
		$channel->queue_declare($q_name,
			false, true, false, false);
		list(, $total, ) = $channel->queue_declare($q_name,
			false, true, false, false);

		try {
			$callback = function ($msg) use($q_name) {call_user_func_array([$this, 'closureAck'], [$msg]);};
			$channel->basic_qos(0, $total, null);
			$channel->basic_consume($q_name,
				'', false, false, false, false, $callback);
			for ($i = 1; count($channel->callbacks) && $this->_stop !== true; $i++) {
				if ($i > $total)
					break;

				$channel->wait();
			}

			$channel->close();
			$connect->close();

		} catch (\Exception $e) {
			throw new Exception(1101, $e->getMessage());
		}

		$deleteCount = count($delete) - count($this->_rows);
		$data = ['queue' => $q_name, 'delete_count' => $deleteCount];
		count($this->_rows) ? $data['delete_fail'] = array_keys($this->_rows) : null;
		return $data;
	}

	/**
	 * closureAck for ackMessage && deleteMessage
	 * @author: libingke
	 * @param $msg
	 * @param string $queue
	 * @param bool $status
	 * @param bool $check
	 */
	protected function closureAck($msg, $queue = '', $status = false, $check = false)
	{
		try {
			$mid = $msg->get('message_id');
			if (in_array($mid, $this->_rows)) {
				if ($check == true && Redis::get($queue, $mid, 'status') != AmqpConfig::STATUS_HAND) {
					goto end;
				}
				unset($this->_rows[$mid]);
				$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
				if ($status != false && $queue != '')
					Redis::set($queue, $mid, 'status', $status);
			}
			end:

		} finally {
			if (count($this->_rows) == 0)
				$this->_stop = true;
		}
	}

	/**
	 * [清空消息]
	 * @author: libingke
	 * @return array
	 */
	public function purge()
	{
		try {
			$connect = $this->getConnect();
			$channel = $connect->channel();
			$delete = $channel->queue_purge($this->name);
			if ($delete > 0)
				Redis::purge($this->name);

			$channel->close();
			$connect->close();

			return [
				'queue' => $this->name,
				'count' => $delete
			];

		} catch (\Exception $e) {
			throw new Exception(1001, $e->getMessage());
		}
	}

	private function _closureLogin($msg, $queue)
	{
		$data = ['mid' => '', 'body' => '', 'response' => '', 'error' => ''];
		try {
			$data['mid'] = $msg->get('message_id');
			$data['body'] = $msg->body;
			if ($data['mid']) {
				$handle = new LoginHandle();
				$data['response'] = $handle->login($msg->body, $queue, $data['mid']);

				$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
				Redis::set($queue, $data['mid'], 'status', AmqpConfig::STATUS_HAND_OK);
				Redis::set($queue, $data['mid'], 'result', $data['response']);
				Redis::expire($queue, $data['mid'], 'result', 3600);
			}

		} catch (\Exception $e) {
			$data['error'] = $e->getMessage();

		} finally {
			$this->_result[] = $data;
		}
	}
}