返回顶部
首页 > 资讯 > 精选 >Springboot如何整合RocketMQ收发消息
  • 490
分享到

Springboot如何整合RocketMQ收发消息

2023-06-22 06:06:01 490人浏览 泡泡鱼
摘要

这篇文章将为大家详细讲解有关SpringBoot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。springboot 整合 RocketMQ 收发消息创建springboot

这篇文章将为大家详细讲解有关SpringBoot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

springboot 整合 RocketMQ 收发消息

创建springboot项目

pom.xml添加rocketmq-spring-boot-starter依赖。

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-spring-boot-starter</artifactId>    <version>2.1.0</version></dependency>

yml 配置

application.yml

rocketmq:  name-server: 192.168.64.141:9876

application-demo1.yml

使用 demo1 profile 指定生产者组组名

rocketmq:  producer:    group: producer-demo1

application-demo2.yml

使用 demo2 profile 指定生产者组组名

rocketmq:  producer:    group: producer-demo2

测试

demo 1

  • 发送普通消息

  • 发送 Spring 的通用 Message 对象

  • 发送异步消息

  • 发送顺序消息

生产者

package cn.tedu.demo2.m1;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQtemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer {    @Autowired    private RocketMQTemplate t ;    public  void send(){        //发送同步消息        t.convertAndSend("Topic1:TagA", "Hello world! ");        //发送spring的Message        Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();        t.send("Topic1:TagA",message);        //发送异步消息        t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {            @Override            public void onSuccess(SendResult sendResult) {                System.out.println("发送成功");            }            @Override            public void onException(Throwable throwable) {                System.out.println("发送失败");            }        });        //发送顺序消息        t.syncSendOrderly("Topic1", "98456237,创建", "98456237");        t.syncSendOrderly("Topic1", "98456237,支付", "98456237");        t.syncSendOrderly("Topic1", "98456237,完成", "98456237");    }}

消费者

package cn.tedu.demo2.m1;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")public class Consumer  implements RocketMQListener<String> {    @Override    public void onMessage(String s) {        System.out.println("收到"+s);    }}

主类

package cn.tedu.demo2.m1;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main {    public static void main(String[] args) {        SpringApplication.run(Main.class, args);    }}

测试类

需要放在 test 文件夹

激活 demo1 profile  @ActiveProfiles("demo1")

package cn.tedu.demo2.m1;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo1")public class Test1 {    @Autowired    private  Producer producer;    @Test    public void test1(){        producer.send();        try {            Thread.sleep(5000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

demo 2

发送事务消息

生产者

package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer {    @Autowired    private RocketMQTemplate t;    public void send(){        Message<String> message = MessageBuilder.withPayload("Hello world").build();        //一旦发送消息,则执行监听器        t.sendMessageInTransaction("Topic2",message,null);    }    @RocketMQTransactionListener    class Lis implements RocketMQLocalTransactionListener {        @Override        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {            System.out.println("执行本地事务");            return RocketMQLocalTransactionState.UNKNOWN;        }        @Override        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {            System.out.println("执行事务回查");            return RocketMQLocalTransactionState.COMMIT;        }    }}

消费者

package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")public class Consumer implements RocketMQListener<String> {    @Override    public void onMessage(String s) {        System.out.println("收到"+s);    }}

主类

package cn.tedu.demo2.m2;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main {    public static void main(String[] args) {        SpringApplication.run(Main.class, args);    }}

测试类

package cn.tedu.demo2.m2;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo2")public class Test2 {    @Autowired    private  Producer producer;    @Test    public void  test1(){        producer.send();        //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间        try {            Thread.sleep(30000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

关于“Springboot如何整合RocketMQ收发消息”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

--结束END--

本文标题: Springboot如何整合RocketMQ收发消息

本文链接: https://lsjlt.com/news/303071.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Springboot如何整合RocketMQ收发消息
    这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Springboot 整合 RocketMQ 收发消息创建springboot...
    99+
    2023-06-22
  • Springboot 整合 RocketMQ 收发消息的配置过程
    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖。 <d...
    99+
    2024-04-02
  • SpringBoot整合RocketMQ实现消息发送和接收的详细步骤
    我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷; 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test...
    99+
    2024-04-02
  • Springboot中RocketMQ怎么实现消息发送与接收
    本文小编为大家详细介绍“Springboot中RocketMQ怎么实现消息发送与接收”,内容详细,步骤清晰,细节处理妥当,希望这篇“Springboot中RocketMQ怎么实现消息发送与接收”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢...
    99+
    2023-07-02
  • spring整合JMS如何实现同步收发消息
    这篇文章给大家分享的是有关spring整合JMS如何实现同步收发消息的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1. 安装ActiveMQ注意:JDK版本需要1.7及以上才行到Apache官方网站下载最新的Ac...
    99+
    2023-05-30
    spring jms activemq
  • Springboot详解RocketMQ实现消息发送与接收流程
    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送、同步发送和异步发送。 下面来介绍下 springboot+rockermq 整合...
    99+
    2024-04-02
  • SpringBoot怎么整合RocketMQ事务、广播以及顺序消息
    本篇内容介绍了“SpringBoot怎么整合RocketMQ事务、广播以及顺序消息”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔...
    99+
    2024-04-02
  • springboot整合消息队列RabbitMQ
    前言: RabbitMQ常用的三种Exchange Type:fanout、direct、topic。 fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。...
    99+
    2024-04-02
  • 如何进行SpringBoot+RabbitMQ方式收发消息
    本篇文章给大家分享的是有关如何进行SpringBoot+RabbitMQ方式收发消息,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。本篇会和SpringBoot做整合,采用自动配...
    99+
    2023-06-16
  • Spring Boot 整合RocketMq实现消息过滤功能
    目录简介根据TAG过滤消息生产者消费者测试结果根据SQL表达式过滤消息生产者消费者启动程序报错The broker does not support consumer to filt...
    99+
    2024-04-02
  • springboot整合redis之消息队列
    目录一、项目准备二、配置类三、redis中list数据类型定时器监听队列运行即监控队列四、发布/订阅模式五、ZSet实现延迟队列一、项目准备 依赖 <!-- R...
    99+
    2024-04-02
  • SpringBoot如何实现MQTT消息发送和接收
    今天小编给大家分享一下SpringBoot如何实现MQTT消息发送和接收的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。Spr...
    99+
    2023-07-05
  • 详解SpringBoot整合RabbitMQ如何实现消息确认
    目录简介生产者消息确认介绍流程配置ConfirmCallbackReturnCallback注册ConfirmCallback和ReturnCallback消费者消息确认介绍手动确认...
    99+
    2024-04-02
  • SpringBoot整合rockerMQ消息队列详解
    目录Springboot整合RockerMQ使用总结消费模式生产者组和消费者组生产者投递消息的三种方式如何保证消息不丢失顺序消息分布式事务Springboot整合RockerMQ 1...
    99+
    2024-04-02
  • SpringBoot整合RabbitMQ消息队列的完整步骤
    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 ...
    99+
    2024-04-02
  • springboot整合企微webhook机器人发送消息提醒
    目录一、获取企业微信群机器人 Webhook 地址二、Webhook支持消息类型三、Webhook使用配置1.添加maven依赖2.配置webhook地址api3.注入Message...
    99+
    2022-12-19
    springboot webhook发送消息 springboot webhook企微机器人
  • springboot整合rocketmq如何实现分布式事务
    这篇文章给大家分享的是有关springboot整合rocketmq如何实现分布式事务的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1 执行流程(1) 发送方向 MQ 服务端发送消息。(2) MQ Server 将...
    99+
    2023-06-15
  • 如何解决SpringBoot整合RocketMQ遇到的问题
    本篇内容主要讲解“如何解决SpringBoot整合RocketMQ遇到的问题”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何解决SpringBoot整合RocketMQ遇到的问题”吧!应用场景...
    99+
    2023-06-20
  • SpringBoot集成RocketMQ发送事务消息的原理解析
    目录简介原理具体实现消费者消费者生产者消息监听器消息事务测试正常测试异常测试代码调整执行结果总结简介 RocketMQ 事务消息(Transactional Message)是指应用...
    99+
    2024-04-02
  • SpringBoot整合RabbitMQ实现消息确认机制
    前面几篇案例已经将常用的交换器(DirectExchange、TopicExchange、FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认。 ...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作