返回顶部
首页 > 资讯 > 后端开发 > Python >SpringCloud+RocketMQ实现分布式事务的实践
  • 391
分享到

SpringCloud+RocketMQ实现分布式事务的实践

2024-04-02 19:04:59 391人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

目录一、RocketMQ的分布式事务结构和说明二、搭建RocketMQ三、事务场景,然后准备工程,运行代码  随着互联网公司的微服务越来越多,分布式事务已经成为了我们的经常使用的。所

  随着互联网公司的微服务越来越多,分布式事务已经成为了我们的经常使用的。所以我们来一步一步的实现基于RocketMQ的分布式事务。接下来,我们将要做的主题写出来。

  • RocketMQ的分布式事务结构和说明
  • 搭建RocketMQ步骤
  • 事务场景,然后准备工程,运行代码

一、RocketMQ的分布式事务结构和说明

  我们通过下图来了解一下RocketMQ实现分布式事务的结构。采用半消息机制实现分布式事务,半消息顾名思义,就是发送方将消息发送到MQ中的Broker端,这个消息被标记为“暂不投递”状态,这个时间订阅方是不能收到这个消息的,当发送方将提交了Commit后,这个消息才可以被订阅方收到。如果发送方提交了Rollback,那么订阅方就不会收到以该消息。

  RocketMQ采用的是最终一致性事务,因为它使用了半消息机制,订阅方可能收不到消息,但是最终状态是能保持一致的。

  我们在讲这个流程前,我们需要先看下图的红色部分,当发送方发给MQ的Commit/Rollback没有收到的时候,这个时候需要启用一个线程,从本地事务库里边查找到当前的状态,根据当前的状态来决定是否重新发送Commit/Rollback,相当于重试。

  接下来,我们说一下整个执行的过程。

发送方将半消息发送到MQ中,成功后MQ将消息保持好后将结果同步给发送方,即半消息发送成功。当MQ告诉发送方成功后,我们需要记录下发送到MQ的相关的事情,比如发送的消息内容,发送时间,发送的相关ID,比如事务ID,都要做好相应记录。当本地事务库,也可以理解为日志库,我们可以针对发生的问题进行追溯。本地库记录好事务后,发送方就可以将Commit/Rollback提交到MQ了。当MQ收到了Commit命令后,这个时候就会将该条消息发送给订阅方。收到了Rollback后,那该条消息不会投递给订阅方,消息保存3天后删除。

  整个过程就是这些,大家如果觉得有遗漏的,可以告诉我。

二、搭建RocketMQ

  我使用的是windos10,所以需要下载windows版本。

   进入到RocketMQ的官方地址:Http://rocketmq.apache.org/release_notes/release-notes-4.3.0/

   下载二进制文件压缩包。如下图:

  先解压,然后进行环境变量配置,打开菜单,直接输入环境变量。

    变量名:ROCKETMQ_HOME

    值:解压的release的路径,如D:\rocketmq-all-4.3.0-bin-release

  然后打开CMD命令,进入到解压路径中的bin目标,进行nameserver启动。


start mqnameserv.cmd

  启动后的效果为:

  再进行broker的启动,启动需要连到的nameserver为127.0.0.1:9876,打开自动创建Topic,这个时候当用到某个Topic没有的情况下会自动创建一个。


start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

  这个时候Windows的rocketmq就下载好并且启动成功了。

三、事务场景,然后准备工程,运行代码

  因为我们使用的RocketMq是使用的分布式事务是最终一致性的,柔性的,所以我们使用的场景也要考虑到。我们用的场景就是下订单充话费,用户支付了订单,那么直接给用户的进行充值。购买的订单不会立马充值到手机的,需要等一会才到账,这就是最终一致性。

  我们使用的如下代码版本号,SpringCloud的版本号要和SpringBoot保持一致,否则会出现类找不到的情况。

