提示信息 if (Cache::STATUS_HAND_OK == $status) { return Cache::getData('result_' . $query);//直接返回数据 } else { throw new Exception(2101, $mark);//非正常时回执 } } else { throw new Exception(2102);//无效 } } else { throw new Exception(2102);//无效 } } /** * 发送消息 * @author: libingke * @return array * @throws Exception */ public function actionSend() { $sign = Yii::$app->request->get('sign'); if ($sign == '') throw new Exception(2201); $job = Jobs::fetchDataBySign($sign, false); if (!isset($job['scenario'])) throw new Exception(2202);//验证场景 $scenario = $job['scenario']; $queue = $job['queue']; if (!Yii::$app->request->isPost) throw new Exception('1001'); $model = new MessageForm(); $model->setScenario($scenario); $model->setQueue($queue); $model->load(['MessageForm' => Yii::$app->request->post()]); $data = []; if ($model->validate()) { $data['queueName'] = $model->getQueue(); $data['requestId'] = $model->sendMessage(); } else { $model->handleError();//处理验证失败 } return [ 'code' => 200, 'message' => Yii::t('error', 200), 'data' => $data ]; } /** * 发送消息 * @author: libingke */ public function actionBatchSend() { $sign = Yii::$app->request->get('sign'); if ($sign == '') throw new Exception(2201); $job = Jobs::fetchDataBySign($sign, false); if (!isset($job['scenario'])) throw new Exception(2202);//验证场景 $scenario = $job['scenario']; $queue = $job['queue']; if (!Yii::$app->request->isPost) throw new Exception('1001'); $model = new MessageForm(); $model->setScenario($scenario); $model->setQueue($queue); $model->load(['MessageForm' => Yii::$app->request->post()]); $data = []; if ($model->validate()) { $data['queueName'] = $model->getQueue(); $data['requestId'] = $model->sendMessage(); } else { $model->handleError();//处理验证失败 } return [ 'code' => 200, 'message' => Yii::t('error', 200), 'data' => $data ]; $params = Yii::$app->request->post(); //test data start todo delete $test[] = [ 'queue' => 'y1', 'infos' => [ json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y1-m1']), json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y1-m2']) ], ]; $test[] = [ 'queue' => 'y2', 'infos' => [ json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y2-m1']), json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn', 'message' => 'y2-m2']) ], ]; $params['data'] = json_encode($test); //test data end todo delete if (!isset($params['data'])) throw new Exception('2005'); $data = json_decode($params['data'], true); if (!is_array($data) || !$data) throw new Exception('2006'); $errCount = 0; foreach ($data as $items) { if (!isset($items['queue']) || !is_string($items['queue']) || !isset($items['infos']) || !is_array($items['infos']) || !$items['infos'] ) { $errCount ++; continue; } try { //todo check //if queue exist $queueModel = (new Queue())->create($items['queue']); if ($queueModel['status'] == 1) { $message = new Message($items['queue']); foreach ($items['infos'] as $info) { $message->batchSend($info, $items['queue']); } } else { $errCount ++; } } catch (\common\logic\Amqp\Exception $e) { $errCount ++; //throw new Exception($e->getCode(), $e->getMessage()); } } return [ 'code' => 200, 'message' => Yii::t('error', 200), 'data' => ['error_count' => $errCount] ]; } /** * 消费消息 todo * @author: libingke */ public function actionConsume() { $queue = 'login'; try { $message = new Message($queue); $message->consume($queue); return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')]; } catch (\common\logic\Amqp\Exception $e) { $result = ['code' => $e->getCode(), 'message' => $e->getMessage()]; } return $result; } /** * 批量消费消息 * @author: libingke */ public function actionBatchReceive() { //获取接收参数 利用model验证或者判断 $params = Yii::$app->request->post(); $post = [ 'message1' => 'message 1', 'message2' => 'message 2', 'message3' => 'message 3', 'queue' => 'y1', ]; try { $queue = (new Queue())->create($post['queue']); if ($queue['status'] == 1) { $message = new Message($queue['result']); $message->receive($post['queue']); return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')]; } else { return ['code' => $queue['status'], 'message' => $queue['result']]; } } catch (\common\logic\Amqp\Exception $e) { $result = ['code' => $e->getCode(), 'message' => $e->getMessage()]; } return $result; } /** * 删除消息 * @author: libingke */ public function actionDelete() { //获取接收参数 利用model验证或者判断 $params = Yii::$app->request->post(); $post = [ 'message1' => 'message 1', 'message2' => 'message 2', 'message3' => 'message 3', 'queue' => 'y1', ]; try { $queue = (new Queue())->create($post['queue']); if ($queue['status'] == 1) { $message = new Message($queue['result']); $message->delete($post['message1']); return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')]; } else { return ['code' => $queue['status'], 'message' => $queue['result']]; } } catch (\common\logic\Amqp\Exception $e) { $result = ['code' => $e->getCode(), 'message' => $e->getMessage()]; } return $result; } /** * 批量删除消息 * @author: libingke */ public function actionBatchDelete() { } }