返回顶部
首页 > 资讯 > 精选 >SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能
  • 449
分享到

SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能

2023-06-14 05:06:38 449人浏览 泡泡鱼
摘要

这篇文章主要介绍“SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能”,在日常操作中,相信很多人在springBoot整合RabbitMQ,怎么实现生产者与消费者的功能问题上存在疑惑,小编查阅了各式资料,整理出简单好用的

这篇文章主要介绍“SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能”,在日常操作中,相信很多人在springBoot整合RabbitMQ,怎么实现生产者与消费者的功能问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

自然,依赖是少不了的。除了spring-boot-starter-WEB依赖外。
就这个是最主要的依赖了,其他的看着办就是了。我用的是gradle,用Maven的看着弄也一样的。无非就是包+包名+版本

//AMQPcompile('org.springframework.boot:spring-boot-starter-amqp:2.0.4.RELEASE')

这里有一个坑。导致我后来发送消息时一直连不上去。报错: java.net.SocketException: socket closed。
我去网上寻找了许多方案。大致都是一个意思。没有设置远程连接权限。让我添加一个用户,并且设置最大权限。
 
下面是添加rabbitmq用户的命令

#rabbitmqctl add_user 账号 密码rabbitmqctl add_user admin 614#分配用户标签(admin为要赋予administrator权限的刚创建的那个账号的名字)rabbitmqctl set_user_tags admin administrator#设置权限,开启远程访问rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

