返回顶部
首页 > 资讯 > 前端开发 > node.js >RocketMQ事务消息是怎么保证数据的一致性
  • 952
分享到

RocketMQ事务消息是怎么保证数据的一致性

2024-04-02 19:04:59 952人浏览 泡泡鱼
摘要

这篇文章主要介绍“RocketMQ事务消息是怎么保证数据的一致性”,在日常操作中,相信很多人在RocketMQ事务消息是怎么保证数据的一致性问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望

这篇文章主要介绍“RocketMQ事务消息是怎么保证数据的一致性”,在日常操作中,相信很多人在RocketMQ事务消息是怎么保证数据的一致性问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ事务消息是怎么保证数据的一致性”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

前言

在面过的几家大厂中,几乎每轮的面试官(「没写错,几乎是每轮面试官」)都问了同样一个问题:你们的系统是分布式的系统吗?

答:是。

面试官:那么你们分布式的系统是如何解决分布式事务这个问题的呢?也就是如何保证数据的一致性。

答:我们的系统中通过 RocketMQ 的事务消息来保证数据的最终一致性。

面试官:那你说说它是如何来保证数据的最终一致性的?

答:分两部分来回答,第一部分先回答事务消息的实现流程,第二部分解释为什么它能保证数据的最终一致性。

事务消息的实现流程

RocketMQ事务消息是怎么保证数据的一致性

事务消息

  1. 鸿蒙官方战略合作共建——HarmonyOS技术社区

  2. 首先服务 A 发送一个半事务消息(也称 half 消息)至 MQ 中。为什么要先发送一个 half 消息呢?这是为了保证服务 A 和 MQ  之间的通信正常,如果无法正常通信,则服务 A 可以直接返回一个异常,也就不用处理后面的逻辑的了。

  3. 如果 half 消息发送成功,MQ 收到这个 half 消息后,会返回一个 success 响应给服务 A。

  4. 服务 A 接收到 MQ 返回的 success 响应后,开始处理本地的业务逻辑,并提交本地事务。

  5. 如果服务 A 本地事务提交成功,则会向 MQ 中发送 commit,表示将 half 消息提交,MQ 就会执行第 5 步操作;如果服务 A  本地事务提交失败,则直接回滚本地事务,并向 MQ 中发送 rollback,表示将之前的 half 消息进行回滚,MQ 接收到 rollback 消息后,就会将  half 消息删除。

  6. 如果 commit,则将 half 消息写入到磁盘。

  7. 如果 MQ 长时间没有接收到 commit 或者 rollback 消息,例如:服务 A 在处理本地业务时宕机了,或者发送的  commit、rollback 因为在弱网环境,数据丢失了。那么 MQ 就会在一定时间后尝试调用服务 A 提供的一个接口,通过这个接口来判断 half  消息的状态。所以服务 A 提供的接口,需要实现的业务逻辑是:通过数据库中对应数据的状态来判断,之前的 half 消息对应的业务是否执行成功。如果 MQ  从这个接口中得知 half 消息执行成功了,那么 MQ 就会将 half 消息持久化到本地磁盘,如果得知没有执行成功,那么就会将 half 消息删除。

  8. 服务 B 从 MQ 中消费到对应的消息。

  9. 服务 B 处理本地业务逻辑,然后提交本地事务。

如何保证数据的最终一致性

实现流程说完了,可能你现在有各种各样的疑惑?

Q: half 消息是个啥?

A: 它和我们正常发送的普通消息是一样的,都是存储在 MQ 中,唯一不同的是 half 在 MQ 中不会立马被消费者消费到,除非这个 half 消息被  commit 了。(至于为什么未 commit 的 half 消息无法被消费者读取到,这是因为在 MQ 内部,对于事务消息而言,在 commit  之前,会先放在一个内部队列中,只有 commit 了,才会真正将消息放在消费者能读取到的 topic 队列中)

Q: 为什么要先发送 half 消息?

A: 前面已经解释过了,主要是为了保证服务 A 和 MQ 之间是否能正常通信,如果两者之间都不能正常通信,后面还玩个锤子,直接返回异常就可以了。

Q: 如果 MQ 接收到了 half 消息,但是在返回 success 响应的时候,因为网络原因,导致服务 A 没有接收到 success  响应,这个时候是什么现象?

A: 当服务 A 发送 half 消息后,它会等待 MQ 给自己返回 success 响应,如果没有接收到,那么服务 A  也会直接结束,返回异常,不再执行后续逻辑。不执行后续逻辑,这样服务 A 也就不会提交 commit 消息给 MQ,MQ 长时间没接收到 commit  消息,那么它就会主动回调服务 A 的一个接口,服务 A 通过接口,查询本地数据后,发现这条消息对应的业务并没有正常执行,那么就告诉 MQ,这个 half  消息不能 commit,需要 rollback,MQ 知道后,就将 half 消息进行删除。

