返回顶部
首页 > 资讯 > 后端开发 > Python >利用Python学习RabbitMQ消息队列
  • 322
分享到

利用Python学习RabbitMQ消息队列

队列消息Python 2022-06-04 19:06:30 322人浏览 薄情痞子

Python 官方文档:入门教程 => 点击学习

摘要

RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综

RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处。

RabbitMQ和邮局的主要区别就是RabbitMQ接收、存储和发送的是二进制数据----消息。

rabbitmq基本管理命令:

一步启动Erlang node和Rabbit应用:sudo rabbitmq-server

在后台启动Rabbit node:sudo rabbitmq-server -detached

关闭整个节点(包括应用):sudo rabbitmqctl stop


add_user <UserName> <PassWord>
delete_user <UserName>
change_password <UserName> <NewPassword>
list_users
add_vhost <VHostPath>
delete_vhost <VHostPath>
list_vhosts
set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
clear_permissions [-p <VHostPath>] <UserName>
list_permissions [-p <VHostPath>]
list_user_permissions <UserName>
list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]

Demo:

producer.py


 #!/usr/bin/env python
 # -*- coding: utf_ -*-
 # Date: 年月日
 # Author:蔚蓝行
 # 博客 Http://www.cnblogs.com/duanv/
 import pika
 import sys
 #创建连接connection到localhost
 con = pika.BlockinGConnection(pika.ConnectionParameters('localhost'))
 #创建虚拟连接channel
 cha = con.channel()
 #创建队列anheng,durable参数为真时,队列将持久化;exclusive为真时,建立临时队列
 result=cha.queue_declare(queue='anheng',durable=True,exclusive=False)
 #创建名为yanfa,类型为fanout的exchange,其他类型还有direct和topic,如果指定durable为真,exchange将持久化
 cha.exchange_declare(durable=False,
           exchange='yanfa',
           type='direct',)
 #绑定exchange和queue,result.method.queue获取的是队列名称
 cha.queue_bind(exchange='yanfa', 
        queue=result.method.queue,
        routing_key='',) 
 #公平分发,使每个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message
 cha.basic_qos(prefetch_count=)
 #发送信息到队列‘anheng'
 message = ' '.join(sys.argv[:])
 #消息持久化指定delivery_mode=;
 cha.basic_publish(exchange='',
          routing_key='anheng',
          body=message,
          properties=pika.BasicProperties(
           delivery_mode = ,
         ))
 print '[x] Sent %r' % (message,)
 #关闭连接
 con.close()

consumer.py


 #!/usr/bin/env Python
 # -*- coding: utf_ -*-
 # Date: 年月日
 # Author:蔚蓝行
 # 博客 http://www.cnblogs.com/duanv/
 import pika
 #建立连接connection到localhost
 con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 #创建虚拟连接channel
 cha = con.channel()
 #创建队列anheng
 result=cha.queue_declare(queue='anheng',durable=True)
 #创建名为yanfa,类型为fanout的交换机,其他类型还有direct和topic
 cha.exchange_declare(durable=False,
           exchange='yanfa', 
           type='direct',)
 #绑定exchange和queue,result.method.queue获取的是队列名称
 cha.queue_bind(exchange='yanfa',
        queue=result.method.queue,
        routing_key='',)
 #公平分发,使每个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message
 cha.basic_qos(prefetch_count=)
 print ' [*] Waiting for messages. To exit press CTRL+C'
 #定义回调函数
 def callback(ch, method, properties, body):
   print " [x] Received %r" % (body,)
   ch.basic_ack(delivery_tag = method.delivery_tag)
 cha.basic_consume(callback,
          queue='anheng',
          no_ack=False,)
 cha.start_consuming()

一、概念:

Connection: 一个tcp的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。程序的起始处就是建立这个TCP连接。

Channels: 虚拟连接。建立在上述的TCP连接中。数据流动都是在Channel中进行的。一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

二、队列:

首先建立一个Connection,然后建立Channels,在channel上建立队列

建立时指定durable参数为真,队列将持久化;指定exclusive为真,队列为临时队列,关闭consumer后该队列将不再存在,一般情况下建立临时队列并不指定队列名称,rabbitmq将随机起名,通过result.method.queue来获取队列名:

result = channel.queue_declare(exclusive=True)

result.method.queue

区别:durable是队列持久化与否,如果为真,队列将在rabbitmq服务重启后仍存在,如果为假,rabbitmq服务重启前不会消失,与consumer关闭与否无关;

而exclusive是建立临时队列,当consumer关闭后,该队列就会被删除

三、exchange和bind

Exchange中durable参数指定exchange是否持久化,exchange参数指定exchange名称,type指定exchange类型。Exchange类型有direct,fanout和topic。

