返回顶部
首页 > 资讯 > 后端开发 > Python >利用RabbitMQ实现RPC(pyth
  • 678
分享到

利用RabbitMQ实现RPC(pyth

RabbitMQRPCpyth 2023-01-31 07:01:11 678人浏览 泡泡鱼

Python 官方文档:入门教程 => 点击学习

摘要

    rpc——远程过程调用,通过网络调用运行在另一台计算机上的程序的函数\方法,是构建分布式程序的一种方式。RabbitMQ是一个消息队列系统,可以在程序之间收发消息。利用RabbitMQ可以实现RPC。本文所有操作都是在Centos7

    rpc——远程过程调用,通过网络调用运行在另一台计算机上的程序的函数\方法,是构建分布式程序的一种方式。RabbitMQ是一个消息队列系统,可以在程序之间收发消息。利用RabbitMQ可以实现RPC。本文所有操作都是在Centos7.3上进行的,示例代码语言为python

RabbiMQ以及pika模块安装

yum install rabbitmq-server Python-pika -y

systemctl    start rabbitmq-server

 

RPC的基本实现

RPC的服务端代码如下:

#!/usr/bin/env   python

import pika

 

connection = pika.BlockinGConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

 

def fun(n):

    return 2*n

 

def on_request(ch, method, props, body):

    n = int(body)

    response = fun(n)

    ch.basic_publish(exchange='',

        routing_key=props.reply_to,

        properties=pika.BasicProperties(correlation_id = props.correlation_id),

        body=str(response))

    ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")

channel.start_consuming()

以上代码中,首先与RabbitMQ服务建立连接,然后定义了一个函数fun(),fun()功能很简单,输入一个数然后返回该数的两倍,这个函数就是我们要远程调用的函数。on_request()是一个回调函数,它作为参数传递给了basic_consume(),当basic_consume()在队列中消费1条消息时,on_request()就会被调用,on_request()从消息内容body中获取数字,并传给fun()进行计算,并将返回值作为消息内容发给调用方指定的接收队列,队列名称保存在变量props.reply_to中。

RPC的客户端代码如下:

#!/usr/bin/env   python

import pika

import uuid

 

class RpcClient(object):

    def __init__(self):

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

 

        self.channel = self.connection.channel()

 

        result = self.channel.queue_declare(exclusive=True)

        self.callback_queue = result.method.queue

 

        self.channel.basic_consume(self.on_response, no_ack=True,

                                   queue=self.callback_queue)

 

    def on_response(self, ch, method, props, body):

        if self.corr_id == props.correlation_id:

            self.response = body

 

    def call(self,n):

        self.response = None

        self.corr_id = str(uuid.uuid4())

        self.channel.basic_publish(exchange='',

                                     routing_key='rpc_queue',

                                   properties=pika.BasicProperties(

                                           reply_to = self.callback_queue,

                                           correlation_id = self.corr_id,

                                         ),

                                   body=str(n))

        while self.response is None:

            self.connection.process_data_events()

        return str(self.response)

 

rpc = RpcClient()

 

print(" [x] Requesting")

response = rpc.call(2)

print(" [.] Got %r" % response)

代码开始也是连接RabbitMQ,然后开始消费消息队列callback_queue中的消息,该队列的名字通过Request的属性reply_to传递给服务端,就是在上面介绍服务端代码时提到过的props.reply_to,作用是告诉服务端把结果发到这个队列。 basic_consume()的回调函数变成了on_response(),这个函数从callback_queue的消息内容中获取返回结果。

函数call实际发起请求,把数字n发给服务端程序,当response不为空时,返回response值。

下面看运行效果,先启动服务端:

image.png

在另一个窗口中运行客户端:

image.png

成功调用了服务端的fun()并得到了正确结果(fun(2)结果为4)。

 

总结:RPC的实现过程可以用下图来表示(图片来自RabbitMQ官网):

image.png

当客户端启动时,它将创建一个callback queue用于接收服务端的返回消息Reply,名称由RabbitMQ自动生成,如上图中的amq.gen-Xa2..。同一个客户端可能会发出多个Request,这些Request的Reply都由callback queue接收,为了互相区分,就引入了correlation_id属性,每个请求的correlation_id值唯一。这样,客户端发起的Request就带由2个关键属性:reply_to告诉服务端向哪个队列返回结果;correlation_id用来区分是哪个Request的返回。

稍微复杂点的RPC

如果服务端定义了多个函数供远程调用怎么办?有两种思路,一种是利用Request的属性app_id传递函数名,另一种是把函数名通过消息内容发送给服务端。

1.我们先实现第一种,服务端代码如下:

#!/usr/bin/env   python

import pika

 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

 

def a():

    return "a"

 

def b():

    return "b"

 

def on_request(ch, method, props, body):

    funname = props.app_id

    if funname == "a":

        response = a()

    elif funname == "b":

        response = b()

 

    ch.basic_publish(exchange='',

                     routing_key=props.reply_to,

                     properties=pika.BasicProperties(correlation_id = \

                                                           props.correlation_id),

                     body=str(response))

    ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request, queue='rpc_queue')

 

print(" [x] Awaiting RPC requests")

channel.start_consuming()

这次我们定义了2个不同函数a()和b(),分别打印不同字符串,根据接收到的app_id来决定调用哪一个。

客户端代码:

#!/usr/bin/env   python

import pika

import uuid

 

class RpcClient(object):

    def __init__(self):

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

 

        self.channel = self.connection.channel()

 

        result = self.channel.queue_declare(exclusive=True)

        self.callback_queue = result.method.queue

 

        self.channel.basic_consume(self.on_response, no_ack=True,

                                   queue=self.callback_queue)

 

    def on_response(self, ch, method, props, body):

        if self.corr_id == props.correlation_id:

            self.response = body

 

    def call(self,name):

        self.response = None

        self.corr_id = str(uuid.uuid4())

        self.channel.basic_publish(exchange='',

                                     routing_key='rpc_queue',

                                   properties=pika.BasicProperties(

                                           reply_to = self.callback_queue,

                                           correlation_id = self.corr_id,

                                           app_id = str(name),

                                         ),

                                   body="request")

        while self.response is None:

            self.connection.process_data_events()

        return str(self.response)

 

rpc = RpcClient()

 

print(" [x] Requesting")

response = rpc.call("b")

print(" [.] Got %r" % response)

函数call()接收参数name作为被调用的远程函数的名字,通过app_id传给服务端程序,这段代码里我们选择调用服务端的函数b(),rpc.call(“b”)。

执行结果:

image.png

image.png

结果显示成功调用了函数b,如果改成rpc.call(“a”),执行结果就会变成:

image.png

2.第二种实现方法,服务端代码:

#!/usr/bin/env   python

import pika

 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

 

def a():

    return "a"

 

def b():

    return "b"

 

def on_request(ch, method, props, body):

    funname = str(body)

    if funname == "a":

        response = a()

    elif funname == "b":

        response = b()

 

    ch.basic_publish(exchange='',

                     routing_key=props.reply_to,

                     properties=pika.BasicProperties(correlation_id = \

                                                           props.correlation_id),

                     body=str(response))

    ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request, queue='rpc_queue')

 

