返回顶部
首页 > 资讯 > 后端开发 > Python >Python RabbitMQ
  • 614
分享到

Python RabbitMQ

PythonRabbitMQ 2023-01-31 02:01:51 614人浏览 安东尼

Python 官方文档:入门教程 => 点击学习

摘要

RabbitMQRabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用

RabbitMQ

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

RabbitMQ安装

安装配置epel源
  # rpm -ivh Http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm

安装erlang
  # yum -y install erlang

安装RabbitMQ
  # yum -y install rabbitmq-server

win下安装推荐http://blog.csdn.net/a__java___a/article/details/17614797


启动/停止 service rabbitmq-server start/stop


安装api

    pip install pika

    or

    easy_install pika

    or

    源码

    https://pypi.python.org/pypi/pika


使用API操作RabbitMQ

基于Queue可以实现生产者消费者模型,而对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

import pika

# ######################### 生产者 #########################

connection = pika.BlockinGConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='MQ1')

channel.basic_publish(exchange='',routing_key='MQ1',body='Hello World!')

print(" [x] Sent 'Hello World!'")

connection.close()

import pika

# ########################## 消费者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='MQ1')

def callback(ch, method, properties, body):
   print(" [x] Received %r" % body)

channel.basic_consume(callback,queue='MQ1',no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


1. acknowledgment 消息不丢失

no-ack = False,如果生产者遇到情况(its channel is closed, connection is closed, or tcp connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

import pika
# ########################## 消费者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='MQ1')

def callback(ch, method, properties, body):
   print(" [x] Received %r" % body)
   import time
   time.sleep(10)
   print('ok')
   ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue='MQ1',no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


2. durable   消息不丢失

生产者消息持久化

import pika
# ######################### 生产者 #########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 消息持久化
channel.queue_declare(queue='MQ2', durable=True)

channel.basic_publish(exchange='',routing_key='MQ2',body='Hello World!',
                     properties=pika.BasicProperties(
                         delivery_mode=2, # 消息持久化
                     ))
print(" [x] Sent 'Hello World!'")
connection.close()

import pika
# ########################## 消费者 ##########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='MQ2', durable=True)  # 消息持久化

def callback(ch, method, properties, body):
   print(" [x] Received %r" % body)
   import time
   time.sleep(10)
   print('ok')
   ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue='MQ2',no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


3.  消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='MQ2',durable=True)  # 消息持久化

def callback(ch, method, properties, body):
   print(" [x] Received %r" % body)
   import time
   time.sleep(10)
   print('ok')
   ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                     queue='hello',
                     no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


4. 发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

 exchange type = fanout

import pika,sys

##------------------------发布者-----------------

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',routing_key='',body=message)

print(" [x] Sent %r" % message)
connection.close()

import pika

##------------------------订阅者-----------------
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue    #获取随机队列名

channel.queue_bind(exchange='logs',queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()



5.  关键字发送

 exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

import pika

# ######################### 生产者 #########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',type='direct')

message = 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                     routing_key= 'mingyue',   #绑定的关键字
                     body=message)
print(" [x] Sent %r" % (message))
connection.close()

import pika
# # ########################## 消费者 ##########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 绑定两个不同的关键字
channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key= 'shuoming')
channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key= 'mingyue')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

6.  模糊匹配

 exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词

  • *  表示只能匹配 一个 单词

发送者路由值              队列中

www.Google.com          www.*  -- 不匹配

www.baidu.com           www.#  -- 匹配

import pika

# ######################### 生产者 #########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',type='topic')


message = 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                     routing_key= 'www.baidu.com',   #匹配模糊关键字
                     body=message)
print(" [x] Sent %r" % (message))
connection.close()

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 绑定两个不同的关键字
channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key= '
www.*')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()


Remote procedure call (rpc)

RPC 远程过程调用。客户端C,向服务端S请求一项服务,官网举了一个计算fibonacci值的例子。C 向S请求计算fib(x),S 则计算,算完之后发给,或者说反馈给C。C收到反馈信息之后才能想S 继续请求服务。

一个RPC流程是:

  • C 启动,然后创建一个匿名私有(exclusive=Ture)的反馈队列。

  • C 发送请求的时候,要附上reply_to(用户反馈的队列名)和correlation_id(反馈的***ID)。

  • 请求被发送到 rpc_queue 队列.

  • S 等待队列. 发现有消息到达则计算fib(x)然后通过反馈队列反馈给C.

  • C 等待反馈队列,发现有反馈信息到达,比对反馈***ID。符合,发送下一个计算请求。不符合,再等。

  • wKioL1byJDihA7kpAAA5qk8GcaI689.png

RPC Server

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
   if n == 0:
       return 0
   elif n == 1:
       return 1
   else:
       return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
   n = int(body)

   print(" [.] fib(%s)" % n)
   response = fib(n)

   ch.basic_publish(exchange='',
                    routing_key=props.reply_to,
                    properties=pika.BasicProperties(correlation_id = \
                                                        props.correlation_id),
                    body=str(response))
   ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()


RPC Client

import pika,uuid

class FibonacciRpcClient(object):
   def __init__(self):
       self.connection = pika.BlockingConnection(pika.ConnectionParameters(
               host='localhost'))

       self.channel = self.connection.channel()

       result = self.channel.queue_declare(exclusive=True)
       self.callback_queue = result.method.queue

       self.channel.basic_consume(self.on_response, no_ack=True,
                                  queue=self.callback_queue)

   def on_response(self, ch, method, props, body):
       if self.corr_id == props.correlation_id:
           self.response = body

   def call(self, n):
       self.response = None
       self.corr_id = str(uuid.uuid4())
       self.channel.basic_publish(exchange='',
                                  routing_key='rpc_queue',
                                  properties=pika.BasicProperties(
                                        reply_to = self.callback_queue,
                                        correlation_id = self.corr_id,
                                        ),
                                  body=str(n))
       while self.response is None:
           self.connection.process_data_events()
       return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)


