request->post(); $post = [ 'message1' => 'message 1', 'message2' => 'message 2', 'message3' => 'message 3', 'queue' => 'y1', ]; try { $queue = (new Queue())->create($post['queue']); if ($queue['status'] == 1) { $message = new Message($queue['result']); $message->send($post['message1'], $post['queue']); $message->send($post['message2'], $post['queue']); $message->send($post['message3'], $post['queue']); return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')]; } else { return ['code' => $queue['status'], 'message' => $queue['result']]; } } catch (\common\logic\Amqp\Exception $e) { $result = ['code' => $e->getCode(), 'message' => $e->getMessage()]; } return $result; } /** * 发送消息 (接受多条) * @author: libingke */ public function actionBatchSend() { //$params = Yii::$app->request->post(); $post = [ 'messages' => ['message 1', 'message 2'], 'queue' => 'task_queue', ]; try { $model = new Message(); $data = $model->batchSend($post['messages'], $post['queue']); $result = ['code' => 200, 'message' => Yii::t('common', 'OK'), 'data' => $data]; } catch (\common\logic\Amqp\Exception $e) { $result = ['code' => $e->getCode(), 'message' => $e->getMessage(), 'data' => []]; } return $result; } /** * 消费消息 * @author: libingke */ public function actionReceive() { //获取接收参数 利用model验证或者判断 $params = Yii::$app->request->post(); $post = [ 'message1' => 'message 1', 'message2' => 'message 2', 'message3' => 'message 3', 'queue' => 'y1', ]; try { $queue = (new Queue())->create($post['queue']); if ($queue['status'] == 1) { $message = new Message($queue['result']); $message->receive($post['queue']); return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')]; } else { return ['code' => $queue['status'], 'message' => $queue['result']]; } } catch (\common\logic\Amqp\Exception $e) { $result = ['code' => $e->getCode(), 'message' => $e->getMessage()]; } return $result; } /** * 批量消费消息 * @author: libingke */ public function actionBatchReceive() { //获取接收参数 利用model验证或者判断 $params = Yii::$app->request->post(); $post = [ 'message1' => 'message 1', 'message2' => 'message 2', 'message3' => 'message 3', 'queue' => 'y1', ]; try { $queue = (new Queue())->create($post['queue']); if ($queue['status'] == 1) { $message = new Message($queue['result']); $message->receive($post['queue']); return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')]; } else { return ['code' => $queue['status'], 'message' => $queue['result']]; } } catch (\common\logic\Amqp\Exception $e) { $result = ['code' => $e->getCode(), 'message' => $e->getMessage()]; } return $result; } /** * 删除消息 * @author: libingke */ public function actionDelete() { //获取接收参数 利用model验证或者判断 $params = Yii::$app->request->post(); $post = [ 'message1' => 'message 1', 'message2' => 'message 2', 'message3' => 'message 3', 'queue' => 'y1', ]; try { $queue = (new Queue())->create($post['queue']); if ($queue['status'] == 1) { $message = new Message($queue['result']); $message->delete($post['message1']); return $result = ['code' => 200, 'message' => Yii::t('common', 'OK')]; } else { return ['code' => $queue['status'], 'message' => $queue['result']]; } } catch (\common\logic\Amqp\Exception $e) { $result = ['code' => $e->getCode(), 'message' => $e->getMessage()]; } return $result; } /** * 批量删除消息 * @author: libingke */ public function actionBatchDelete() { } }