返回顶部
首页 > 资讯 > 服务器 >Docker启动RabbitMQ实现生产者与消费者的详细过程
  • 252
分享到

Docker启动RabbitMQ实现生产者与消费者的详细过程

Docker启动RabbitMQDocker生产者与消费者 2023-02-23 11:02:13 252人浏览 八月长安
摘要

目录一、Docker拉取镜像并启动RabbitMQ二、Hello World(一)依赖导入(二)消息生产者(三)消息消费者三、实现轮训分发消息(一)抽取工具类(二)启动两个工作线程(

一、Docker拉取镜像并启动RabbitMQ

拉取镜像

docker pull rabbitmq:3.8.8-management

查看镜像

docker images rabbitmq

 启动镜像

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management

Linux虚拟机记得开放5672端口或者关闭防火墙,在window通过 主机ip:15672 访问rabbitmq控制台

 用户名密码默认为guest

二、Hello World

(一)依赖导入

<!--指定 jdk 编译版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.Maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依赖客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

(二)消息生产者

工作原理

  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
  • Connection:publisher/consumer 和 broker 之间的 tcp 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走

我们需要先获取连接(Connection),然后通过连接获取信道(Channel),这里我们演示简单例子,可以直接跳过交换机(Exchange)发送队列(Queue)

public class Producer {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置主机ip
        factory.setHost("182.92.234.71");
        // 设置用户名
        factory.setUsername("guest");
        // 设置密码
        factory.setPassword("guest");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        Connection connection = factory.newConnection();
        // 获取信道
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello rabbitmq";
        
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("发送成功");
    }
}

(三)消息消费者

public class Consumer {
 
    private static final String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置主机ip
        factory.setHost("182.92.234.71");
        // 设置用户名
        factory.setUsername("guest");
        // 设置密码
        factory.setPassword("guest");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        Connection connection = factory.newConnection();
        // 获取信道
        Channel channel = connection.createChannel();
 
        // 推送的消息如何进行消费的回调接口
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        // 取消消费的一个回调接口,如在消费的时候队列被删除了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

三、实现轮训分发消息

(一)抽取工具类

可以发现,上面获取连接工厂,然后获取连接,再获取信道的步骤是一致的,我们可以抽取成一个工具类来调用,并使用单例模式-饿汉式完成信道的初始化

public class RabbitMqUtils {
 
    private static Channel channel;
 
    static {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置ip地址
        factory.setHost("192.168.23.100");
        // 设置用户名
        factory.setUsername("guest");
        // 设置密码
        factory.setPassword("guest");
        try {
            // 创建连接
            Connection connection = factory.newConnection();
            // 获取信道
            channel = connection.createChannel();
        } catch (Exception e) {
            System.out.println("创建信道失败,错误信息:" + e.getMessage());
        }
    }
 
    public static Channel getChannel() {
        return channel;
    }
}

(二)启动两个工作线程

相当于前面的消费者,我们只需要写一个类,通过ideal实现多线程启动即可模拟两个线程

public class Worker01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            System.out.println("接受到消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消费者取消消费接口回调逻辑");
        };
        // 启动两次,第一次为C1, 第二次为C2
        System.out.println("C2消费者等待消费消息");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
    }
}

(三)启动发送线程

public class Test01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 通过控制台输入充当消息,使轮训演示更明显
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息发送完成:" + message);
        }
    }
}

结果 

四、实现手动应答

(一)消息应答概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是: 消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答:消费者发送后立即被认为已经传送成功。这种模式需要在高吞吐量和数据传输安全性方面做权,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。

当然另一方面这种模式消费者那边可以传递过载的消息, 没有对传递的消息数量进行限制 , 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用

手动应答:消费者接受到消息并顺利完成业务后再调用方法进行确认,rabbitmq 才可以把该消息删除

(二)消息应答的方法

  • Channel.basicAck(用于肯定确认)
  • RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  • Channel.basicNack(用于否定确认)
  • Channel.basicReject(用于否定确认)
  • 与 Channel.basicNack 相比少一个参数Multiple
  • multiple 的 true 和 false 代表不同意思

        true 代表批量应答 channel 上未应答的消息
        比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时
        5-8 的这些还未应答的消息都会被确认收到消息应答
        false 同上面相比
        只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

  • 不处理该消息了直接拒绝,可以将其丢弃了

(三)消息自动重新入队 

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息。

(四)消息手动应答代码 

1、生产者

public class Test01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息发送完成:" + message);
        }
    }
}

2、睡眠工具类模拟业务执行

public class SleepUtils {
 
    public static void sleep(int second) {
        try {
            Thread.sleep(1000 * second);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

3、消费者

public class Worker01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C1,业务时间短");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(1);  // 模拟业务执行1秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消费者取消消费接口回调逻辑");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}
 
==============================================================================
public class Worker02 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C2,业务时间长");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(15);  // 模拟业务执行15秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消费者取消消费接口回调逻辑");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}

worker01业务时间短,worker02业务时间长,我们提前终止worker02模拟出异常,可以看到消息dd会被放回队列由worker01接收处理。

注意:这里需要先启动生产者声明队列ack,不然启动消费者会报错

最后一个案例我们可以看到消息轮训+消息自动重新入队+手动应答。

到此这篇关于Docker启动RabbitMQ,实现生产者与消费者的文章就介绍到这了,更多相关Docker启动RabbitMQ内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: Docker启动RabbitMQ实现生产者与消费者的详细过程

