|
@@ -0,0 +1,207 @@
|
|
|
+<?php
|
|
|
+
|
|
|
+namespace common\logic\MQMessage;
|
|
|
+
|
|
|
+use PhpAmqpLib\Message\AMQPMessage;
|
|
|
+use PhpAmqpLib\Connection\AMQPStreamConnection;
|
|
|
+
|
|
|
+
|
|
|
+ * 处理消息体逻辑
|
|
|
+ * Class Message
|
|
|
+ * @package common\logic\MQMessage
|
|
|
+ */
|
|
|
+class Message extends Config
|
|
|
+{
|
|
|
+ private $connection;
|
|
|
+
|
|
|
+ private $channel;
|
|
|
+
|
|
|
+ private $callback_queue;
|
|
|
+
|
|
|
+ private $response = null;
|
|
|
+
|
|
|
+ private $corr_id;
|
|
|
+
|
|
|
+ private $result = '';
|
|
|
+
|
|
|
+ public function __construct()
|
|
|
+ {
|
|
|
+ $this->connection = new AMQPStreamConnection(
|
|
|
+ self::HOST,
|
|
|
+ self::PORT,
|
|
|
+ self::USER,
|
|
|
+ self::PASS
|
|
|
+ );
|
|
|
+
|
|
|
+ $this->channel = $this->connection->channel();
|
|
|
+ list($this->callback_queue, ,) = $this->channel->queue_declare(
|
|
|
+ '', false, false, true, false
|
|
|
+ );
|
|
|
+
|
|
|
+ $this->channel->basic_consume(
|
|
|
+ $this->callback_queue, '', false, false, false, false,
|
|
|
+ array($this, 'on_response')
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [发送信息]
|
|
|
+ * @author: libingke
|
|
|
+ * @param string $body 消息主体
|
|
|
+ * @param string $routing_key
|
|
|
+ * @return array 返回数据
|
|
|
+ */
|
|
|
+ public function send($body, $routing_key = 'task_queue')
|
|
|
+ {
|
|
|
+ $properties = [
|
|
|
+ 'correlation_id' => uniqid(),
|
|
|
+ 'reply_to' => $this->callback_queue
|
|
|
+ ];
|
|
|
+
|
|
|
+ if (is_string($body)) {
|
|
|
+ $msg = new AMQPMessage( (string) $body,$properties);
|
|
|
+ $this->result = $this->channel->basic_publish($msg, '', $routing_key);
|
|
|
+ }
|
|
|
+
|
|
|
+ $this->channel->close();
|
|
|
+ $this->connection->close();
|
|
|
+
|
|
|
+ return $this->result;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ * [批量发送信息]
|
|
|
+ * @author: libingke
|
|
|
+ * @param string $body 消息主体
|
|
|
+ * @param string $routing_key
|
|
|
+ * @return array 返回数据
|
|
|
+ */
|
|
|
+ public function batchSend($body, $routing_key = '')
|
|
|
+ {
|
|
|
+ $properties = [
|
|
|
+ 'correlation_id' => uniqid(),
|
|
|
+ 'reply_to' => $this->callback_queue
|
|
|
+ ];
|
|
|
+
|
|
|
+ if (is_array($body)) {
|
|
|
+ $msg = new AMQPMessage((string) $body, $properties);
|
|
|
+ $this->result = $this->channel->batch_basic_publish($msg, '', $routing_key);
|
|
|
+ }
|
|
|
+
|
|
|
+ $this->channel->close();
|
|
|
+ $this->connection->close();
|
|
|
+
|
|
|
+ return $this->result;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [消费信息]
|
|
|
+ * @author: libingke
|
|
|
+ * @param string $queue 队列名称
|
|
|
+ * @param string $str
|
|
|
+ * @return array 返回数据
|
|
|
+ */
|
|
|
+ public function receive($queue, $str)
|
|
|
+ {
|
|
|
+ $callback = function($msg) {
|
|
|
+ echo " [x] Received ", $msg->body, "\n";
|
|
|
+ };
|
|
|
+
|
|
|
+ $this->result = $this->channel->basic_consume($queue, '', false, true, false, false, $callback);
|
|
|
+
|
|
|
+ while(count($this->channel->callbacks)) {
|
|
|
+ $this->channel->wait();
|
|
|
+ }
|
|
|
+
|
|
|
+ return $this->result;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [批量消费信息]
|
|
|
+ * @author: libingke
|
|
|
+ * @param string $queue 队列名称
|
|
|
+ * @param string $arr
|
|
|
+ * @return array 返回数据
|
|
|
+ */
|
|
|
+ public function batchReceive($queue, $arr)
|
|
|
+ {
|
|
|
+ $callback = function($msg) {
|
|
|
+ echo " [x] Received ", $msg->body, "\n";
|
|
|
+ };
|
|
|
+
|
|
|
+ $this->result = $this->channel->basic_consume($queue, '', false, true, false, false, $callback);
|
|
|
+
|
|
|
+ while(count($this->channel->callbacks)) {
|
|
|
+ $this->channel->wait();
|
|
|
+ }
|
|
|
+
|
|
|
+ return $this->result;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * [删除信息]
|
|
|
+ * @author: libingke
|
|
|
+ */
|
|
|
+ public function delete()
|
|
|
+ {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public function on_response($rep)
|
|
|
+ {
|
|
|
+ if($rep->get('correlation_id') == $this->corr_id) {
|
|
|
+ $this->result .= $rep->body;
|
|
|
+ $this->response = $rep->body;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static function CallMq($n)
|
|
|
+ {
|
|
|
+
|
|
|
+ $connection = new AMQPStreamConnection(
|
|
|
+ self::HOST, self::PORT, self::USER, self::PASS
|
|
|
+ );
|
|
|
+ $channel = $connection->channel();
|
|
|
+
|
|
|
+ $channel->queue_declare('task_queue', false, true, false, false);
|
|
|
+
|
|
|
+ $data=empty($n)?"Hello World!":$n;
|
|
|
+
|
|
|
+ $msg = new AMQPMessage($data,
|
|
|
+ array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
|
|
|
+ );
|
|
|
+
|
|
|
+ $channel->basic_publish($msg, '', 'task_queue');
|
|
|
+
|
|
|
+ $channel->close();
|
|
|
+ $connection->close();
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public static function CallUserMq($n)
|
|
|
+ {
|
|
|
+
|
|
|
+ $connection = new AMQPStreamConnection(
|
|
|
+ self::HOST, self::PORT, self::USER, self::PASS
|
|
|
+ );
|
|
|
+ $channel = $connection->channel();
|
|
|
+
|
|
|
+ $channel->queue_declare('login', false, true, false, false);
|
|
|
+
|
|
|
+ $data=empty($n)?"Hello World!":$n;
|
|
|
+
|
|
|
+ $msg = new AMQPMessage($data,
|
|
|
+ array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
|
|
|
+ );
|
|
|
+
|
|
|
+ $channel->basic_publish($msg, '', 'login');
|
|
|
+
|
|
|
+ $channel->close();
|
|
|
+ $connection->close();
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+}
|