1.什么是消息队列 ● 消息队列(Message Queue)是一种用于在应用程序之间传递消息的通信方式,消息队列允许应用程序异步地发送和接收消息,并且不需要直接连接到对方。 ● 消息(Message)是指在应用间传送的数据。消息可以非常简
● 消息队列(Message Queue)是一种用于在应用程序之间传递消息的通信方式,消息队列允许应用程序异步地发送和接收消息,并且不需要直接连接到对方。
● 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象
● 队列(Queue)可以说是一个数据结构,可以存储数据。先进先出
2.1应用解耦
1. 传统模式下,如果库存系统异常无法访问,会导致下单失败;或者随着公司业务拓展,物流系统也需要接入下单信息,此时订单系统还需要增加调用物流系统的接口
2. 引入消息队列,用户下单后,将消息写入消息队列,返回下单成功;库存和物流系统通过订阅下单消息,获取下单信息,库存系统根据下单信息进行库存扣减操作,物流系统根据下单信息生成物流单;即使此时库存系统无法访问,但是不会影响下单流程。当库存系统恢复后可以正常消费消息
1. 传统模式下,用户从注册到响应成功,需要先保存注册信息,再发送邮件通知,邮件发送成功后再发送短息通知,短息发送成功后才响应给用户,用户体验不好
2. 引入MQ后,保存用户信息后,短信通知和邮件通知消息写入MQ(此过程耗时比较短)。极大的缩短了响应时间。增强用户体验
3.认识一下RabbitMQ 加粗文本
一款基于AMQP(高级消息队列协议)用于软件之间通信的中间件,由Rabbit科技有限公司开发,服务器端用Erlang语言编写,支持多种客户端,如:python、Ruby、.net、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ四大核心:
● 生产者
● 消费者
● 队列
● 交换机
AMQP协议是一种二进制协议,它定义了一组规则和标准,以确保消息可以在不同的应用程序和平台之间传递和解释,AMQP协议包含四个核心组件:
● 消息
● 交换机
● 队列
● 绑定
RabbitMQ:https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/
erlang:Https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/download.rpm?distro_version_id=140
// An highlighted blockvar foo = 'bar';
rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.16-1.el7.noarch.rpm
开机启动:chkconfig rabbitmq-server on
启动服务:/sbin/service rabbitmq-server start
查看服务状态:/sbin/service rabbitmq-server status
停止服务(选择执行):/sbin/service rabbitmq-server stop
开启WEB管理插件(先将服务关闭掉):rabbitmq-plugins enable rabbitmq_management
访问地址:http://192.168.16.128:15672/(默认端口号:15672),如果遇到访问不了,看看是否防火墙开着
关闭防火墙:systemctl stop firewalld
开机关闭防火墙:systemctl disable firewalld
查看防火墙状态:systemctl status firewalld
**Broker:**接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Virtual host:Virtual host是一个虚拟主机的概念,一个Broker中可以有多个Virtual host,每个Virtual host都有一套自己的Exchange和Queue,同一个Virtual host中的Exchange和Queue不能重名,不同的Virtual host中的Exchange和Queue名字可以一样。这样,不同的用户在访问同一个RabbitMQ Broker时,可以创建自己单独的Virtual host,然后在自己的Virtual host中创建Exchange和Queue,很好地做到了不同用户之间相互隔离的效果。
Connection:publisher/consumer和borker之间的tcp连接
**Channel:**发送消息的通道,如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程 序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:Queue是一个用来存放消息的队列,生产者发送的消息会被放到Queue中,消费者消费消息时也是从Queue中取走消息。
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version></dependency>
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer { //队列名 private static final String QUEUE_NAME = "hello-queue"; //交换机 private static final String EXCHANGE_NAME = "hello-exchange"; public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //服务地址 factory.setHost("192.168.16.128"); //账号 factory.setUsername("admin"); //密码 factory.setPassWord("123456"); //端口号 factory.setPort(5672); //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, true, null); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_NAME); String message = "hello world"; channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送成功"); }}
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer { //队列名 private static final String QUEUE_NAME = "hello-queue"; //交换机 private static final String EXCHANGE_NAME = "hello-exchange"; public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //服务地址 factory.setHost("192.168.16.128"); //账号 factory.setUsername("admin"); //密码 factory.setPassword("123456"); //端口号 factory.setPort(5672); //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, true, null); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_NAME); String message = "hello world"; channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送成功"); }}
import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;public class Consumer { //队列名 private static final String QUEUE_NAME = "hello-queue"; public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //服务地址 factory.setHost("192.168.16.128"); //账号 factory.setUsername("admin"); //密码 factory.setPassword("123456"); //端口号 factory.setPort(5672); //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受消息回调 DeliverCallback deliverCallback = (consumerTag, message)-> { System.out.println(new String(message.getBody())); }; //取消消息回调 CancelCallback cancelCallback = consumerTag ->{ System.out.println("消费消息被中断"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); }}
码字不易,如果大家觉得文章不错有帮助的记得点赞支持哦,谢谢大家!
来源地址:https://blog.csdn.net/weixin_51702416/article/details/129846285
--结束END--
本文标题: 【快速掌握RabbitMQ到实战】
本文链接: https://lsjlt.com/news/378614.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-04-01
2024-04-03
2024-04-03
2024-01-21
2024-01-21
2024-01-21
2024-01-21
2023-12-23
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0