print(" [x] Awaiting RPC requests")

channel.start_consuming()

客户端代码:

#!/usr/bin/env   python

import pika

import uuid

 

class RpcClient(object):

    def __init__(self):

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

 

        self.channel = self.connection.channel()

 

        result = self.channel.queue_declare(exclusive=True)

        self.callback_queue = result.method.queue

 

        self.channel.basic_consume(self.on_response, no_ack=True,

                                   queue=self.callback_queue)

 

    def on_response(self, ch, method, props, body):

        if self.corr_id == props.correlation_id:

            self.response = body

 

    def call(self,name):

        self.response = None

        self.corr_id = str(uuid.uuid4())

        self.channel.basic_publish(exchange='',

                                     routing_key='rpc_queue',

                                   properties=pika.BasicProperties(

                                           reply_to = self.callback_queue,

                                           correlation_id = self.corr_id,

                                         ),

                                   body=str(name))

        while self.response is None:

            self.connection.process_data_events()

        return str(self.response)

 

rpc = RpcClient()

 

print(" [x] Requesting")

response = rpc.call("b")

print(" [.] Got %r" % response)

与第一种实现方法的区别就是没有使用属性app_id,而是把要调用的函数名放在消息内容body中,执行结果跟第一种方法一样。

一个简单的实际应用案例

下面我们将编写一个小程序,用于收集多台KVM宿主机上的虚拟机数量和剩余可使用的资源。程序由两部分组成,运行在每台宿主机上的脚本agent.py和管理机上收集信息的脚本collect.py。从RPC的角度,agent.py是服务端,collect.py是客户端。

