返回顶部
首页 > 资讯 > 精选 >think-queue的示例分析
  • 750
分享到

think-queue的示例分析

2023-06-20 17:06:14 750人浏览 泡泡鱼
摘要

这篇文章主要介绍了think-queue的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。think-queue 解析前言分析之前请大家务必了解消息队列的实现tp5的消

这篇文章主要介绍了think-queue的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

前言

分析之前请大家务必了解消息队列的实现

tp5的消息队列是基于database redis 和tp官方自己实现的 Topthink
本章是围绕Redis来做分析

存储key:

key类型描述
queues:queueNamelist要执行的任务
think:queue:restartstring重启队列时间戳
queues:queueName:delayedzSet延迟任务
queues:queueName:reservedzSet执行失败,等待重新执行

执行命令

work和listen的区别在下面会解释
命令描述
PHP think queue:work监听队列
php think queue:listen监听队列
php think queue:restart重启队列
php think queue:subscribe暂无,可能是保留的 官方有什么其他想法但是还没实现

行为标签

标签描述
worker_daemon_start守护进程开启
worker_memory_exceeded内存超出
worker_queue_restart重启守护进程
worker_before_process任务开始执行之前
worker_before_sleep任务延迟执行
queue_failed任务执行失败

命令参数

参数默认值可以使用的模式描述
queuenullwork,listen要执行的任务名称
daemonnullwork以守护进程执行任务
delay0work,listen失败后重新执行的时间
forcenullwork失败后重新执行的时间
memory128Mwork,listen限制最大内存
sleep3work,listen没有任务的时候等待的时间
tries0work,listen任务失败后最大尝试次数

模式区别

执行原理不同
work: 单进程的处理模式;
无 daemon 参数 work进程在处理完下一个消息后直接结束当前进程。当不存在新消息时,会sleep一段时间然后退出;
有 daemon 参数 work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当不存在新消息时,会在每次循环中sleep一段时间;

listen: 父进程 + 子进程 的处理模式;
会在所在的父进程会创建一个单次执行模式的work子进程,并通过该work子进程来处理队列中的下一个消息,当这个work子进程退出之后;
所在的父进程会监听到该子进程的退出信号,并重新创建一个新的单次执行的work子进程;

退出时机不同
work: 看上面
listen: 所在的父进程正常情况会一直运行,除非遇到下面两种情况
01: 创建的某个work子进程的执行时间超过了 listen命令行中的--timeout 参数配置;此时work子进程会被强制结束,listen所在的父进程也会抛出一个 ProcessTimeoutException 异常并退出;

开发者可以选择捕获该异常,让父进程继续执行;
02: 所在的父进程因某种原因存在内存泄露,则当父进程本身占用的内存超过了命令行中的 --memory 参数配置时,父子进程均会退出。正常情况下,listen进程本身占用的内存是稳定不变的。

性能不同
work: 是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕;

listen: 是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本;

因此 work 模式的性能会比listen模式高。
注意: 当代码有更新时,work 模式下需要手动去执行 php think queue:restart 命令重启队列来使改动生效;而listen 模式会自动生效,无需其他操作。

超时控制能力
work: 本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间;
listen: 可以限制其创建的work子进程的超时时间;

可通过 timeout 参数限制work子进程允许运行的最长时间,超过该时间限制仍未结束的子进程会被强制结束;
expire 和time的区别

expire 在配置文件中设置,指任务的过期时间 这个时间是全局的,影响到所有的work进程
timeout 在命令行参数中设置,指work子进程的超时时间,这个时间只对当前执行的listen 命令有效,timeout 针对的对象是 work 子进程;

使用场景不同

work 适用场景是:
01: 任务数量较多
02: 性能要求较高
03: 任务的执行时间较短
04: 消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑

listen 适用场景是:

01: 任务数量较少
02: 任务的执行时间较长
03: 任务的执行时间需要有严格限制

公有操作

由于我们是根据redis来做分析 所以只需要分析src/queue/connector/redis.php
01: 首先调用 src/Queue.php中的魔术方法 __callStatic
02: 在__callStatic方法中调用了 buildConnector
03: buildConnector 中首先加载配置文件 如果无将是同步执行
04: 根据配置文件去创建连接并且传入配置

在redis.php类的构造方法中的操作:
01: 检测redis扩展是否安装
02: 合并配置
03: 检测是redis扩展还是 pRedis
04: 创建连接对象

