提示信息
                if (Cache::STATUS_HAND_OK == $status) {
                    return Cache::getData('result_' . $query);//直接返回数据
                } else {
                    throw new Exception(2101, $mark);//非正常时回执
                }
            } else {
                throw new Exception(2102);//无效
            }
        } else {
            throw new Exception(2102);//无效
        }
    }

    /**
     * Send a single message using the form named by the `form` query parameter.
     *
     * Steps: map `form` to a MessageForm scenario, require a POST request, load
     * the POST body into the model and, when validation passes, send the message.
     *
     * @author: libingke
     * @return array Response envelope with `code`, `message` and `data`. After a
     *               successful send `data` carries `queueName` and `requestId`.
     * @throws Exception Code 2201 when `form` is empty, 2202 when `form` is not a
     *                   known form, '1001' when the request is not a POST.
     */
    public function actionSend()
    {
        // Map the requested form onto a model scenario.
        $form = Yii::$app->request->get('form');
        switch ($form) {
            case 'licai_login': // login message
                $scenario = 'login';
                break;
            case '': // switch compares loosely, so a missing (null) `form` lands here too
                throw new Exception(2201);
            default:
                throw new Exception(2202);
        }

        if (!Yii::$app->request->isPost) {
            throw new Exception('1001');
        }

        // Validate the POST body under the selected scenario.
        $model = new MessageForm();
        $model->setScenario($scenario);
        $model->load(['MessageForm' => Yii::$app->request->post()]);

        $data = [];
        if ($model->validate()) {
            $data['queueName'] = MessageForm::getQueueName($scenario);
            $data['requestId'] = $model->sendMessage();
        } else {
            // NOTE(review): presumably handleError() throws; if it ever returns,
            // the caller receives a 200 response with empty `data` - confirm.
            $model->handleError();
        }

        return [
            'code' => 200,
            'message' => 'OK',
            'data' => $data,
        ];
    }

    /**
     * Send several messages to several queues in one request.
     *
     * The POST field `data` must be a JSON string that decodes to a non-empty
     * array of items shaped like `{"queue": "<name>", "infos": [<message>, ...]}`.
     * Every item is processed independently: a malformed item, a queue whose
     * create() call does not report status 1, or an AMQP exception raised while
     * handling the item each add one to `error_count` and the loop moves on.
     *
     * @author: libingke
     * @return array Response envelope; `data.error_count` is the number of items
     *               that could not be processed.
     * @throws Exception Code '2005' when `data` is absent, '2006' when it does not
     *                   decode to a non-empty array.
     */
    public function actionBatchSend()
    {
        $params = Yii::$app->request->post();

        if (!isset($params['data'])) {
            throw new Exception('2005');
        }

        // json_decode() only accepts strings; any other payload counts as undecodable.
        $data = is_string($params['data']) ? json_decode($params['data'], true) : null;
        if (!is_array($data) || !$data) {
            throw new Exception('2006');
        }

        $errCount = 0;
        foreach ($data as $items) {
            $isValidItem = is_array($items)
                && isset($items['queue'], $items['infos'])
                && is_string($items['queue'])
                && is_array($items['infos'])
                && $items['infos'];
            if (!$isValidItem) {
                $errCount++;
                continue;
            }

            try {
                // TODO: check whether the queue already exists before creating it.
                $queueModel = (new Queue())->create($items['queue']);
                if ($queueModel['status'] != 1) {
                    $errCount++;
                    continue;
                }

                $message = new Message($items['queue']);
                foreach ($items['infos'] as $info) {
                    $message->batchSend($info, $items['queue']);
                }
            } catch (\common\logic\Amqp\Exception $e) {
                // Best effort: one failing item must not abort the rest of the batch.
                $errCount++;
            }
        }

        return [
            'code' => 200,
            'message' => Yii::t('error', 200),
            'data' => ['error_count' => $errCount],
        ];
    }

    /**
     * Consume a message from a queue.
     *
     * TODO: still a stub - the request body is not read yet and the placeholder
     * values below are used instead; replace them with validated request input.
     *
     * @author: libingke
     * @return array `code`/`message` pair: 200 after receive() returns, the
     *               status/result reported by Queue::create() when its status is
     *               not 1, or the code/message of a caught AMQP exception.
     */
    public function actionReceive()
    {
        // Placeholder input until request parameters are wired in.
        $post = [
            'message1' => 'message 1',
            'message2' => 'message 2',
            'message3' => 'message 3',
            'queue' => 'y1',
        ];

        try {
            $queue = (new Queue())->create($post['queue']);
            if ($queue['status'] != 1) {
                return ['code' => $queue['status'], 'message' => $queue['result']];
            }

            $message = new Message($queue['result']);
            $message->receive($post['queue']);

            return ['code' => 200, 'message' => Yii::t('common', 'OK')];
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }
    }

    /**
     * Consume messages in batch.
     *
     * TODO: still a stub - currently performs the same single receive() call as
     * actionReceive() using placeholder values; the request body is not read yet.
     *
     * @author: libingke
     * @return array `code`/`message` pair: 200 after receive() returns, the
     *               status/result reported by Queue::create() when its status is
     *               not 1, or the code/message of a caught AMQP exception.
     */
    public function actionBatchReceive()
    {
        // Placeholder input until request parameters are wired in.
        $post = [
            'message1' => 'message 1',
            'message2' => 'message 2',
            'message3' => 'message 3',
            'queue' => 'y1',
        ];

        try {
            $queue = (new Queue())->create($post['queue']);
            if ($queue['status'] != 1) {
                return ['code' => $queue['status'], 'message' => $queue['result']];
            }

            $message = new Message($queue['result']);
            $message->receive($post['queue']);

            return ['code' => 200, 'message' => Yii::t('common', 'OK')];
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }
    }

    /**
     * Delete a message from a queue.
     *
     * TODO: still a stub - the request body is not read yet and the placeholder
     * values below are used instead; replace them with validated request input.
     *
     * @author: libingke
     * @return array `code`/`message` pair: 200 after delete() returns, the
     *               status/result reported by Queue::create() when its status is
     *               not 1, or the code/message of a caught AMQP exception.
     */
    public function actionDelete()
    {
        // Placeholder input until request parameters are wired in.
        $post = [
            'message1' => 'message 1',
            'message2' => 'message 2',
            'message3' => 'message 3',
            'queue' => 'y1',
        ];

        try {
            $queue = (new Queue())->create($post['queue']);
            if ($queue['status'] != 1) {
                return ['code' => $queue['status'], 'message' => $queue['result']];
            }

            $message = new Message($queue['result']);
            $message->delete($post['message1']);

            return ['code' => 200, 'message' => Yii::t('common', 'OK')];
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }
    }

    /**
     * Delete messages in batch.
     *
     * TODO: not implemented yet; the action body is empty.
     *
     * @author: libingke
     */
    public function actionBatchDelete()
    {
    }
}