返回顶部
首页 > 资讯 > 后端开发 > Python >springevent事件异步处理方式(发布,监听,异步处理)
  • 710
分享到

springevent事件异步处理方式(发布,监听,异步处理)

Python 官方文档:入门教程 => 点击学习

摘要

目录spring event 事件异步处理(发布,监听,异步处理)spring事件之异步线程执行Spring 事件发布监听的多种实现异步方法解决方案总结spring event 事件

spring event 事件异步处理(发布,监听,异步处理)

// 定义事件
public class EventDemo extends ApplicationEvent {
 
    private String supplierCode;
    private String productCode;
 
    public EventDemo(Object source, String supplierCode, String productCode) {
        super(source);
        this.supplierCode = supplierCode;
        this.productCode = productCode;
    }
 
    public String getSupplierCode() {
        return supplierCode;
    }
 
    public String getProductCode() {
        return productCode;
    }
}
// 发布事件
@Component
public class EventDemoPublish {
 
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
 
    public void publish(String message) {
        EventDemo demo = new EventDemo(this, message);
        applicationEventPublisher.publishEvent(demo);
        System.out.println("发布事件执行结束");
    }
}
// 监听事件
@Component
public class EventDemoListener implements ApplicationListener<EventDemo> {
    @Override
    public void onApplicationEvent(EventDemo event) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("事件监听开始...... " + "商家编码:" + event.getSupplierCode() + ",商品编码:" + event.getProductCode());
    }
}
<!--定义事件异步处理-->
 
<bean id="commonTaskExecutor"
		  class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
		<!-- 线程池维持处于Keep-alive状态的线程数量。如果设置了allowCoreThreadTimeOut为true,该值可能为0。
            并发线程数,想达到真正的并发效果,最好对应CPU的线程数及核心数 -->
		<property name="corePoolSize" value="2" />
		<!-- 最大线程池容量 -->
		<property name="maxPoolSize" value="2" />
		<!-- 超过最大线程池容量后,允许的线程队列数 -->
		<property name="queueCapacity" value="2" />
		<!-- 线程池维护线程所允许的空闲时间 .单位毫秒,默认为60s,超过这个时间后会将大于corePoolSize的线程关闭,保持corePoolSize的个数 -->
		<property name="keepAliveSeconds" value="1000" />
		<!-- 允许核心线程超时: false(默认值)不允许超时,哪怕空闲;true则使用keepAliveSeconds来控制等待超时时间,最终corePoolSize的个数可能为0 -->
		<property name="allowCoreThreadTimeOut" value="true" />
 
		<!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
		<property name="rejectedExecutionHandler">
			<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
			<!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中 -->
			<!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
		</property>
	</bean>
 
	<!--名字必须是applicationEventMulticaster,因为AbstractApplicationContext默认找个-->
	<bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
		<!--注入任务执行器 这样就实现了异步调用-->
		<property name="taskExecutor" ref="commonTaskExecutor"></property>
	</bean>

spring事件之异步线程执行

Spring 不仅为我们提供了ioc , aop功能外,还在这个基础上提供了许多的功能,我们用的最多的可能就是Spring mvc了吧,但是让我们来看下spring-context包,其中包含了缓存、调度、校验功能等等

image.png

这里主要想介绍一下Spring提供的观察者模式实现(事件发布监听)及异步方法执行,这些功能也都是基于AOP实现的

Spring 事件

观察者模式大家都了解,它可以解耦各个功能,但是自己实现的话比较麻烦,Spring为我们提供了一种事件发布机制,可以按需要发布事件,之后由监听此事件的类或方法来执行各自对应的功能,代码互相不影响,以后修改订单后续的逻辑时不会影响到订单创建,有点类似于使用MQ的感觉~

比如在配置中心apollo项目中,在portal创建了app后会发送app创建事件,监听此事件的逻辑处将此消息同步到各个环境的admin sevice中,大家有兴趣可以看下相关代码

现在我们来看看具体如何使用:假设一个下单场景,订单创建成功后可能有一些后续逻辑要处理,但是和创建订单本身没有关系,此时就可以在创建订单完成后,发送一个消息,又相应部分的代码进行监听处理,避免代码耦合到一起

首先创建对应的事件

import org.springframework.context.ApplicationEvent;
 
public class CreatedOrderEvent extends ApplicationEvent {
	
    private final String orderSn;
    
    public CreatedOrderEvent(Object source, String orderSn) {
    	super(source);
        this.orderSn = orderSn;
    }
    
    public String getOrderSn() {
    	return this.orderSn;
    }
}

现在还需要一个事件发布者和监听者,创建一下

发布

import org.springframework.context.ApplicationEventPublisher;
 
private ApplicationEventPublisher applicationEventPublisher;
 
applicationEventPublisher.publishEvent(new CreatedOrderEvent(this, orderSn));

监听的多种实现

1:注解实现  @EventListener

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class OrderEventListener {
    
    @EventListener
    public void orderEventListener(CreatedOrderEvent event) {
    	
    }
}

