|
@@ -1,47 +1,77 @@
|
|
<?php
|
|
<?php
|
|
namespace backend\controllers;
|
|
namespace backend\controllers;
|
|
|
|
|
|
|
|
+use components\Exception;
|
|
use common\logic\Amqp\Message;
|
|
use common\logic\Amqp\Message;
|
|
use common\logic\Amqp\Queue;
|
|
use common\logic\Amqp\Queue;
|
|
use Yii;
|
|
use Yii;
|
|
|
|
|
|
class MessageController extends BaseController
|
|
class MessageController extends BaseController
|
|
{
|
|
{
|
|
|
|
+ public function batch_basic_publish(
|
|
|
|
+ $msg,
|
|
|
|
+ $exchange = '',
|
|
|
|
+ $routing_key = '',
|
|
|
|
+ $mandatory = false,
|
|
|
|
+ $immediate = false,
|
|
|
|
+ $ticket = null
|
|
|
|
+ ) {
|
|
|
|
+ var_dump($exchange);
|
|
|
|
+ $a[] = func_get_args();
|
|
|
|
+ var_dump($a);die;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* 发送消息 (接受多条)
|
|
* 发送消息 (接受多条)
|
|
* @author: libingke
|
|
* @author: libingke
|
|
|
|
+ * @return array
|
|
|
|
+ * @throws Exception
|
|
*/
|
|
*/
|
|
public function actionSend()
|
|
public function actionSend()
|
|
{
|
|
{
|
|
- //获取接收参数 利用model验证或者判断
|
|
|
|
|
|
+ //if (!Yii::$app->request->isPost)
|
|
|
|
+ // throw new Exception('1001');
|
|
|
|
+
|
|
$params = Yii::$app->request->post();
|
|
$params = Yii::$app->request->post();
|
|
- $post = [
|
|
|
|
- 'message1' => 'message 1',
|
|
|
|
- 'message2' => 'message 2',
|
|
|
|
- 'message3' => 'message 3',
|
|
|
|
|
|
+ //test data todo delete
|
|
|
|
+ $params = [
|
|
|
|
+ 'body' => json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn']),
|
|
'queue' => 'y1',
|
|
'queue' => 'y1',
|
|
];
|
|
];
|
|
|
|
|
|
- try {
|
|
|
|
- $queue = (new Queue())->create($post['queue']);
|
|
|
|
- if ($queue['status'] == 1) {
|
|
|
|
- $message = new Message($queue['result']);
|
|
|
|
- $message->send($post['message1'], $post['queue']);
|
|
|
|
- $message->send($post['message2'], $post['queue']);
|
|
|
|
- $message->send($post['message3'], $post['queue']);
|
|
|
|
|
|
+ if (!isset($params['body']))
|
|
|
|
+ throw new Exception('2001');
|
|
|
|
|
|
- return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')];
|
|
|
|
|
|
+ if (!is_string($params['body']))
|
|
|
|
+ throw new Exception('2002');
|
|
|
|
|
|
- } else {
|
|
|
|
|
|
+ if (!isset($params['queue']))
|
|
|
|
+ throw new Exception('2003');
|
|
|
|
|
|
- return ['code' => $queue['status'], 'message' => $queue['result']];
|
|
|
|
|
|
+ if (!is_string($params['queue']))
|
|
|
|
+ throw new Exception('2004');
|
|
|
|
+
|
|
|
|
+ $body = $params['body'];
|
|
|
|
+ $queue = $params['queue'];
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ $queueModel = (new Queue())->create($queue);
|
|
|
|
+ if ($queueModel['status'] == 1) {
|
|
|
|
+ $message = new Message($queue);
|
|
|
|
+ $message->send($body, $queue);
|
|
|
|
+ $code = 200;
|
|
|
|
+ } else {
|
|
|
|
+ $code = 2100;
|
|
}
|
|
}
|
|
|
|
|
|
} catch (\common\logic\Amqp\Exception $e) {
|
|
} catch (\common\logic\Amqp\Exception $e) {
|
|
- $result = ['code' => $e->getCode(), 'message' => $e->getMessage()];
|
|
|
|
|
|
+ throw new Exception($e->getCode(), $e->getMessage());
|
|
}
|
|
}
|
|
|
|
|
|
- return $result;
|
|
|
|
|
|
+ return [
|
|
|
|
+ 'code' => $code,
|
|
|
|
+ 'message' => Yii::t('error', $code)
|
|
|
|
+ ];
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -50,27 +80,73 @@ class MessageController extends BaseController
|
|
*/
|
|
*/
|
|
public function actionBatchSend()
|
|
public function actionBatchSend()
|
|
{
|
|
{
|
|
- //$params = Yii::$app->request->post();
|
|
|
|
- $post = [
|
|
|
|
- 'messages' => ['message 1', 'message 2'],
|
|
|
|
- 'queue' => 'task_queue',
|
|
|
|
|
|
+ $params = Yii::$app->request->post();
|
|
|
|
+ //test data start todo delete
|
|
|
|
+ $test[] = [
|
|
|
|
+ 'queue' => 'y1',
|
|
|
|
+ 'infos' => [
|
|
|
|
+ json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y1-m1']),
|
|
|
|
+ json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y1-m2'])
|
|
|
|
+ ],
|
|
];
|
|
];
|
|
|
|
+ $test[] = [
|
|
|
|
+ 'queue' => 'y2',
|
|
|
|
+ 'infos' => [
|
|
|
|
+ json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y2-m1']),
|
|
|
|
+ json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y2-m2'])
|
|
|
|
+ ],
|
|
|
|
|
|
- try {
|
|
|
|
- $model = new Message();
|
|
|
|
- $data = $model->batchSend($post['messages'], $post['queue']);
|
|
|
|
-
|
|
|
|
- $result = ['code' => 200, 'message' => Yii::t('common', 'OK'), 'data' => $data];
|
|
|
|
|
|
+ ];
|
|
|
|
+ $params['data'] = json_encode($test);
|
|
|
|
+ //test data end todo delete
|
|
|
|
+
|
|
|
|
+ if (!isset($params['data']))
|
|
|
|
+ throw new Exception('2005');
|
|
|
|
+
|
|
|
|
+ $data = json_decode($params['data'], true);
|
|
|
|
+ if (!is_array($data) || !$data)
|
|
|
|
+ throw new Exception('2006');
|
|
|
|
+
|
|
|
|
+ $errCount = 0;
|
|
|
|
+ foreach ($data as $items) {
|
|
|
|
+ if (!isset($items['queue'])
|
|
|
|
+ || !is_string($items['queue'])
|
|
|
|
+ || !isset($items['infos'])
|
|
|
|
+ || !is_array($items['infos'])
|
|
|
|
+ || !$items['infos'] )
|
|
|
|
+ {
|
|
|
|
+ $errCount ++;
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
|
|
- } catch (\common\logic\Amqp\Exception $e) {
|
|
|
|
- $result = ['code' => $e->getCode(), 'message' => $e->getMessage(), 'data' => []];
|
|
|
|
|
|
+ try {
|
|
|
|
+ //todo check
|
|
|
|
+ //if queue exist
|
|
|
|
+ $queueModel = (new Queue())->create($items['queue']);
|
|
|
|
+ if ($queueModel['status'] == 1) {
|
|
|
|
+ $message = new Message($items['queue']);
|
|
|
|
+ foreach ($items['infos'] as $info) {
|
|
|
|
+ $message->batchSend($info, $items['queue']);
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ $errCount ++;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ } catch (\common\logic\Amqp\Exception $e) {
|
|
|
|
+ $errCount ++;
|
|
|
|
+ //throw new Exception($e->getCode(), $e->getMessage());
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
- return $result;
|
|
|
|
|
|
+ return [
|
|
|
|
+ 'code' => 200,
|
|
|
|
+ 'message' => Yii::t('error', 200),
|
|
|
|
+ 'data' => ['error_count' => $errCount]
|
|
|
|
+ ];
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * 消费消息
|
|
|
|
|
|
+ * 消费消息 todo
|
|
* @author: libingke
|
|
* @author: libingke
|
|
*/
|
|
*/
|
|
public function actionReceive()
|
|
public function actionReceive()
|