springCloud(Finchley.RELEASE) + SpringBoot2.0.4.RELEASE + RocketMQ4.3 +Mysql + lombok(插件)

  我们使用SprinGCloud的几个组件:

  Euerka Server :用于提供服务注册的能力和发现

  Euerka Client : 用于进行服务注册

  Feign:用于服务间的调用

  Config :用于进行配置

  接下来,我们创建需要进行配置的表。


DROP TABLE IF EXISTS `spring_cloud_config`;

CREATE TABLE `spring_cloud_config` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `akey` varchar(30) DEFAULT NULL,
  `avalue` varchar(128) DEFAULT NULL,
  `application` varchar(30) DEFAULT NULL,
  `aprofile` varchar(30) DEFAULT NULL,
  `label` varchar(30) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;



INSERT INTO `spring_cloud_config` (`id`, `akey`, `avalue`, `application`, `aprofile`, `label`)
VALUES
    (2,'name_server','127.0.0.1:9876','product-service','dev','dev'),
    (3,'name_server','127.0.0.1:9876','order-service','dev','dev'),
    (4,'order_topic','order_topic','order-service','dev','dev'),
    (5,'order_topic','order_topic','product-service','dev','dev');

  我们构建了4个模块,eureka模块用于服务注册和发现,springcloud-config将数据库创建的配置加载供其他工程使用。charge-order-service用于下充值订单,phone-charge-service用于给手机进行充值业务。

  主要的流程为,通过charge-order-service产生了订单,然后将订单和数量到phone-charge-service进行二次确认看是否还有足够的数量可以使用,如果数量不够的话直接将事务进行rollback,这样消息就不会到消费端。如果下过去的订单号已经充值过了,那么该消息将会被直接丢掉,这也是消息端的幂等设计。

  接下来,我们看一下生产者以及事务的核心代码。主要用于连接RocketMQ, 执行本地事务,在本地事务中进行二次确认,根据结果进行Commit、Rollback。当连接RocketMQ出现问题的时候可能会收到UNKNOWN,这个时候会调用checkLocalTransaction()方法,用于检查是否将消息发送给RocketMQ了。


package com.hqs.chargeorderservice.mqservice;

import com.alibaba.fastJSON.jsONObject;
import com.hqs.chargeorderservice.config.Jms;
import com.hqs.chargeorderservice.service.ProduceOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;



@Slf4j
@Component
public class TransactionProducer {

    
    private TransactionListener transactionListener ;

    
    private TransactionMQProducer producer = null;

    
    private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }

    });

    public TransactionProducer(@Autowired Jms jms, @Autowired ProduceOrderService produceOrderService) {
        transactionListener = new TransactionListenerImpl(produceOrderService);
        // 初始化 事务生产者
        producer = new TransactionMQProducer(jms.getOrderTopic());
        // 添加服务器地址
        producer.setNamesrvAddr(jms.getNameServer());
        // 添加事务监听器
        producer.setTransactionListener(transactionListener);
        // 添加自定义线程池
        producer.setExecutorService(executorService);

        start();
    }

    public TransactionMQProducer getProducer() {
        return this.producer;
    }

    
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    
    public void shutdown() {
        this.producer.shutdown();
    }
}


@Slf4j
class TransactionListenerImpl implements TransactionListener {

    @Autowired
    private ProduceOrderService produceOrderService ;

    public TransactionListenerImpl( ProduceOrderService produceOrderService) {
        this.produceOrderService = produceOrderService;
    }

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("=========本地事务开始执行=============");
        String message = new String(msg.getBody());
        JSONObject jsonObject = JSONObject.parseObject(message);
        Integer productId = jsonObject.getInteger("productId");
        Integer total = jsonObject.getInteger("total");
        Integer tradeNo = jsonObject.getInteger("tradeNo");
        int userId = Integer.parseInt(arg.toString());
        //模拟执行本地事务begin=======
        
