添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

场景:对接erp,内部后台每次生成数十万的兑换码,然后调用erp接口,向erp写入这些兑换码,并且erp只提供一个一个的写入,没有传一个json数组然后批量入库的,同时erp会返回写入结果,如果写入后台需要更新一下状态。如果使用传统的单进程方案,循环的调用接口写入,其效率是非常低的。简单的测试一下,用传统的单进程方案,写入一个兑换码大约需要0.2s(请求发起到响应时间),那么写入十万个大约需要5.5小时,如果是erp临时需要大量的兑换码使用,这么慢的速度是非常致命的。

本来想使用Swoole的Task来实现的,想想还要在服务器上安装许多扩展,最后还是算了。于是用了TP5官方的一个组件 think-queue

在传统的程序执行流程一般都是即时,串行,同步的。在某些场景下,会存在并发低,吞吐率低,响应时间长等问题。在大型应用中,一般会引入消息队列来提高应用的性能。

用了两个服务器,为了区分一下,就叫做A服务器和B服务器吧。都是1核2G1m的学生机

A服务器:部署队列
B服务器:模拟erp端的写入

一、部署消息队列以及模仿erp端写入

  1. 设置composer阿里云镜像
composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/
  1. 安装TP5.1(这里使用的是 v5.1.40)
composer create-project topthink/think=5.1.*
  1. 安装think-queue(这里使用的是 v2.0.4)
composer require topthink/think-queue
  1. 生产者代码。releaseTaskQueue()为使用了队列的例子,releaseTaskOrdinary()则是使用了循环发起请求写入的代码。主要是想比较一下两个方法的性能如何