Q: 如果服务 A 本地事务执行失败了,怎么办?

A: 服务 A 本地事务执行失败后,先对自己本地事务进行回滚,然后再向 MQ 发送 rollback 操作。

Q: 服务 A 本地事务提交成功或失败后,向 MQ 发送的 commit 或者 rollback 消息,因为网络问题丢失了,又该怎么处理?

A: 和上一个问题一样,MQ 长时间没有接收到 half 消息的 commit 或者 rollback 消息,MQ 会主动回调服务 A  的接口,通过这个接口来判断自己该对这个 half 消息如何处理。

Q: 前面说的全是事务消息的实现流程,这和事务消息如何保证数据的最终一致性有什么关系呢?

A: 有关系。首先,服务 A 执行本地事务并提交和向 MQ 中发送消息这是两个写操作,然后通过 RocketMQ  的事务消息,我们保证了这两个写操作要么都执行成功,要么都执行失败。然后让其他系统,如服务 B 通过消费 MQ  中的消息,然后再去执行自己本地的事务,这样到最后,服务 A 和服务 B 这两个系统的数据状态是不是达到了一致?这就是最终一致性的含义。

如果要求服务 A 和服务 B 的数据状态,在服务 A 返回给客户端之间,这两者就达到一致,这是强一致性,RocketMQ 是没法保证强一致性的。

目前通过「可靠消息来保证数据的最终一致性」是很多大厂都采用的方案,基本都是通过 MQ  和补偿机制来保证数据的一致性。(所谓的可靠消息,就是消息不丢失,如何保证 MQ 的消息不丢失,下篇文章会写,这也是面试常考题)

Q: 服务 B 本地事务提交失败了,怎么办?

A: 如果服务 B 本地事务提交失败了,可以进行多次重试,直到成功。如果重试多次后,还是提交失败,例如此时服务 B 对应的 DB 宕机了,这个时候只要服务  B 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 会在一定时间后,继续将这条消息推送给服务 B,服务 B  就可以继续执行本地事务并提交了,直到成功。这样,依旧是保证了服务 A 和服务 B 数据的最终一致性。

代码实现

使用 RokcetMQ 的事务消息主要涉及到两个部分:

如何发送半事务消息,这个可以通过「TransactionMQProducer」 类来实现。

TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup"); TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(msg, null); // 通过result来判断half消息是否发送成功 if(result.getSendStatus() == SendStatus.SEND_OK){     // 成功 }else{     // 失败 }

在前面我们提到了服务 A 需要提供一个接口,用来供 MQ 回调服务  A,实际上这个接口就是一个监听器:「TransactionListener」的方法。这是一个接口,提供了两个方法。

