这篇文章主要讲解了“Docker怎么启动RabbitMQ实现生产者与消费者”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Docker怎么启动RabbitMQ实现生产者与消费者”吧!一、Doc
这篇文章主要讲解了“Docker怎么启动RabbitMQ实现生产者与消费者”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Docker怎么启动RabbitMQ实现生产者与消费者”吧!
拉取镜像
docker pull rabbitmq:3.8.8-management
查看镜像
docker images rabbitmq
启动镜像
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management
Linux虚拟机记得开放5672端口或者关闭防火墙,在window通过 主机ip:15672 访问rabbitmq控制台
用户名密码默认为guest
<!--指定 jdk 编译版本--> <build> <plugins> <plugin> <groupId>org.apache.Maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
工作原理
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Connection:publisher/consumer 和 broker 之间的 tcp 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
我们需要先获取连接(Connection),然后通过连接获取信道(Channel),这里我们演示简单例子,可以直接跳过交换机(Exchange)发送队列(Queue)
public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置主机ip factory.setHost("182.92.234.71"); // 设置用户名 factory.setUsername("guest"); // 设置密码 factory.setPassword("guest"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello rabbitmq"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送成功"); }}
public class Consumer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置主机ip factory.setHost("182.92.234.71"); // 设置用户名 factory.setUsername("guest"); // 设置密码 factory.setPassword("guest"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); // 推送的消息如何进行消费的回调接口 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; // 取消消费的一个回调接口,如在消费的时候队列被删除了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); }}
可以发现,上面获取连接工厂,然后获取连接,再获取信道的步骤是一致的,我们可以抽取成一个工具类来调用,并使用单例模式-饿汉式完成信道的初始化
public class RabbitMqUtils { private static Channel channel; static { ConnectionFactory factory = new ConnectionFactory(); // 设置ip地址 factory.setHost("192.168.23.100"); // 设置用户名 factory.setUsername("guest"); // 设置密码 factory.setPassword("guest"); try { // 创建连接 Connection connection = factory.newConnection(); // 获取信道 channel = connection.createChannel(); } catch (Exception e) { System.out.println("创建信道失败,错误信息:" + e.getMessage()); } } public static Channel getChannel() { return channel; }}
相当于前面的消费者,我们只需要写一个类,通过ideal实现多线程启动即可模拟两个线程
public class Worker01 { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = ( consumerTag, message) -> { System.out.println("接受到消息:" + new String(message.getBody())); }; CancelCallback cancelCallback = (cunsumerTag) -> { System.out.println("消费者取消消费接口回调逻辑"); }; // 启动两次,第一次为C1, 第二次为C2 System.out.println("C2消费者等待消费消息"); channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback); }}
public class Test01 { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 通过控制台输入充当消息,使轮训演示更明显 Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME,null, message.getBytes() ); System.out.println("消息发送完成:" + message); } }}
结果
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是: 消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
自动应答:消费者发送后立即被认为已经传送成功。这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。
当然另一方面这种模式消费者那边可以传递过载的消息, 没有对传递的消息数量进行限制 , 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用 。
手动应答:消费者接受到消息并顺利完成业务后再调用方法进行确认,rabbitmq 才可以把该消息删除
Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
Channel.basicNack(用于否定确认)
Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数Multiple
multiple 的 true 和 false 代表不同意思
true 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时
5-8 的这些还未应答的消息都会被确认收到消息应答
false 同上面相比
只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
不处理该消息了直接拒绝,可以将其丢弃了
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息。
public class Test01 { private final static String QUEUE_NAME = "ack"; public static void main(String[] args) throws IOException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME,null, message.getBytes() ); System.out.println("消息发送完成:" + message); } }}
public class SleepUtils { public static void sleep(int second) { try { Thread.sleep(1000 * second); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } }}
public class Worker01 { private final static String QUEUE_NAME = "ack"; public static void main(String[] args) throws Exception { System.out.println("C1,业务时间短"); Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = ( consumerTag, message) -> { SleepUtils.sleep(1); // 模拟业务执行1秒 System.out.println("接受到消息:" + new String(message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; CancelCallback cancelCallback = (cunsumerTag) -> { System.out.println("消费者取消消费接口回调逻辑"); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback); }} ==============================================================================public class Worker02 { private final static String QUEUE_NAME = "ack"; public static void main(String[] args) throws Exception { System.out.println("C2,业务时间长"); Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = ( consumerTag, message) -> { SleepUtils.sleep(15); // 模拟业务执行15秒 System.out.println("接受到消息:" + new String(message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; CancelCallback cancelCallback = (cunsumerTag) -> { System.out.println("消费者取消消费接口回调逻辑"); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback); }}
worker01业务时间短,worker02业务时间长,我们提前终止worker02模拟出异常,可以看到消息dd会被放回队列由worker01接收处理。
注意:这里需要先启动生产者声明队列ack,不然启动消费者会报错
最后一个案例我们可以看到消息轮训+消息自动重新入队+手动应答。
感谢各位的阅读,以上就是“Docker怎么启动RabbitMQ实现生产者与消费者”的内容了,经过本文的学习后,相信大家对Docker怎么启动RabbitMQ实现生产者与消费者这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!
--结束END--
本文标题: Docker怎么启动RabbitMQ实现生产者与消费者
本文链接: https://lsjlt.com/news/349830.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0