本文链接: https://lsjlt.com/news/197307.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Docker启动RabbitMQ实现生产者与消费者的详细过程
    目录一、Docker拉取镜像并启动RabbitMQ二、Hello World(一)依赖导入(二)消息生产者(三)消息消费者三、实现轮训分发消息(一)抽取工具类(二)启动两个工作线程(...
    99+
    2023-02-23
    Docker启动RabbitMQ Docker 生产者与消费者
  • Docker怎么启动RabbitMQ实现生产者与消费者
    这篇文章主要讲解了“Docker怎么启动RabbitMQ实现生产者与消费者”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Docker怎么启动RabbitMQ实现生产者与消费者”吧!一、Doc...
    99+
    2023-07-05
  • Golang rabbitMQ生产者消费者实现示例
    目录消费者生产者消费者 package main import ( "fmt" "github.com/streadway/amqp" ) func failOnError(er...
    99+
    2024-04-02
  • SpringBoot整合RabbitMQ, 实现生产者与消费者的功能
    自然,依赖是少不了的。除了spring-boot-starter-web依赖外。 就这个是最主要的依赖了,其他的看着办就是了。我用的是gradle,用maven的看着弄也一样的。无非...
    99+
    2024-04-02
  • Golang rabbitMQ生产者和消费者怎么实现
    今天小编给大家分享一下Golang rabbitMQ生产者和消费者怎么实现的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一...
    99+
    2023-06-30
  • SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能
    这篇文章主要介绍“SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能”,在日常操作中,相信很多人在SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能问题上存在疑惑,小编查阅了各式资料,整理出简单好用的...
    99+
    2023-06-14
  • PHP实现生产者与消费者的案例
    这篇文章主要介绍PHP实现生产者与消费者的案例,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装...
    99+
    2023-06-14
  • java 中多线程生产者消费者问题详细介绍
    java 中多线程生产者消费者问题前言:一般面试喜欢问些线程的问题,较基础的问题无非就是死锁,生产者消费者问题,线程同步等等,在前面的文章有写过死锁,这里就说下多生产多消费的问题了import java.util.concurrent.lo...
    99+
    2023-05-31
    java 多线程 ava
  • java wait()/notify() 实现生产者消费者模式详解
    java wait()/notify() 实现生产者消费者模式 java中的多线程会涉及到线程间通信,常见的线程通信方式,例如共享变量、管道流等,这里我们要实现生产者消费者模式,也需...
    99+
    2024-04-02
  • C++实现简单的生产者-消费者队列详解
    本文的代码都是ChatGPT生成,我只是做了微小的调整和整合,AI提示词如下: 设计一个C++类,支持生产者-消费者模型,可以通过size函数获取剩余数量 可能第一次生成的不一定合适...
    99+
    2023-05-18
    C++实现生产者消费者队列 C++生产者消费者队列 C++队列
  • Java编程生产者消费者实现的四种方法
    目录实现生产者消费者的四种方式一、最基础的二、java.util.concurrent.lock 中的 Lock 框架三、阻塞队列BlockingQueue的实现Blockqueue...
    99+
    2024-04-02
  • Java实现生产者消费者问题与读者写者问题的示例分析
    这篇文章将为大家详细讲解有关Java实现生产者消费者问题与读者写者问题的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。1、生产者消费者问题生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它...
    99+
    2023-05-30
    java
  • Java多线程中消费者生产者模式怎么实现
    这篇文章主要讲解了“Java多线程中消费者生产者模式怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java多线程中消费者生产者模式怎么实现”吧!  //主类&nb...
    99+
    2023-06-17
  • 详解RocketMQ中的消费者启动与消费流程分析
    目录一、简介1.1 RocketMQ 简介1.2 工作流程二、消费者启动流程2.1 实例化消费者2.2 设置NameServer和订阅topic过程2.2.1 添加tag2.2.2 ...
    99+
    2024-04-02
  • Java多线程中的生产者与消费者案例讲解
    这篇文章主要讲解了“Java多线程中的生产者与消费者案例讲解”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java多线程中的生产者与消费者案例讲解”吧!目录前言工具知识点设计思路具体步骤总结...
    99+
    2023-06-20
  • python多进程中的生产者和消费者模型怎么实现
    这篇文章主要介绍了python多进程中的生产者和消费者模型怎么实现的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇python多进程中的生产者和消费者模型怎么实现文章都会有所收获,下面我们一起来看看吧。Pytho...
    99+
    2023-07-05
  • 队列的生产者与消费者模式在PHP与MySQL中的实现方法
    随着互联网业务的快速发展,系统中处理大量任务的需求变得越来越迫切。队列是一种常见的解决方案,可以高效地处理任务。队列的生产者-消费者模式(Producer-Consumer Pattern)在PHP和MySQL中的实现方法是一种常见的解决方...
    99+
    2023-10-21
    MySQL 消费者 PHP 队列 生产者
  • Java多线程中消费者与生产者的关系是什么
    这篇文章将为大家详细讲解有关Java多线程中消费者与生产者的关系是什么,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。  多线程:CPU中各种任务在交替执行过程中,被称为多线程处理。其中,每个任务的一次动态...
    99+
    2023-06-02
  • JAVA项目中的生产者消费者如何利用多线程实现
    今天就跟大家聊聊有关JAVA项目中的生产者消费者如何利用多线程实现,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。JAVA多线程实现生产者消费者的实例详解Product.Javapac...
    99+
    2023-05-31
    java 多线程 ava
  • Python中怎么利用多线程实现生产者消费者模式
    Python中怎么利用多线程实现生产者消费者模式,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。什么是生产者消费者模式在软件开发的过程中,经常碰到这样的场景:某些模块负责生产数据...
    99+
    2023-06-17
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作