agent.py代码如下:

#!/usr/bin/python

import pika

import libvirt

import psutil

import JSON

import Socket

import os

import sys

from xml.dom import minidom

 

#配置RabbitMQ地址

RabbitMQServer=x.x.x.x

 

#连接libvirt,libvirt是一个虚拟机、容器管理程序。

def get_conn():

    conn = libvirt.open("qemu:///system")

    if conn == None:

        print '--Failed to open connection to   QEMU/KVM--'

        sys.exit(2)

    else:

        return conn

 

#获取虚拟机数量

def getVMcount():

    conn = get_conn()

    domainIDs = conn.listDomainsID()

    return len(domainIDs)

 

#获取分配给所有虚拟机的内存之和

def getMemoryused():

    conn = get_conn()

    domainIDs = conn.listDomainsID()

    used_mem = 0

    for id in domainIDs:

        dom = conn.lookupByID(id)

        used_mem += dom.maxMemory()/(1024*1024)

    return used_mem

 

#获取分配给所有虚拟机的vcpu之和

def getcpUused():

    conn = get_conn()

    domainIDs = conn.listDomainsID()

    used_cpu = 0

    for id in domainIDs:

        dom = conn.lookupByID(id)

        used_cpu += dom.maxVcpus()

    return used_cpu

 

#获取所有虚拟机磁盘文件大小之和

def getDiskused():

    conn = get_conn()

    domainIDs = conn.listDomainsID()

    diskused = 0

    for id in domainIDs:

        dom = conn.lookupByID(id)

        xml = dom.XMLDesc(0)

        doc = minidom.parseString(xml)

        disks = doc.getElementsByTagName('disk')

        for disk in disks:

            if disk.getAttribute('device') == 'disk':

                diskfile = disk.getElementsByTagName('source')[0].getAttribute('file')

                diskused += dom.blockInfo(diskfile,0)[0]/(1024**3)

    return diskused

 

#使agent.py进入守护进程模式

def daemonize(stdin='/dev/null',stdout='/dev/null',stderr='/dev/null'):

    try:

        pid = os.fork()

        if pid > 0:

            sys.exit(0)

    except OSError,e:

        sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno,e.strerror))

        sys.exit(1)

    os.chdir("/")

    os.umask(0)

    os.setsid()

    try:

        pid = os.fork()

        if pid > 0:

            sys.exit(0)

    except OSError,e:

        sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno,e.strerror))

        sys.exit(1)

    for f in sys.stdout,sys.stderr,: f.flush()

    si = file(stdin,'r')

    so = file(stdout,'a+',0)

    se = file(stderr,'a+',0)

    os.dup2(si.fileno(),sys.stdin.fileno())

    os.dup2(so.fileno(),sys.stdout.fileno())

    os.dup2(se.fileno(),sys.stderr.fileno())

 

daemonize('/dev/null','/root/kvm/agent.log','/root/kvm/agent.log')

 

#连接RabbitMQ

connection = pika.BlockingConnection(pika.ConnectionParameters(host= RabbitMQServer))

channel = connection.channel()

channel.exchange_declare(exchange='kvm',type='fanout')

result = channel.queue_declare(exclusive=True)

queue_name = result.method.queue

channel.queue_bind(exchange='kvm',queue=queue_name)

 

def on_request(ch,method,props,body):

    sys.stdout.write(body+'\n')

    sys.stdout.flush()

    mem_total = psutil.virtual_memory()[0]/(1024*1024*1024)

    cpu_total = psutil.cpu_count()

    statvfs = os.statvfs('/datapool')

    disk_total = (statvfs.f_frsize * statvfs.f_blocks)/(1024**3)

    mem_unused = mem_total - getMemoryused()

    cpu_unused = cpu_total - getCPUused()

    disk_unused = disk_total - getDiskused()

data = {

            'hostname':socket.gethostname(),#宿主机名

            'vm':getVMcount(),#虚拟机数量

            'available memory':mem_unused,#可用内存

            'available cpu':cpu_unused,#可用cpu核数

            'available disk':disk_unused#可用磁盘空间

            }

    json_str = json.dumps(data)

    ch.basic_publish(exchange='',

                     routing_key=props.reply_to,

                     properties=pika.BasicProperties(correlation_id=props.correlation_id),

                     body=json_str

                     )

    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request,queue=queue_name)

