wangbiaoyan 4 ヶ月 前
コミット
1ba1cb453a

+ 139 - 0
src/Bean/Mq/ConsumeParamBean.php

@@ -0,0 +1,139 @@
+<?php
+
+namespace Tool\MayouTool\Bean\Mq;
+
+use Tool\MayouTool\Mq\MqConsumeInterface;
+
+class ConsumeParamBean
+{
+    /**
+     * 交换机名称
+     * @var string $exchangeName
+     */
+    private $exchangeName;
+
+    /**
+     * 交换机类型
+     * @var string $exchangeType
+     */
+    private $exchangeType;
+
+    /**
+     * 队列名称
+     * @var string $queueName
+     */
+    private $queueName;
+
+    /**
+     * 路由键
+     * @var string $routeKey
+     */
+    private $routeKey;
+
+    /**
+     * 消费函数
+     * @var array $callback
+     */
+    private $callback;
+
+    /**
+     * @var $object MqConsumeInterface
+     */
+    private $object;
+
+    /**
+     * @return MqConsumeInterface
+     */
+    public function getObject(): MqConsumeInterface
+    {
+        return $this->object;
+    }
+
+    /**
+     * @param MqConsumeInterface $object
+     */
+    public function setObject(MqConsumeInterface $object)
+    {
+        $this->object = $object;
+    }
+
+    /**
+     * @return string
+     */
+    public function getExchangeName(): string
+    {
+        return $this->exchangeName;
+    }
+
+    /**
+     * @param string $exchangeName
+     */
+    public function setExchangeName(string $exchangeName)
+    {
+        $this->exchangeName = $exchangeName;
+    }
+
+    /**
+     * @return string
+     */
+    public function getExchangeType(): string
+    {
+        return $this->exchangeType;
+    }
+
+    /**
+     * @param string $exchangeType
+     */
+    public function setExchangeType(string $exchangeType)
+    {
+        $this->exchangeType = $exchangeType;
+    }
+
+    /**
+     * @return string
+     */
+    public function getQueueName(): string
+    {
+        return $this->queueName;
+    }
+
+    /**
+     * @param string $queueName
+     */
+    public function setQueueName(string $queueName)
+    {
+        $this->queueName = $queueName;
+    }
+
+    /**
+     * @return string
+     */
+    public function getRouteKey(): string
+    {
+        return $this->routeKey;
+    }
+
+    /**
+     * @param string $routeKey
+     */
+    public function setRouteKey(string $routeKey)
+    {
+        $this->routeKey = $routeKey;
+    }
+
+    /**
+     * @return array
+     */
+    public function getCallback(): array
+    {
+        return $this->callback;
+    }
+
+    /**
+     * @param array $callback
+     */
+    public function setCallback(array $callback)
+    {
+        $this->callback = $callback;
+    }
+}

+ 8 - 0
src/Exception/MqException.php

@@ -0,0 +1,8 @@
+<?php
+
+namespace Tool\MayouTool\Exception;
+
+class MqException extends \Exception
+{
+
+}

+ 83 - 0
src/Mq/MqConsumeInterface.php

@@ -0,0 +1,83 @@
+<?php
+
+
+namespace Tool\MayouTool\Mq;
+
+
+use Tool\MayouTool\Bean\Mq\ConsumeParamBean;
+use Tool\MayouTool\MqTool;
+
+abstract class MqConsumeInterface
+{
+    /**
+     * 交换机名称
+     * @var string $exchangeName
+     */
+    protected $exchangeName;
+
+    /**
+     * 队列名称
+     * @var string $queueName
+     */
+    protected $queueName;
+
+    /**
+     * 路由键
+     * @var string $routeKey
+     */
+    protected $routeKey;
+
+    /**
+     * 交换机类型(默认直连模式)
+     * @var string $exchangeType
+     */
+    protected $exchangeType = AMQP_EX_TYPE_DIRECT;
+
+    /**
+     * 消费前置动作
+     * @return mixed
+     */
+    public function bootStart()
+    {
+    }
+
+    /**
+     * 消费后置动作
+     * @return mixed
+     */
+    public function bootEnd()
+    {
+    }
+
+    /**
+     * 消费处理函数
+     * @param \AMQPEnvelope $AMQPEnvelope
+     * @param \AMQPQueue $AMQPQueue
+     * @return mixed
+     */
+    public abstract function consumeCallback(\AMQPEnvelope $AMQPEnvelope, \AMQPQueue $AMQPQueue);
+
+    public function handleConsume()
+    {
+        $params = [
+            "exchangeName" => $this->exchangeName,
+            "exchangeType" => $this->exchangeType,
+            "queueName"    => $this->queueName,
+            "routeKey"     => $this->routeKey,
+            "callback"     => [ $this, "consumeCallback" ],
+            "object"       => $this
+        ];
+        $consumeParamBean = new ConsumeParamBean($params);
+        $rabbitMq = RabbitMq::getInstance();
+        //声明交换机
+        $rabbitMq->declareExchange($consumeParamBean->getExchangeName(), $consumeParamBean->getExchangeType());
+        //声明队列
+        $rabbitMq->declareQueue($consumeParamBean->getQueueName(), $consumeParamBean->getRouteKey());
+        while (true) {
+            try {
+                MqTool::consume($consumeParamBean, $rabbitMq);
+            } catch (\Throwable $exception) {
+            }
+        }
+    }
+}