public interface TransactionListener {       // 当half消息发送成功后,我们在这里实现自己的业务逻辑,然后commit或者rollback 给MQ     LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);        // 这个方法就是供MQ回调的方法,MQ通过回调该方法来判断half消息的状态      // 可以看到,这个方法的参数是MessageExt,也就是half消息的内容,如果根据MessageExt,我们完全能在服务A中判断之前的业务是否处理成功     LocalTransactionState checkLocalTransaction(final MessageExt msg); }

实际使用时,我们需要实现该接口,例如:

public class MyTransactionListener implements TransactionListener {      @Override     public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {         try{             // 处理业务逻辑             // ....              // 业务逻辑处理成功,commit             return LocalTransactionState.COMMIT_MESSAGE;         }catch (Exception e){          }         // 业务处理失败,rollback         return LocalTransactionState.ROLLBACK_MESSAGE;     }      @Override     public LocalTransactionState checkLocalTransaction(MessageExt msg) {         return null;     } }

另外,在创建 producer 时,指定我们实现实现的监听器

TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup"); transactionMQProducer.setTransactionListener(new MyTransactionListener());

到此,关于“RocketMQ事务消息是怎么保证数据的一致性”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

--结束END--

本文标题: RocketMQ事务消息是怎么保证数据的一致性

本文链接: https://lsjlt.com/news/83466.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • RocketMQ事务消息是怎么保证数据的一致性
    这篇文章主要介绍“RocketMQ事务消息是怎么保证数据的一致性”,在日常操作中,相信很多人在RocketMQ事务消息是怎么保证数据的一致性问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望...
    99+
    2024-04-02
  • RocketMQ事务消息保证消息的可靠性和一致性
    这篇讲解一下rocketMq的事务消息的原理 在发送事务消息的时候,会加一个标识,表示这个消息是事务消息。broker接收到消息后,在我们之前看的代码里org.apache.rock...
    99+
    2023-05-17
    RocketMQ事务消息 RocketMQ事务消息原理 RocketMQ事务消息使用
  • rabbitmq怎么保证消息的顺序一致性
    RabbitMQ本身并不保证消息的顺序一致性。RabbitMQ是一个多线程的消息队列系统,它会根据不同的策略将消息分发给多个消费者进...
    99+
    2023-10-09
    rabbitmq
  • mq怎么保证消息的顺序一致性
    保证消息的顺序一致性是消息队列(MQ)中一个重要的问题。下面是几种常用的方法来解决这个问题:1. 单个消费者:只有一个消费者的情况下...
    99+
    2023-10-12
    mq
  • 怎么保证mq消息的顺序一致性
    要保证MQ消息的顺序一致性,可以采取以下几种方式:1. 使用单个消息队列:将所有需要保持顺序的消息发送到同一个消息队列中。这样可以确...
    99+
    2023-10-20
    mq
  • Cassandra中是怎么保证数据一致性的
    Cassandra 使用了一种称为 “分布式一致性” 的模型来保证数据一致性。在 Cassandra 中,数据被分布在多个节点上,每...
    99+
    2024-04-02
  • redis怎么保证数据一致性
    一般来说,只要你用到了缓存,不管是Redis还是memcache,就可能会涉及到数据库缓存与数据的一致性问题,这里我们以Redis为例。我们该如何保证Redis与数据库的一致性呢? So easy: (推荐...
    99+
    2017-04-27
    redis
  • mysql怎么保证数据一致性
    在MySQL中,可以采取以下几种方式来保证数据的一致性:1. 使用事务:事务可以将一系列操作单独的执行单元,要么全部成功提交,要么全...
    99+
    2023-09-15
    mysql
  • canal怎么保证数据一致性
    canal可以通过以下方式来保证数据一致性: 基于事务日志解析:canal通过解析数据库的事务日志来获取数据变更的信息。由于数据...
    99+
    2023-10-22
    canal
  • Cassandra的数据一致性怎么保证
    Cassandra使用了一种称为“最终一致性”的数据一致性模型来保证数据一致性。在这种模型下,不同节点之间的数据可能会出现短暂的不一...
    99+
    2024-05-11
    Cassandra
  • ​怎么保证Redis和数据库的一致性
    怎么保证Redis和数据库的一致性?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一般来说,只要你用到了缓存,不管是Redis还是memca...
    99+
    2024-04-02
  • MySQL怎么保证备份数据的一致性
    这篇文章主要讲解了“MySQL怎么保证备份数据的一致性”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“MySQL怎么保证备份数据的一致性”吧!前言为了数据安全,数据库需要定期备份,这个大家都懂...
    99+
    2023-06-30
  • ZooKeeper是如何保证数据的一致性的
    ZooKeeper通过以下方式保证数据的一致性: 原子性操作:ZooKeeper的所有写操作都是原子性的,要么成功要么失败,不会...
    99+
    2024-03-06
    ZooKeeper
  • 高并发怎么保证数据一致性
    使用redis本身的原子性操作库储存。// redis会返回操作之后的结果,这个过程是原子性的Long currStock = redisTemplate.opsForHash().increment...
    99+
    2024-04-02
  • redis和mysql数据一致性怎么保证
    为了确保 redis 和 mysql 之间的数据一致性,可以采用以下策略:1. 主从复制:利用 mysql 的复制功能,将 mysql 作为主数据库,并将数据同步到 redis 作为从数...
    99+
    2024-04-08
    mysql redis
  • redis怎么保证和数据库数据一致性
    Redis是一个内存数据库,通常用作缓存。相比于传统的磁盘数据库,Redis在性能上具有优势,但它也有可能在某些情况下出现数据不一致...
    99+
    2024-02-29
    redis 数据库
  • 怎么保证缓存和数据库的数据一致性
    本篇内容主要讲解“怎么保证缓存和数据库的数据一致性”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么保证缓存和数据库的数据一致性”吧!1、错误的解决方案1.1、...
    99+
    2023-04-21
    数据库
  • Kafka中的数据一致性是如何保证的
    Kafka使用副本机制来保证数据的一致性。在Kafka中,每个消息会被复制到多个副本中,副本数量可以根据配置来指定。当消息被发送到K...
    99+
    2024-04-02
  • MySQL保证数据一致性的方式
    这篇文章主要讲解了“MySQL保证数据一致性的方式”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“MySQL保证数据一致性的方式”吧!一、MySQL事务模型A...
    99+
    2024-04-02
  • rabbitmq如何保证数据的一致性
    RabbitMQ 通过以下方式来保证数据的一致性: 事务: RabbitMQ 支持事务机制,可以将多条消息发送到队列中原子操作。...
    99+
    2023-10-26
    rabbitmq
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作