123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- <?php
- namespace backend\controllers;
- use backend\models\Jobs;
- use backend\forms\MessageForm;
- use common\logic\Amqp\Cache;
- use common\logic\Amqp\Message;
- use common\logic\Amqp\Queue;
- use components\Exception;
- use Yii;
/**
 * Controller exposing message-queue operations: receipt lookup, sending,
 * consuming and deleting messages.
 *
 * Action methods return plain arrays; how these are serialised is decided
 * outside this class (presumably by BaseController or the response
 * formatter -- confirm).
 */
class MessageController extends BaseController
{
    /**
     * Look up the receipt stored in the cache for a previously issued request.
     *
     * @author libingke
     * @param mixed $query Cache key of the request; anything other than a string is rejected.
     * @return mixed The cached value stored under 'result_' . $query.
     * @throws Exception 2102 when $query is not a string or no status is cached for it;
     *                   2101 (carrying the status message) when the cached status is
     *                   not Cache::STATUS_HAND_OK.
     */
    public function actionQueryReceipt($query)
    {
        if (!is_string($query)) {
            throw new Exception(2102); // invalid query
        }

        $status = Cache::getData($query);
        if ($status === false) {
            throw new Exception(2102); // nothing cached for this key
        }

        // Loose comparison kept on purpose: the type of the cached status
        // (int vs. numeric string) is not visible from here.
        if (Cache::STATUS_HAND_OK == $status) {
            return Cache::getData('result_' . $query); // hand the stored result straight back
        }

        // Abnormal status: translate the status code into its message and report it.
        throw new Exception(2101, Cache::getMarkById($status));
    }

    /**
     * Send a message for the job identified by the `sign` query parameter.
     *
     * @author libingke
     * @return array ['code' => 200, 'message' => string, 'data' => array]
     * @throws Exception 2201 when `sign` is missing or empty, 2202 when the job
     *                   has no scenario, '1001' when the request is not a POST.
     */
    public function actionSend()
    {
        return $this->dispatchSignedMessage();
    }

    /**
     * Batch-send endpoint.
     *
     * NOTE(review): this action currently performs exactly the same single
     * send as actionSend(). An earlier multi-queue implementation sat after
     * the return statement and could never execute (it also overwrote the
     * request payload with hard-coded test queues), so it has been removed.
     * Real batch semantics still need to be implemented.
     *
     * @author libingke
     * @return array ['code' => 200, 'message' => string, 'data' => array]
     * @throws Exception Same conditions as actionSend().
     */
    public function actionBatchSend()
    {
        return $this->dispatchSignedMessage();
    }

    /**
     * Consume messages from a queue.
     *
     * TODO: the queue name is hard-coded to 'login'.
     *
     * @author libingke
     * @return array ['code' => int, 'message' => string]; code 200 on success,
     *               otherwise the code and message of the caught AMQP exception.
     */
    public function actionConsume()
    {
        $queue = 'login';

        try {
            $message = new Message($queue);
            $message->consume($queue);
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }

        return ['code' => 200, 'message' => Yii::t('common', 'OK')];
    }

    /**
     * Receive messages from a queue.
     *
     * TODO: request input is not read yet; the queue name below is a
     * hard-coded placeholder that should be replaced by validated parameters.
     *
     * @author libingke
     * @return array ['code' => mixed, 'message' => mixed]; code 200 on success,
     *               the queue's status/result when creation does not report
     *               status 1, or the code and message of a caught AMQP exception.
     */
    public function actionBatchReceive()
    {
        // Placeholder input (see TODO above).
        $post = [
            'message1' => 'message 1',
            'message2' => 'message 2',
            'message3' => 'message 3',
            'queue' => 'y1',
        ];

        try {
            $queue = (new Queue())->create($post['queue']);
            if ($queue['status'] != 1) {
                return ['code' => $queue['status'], 'message' => $queue['result']];
            }

            $message = new Message($queue['result']);
            $message->receive($post['queue']);
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }

        return ['code' => 200, 'message' => Yii::t('common', 'OK')];
    }

    /**
     * Delete a message from a queue.
     *
     * TODO: request input is not read yet; the queue name and message below
     * are hard-coded placeholders that should be replaced by validated
     * parameters.
     *
     * @author libingke
     * @return array ['code' => mixed, 'message' => mixed]; code 200 on success,
     *               the queue's status/result when creation does not report
     *               status 1, or the code and message of a caught AMQP exception.
     */
    public function actionDelete()
    {
        // Placeholder input (see TODO above).
        $post = [
            'message1' => 'message 1',
            'message2' => 'message 2',
            'message3' => 'message 3',
            'queue' => 'y1',
        ];

        try {
            $queue = (new Queue())->create($post['queue']);
            if ($queue['status'] != 1) {
                return ['code' => $queue['status'], 'message' => $queue['result']];
            }

            $message = new Message($queue['result']);
            $message->delete($post['message1']);
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }

        return ['code' => 200, 'message' => Yii::t('common', 'OK')];
    }

    /**
     * Batch-delete messages.
     *
     * Not implemented yet: the method has no body and therefore returns null.
     *
     * @author libingke
     */
    public function actionBatchDelete()
    {
    }

    /**
     * Shared implementation behind actionSend() and actionBatchSend().
     *
     * Resolves the job from the `sign` query parameter, loads the POST body
     * into a MessageForm configured with the job's scenario and queue, and
     * sends the message when validation passes.
     *
     * @return array ['code' => 200, 'message' => string, 'data' => array]; `data`
     *               holds `queueName` and `requestId` when validation succeeded.
     * @throws Exception 2201 when `sign` is missing or empty, 2202 when the job
     *                   has no scenario, '1001' when the request is not a POST.
     */
    private function dispatchSignedMessage()
    {
        $request = Yii::$app->request;

        $sign = $request->get('sign');
        if ($sign === null || $sign === '') {
            throw new Exception(2201);
        }

        $job = Jobs::fetchDataBySign($sign, false);
        if (!isset($job['scenario'])) {
            throw new Exception(2202); // job carries no validation scenario
        }
        $scenario = $job['scenario'];
        $queue = $job['queue'];

        if (!$request->isPost) {
            throw new Exception('1001');
        }

        $model = new MessageForm();
        $model->setScenario($scenario);
        $model->setQueue($queue);
        // The raw POST body is wrapped under the form name so load() picks it up.
        $model->load(['MessageForm' => $request->post()]);

        $data = [];
        if ($model->validate()) {
            $data['queueName'] = $model->getQueue();
            $data['requestId'] = $model->sendMessage();
        } else {
            // Handles the validation failure. NOTE(review): presumably this
            // throws; if it returns normally the response below is still a
            // 200 with empty data -- confirm.
            $model->handleError();
        }

        return [
            'code' => 200,
            'message' => Yii::t('error', 200),
            'data' => $data,
        ];
    }
}
|