request->isPost)
        // throw new Exception('1001');

        $params = Yii::$app->request->post();
        //test data todo delete
        $params = [
            'body' => json_encode(['type' => 'post', 'url' => 'http://zpapi.licai.cn']),
            'queue' => 'y1',
        ];

        if (!isset($params['body']))
            throw new Exception('2001');

        if (!is_string($params['body']))
            throw new Exception('2002');

        if (!isset($params['queue']))
            throw new Exception('2003');

        if (!is_string($params['queue']))
            throw new Exception('2004');

        $body = $params['body'];
        $queue = $params['queue'];
        try {
            $queueModel = (new Queue())->create($queue);
            if ($queueModel['status'] == 1) {
                $message = new Message($queue);
                $message->send($body, $queue);
                $code = 200;
            } else {
                $code = 2100;
            }
        } catch (\common\logic\Amqp\Exception $e) {
            throw new Exception($e->getCode(), $e->getMessage());
        }

        return [
            'code' => $code,
            'message' => Yii::t('error', $code)
        ];
    }

    /**
     * Send messages to one or more queues in a single request.
     *
     * Reads the POST field `data`, which must be a JSON-encoded, non-empty list of
     * entries shaped like `{"queue": "<name>", "infos": ["<body>", ...]}`.
     * An entry is counted in `error_count` (and skipped) when it is malformed, when
     * the queue cannot be created, or when publishing raises an AMQP exception; the
     * remaining entries are still processed.
     *
     * @return array `code`, `message` and `data.error_count`
     * @throws Exception '2005' when `data` is missing; '2006' when it is not a
     *                   JSON string that decodes to a non-empty array
     * @author: libingke
     */
    public function actionBatchSend()
    {
        $params = Yii::$app->request->post();

        if (!isset($params['data'])) {
            throw new Exception('2005');
        }

        // json_decode() only accepts a string; a form-encoded array such as
        // data[]=... would otherwise raise a TypeError on PHP 8 instead of
        // producing the regular 2006 validation error.
        $data = is_string($params['data']) ? json_decode($params['data'], true) : null;
        if (!is_array($data) || !$data) {
            throw new Exception('2006');
        }

        $errCount = 0;
        foreach ($data as $items) {
            if (!isset($items['queue'])
                || !is_string($items['queue'])
                || !isset($items['infos'])
                || !is_array($items['infos'])
                || !$items['infos']
            ) {
                $errCount++;
                continue;
            }

            try {
                // TODO: check whether the queue already exists before creating it.
                $queueModel = (new Queue())->create($items['queue']);
                if ($queueModel['status'] == 1) {
                    $message = new Message($items['queue']);
                    foreach ($items['infos'] as $info) {
                        $message->batchSend($info, $items['queue']);
                    }
                } else {
                    $errCount++;
                }
            } catch (\common\logic\Amqp\Exception $e) {
                // Best effort: a failing entry is counted once and must not abort
                // the other entries of the batch.
                $errCount++;
            }
        }

        return [
            'code' => 200,
            'message' => Yii::t('error', 200),
            'data' => ['error_count' => $errCount]
        ];
    }

    /**
     * Consume a message from a queue (work in progress).
     *
     * Returns code 200 on success, the status/result reported by Queue::create()
     * when the queue is not ready, or the AMQP exception's code and message.
     *
     * @return array `code` and `message`
     * @author: libingke
     */
    public function actionReceive()
    {
        // Fetch the incoming parameters; validating them (via a model or explicit
        // checks) is still TODO, so they are not used below yet.
        $params = Yii::$app->request->post();
        // Placeholder input used until the request parameters are wired in.
        $post = [
            'message1' => 'message 1',
            'message2' => 'message 2',
            'message3' => 'message 3',
            'queue' => 'y1',
        ];

        try {
            $queue = (new Queue())->create($post['queue']);
            if ($queue['status'] != 1) {
                return ['code' => $queue['status'], 'message' => $queue['result']];
            }

            $message = new Message($queue['result']);
            $message->receive($post['queue']);

            return ['code' => 200, 'message' => Yii::t('common', 'OK')];
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }
    }

    /**
     * Consume messages in batch.
     *
     * NOTE(review): the body is currently identical to actionReceive() and calls
     * Message::receive(); confirm whether a dedicated batch call is intended.
     *
     * @return array `code` and `message`
     * @author: libingke
     */
    public function actionBatchReceive()
    {
        // Fetch the incoming parameters; validating them (via a model or explicit
        // checks) is still TODO, so they are not used below yet.
        $params = Yii::$app->request->post();
        // Placeholder input used until the request parameters are wired in.
        $post = [
            'message1' => 'message 1',
            'message2' => 'message 2',
            'message3' => 'message 3',
            'queue' => 'y1',
        ];

        try {
            $queue = (new Queue())->create($post['queue']);
            if ($queue['status'] != 1) {
                return ['code' => $queue['status'], 'message' => $queue['result']];
            }

            $message = new Message($queue['result']);
            $message->receive($post['queue']);

            return ['code' => 200, 'message' => Yii::t('common', 'OK')];
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }
    }

    /**
     * Delete a message.
     *
     * Returns code 200 on success, the status/result reported by Queue::create()
     * when the queue is not ready, or the AMQP exception's code and message.
     *
     * @return array `code` and `message`
     * @author: libingke
     */
    public function actionDelete()
    {
        // Fetch the incoming parameters; validating them (via a model or explicit
        // checks) is still TODO, so they are not used below yet.
        $params = Yii::$app->request->post();
        // Placeholder input used until the request parameters are wired in.
        $post = [
            'message1' => 'message 1',
            'message2' => 'message 2',
            'message3' => 'message 3',
            'queue' => 'y1',
        ];

        try {
            $queue = (new Queue())->create($post['queue']);
            if ($queue['status'] != 1) {
                return ['code' => $queue['status'], 'message' => $queue['result']];
            }

            $message = new Message($queue['result']);
            $message->delete($post['message1']);

            return ['code' => 200, 'message' => Yii::t('common', 'OK')];
        } catch (\common\logic\Amqp\Exception $e) {
            return ['code' => $e->getCode(), 'message' => $e->getMessage()];
        }
    }

    /**
     * Delete messages in batch.
     *
     * Not implemented yet: the action currently does nothing and returns null.
     *
     * @author: libingke
     */
    public function actionBatchDelete()
    {
    }
}