sys.stdout.write(" [x] Awaiting RPC requests\n")

sys.stdout.flush()

channel.start_consuming()

collect.py代码如下:

#!/usr/bin/python

import pika

import uuid

import json

import datetime

 

#配置RabbitMQ地址

RabbitMQServer=x.x.x.x

class RpcClient(object):

    def __init__(self):

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=RabbitMQServer))

        self.channel = self.connection.channel()

        self.channel.exchange_declare(exchange='kvm',type='fanout')

        result = self.channel.queue_declare(exclusive=True)

        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_responses,no_ack=True,queue=self.callback_queue)

        self.responses = []

 

    def on_responses(self,ch,method,props,body):

        if self.corr_id == props.correlation_id:

            self.responses.append(body)

 

    def call(self):

        timestamp = datetime.datetime.strftime(datetime.datetime.now(),'%Y-%m-%dT%H:%M:%SZ')

        self.corr_id = str(uuid.uuid4())

        self.channel.basic_publish(exchange='kvm',

                                     routing_key='',

                                   properties=pika.BasicProperties(

                                         reply_to = self.callback_queue,

                                       correlation_id = self.corr_id,

                                       ),

                                   body='%s: receive a request' % timestamp

                                   )

#定义超时回调函数

       def outoftime():

            self.channel.stop_consuming()

        self.connection.add_timeout(30,outoftime)

        self.channel.start_consuming()

        return self.responses

 

rpc = RpcClient()

responses = rpc.call()

for i in responses:

    response = json.loads(i)

    print(" [.] Got %r" % response)

  本文在前面演示的RPC都是只有一个服务端的情况,客户端发起请求后是用一个while循环来阻塞程序以等待返回结果的,当self.response不为None,就退出循环。

  如果在多服务端的情况下照搬过来就会出问题,实际情况中我们可能有几十台宿主机,每台上面都运行了一个agent.py,当collect.py向几十个agent.py发起请求时,收到第一个宿主机的返回结果后就会退出上述while循环,导致后续其他宿主机的返回结果被丢弃。这里我选择定义了一个超时回调函数outoftime()来替代之前的while循环,超时时间设为30秒。collect.py发起请求后阻塞30秒来等待所有宿主机的回应。如果宿主机数量特别多,可以再调大超时时间。

  脚本运行需要使用的模块pika和psutil安装过程:

yum install -y python-pip python-devel

pip install pika

wget --no-check-certificate https://pypi.python.org/packages/source/p/psutil/psutil-2.1.3.tar.gz

tar zxvf psutil-2.1.3.tar.gz

cd psutil-2.1.3/ && python setup.py install

  脚本运行效果演示:

image.png

image.png

--结束END--

