123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- <?php
- namespace backend\controllers;
- use components\Exception;
- use common\logic\Amqp\Message;
- use common\logic\Amqp\Queue;
- use Yii;
/**
 * HTTP endpoints for publishing, consuming and deleting AMQP messages.
 *
 * Every action returns an array with at least `code` and `message` keys.
 * Queue access goes through common\logic\Amqp\Queue and message operations
 * through common\logic\Amqp\Message.
 */
class MessageController extends BaseController
{
    /**
     * Debug helper: dumps the exchange and the full argument list, then
     * terminates the script.
     *
     * NOTE(review): the name and parameter list presumably mirror an AMQP
     * channel's batch publish call; this looks like leftover scaffolding and
     * is never called from within this class - confirm before removing.
     *
     * @param mixed  $msg         message payload
     * @param string $exchange    exchange name
     * @param string $routing_key routing key
     * @param bool   $mandatory   mandatory flag
     * @param bool   $immediate   immediate flag
     * @param mixed  $ticket      access ticket
     * @return void never returns; the script is terminated
     */
    public function batch_basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        var_dump($exchange);
        // Wrapped in an outer array to keep the dump shape of the original.
        $a = [func_get_args()];
        var_dump($a);
        die;
    }

    /**
     * Send a single message to a queue.
     *
     * Reads `body` and `queue` from the POST data; both must be strings.
     *
     * NOTE(review): a POST-only guard (error 1001) was commented out in the
     * original and is still not enforced here - confirm whether the base
     * controller or a verb filter already restricts the method.
     *
     * @author: libingke
     * @return array `code` is 200 on success or 2100 when the queue could not
     *               be created; `message` is the translated text for the code
     * @throws Exception 2001-2004 on invalid input, or the AMQP error re-thrown
     */
    public function actionSend()
    {
        $params = Yii::$app->request->post();

        if (!isset($params['body'])) {
            throw new Exception('2001');
        }
        if (!is_string($params['body'])) {
            throw new Exception('2002');
        }
        if (!isset($params['queue'])) {
            throw new Exception('2003');
        }
        if (!is_string($params['queue'])) {
            throw new Exception('2004');
        }

        $body = $params['body'];
        $queue = $params['queue'];

        try {
            $queueModel = (new Queue())->create($queue);
            if ($queueModel['status'] == 1) {
                $message = new Message($queue);
                $message->send($body, $queue);
                $code = 200;
            } else {
                $code = 2100;
            }
        } catch (\common\logic\Amqp\Exception $e) {
            throw new Exception($e->getCode(), $e->getMessage());
        }

        return [
            'code' => $code,
            'message' => Yii::t('error', $code),
        ];
    }

    /**
     * Send several messages to one or more queues in a single request.
     *
     * Reads the POST field `data`, a JSON string that decodes to a list of
     * groups shaped like `{"queue": "<name>", "infos": ["<payload>", ...]}`.
     * Processing is best effort: a malformed group, a queue that cannot be
     * created, or an AMQP error only increments the error counter and the
     * remaining groups are still processed.
     *
     * @author: libingke
     * @return array `code` 200, its translated `message`, and
     *               `data.error_count` with the number of failed groups
     * @throws Exception 2005 when `data` is missing, 2006 when it is not a
     *                   JSON string decoding to a non-empty array
     */
    public function actionBatchSend()
    {
        $params = Yii::$app->request->post();

        if (!isset($params['data'])) {
            throw new Exception('2005');
        }

        // Only decode real strings: an array-shaped POST field must be
        // rejected as invalid data rather than be handed to json_decode().
        $data = is_string($params['data']) ? json_decode($params['data'], true) : null;
        if (!is_array($data) || !$data) {
            throw new Exception('2006');
        }

        $errCount = 0;
        foreach ($data as $items) {
            $isValidGroup = is_array($items)
                && isset($items['queue'])
                && is_string($items['queue'])
                && isset($items['infos'])
                && is_array($items['infos'])
                && $items['infos'];

            if (!$isValidGroup) {
                $errCount++;
                continue;
            }

            try {
                $queueModel = (new Queue())->create($items['queue']);
                if ($queueModel['status'] == 1) {
                    $message = new Message($items['queue']);
                    foreach ($items['infos'] as $info) {
                        $message->batchSend($info, $items['queue']);
                    }
                } else {
                    $errCount++;
                }
            } catch (\common\logic\Amqp\Exception $e) {
                // Deliberately not re-thrown so the other groups still run.
                $errCount++;
            }
        }

        return [
            'code' => 200,
            'message' => Yii::t('error', 200),
            'data' => ['error_count' => $errCount],
        ];
    }

    /**
     * Consume messages from a queue (work in progress).
     *
     * NOTE(review): request input is not read yet; the queue name comes from
     * the hard-coded stub below. Message is constructed with
     * `$queue['result']` here but with the queue name in the send actions -
     * confirm which argument the constructor expects.
     *
     * @author: libingke
     * @return array `code` and `message`: 200/OK on success, the queue
     *               creation status and result on failure, or the AMQP
     *               exception code and message
     */
    public function actionReceive()
    {
        // Placeholder input until request parameters are validated.
        $post = [
            'message1' => 'message 1',
            'message2' => 'message 2',
            'message3' => 'message 3',
            'queue' => 'y1',
        ];

        try {
            $queue = (new Queue())->create($post['queue']);
            if ($queue['status'] != 1) {
                return ['code' => $queue['status'], 'message' => $queue['result']];
            }

            $message = new Message($queue['result']);
            $message->receive($post['queue']);

            return ['code' => 200, 'message' => Yii::t('common', 'OK')];
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }
    }

    /**
     * Consume messages from a queue in batch.
     *
     * NOTE(review): the body is currently identical to actionReceive() - it
     * uses the same hard-coded stub input and the same single receive() call.
     * Confirm the intended batch behaviour.
     *
     * @author: libingke
     * @return array `code` and `message`: 200/OK on success, the queue
     *               creation status and result on failure, or the AMQP
     *               exception code and message
     */
    public function actionBatchReceive()
    {
        // Placeholder input until request parameters are validated.
        $post = [
            'message1' => 'message 1',
            'message2' => 'message 2',
            'message3' => 'message 3',
            'queue' => 'y1',
        ];

        try {
            $queue = (new Queue())->create($post['queue']);
            if ($queue['status'] != 1) {
                return ['code' => $queue['status'], 'message' => $queue['result']];
            }

            $message = new Message($queue['result']);
            $message->receive($post['queue']);

            return ['code' => 200, 'message' => Yii::t('common', 'OK')];
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }
    }

    /**
     * Delete a message.
     *
     * NOTE(review): request input is not read yet; the queue name and the
     * value passed to delete() come from the hard-coded stub below.
     *
     * @author: libingke
     * @return array `code` and `message`: 200/OK on success, the queue
     *               creation status and result on failure, or the AMQP
     *               exception code and message
     */
    public function actionDelete()
    {
        // Placeholder input until request parameters are validated.
        $post = [
            'message1' => 'message 1',
            'message2' => 'message 2',
            'message3' => 'message 3',
            'queue' => 'y1',
        ];

        try {
            $queue = (new Queue())->create($post['queue']);
            if ($queue['status'] != 1) {
                return ['code' => $queue['status'], 'message' => $queue['result']];
            }

            $message = new Message($queue['result']);
            $message->delete($post['message1']);

            return ['code' => 200, 'message' => Yii::t('common', 'OK')];
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }
    }

    /**
     * Delete messages in batch.
     *
     * Not implemented yet: the action has no body and returns nothing.
     *
     * @author: libingke
     */
    public function actionBatchDelete()
    {
    }
}
|