返回顶部
首页 > 资讯 > 前端开发 > html >SpringBoot分布式事务中最大努力通知是怎样的
  • 517
分享到

SpringBoot分布式事务中最大努力通知是怎样的

2024-04-02 19:04:59 517人浏览 八月长安
摘要

今天就跟大家聊聊有关SpringBoot分布式事务中最大努力通知是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。环境:springboot.2

今天就跟大家聊聊有关SpringBoot分布式事务中最大努力通知是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

环境:springboot.2.4.9 + RabbitMQ3.7.4

什么是最大努力通知

这是一个充值的案例

SpringBoot分布式事务中最大努力通知是怎样的

交互流程 :

1、账户系统调用充值系统接口。

2、充值系统完成支付向账户系统发起充值结果通知 若通知失败,则充值系统按策略进行重复通知。

3、账户系统接收到充值结果通知修改充值状态。

4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果。

通过上边的例子我们总结最大努力通知方案的目标 : 发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。 具体包括 :

1、有一定的消息重复通知机制。 因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。

2、消息校对机制。 如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。

最大努力通知与可靠消息一致性有什么不同?

1、解决方案思想不同 可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。  最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。

2、两者的业务应用场景不同 可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。  最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。

3、技术解决方向不同 可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。  最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力地将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消费。

通过RabbitMQ实现最大努力通知

关于RabbitMQ相关文章《SpringBoot RabbitMQ消息可靠发送与接收 》,《RabbitMQ消息确认机制confirm 》。

项目结构

SpringBoot分布式事务中最大努力通知是怎样的

两个子模块users-mananger(账户模块),pay-manager(支付模块)

依赖

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-WEB</artifactId> </dependency> <dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency>  <groupId>Mysql</groupId>  <artifactId>mysql-connector-java</artifactId>  <scope>runtime</scope> </dependency>

子模块pay-manager

配置文件

server:   port: 8080 --- spring:   rabbitmq:     host: localhost     port: 5672     username: guest     passWord: guest     virtual-host: /     publisherConfirmType: correlated     publisherReturns: true     listener:       simple:         concurrency: 5         maxConcurrency: 10         prefetch: 5         acknowledgeMode: MANUAL         retry:           enabled: true           initialInterval: 3000           maxAttempts: 3         defaultRequeueRejected: false

实体类

记录充值金额及账户信息

@Entity @Table(name = "t_pay_info") public class PayInfo implements Serializable{  @Id  private Long id;  private BigDecimal money ;  private Long accountId ; }

DAO及Service