namespace app\client\controller; use GuzzleHttp\Client; use think\Queue; class Produce public function releaseTaskQueue() for($i=0;$i<5000;$i++){ $content = ['timestamp' => time(),'random_str' => uniqid()]; Queue::push('app\client\controller\Consume',$content,'SendJob'); public function releaseTaskOrdinary() for($i=0;$i<5000;$i++){ $content = ['timestamp' => time(),'random_str' => uniqid()]; (new Client())->post('http://106.52.157.244/',[ 'form_params' => [ 'timestamp' => $content['timestamp'], 'random_str' => $content['random_str'],
  1. 消费者代码
namespace app\client\controller; use GuzzleHttp\Client; use think\queue\Job; class Consume public function fire(Job $job, $data) //有的时候,消息到达消费者之前,可能已经不需要再执行了。 //例如应用中做一个短信推送的功能,用户有权拒绝/接受推送。如果用户在消息发布之后拒绝了推送,那么认为该消息不再需要执行 if(!$this->getPushStatus()){ //如果用户拒绝推送,从队列中移除该任务 print '用户拒绝推送,从队列中移除该任务'.PHP_EOL; $job->delete(); return true; if($this->send($data)){ //如果对端已经成功插入该数据,从队列中移除该任务 print '对端已经成功插入该数据,从队列中移除该任务'.PHP_EOL; $job->delete(); return true; //如果做大型的应用,用户接收的推送短信可能不只一条,如果多次发送可能会触发短信平台的防盗刷功能 //这里可以判断一下短信平台的响应码,延迟发送 if(!$this->enoughReceive()){ //$delay为延迟时间,表示该任务延迟60秒后再执行 print '延迟执行该任务'.PHP_EOL; $job->release(60); return true; //还可以获取任务重试的次数,如果重试次数大于3次,从队列中移除该任务 if($job->attempts() > 3){ print '任务重试的次数>3,从队列中移除该任务'.PHP_EOL; $job->delete(); return true; * 向erp端发起请求,消费队列中的消息 * @param $data ['timestamp' => time(),'random_str' => uniqid()] * @return int 如果插入成功,返回1 否则返回0 private function send($data) $response = (new Client())->post('http://106.52.157.244/',[ 'form_params' => [ 'timestamp' => $data['timestamp'], 'random_str' => $data['random_str'], $response = json_decode($response->getBody()->getContents(),true); dump($response); return (isset($response['status']) && $response['status'] == 1) ? 1 : 0; * 判断用户是否可以接收短信 private function enoughReceive() //to do ... return true; * 模拟获取用户的短信订阅状态 private function getPushStatus() //to do... return true;
  1. 配置文件。TP官方一共内置了RedisDatabaseTopthinkSync这四种驱动,优先使用Redis,毕竟很快
/* 消息队列配置 */ return [ 'connector' => 'Redis', // Redis 驱动 'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 'default' => 'default', // 默认的队列名称 'host' => '127.0.0.1', // redis 主机ip 'port' => 6379, // redis 端口 'password' => 'luoss,,', // redis 密码 'select' => 5, // 使用哪一个 db,默认为 db0 'timeout' => 0, // redis连接的超时时间 'persistent' => false, // 是否是长连接
  1. erp端模仿写入代码
$pdo = new PDO("mysql:host=localhost;dbname=testqueue_com",'testqueue_com',''); $pdo->exec('set names utf8'); $now_time = time(); $sql = "INSERT test_queue (timestamp,random_str,create_time) VALUES ('{$_POST['timestamp']}','{$_POST['random_str']}','$now_time')"; $pdo->exec($sql); }catch (Exception $e){ die('操作失败'.$e->getMessage()); $pdo = null;
CREATE TABLE `testqueue_com`.`test_queue` (
`id` int(11) UNSIGNED NOT NULL AUTO_INCREMENT,
`timestamp` int(11) UNSIGNED NOT NULL DEFAULT 0,
`random_str` varchar(35) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
  1. 使用Supervisor添加一个守护进程,进程数量为10,启动命令
php think queue:work --daemon --queue SendJob
  1. 先后执行releaseTaskQueue()和releaseTaskOrdinary()。在多进程的加持下,队列插入5000条数据花费了20秒,而普通插入花费20秒时间却只插入了900条数据。

    在生产环境2核4G服务器,8个进程的加持下。每秒可以写入800条左右的数据,负载15%左右,如果进程数多一点会更快。

    二、详细介绍

    1. 消息和队列的保存方式

      image.png
      共有三个key

      • queues:SendJob 类型为 List 列表,表示待执行的任务列表

      • queues:SendJob:delayed 类型为 Sorted Set 有序集合,表示延迟、定时执行的任务列表

      • queues:SendJob:reserved 类型为 Sorted Set 有序集合,表示执行中的任务列表

        Redis驱动下为了实现任务的延迟执行和重发,任务将在这三个key之间来回移动

    2. 命令
      • 2.1 命令模式

        • queue:subscribe
        • queue:listen
          listen 命令: 该命令将会启动一个 listen 进程 ,然后由 listen 进程通过 proc_open('php think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s') 的方式来周期性地创建一次性的 work 进程来消费消息队列, 并且限制该 work 进程的执行时间, 同时通过管道来监听 work 进程的输出。
          php think queue:listen --queue SendJob
        • queue:work
          work 命令: 该命令将启动一个 work 进程来处理消息队列。
          php think queue:work --queue SendJob
      • 2.2 命令行参数

        • Work 模式

          php think queue:work \
          --daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
          --queue  helloJobQueue  //要处理的队列的名称
          --delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
          --force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
          --memory 128 \      //该进程允许使用的内存上限,以 M 为单位
          --sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
          --tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
          
        • Listen 模式

          php think queue:listen
          --queue  helloJobQueue \   //监听的队列的名称
          --delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
          --memory 128 \       //该进程允许使用的内存上限,以 M 为单位
          --sleep  3 \         //如果队列中无任务,则多长时间后重新检查
          --tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
          --timeout 60         // work 进程允许执行的最长时间,以秒为单位
          
      • 2.3 work 模式和 listen 模式的区别

        两者都可以用于处理消息队列中的任务

        • 2.3.1 执行原理不同
          • work 命令是单进程的处理模式。按照是否设置了 --daemon 参数,work命令又可分为单次执行和循环执行两种模式。
            • 单次执行:不添加 --daemon参数,该模式下,work进程在处理完下一个消息后直接结束当前进程。当队列为空时,会sleep一段时间然后退出。
            • 循环执行:添加了 --daemon参数,该模式下,work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当队列为空时,会在每次循环中sleep一段时间。
          • listen 命令是 双进程 + 管道 的处理模式。listen命令所在的进程会循环地创建 单次执行模式的 work 进程,每次创建的 work 进程只消费一个消息就会结束, 然后 listen 进程再创建一个新的 work 进程
            • listen 进程会定时检查当前的 work 进程执行时间是否超过了 --timeout 参数的值, 如果已超时, 则 listen 进程会 kill 掉 work 进程, 然后抛出异常
            • listen 进程会通过管道来监听当前的 work 进程的输出, 当 work 进程有输出时, listen 进程会将输出写入到 stdout / stderr
            • listen 进程会定时通过 proc_get_status() 来监控当前的 work 进程是否仍在运行, work 进程消费完一个任务之后, work 进程就结束了,其状态会变成 terminated, 此时 listen 进程就会重新创建一个新的 work 进程并对其计时, 新的 work 进程开始消费下一个任务
        • 2.3.2 结束时机不同
          • work 命令的结束时机在上面的执行原理部分已叙述,此处不再重复,listen 命令中,listen 进程和 work 进程会在以下情况下结束:
            • listen 进程会定时检查当前的 work 进程的执行时间是否超过了 --timeout 参数的值,如果已超时, 此时 listen 进程会先 kill 掉当前的 work 进程, 然后抛出一个 ProcessTimeoutException 异常并结束 listen 进程
            • listen 进程会定时检查自身使用的内存是否超过了 --memory 参数的值,如果已超过, 此时 listen 进程会直接 die 掉, work 进程也会自动结束.
        • 2.3.3 性能不同
          • work 命令是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕;
            • 而listen模式则是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本。
              因此: work 模式的性能会比listen模式高
              注意:当代码有更新时,work 模式下需要手动去执行 php think queue:restart 命令重启队列来使改动生效;而listen 模式会自动生效,无需其他操作。
        • 2.3.4 超时控制能力
          • work 模式本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间。work 模式下的超时控制能力,实际上应该理解为 多个work 进程配合下的过期任务重发能力。
          • 而 listen 命令可以限制 listen 进程创建的 work 进程的最大执行时间。
            listen 命令可通过 --timeout 参数限制 work 进程允许运行的最长时间,超过该时间限制后, work 进程会被强制 kill 掉, listen 进程本身也会抛出异常并结束;
          • 这里有必要补充一下 expire 和 timeout 之间的区别:
            • expire 在配置文件中设置,timeout 在 listen命令 的命令行参数中设置,而且,expire 和 timeout 是两个不同层次上的概念:
            • expire 是指任务的过期时间。这个时间是全局的,影响到所有的work进程。(不管是独立的work命令还是 listen 模式下创建的的 work 进程) 。expire 针对的对象是 任务
            • timeout 是指 work 进程的超时时间。这个时间只对当前执行的 listen 命令有效。timeout 针对的对象是 work 进程
        • 2.3.5 使用场景不同
          • work 命令的适用场景是
            • 任务数量较多
            • 性能要求较高
            • 任务的执行时间较短
            • 消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑
          • listen命令的适用场景是
            • 任务数量较少
            • 任务的执行时间较长(如生成大型的excel报表等),
            • 任务的执行时间需要有严格限制
    3. 如何进行多任务处理?

      在以上的推送中,有时候不仅仅只是推送短信,也有可能推送邮件等,那么think-queue怎么进行多任务处理呢?
      生产者代码

      <?php
      namespace app\client\controller;
      use GuzzleHttp\Client;
      use think\facade\Request;
      use think\Queue;
      class Produce
          public function releaseTaskQueue()
              $deal_type = Request::instance()->get('deal_type');
              switch ($deal_type){
                  case 'send_sms' :
                      $content = ['timestamp' => time(),'random_str' => uniqid(),'task_name' => 'task_a'];
                      Queue::push('app\client\controller\Consume@send_sms',$content,'SendJob');
                      break;
                  case 'send_email' :
                      $content = ['timestamp' => time(),'random_str' => uniqid(),'task_name' => 'task_b'];
                      Queue::push('app\client\controller\Consume@send_email',$content,'SendJob');
                      break;
      

      消费者代码

      <?php
      namespace app\client\controller;
      use GuzzleHttp\Client;
      use think\queue\Job;
      class Consume
          public function send_sms(Job $job, $data)
              print '发送短信中'.PHP_EOL;
          public function send_email(Job $job, $data)
              print '发送邮件中'.PHP_EOL;
      

      只需要使用 任务类名@方法名 就可以了

    4. 消息执行时机的使用

      //即时执行消息
      Queue::push('app\client\controller\Consume',['date' => date('Y-m-d H:i:s'),'point' => 1],'SendJob');
      //延迟10s之后执行,即时消息马上执行,10s后延迟消息执行
      Queue::later(10,'app\client\controller\Consume',['date' => date('Y-m-d H:i:s'),'point' => 2],'SendJob');
      
    5. 消息的重发

      • 手动重发,可以在消费者中使用
        $job->release()
        如果release() 可以提供一个延迟的 数,如果没有提供,表示立即进行重发
      • worker进程自动重发,需要满足两个条件
        • 消费者fire()方法有代码异常
        • 任务未使用 $job->delete() 删除
      • 如果在配置文件中,配置了 expire 不为空,则worker进程每次查询剩下的任务之前,会自动重发已过期的任务
        • 查看任务的执行次数
          上面提及到消息队列保存的方式共有三个 Redis Key ,可以在 redis-cli 中使用
          zrange queues:SendJob:reserved 0 -1
          zrange queues:SendJob:delayed 0 -1
          lrange queues:SendJob 0 -1
          查看正在执行,待执行,延迟执行的任务列表,在返回的json数据中,有一个值为 attempts ,它代表该任务重发的次数

          [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-17L3kjdk-1618886517094)(https://www.sakuraluo.com/usr/uploads/2020/12/479004851.png)]

    6. 处理任务的失败回调

      public function failed($data)
          dump('failed');
          dump($data);
      

      需要关注的是,使用 php think queue:work --daemon --tries 2 --queue SendJob 命令运行队列,如果某个任务的重试次数大于 --tries 那么系统将自动删除该任务,该写法 failed()并没有提供 $job 对象,重发受限。

    三、使用队列过程中遇到的问题和疑问

    1. 启动了队列,并且消息可以成功入队,也消费了。但是第二天再看,消息可以入队,但是没有消费,查看进程进程也都还在。是怎么回事呢?
      • 主要的原因是因为fire里面用到了数据库(版本mysql 5.7.32-log ),对接erp会有一个写入状态返回,需要更新写入状态到后台。而mysql有一个连接缓存时间,如果代码中没有实现断线重连,那么就会出现这个成功入队但是没有消费的问题。
        在mysql中,查看链接缓存时间的命令是
    SHOW GLOBAL VARIABLES LIKE 'wait_timeout';
    SHOW GLOBAL VARIABLES LIKE '%timeout%;
    

    image.png
    这里返回的是 28800s ,即为8小时。在TP5中,只需要在数据库配置文件中配置 break_reconnect" => true 即可以实现断线重连

    变量名称解析
    connect_timeoutmysql客户端在尝试与mysql服务器建立连接时,mysql服务器返回错误握手协议前等待客户端数据包的最大时限。
    wait_timeout负责超时控制的变量,其时间为长度为28800s,就是8个小时,那么就是说MySQL的服务会在操作间隔8小时后断开,需要再次重连
    lock_wait_timeoutsql语句请求元数据锁的最长等待时间,默认为一年。此锁超时对于隐式访问Mysql库中系统表的sql语句无效,但是对于使用select,update语句直接访问mysql库中标的sql语句有效
    net_read_timeout / net_write_timeoutmysql服务器端等待从客户端读取数据 / 向客户端写入数据的最大时限
    slave_net_timeoutmysql从复制连结等待读取数据的最大时限
    1. 为什么使用了多进程,任务却不会重复的消费?

      • 主要原因是因为获取一个待消费的任务是调用了 Redis::pop() ,由于 pop() 方法是原子性的,多次进程同时到达也是分先后的,所以不会得到重复的消费任务
    2. 消息可以成功入队,但是在 fire() 开始打印任意东西,屏幕不会输出?

      • 可能是 Queue::push() 或者 Queue::later() 中的命名空间写错了
    3. 有以下代码
      生产者

      <?php
      namespace app\client\controller;
      use think\Queue;
      class Produce
          public function releaseTaskQueue()
               //即时执行消息
              Queue::push('app\client\controller\Consume',['date' => date('Y-m-d H:i:s'),'point' => 1],'SendJob');
      

      消费者

      <?php
      namespace app\client\controller;
      use think\Exception;
      use think\queue\Job;
      class Consume
          public function fire(Job $job, $data)
              print 'hello!'.PHP_EOL;
              //该行代码用户模拟有业务代码发生了致命的异常
              throw new \Exception('error');
      

      使用的是 php think queue:work --daemon --queue SendJob 运行队列,这个队列会错误的。在以上的 消息重发 - worker进程的自动重发提及到,如果fire()抛出异常且没有删除任务(任务可能是没有删除的,因为异常是意向不到的),worker进程就会进行自动重发,此时代码就会进入了死循环,并且不能停止。除非队列超过了设置的内存或者被kill

      • 解决方案一:
        • 可以在代码中捕获异常,进一步处理
          <?php
          namespace app\client\controller;
          use think\queue\Job;
          class Consume
              public function fire(Job $job, $data)
                  print 'hello!'.PHP_EOL;
                  //该行代码用户模拟有业务代码发生了致命的异常
                  try {
                      throw new \Exception('error');
                  }catch (\Exception $e){
                      echo $e->getMessage();
                      $job->delete();
                                              由Redis支持的一个简单,快速,健壮的Node.js作业/任务队列。
           简单:〜1000 LOC,且依赖性最小。
           快速:通过最小化Redis和网络开销来最大化吞吐量。 很好。
           稳健:在设计时考虑了并发性,原子性和失败性; 完整的代码覆盖率。
           const Queue = require ( 'bee-queue' ) ;
          const queue = new Queue ( 'example' ) ;
          const job = queue . createJob ( { x : 2 , y : 3 } ) ;
          job . save ( ) ;
          job . on ( 'succeeded' , ( result ) => {
            console . log ( `Received result for job ${ job . id } : ${ result } ` ) ;
          } ) ;
                                              Redis队列
          需要redis gem。
           添加Redis :: Queue类,该类可用作基于Redis的Distributed-QueueRedis通常用作消息传递服务器,以实现对后台作业或其他类型的消息传递任务的处理。 它实现了此处描述的可靠队列模式:  ://redis.io/commands/rpoplpush。
           $ gem install redis-queue
           $ bundle install
          $ rake
           require "redis-queue"
          redis = Redis . new
          queue = Redis :: Queue . new ( 'q_test' , 'bp_q_test' ,  :redis => redis )
          #Adding some elements
          queue . push "b" 
          queue << "a" # 
                                              在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是。刚刚我们已经了解到,当消息发送给了消费者,但是消费者在消费的过程中发生异常时,如何保证消息的不丢失。
                                              前言分析之前请大家务必了解消息队列的实现如果不了解请先阅读下:有赞消息队列设计去哪儿网消息队列设计tp5消息队列是基于database redis 和tp官方自己实现的 Topthink本章是围绕redis来做分析存储key:key类型描述queues:queueNamelist要执行的任务think:queue:restartstring重启队列时间戳queues:queueName:dela...
                                              thinkPHP集成workman扩展
          workerman是一个高性能的PHP socket 服务器框架,workerman基于PHP多进程以及libevent事件轮询库。
          The “topthink/think-installer” plugin was skipped because it requires a Plugin API version ("^1.0") that does not match your Composer installation (“2.2.0”). You may
          think-queue是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。think-queue消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。think...