2:代码实现

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
 
@Slf4j
@Component
public class OrderEventListener implements ApplicationListener<CreatedOrderEvent> {
    
    @Override
    public void onApplicationEvent(CreatedOrderEvent event) {
    	
    }
}

简单的事件发布就完成了,其中的其他复杂逻辑由Spring替我们处理了

这里我们要注意一点:发布和监听后处理的逻辑是在一个线程中执行的,不是异步执行

异步方法

有时候我们为了提高响应速度,有些方法可以异步去执行,一般情况下我们可能是手动将方法调用提交到线程池中去执行,但是Spring 为我们提供了简化的写法,在开启了异步情况下,不用修改代码,只使用注解即可完成此功能

这时只需要在要异步执行的方法上添加@Async注解即可异步执行;@EnableAsync 启动异步线程, 如

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
@EnableAsync
public class OrderEventListener {
    @Async
    @EventListener
    public void orderEventListener(CreatedOrderEvent event) {
    	
    }
}

在使用@Async会有一些问题建议看各位看下相关文档及源码

我们通过Spring事件同步线程改为异步线程,默认的线程池是不复用线程

我觉得这是这个注解最坑的地方,没有之一!我们来看看它默认使用的线程池是哪个,在前文的源码分析中,我们可以看到决定要使用线程池的方法是

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor

其源码如下:

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        // 可以在@Async注解中配置线程池的名字
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        else {
            // 获取默认的线程池
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}

最终会调用到

org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor

这个方法中

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
   Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
   return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

可以看到,它默认使用的线程池是SimpleAsyncTaskExecutor。我们不看这个类的源码,只看它上面的文档注释,如下:

image-20200720160047340

主要说了三点

  • 为每个任务新起一个线程
  • 默认线程数不做限制
  • 不复用线程

就这三点,你还敢用吗?只要你的任务耗时长一点,说不定服务器就给你来个OOM

解决方案

最好的办法就是使用自定义的线程池,主要有这么几种配置方法

1.在之前的源码分析中,我们可以知道,可以通过AsyncConfigurer来配置使用的线程池

如下:

import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
 
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
 

@Slf4j
@Component
public class AsyncConfig implements AsyncConfigurer {
 
    @Override
    public Executor getAsyncExecutor() {
        MdcThreadPoolTaskExecutor executor = new MdcThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(200);
        executor.seTKEepAliveSeconds(5 * 60);
        executor.setQueueCapacity(1000);
        // 自定义实现拒绝策略
        executor.setRejectedExecutionHandler((Runnable runnable, ThreadPoolExecutor exe) -> log.error("当前任务线程池队列已满."));
        // 或者选择已经定义好的其中一种拒绝策略
        // 丢弃任务并抛出RejectedExecutionException异常
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 丢弃任务,但是不抛出异常
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 由调用线程处理该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        // 线程名称前缀
        executor.setThreadNamePrefix("Async-");
        executor.initialize();
        return executor;
    }
 
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> log.error("线程池执行任务发生未知异常.", ex);
    }
 
    
	public static class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
 
        
        private Map<String, String> getContextForTask() {
            return MDC.getCopyOfContextMap();
        }
 
        
        @Override
        public void execute(@NonNull Runnable command) {
            super.execute(wrap(command, getContextForTask()));
        }
 
        
        @NonNull
        @Override
        public Future<?> submit(@NonNull Runnable task) {
            return super.submit(wrap(task, getContextForTask()));
        }
 
        
        @NonNull
        @Override
        public <T> Future<T> submit(@NonNull Callable<T> task) {
            return super.submit(wrap(task, getContextForTask()));
        }
 
        
        private <T> Callable<T> wrap(final Callable<T> task, final Map<String, String> context) {
            return () -> {
                Map<String, String> previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    return task.call();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            };
        }
 
        
        private Runnable wrap(final Runnable runnable, final Map<String, String> context) {
            return () -> {
                Map<String, String> previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            };
        }
    }
 
}

该方式实现线程的复用以及,子线程继承父线程全链路traceId,方便定位问题

2.直接在@Async注解中配置要使用的线程池的名称

@Async(value = "自定义线程名")

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

--结束END--

本文标题: springevent事件异步处理方式(发布,监听,异步处理)

