Laravel 队列使用

1 环境

Laravel是一种类似ThinkPHP的php框架,封装的诸多功能可以很方便的使用。队列Queue便是其中之一。
Windows环境下,可使用PHPstorm作为Laravel的集成开发环境IDE。

Laravel可配置多种队列驱动,包括 "sync", "database", "beanstalkd", "sqs", "redis", "null"(具体参见app/config/queue.php)
其中sync为同步,database为使用数据库,后面三种为第三方队列服务,最后一种为不使用队列。
通过在 .env 中的 QUEUE_CONNECTION 选项,来决定选择何种驱动。
如 QUEUE_CONNECTION=database 即为选择数据库驱动队列。

所谓队列,会有数据的生产者和消费者之分。生产者向队列中投递数据,消费者从队列中获取数据。
比如向用户发送邮件的场景:现在有10w封邮件需要发送,最简单的,我们需要有一个方法将邮件的收件人、内容等,拆分成10w条任务放在队列中,同时需要设置一个回调方法负责处理每条任务。当队列中有邮件发送任务时,队列会主动调用回调方法,并传递任务详情进去。回调方法处理完成后,单条邮件即发送完毕。其他邮件依样处理。

4 使用数据库驱动队列

4.1 生成任务表

在终端下输入

php artisan queue:table
php artisan migrate

在数据库连接正常的情况下,会在数据库中出现jobs表:

  [id] bigint  
  [queue] nvarchar(255) 
  [payload] nvarchar(max) 
  [attempts] tinyint 
  [reserved_at] int  
  [available_at] int  
  [created_at] int  

4.2 创建任务类

php artisan make:job SendEmail

在终端内执行上述命令,会自动生成 app/Jobs/SendMail.php 文件

class SendMail implements ShouldQueue

在该文件的handle方法中,可以放置任务处理逻辑。

4.3 发送任务

在任意位置,均可像下面一样调用 dispatch 发送任务

SendMail::dispatch($email);

4.4 驱动队列

完成上述步骤后,可以在数据库中发现一条记录(导出为insert SQL语句):

INSERT INTO [jobs]([id], [queue], [payload], [attempts], [reserved_at], [available_at], [created_at]) VALUES (6, N'default', N'{"displayName":"App\\Jobs\\ProcessPodcast","job":"Illuminate\\Queue\\CallQueuedHandler@call","maxTries":null,"timeout":null,"timeoutAt":null,"data":{"commandName":"App\\Jobs\\ProcessPodcast","command":"O:23:\"App\\Jobs\\ProcessPodcast\":8:{s:29:\"\u0000App\\Jobs\\ProcessPodcast\u0000data\";s:6:\"111222\";s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";N;s:5:\"queue\";N;s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:7:\"chained\";a:0:{}}"}}', 0, NULL, 1545980176, 1545980176);

此时任务已经放置在数据库内,只有将队列运行起来后,队列才能主动调用回调方法。

php artisan queue:work

在终端内运行上述命令即可。该命令还有诸多参数,如deamon、tries等,可根据需要指定。

4.5 守护进程

为了保证应用服务的稳定性,需要开启守护进程。
Linux下,一般使用 Supervisor ,Windows下使用 Forever,可参考 这里.

4.6 执行失败的处理

对于处理失败的任务,Laravel也提供的解决方案。通过运行如下命令,即可创建表以记录失败任务。

php artisan queue:failed-table
php artisan migrate

在数据库中即生成 failed_jobs :

  [id] bigint
  [connection] nvarchar(max)
  [queue] nvarchar(max) 
  [payload] nvarchar(max) 
  [exception] nvarchar(max) 
  [failed_at] datetime

导致任务失败的 Exception 会被传递到 SendMail 的 failed 方法,因而你需要在SendMail中自行实现该方法,并做进一步处理。
任务执行失败的原因有很多,如传参错误、尝试次数超过限制、超时、甚至在 handle 方法中抛出异常,均会作为失败任务处理。

4.7 任务执行前后的处理

Laravel提供了任务执行前后的处理入口,即在 App/Providers/AppServiceProvider 中的 boot() 中加入如下代码:

public function boot()
      Queue::before( function (JobProcessing $event) {
          Log::info("处理任务前");
      Queue::after( function (JobProcessed $event) {
          Log::info("处理任务后");

传递的 $event 中,带有任务详情,几个简单的例子:

$event->connectionName
$event->job
$event->job->payload()

5 使用 Redis 驱动队列

5.1 Laravel 安装 Predis 包

在 Laravel 中使用 Redis 之前,需要通过 Composer 安装 predis/predis 包:

composer require predis/predis

上述拓展是帮助Laravel与Redis打交道的,我们现在还缺少Redis服务。
如果此时将 .env 中的 QUEUE_CONNECTION 改为 redis,访问时会报错:

Predis \ Connection \ ConnectionException (10061)
����Ŀ����������ܾ����޷����ӡ� [tcp://127.0.0.1:6379]

5.2 配置 Redis 服务

Redis官网 下载源码后自行编译即可。
官方并未提供Windows版,Redis的Windows版式由微软工作组维护的,你可以从其 GitHub页 找到。不过貌似已经不再维护了,最新的版本是16年发布的3.2.100。
Linux下通过简单的运行

./redis-server

即可开启服务,再通过

./redis-cli

来尝试使用Redis。使用也很简单,就是 set key value 和 get key。
Windows下安装后,在命令行中 cd 到安装目录

C:\Program Files\Redis>redis-server redis.windows.conf
C:\Program Files\Redis>netstat -an|find "6379"
  TCP    127.0.0.1:6379         0.0.0.0:0              LISTENING