返回顶部
首页 > 资讯 > 后端开发 > PHP编程 >详解PHP多进程消费队列
  • 554
分享到

详解PHP多进程消费队列

2024-04-02 19:04:59 554人浏览 泡泡鱼
摘要

目录引言Nginx进程模型进程设计进程信号量设计PHP安装修信号量信号量和系统调用daemon(守护)进程命令设计启动命令强制停止命令强制重启命令平滑停止命令平滑重启命令查看进程状态

引言

最近开发一个小功能,用到了队列mcq,启动一个进程消费队列数据,后边发现一个进程处理不过来了,又加了一个进程,过了段时间又处理不过来了......

这种方式每次都要修改crontab,如果进程挂掉了,不会及时的启动,要等到下次crontab执行的时候才会启动。关闭(重启)进程的时候用的是kill,这可能会丢失正在处理的数据,比如下面这个例子,我们假设sleep过程就是处理逻辑,这里为了明显看出效果,将处理时间放大到10s:


<?php
$i = 1;
while (1) {
    echo "开始第[{$i}]次循环\n";
    sleep(10);
    echo "结束第[{$i}]次循环\n";
    $i++;
}

当我们运行脚本之后,等到循环开始之后,给进程发送kill {$pid},默认发送的是编号为15的SIGTERM信号。假设$i是从队列拿到的,拿到2的时候,正在处理,我们给程序发送了kill信号,和队列数据丢失一样,问题比较大,因此我要想办法解决这些问题。

开始第[1]次循环

结束第[1]次循环

开始第[2]次循环

[1]    28372 terminated  php t.php

nginx进程模型

这时候我想到了nginx,nginx作为高性能服务器的中流砥柱,为成千上万的企业和个人服务,他的进程模型比较经典,如下所示:

管理员通过master进程和nginx进行交互,从/path/to/nginx.pid读取nginx master进程的pid,发送信号给master进程,master根据不同的信号做出不同的处理,然后反馈信息给管理员。worker是master进程fork出来的,master负责管理worker,不会去处理业务,worker才是具体业务的处理者,master可以控制worker的退出、启动,当worker意外退出,master会收到子进程退出的消息,也会重新启动新的worker进程补充上来,不让业务处理受影响。nginx还可以平滑退出,不丢失任何一个正在处理的数据,更新配置时nginx可以做到不影响线上服务来加载新的配置,这在请求量很大的时候特别有用。

进程设计

看了nginx的进模型,我们完全可以开发一个类似的类库来满足处理mcq数据的需求,做到单文件控制所有进程、可以平滑退出、可以查看子进程状态。不需要太复杂,因为我们处理队列数据接收一定的延迟,做到nginx那样不间断服务比较麻烦,费时费力,意义不是很大。设计的进程模型跟nginx类似,更像是nginx的简化版本。

进程信号量设计

信号量是进程间通讯的一种方式,比较简单,单功能也比较弱,只能发送信号给进程,进程根据信号做出不同的处理。

master进程启动的时候保存pid到文件/path/to/daeminze.pid,管理员通过信号和master进程通讯,master进程安装3种信号,碰到不同的信号,做出不同的处理,如下所示:

SIGINT => 平滑退出,处理完正在处理的数据再退出

SIGTERM => 暴力退出,无论进程是否正在处理数据直接退出

SIGUSR1 => 查看进程状态,查看进程占用内存,运行时间等信息

master进程通过信号和worker进程通讯,worker进程安装了2个信号,如下所示:

SIGINT => 平滑退出

SIGUSR1 => 查看worker进程自身状态

为什么worker进程只安装2个信号呢,少了个SIGTERM,因为master进程收到信号SIGTERM之后,向worker进程发送SIGKILL信号,默认强制关闭进程即可。

worker进程是通过master进程fork出来的,这样master进程可以通过pcntl_wait来等待子进程退出事件,当有子进程退出的时候返回子进程pid,做处理并启动新的进程补充上来。

master进程也通过pcntl_wait来等待接收信号,当有信号到达的时候,会返回-1,这个地方还有些坑,在下文中会详细讲。

PHP中有2种信号触发的方式,第一种方式是declare(ticks = 1);,这种效率不高,Zend每执行一次低级语句,都会去检查进程中是否有未处理的信号,现在已经很少使用了,PHP 5.3.0及之前的版本可能会用到这个。

第二种是通过pcntl_signal_dispatch来调用未处理的信号,PHP 5.4.0及之后的版本适用,可以巧妙的将该函数放在循环中,性能上基本没什么损失,现在推荐适用。

PHP安装修信号量

PHP通过pcntl_signal安装信号,函数声明如下所示:


bool pcntl_signal ( int $signo , [callback $handler [, bool $restart_syscalls = true ] )

第三个参数restart_syscalls不太好理解,找了很多资料,也没太查明白,经过试验发现,这个参数对pcntl_wait函数接收信号有影响,当设置为缺省值true的时候,发送信号,进程用pcntl_wait收不到,必须设置为false才可以,看看下面这个例子:


<?php
$i = 0;
while ($i<5) {
    $pid = pcntl_fork();
    $random = rand(10, 50);
    if ($pid == 0) {
        sleep($random);
        exit();
    }
    echo "child {$pid} sleep {$random}\n";
    $i++;
}

pcntl_signal(SIGINT,  function($signo) {
     echo "Ctrl + C\n";
});

while (1) {
    $pid = pcntl_wait($status);
    var_dump($pid);
    pcntl_signal_dispatch();
}

运行之后,我们对父进程发送kill -SIGINT {$pid}信号,发现pcntl_wait没有反应,等到有子进程退出的时候,发送过的SIGINT会一个个执行,比如下面结果:

child 29643 sleep 48

child 29644 sleep 24

child 29645 sleep 37

child 29646 sleep 20

child 29647 sleep 31

int(29643)

Ctrl + C

Ctrl + C

Ctrl + C

Ctrl + C

int(29646)

这是运行脚本之后马上给父进程发送了四次SIGINT信号,等到一个子进程推出的时候,所有信号都会触发。

但当把安装信号的第三个参数设置为false:


pcntl_signal(SIGINT,  function($signo) {
     echo "Ctrl + C\n";
}, false);

这时候给父进程发送SIGINT信号,pcntl_wait会马上返回-1,信号对应的事件也会触发。

所以第三个参数大概意思就是,是否重新注册此信号,如果为false只注册一次,触发之后就返回,pcntl_wait就能收到消息,如果为true,会重复注册,不会返回,pcntl_wait收不到消息。

信号量和系统调用

信号量会打断系统调用,让系统调用立刻返回,比如sleep,当进程正在sleep的时候,收到信号,sleep会马上返回剩余sleep秒数,比如:


<?php
pcntl_signal(SIGINT,  function($signo) {
     echo "Ctrl + C\n";
}, false);

while (true) {
	pcntl_signal_dispatch();
    echo "123\n";
    $limit = sleep(2);
	echo "limit sleep [{$limit}] s\n";
}

运行之后,按Ctrl + C,结果如下所示:

123

^Climit sleep [1] s

Ctrl + C

123

limit sleep [0] s

123

^Climit sleep [1] s

Ctrl + C

123

^Climit sleep [2] s

daemon(守护)进程

这种进程一般设计为daemon进程,不受终端控制,不与终端交互,长时间运行在后台,而对于一个进程,我们可以通过下面几个步骤把他升级为一个标准的daemon进程:


protected function daemonize()
{
    $pid = pcntl_fork();
    if (-1 == $pid) {
        throw new Exception("fork进程失败");
    } elseif ($pid != 0) {
        exit(0);
    }
    if (-1 == posix_setsid()) {
        throw new Exception("新建立session会话失败");
    }

    $pid = pcntl_fork();
    if (-1 == $pid) {
        throw new Exception("fork进程失败");
    } else if($pid != 0) {
        exit(0);
    }

    umask(0);
    chdir("/");
}

拢共分五步:

1.fork子进程,父进程退出。

2.设置子进程为会话组长,进程组长。

3.再次fork,父进程退出,子进程继续运行。

4.恢复文件掩码为0。

5.切换当前目录到根目录/。

第2步是为第1步做准备,设置进程为会话组长,必要条件是进程非进程组长,因此做第一次fork,进程组长(父进程)退出,子进程通过posix_setsid()设置为会话组长,同时也为进程组长。

第3步是为了不让进程重新控制终端,因为一个进程控制一个终端的必要条件是会话组长(pid=sid)。

第4步是为了恢复默认的文件掩码,避免之前做的操作对文件掩码做了设置,带来不必要的麻烦。关于文件掩码, linux中,文件掩码在创建文件、文件夹的时候会用到,文件的默认权限为666,文件夹为777,创建文件(夹)的时候会用默认值减去掩码的值作为创建文件(夹)的最终值,比如掩码022下创建文件666 - 222 = 644,创建文件夹777 - 022 = 755:

掩码 新建文件权限 新建文件夹权限
umask(0) 666 (-rw-rw-rw-) 777 (drwxrwxrwx)
umask(022) 644 (-rw-r--r--) 755 (drwxr-xr-x)

第5步是切换了当前目录到根目录/,网上说避免起始运行他的目录不能被正确卸载,这个不是太了解。

对应5步,每一步的各种id变化信息:

操作后 pid ppid pgid sid
开始 17723 31381 17723 31381
第一次fork 17723 1 17723 31381
posix_setsid() 17740 1 17740 17740
第二次fork 17840 1 17740 17740

另外,会话、进程组、进程的关系如下图所示,这张图有助于更好的理解。

至此,你也可以轻松地造出一个daemon进程了。

命令设计

我准备给这个类库设计6个命令,如下所示:

1.start 启动命令

2.restart 强制重启

3.stop 平滑停止

4.reload 平滑重启

5.quit 强制停止

6.status 查看进程状态

启动命令

启动命令就是默认的流程,按照默认流程走就是启动命令,启动命令会检测pid文件中是否已经有pid,pid对应的进程是否健康,是否需要重新启动。

强制停止命令

管理员通过入口文件结合pid给master进程发送SIGTERM信号,master进程给所有子进程发送SIGKILL信号,等待所有worker进程退出后,master进程也退出。

强制重启命令

强制停止命令+启动命令

平滑停止命令

平滑停止命令,管理员给master进程发送SIGINT信号,master进程给所有子进程发送SIGINT,worker进程将自身状态标记为stoping,当worker进程下次循环的时候会根据stoping决定停止,不在接收新的数据,等所有worker进程退出之后,master进程也退出。

平滑重启命令

平滑停止命令+启动命令

查看进程状态

查看进程状态这个借鉴了workerman的思路,管理员给master进程发送SIGUSR1信号,告诉主进程,我要看所有进程的信息,master进程,master进程将自身的进程信息写入配置好的文件路径A中,然后发送SIGUSR1,告诉worker进程把自己的信息也写入文件A中,由于这个过程是异步的,不知道worker进程啥时候写完,所以master进程在此处等待,等所有worker进程都写入文件之后,格式化所有的信息输出,最后输出的内容如下所示:

➜/dir /usr/local/bin/php DaemonMcn.php status

Daemon [DaemonMcn] 信息:

-------------------------------- master进程状态 --------------------------------

pid       占用内存       处理次数       开始时间                 运行时间

16343     0.75M          --             2018-05-15 09:42:45      0 天 0 时 3 分

12 slaver

-------------------------------- slaver进程状态 --------------------------------

任务task-mcq:

16345     0.75M          236            2018-05-15 09:42:45      0 天 0 时 3 分

16346     0.75M          236            2018-05-15 09:42:45      0 天 0 时 3 分

--------------------------------------------------------------------------------

任务test-mcq:

16348     0.75M          49             2018-05-15 09:42:45      0 天 0 时 3 分

16350     0.75M          49             2018-05-15 09:42:45      0 天 0 时 3 分

16358     0.75M          49             2018-05-15 09:42:45      0 天 0 时 3 分

16449     0.75M          1              2018-05-15 09:46:40      0 天 0 时 0 分

--------------------------------------------------------------------------------

等待worker进程将进程信息写入文件的时候,这个地方用了个比较trick的方法,每个worker进程输出一行信息,统计文件的行数,达到worker进程的行数之后表示所有worker进程都将信息写入完毕,否则,每个1s检测一次。

以上就是详解PHP多进程消费队列的详细内容,更多关于PHP多进程消费队列的资料请关注编程网其它相关文章!

--结束END--

本文标题: 详解PHP多进程消费队列

本文链接: https://lsjlt.com/news/127559.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 详解PHP多进程消费队列
    目录引言nginx进程模型进程设计进程信号量设计PHP安装修信号量信号量和系统调用daemon(守护)进程命令设计启动命令强制停止命令强制重启命令平滑停止命令平滑重启命令查看进程状态...
    99+
    2024-04-02
  • PHP中的多进程消费队列有什么用
    小编给大家分享一下PHP中的多进程消费队列有什么用,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!最近开发一个小功能,用到了队列mcq,启动一个进程消费队列数据,后边发现一个进程处理不过来了,又加了一个进程,过了段时间又处理...
    99+
    2023-06-06
  • python—多进程的消息队列
    消息队列消息队列是在消息的传输过程中保存消息的容器消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息操作系统提供了很多机制来实现进程间的通信,multipr...
    99+
    2023-01-31
    队列 进程 消息
  • FreeRTOS-消息队列详解
    ✅作者简介:嵌入式入坑者,与大家一起加油,希望文章能够帮助各位!!!! 📃个人主页:@rivencode的个人主页 🔥系列专栏:玩转FreeRTOS Ὂ...
    99+
    2023-09-29
    java 网络 开发语言
  • 详解RedisStream做消息队列
    目录ListPub/subStreamConsumer Grouplast_delivered_idpending_idscurdpending_ids如何避免消息丢失嵌入...
    99+
    2024-04-02
  • Linux消息队列实现进程间通信实例详解
    Linux消息队列实现进程间通信实例详解 一、什么是消息队列 消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。 每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构...
    99+
    2022-06-04
    队列 详解 实例
  • RabbitMQ消费端ACKNACK及重回队列机制详解
    目录ACK和NACK使用方式1、basicNack2、basicAck3、basicReject4、basicRecover消费端的重回队列实际运用ACK和NACK 当我们使用Ra...
    99+
    2022-12-26
    RabbitMQ消费端ACK NACK RabbitMQ消费端重回队列
  • RabbitMq同一队列多个消费者问题
    RabbitMQ只有Queue,如果多个消费者绑定同一个queue,那么一条消息,只能被其中一个消费者取走(轮询)。本质上,RabbitMq的消费者的消息确认机制,就注定不可能让多个消费者同时去消费同一个队列中的同一条消息,只能轮询的方式去...
    99+
    2023-09-20
    java-rabbitmq rabbitmq java
  • 详解Redis Stream做消息队列
    目录ListPub/subStreamConsumer Grouplast_delivered_idpending_idscurdpending_ids如何避免消息丢失嵌入SpringBoot注册Redis s...
    99+
    2024-04-02
  • MSMQ微软消息队列详解
    一、引言 Windows Communication Foundation(WCF)是Microsoft为构建面向服务的应用程序而提供的统一编程模型,该服务模型提供了支持松散耦合和版...
    99+
    2024-04-02
  • golang 队列消费需要多个客户端吗
    golang 队列消费是否需要多个客户端取决于特定场景:队列容量、消息到达频率:容量大或到达频率高时需要多个客户端。并行处理:需要并行处理时需多个客户端。容错性:为提高容错性需多个客户端...
    99+
    2024-04-21
    golang
  • Java进程间通信之消息队列
    目录消息队列1.消息队列的原理2.消息队列的接口:2.1创建消息队列2.2向消息队列发送消息2.3接收消息:2.4操作消息队列的接口2.5代码测试:信号量:信号量的原理总结消息队列 ...
    99+
    2024-04-02
  • C++实现简单的生产者-消费者队列详解
    本文的代码都是ChatGPT生成,我只是做了微小的调整和整合,AI提示词如下: 设计一个C++类,支持生产者-消费者模型,可以通过size函数获取剩余数量 可能第一次生成的不一定合适...
    99+
    2023-05-18
    C++实现生产者消费者队列 C++生产者消费者队列 C++队列
  • thinkphp5.0消息队列topthink/think-queue详解
    第一、安装topthink/think-queue composer require topthink/think-queue=1.1.6 第二、配置queue.php信息 找到应用目录下面的extra/queue.php进行配置,队列驱...
    99+
    2023-10-07
    php 开发语言
  • SpringBoot整合rockerMQ消息队列详解
    目录Springboot整合RockerMQ使用总结消费模式生产者组和消费者组生产者投递消息的三种方式如何保证消息不丢失顺序消息分布式事务Springboot整合RockerMQ 1...
    99+
    2024-04-02
  • 关于 Laravel Redis 多个进程同时取队列问题详解
    最近在工作中遇到了一个问题,开启多个进程处理队列会重复读取 Redis 中队列吗?是否因此导致重复执行任务?下面就来通过示例代码详细介绍下。 使用 Supervisor 监听 Laravel 队列任务,其中 Supervisor 的配置如下...
    99+
    2023-09-25
    laravel redis 数据库
  • AndroidMessageQueue消息队列主要作用详解
    目录定义模型关系内部属性行为约束使用建议消息队列相关概念消息队列的消费场景消息至多被消费一次消息至少被消费一次消息仅被消费一次实践Hello World消息队列好处或功能定义 队列是...
    99+
    2023-02-16
    Android MessageQueue Android MessageQueue消息队列 Android消息队列
  • PHP实现异步延迟消息队列的方法详解
    目录一、前言二、场景三、安装RabbitMQ延迟队列插件四、在Laravel框架中进行使用五、执行生产消息和消费消息一、前言 需求:电商秒杀场景中,如果用户下单10分钟未支付,需要进...
    99+
    2024-04-02
  • 详解Redis用链表实现消息队列
    前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样...
    99+
    2022-06-04
    队列 详解 链表
  • 分布式消息队列RocketMQ概念详解
    目录1.MQ概述1.1 RocketMQ简介1.2 MQ用途1.3 常见MQ产品2.RocketMQ 基本概念2.1 消息2.2 主题2.3 标签2.4 队列2.5 Producer...
    99+
    2023-05-19
    分布式消息队列RocketMQ概念 消息队列RocketMQ RocketMQ概念
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作