返回顶部
首页 > 资讯 > 后端开发 > Python >3Python全栈之路系列之Rabbit
  • 359
分享到

3Python全栈之路系列之Rabbit

之路系列之Python 2023-01-31 06:01:18 359人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件。RabbitMQ服务器是用Erlang语言编写的,它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全,RabbitMQ官网,RabbitM


RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件。RabbitMQ服务器是用Erlang语言编写的,它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全,RabbitMQ官网,RabbitMQ中文文档。


安装RabbitMQ

安装EPEL源

[root@anshengme ~]# yum -y install epel-release

安装erlang

[root@anshengme ~]# yum -y install erlang

安装RabbitMQ

[root@anshengme ~]# yum -y install rabbitmq-server

启动并设置开机器启动

在启动RabbitMQ之前需要hostname的解析,要不然启动不起来

[root@anshengme ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4 anshengme
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
[root@anshengme ~]# systemctl start rabbitmq-server
[root@anshengme ~]# systemctl enable rabbitmq-server
Created symlink from /etc/systemd/system/multi-user.target.wants/rabbitmq-server.service to /usr/lib/systemd/system/rabbitmq-server.service.

查看启动状态

[root@anshengme ~]# netstat -tulnp |grep 5672
tcp        0      0 0.0.0.0:25672           0.0.0.0:*               LISTEN      37507/beam.smp      
tcp6       0      0 :::5672                 :::*                    LISTEN      37507/beam.smp

pika

pika模块是官方认可的操作RabbitMQapi接口。

安装pika

pip3 install pika

pika:https://pypi.python.org/pypi/pika

测试

>>> import pika

Work Queues

如果你启动了多个消费者,那么生产者生产的任务会根据顺序的依次让消费者来执行,这就是Work Queues模式

wKiom1kVKWmS6h7vAABZtJSB-Ow435.png

生产者代码

#!/usr/bin/env Python
# _*_ codin:utf-8 _*_

import pika

# 连接到RabbitMQ 这是一个阻塞的连接
connection = pika.BlockinGConnection(pika.ConnectionParameters('192.168.56.100'))

# 生成一个管道
channel = connection.channel()

# 通过管道创建一个队列
channel.queue_declare(queue='hello')

# 在队列内发送数据,body内容,routing_key队列,exchange交换器,通过交换器往hello队列内发送Hello World!数据
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

# 关闭连接
connection.close()

消费者代码

#!/usr/bin/env python
# _*_ codin:utf-8 _*_

import pika
# 连接到RabbitMQ 这是一个阻塞的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.56.100'))

# 生成一个管道
channel = connection.channel()

# 如果消费者连接到这个队列的时候,队列没有生成,那么消费者就生成这个队列,如果这个队列已经生成了,那么就忽略它
channel.queue_declare(queue='hello')

# 回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    
# 消费,当收到hello队列的消息的时候就,就调用callback函数,no_ack消费者在处理任务的时候要不需要确认任务已经处理完成,改为False则要确认
channel.basic_consume(callback, queue='hello', no_ack=True)

# 开始接受任务,阻塞
channel.start_consuming()

持久化

队列持久化

试想,如果我们的消费者在执行任务执行到一半时,突然down掉了,我们可以更改no_ack=False来让消费者每次执行完成完成之后确认执行完毕了再把这个任务在队列中移除移除掉,但是如果RabbitMQ的服务器停止我们的任务仍然会丢失。

首先,我们需要确保的RabbitMQ永远不会在我们的队列中失去,为了做到这一点,我们需要把durable=True,声明一个新名称的队列,为task_queue

channel.queue_declare(queue='task_queue', durable=True)

durable需要在生产者和消费者上面都需要写上,且durable只会让我们的队列持久化,并不能够让消息持久化。

消息持久化

消息持久化只需要在添加消息的时候添加一个delivery_mode=2

channel.basic_publish(exchange='',
                      routing_key='world',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          # 2=消息持久化
                          delivery_mode=2,
                      ))

在消费者的callback函数内添加以下代码:

ch.basic_ack(delivery_tag = method.delivery_tag)

消息公平分发

每一个消费者同时只处理一个任务,比如说现在有三个消费者,刚开始来了三个任务,平均分配给了三个消费者,那么这三个消费者目前都在同时执行任务,当第四个任务到来的时候依旧会分配给第一个消费者,第五个任务到来的时候会分配给第二个消费者,以此类推。

那么以上的状况有什么不妥呢?譬如说不同的消费者执行任务的时间不同,我们现在需要的时候,当三个消费者都在执行任务的时候,比如说第二个消费者任务执行完了,其他消费者都还在执行任务,当第四个任务到来的时候希望交给第二个消费者,若要实现此功能,只需要在消费者加上一下代码即可:

channel.basic_qos(prefetch_count=1)

完整的代码如下

消费者代码

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.56.100'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(10)
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)
    
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
                      
channel.start_consuming()

生产者代码

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.56.100'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

for n in range(10):
    message = "Hello World! %s" % (n + 1)
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))
    print(" [x] Sent %r" % message)
connection.close()

消息传输类型

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息

属性描述
fanout所有bind到此exchange的queue都可以接收消息
direct通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

fanout(发布订阅)

只要有消费者,那么我生产者发布一条消息的时候所有的消费者都会被收到

wKioL1kVKmbR6t5hAAA2MyN-t_g055.png

# 消费者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.56.100'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
result = channel.queue_declare(exclusive=True)
# 获取queue的name
queue_name = result.method.queue
# 把queue绑定到exchange
channel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
# 生产者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.56.100'))
channel = connection.channel()
# fanout发送给所有人
channel.exchange_declare(exchange='logs', type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body="Hello World!")
connection.close()

direct(关键字)

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

wKiom1kVKprTftxpAABNvwKE9II350.png

生产者代码

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.56.100'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')
                         
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

消费者代码

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.56.100'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')
                         
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
    
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
                       
print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
    
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
                      
channel.start_consuming()

topic(模糊匹配)

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

表达式符号说明:

符号描述
#表示可以匹配0个多个单词
*表示只能匹配一个单词
发送者路由值队列中是否匹配
ansheng.meansheng.*不匹配
ansheng.meansheng.#匹配

消费者代码

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.56.100'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')
                         
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
    
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
                       
print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
    
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
                      
channel.start_consuming()

生产者代码

#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.56.100'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

rpc(Remote procedure call)

客户端发送一个任务到服务端,服务端把任务的执行结果再返回给客户端

wKiom1kVK2rB1RUhAACEQPOtmes668.png

# _*_coding:utf-8_*_
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.56.100'))
channel = connection.channel()
# 声明一个RPC QUEUE

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)
        
def on_request(ch, method, props, body):
    # 接受传过来的值
    n = int(body)
    print(" [.] fib(%s)" % n)
    # 交给fib函数进行斐波那契处理
    response = fib(n)
    # 把结果发回去,此时消费者变成生产者
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     # 客户端传过来的UUID顺便发回去
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    # 持久化
    ch.basic_ack(delivery_tag=method.delivery_tag)
    
# 同时只处理一个任务
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()

RPC Client

# _*_coding:utf-8_*_
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='192.168.56.100'))
            
        self.channel = self.connection.channel()
        
        result = self.channel.queue_declare(exclusive=True)
        # 服务端返回处理完毕的数据新Queue名称
        self.callback_queue = result.method.queue
        
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
                                   
    def on_response(self, ch, method, props, body):
        # corr_id等于刚刚发送过去的ID,就代表这条消息是我的
        if self.corr_id == props.correlation_id:
            self.response = body
            
    def call(self, n):
        self.response = None
        # 生成一个唯一ID,相当于每个任务的ID
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       # 让服务端处理完成之后把数据放到这个Queue里面
                                       reply_to=self.callback_queue,
                                       # 加上一个任务ID
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            # 不断地去Queue接受消息,但不是阻塞的,而是一直循环的去取
            self.connection.process_data_events()
        return int(self.response)
        
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

#Python标准库 #Rabbitmq


--结束END--

本文标题: 3Python全栈之路系列之Rabbit