发布过程

发布参数

参数名默认值描述可以使用的方法
$job要执行任务的类push,later
$data任务数据push,later
$queuedefault任务名称push,later
$delaynull延迟时间later

立即执行

    push($job, $data, $queue)    Queue::push(Test::class, ['id' => 1], 'test');

一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:

[    'job' => $job, // 要执行任务的类    'data' => $data, // 任务数据    'id'=>'xxxxx' //任务id]

写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写

延迟发布

    later($delay, $job, $data, $queue)    Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

执行过程

执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用

    //守护进程开启    'worker_daemon_start' => [        \app\index\behavior\WorkerDaemonStart::class    ],    //内存超出    'worker_memory_exceeded' => [        \app\index\behavior\WorkerMemoryExceeded::class    ],    //重启守护进程    'worker_queue_restart' => [        \app\index\behavior\WorkerQueueRestart::class    ],    //任务开始执行之前    'worker_before_process' => [        \app\index\behavior\WorkerBeforeProcess::class    ],    //任务延迟执行    'worker_before_sleep' => [        \app\index\behavior\WorkerBeforeSleep::class    ],    //任务执行失败    'queue_failed' => [        \app\index\behavior\QueueFailed::class    ]

think-queue的示例分析

public function run(Output $output)    {        $output->write('<info>任务执行失败</info>', true);    }

控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出

守护进程开启任务延迟执行

失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed

app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志

最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

在其他项目中推任务

php版本

<?phpclass Index{    private $redis = null;    public function __construct()    {        $this->redis = new Redis();        $this->redis->connect('127.0.0.1', 6379);        $this->redis->select(10);    }    public function push($job, $data, $queue)    {        $payload = $this->createPayload($job, $data);        $this->redis->rPush('queues:' . $queue, $payload);    }    public function later($delay, $job, $data, $queue)    {        $payload = $this->createPayload($job, $data);        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);    }    private function createPayload($job, $data)    {        $payload = $this->setMeta(JSON_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));        return $this->setMeta($payload, 'attempts', 1);    }    private function setMeta($payload, $key, $value)    {        $payload = json_decode($payload, true);        $payload[$key] = $value;        $payload = json_encode($payload);        if (JSON_ERROR_NONE !== json_last_error()) {            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());        }        return $payload;    }    private function random(int $length = 16): string    {        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';        $randomString = '';        for ($i = 0; $i < $length; $i++) {            $randomString .= $str[rand(0, strlen($str) - 1)];        }        return $randomString;    }}(new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

Go版本

package mainimport (    "encoding/json"    "GitHub.com/garyburd/redigo/redis"    "math/rand"    "time")type Payload struct {    Id       string      `json:"id"`    Job      string      `json:"job"`    Data     interface{} `json:"data"`    Attempts int         `json:"attempts"`}var RedisClient *redis.Poolfunc init() {    RedisClient = &redis.Pool{        MaxIdle:     20,        MaxActive:   500,        IdleTimeout: time.Second * 100,        Dial: func() (conn redis.Conn, e error) {            c, err := redis.Dial("tcp", "127.0.0.1:6379")            if err != nil {                return nil, err            }            _, _ = c.Do("SELECT", 10)            return c, nil        },    }}func main() {    var data = make(map[string]interface{})    data["id"] = "1"    later(10, "app\\index\\jobs\\Test", data, "test")}func push(job string, data interface{}, queue string) {    payload := createPayload(job, data)    queueName := "queues:" + queue    _, _ = RedisClient.Get().Do("rPush", queueName, payload)}func later(delay int, job string, data interface{}, queue string) {    m, _ := time.ParseDuration("+1s")    currentTime := time.Now()    op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()    createPayload(job, data)    payload := createPayload(job, data)    queueName := "queues:" + queue + ":delayed"    _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)}// 创建指定格式的数据func createPayload(job string, data interface{}) (payload string) {    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}    jsonStr, _ := json.Marshal(payload1)    return string(jsonStr)}// 创建随机字符串func random(n int) string {    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")    b := make([]rune, n)    for i := range b {        b[i] = str[rand.Intn(len(str))]    }    return string(b)}

感谢你能够认真阅读完这篇文章,希望小编分享的“think-queue的示例分析”这篇文章对大家有帮助,同时也希望大家多多支持编程网,关注编程网精选频道,更多相关知识等着你来学习!