Bind是将exchange与queue进行关联,exchange参数和queue参数分别指定要进行bind的exchange和queue,routing_key为可选参数。

Exchange的三种模式:

Direct:

任何发送到Direct Exchange的消息都会被转发到routing_key中指定的Queue

1.一般情况可以使用rabbitMQ自带的Exchange:””(该Exchange的名字为空字符串);

2.这种模式下不需要将Exchange进行任何绑定(bind)操作;

3.消息传递时需要一个“routing_key”,可以简单的理解为要发送到的队列名字;

4.如果vhost中不存在routing_key中指定的队列名,则该消息会被抛弃。

Demo中虽然声明了一个exchange='yanfa'和queue='anheng'的bind,但是在后面发送消息时并没有使用该exchange和bind,而是采用了direct的模式,没有指定exchange,而是指定了routing_key的名称为队列名,消息将发送到指定队列。

如果一个exchange 声明为direct,并且bind中指定了routing_key,那么发送消息时需要同时指明该exchange和routing_key.

Fanout:

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上

1.可以理解为路由表的模式

2.这种模式不需要routing_key

3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。

4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

Demo中创建了一个将一个exchange和一个queue进行fanout类型的bind.但是发送信息时没有用到它,如果要用到它,只要在发送消息时指定该exchange的名称即可,该exchange就会将消息发送到所有和它bind的队列中。在fanout模式下,指定的routing_key是无效的 。

Topic:

任何发送到Topic Exchange的消息都会被转发到所有关心routing_key中指定话题的Queue上

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(routing_key),Exchange会将消息转发到所有关注主题能与routing_key模糊匹配的队列。

2.这种模式需要routing_key,也许要提前绑定Exchange与Queue。

3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个routing_key为”MQ.log.error”的消息会被转发到该队列)。

4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

5.同样,如果Exchange没有发现能够与routing_key匹配的Queue,则会抛弃此消息。

四、任务分发

1.Rabbitmq的任务是循环分发的,如果开启两个consumer,producer发送的信息是轮流发送到两个consume的。

2.在producer端使用cha.basic_publish()来发送消息,其中body参数就是要发送的消息,properties=pika.BasicProperties(delivery_mode = 2,)启用消息持久化,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。

3.在接收端使用cha.basic_consume()无限循环监听,如果设置no-ack参数为真,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。

在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

Demo的callback方法中ch.basic_ack(delivery_tag = method.delivery_tag)告诉rabbitmq消息已经正确处理。如果没有这条代码,Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。去调试这种错误,可以通过一下命令打印un-acked Messages:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

4.公平分发:设置cha.basic_qos(prefetch_count=1),这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。

五、注意:

生产者和消费者都应该声明建立队列,网上教程上说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。

可能因为版本问题,在我的测试中如果第二次声明建立的队列属性和第一次不完全相同,将报类似这种错406, "PRECONDITION_FAILED - parameters for queue 'anheng' in vhost '/' not equivalent"

如果是exchange第二次创建属性不同,将报这种错406, "PRECONDITION_FAILED - cannot redeclare exchange 'yanfa' in vhost '/' with different type, durable, internal or autodelete value"

如果第一次声明建立队列也出现这个错误,说明之前存在名字相同的队列且本次声明的某些属性和之前声明不同,可通过命令sudo rabbitmqctl list_queues查看当前有哪些队列。解决方法是声明建立另一名称的队列或删除原有队列,如果原有队列是非持久化的,可通过重启rabbitmq服务删除原有队列,如果原有队列是持久化的,只能删除它所在的vhost,然后再重建vhost,再设置vhost的权限(先确认该vhost中没有其他有用队列)。


sudo rabbitmqctl delete_vhost /
sudo rabbitmqctl add_vhost /
sudo rabbitmqctl set_permissions -p / username '.*' '.*' '.*'

以上内容是小编给大家介绍的利用Python学习RabbitMQ消息队列,希望大家喜欢。

--结束END--

本文标题: 利用Python学习RabbitMQ消息队列