public interface PayInfoRepository extends JpaRepository<PayInfo, Long> {  PayInfo findByOrderId(String orderId) ; }
@Service public class PayInfoService {          @Resource     private PayInfoRepository payInfoRepository ;     @Resource     private RabbitTemplate rabbitTemplate ;        // 数据保存完后发送消息(这里发送消息可以应用确认模式或事物模式)     @Transactional     public PayInfo savePayInfo(PayInfo payInfo) {         payInfo.setId(System.currentTimeMillis()) ;         PayInfo result = payInfoRepository.save(payInfo) ;         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-", "")) ;         try {             rabbitTemplate.convertAndSend("pay-exchange", "pay.#", new ObjectMapper().writeValueAsString(payInfo), correlationData) ;         } catch (AmqpException | JSONProcessingException e) {             e.printStackTrace();         }         return result ;     }          public PayInfo queryByOrderId(String orderId) {         return payInfoRepository.findByOrderId(orderId) ;     }      }

支付完成后发送消息。

Controller接口

@RestController @RequestMapping("/payInfos") public class PayInfoController {  @Resource  private PayInfoService payInfoService ;        // 支付接口  @PostMapping("/pay")  public Object pay(@RequestBody PayInfo payInfo) {   payInfoService.savePayInfo(payInfo) ;   return "支付已提交,等待结果" ;  }       @GetMapping("/queryPay")  public Object queryPay(String orderId) {   return payInfoService.queryByOrderId(orderId) ;  }      }

子模块users-manager

应用配置

server:   port: 8081 --- spring:   rabbitmq:     host: localhost     port: 5672     username: guest     password: guest     virtual-host: /     publisherConfirmType: correlated     publisherReturns: true     listener:       simple:         concurrency: 5         maxConcurrency: 10         prefetch: 5         acknowledgeMode: MANUAL         retry:           enabled: true           initialInterval: 3000           maxAttempts: 3         defaultRequeueRejected: false

实体类

@Entity @Table(name = "t_users") public class Users {  @Id  private Long id;  private String name ;  private BigDecimal money ; }

账户信息表

@Entity @Table(name = "t_users_log") public class UsersLog {  @Id  private Long id;  private String orderId ;  // 0: 支付中,1:已支付,2:已取消  @Column(columnDefinition = "int default 0")  private Integer status = 0 ;  private BigDecimal money ;  private Date createTime ; }

账户充值记录表(去重)

DAO及Service

public interface UsersRepository extends JpaRepository<Users, Long> { } public interface UsersLogRepository extends JpaRepository<UsersLog, Long> {  UsersLog findByOrderId(String orderId) ; }

Service类

@Service public class UsersService {      @Resource     private UsersRepository usersRepository ;     @Resource     private UsersLogRepository usersLogRepository ;       @Transactional  public boolean updateMoneyAndLogStatus(Long id, String orderId) {   UsersLog usersLog = usersLogRepository.findByOrderId(orderId) ;   if (usersLog != null && 1 == usersLog.getStatus()) {    throw new RuntimeException("已支付") ;   }   Users users = usersRepository.findById(id).orElse(null) ;   if (users == null) {    throw new RuntimeException("账户不存在") ;   }   users.setMoney(users.getMoney().add(usersLog.getMoney())) ;   usersRepository.save(users) ;   usersLog.setStatus(1) ;   usersLogRepository.save(usersLog) ;   return true ;  }       @Transactional  public boolean saveLog(UsersLog usersLog) {   usersLog.setId(System.currentTimeMillis()) ;   usersLogRepository.save(usersLog) ;   return true ;  } }

消息监听

@Component public class PayMessageListener {       private static final Logger logger = LoggerFactory.getLogger(PayMessageListener.class) ;       @Resource  private  UsersService usersService ;       @SuppressWarnings("unchecked")  @RabbitListener(queues = {"pay-queue"})  @RabbitHandler  public void receive(Message message, Channel channel) {   long deliveryTag = message.getMessageProperties().getDeliveryTag() ;   byte[] buf =  null ;   try {    buf = message.getBody() ;    logger.info("接受到消息:{}", new String(buf, "UTF-8")) ;    Map<String, Object> result = new jsonMapper().readValue(buf, Map.class) ;    Long id = ((Integer) result.get("accountId")) + 0L ;    String orderId = (String) result.get("orderId") ;    usersService.updateMoneyAndLogStatus(id, orderId) ;    channel.basicAck(deliveryTag, true) ;   } catch (Exception e) {    logger.error("消息接受出现异常:{}, 异常消息:{}", e.getMessage(), new String(buf, Charset.forName("UTF-8"))) ;    e.printStackTrace() ;    try {     // 应该将这类异常的消息放入死信队列中,以便人工排查。     channel.basicReject(deliveryTag, false);    } catch (IOException e1) {     logger.error("拒绝消息重入队列异常:{}", e1.getMessage()) ;     e1.printStackTrace();    }   }  } }

Controller接口

@RestController @RequestMapping("/users") public class UsersController {          @Resource     private RestTemplate restTemplate ;     @Resource     private UsersService usersService ;          @PostMapping("/pay")     public Object pay(Long id, BigDecimal money) throws Exception {         HttpHeaders headers = new HttpHeaders() ;         headers.setContentType(MediaType.APPLICATION_JSON) ;         String orderId = UUID.randomUUID().toString().replaceAll("-", "") ;         Map<String, String> params = new HashMap<>() ;         params.put("accountId", String.valueOf(id)) ;         params.put("orderId", orderId) ;         params.put("money", money.toString()) ;                  UsersLog usersLog = new UsersLog() ;         usersLog.setCreateTime(new Date()) ;         usersLog.setOrderId(orderId);         usersLog.setMoney(money) ;         usersLog.setStatus(0) ;         usersService.saveLog(usersLog) ;         HttpEntity<String> requestEntity = new HttpEntity<String>(new ObjectMapper().writeValueAsString(params), headers) ;         return restTemplate.postForObject("http://localhost:8080/payInfos/pay", requestEntity, String.class) ;     }      }

以上是两个子模块的所有代码了

测试

初始数据

SpringBoot分布式事务中最大努力通知是怎样的

SpringBoot分布式事务中最大努力通知是怎样的

账户子模块控制台

SpringBoot分布式事务中最大努力通知是怎样的

支付子模块控制台

SpringBoot分布式事务中最大努力通知是怎样的

数据表数据

SpringBoot分布式事务中最大努力通知是怎样的

看完上述内容,你们对SpringBoot分布式事务中最大努力通知是怎样的有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注编程网html频道,感谢大家的支持。

--结束END--

本文标题: SpringBoot分布式事务中最大努力通知是怎样的

本文链接: https://lsjlt.com/news/82065.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • SpringBoot分布式事务中最大努力通知是怎样的
    今天就跟大家聊聊有关SpringBoot分布式事务中最大努力通知是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。环境:springboot.2...
    99+
    2024-04-02
  • web开发中的分布式事务是怎样的
    web开发中的分布式事务是怎样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。事务(Transaction):一般是指要做的或...
    99+
    2024-04-02
  • 分布式事务的7种解决方案是怎样的
    分布式事务的7种解决方案是怎样的,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。 分布式事务最经典的七种解决方案随着...
    99+
    2024-04-02
  • 大型电商分布式架构是怎样的
    这篇文章主要介绍了大型电商分布式架构是怎样的的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇大型电商分布式架构是怎样的文章都会有所收获,下面我们一起来看看吧。1. 大型分布式网站架构概述1.1. 大型网站的特点用...
    99+
    2023-06-02
  • Mycat分布式事务两阶段提交过程是怎样的
    本篇文章为大家展示了Mycat分布式事务两阶段提交过程是怎样的,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。 两阶段提交过程可以用以下图...
    99+
    2024-04-02
  • 微服务分布式事务4种解决方案是怎么样的
    本篇文章给大家分享的是有关微服务分布式事务4种解决方案是怎么样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。分布式事务分布式事务是指事务的参与者,支持事务的服务器,资源服务器...
    99+
    2023-06-02
  • MySQL中的事务分析是怎样的
    MySQL中的事务分析是怎样的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。我们都知道,计算机处理的速度非常地快,但是再快的计算机,也面临...
    99+
    2024-04-02
  • 服务器分布式架构的演进是怎样的
    本篇内容介绍了“服务器分布式架构的演进是怎样的”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!什么是分布式架构?分布式系统(distribut...
    99+
    2023-06-02
  • Storm的分布式任务调度机制是怎样的
    Storm的分布式任务调度机制是基于一个称为Nimbus的主节点来进行任务调度和协调工作。Nimbus负责接收拓扑结构、分配任务给S...
    99+
    2024-04-02
  • 微服务架构中分布式事务实现方案怎样何取舍
    提起微服务架构,不可避免的两个话题就是服务治理和分布式事务。数据库和业务模块的垂直拆分为我们带来了系统性能、稳定性和开发效率的提升的同时也引入了一些更复杂的问题,例如在数据一致性问题上,我们不再能够依赖数据库的本地事务,对于一系列的跨库写入...
    99+
    2023-06-05
  • Kafka内核中的分布式机制实现是怎样的
    Kafka内核中的分布式机制实现是怎样的,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。一个Topic中的所有数据分布式的存储在kafka集群的所有机器(broker)上,以分区...
    99+
    2023-06-04
  • 分布式锁中的数据库、缓存、Zookeeper实现是怎样的
    分布式锁中的数据库、缓存、Zookeeper实现是怎样的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。分布式锁的几种实现方式目前几乎很多大型网站及应用都是分布式部署的,分布...
    99+
    2023-06-05
  • Java企业级应用架构设计中的分布式结构是怎样的
    本篇文章为大家展示了Java企业级应用架构设计中的分布式结构是怎样的,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。Java企业级应用架构设计中的分布式结构大致可以分为单级结构、2级结构、3级结构和N...
    99+
    2023-06-17
  • 分布式图数据库 Nebula Graph 中的集群快照实践是怎样进行的
    今天就跟大家聊聊有关分布式图数据库 Nebula Graph 中的集群快照实践是怎样进行的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。1.1 需求...
    99+
    2024-04-02
软考高级职称资格查询
推荐阅读
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作