--结束END--

本文标题: think-queue的示例分析

本文链接: https://lsjlt.com/news/298723.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • think-queue的示例分析
    这篇文章主要介绍了think-queue的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。think-queue 解析前言分析之前请大家务必了解消息队列的实现tp5的消...
    99+
    2023-06-20
  • Java中Stack与Queue的示例分析
    这篇文章给大家分享的是有关Java中Stack与Queue的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。java基本数据类型有哪些Java的基本数据类型分为:1、整数类型,用来表示整数的数据类型。2、浮...
    99+
    2023-06-15
  • C++中Queue队列类模版的示例分析
    这篇文章主要介绍C++中Queue队列类模版的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!1.队列的介绍队列的定义队列(Queue)是一种线性存储结构。它有以下几个特点:按照"先进先出(FIFO,...
    99+
    2023-06-29
  • laravel源码分析队列Queue方法示例
    目录前言队列任务的创建队列任务的分发前言 队列 (Queue) 是 laravel 中比较常用的一个功能,队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求...
    99+
    2024-04-02
  • Python中线程安全队列Queue的示例分析
    小编给大家分享一下Python中线程安全队列Queue的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!一、什么是队列?像排队一样,从头到尾排成一排,还可以有人继续往后排队,这就是队列。这里学委想说的是Queue这个...
    99+
    2023-06-29
  • Don’t make me think网页理论的实例分析
    这篇文章给大家介绍Don’t make me think网页理论的实例分析,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。第一章确保网页容易被用户使用的最重要原则是“不要让我思考”。设计师应尽量做到让网页不言而喻、一目了...
    99+
    2023-06-08
  • Think PHP 6.0.13反序列化分析
    原poc链接:https://github.com/top-think/framework/issues/2749 think PHP 6.0.13下载 composer create-project topthink/think tp p...
    99+
    2023-10-06
    php web安全
  • MYSQL_GTID的示例分析
    这篇文章给大家分享的是有关MYSQL_GTID的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。 一、GTID概述  GTID是MYS...
    99+
    2024-04-02
  • RBAC的示例分析
    这篇文章主要为大家展示了“RBAC的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“RBAC的示例分析”这篇文章吧。什么是权限管理基本上涉及到用户参与的系...
    99+
    2024-04-02
  • vuex的示例分析
    这篇文章主要介绍了vuex的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。如果你在使用 vue.js , 那么我想你可能会对 vue...
    99+
    2024-04-02
  • HappyPack的示例分析
    这篇文章主要介绍HappyPack的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!由于运行在 Node.js 之上的 Webpack 是单线程模型的,所以Webpack 需要...
    99+
    2024-04-02
  • jquery.cookie.js的示例分析
    这篇文章主要为大家展示了“jquery.cookie.js的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“jquery.cookie.js的示例分析”这...
    99+
    2024-04-02
  • DOM的示例分析
    这篇文章将为大家详细讲解有关DOM的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。DOM(Document Object Modle) 操作文档的编程接口DOM定...
    99+
    2024-04-02
  • FastClick的示例分析
    小编给大家分享一下FastClick的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!用 iOS 在手Q阅读书友交流区发表...
    99+
    2024-04-02
  • webpack4的示例分析
    小编给大家分享一下webpack4的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!新建一个demo文件夹,然后再将命令行...
    99+
    2024-04-02
  • JSON的示例分析
    这篇文章给大家分享的是有关JSON的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。json[{"id":394,"qy_json"...
    99+
    2024-04-02
  • ajax的示例分析
    这篇文章主要介绍了ajax的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。AJAX不是JavaScript的规范,它只是一个哥们“发...
    99+
    2024-04-02
  • Immutable.js的示例分析
    这篇文章将为大家详细讲解有关Immutable.js的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。JavaScript 中的对象一般是可变的(Mutable),...
    99+
    2024-04-02
  • bootstrap的示例分析
    这篇文章给大家分享的是有关bootstrap的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。bootstrap提供了三种类型的下载:1、用于生产环境的 Boot...
    99+
    2024-04-02
  • Final的示例分析
    小编给大家分享一下Final的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!事务的划分首先不可能无限细化,因此肯定是不需要无限的出现子类的。一旦无限出现子...
    99+
    2023-06-06
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作