本文链接: https://lsjlt.com/news/190003.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 3Python全栈之路系列之Rabbit
    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件。RabbitMQ服务器是用Erlang语言编写的,它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全,RabbitMQ官网,RabbitM...
    99+
    2023-01-31
    之路 系列之 Python
  • 3Python全栈之路系列之D
    模板是一个文本,用于分离文档的表现形式和内容,模板定义了占位符以及各种用于规范文档该如何显示的各部分基本逻辑(模板标签)。模板通常用于产生HTML,但是Django的模板也能产生任何基于文本格式的文档。如何使用模板系统在Python代码中使...
    99+
    2023-01-31
    之路 系列之 Python
  • 3Python全栈之路系列之MySQL表
    先创创建一个表用于测试-- 创建数据库 CREATE DATABASE dbname DEFAULT CHARSET utf8 COLLATE utf8_general_ci; -- 创建表 CREATE TABLE `tb` (   `i...
    99+
    2023-01-31
    之路 系列之 Python
  • 3Python全栈之路系列之基于sock
    发布时间:2017年3月16日 00:04 浏览(106) 评论(0) 分类:Python前言此处没有前言粘包在实现发送文件功能之前我们先来理解下粘包的问题,下面有两张图,我觉得很清晰的就可以理解到了。正常情况下发送文件第一步: 客户端把获...
    99+
    2023-01-31
    之路 系列之 Python
  • 3Python全栈之路系列之MySQL表内操作
    Python全栈之路系列之MySQL表内操作先创创建一个表用于测试-- 创建数据库 CREATE DATABASE dbname DEFAULT CHARS...
    99+
    2024-04-02
  • 3Python全栈之路系列之字符串数据类
    字符串(str)字符串类型是python的序列类型,他的本质就是字符序列,而且python的字符串类型是不可以改变的,你无法将原字符串进行修改,但是可以将字符串的一部分复制到新的字符串中,来达到相同的修改效果。创建字符串类型可以使用单引号或...
    99+
    2023-01-31
    之路 字符串 数据
  • 2Python全栈之路系列之SQLAchemy
    Python全栈之路系列之SQLAlchemySQLAlchemy的是Python SQL工具包和对象关系映射器,让应用程序开发者的全部功能和SQL的灵活性。它提供了一套完整的众所周知的企业级持久性模式,专...
    99+
    2024-04-02
  • Python全栈之路系列之Python
    The Python interpreter has a number of functions and types built into it that are always available. They are listed her...
    99+
    2023-01-31
    之路 系列之 Python
  • 4Python全栈之路系列之Django模型
    Python全栈之路系列之Django模型MTV开发模式把数据存取逻辑、业务逻辑和表现逻辑组合在一起的概念有时被称为软件架构的Model-View-Controller(MVC)模式。在这个模式中,Mode...
    99+
    2024-04-02
  • 8Python全栈之路系列之MySQL触发器
    Python全栈之路系列之MySQL触发器l对某个表进行增/删/改操作的前后如果希望触发某个特定的行为时,可以使用触发器,触发器用于定制用户对表的行进行增/删/改前后的行为。创建触发器基本语法插入前CREA...
    99+
    2024-04-02
  • Python全栈之路系列之文件操作
    Python可以对文件进行查看、创建等功能,可以对文件内容进行添加、修改、删除,且所使用到的函数在Python3.5.x为open,在Python2.7.x同时支持file和open,但是在3.5.x系列移除了file函数。 Python...
    99+
    2023-01-31
    之路 操作 文件
  • 6Python全栈之路系列之MySQL存储过程
    Python全栈之路系列之MySQL存储过程存储过程是一个SQL语句集合,当主动去调用存储过程时,其中内部的SQL语句会按照逻辑执行。存储过程过接收的参数参数描述in仅用于传入参数用out仅用于返回值用in...
    99+
    2024-04-02
  • Python全栈之路系列之字符串格式化
    This PEP proposes a new system for built-in string formatting operations, intended as a replacement for the existing '%...
    99+
    2023-01-31
    之路 字符串 系列之
  • Python全栈之路系列之字符串数据类
    字符串(str) 字符串类型是python的序列类型,他的本质就是字符序列,而且python的字符串类型是不可以改变的,你无法将原字符串进行修改,但是可以将字符串的一部分复制到新的字符串中,来达到相同的修改效果。 创建字符串类型可以使用单...
    99+
    2023-01-31
    之路 字符串 数据
  • 2Python全栈之路系列之MysQl基本数据类型
    Python全栈之路系列之MySQL基本数据类型MySQL中定义数据字段的类型对你数据库的优化是非常重要的。MySQL支持多种类型,大致可以分为三类:数字类型日期和时间类型字符串类型数字类型类型大小用途BI...
    99+
    2024-04-02
  • python 全栈之路
    目录 Python 全栈之路 一. Python 1. Python基础知识部分 2. Python -函数 3. ...
    99+
    2023-01-30
    之路 python
  • 3Python标准库系列之os模块
    This module provides a portable way of using operating system dependent functionality. If you just want to read or write...
    99+
    2023-01-31
    模块 标准 系列之
  • 7Python全站之路系列之MySQL视图
    Python全栈之路系列之MySQL视图视图是一个虚拟表(非真实存在),其本质是根据SQL语句获取动态的数据集,并为其命名,用户使用时只需使用名称即可获取结果集,并可以将其当作表来使用。创建视图创建一个名称...
    99+
    2024-04-02
  • 9Python全站之路系列之MySQL SL注入
    Python全栈之路系列之MySQL SQL注入SQL注入是一种代码注入技术,过去常常用于***数据驱动性的应用,比如将恶意的SQL代码注入到特定字段用于实施******等。SQL注入的成功必须借助应用程序...
    99+
    2024-04-02
  • Python全栈之队列详解
    目录1.lock互斥锁2.事件_红绿灯效果2.1信号量_semaphore2.2事件_红绿灯效果3.queue进程队列4.生产者消费者模型5.joinablequeue队列使用6.总...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作