一起聊聊thinkphp6使用think-queue实现普通队列和延迟队列
时间:2022-04-20 [网络编程]作者:fabuyuan 浏览:6 次
###TP6 队列
TP6 中使用 think-queue 可以实现普通队列和延迟队列。
think-queue 是thinkphp 网提供的一个消息队列服务,它支持消息队列的一些基本特性:
- 消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
- 队列的多队列, 内存限制 ,启动,停止,守护等
- 消息队列可降级为同步执行
消息队列实现过程
1、通过生产者推送消息到消息队列服务中
2、消息队列服务将收到的消息存入redis队列中(zset)
3、消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
4、处理获取下来的消息调用业务类进行处理相关业务
5、业务处理后,需要从队列中删除消息
composer 安装 think-queue
composer require topthink/think-queue
配置文件
安装完 think-queue 后会在 config 目录中生成 queue.php,这个文件是队列的配置文件。
tp6中提供了多种消息队列的实现方式,默认使用sync,我这里选择使用Redis。
return [ 'default' => 'redis', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => env('redis.host', '127.0.0.1'), 'port' => env('redis.port', '6379'), 'password' => env('redis.password','123456'), 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ];
创建目录及队列消费类文件
在 app 目录下创建 queue 目录,然后在该目录下新建一个抽象类 Queue.php 文件,作为基础类
<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/** * Class Queue 队列消费基础类 * @package app\queue */abstract class Queue{ /** * @describe:fire是消息队列默认调用的方法 * @param \think\queue\Job $job * @param $message */ public function fire(Job $job, $data) { if (empty($data)) { Log::error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__)); return ; } $jobId = $job->getJobId(); // 队列的数据库id或者redis key // $jobClassName = $job->getName(); // 队列对象类 // $queueName = $job->getQueue(); // 队列名称 // 如果已经执行中或者执行完成就不再执行了 if (!$this->checkJob($jobId, $data)) { $job->delete(); Cache::store('redis')->delete($jobId); return ; } // 执行业务处理 if ($this->execute($data)) { Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__)); $job->delete(); // 任务执行成功后删除 Cache::store('redis')->delete($jobId); // 删除redis中的缓存 } else { // 检查任务重试次数 if ($job->attempts() > 3) { Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__)); // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行 //$job->release(10); // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数 //$job->failed(); // 第3种处理方式:删除任务 $job->delete(); // 任务执行后删除 Cache::store('redis')->delete($jobId); // 删除redis中的缓存 } } } /** * 消息在到达消费者时可能已经不需要执行了 * @param string $jobId * @param $message * @return bool 任务执行的结果 * @throws \Psr\SimpleCache\InvalidArgumentException */ protected function checkJob(string $jobId, $message): bool { // 查询redis $data = Cache::store('redis')->get($jobId); if (!empty($data)) { return false; } Cache::store('redis')->set($jobId, $message); return true; } /** * @describe: 根据消息中的数据进行实际的业务处理 * @param $data 数据 * @return bool 返回结果 */ abstract protected function execute($data): bool;}
所有真正的消费类继承基础抽象类
<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{ protected function execute($data): bool { // 具体消费业务逻辑 }}
生产者逻辑
use think\facade\Queue; // 普通队列生成调用方式 Queue::push($job, $data, $queueName); // 例: Queue::push(Test::class, $data, $queueName); // 延时队列生成调用方式 Queue::later($delay, $job, $data, $queueName); // 例如使用延时队列 10 秒后执行: Queue::later(10 , Test::class, $data, $queueName);
开启进程监听任务并执行
php think queue:listen php think queue:work
命令模式介绍
命令模式
queue:work 命令
work 命令: 该命令将启动一个 work 进程来处理消息队列。
php think queue:work --queue TestQueue
queue:listen 命令
listen 命令: 该命令将会创建一个 listen 父进程 ,然后由父进程通过
proc_open(‘php think queue:work’)
的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。php think queue:listen --queue TestQueue
命令行参数
Work 模式
php think queue:work \ --daemon //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出 --queue helloJobQueue //要处理的队列的名称 --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0 --force \ //系统处于维护状态时是否仍然处理任务,并未找到相关说明 --memory 128 \ //该进程允许使用的内存上限,以 M 为单位 --sleep 3 \ //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式) --tries 2 //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
Listen 模式
php think queue:listen \ --queue helloJobQueue \ //监听的队列的名称 --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0 --memory 128 \ //该进程允许使用的内存上限,以 M 为单位 --sleep 3 \ //如果队列中无任务,则多长时间后重新检查,daemon模式下有效 --tries 0 \ //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0 --timeout 60 //创建的work子进程的允许执行的最长时间,以秒为单位
可以看到 listen 模式下,不包含
--deamon
参数,原因下面会说明消息队列的开始,停止与重启
开始一个消息队列:
php think queue:work
停止所有的消息队列:
php think queue:restart
重启所有的消息队列:
php think queue:restart php think queue:work
推荐学习:《PHP视频教程》
以上就是一起聊聊thinkphp6使用think-queue实现普通队列和延迟队列的详细内容,更多请关注站长家园其它相关文章!
本文标签: thinkphp
转载请注明来源:一起聊聊thinkphp6使用think-queue实现普通队列和延迟队列
本文永久链接地址:https://www.adminjie.com/post/11349.html
免责声明:
本站所发布的一切资源仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。
附:
二○○二年一月一日《计算机软件保护条例》第十七条规定:为了学习和研究软件内含的设计思想和原理,通过安装、显示、传输或者存储软件等方式使用软件的,可以不经软件著作权人许可,不向其支付报酬!鉴于此,也希望大家按此说明研究软件!
版权声明:
一、本站致力于为软件爱好者提供国内外软件开发技术和软件共享,着力为用户提供优资资源。
二、本站提供的部分源码下载文件为网络共享资源,请于下载后的24小时内删除。如需体验更多乐趣,还请支持正版。
三、我站提供用户下载的所有内容均转自互联网。如有内容侵犯您的版权或其他利益的,若有侵犯你的权益请:提交版权证明文件到邮箱 2225329873#qq.com(#换为@) 站长会进行审查之后,情况属实的会在三个工作日内为您删除。
更多精彩内容
- VUE中V-IF条件判断改变元素的样式操作
- Discuz如何解决安装时报错run_sql_error
- 低版本VS项目在VS2019无法正常编译的问题
- PHP+Redis链表解决高并发下商品超卖问题(实现原理及步骤)
- Oracle数据库的实例/表空间/用户/表之间关系简单讲解
- RSA2是啥?PHP-RSA2签名验证怎么实现?
- 华为dubal20是什么型号
- ana an00华为是什么型号
- html5的标题标记一共有几个等级
- 电脑显示信号线无连接是什么意思
- html5中onclick是什么意思
- app是什么应用程序的简称
- 小程序大小超限除了分包还能怎么做?如何避免和解决大小限制?
- angular与bootstrap的区别是什么
- vivov1818a是什么手机型号

- 最新文章
-
-
jquery移动端库有哪些
jquery移动端库有:1、Mobiscroll,用于触屏设备的旋转滚动、日期和时间选择;2、“Ion.Sound”,用于即时声音通知;3、mmenu,用于创建...
-
oracle怎么修改sequence
方法:1、用DROPSEQUENCE语句删除sequence,然后用Createsequence语句重新创建一个;2、用IncrementBy修改序列初始...
-
什么是oracle表分区
在oracle中,表分区指的是当表中的数据不断增大时,将表中的数据在物理上存放到多个表空间,也即将表进行分区;表分区能够将表、索引或索引组织表进一步细分为段,这...
-
oracle怎么解决1067错误
解决方法:1、进入“$NETWORKADMIN”将“listener.ora”文件中的IP地址信息修改为正确的;2、利用NetManager,在本地的监听程序...
-
jquery有哪些基础事件方法
基础事件方法:1、click(),设置鼠标单击事件;2、dblclick(),设置鼠标双击事件;3、change(),设置内容改变事件;4、focus(),设置...
-
- 热门文章
-
-
VUE中V-IF条件判断改变元素的样式操作
这篇文章主要介绍了VUE中V-IF条件判断改变元素的样式操作,具有很好的参考价值,希望对大家有所帮助。一起跟随想过来看看吧...
-
Discuz如何解决安装时报错run_sql_error
问题环境VMware虚拟机Centos7.3PHP7.0MySQL8.0NGINX1.14Discuz3.4问题还原本地环境为PHP5.6+MySQL5.6在安...
-
低版本VS项目在VS2019无法正常编译的问题
低版本VS项目在VS2019无法正常编译的问题这里指的编译并不准确,只是为了方便说明。后有(未安装),201?...
-
PHP+Redis链表解决高并发下商品超卖问题(实现原理及步骤)
实现原理使用redis链表来做,因为pop操作是原子的,即使有很多用户同时到达,也是依次执行,推荐使用。实现步骤第一步,先将商品库存入队列/**.trigge...
-
Oracle数据库的实例/表空间/用户/表之间关系简单讲解
完整的Oracle数据库通常由两部分组成:Oracle数据库和数据库实例。Oracle是一种数据库管理系统,是一种关系型的数据库管理系统。我们用这些高级权限账号...
-