本文标题: 利用RabbitMQ实现RPC(pyth

本文链接: https://lsjlt.com/news/191218.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 利用RabbitMQ实现RPC(pyth
        RPC——远程过程调用,通过网络调用运行在另一台计算机上的程序的函数\方法,是构建分布式程序的一种方式。RabbitMQ是一个消息队列系统,可以在程序之间收发消息。利用RabbitMQ可以实现RPC。本文所有操作都是在CentOS7...
    99+
    2023-01-31
    RabbitMQ RPC pyth
  • 如何使用RabbitMQ实现RPC
    这篇文章给大家分享的是有关如何使用RabbitMQ实现RPC的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。背景知识RabbitMQRabbitMQ 是基于 AMQP 协议实现的一个消息队列(Message Que...
    99+
    2023-06-02
  • 详解如何利用PHP实现RPC
    目录1.什么是RPC2.从通信协议的层面3.从不同的开发语言和平台层面4.从调用过程来看5.常见的几种通信方式6.php实现简单的rpc1.目录结构2.rpc服务端3.rpc 客户端...
    99+
    2024-04-02
  • 详解SpringBoot中使用RabbitMQ的RPC功能
    一、RabbitMQ的RPC简介 实际业务中,有的时候我们还需要等待消费者返回结果给我们,或者是说我们需要消费者上的一个功能、一个方法或是一个接口返回给我们相应的值,而往往大型的系统...
    99+
    2024-04-02
  • C#利用RabbitMQ实现点对点消息传输
    目录消息队列模型RabbitMQ设置RabbitMQ动态库安装RabbitMQ.Client相关知识点示例效果图核心代码消息队列模型 所有 MQ 产品从模型抽象上来说都是一样的过程...
    99+
    2024-04-02
  • SpringBoot中使用RabbitMQ的RPC功能案例分析
    这篇文章主要讲解了“SpringBoot中使用RabbitMQ的RPC功能案例分析”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“SpringBoot中使用RabbitMQ的RPC功能案例分析...
    99+
    2023-06-25
  • dubbo怎么实现rpc调用
    Dubbo是一个基于Java的高性能RPC框架,可以实现远程服务的调用。以下是使用Dubbo实现RPC调用的步骤:1. 定义服务接口...
    99+
    2023-10-23
    dubbo
  • SpringBoot2中怎么利用Dubbo框架实现RPC服务远程调用
    SpringBoot2中怎么利用Dubbo框架实现RPC服务远程调用,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。一、Dubbo框架简介1、框架依赖图例说明:1...
    99+
    2023-06-02
  • C#如何利用RabbitMQ实现点对点消息传输
    这篇文章主要介绍C#如何利用RabbitMQ实现点对点消息传输,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!消息队列模型所有 MQ 产品从模型抽象上来说都是一样的过程:消费者(consumer)订阅某个队列。生产者(...
    99+
    2023-06-15
  • python 实现 RPC 通信
     例子: Python RPC Server import SimpleXMLRPCServer class MyObject:      def sayHello(self):          return "hello ZQF,...
    99+
    2023-01-31
    通信 python RPC
  • springboot+HttpInvoke如何实现RPC调用
    小编给大家分享一下springboot+HttpInvoke如何实现RPC调用,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!开始用springboot2+hession4实现RPC服务时,发现第一个服务可以调用成功,但第二...
    99+
    2023-06-29
  • 怎么用Springboot和Netty实现rpc
    这篇文章主要介绍“怎么用Springboot和Netty实现rpc”,在日常操作中,相信很多人在怎么用Springboot和Netty实现rpc问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么用Spring...
    99+
    2023-06-29
  • 模仿mybatis-plus实现rpc调用
    目录正文组件的整合优化点场景:背景:步骤:正文 首先我的目标就是 为了把rpc调用进行封装,让业务人员开发的时候 快速使用 组件的整合 pom.xml 整合 <depende...
    99+
    2023-02-14
    模仿mybatis-plus rpc调用 mybatis-plus rpc
  • Golang实现简易的rpc调用
    目录开始实现两点之间的通讯(transport)实现反射调用已注册的方法总结(自我pua)RPC(Remote Procedure Call Protocol)远程过程调用协议。 一...
    99+
    2023-03-06
    Golang实现rpc调用 Golang rpc调用 Golang rpc
  • 如何利用rabbitMq的死信队列实现延时消息
    目录前言mq基本的消息模型mq死信队列的消息模型maven依赖配置普通队列和死信队列死信队列消费者发送消息测试测试成功总结前言 使用mq自带的死信去实现延时消息要注意一个坑点,就是m...
    99+
    2023-01-28
    rabbitMq死信队列 rabbitMq延时消息 rabbitMq延时队列
  • springboot+HttpInvoke 实现RPC调用的方法
    开始用springboot2+hession4实现RPC服务时,发现第一个服务可以调用成功,但第二个就一直报'<'isanunknowncode。第一个服务还是...
    99+
    2024-04-02
  • Golang如何实现简易的rpc调用
    这篇文章主要介绍“Golang如何实现简易的rpc调用”,在日常操作中,相信很多人在Golang如何实现简易的rpc调用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Golang如何实现简易的rpc调用”的疑...
    99+
    2023-07-05
  • Golang如何用RPC实现转发服务
    今天小编给大家分享一下Golang如何用RPC实现转发服务的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。首先,我们需要了解一...
    99+
    2023-07-06
  • 什么是RPC?聊聊node中怎么实现 RPC 通信
    RPC vs HTTP相同点都是两台计算机之间的网络通信。ajax是浏览器和服务器之间的通行,RPC是服务器与服务器之间的通行需要双方约定一个数据格式不同点寻址服务器不同ajax 是使用 DNS作为寻址服务获取域名所对应的ip地址,浏览器拿...
    99+
    2022-11-22
    Node.js RPC
  • Go gRPC怎么实现Simple RPC
    本篇内容介绍了“Go gRPC怎么实现Simple RPC”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!前言gRPC主要...
    99+
    2023-07-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作