+ 30 - 0
src/Mq/MqInterface.php

@@ -0,0 +1,30 @@
+<?php
+
+
+namespace Tool\MayouTool\Mq;
+
+
+interface MqInterface
+{
+    /**
+     * 链接mq
+     * @return mixed
+     */
+    public function connect();
+
+    /**
+     * 推送消息到mq
+     * @param string $message 消息体
+     * @param string $routingKey 路由键
+     * @return mixed
+     */
+    public function publish(string $message,string $routingKey);
+
+    /**
+     * 消费消息
+     * @param callable $callback 处理函数
+     * @return mixed
+     */
+    public function consume(callable $callback);
+
+}

+ 11 - 5
src/Mq/MqService.php

@@ -14,6 +14,12 @@ use PhpAmqpLib\Message\AMQPMessage;
 class MqService
 {
 
+    /**
+     * mq链接
+     * @var $connection AMQPStreamConnection
+     */
+    static $connection = null;
+
     /**
      * @var string 地址
      */
@@ -55,13 +61,16 @@ class MqService
      */
     public static function getMqConnection()
     {
+        if (self::$connection) {
+            return self::$connection;
+        }
         self::$host = env("MQ_HOST");
         self::$port = env("MQ_PORT");
         self::$userName = env("MQ_USER_NAME");
         self::$userPassword = env("MQ_USER_PASSWORD");
         //连接mq
-        $connection = new AMQPStreamConnection(self::$host, self::$port, self::$userName, self::$userPassword);
-        return $connection;
+        self::$connection = new AMQPStreamConnection(self::$host, self::$port, self::$userName, self::$userPassword);
+        return self::$connection;
     }
 
     /**
@@ -101,15 +110,12 @@ class MqService
         $connection = self::getMqConnection();
         //创建channel
         $channel = $connection->channel();
-
         //创建mq消息
         $msg = new AMQPMessage($message);
         //推送消息到mq
         $channel->basic_publish($msg, $exchange, $routeKey);
-
         //发送成功之后关闭channel和connection
         $channel->close();
-        $connection->close();
     }
 
 }

+ 223 - 0
src/Mq/RabbitMq.php

@@ -0,0 +1,223 @@
+<?php
+
+
+namespace Tool\MayouTool\Mq;
+
+
+use AMQPChannel;
+use AMQPConnection;
+use AMQPExchange;
+use AMQPQueue;
+
+class RabbitMq implements MqInterface
+{
+    /**
+     * 配置数组
+     * @var array $config
+     */
+    private $config;
+
+    /**
+     * 链接
+     * @var AMQPConnection $connection
+     */
+    private $connection;
+
+    /**
+     * 渠道
+     * @var AMQPChannel $channel
+     */
+    private $channel;
+
+    /**
+     * 交换机
+     * @var AMQPExchange $exchange
+     */
+    private $exchange;
+
+    /**
+     * 消息队列
+     * @var AMQPQueue $queue
+     */
+    private $queue;
+
+    /**
+     * @return array
+     */
+    public function getConfig(): array
+    {
+        return $this->config;
+    }
+
+    /**
+     * @param array $config
+     */
+    public function setConfig(array $config)
+    {
+        $this->config = $config;
+    }
+
+    /**
+     * @return AMQPConnection
+     */
+    public function getConnection(): AMQPConnection
+    {
+        return $this->connection;
+    }
+
+    /**
+     * @param AMQPConnection $connection
+     */
+    public function setConnection(AMQPConnection $connection)
+    {
+        $this->connection = $connection;
+    }
+
+    /**
+     * @return AMQPChannel
+     */
+    public function getChannel(): AMQPChannel
+    {
+        return $this->channel;
+    }
+
+    /**
+     * @param AMQPChannel $channel
+     */
+    public function setChannel(AMQPChannel $channel)
+    {
+        $this->channel = $channel;
+    }
+
+    /**
+     * @return AMQPExchange
+     */
+    public function getExchange(): AMQPExchange
+    {
+        return $this->exchange;
+    }
+
+    /**
+     * @param AMQPExchange $exchange
+     */
+    public function setExchange(AMQPExchange $exchange)
+    {
+        $this->exchange = $exchange;
+    }
+
+    /**
+     * @return AMQPQueue
+     */
+    public function getQueue(): AMQPQueue
+    {
+        return $this->queue;
+    }
+
+    /**
+     * @param AMQPQueue $queue
+     */
+    public function setQueue(AMQPQueue $queue)
+    {
+        $this->queue = $queue;
+    }
+
+
+    protected static $instance = null;
+
+    public static function getInstance($singleton = true)
+    {
+        if ($singleton) {
+            if (is_null(self::$instance)) {
+                self::$instance = new self();
+            }
+            return self::$instance;
+        } else {
+            return new self();
+        }
+    }
+
+    public function __construct()
+    {
+        $this->connect();
+    }
+
+    /**
+     * 链接主机
+     * @return mixed
+     */
+    public function connect()
+    {
+        $connectConfig = config('queue.connections.rabbitmq');
+        $this->config = [
+            'host'      => $connectConfig['host'],
+            'port'      => $connectConfig['port'],
+            'login'     => $connectConfig['login'],
+            'password'  => $connectConfig['password'],
+            'vhost'     => $connectConfig['vhost'],
+            'heartbeat' => 10
+        ];
+
+        $this->connection = new AMQPConnection($this->config);
+        $this->connection->connect();
+        $this->channel = new AMQPChannel($this->connection);
+    }
+
+    /**
+     * 消费消息
+     * @param callable $callback 处理函数
+     * @return mixed
+     */
+    public function consume(callable $callback)
+    {
+        // TODO: Implement consume() method.
+        $this->queue->consume($callback);
+    }
+
+    /**
+     * 推送消息到mq
+     * @param string $message 消息体
+     * @param string $routingKey 路由键
+     * @return mixed
+     */
+    public function publish(string $message, string $routingKey)
+    {
+        // TODO: Implement publish() method.
+        $this->exchange->publish($message, $routingKey);
+    }
+
+    public function __destruct()
+    {
+        $this->channel->close();
+        $this->connection->disconnect();
+    }
+
+    /**
+     * 声明交换机
+     */
+    public function declareExchange($exchangeName, $exchangeType, $create = true)
+    {
+        $this->exchange = new AMQPExchange($this->channel);
+        $this->exchange->setName($exchangeName);
+        $this->exchange->setType($exchangeType);
+        $this->exchange->setFlags(AMQP_DURABLE);
+        if ($create) {
+            $this->exchange->declareExchange();
+        }
+    }
+
+
+    /**
+     * 申明队列
+     */
+    public function declareQueue($queueName, $routeKey)
+    {
+        if (!$queueName) {
+            throw new \Exception('Queue name must be set');
+        }
+        $this->queue = new AMQPQueue($this->channel);
+        $this->queue->setName($queueName);
+        $this->queue->setFlags(AMQP_DURABLE);
+        $this->queue->declareQueue();
+        $this->queue->bind($this->exchange->getName(), $routeKey);
+    }
+}