        log.info("本地事务执行参数,用户id={},商品ID={},销售库存={}",userId,productId,total);
        int result = produceOrderService.save(userId, productId, total, tradeNo);
        //模拟执行本地事务end========
        //TODO 实际开发下面不需要我们手动返回,而是根据本地事务执行结果自动返回
        //1、二次确认消息,然后消费者可以消费
        if (result == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        //2、回滚消息,Broker端会删除半消息
        if (result == 1) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        //3、Broker端会进行回查消息
        if (result == 2) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        log.info("==========回查接口=========");
        String key = msg.geTKEys();
        //TODO 1、必须根据key先去检查本地事务消息是否完成。
        
        // TODO 2、这里返回要么commit 要么rollback。没有必要在返回 UNKNOW
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

  我们启动一下程序。我们登录localhost:7001注册中心,看到3个服务都成功注册了。

  然后我们进行下订单调用接口,userId用户编码,productId产品号,total购买量,tradeNo交易订单号

http://localhost:9001/api/v1/order/chargeOrder?userId=1&productId=1&total=1&tradeNo=4

  charge-order-service显示出来本地执行事务的参数。

  phone-charge-service消费事务执行的日志。

  当我们在创建charge-order-service和phone-charge-service的时候一定要注意,将工程里边的application.properties变为bootstrap.yml,要不然程序启动的时候会从8888找config的配置内容。因为SpringCloud里面有个“启动上下文”,主要是用于加载远端的配置,也就是加载ConfigServer里面的配置,默认加载顺序为:加载bootstrap.*里面的配置 --> 链接configserver,加载远程配置 --> 加载application.*里面的配置。

  好了,代码也放到git地址了,https://GitHub.com/stonehqs/springcloud-rocketmq-transaction,

到此这篇关于SpringCloud+RocketMQ实现分布式事务的实践的文章就介绍到这了,更多相关SpringCloud RocketMQ分布式事务内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: SpringCloud+RocketMQ实现分布式事务的实践

本文链接: https://lsjlt.com/news/155705.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • SpringCloud+RocketMQ实现分布式事务的实践
    目录一、RocketMQ的分布式事务结构和说明二、搭建RocketMQ三、事务场景,然后准备工程,运行代码  随着互联网公司的微服务越来越多,分布式事务已经成为了我们的经常使用的。所...
    99+
    2024-04-02
  • springboot整合rocketmq实现分布式事务
    目录1 执行流程2 工程2.1 pom2.2 application.yml2.3 TransactionListenerImpl2.4 SpringTransactionProdu...
    99+
    2024-04-02
  • SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解
    目录消息队列实现分布式事务原理RocketMQ的事务消息代码实现基础配置发送半消息执行本地事务与回查Account-Service消费消息测试小结消息队列实现分布式事务原理 首先让我...
    99+
    2024-04-02
  • springboot整合rocketmq如何实现分布式事务
    这篇文章给大家分享的是有关springboot整合rocketmq如何实现分布式事务的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1 执行流程(1) 发送方向 MQ 服务端发送消息。(2) MQ Server 将...
    99+
    2023-06-15
  • springboot整合shardingsphere和seata实现分布式事务的实践
    各个框架版本信息 springboot: 2.1.3springcloud: Greenwich.RELEASEseata: 1.0.0shardingsphere:4.0.1 ma...
    99+
    2024-04-02
  • SpringCloud 分布式锁的多种实现
    目录前言1. 分布式锁概述2. 基于数据库的分布式锁2.1 数据库悲观锁实现的分布式锁2.2 数据库乐观锁实现的分布式锁3.基于Redis实现的分布式锁3.1 setnx + exp...
    99+
    2024-04-02
  • Mycat 分布式事务的实现
    引言:Mycat已经成为了一个强大的开源分布式数据库中间件产品。面对企业应用的海量数据事务处理,是目前最好的开源解决方案。但是如果想让多台机器中的数据保存一致,比较常规的解决方法是引入“协调者”来统一调度所...
    99+
    2024-04-02
  • SpringCloud Alibaba - Seata 四种分布式事务解决方案(TCC、Saga)+ 实践部署(下)
    目录 一、Seata 分布式解决方案 1.1、TCC 模式 1.1.1、TCC 模式理论 对比 TCC 和 AT 模式的一致性和隔离性 TC 的工作模型 1.2.2、TCC 模式优缺点 1.2.3、TCC 模式注意事项:空回滚 1.2.4、...
    99+
    2023-10-07
    spring cloud 分布式 spring
  • 怎么在springcloud分布式系统中实现分布式锁
    本篇内容介绍了“怎么在springcloud分布式系统中实现分布式锁”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!一、简介一般来说,对数据进...
    99+
    2023-06-25
  • springcloud怎么用Redlock实现分布式锁
    这篇文章主要讲解了“springcloud怎么用Redlock实现分布式锁”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“springcloud怎么用Redlock实现分布式锁”吧!一、red...
    99+
    2023-06-25
  • springcloud如何用Redlock实现分布式锁
    目录一、redlock简介 二、怎么用java使用 redlock 三、参考资料 之前写过一篇文章《如何在springcloud分布式系统中实现分布式锁? 》,由于自己仅仅是阅读了相...
    99+
    2024-04-02
  • redis实现分布式事务的方法
    小编给大家分享一下redis实现分布式事务的方法,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!问题描述:某电商平台,首发一款新品...
    99+
    2024-04-02
  • 分布式事务实战
    转载本文需注明出处:微信公众号EAWorld,违者必究。引言:微服务倡导将复杂的单体应用拆分为若干个功能简单、松耦合的服务,这样可以降低开发难度、增强扩展性、便于敏捷开发,从而被越来越多的开发者和公司推崇运用。但系统微服务化后,一个看似简单...
    99+
    2023-06-05
  • Zookeeper怎么实现分布式事务
    Zookeeper本身并不直接支持分布式事务,但可以作为分布式事务的基础设施来帮助实现分布式事务。以下是一种可能的实现方式: 使...
    99+
    2023-10-27
    Zookeeper
  • kafka分布式事务怎么实现
    Kafka并不原生支持分布式事务。但是可以通过以下几种方法来实现分布式事务: 使用Kafka事务性API:Kafka引入了事务性...
    99+
    2024-04-02
  • fescar分布式事务实现原理实例分析
    这篇文章主要介绍了fescar分布式事务实现原理实例分析的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇fescar分布式事务实现原理实例分析文章都会有所收获,下面我们一起来看看吧。项目说明本博文所述代码为fes...
    99+
    2023-06-29
  • 详解如何在springcloud分布式系统中实现分布式锁
    目录一、简介 二、redis命令介绍 三、实现思路 四、编码实现 五、注意点 六、参考资料 最近在看分布式锁的资料,看了 Josial L的《Redis in Action》的分布式...
    99+
    2024-04-02
  • 怎么使用RocketMQ事务消息解决分布式事务
    本篇文章为大家展示了怎么使用RocketMQ事务消息解决分布式事务,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。初步认识RocketMQ的核心模块rocketmq模块rocketmq-broker:...
    99+
    2023-06-04
  • BaikalDB技术实现内幕(一)-- 分布式事务实现
    本系列文章主要介绍HTAP数据库BaikalDB的技术实现细节。 作者介绍:罗小兵,百度商业平台研发部高级研发工程师,主要负责BaikalDB事务能力,全局二级索引等方向的研发工作。 欢迎关注 Star github.com/baidu/...
    99+
    2016-05-06
    BaikalDB技术实现内幕(一)-- 分布式事务实现
  • 数据库分布式事务的实现原理!
    事务是数据库系统中非常有趣也非常重要的概念,它是数据库管理系统执行过程中的一个逻辑单元,它能够保证一个事务中的所有操作要么全部执行,要么全不执行;在 SOA 与微服务架构大行其道的今天,在分布式的多个服...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作