本文链接: https://lsjlt.com/news/15607.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 利用Python学习RabbitMQ消息队列
    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综...
    99+
    2022-06-04
    队列 消息 Python
  • RabbitMQ消息队列
      一、简介   RabbitMQ是一个在AMQP基础上完整的、可复用的企业消息系统,遵循Mozilla Public License开源协议。MQ全称Message Queue(消息队列),它是一种应用程序对应用程序的通信方式。应用程序...
    99+
    2023-01-31
    队列 消息 RabbitMQ
  • RabbitMQ 消息队列
    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写...
    99+
    2023-01-31
    队列 消息 RabbitMQ
  • springboot整合消息队列RabbitMQ
    前言: RabbitMQ常用的三种Exchange Type:fanout、direct、topic。 fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。...
    99+
    2024-04-02
  • rabbitmq学习系列教程之消息应答(autoAck)、队列持久化(durable)及消息持久化
    目录一、前言二、autoAck参数的讨论三、rabbitmq队列持久化操作四、2019.11.04问题补充五、2019.11.07消息的持久化六、2022.02.09增加队列持久化说...
    99+
    2024-04-02
  • Python消息队列
    消息中间件 --->就是消息队列异步方式:不需要立马得到结果,需要排队同步方式:需要实时获得数据,坚决不能排队例子:#多进程模块multiprocessingfrom multiprocessing import Processfro...
    99+
    2023-01-31
    队列 消息 Python
  • 如何利用rabbitMq的死信队列实现延时消息
    目录前言mq基本的消息模型mq死信队列的消息模型maven依赖配置普通队列和死信队列死信队列消费者发送消息测试测试成功总结前言 使用mq自带的死信去实现延时消息要注意一个坑点,就是m...
    99+
    2023-01-28
    rabbitMq死信队列 rabbitMq延时消息 rabbitMq延时队列
  • Spring整合消息队列RabbitMQ流程
    目录搭建生产者工程创建工程添加依赖配置整合发送消息搭建消费者工程创建工程添加依赖配置整合消息监听器搭建生产者工程 创建工程 添加依赖 修改pom.xml文件内容为如下: <...
    99+
    2023-03-20
    Spring RabbitMQ Spring整合消息队列 Spring整合RabbitMQ
  • PHP怎么实现RabbitMQ消息列队
    这篇“PHP怎么实现RabbitMQ消息列队”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“PHP怎么实现RabbitMQ消息...
    99+
    2023-06-30
  • rabbitMQ怎么复制队列内消息
    要复制RabbitMQ队列内的消息,可以使用RabbitMQ的镜像队列功能。镜像队列功能可以将一个队列中的消息复制到多个节点上,以提...
    99+
    2024-02-29
    rabbitmq
  • MQ消息队列中间件RabbitMQ怎么用
    小编给大家分享一下MQ消息队列中间件RabbitMQ怎么用,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!MQ消息队列中间件—RabbitMQ消息中间件主要用于组件...
    99+
    2023-06-04
  • Java分布式学习之Kafka消息队列
    目录介绍Kafka核心相关名称kafka集群安装kafka使用kafka文件存储Springboot整合kafka介绍 Apache Kafka 是分布式发布-订阅消息系统,在 ka...
    99+
    2024-04-02
  • 解决RabbitMq消息队列Qos Prefetch消息堵塞问题
    mq是实现代码扩展的有利手段,个人喜欢用概念来学习新知识,介绍堵塞问题的之前,先来段概念的学习。 ConnectionFactory:创建connection的工厂类 Connect...
    99+
    2024-04-02
  • python消息队列Queue
    实例1:消息队列Queue,不要将文件命名为“queue.py”,否则会报异常“ImportError: cannot import name 'Queue'”#coding=utf-8 from multiprocessing impor...
    99+
    2023-01-31
    队列 消息 python
  • rabbitmq怎么删除队列中的消息
    要删除RabbitMQ队列中的消息,需要使用RabbitMQ的管理界面或者通过编程方式使用RabbitMQ的API。 使用Rab...
    99+
    2023-10-23
    rabbitmq
  • RabbitMQ消息队列的特点有哪些
    RabbitMQ消息队列的特点包括: 可靠性:RabbitMQ使用持久化机制来确保消息的可靠性。它将消息存储在磁盘上,即使在重启...
    99+
    2023-10-25
    RabbitMQ
  • 详解消息队列及RabbitMQ部署和使用
    目录什么是消息队列为什么需要消息队列常见的消息队列ActiveMQRabbitMQZeroMQKafkaRocketMQRabbitMQ 的部署和使用Python 编写生产者Pyth...
    99+
    2024-04-02
  • RabbitMQ消息队列的应用场景有哪些
    RabbitMQ是一个高性能的开源消息中间件,它可以在分布式系统中传递和存储大量的消息。它的应用场景非常广泛,包括但不限于以下几个方...
    99+
    2023-09-20
    rabbitmq
  • 如何利用redis做消息队列
    利用redis做消息队列的示例:生产者模拟程序,代码:package scheduleTest; import java.util.Random; import java.util.UUID; import redis.clients.je...
    99+
    2024-04-02
  • java中RabbitMQ消息队列指的是什么
    这篇文章主要介绍了java中RabbitMQ消息队列指的是什么,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。1、说明RabbitMQ是用Erlang实现的一个高并发高可靠AM...
    99+
    2023-06-15
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作