+ 39 - 4
src/MqTool.php

@@ -4,6 +4,9 @@
 namespace Tool\MayouTool;
 
 
+use Tool\MayouTool\Bean\Mq\ConsumeParamBean;
+use Tool\MayouTool\Exception\MqException;
+use Tool\MayouTool\Mq\RabbitMq;
 use Tool\MayouTool\Mq\MqService;
 
 class MqTool
@@ -13,9 +16,9 @@ class MqTool
      * @param $message 消息
      * @param $routeKey 路由键
      */
-    public static function pushLogMessage($message,$routeKey)
+    public static function pushLogMessage($message, $routeKey)
     {
-        MqService::pushLogMessage($message,$routeKey);
+        MqService::pushLogMessage($message, $routeKey);
     }
 
     /**
@@ -24,8 +27,40 @@ class MqTool
      * @param $exchange 交换机
      * @param $routeKey 路由键
      */
-    public static function pushMessage($message,$exchange,$routeKey)
+    public static function pushMessage($message, $exchange, $routeKey)
     {
-        MqService::pushMessage($message,$exchange,$routeKey);
+        MqService::pushMessage($message, $exchange, $routeKey);
+    }
+
+    /**
+     * 消费消息队列
+     * @param ConsumeParamBean $consumeParamBean
+     * @throws MqException
+     * @throws \AMQPChannelException
+     * @throws \AMQPConnectionException
+     */
+    public static function consume(ConsumeParamBean $consumeParamBean, RabbitMq $rabbitMq)
+    {
+        if ($AMQPEnvelope = $rabbitMq->getQueue()->get()) {
+            //消费消息
+            $consumeParamBean->getObject()->bootStart();
+            call_user_func_array($consumeParamBean->getCallback(), [ $AMQPEnvelope, $rabbitMq->getQueue() ]);
+            $consumeParamBean->getObject()->bootEnd();
+        } else {
+            sleep(1);
+        }
+    }
+
+    /**
+     * 向交换机发布消息
+     * @param mixed $exchangeName 交换机名称
+     * @param mixed $message 消息
+     * @param string $routeKey 路由键
+     */
+    public static function publish($message = '', $routeKey = '', $singleton = true)
+    {
+        $rabbitMq = RabbitMq::getInstance($singleton);
+        //推送消息
+        $rabbitMq->publish($message, $routeKey);
     }
 }