本文链接: https://lsjlt.com/news/196081.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • springevent事件异步处理方式(发布,监听,异步处理)
    目录spring event 事件异步处理(发布,监听,异步处理)spring事件之异步线程执行Spring 事件发布监听的多种实现异步方法解决方案总结spring event 事件...
    99+
    2023-02-14
    spring event事件异步 event事件发布 event事件监听 event事件异步处理
  • Spring事件发布监听,顺序监听,异步监听方式
    目录1. Spring的事件通知2. Spring事件通知使用2.1 Spring的事件2.2 事件监听2.2.1 接口方式实现2.2.2 注解实现2.3 事件发布2.4 Sprin...
    99+
    2024-04-02
  • 如何实现Spring事件发布监听、顺序监听和异步监听
    这篇文章给大家分享的是有关如何实现Spring事件发布监听、顺序监听和异步监听的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1. Spring的事件通知Spring的事件通知本质上就是发布-订阅,即生产者-消费者...
    99+
    2023-06-22
  • 处理异步事件的方式有哪些
    本篇内容介绍了“处理异步事件的方式有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!同步异步首先当然要先...
    99+
    2024-04-02
  • 详解SpringBoot实现事件同步与异步监听
    目录简介事件监听简述实例同步监听(无序)同步监听(有序)异步监听(无序)简介 说明 本文用示例介绍SpringBoot中的事件的用法及原理。 事件监听简述 事件的发布与监听从属于观察...
    99+
    2024-04-02
  • js异步处理方案,js的异步串行与异步并行
    目录js异步处理方案,js的异步串行与异步并行一、什么是串行,并行,并发二、实现异步串行三、实现异步并行js异步处理方案,js的异步串行与异步并行 一、什么是串行,并行,并发 串行:...
    99+
    2023-03-24
    javascript异步处理 js异步串行与并行
  • Spring的事件和监听器-同步与异步详解
    目录Spring的事件和监听器-同步与异步1、首先新建StartWorkflowEvent.java,2、新建一个监听器StartWorkflowListener.java3、创建一...
    99+
    2024-04-02
  • 用Golang函数实现异步事件处理
    go 函数可用于实现异步事件处理,其中涉及使用 goroutine 监听事件,并通过 channel 接收和处理事件。在实战案例中,goroutine 循环接收并处理从 channel ...
    99+
    2024-05-04
    golang
  • JavaScript异步处理的方式有哪些
    这篇“JavaScript异步处理的方式有哪些”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇...
    99+
    2024-04-02
  • PHP异常处理:处理异步操作中的异常
    处理异步操作中的 php 异常需要:协程中,使用 try-catch-finally 语法捕获异常。promise 中,使用 then() 和 catch() 方法处理异常。实战案例:使...
    99+
    2024-05-14
    异常处理 异步处理 swoole
  • Node.js 事件循环如何处理异步请求
    ...
    99+
    2024-04-02
  • JavaScript中的异步处理方法
    本篇内容介绍了“JavaScript中的异步处理方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!在 JavaScript 的世界中,所有代...
    99+
    2023-06-17
  • SpringBoot异步处理的四种实现方式
    本篇文章我们以SpringBoot中异步的使用(包括:异步调用和异步方法两个维度)来进行讲解。 异步请求与同步请求 我们先通过一张图来区分一下异步请求和同步请求的区别: 在上图...
    99+
    2024-04-02
  • JavaScript中怎么处理异步
    JavaScript中怎么处理异步,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。一、回调函数回调是一个函数被作为一个参数传递到另一个函数里,在...
    99+
    2024-04-02
  • Java SpringMVC异步处理详解
    目录1、本篇内容2、看段代码,分析问题3、springmvc 中异步处理4、模拟非超时请求5、模拟超时请求6、总结1、本篇内容 本文让大家掌握 springmvc 中异步处理请求,特...
    99+
    2024-04-02
  • PHP开发中如何处理异步请求和并发处理
    在Web开发中,经常会遇到需要处理大量并发请求和异步请求的情况。以PHP为例,我们可以利用一些技术和工具来处理这些需求,提高系统的性能和响应能力。本文将介绍如何处理异步请求和并发处理,并提供一些具体的代码示例。一、异步请求的处理使用Ajax...
    99+
    2023-10-21
    异步请求 PHP开发 并发处理
  • PHP异常处理:捕获和处理异步任务错误
    php中异常处理通过try-catch-finally块实现,允许捕获和处理异步任务错误,以确保系统稳定性。具体操作步骤包括:获取文件、处理文件、存储文件、捕获异常(如记录错误和发送电子...
    99+
    2024-05-14
    php 异常处理 代码可读性
  • 如何利用Redis和Perl 6开发异步事件处理功能
    如何利用Redis和Perl 6开发异步事件处理功能引言:随着互联网技术的不断发展和应用场景的不断增加,异步事件处理功能成为现代编程中不可或缺的一部分。而在异步事件处理中,Redis和Perl 6是两个强大的工具和语言,它们的结合能够为我们...
    99+
    2023-10-22
    redis Perl 异步事件处理
  • Spring注解驱动之ApplicationListener异步处理事件说明
    目录概述示例自定义事件多波器测试用例ApplicationListener异步执行源码分析概述 之前我们讲过简单使用ApplicationListener发布事件,处理事件,但是发现...
    99+
    2024-04-02
  • Springboot 异步任务和定时任务的异步处理
    目录1 前言2 异步任务设置3 定时任务配置4 总结1 前言 在 Springboot 中,异步任务和定时任务是经常遇到的处理问题方式,为了能够用好这两项配置,不干扰正常的业务,需要...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作