--结束END--

本文标题: Python RabbitMQ

本文链接: https://lsjlt.com/news/186441.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Python RabbitMQ
    RabbitMQRabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用...
    99+
    2023-01-31
    Python RabbitMQ
  • Python之RabbitMQ
    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件。RabbitMQ服务器是用Erlang语言编写的,它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全,RabbitMQ官网,RabbitM...
    99+
    2023-01-31
    Python RabbitMQ
  • python rabbitmq no_
    发送端:import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.chann...
    99+
    2023-01-31
    python rabbitmq
  • python rabbitmq send
    #!/usr/bin/env python#-*- coding: utf8 -*- import pikaimport tracebacktry:    connection = pika.BlockingConnection(pika....
    99+
    2023-01-31
    python rabbitmq send
  • python中的rabbitmq
    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写...
    99+
    2023-01-30
    python rabbitmq
  • 【Python模块】rabbitMQ
    RabbitMQ介绍:父进程与子进程间,同一父继承可以用multiprocess的Manager模块来实现数据互访。作用:RabbitMQ是为了实现相互独立的两个进程数据互访。应用场景:不需要立即操作的数据。比如:发消息,发通知,发红包等。...
    99+
    2023-01-31
    模块 Python rabbitMQ
  • python rabbitmq 队列持久
    发送端:import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.chann...
    99+
    2023-01-31
    队列 持久 python
  • python使用pika操作rabbitmq
    python 连接操作rabbitMQ 主要是使用pika库pip3 install pika==1.1.0 官方对于pika有如下介绍Since threads aren’t a...
    99+
    2023-01-31
    操作 python pika
  • python操作rabbitmq 实践笔
    发布/订阅  系统1.基本用法生产者 1 import pika 2 import sys 3 4 username = 'wt' #指定远程rabbitmq的用户名密码 5 pwd = '111111' 6 user_pw...
    99+
    2023-01-31
    操作 python rabbitmq
  • Python介绍RabbitMQ使用篇二
    1. RabbitMQ WorkQueue基本工作模式介绍 上一篇我们使用C#语言讲解了单个消费者从消息队列中处理消息的模型,这一篇我们使用Python语言来讲解多个消费者同时工作从一个Queue处理消息的模型。 工作队列(又称:任务队...
    99+
    2023-01-31
    Python RabbitMQ
  • python测试rabbitmq的消息收
    send.py#!/usr/bin/env python    # -*- coding: UTF-8 -*-  import pika   import random            credentials = pika.Plain...
    99+
    2023-01-31
    消息 测试 python
  • RabbitMQ
    RabbitMQ概述:RabbitMQ是使用最广泛的开源消息代理。RabbitMQ轻量级,易于在集群内部和云平台中部署。它支持多种消息传递协议。 它可以满足企业高规模,高可用性的要求。RabbitMQ使用Erlang语言开发的。MQ概...
    99+
    2023-01-30
    RabbitMQ
  • 【RabbitMQ】RabbitMQ控制台的使用
    一、访问控制台页面 如果在本机上装了RabbitMQ则在浏览器访问127.0.0.1:15672,如果在服务器装了RabbitMQ则通过在浏览器输入urlip:15762来访问 登录后进入主页   二、添加RabbitMQ用户 进入主页...
    99+
    2023-09-18
    java-rabbitmq rabbitmq java
  • 利用Python学习RabbitMQ消息队列
    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综...
    99+
    2022-06-04
    队列 消息 Python
  • 【RabbitMQ】什么是RabbitMQ?RabbitMQ有什么用?应用场景有那些?
    目录 一、什么是RabbitMQ? 二、RabbitMQ是干什么的? 三、RabbitMQ的常见作用有那些? 四、RabbitMQ的应用场景有那些? 场景一:用户订单,库存处理。【服务间解耦】 场景二:用户注册,发送手机短信,邮件。【实现异...
    99+
    2023-08-31
    rabbitmq java 中间件 MQ
  • RabbitMQ---Spring AMQP
    Spring AMQP 1. 简介 Spring有很多不同的项目,其中就有对AMQP的支持: Spring AMQP的页面:http://spring.io/projects/spring-amqp 注意这里一段描述: Spring-a...
    99+
    2023-08-30
    java-rabbitmq rabbitmq spring 中间件 后端
  • python操作RabbitMq的三种工作模式
    目录一、简介:二、RabbitMq 生产和消费三、RabbitMq 持久化四、RabbitMq 发布与订阅模式一:fanout模式二:direct模式三:topicd一、简介: Ra...
    99+
    2024-04-02
  • python对RabbitMQ的简单入门使用教程
    目录(一)RabbitMQ的简介(二)RabbitMQ的安装(三)python操作RabbitMQ(四)RabbitMQ简单模式(五)RabbitMQ发布订阅模式(六)RabbitM...
    99+
    2024-04-02
  • laravel 使用rabbitmq
    composer.json加上:"php-amqplib/php-amqplib": "^2.12",然后执行composer updatexxie 消费者: $mqConfig = GlobalConfig::get('event.mqC...
    99+
    2023-09-27
    php
  • 1.RabbitMQ介绍
    一、MQ是什么?为什么使用它 MQ(Message Queue,简称MQ)被称为消息队列。 是一种用于在应用程序之间传递消息的通信方式。它是一种异步通信模式,允许不同的应用程序、服务或组件之间通过将消息放入队列中来进行通信。这些消息可以包含...
    99+
    2023-08-30
    rabbitmq 分布式
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作