我用完之后去管控台(Http://ip:15672)看了一下用户列表。确实已经添加上去了,也是最大权限。
然鹅并没有什么卵用
后来强行摸索出来了,原来是版本差异的原因。我SpringBoot本来是使用的是2.0.3版本,然后AMQP我使用的是2.0.4。可能有什么不兼容的地方。
把Springboot和AMQP的版本给同步成一个就好了。别的版本差一点根本没啥问题,就AMQP特殊,也是醉了。
 
 
使用SpriongBoot的yml配置:重点是rabbitmq那一栏
设置好登录用户、密码、地址端口、虚拟地址、超时时间就可以了

server: port: 8080 servlet: context-path: /spring: http: encoding:  charset: UTF-8 jackson:  #前端页面传Date值时格式化  date-fORMat: yyyy-MM-dd HH:mm:ss  time-zone: GMT+8 datasource:  driver-class-name: com.mysql.cj.jdbc.Driver  url: jdbc:Mysql://192.168.194.128:3306/mysql?serverTimezone=Asia/Shanghai  username: root  passWord: 614 rabbitmq: port: 5672 host: 192.168.194.128 username: admin password: 614 virtual-host: / connection-timeout: 15s#Redis配置 Redis: host: 192.168.194.128 port: 6379 #Redis连接池配置 jedis:  pool:  min-idle: 0  max-idle: 8  max-active: 8  max-wait: -1ms

这里又有个小坑,这个rabbitmq的超时时间(connection-timeout)配的我真的是醉了,我看的教程里写的是15000,表示15秒,我一输之后idea直接报红线啊。
网上一找,全特么用毫秒值配的,行吧,应该我们用的不是一个版本。
点开看下这参数接受一个java.time.Duration对象,百思不得其解。这玩意咋配?我不会啊。找了二十分钟的攻略才知道是这样子配的,使用数字+时间标志。比如1h、1M、1m、1d、1s、1ms这种格式就行了。


咳咳,配置文件弄好后也就差不多可以使用rabbitmq发消息了。
生产端发消息。只需要使用 RabbitTemplate 类就够了,看到这个名字,有没有一种很熟悉的感觉?
Redis也有个这玩意 叫 RedisTemplate
 
关于发消息,在这儿最好还是先指定好exchange和routingKey,即交换机和路由键。
这样发过去的消息才能被发到指定的交换机上,然后交换机在通过你的routingKey来发送给绑定了该routingKey的所有队列。
所以首先登陆管控台(http://ip:15672),到Exchanges和Queues菜单下,创建好交换机和队列,还有他们之间的routingKey。这个步骤我就不详细描述了。单靠语言不怎么能够描述清楚。估计得配很多图,有需要的自行Google把。
 
万事俱备。正式开始发送消息。
先准备一个要发的玩意。根据业务需求自己创个model就行。我这随便写一个。
关于这个messageId,及消息唯一ID。他的作用是将该条消息数据和RabbitMQ发送的消息绑定起来。不要也不是不行。只是最好还是设置一个这个参数。

package com.skypyb.rabbitmq.entity;import java.io.Serializable;public class User1 implements Serializable{ private Long id; private String name; private String messageId;//储存消息发送的唯一标识 public User1() { } public User1(Long id, String name, String messageId) {  this.id = id;  this.name = name;  this.messageId = messageId; } public Long getId() {  return id; } public void setId(Long id) {  this.id = id; } public String getName() {  return name; } public void setName(String name) {  this.name = name; } public String getMessageId() {  return messageId; } public void setMessageId(String messageId) {  this.messageId = messageId; }}

要发送的数据模型已经准备好,接下来这个类是一个重点。即发送消息的类。
注入RabbbitTemplate,然后就可以通过他的 convertSendAndReceive() 方法进行消息的发送。
他有很多种重载,最好是选用我这种,比较可控。交换机、路由键、消息唯一ID全部指定好。

package com.skypyb.rabbitmq.producer;import com.skypyb.rabbitmq.entity.User1;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Component("user1Sender")public class User1Sender {  @Autowired  private RabbitTemplate rabbitTemplate;//操作rabbitmq的模板  public void send(User1 user1){    CorrelationData correlationData= new CorrelationData();    correlationData.setId(user1.getMessageId());    rabbitTemplate.convertSendAndReceive(        "user1-exchange",//exchange        "user1.key1",//routingKey        user1,//消息体内容        correlationData//消息唯一ID    );  }}

emmmm,是不是感觉还是挺简单的。一个方法调用,消息就过去了。就发送到指定的交换机了。交换机再通过你的routingKey转发给绑定在上边的队列。生产端这边就完事了。
 
写个测试类测试一下。

package com.skypyb.test;import com.skypyb.rabbitmq.Application;import com.skypyb.rabbitmq.entity.User1;import com.skypyb.rabbitmq.producer.User1Sender;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;import java.util.UUID;@RunWith(SpringRunner.class)@SpringBootTest(classes = Application.class)public class TestOne {  @Autowired  private User1Sender user1Sender;  @Test  public void testSend1(){    User1 user1 = new User1();    user1.setId(1L);    user1.setName("测试用户1");    user1.setMessageId("user1$"+System.currentTimeMillis()+"$"+ UUID.randomUUID().toString());    user1Sender.send(user1);  }}

运行完毕后。登陆管控台(http://ip:15672),进入Queues菜单。即可发现消息队列中已接收到一条消息,会是一个等待消费的状态。
至于到底是哪个消息队列来处理嘛,那就得看你的exchange通过你的routingKey具体把消息转发到哪儿了。这个都是在管控台里边配置的。
 
生产端准备完毕。接下来是消费端。消费端也很简单,yml需要添加消费端的配置。签收模式最好选择手动签收。可控。

server: port: 8081 servlet:  context-path: /spring: http:  encoding:   charset: UTF-8 jackson:   #前端页面传Date值时格式化   date-format: yyyy-MM-dd HH:mm:ss   time-zone: GMT+8 datasource:   driver-class-name: com.mysql.cj.jdbc.Driver   url: jdbc:mysql://192.168.194.128:3306/mysql?serverTimezone=Asia/Shanghai   username: root   password: 614 #rabbitmq基本配置 rabbitmq:  addresses: 192.168.194.128:5672  username: admin  password: 614  virtual-host: /  connection-timeout: 15s #rabbitmq消费端配置  listener:   simple:    #并发数    concurrency: 5    #最大并发数    max-concurrency: 10    #签收模式:手工签收、自动签收    acknowledge-mode: manual    #限流,在此消费端同一时间只有一条消息消费    prefetch: 1#Redis配置 redis:  host: 192.168.194.128  port: 6379  #Redis连接池配置  jedis:   pool:    min-idle: 0    max-idle: 8    max-active: 8    max-wait: -1ms

具体的消费者,具体解释都写在注释中了。
 
关于@Exchange注解中设置的交换机的type属性,主要是用这些值:

  • fanout:会把所有发到Exchange的消息路由到所有和它绑定的Queue

  • direct:会把消息路由到routing key和binding key完全相同的Queue,不相同的丢弃

  • topic:direct是严格匹配,那么topic就算模糊匹配,routing key和binding key都用.来区分单词串,比如A.B.C,*匹配任意单词,#匹配任意多个或0个单词,比如。A.B.*可以匹配到A.B.C

  • headers:不依赖routing key和binding key,通过对比消息属性中的headers属性,对比Exchange和Queue绑定时指定的键值对,相同就路由过来

basicAck()方法可以确认消息消费。执行后,消息队列中这条消息就没了。multiple参数表示是否批量消费,一般都选false。

package com.skypyb.rabbitmq.controller;import com.rabbitmq.client.Channel;import com.skypyb.rabbitmq.entity.User1;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.handler.annotation.Headers;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.Map;@Componentpublic class User1Receiver {      @RabbitListener(bindings = @QueueBinding(      value = @Queue(value = "user1-queue", durable = "true"),      exchange = @Exchange(name = "user1-exchange", durable = "true", type = "topic"),      key = "user1.#"  )  )  @RabbitHandler//标识这个方法用于消费消息  public void onUser1Message(@Payload User1 user1,                @Headers Map<String, Object> headers,                Channel channel) throws IOException {    //消费者操作    System.out.println("-------收到消息辣!-----");    System.out.println("发过来的用户名为:" + user1.getName());    //basicAck()表示确认已经消费消息。通知一下mq,需要先得到 delivery tag    //delivery tag可以从消息头里边get出来    Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);    channel.basicAck(deliveryTag, false);  }}

把消费端的服务打开后,就已经在监听了。若监听的队列中已有消息,则会立即处理。直到队列中没消息为止。
若队列为空,他就不会动,这个时候我启动一下生产者那边的测试,消息一发出去,立马就被消费。非常完美。就是这个效果。

到此,关于“SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

--结束END--

本文标题: SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能

本文链接: https://lsjlt.com/news/267879.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • SpringBoot整合RabbitMQ, 实现生产者与消费者的功能
    自然,依赖是少不了的。除了spring-boot-starter-web依赖外。 就这个是最主要的依赖了,其他的看着办就是了。我用的是gradle,用maven的看着弄也一样的。无非...
    99+
    2024-04-02
  • SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能
    这篇文章主要介绍“SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能”,在日常操作中,相信很多人在SpringBoot整合RabbitMQ,怎么实现生产者与消费者的功能问题上存在疑惑,小编查阅了各式资料,整理出简单好用的...
    99+
    2023-06-14
  • Docker怎么启动RabbitMQ实现生产者与消费者
    这篇文章主要讲解了“Docker怎么启动RabbitMQ实现生产者与消费者”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Docker怎么启动RabbitMQ实现生产者与消费者”吧!一、Doc...
    99+
    2023-07-05
  • Golang rabbitMQ生产者和消费者怎么实现
    今天小编给大家分享一下Golang rabbitMQ生产者和消费者怎么实现的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一...
    99+
    2023-06-30
  • Golang rabbitMQ生产者消费者实现示例
    目录消费者生产者消费者 package main import ( "fmt" "github.com/streadway/amqp" ) func failOnError(er...
    99+
    2024-04-02
  • Docker启动RabbitMQ实现生产者与消费者的详细过程
    目录一、Docker拉取镜像并启动RabbitMQ二、Hello World(一)依赖导入(二)消息生产者(三)消息消费者三、实现轮训分发消息(一)抽取工具类(二)启动两个工作线程(...
    99+
    2023-02-23
    Docker启动RabbitMQ Docker 生产者与消费者
  • PHP实现生产者与消费者的案例
    这篇文章主要介绍PHP实现生产者与消费者的案例,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装...
    99+
    2023-06-14
  • java 中怎么实现生产者消费者
    今天就跟大家聊聊有关java 中怎么实现生产者消费者,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。生产者消费者图存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,...
    99+
    2023-06-17
  • golang生产者消费者模式怎么实现
    在Go语言中,可以使用goroutine和channel来实现生产者消费者模式。 首先,我们定义一个包含生产者和消费者的函数: fu...
    99+
    2023-10-20
    golang
  • springboot+rabbitmq如何实现指定消费者才能消费
    这篇文章将为大家详细讲解有关springboot+rabbitmq如何实现指定消费者才能消费,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。如何保证mq队列里的消息只被测试服务器上的consumer消费,避...
    99+
    2023-06-25
  • springboot+rabbitmq实现指定消费者才能消费的方法
    如何保证mq队列里的消息只被测试服务器上的consumer消费,避免本地环境误消费? 程序里有一个应用场景使用到了rabbitmq——当财务确认收到企业的打款金额后,系统会把企业订单...
    99+
    2024-04-02
  • Java实现生产者消费者问题与读者写者问题的示例分析
    这篇文章将为大家详细讲解有关Java实现生产者消费者问题与读者写者问题的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。1、生产者消费者问题生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它...
    99+
    2023-05-30
    java
  • Java多线程中消费者生产者模式怎么实现
    这篇文章主要讲解了“Java多线程中消费者生产者模式怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java多线程中消费者生产者模式怎么实现”吧!  //主类&nb...
    99+
    2023-06-17
  • Java编程生产者消费者实现的四种方法
    目录实现生产者消费者的四种方式一、最基础的二、java.util.concurrent.lock 中的 Lock 框架三、阻塞队列BlockingQueue的实现Blockqueue...
    99+
    2024-04-02
  • C++实现简单的生产者-消费者队列详解
    本文的代码都是ChatGPT生成,我只是做了微小的调整和整合,AI提示词如下: 设计一个C++类,支持生产者-消费者模型,可以通过size函数获取剩余数量 可能第一次生成的不一定合适...
    99+
    2023-05-18
    C++实现生产者消费者队列 C++生产者消费者队列 C++队列
  • python多进程中的生产者和消费者模型怎么实现
    这篇文章主要介绍了python多进程中的生产者和消费者模型怎么实现的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇python多进程中的生产者和消费者模型怎么实现文章都会有所收获,下面我们一起来看看吧。Pytho...
    99+
    2023-07-05
  • 队列的生产者与消费者模式在PHP与MySQL中的实现方法
    随着互联网业务的快速发展,系统中处理大量任务的需求变得越来越迫切。队列是一种常见的解决方案,可以高效地处理任务。队列的生产者-消费者模式(Producer-Consumer Pattern)在PHP和MySQL中的实现方法是一种常见的解决方...
    99+
    2023-10-21
    MySQL 消费者 PHP 队列 生产者
  • Python中怎么利用多线程实现生产者消费者模式
    Python中怎么利用多线程实现生产者消费者模式,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。什么是生产者消费者模式在软件开发的过程中,经常碰到这样的场景:某些模块负责生产数据...
    99+
    2023-06-17
  • Java实现生产者消费者的两种方式分别是什么
    本篇文章给大家分享的是有关Java实现生产者消费者的两种方式分别是什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。   我在8年前去面试程序员的时候,一个...
    99+
    2023-06-03
  • Java多线程中消费者与生产者的关系是什么
    这篇文章将为大家详细讲解有关Java多线程中消费者与生产者的关系是什么,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。  多线程:CPU中各种任务在交替执行过程中,被称为多线程处理。其中,每个任务的一次动态...
    99+
    2023-06-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作